# Setting

In [1]:
import sqlite3
import mocov2_model
import torch
from torchvision import transforms
from PIL import Image
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import os
import time
import csv
from annoy import AnnoyIndex

# Cosine Similarity가 가장 높은 이미지 찾기

## 모든 특징 벡터 간 Cosine Similarity 계산

## Test

In [13]:
conn = sqlite3.connect('Image_db_final.db')
cur = conn.cursor()

cur.execute('''
SELECT feature_vector FROM IMAGE LIMIT 5
''')

rows = cur.fetchall()

conn.commit()
for row in rows:
    print(row)

(b'Xi\xbc\xbf\xf0\xf5]\xbf\x86e\x8d\xbe\x18\xae\x87?U\xd5\x8d\xbft\x1e\xbc>C9\xa5<\xa7\x92\xd8\xbf\x0c;W><\xc49@C\xe0\x19@H\x05\xa4\xbd(\xde\xb6\xbe\xb7\x9f\xb5>\xfdO\xf9?\xac\xff\xe6\xbfi\nv?\x92i\x89?\xc6u\x96?\x01\xc1\xa0\xbf0\x0b,\xbf\xa26\xa8\xbe\xfb\xcf\xfd=\x8e\xee\xde\xbe\x99\x12x?M\x0e\xcc\xbfTY\x9e\xbf\x02\x9f*\xc0\xd7\xd0\x03\xc0\xd9\xe5\xb2\xbf\x90\x06\xa1>\xeddW=}\xb2/\xbfYe\x87>b>\xa5\xbf\xd2^\x17\xc0\xd7\xd4\xc4\xbf"\x80\xd4=m\x9c\xf4?\xd6\xe4t\xbfL\xf7\xa0=\xe2\xf7\x95?\xc7L\x82\xbf\x07\xcd\x1f\xc0Z\x15\x11@\xd9\xb6\r\xc0\xbc\x0b?@\xa5Y\'@\xe7QG\xc0\x9e\x9e\x12\xc0<\xf8\x84\xbf\xe2\x9bx?x\xb54\xc0fk$\xbf\xdf\x85\xd1?NQ\xf2\xbf^\t\n@6\xd5\xc5?\x85\xf0\t@F\xf82>\x80 \x99?\xef\x0cp\xbf@f\x01?O\xdaZ\xbf\x87\xff\x8a\xbf\x1c\x82\xe2>:\xa2\xd8>\xf5\xa1*\xbf>\x19?\xbf:\xb1\xb6\xbf\x82@\x9f\xbf\xa1\xf2\x11\xc0\xba\x12S\xbe\xc5<"@\xdc\x90U\xc0\x1f\xa1.?\x7fcP?[K\xf4=\x89r\x82\xbf:@\x87\xbf\xe0z%\xbe\x12$\xba\xbf\xe0|\x9d\xbf\x9b\x9df\xbfH\x91\xdd\xbf0k\xf9>\x0c\xb3\x98>s\xb9\x8a?

## Initialization

In [2]:
# 모델, db RGB 채널 평균 & 표준편차 로드
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model, (mean_, std_) = mocov2_model.load_model(model_path='./mocov2_best_model_231216.pth', mean_std_path='./dataset_mean_std.pkl', train=False)
model = model.to(device)
model.eval()

# 이미지 변환 정의
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean_, std_),
])

# db path 정의
db_path = './Image_db_final.db'

## Calculate Cosine Similarity

In [3]:
def fetch_feature_vectors(db_path, table_name):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(f"SELECT id, feature_vector FROM {table_name}")
    data = cursor.fetchall()
    conn.close()
    return data

def calculate_cosine_similarity(vector1, vector2):
    vector1 = np.array(vector1).reshape(1, -1)
    vector2 = np.array(vector2).reshape(1, -1)
    similarity = cosine_similarity(vector1, vector2)
    return similarity[0][0]

def extract_feature_vector(image_path, model, transform, device):
    # 이미지 로드
    image = Image.open(image_path).convert('RGB')
    
    # 이미지 변환
    image = transform(image)
    
    # 배치 차원 추가
    image = image.unsqueeze(0).to(device)
    
    # 특징 벡터 추출
    with torch.no_grad():
        feature_vector = model(image)
    
    # GPU에서 CPU로 이동 및 Numpy 배열로 변환
    feature_vector = feature_vector.cpu().numpy().flatten()
    
    return feature_vector

def calculate_and_store_similarities(image_folder, db_path, table_name, model, transform, output_csv, device):
    # 데이터베이스에서 저장된 특징 벡터 가져오기
    data = fetch_feature_vectors(db_path, table_name)
    
    # CSV 파일을 덮어쓰는 모드로 열어 기존 내용을 비움
    with open(output_csv, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['image_name', 'db_row_ids', 'time_elapsed'])
        writer.writeheader()
    
    for image_name in os.listdir(image_folder):
        image_path = os.path.join(image_folder, image_name)
        
        if not os.path.isfile(image_path):
            continue
        
        start_time = time.time()
        
        # 주어진 이미지에서 특징 벡터 추출
        target_vector = extract_feature_vector(image_path, model, transform, device)
        
        similarities = []
        
        for row in data:
            row_id = row[0]
            stored_vector = np.frombuffer(row[1], dtype=np.float32)
            similarity = calculate_cosine_similarity(target_vector, stored_vector)
            similarities.append((row_id, similarity))
        
        # 코사인 유사도 값 기준으로 정렬
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # 상위 10개 결과 저장
        top_10_similarities = similarities[:10]
        elapsed_time = time.time() - start_time
        
        db_row_ids = ",".join([str(row_id) for row_id, _ in top_10_similarities])
        
        with open(output_csv, mode='a', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=['image_name', 'db_row_ids', 'time_elapsed'])
            writer.writerow({
                'image_name': image_name,
                'db_row_ids': db_row_ids,
                'time_elapsed': elapsed_time
            })

## Test with my images

In [22]:
# 사용 예시
db_path = './Image_db_final.db'
table_name = 'IMAGE'
image_folder = './ImagesForAnnoyTest'
output_csv = 'cosine_similarity_results.csv'

# 모델과 변환값 불러오기
model, (mean_, std_) = mocov2_model.load_model(model_path='./mocov2_best_model_231216.pth', mean_std_path='./dataset_mean_std.pkl', train=False)
model = model.to(device)
model.eval()

# 이미지 변환 정의
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean_, std_),
])

calculate_and_store_similarities(image_folder, db_path, table_name, model, test_transform, output_csv, device)

# Annoy

In [3]:
# 특징 벡터의 차원
f = 128

# Annoy 인덱스 생성
db_path = './Image_db_final.db'
annoy_index = AnnoyIndex(f, 'angular')
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# 특징 벡터 가져오기
cursor.execute('SELECT id, feature_vector FROM IMAGE')

# 특징 벡터를 ANN 인덱스에 추가
for row in cursor:
    id, feature_vector = row
    feature_vector = np.frombuffer(feature_vector, dtype=np.float32)
    annoy_index.add_item(id, feature_vector)

annoy_index.build(20)
annoy_index.save('./annoy_index_for_test.ann')

conn.close()

## Test for one image with Annoy

In [4]:
# 한 이미지에서 추출한 특징 벡터와 가장 유사할 확률이 높은 이미지 id 찾기

# 이미지 path
image_path = './ImagesForAnnoyTest/test1.jpg'

# 모델과 변환값 불러오기
model, (mean_, std_) = mocov2_model.load_model(model_path='./mocov2_best_model_231216.pth', mean_std_path='./dataset_mean_std.pkl', train=False)
model = model.to(device)
model.eval()

# 이미지 변환 정의
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean_, std_),
])

# 이미지 변환 및 특징 벡터 추출
target_vector = extract_feature_vector(image_path, model, test_transform, device)

In [7]:
f = 128  # 특징 벡터의 차원
t = AnnoyIndex(f, 'angular')
t.load('./annoy_index_for_test.ann')  # 인덱스 파일이 저장된 경로로 수정
n = 10 # 상위 n개
nearest_ids = t.get_nns_by_vector(target_vector.reshape(-1), n) # 가장 유사할 확률이 높은 이미지 id를 list로 저장

In [8]:
# result
nearest_ids

[32882, 113783, 35132, 57622, 47214, 41673, 35545, 164223, 67947, 61965]