In [None]:
# ====================================================
# faiss 벡터스토어 및 라벨 파일 생성 소스코드
#
# 주요 기능
# 1. 1차 전처리된 이미지를 불러와 DINOv2에 넣을 수 있도록 2차 전처리
# 2. 전처리된 이미지 DINOv2-vits 모델에 통과시켜 특징벡터 추출
# 3. 추출된 특징벡터를 faiss 벡터스토어에 저장, 해당 특징벡터에 맞는 라벨을 .npy 파일에 인덱스값을 맞춰 저장
#
# 이 코드는 구글 코랩 환경에서 실행되었습니다.
# ====================================================

In [None]:
# ====================================================
# 1. 초기 설정 및 라이브러리 설치
# ====================================================

# FAISS CPU 버전 및 기타 필수 라이브러리 설치
!pip install faiss-cpu torch torchvision numpy Pillow tqdm

import torch
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import numpy as np
import faiss
import os
import glob
from tqdm import tqdm
import sys
import io
import requests
import sqlite3

In [None]:
# !pip install faiss-gpu

## 구글 드라이브 마운트 코드
###

In [None]:
# Colab 환경에서 Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# ----------------------------------------------------
# 1. 모델 로드 및 전처리 정의
# ----------------------------------------------------

# DINOv2 모델 로드 함수
def load_dinov2_vits():
    """DINOv2 ViT-S 모델을 로드하고 GPU 사용 가능 시 GPU로 이동시킵니다."""
    try:
        # dinov2_vits14 (ViT-Small) 모델 로드
        model = torch.hub.load('facebookresearch/dinov2', 'dinov2_vits14')
    except Exception as e:
        print(f"DINOv2 모델 로드 실패: {e}")
        return None

    if torch.cuda.is_available():
        model = model.cuda()
    model.eval()
    print("DINOv2 ViT-S 모델 로드 완료.")
    return model

# 이미지 전처리 파이프라인
# 256 크기 리사이즈
# 224 크기 이미지 중앙 크롭
# 이미지 텐서 변환
# 색상 채널을 표준편차 1을 갖도록 정규화
transform = transforms.Compose([
    transforms.Resize(256, interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.485, 0.485), std=(0.229, 0.229, 0.229)),
])

# 구글 드라이브 이미지 파일(크롭되어 전처리된 이미지) 특징 벡터 추출 함수
# 파일 경로, 특징 추출을 위한 모델, 전처리 함수를 인수로 받는 함수
def extract_features_from_mounted_drive(file_path, model, transform):
    """
    Google Drive에 마운트된 경로에서 이미지를 로드하고 특징 벡터를 추출합니다.
    """
    try:
        # 경로의 파일 이미지를 읽어와 RGB 형식으로 변환
        img = Image.open(file_path).convert("RGB")
    except FileNotFoundError:
        print(f"오류: 쿼리 파일을 찾을 수 없습니다. 경로를 확인하세요: {file_path}")
        return None
    except Exception as e:
        print(f"쿼리 이미지 로드 오류: {e}")
        return None

    # 이미지를 전처리한 후 리턴받은 텐서에 배치 차원을 추가
    img_tensor = transform(img).unsqueeze(0)
    if torch.cuda.is_available():
        img_tensor = img_tensor.cuda()

    # 경사도 계산 비활성화(학습과정이 아니므로)
    with torch.no_grad():
        # 전처리된 텐서를 DINOv2 모델에 통과시켜 특징벡터 추출
        features = model(img_tensor)
        # 특징 벡터를 L2 정규화 (코사인 유사도 계산을 위함)
        features = F.normalize(features, p=2, dim=1)

    # 특징벡터를 cpu 메모리로 이동시킨 후 numpy 배열로 변환하여 반환
    return features.cpu().numpy()

In [None]:
# ----------------------------------------------------
# 2. Google Drive 이미지 경로 읽기 및 특징 벡터 추출
# ----------------------------------------------------

# 마운트된 구글 드라이브에서 이미지 경로를 읽어오는 함수
def get_image_paths_from_drive():
    """
    Google Drive에 마운트된 폴더에서 이미지 파일 경로를 읽어옵니다.
    """

    # 이미지가 있는 구글 드라이브 경로
    IMAGE_DIRECTORY = '/content/drive/MyDrive/output_crops_image'

    # 파일 존재 여부 확인(경로 오류 방지)
    if not os.path.exists(IMAGE_DIRECTORY):
        print(f"오류: 이미지 폴더를 찾을 수 없습니다. 경로를 확인하세요: {IMAGE_DIRECTORY}")
        print("   - Google Drive 마운트와 경로가 올바른지 확인하세요.")
        return []

    # jpg, jpeg, png 파일 경로 수집
    image_paths = glob.glob(os.path.join(IMAGE_DIRECTORY, '*.[jp][pn]g'))

    print(f"폴더 '{IMAGE_DIRECTORY}'에서 총 {len(image_paths)}개의 이미지 경로를 가져왔습니다.")
    return image_paths

# DINOv2 모델 로드 함수를 이용해 모델을 로드해서
# 이미지의 특징벡터 추출 후 faiss 벡터스토어 및 라벨 파일 생성 함수
def run_feature_extraction():
    print("카탈로그 데이터셋 구축 시작")

    # DINOv2 모델 로드
    dinov2_model = load_dinov2_vits()

    if dinov2_model is None:
        sys.exit("모델 로드 실패. 프로그램 종료.")

    # Google Drive에서 이미지 파일 경로 목록 가져오기
    image_paths = get_image_paths_from_drive()

    catalog_vectors = []
    catalog_labels = []

    if not image_paths:
        print(f"경고: 이미지 경로를 찾을 수 없습니다. 인덱스 구축 불가.")
        return None, None, None

    print(f"{len(image_paths)}개의 카탈로그 이미지를 처리합니다...")

    for path in tqdm(image_paths):
        # 파일 경로를 특징 추출 함수에 전달
        features_np = extract_features_from_mounted_drive(path, dinov2_model, transform)

        if features_np is not None:
            catalog_vectors.append(features_np.squeeze())

            # 상품명(Label)은 파일 이름(확장자 제외)으로 가정합니다.
            label = os.path.basename(path).split('.')[0]
            catalog_labels.append(label)

    # 리스트를 최종 NumPy 배열로 변환
    if not catalog_vectors:
        print("경고: 특징 벡터 추출에 성공한 파일이 없어 인덱스를 구축할 수 없습니다.")
        return None, None, None

    catalog_vectors_np = np.array(catalog_vectors, dtype='float32')
    D = catalog_vectors_np.shape[1]
    print(f"데이터셋 구축 완료: 총 {len(catalog_vectors_np)}개 벡터, 차원: {D}")

    return catalog_vectors_np, catalog_labels, D

In [None]:
# ----------------------------------------------------
# 3. FAISS 인덱스 구축 및 저장 (Google Drive에 저장)
# ----------------------------------------------------

# faiss 인덱스 파일 생성 함수
def build_and_save_faiss_index(catalog_vectors_np, catalog_labels, D):
    print("FAISS 인덱스 구축 시작")

    # faiss 벡터스토어 및 npy 라벨 파일 경로 및 이름 지정
    INDEX_FILE_PATH = "dinov2_product_catalog.faiss"
    LABEL_FILE_PATH = "catalog_labels.npy"

    # 카탈로그 벡터 존재 여부 확인
    if catalog_vectors_np is None or len(catalog_vectors_np) == 0:
        print("FAISS 인덱스 구축 실패: 카탈로그 벡터가 존재하지 않습니다.")
        return

    # IndexFlatIP (내적)를 사용하여 코사인 유사도 검색 인덱스 생성
    index = faiss.IndexFlatIP(D)
    index.add(catalog_vectors_np)

    print(f"FAISS 인덱스 구축 완료: 총 {index.ntotal}개 항목")

    # FAISS 인덱스 로컬 저장
    print("FAISS 인덱스 로컬 저장 시작")
    try:
        faiss.write_index(index, INDEX_FILE_PATH)
        print(f"FAISS 인덱스가 로컬 파일 '{INDEX_FILE_PATH}'에 성공적으로 저장되었습니다.")

        # 라벨 매핑 정보를 NumPy 파일로 저장 (검색 시 사용)
        np.save(LABEL_FILE_PATH, np.array(catalog_labels))
        print(f"카탈로그 라벨(product_name) 정보가 '{LABEL_FILE_PATH}'에 저장되었습니다.")

    except Exception as e:
        print(f"FAISS 인덱스 저장 실패: {e}")

# ----------------------------------------------------
# 메인 실행
# ----------------------------------------------------
if __name__ == "__main__":
    vectors, labels, dimension = run_feature_extraction()

    if vectors is not None and labels is not None:
        build_and_save_faiss_index(vectors, labels, dimension)

        print("\n--- FAISS 인덱스 구축 완료 ---")
        print("이제 'dinov2_product_catalog.faiss'와 'catalog_labels.npy' 파일을 사용하여 검색을 수행할 수 있습니다.")

In [None]:
# 저장된 인덱스 로드 예시
loaded_index = faiss.read_index('content/dinov2_product_catalog.faiss')
print(f"인덱스 로드 완료. 총 항목 수: {loaded_index.ntotal}")

In [None]:
# 구글 드라이브 이미지 파일(크롭되어 전처리된 이미지) 특징 벡터 추출 함수 - 이미지 테스트를 위해 전역에서 호출 가능하게 다시 선언
def extract_features_from_mounted_drive(file_path, model, transform):
    """
    Google Drive에 마운트된 경로에서 이미지를 로드하고 특징 벡터를 추출합니다.
    (검색 쿼리 이미지를 파일 경로에서 읽는 경우 사용)
    """
    try:
        img = Image.open(file_path).convert("RGB")
    except FileNotFoundError:
        print(f"오류: 쿼리 파일을 찾을 수 없습니다. 경로를 확인하세요: {file_path}")
        return None
    except Exception as e:
        print(f"쿼리 이미지 로드 오류: {e}")
        return None

    img_tensor = transform(img).unsqueeze(0)
    if torch.cuda.is_available():
        img_tensor = img_tensor.cuda()

    with torch.no_grad():
        features = model(img_tensor)
        # 특징 벡터를 L2 정규화 (코사인 유사도 계산을 위함)
        features = F.normalize(features, p=2, dim=1)

    return features.cpu().numpy()

In [None]:
# faiss 벡터스토어 기반 겁색 기능 테스트 메인 함수
def run_search_test():
    # --- 1. 파일 경로 및 검색 설정 ---
    # faiss 파일 경로와 npy 파일 경로 지정
    INDEX_FILE_PATH = "dinov2_product_catalog2.faiss"
    LABEL_FILE_PATH = "catalog_labels2.npy"

    # 테스트 이미지 경로
    TEST_DIR = '/content/test_images'
    K = 5 # 상위 K개 유사 이미지

    print("검색 환경 설정 및 파일 로드")

    # DINOv2 모델 로드 (쿼리 이미지 특징 추출용)
    dinov2_model = load_dinov2_vits()
    if dinov2_model is None:
        return

    # --- 2. FAISS 인덱스 로드 ---
    loaded_index = None
    if os.path.exists(INDEX_FILE_PATH):
        try:
            loaded_index = faiss.read_index(INDEX_FILE_PATH)
            print(f"FAISS 인덱스 로드 완료: 총 {loaded_index.ntotal}개 항목.")
        except Exception as e:
            print(f"FAISS 인덱스 로드 중 오류 발생: {e}")
            return
    else:
        print(f"오류: FAISS 인덱스 파일 '{INDEX_FILE_PATH}'을 찾을 수 없습니다.")
        return

    # --- 3. 매핑 라벨 (product_name) 로드 ---
    catalog_labels = None
    if os.path.exists(LABEL_FILE_PATH):
        try:
            # np.load를 사용하여 npy 파일에서 라벨 배열 로드
            catalog_labels = np.load(LABEL_FILE_PATH).tolist()
            print(f"라벨 파일 로드 완료: 총 {len(catalog_labels)}개 라벨.")
        except Exception as e:
            print(f"라벨 파일 로드 중 오류 발생: {e}")
            return
    else:
        print(f"오류: 라벨 파일 '{LABEL_FILE_PATH}'을 찾을 수 없습니다.")
        return

    # --- 4. 로드된 인덱스로 검색 실행 ---
    print("5. 테스트 이미지 검색 시작")

    # 테스트 이미지 파일 목록 찾기
    test_image_files = glob.glob(os.path.join(TEST_DIR, '*.[jp][pn]g'))

    # 테스트 이미지 파일 목록 점검
    if not test_image_files:
        print(f"경고: {TEST_DIR} 폴더에서 테스트 이미지를 찾을 수 없습니다.")
    else:
        print(f"{len(test_image_files)}개의 테스트 이미지 검증을 시작합니다.")

        # 테스트 이미지 경로 순회
        for query_path in tqdm(test_image_files):
            # 쿼리 이미지의 특징 벡터 추출
            query_vector_np = extract_features_from_mounted_drive(
                query_path, dinov2_model, transform
            )

            if query_vector_np is not None:
                # 검색 실행
                distances, indices = loaded_index.search(query_vector_np, K)

                print(f"\n[쿼리 파일: {os.path.basename(query_path)}]")

                for i in range(K):
                    # 검색된 FAISS 인덱스 번호
                    vector_id = indices[0][i]
                    # 코사인 유사도 점수 (1.0에 가까울수록 유사)
                    similarity_score = distances[0][i]

                    # FAISS 인덱스를 사용하여 product_name으로 매핑
                    try:
                        # vector_id는 catalog_labels의 인덱스와 동일
                        product_name = catalog_labels[vector_id]
                    except IndexError:
                        product_name = f"ID: {vector_id} (매핑 라벨 범위 초과)"
                    except Exception:
                        product_name = f"ID: {vector_id} (라벨 정보 오류)"

                    print(f"  {i+1}위: 상품명: {product_name}, 코사인 유사도: {similarity_score:.4f}")
            else:
                print(f"  [오류] 쿼리 이미지 '{os.path.basename(query_path)}' 특징 추출에 실패했습니다.")


if __name__ == "__main__":
  # 검색 기능 테스트 실행
    run_search_test()