# 필요 데이터 불러오기

In [1]:
# 파이토치 및 토치비전 관련 라이브러리
import torch
import torchvision.transforms as T
from torchvision.models.detection import maskrcnn_resnet50_fpn

# 이미지 처리를 위한 PIL 라이브러리
from PIL import Image
from transformers import CLIPProcessor, CLIPModel  # CLIP 모델 관련 라이브러리

# 넘파이 및 랜덤 모듈
import numpy as np
import random

# 운영 체제 및 경로 검색 관련 모듈
import os
from glob import glob

# 진행 상황을 시각화하기 위한 tqdm 모듈
from tqdm import tqdm

# chromadb 및 모델성능평가 관련 모듈
import chromadb
import time
import pandas as pd

# 시각화 관련 모듈
from PIL import Image
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.decomposition import PCA
import plotly.graph_objects as go

import warnings

# 모든 경고를 무시합니다.
warnings.filterwarnings("ignore")

# 데이터 전처리 및 벡터DB구축

## 마스킹 함수

In [2]:
def preprocess_and_mask_image(image_path, confidence_threshold=0.5):
    image = Image.open(image_path).convert("RGB")
    transform = T.Compose([T.ToTensor()])
    input_tensor = transform(image).unsqueeze(0)
    
    mask_model = maskrcnn_resnet50_fpn(pretrained=True).eval()
    with torch.no_grad():
        prediction = mask_model(input_tensor)
    
    masks = prediction[0]['masks']
    labels = prediction[0]['labels']
    scores = prediction[0]['scores']
    np_image = np.array(image)
    
    for i in range(len(scores)):
        if scores[i] > confidence_threshold and labels[i].item() == 1:
            mask = masks[i, 0]
            np_image[mask > 0.5] = [0, 0, 0]
    
    return Image.fromarray(np_image)

## CLIP 기반 벡터라이저 생성

In [3]:
def vectorizor(image_path, mask_people=False, confidence_threshold=0.5):
    if mask_people:
        image = preprocess_and_mask_image(image_path, confidence_threshold)
    else:
        image = Image.open(image_path).convert("RGB")
    
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    
    inputs = processor(images=image, return_tensors="pt")
    
    with torch.no_grad():
        image_features = clip_model.get_image_features(**inputs)
    
    vector = image_features.detach().cpu().numpy().squeeze()
    return vector

## 벡터DB시각화 함수

In [4]:
def visualize_embeddings_with_plotly(embeddings_list, metadatas):
    embeddings_array = np.array(embeddings_list)
    labels = [metadata['label'] for metadata in metadatas]

    pca = PCA(n_components=3)
    embeddings_reduced = pca.fit_transform(embeddings_array)

    unique_labels = list(set(labels))
    color_values = np.linspace(0, 1, len(unique_labels))
    colors = ['rgba(' + ', '.join([f'{int(x*255)}' for x in plt.cm.rainbow(c)[:3]]) + ', 0.8)' for c in color_values]
    label_to_color = {label: color for label, color in zip(unique_labels, colors)}

    fig = go.Figure()

    for label in unique_labels:
        idx = [i for i, l in enumerate(labels) if l == label]
        fig.add_trace(go.Scatter3d(
            x=embeddings_reduced[idx, 0],
            y=embeddings_reduced[idx, 1],
            z=embeddings_reduced[idx, 2],
            mode='markers',
            marker=dict(size=5, color=label_to_color[label], opacity=0.8),
            name=label
        ))

    fig.update_layout(margin=dict(l=0, r=0, b=0, t=0), scene=dict(
        xaxis_title='PCA 1',
        yaxis_title='PCA 2',
        zaxis_title='PCA 3'
    ))
    
    fig.show()

## 데이터셋 벡터DB화

In [5]:
def images_to_vector_db(image_directory, db_path, collection_name, mask_people=False):
    # 데이터베이스 클라이언트 생성
    client = chromadb.PersistentClient(path=db_path)

    # 기존 컬렉션 이름 리스트 생성
    existing_collection_names = [col.name for col in client.list_collections()]

    # 요청한 컬렉션 이름이 이미 존재하는지 확인
    if collection_name in existing_collection_names:
        # 이미 존재하는 경우 해당 컬렉션을 가져옴
        collection = client.get_collection(collection_name)
        print("Found existing collection, using it.")
    else:
        # 존재하지 않는 경우 새로운 컬렉션 생성
        collection = client.create_collection(collection_name)
        print("Created new collection.")

    # 이미지 파일 경로 리스트 생성
    img_list = sorted(glob(os.path.join(image_directory, "*/*.jpg")))
    
    # Embedding, Metadata 및 ID를 저장할 리스트 초기화
    embeddings = []
    metadatas = []
    ids = []
    
    # 이미지 처리 및 Embedding 생성 반복
    for i, img_path in enumerate(tqdm(img_list, desc="Processing images")):
        # 이미지 파일의 레이블 및 파일 이름 추출
        label_name = os.path.basename(os.path.dirname(img_path))
        file_name = os.path.basename(img_path)
        
        # 이미지의 Embedding 생성
        embedding = vectorizor(img_path, mask_people=mask_people)
        
        # Embedding, Metadata 및 ID를 리스트에 추가
        embeddings.append(embedding)
        metadatas.append({
            "uri": img_path,
            "label": label_name,
            "file_name": file_name
        })
        
        # 이미지 인덱스를 ID로 사용
        ids.append(str(i))
    
    # Embedding을 리스트로 변환
    embeddings_list = [embedding.tolist() for embedding in embeddings]
    
    # 데이터베이스에 데이터 추가
    collection.add(
        embeddings=embeddings_list,
        metadatas=metadatas,
        ids=ids,
    )
    
    print("Data upload completed!")
    
    # 벡터DB 시각화
    visualize_embeddings_with_plotly(embeddings_list, metadatas)

# 제주지역 이미지 벡터 데이터 구축

In [6]:
# 마스킹 적용한 벡터 생성
images_to_vector_db(image_directory='data/travel_imgs/jeju',
                    db_path='./data/jeju_vector_db',
                    collection_name='jeju_vector',
                    mask_people=True)

Created new collection.


Processing images: 0it [00:00, ?it/s]


ValueError: Expected IDs to be a non-empty list, got []