In [None]:
import os
import pandas as pd
import numpy as np
import torch
import cv2
from tqdm import tqdm
from torchvision.datasets.folder import default_loader
from torch.utils.data import Dataset, DataLoader
from transformers import ViTForImageClassification, ViTImageProcessor
from torchvision import transforms
from sklearn.model_selection import train_test_split
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import average_precision_score
from sklearn.utils import shuffle

In [None]:
# from google.colab import drive

# drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# !pip install git+https://github.com/openai/CLIP.git

# Read data from database

In [None]:
# Locations of the train / dev / test splits when running on Kaggle
data_train_path = "/kaggle/input/vn-des-xla/DATASET_2/train"
data_dev_path = "/kaggle/input/vn-des-xla/DATASET_2/dev"
data_test_path = "/kaggle/input/vn-des-xla/DATASET_2/test"

In [None]:
# Colab alternative: only override the paths set in the previous cell when the
# Google Drive folder is actually mounted, so a top-to-bottom run on Kaggle
# keeps pointing at the Kaggle dataset.
if os.path.isdir("/content/drive/MyDrive"):
    data_train_path = "/content/drive/MyDrive/train"
    data_dev_path = "/content/drive/MyDrive/dev"
    data_test_path = "/content/drive/MyDrive/test"

In [None]:
def read_data(data_path):
  """Index an image dataset laid out as ``<data_path>/<label>/<image file>``.

  Args:
    data_path: Directory whose immediate sub-directories are named after the
      class labels and contain the image files.

  Returns:
    A DataFrame with one row per file and two columns: ``image_path`` (full
    path of the file) and ``label`` (name of the containing sub-directory).
    The frame is empty, with the same two columns, when nothing is found.
  """
  records = {
      'image_path': [],
      'label': []
  }
  # os.listdir returns entries in arbitrary order; sorting keeps the row order
  # reproducible between runs and machines.
  for folder in sorted(os.listdir(data_path)):
    folder_path = os.path.join(data_path, folder)
    if not os.path.isdir(folder_path):
      continue  # a stray file next to the class folders is not a label
    for img in sorted(os.listdir(folder_path)):
      records['image_path'].append(os.path.join(folder_path, img))
      records['label'].append(folder)
  # Build the frame once, after the scan: this also covers the empty-directory
  # case, where no loop iteration ever runs.
  return pd.DataFrame(records)

In [None]:
# Index each split; despite the `dct_` prefix, read_data returns a DataFrame
# with `image_path` and `label` columns.
dct_train = read_data(data_train_path)
dct_dev = read_data(data_dev_path)
dct_test = read_data(data_test_path)

In [None]:
# Merge the train and dev splits into the single gallery used for retrieval.
df_combined = pd.concat([dct_train, dct_dev], ignore_index=True)
# Seed the shuffle so the row order (and every positional index derived from
# it) is reproducible, then renumber the index so that label-based lookups
# such as df_combined['label'][i] agree with row position i.
df_combined = shuffle(df_combined, random_state=42).reset_index(drop=True)

In [None]:
# Number of gallery images per class label
df_combined['label'].value_counts()

label
18    139
19    135
15    122
6     122
3     119
17    117
14    117
4     117
12    117
13    117
0     117
8     114
16    113
9     113
11    112
10    112
1     111
7     110
2     109
5     103
Name: count, dtype: int64

In [None]:
# Persist the shuffled gallery so the row order can be recovered later
df_combined.to_csv('df_combined.csv', index = False)

# Load model

In [None]:
import torch
from transformers import ViTModel, ViTImageProcessor
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from torchvision import models, transforms
import pandas as pd

# Use the GPU when one is available, otherwise fall back to the CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

## Loading Resnet50 model

In [None]:
# 1. Load the ResNet-50 model
model_type = "Resnet50"
resnet_model = models.resnet50(pretrained=True)
resnet_model.fc = torch.nn.Identity()  # Drop the FC layer so the model outputs features
resnet_model.to(device)
resnet_model.eval()

# 2. Image preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize the image to 224x224
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 197MB/s]


## Loading ViT model

In [None]:

model_type = "ViT"
# 1. Load the ViT model
vit_model = ViTModel.from_pretrained("google/vit-base-patch16-224")
vit_model.to(device)
vit_model.eval()

# 2. Image preprocessing (normalisation statistics come from the ViT image processor)
img_feature_extractor = ViTImageProcessor.from_pretrained("google/vit-base-patch16-224")
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize the image to 224x224
    transforms.ToTensor(),
    transforms.Normalize(mean=img_feature_extractor.image_mean, std=img_feature_extractor.image_std)
])


config.json:   0%|          | 0.00/69.7k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


preprocessor_config.json:   0%|          | 0.00/160 [00:00<?, ?B/s]

## Loading CLIP

In [None]:
# 1. Load the CLIP model
from transformers import CLIPProcessor, CLIPModel
model_type = "CLIP"

# Load the model and its processor
model_name = "openai/clip-vit-base-patch32"
clip_model = CLIPModel.from_pretrained(model_name)
clip_processor = CLIPProcessor.from_pretrained(model_name)

config.json:   0%|          | 0.00/4.19k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/605M [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/862k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.22M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]



## Loading BLIP

In [None]:
from transformers import BlipProcessor, BlipForConditionalGeneration

In [None]:
# Load the BLIP captioning checkpoint; only its vision tower is used below.
blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
# Move to the shared `device` (rather than a hard-coded "cuda") so the model
# lands where preprocess_image_blip sends its inputs, including on CPU-only hosts.
blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)

preprocessor_config.json:   0%|          | 0.00/287 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/506 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/711k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.56k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/990M [00:00<?, ?B/s]

In [None]:
# Selects which backbone the model-selection cell in "Extract Feature" binds to `model`
model_type = "BLIP"

# Auto Enhance image

In [None]:
def check_noise(img, threshold = 100):
    """Estimate image noise from the mean Sobel gradient magnitude.

    Args:
        img: Image array passed to ``cv2.cvtColor`` with ``COLOR_RGB2GRAY``.
        threshold: Noise level above which the image is flagged as noisy.

    Returns:
        ``(is_noisy, noise_level)`` where ``is_noisy`` is
        ``noise_level > threshold``.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    # First derivative along x (dx=1) and along y (dy=1), 3x3 kernel
    grad_x, grad_y = (
        cv2.Sobel(gray, cv2.CV_64F, dx, dy, ksize=3)
        for dx, dy in ((1, 0), (0, 1))
    )
    noise_level = np.mean(np.sqrt(grad_x**2 + grad_y**2))
    return noise_level > threshold, noise_level

def check_brightness(img, dim = 10, bright_thresh = 0.9, dark_thresh = 0.4):
    """Classify an image as too dark / too bright from its Lab lightness.

    The image is shrunk to ``dim`` x ``dim``, converted with
    ``COLOR_RGB2Lab``, and the lightness channel is averaged after being
    divided by its own maximum.

    Returns:
        ``(too_dark, too_bright, avg_brightness)``.
    """
    thumbnail = cv2.resize(img, (dim, dim))
    lightness, _a, _b = cv2.split(cv2.cvtColor(thumbnail, cv2.COLOR_RGB2Lab))
    # NOTE(review): brightness is relative to the thumbnail's own maximum, not
    # to an absolute scale, and an all-zero lightness channel divides by zero.
    avg_brightness = np.mean(lightness / np.max(lightness))
    return avg_brightness < dark_thresh, avg_brightness > bright_thresh, avg_brightness

def check_blurring(img, threshold = 500):
    """Measure sharpness as the variance of the Laplacian of the grayscale image.

    Returns:
        ``(is_blur, blur_level)`` where ``is_blur`` is
        ``blur_level < threshold``.
    """
    laplacian = cv2.Laplacian(cv2.cvtColor(img, cv2.COLOR_RGB2GRAY), cv2.CV_64F)
    blur_level = laplacian.var()
    return blur_level < threshold, blur_level

def check_contrast(img, threshold = 50.0):
    """Measure contrast as the standard deviation of the grayscale image.

    Returns:
        ``(is_low_contrast, is_high_contrast, contrast)``. Contrast counts as
        low when it is more than 15 below ``threshold`` and as high when it
        exceeds ``threshold``.
    """
    contrast = np.std(cv2.cvtColor(img, cv2.COLOR_RGB2GRAY))
    shortfall = threshold - contrast
    return shortfall > 15, contrast > threshold, contrast

def auto_denoising(img):
    """Denoise a colour image with OpenCV's non-local-means filter."""
    # Positional arguments after the (unused) destination: filter strength for
    # luminance, filter strength for colour, template window, search window.
    h_luma, h_color = 3, 8
    template_window, search_window = 7, 21
    return cv2.fastNlMeansDenoisingColored(
        img, None, h_luma, h_color, template_window, search_window
    )

def auto_adjust_brightness(img: np.ndarray, avg_brightness: float, state: int):
    """Scale pixel intensities so the average brightness moves towards a target.

    Args:
        img: Image array to rescale.
        avg_brightness: Current average brightness as returned by
            ``check_brightness``.
        state: ``1`` when the image is too dark (target 0.5), ``2`` when it is
            too bright (target 0.8).

    Returns:
        The rescaled image, or ``img`` unchanged when ``avg_brightness`` is
        not a positive number (no meaningful gain can be derived from it).

    Raises:
        ValueError: If ``state`` is neither 1 nor 2.
    """
    targets = {1: 0.5, 2: 0.8}
    if state not in targets:
        raise ValueError(f"state must be 1 (too dark) or 2 (too bright), got {state!r}")
    # `not x > 0` also rejects NaN, which check_brightness can produce for an
    # all-black image.
    if not avg_brightness > 0:
        return img

    ratio = avg_brightness / targets[state]
    return cv2.convertScaleAbs(img, alpha = 1 / ratio, beta = 0)

def auto_sharpening(img: np.ndarray, target_sharpness = 500):
    """Sharpen an image repeatedly until it is sharp enough.

    A 3x3 sharpening kernel is applied up to five times; the loop stops as
    soon as the sharpness reported by ``check_blurring`` reaches
    ``target_sharpness``.

    Args:
        img: Image array to sharpen.
        target_sharpness: Sharpness level at which sharpening stops.

    Returns:
        The sharpened image (or ``img`` itself when it is already sharp).
    """
    kernel = np.array([[0, -1, 0],
                       [-1, 5, -1],
                       [0, -1, 0]])
    max_iter = 5
    for _ in range(max_iter):
        _, sharpness = check_blurring(img)
        if sharpness >= target_sharpness:
            return img
        # ddepth=-1 keeps the output depth equal to the input depth
        img = cv2.filter2D(img, -1, kernel)
    return img

def auto_adjust_contrast(img: np.ndarray, current_contrast: float):
    """Raise the contrast of an RGB image with CLAHE on its Lab lightness.

    The CLAHE clip limit grows with the distance between
    ``current_contrast`` and the target contrast of 50; the adjustment is
    halved when the contrast is already within 10 of the target.

    Args:
        img: RGB image array (``auto_enhancing`` converts to RGB right after
            reading, so the RGB conversion codes are used here).
        current_contrast: Contrast value as returned by ``check_contrast``.

    Returns:
        The contrast-enhanced RGB image, or ``img`` unchanged when the
        contrast already meets or exceeds the target.
    """
    target_contrast = 50.0
    scale = 1.0 if abs(current_contrast - target_contrast) >= 10 else 0.5

    adjustment_factor = min(1.0, max(0.0, scale * (target_contrast - current_contrast) / target_contrast))
    if adjustment_factor <= 0:
        # Nothing to boost; skip CLAHE instead of running it with clipLimit=0.
        return img

    lab = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab)

    # Equalise only the lightness channel so the colours are left untouched
    clahe = cv2.createCLAHE(clipLimit= 4.0 * adjustment_factor, tileGridSize=(8, 8))
    l_eq = clahe.apply(l)

    lab_eq = cv2.merge((l_eq, a, b))
    return cv2.cvtColor(lab_eq, cv2.COLOR_LAB2RGB)

def auto_enhancing(img_path):
    """Read an image and repair whichever quality problems are detected.

    The checks run in a fixed order — noise, brightness, contrast, blur — and
    each correction is applied only when its check flags the image.

    Args:
        img_path: Path of the image file to read.

    Returns:
        The (possibly corrected) image as an RGB array.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``img_path``.
    """
    cv2_img = cv2.imread(img_path)
    if cv2_img is None:
        # imread signals failure by returning None instead of raising
        raise FileNotFoundError(f"Could not read image: {img_path}")
    cv2_img = cv2.cvtColor(cv2_img, cv2.COLOR_BGR2RGB)

    is_noise, _ = check_noise(cv2_img)
    if is_noise:
        cv2_img = auto_denoising(cv2_img)

    is_dark, is_bright, avg_brightness = check_brightness(cv2_img)
    if is_dark:
        cv2_img = auto_adjust_brightness(cv2_img, avg_brightness, 1)
    elif is_bright:
        cv2_img = auto_adjust_brightness(cv2_img, avg_brightness, 2)

    is_low_contrast, _, contrast_lvl = check_contrast(cv2_img)
    if is_low_contrast:
        cv2_img = auto_adjust_contrast(cv2_img, contrast_lvl)

    is_blur, _ = check_blurring(cv2_img)
    if is_blur:
        cv2_img = auto_sharpening(cv2_img)

    return cv2_img

# Extract Feature

In [None]:
# Bind `model` to the backbone named by `model_type`, which is set by whichever
# "Loading ..." cell ran last.
if model_type == "ViT":
  model = vit_model
elif model_type == "Resnet50":
  model = resnet_model
elif model_type == "CLIP":
  model = clip_model
elif model_type == "BLIP":
  model = blip_model
else:
  # Fail loudly instead of silently keeping a stale or undefined `model`.
  raise ValueError(f"Unknown model_type: {model_type!r}")

In [None]:

# 3. Custom dataset
class ImageDataset(Dataset):
    """Dataset that loads RGB images from a list of file paths.

    Each item is ``(image, label)``. The label is ``-1`` when no labels were
    given; the image is passed through ``transform`` when one is set.
    """

    def __init__(self, image_paths, labels=None, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        path = self.image_paths[idx]
        target = -1 if self.labels is None else self.labels[idx]
        picture = Image.open(path).convert("RGB")
        if self.transform:
            picture = self.transform(picture)
        return picture, target

class ImageEnhancedDataset(Dataset):
    """Dataset that runs ``auto_enhancing`` on every image before returning it.

    Each item is ``(image, label)``. The label is ``-1`` when no labels were
    given. The image is the enhanced picture as a PIL image, passed through
    ``transform`` when one is set.
    """

    def __init__(self, image_paths, labels=None, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx] if self.labels is not None else -1

        # auto_enhancing returns an array; wrap it as a PIL image for the transform
        image = Image.fromarray(auto_enhancing(img_path))
        # Bind `image` before the optional transform so the item is still
        # returned when no transform was configured.
        if self.transform:
            image = self.transform(image)
        return image, label


# 4. Extract features with ViT
def extract_features_vit(dataloader, model):
    """Run every batch through ``model`` and collect the first-token features.

    Args:
        dataloader: Iterable of ``(images, labels)`` batches.
        model: Callable accepting ``pixel_values`` and returning an object
            with a ``last_hidden_state`` tensor.

    Returns:
        ``(features, labels)``: the concatenated first-token vectors on the
        CPU and the flat list of labels in the same order.
    """
    cls_batches, collected_labels = [], []
    with torch.no_grad():
        for batch, batch_labels in dataloader:
            hidden = model(pixel_values=batch.to(device)).last_hidden_state
            cls_batches.append(hidden[:, 0, :].cpu())  # keep the CLS vector
            collected_labels.extend(batch_labels)
    return torch.cat(cls_batches), collected_labels

# 4. Extract features with ResNet-50
def extract_features_resnet50(dataloader, model):
    """Run every batch through ``model`` and collect its outputs as features.

    Args:
        dataloader: Iterable of ``(images, labels)`` batches.
        model: Callable mapping an image batch to a feature tensor.

    Returns:
        ``(features, labels)``: the concatenated outputs on the CPU and the
        flat list of labels in the same order.
    """
    feature_batches, collected_labels = [], []
    with torch.no_grad():
        for batch, batch_labels in dataloader:
            feature_batches.append(model(batch.to(device)).cpu())
            collected_labels.extend(batch_labels)
    return torch.cat(feature_batches), collected_labels



# 5. Find similar images
def find_top_k_similar_vit(query_image_path, dataset_features, dataset_labels, model, transform, k=5):
    """Return the ``k`` gallery items most similar to a query image (ViT).

    The query is embedded as the first-token vector of ``model`` and compared
    to ``dataset_features`` by cosine similarity.

    Returns:
        A list of ``(index, label, similarity)`` tuples, best match first.
    """
    pixels = transform(Image.open(query_image_path).convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        cls_vector = model(pixel_values=pixels).last_hidden_state[:, 0, :].cpu()

    scores = cosine_similarity(cls_vector.numpy(), dataset_features.numpy())[0]
    # argsort is ascending: keep the last k entries and flip them
    best_first = np.argsort(scores)[-k:][::-1]

    hits = []
    for i in best_first:
        hits.append((i, dataset_labels[i], scores[i]))
    return hits


def find_top_k_similar_resnet50(query_image_path, dataset_features, dataset_labels, model, transform, k=5):
    """Return the ``k`` gallery items most similar to a query image (ResNet-50).

    The query is embedded with ``model`` and compared to ``dataset_features``
    by cosine similarity.

    Returns:
        A list of ``(index, label, similarity)`` tuples, best match first.
    """
    pixels = transform(Image.open(query_image_path).convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        query_vector = model(pixels).cpu()

    scores = cosine_similarity(query_vector.numpy(), dataset_features.numpy())[0]
    # argsort is ascending: keep the last k entries and flip them
    best_first = np.argsort(scores)[-k:][::-1]

    hits = []
    for i in best_first:
        hits.append((i, dataset_labels[i], scores[i]))
    return hits


# 6. Demo usage with a DataFrame
def load_data_from_dataframe(df, image_column, label_column, transform, batch_size=32):
    """Wrap the image paths and labels of a DataFrame in a DataLoader.

    Args:
        df: DataFrame holding one image per row.
        image_column: Name of the column with the image file paths.
        label_column: Name of the column with the labels.
        transform: Transform handed to ``ImageDataset`` (may be ``None``).
        batch_size: Number of images per batch (default 32).

    Returns:
        A DataLoader that yields batches in row order (no shuffling), so the
        extracted features stay aligned with the DataFrame rows.
    """
    image_paths = df[image_column].tolist()
    labels = df[label_column].tolist()
    dataset = ImageDataset(image_paths, labels, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=False)




In [None]:
# Extract the gallery features with ViT
dataloader = load_data_from_dataframe(df_combined, 'image_path', 'label', transform)
dataset_features, dataset_labels = extract_features_vit(dataloader, vit_model)

In [None]:
# Keep the ViT results under model-specific names; the generic names are rebound later
dataset_features_vit = dataset_features
dataset_labels_vit = dataset_labels

In [None]:
# Sanity check: the first few labels, in gallery row order
dataset_labels_vit[:5]

['4', '14', '5', '1', '14']

In [None]:
# Cache the ViT gallery features and labels in the Kaggle working directory
torch.save(dataset_features_vit, '/kaggle/working/vit_dataset_features.pt')
torch.save(dataset_labels_vit, '/kaggle/working/vit_dataset_labels.pt')

In [None]:
# Reload ViT features and labels from a Kaggle input dataset; this rebinds the
# generic dataset_features / dataset_labels names.
# NOTE(review): presumably the saved rows follow the same order as df_combined — confirm.
dataset_features = torch.load('/kaggle/input/vit-dataset/vit_dataset_features.pt')
dataset_labels = torch.load('/kaggle/input/vit-dataset/vit_dataset_labels.pt')

## CLIP retrieval

In [None]:
from PIL import Image
import torch

def preprocess_image_clip(image_path):
    """Load an image file and turn it into inputs for the CLIP model.

    Args:
        image_path: Path of the image file.

    Returns:
        The output of ``clip_processor`` for the image, as PyTorch tensors.
    """
    # Convert to RGB like the other loaders in this notebook, and use the
    # context manager so the file handle is closed once the pixels are read.
    with Image.open(image_path) as img:
        image = img.convert("RGB")
    return clip_processor(images=image, return_tensors="pt", padding=True)

In [None]:
all_image_features = []

# Encode every gallery image with CLIP, one image per forward pass
for path in df_combined["image_path"]:
    inputs = preprocess_image_clip(path)
    with torch.no_grad():
        image_features = clip_model.get_image_features(**inputs)
        # Unit-length rows, so a dot product with them is a cosine similarity
        normalized_features = image_features / image_features.norm(dim=-1, keepdim=True)  # Normalize
        all_image_features.append(normalized_features)

# Combine all features into a tensor (rebinds the name from a list to a tensor)
all_image_features = torch.cat(all_image_features, dim=0)



In [None]:
# Keep the CLIP gallery under model-specific names; labels follow df_combined row order
dataset_features_clip = all_image_features
dataset_labels_clip = df_combined['label'].tolist()

In [None]:
# Sanity check: (number of gallery images, embedding size)
dataset_features_clip.shape

torch.Size([2336, 512])

In [None]:
# Sanity check: the first few labels, in gallery row order
dataset_labels_clip[:5]

['4', '14', '5', '1', '14']

In [None]:
# Save the CLIP gallery. The generic dataset_features / dataset_labels names
# hold the ViT data at this point, so the CLIP-specific variables are used.
torch.save(dataset_features_clip, '/kaggle/working/clip_dataset_features.pt')
torch.save(dataset_labels_clip, '/kaggle/working/clip_dataset_labels.pt')

In [None]:
def find_top_k_similar_clip(query_image_path, dataset_features, dataset_labels, model, transform, k=5):
    """Return the ``k`` gallery items most similar to a query image (CLIP).

    Args:
        query_image_path: Path of the query image.
        dataset_features: Gallery embeddings, one row per image. Rows are
            expected to be unit-length so the dot product is a cosine
            similarity.
        dataset_labels: Labels indexable by gallery row position.
        model: Object exposing ``get_image_features``.
        transform: Unused; kept so every ``find_top_k_similar_*`` function
            shares one signature.
        k: Number of results; capped at the gallery size.

    Returns:
        A list of ``(index, label, similarity)`` tuples, best match first.
    """
    query_inputs = preprocess_image_clip(query_image_path)
    with torch.no_grad():
        query_feature = model.get_image_features(**query_inputs)
        query_feature = query_feature / query_feature.norm(dim=-1, keepdim=True)

    # Compare on the gallery's device so CPU/GPU tensors are never mixed
    query_feature = query_feature.to(dataset_features.device)
    similarity = torch.matmul(query_feature, dataset_features.T)  # Cosine similarity

    # One topk call yields both the indices and the scores
    top = similarity.topk(min(k, similarity.shape[1]), dim=1)
    retrieved_indices = top.indices.tolist()[0]
    similarities = top.values.tolist()[0]

    return [(indice, dataset_labels[indice], sml) for indice, sml in zip(retrieved_indices, similarities)]

In [None]:
# Preprocess and encode a query image
query_image_path = "/kaggle/input/vn-des-xla/DATASET_2/test/0/img_0_102.jpg"
query_inputs = preprocess_image_clip(query_image_path)
with torch.no_grad():
    query_feature = clip_model.get_image_features(**query_inputs)
    query_feature = query_feature / query_feature.norm(dim=-1, keepdim=True)  # Normalize

# Compute similarity (both sides are unit-length, so this is a cosine similarity)
similarity = torch.matmul(query_feature, dataset_features_clip.T)  # Cosine similarity
top_k = 5  # Number of similar images to retrieve
retrieved_indices = similarity.topk(top_k, dim=1).indices

print("Retrieved indices:", retrieved_indices)

Retrieved indices: tensor([[1286,  228, 1921,  462, 1498]])


In [None]:
# Labels of the retrieved images, read from the CLIP gallery's own label list
[dataset_labels_clip[i] for i in retrieved_indices.tolist()[0]]

['0', '0', '0', '0', '0']

### enhanced dataset

In [None]:
def preprocess_image(image_path):
    """Enhance an image with auto_enhancing and turn it into CLIP model inputs."""
    enhanced_image = auto_enhancing(image_path)
    enhanced_image = Image.fromarray(enhanced_image)
    # No variable called `processor` is defined in the visible notebook; this
    # is the CLIP section, so the CLIP processor is used.
    return clip_processor(images=enhanced_image, return_tensors="pt", padding=True)
all_image_features = []

# NOTE(review): this loop encodes the test split, while the other feature
# loops encode df_combined — confirm that is intended.
for path in dct_test["image_path"]:
    inputs = preprocess_image(path)
    with torch.no_grad():
        # Call clip_model directly: the global `model` depends on which
        # model_type cell ran last and may not be the CLIP model.
        image_features = clip_model.get_image_features(**inputs)
        normalized_features = image_features / image_features.norm(dim=-1, keepdim=True)  # Normalize
        all_image_features.append(normalized_features)

# Combine all features into a tensor
all_image_enhanced_features = torch.cat(all_image_features, dim=0)

## BLIP retrieval

In [None]:
def preprocess_image_blip(image_path):
    """Load an image as RGB and turn it into BLIP inputs on ``device``."""
    rgb_image = Image.open(image_path).convert("RGB")
    batch = blip_processor(images=rgb_image, return_tensors="pt")
    return batch.to(device)

In [None]:
from PIL import Image
import torch


all_image_features = []

# Encode every gallery image with the BLIP vision tower, one image per forward pass
for path in df_combined["image_path"]:
    inputs = preprocess_image_blip(path)
    with torch.no_grad():
        # Element [1] of the vision model output — presumably the pooled image embedding; confirm
        img_features = blip_model.vision_model(inputs["pixel_values"])[1]
        normalized_features = img_features / img_features.norm(dim=-1, keepdim=True)  # Normalize

        all_image_features.append(normalized_features)

# Combine all features into a tensor (rebinds the name from a list to a tensor)
all_image_features = torch.cat(all_image_features, dim=0)



In [None]:
# Sanity check: (number of gallery images, embedding size)
all_image_features.shape

torch.Size([2336, 768])

In [None]:
# Keep the BLIP gallery under model-specific names; labels follow df_combined row order
dataset_features_blip = all_image_features
dataset_labels_blip = df_combined['label'].tolist()

In [None]:
# Save the BLIP gallery. The generic dataset_features / dataset_labels names
# hold the ViT data at this point, so the BLIP-specific variables are used.
torch.save(dataset_features_blip, '/kaggle/working/blip_dataset_features.pt')
torch.save(dataset_labels_blip, '/kaggle/working/blip_dataset_labels.pt')

In [None]:
# Sanity check: the first few labels, in gallery row order
dataset_labels_blip[:5]

['4', '14', '5', '1', '14']

In [None]:
# Preprocess and encode a query image
query_image_path = "/kaggle/input/vn-des-xla/DATASET_2/test/11/img_11_107.jpg"
query_inputs = preprocess_image_blip(query_image_path)
with torch.no_grad():
    # Same vision-tower output element as used when building the BLIP gallery
    query_feature = blip_model.vision_model(query_inputs["pixel_values"])[1]
    query_feature = query_feature / query_feature.norm(dim=-1, keepdim=True)  # Normalize

# Compute similarity (both sides are unit-length, so this is a cosine similarity)
similarity = torch.matmul(query_feature, dataset_features_blip.T)  # Cosine similarity
top_k = 5  # Number of similar images to retrieve
retrieved_indices = similarity.topk(top_k, dim=1).indices

print("Retrieved indices:", retrieved_indices)

Retrieved indices: tensor([[ 819, 2173, 1646,  949, 1933]], device='cuda:0')


In [None]:
# Labels of the retrieved images, read from the BLIP gallery's own label list
[dataset_labels_blip[i] for i in retrieved_indices.tolist()[0]]

['11', '11', '11', '11', '11']

In [None]:
def find_top_k_similar_blip(query_image_path, dataset_features, dataset_labels, model, transform, k=5):
    """Return the ``k`` gallery items most similar to a query image (BLIP).

    Args:
        query_image_path: Path of the query image.
        dataset_features: Gallery embeddings, one row per image. Rows are
            expected to be unit-length so the dot product is a cosine
            similarity.
        dataset_labels: Labels indexable by gallery row position.
        model: Object exposing ``vision_model``; element ``[1]`` of its output
            is used as the image embedding.
        transform: Unused; kept so every ``find_top_k_similar_*`` function
            shares one signature.
        k: Number of results; capped at the gallery size.

    Returns:
        A list of ``(index, label, similarity)`` tuples, best match first.
    """
    query_inputs = preprocess_image_blip(query_image_path)
    with torch.no_grad():
        query_feature = model.vision_model(query_inputs["pixel_values"])[1]
        query_feature = query_feature / query_feature.norm(dim=-1, keepdim=True)

    # Compare on the gallery's device so CPU/GPU tensors are never mixed
    query_feature = query_feature.to(dataset_features.device)
    similarity = torch.matmul(query_feature, dataset_features.T)  # Cosine similarity

    # One topk call yields both the indices and the scores
    top = similarity.topk(min(k, similarity.shape[1]), dim=1)
    retrieved_indices = top.indices.tolist()[0]
    similarities = top.values.tolist()[0]

    return [(indice, dataset_labels[indice], sml) for indice, sml in zip(retrieved_indices, similarities)]

### enhanced dataset

In [None]:
def preprocess_image(image_path):
    """Enhance an image with auto_enhancing and turn it into BLIP inputs on ``device``."""
    enhanced_image = auto_enhancing(image_path)
    enhanced_image = Image.fromarray(enhanced_image)
    # No variable called `processor` is defined in the visible notebook; this
    # is the BLIP section, so the BLIP processor is used.
    return blip_processor(images=enhanced_image, return_tensors="pt").to(device)

all_image_features = []

for path in df_combined["image_path"]:
    inputs = preprocess_image(path)
    with torch.no_grad():
        # Call blip_model directly instead of the global `model`, which
        # depends on which model_type cell ran last.
        img_features = blip_model.vision_model(inputs["pixel_values"])[1]
        normalized_features = img_features / img_features.norm(dim=-1, keepdim=True)  # Normalize

        all_image_features.append(normalized_features)

# Combine all features into a tensor
all_image_features = torch.cat(all_image_features, dim=0)

# Test query for input image

In [None]:
# Find the 50 most similar gallery images for one query image (ViT)
query_image_path = "/kaggle/input/vn-des-xla/DATASET_2/test/11/img_11_107.jpg"
# Pass vit_model explicitly: the global `model` follows model_type, which the
# BLIP section sets last, and is then not a ViT model.
top_k_results = find_top_k_similar_vit(query_image_path, dataset_features, dataset_labels, vit_model, transform, k=50)

# Print the results
for idx, label, score in top_k_results:
    print(f"Image Index: {idx}, Label: {label}, Similarity: {score:.4f}")

Image Index: 2182, Label: 11, Similarity: 0.7714
Image Index: 498, Label: 11, Similarity: 0.7069
Image Index: 2059, Label: 11, Similarity: 0.6970
Image Index: 1454, Label: 11, Similarity: 0.6834
Image Index: 628, Label: 11, Similarity: 0.6758
Image Index: 1217, Label: 11, Similarity: 0.6684
Image Index: 837, Label: 11, Similarity: 0.6673
Image Index: 1308, Label: 11, Similarity: 0.6625
Image Index: 225, Label: 11, Similarity: 0.6546
Image Index: 455, Label: 11, Similarity: 0.6341
Image Index: 1578, Label: 11, Similarity: 0.6243
Image Index: 1351, Label: 11, Similarity: 0.6202
Image Index: 403, Label: 11, Similarity: 0.6192
Image Index: 800, Label: 11, Similarity: 0.6156
Image Index: 69, Label: 4, Similarity: 0.6095
Image Index: 224, Label: 11, Similarity: 0.6083
Image Index: 935, Label: 11, Similarity: 0.6081
Image Index: 1210, Label: 11, Similarity: 0.5964
Image Index: 1093, Label: 11, Similarity: 0.5889
Image Index: 2149, Label: 11, Similarity: 0.5847
Image Index: 381, Label: 11, Sim

# Evaluation

## mAP

In [None]:
# Compute mAP@K for a single query
def compute_map_at_k_per_query(query_results, ground_truth, k):
    """
    Score one query's ranked results with this notebook's "mAP@K" measure.

    The score is the mean of the running precisions P@1, P@2, ..., P@K over
    the returned ranks. Note that this is not the textbook Average Precision,
    which averages precision only at the ranks that hold a relevant item.

    Parameters:
    - query_results (list of tuples): Ranked (index, label, similarity) results for the query.
    - ground_truth: Label of the query; a result is relevant when its label equals it.
    - k (int): Number of top results to consider.

    Returns:
    - The score as a float; 0.0 when there are no results.
    """
    hits = [1 if ground_truth == label else 0 for _, label, _ in query_results[:k]]
    if not hits:
        return 0.0

    # Divide by the ranks that actually exist, so a result list shorter than k
    # is scored instead of raising a shape-mismatch error.
    precision_at_rank = np.cumsum(hits) / np.arange(1, len(hits) + 1)
    return np.mean(precision_at_rank)


In [None]:
def get_query_result(k,dataset_features,dataset_labels, model,  find_top_k_similar_img, transform = None):
  """Query the gallery with every image listed in ``df_combined``.

  Returns:
    One list per query image, each holding the ``(index, label, similarity)``
    tuples produced by ``find_top_k_similar_img``.
  """
  per_query = []
  for query_path in tqdm(df_combined["image_path"], 'get query results: '):
    hits = find_top_k_similar_img(query_path, dataset_features,dataset_labels, model,transform, k)
    per_query.append([(idx, label, score) for idx, label, score in hits])
  return per_query

In [None]:
def calculate_map(k,query_results_per_query, dataset_labels):
    """Print the mean per-query mAP@K score over all queries.

    Query number ``n`` is scored against ``dataset_labels[n]``.
    """
    # Score each query against the label stored at its own position
    per_query_scores = [
        compute_map_at_k_per_query(results, dataset_labels[position], k)
        for position, results in enumerate(query_results_per_query)
    ]
    mean_mAP_at_k = np.mean(per_query_scores)
    print(f"Mean mAP@{k} across all queries: {mean_mAP_at_k}")

In [None]:
def evaluation(query_results_per_query, dataset_labels):
  """Report the mean mAP@K at the cut-offs 1, 5, 10 and 50."""
  cutoffs = (1, 5, 10, 50)
  for cutoff in cutoffs:
    calculate_map(cutoff, query_results_per_query, dataset_labels)

### For Resnet50 model

In [None]:
# get_query_result takes the gallery features, their labels and the model
# explicitly. No ResNet-50 gallery is extracted in any visible cell, and
# `transform` has been rebound to the ViT preprocessing by this point, so both
# are built here with the ResNet-50 normalisation from the loading cell.
resnet_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
resnet_loader = load_data_from_dataframe(df_combined, 'image_path', 'label', resnet_transform)
dataset_features_resnet, dataset_labels_resnet = extract_features_resnet50(resnet_loader, resnet_model)
query_results_per_query = get_query_result(50, dataset_features_resnet, dataset_labels_resnet, resnet_model, find_top_k_similar_resnet50, resnet_transform)

get query results: 100%|██████████| 2336/2336 [02:35<00:00, 15.03it/s]


In [None]:
# evaluation needs the ground-truth label of every query, in query order;
# queries are the rows of df_combined.
evaluation(query_results_per_query, df_combined['label'].tolist())

Mean mAP@1 across all queries: 1.0
Mean mAP@5 across all queries: 0.9607006278538813
Mean mAP@10 across all queries: 0.9352796294031311
Mean mAP@50 across all queries: 0.8241809994524064


### For ViT model

In [None]:
# get_query_result takes the gallery features, their labels and the model explicitly.
query_results_per_query_vit = get_query_result(50, dataset_features_vit, dataset_labels_vit, vit_model, find_top_k_similar_vit, transform)
# The voting section reads query_results_per_query_vit; the generic name is
# kept for the evaluation cell that follows.
query_results_per_query = query_results_per_query_vit

get query results: 100%|██████████| 2336/2336 [02:07<00:00, 18.36it/s]


In [None]:
# evaluation needs the ground-truth label of every query, in query order
evaluation(query_results_per_query, dataset_labels_vit)

Mean mAP@1 across all queries: 1.0
Mean mAP@5 across all queries: 0.9795305365296804
Mean mAP@10 across all queries: 0.964472898320287
Mean mAP@50 across all queries: 0.8989237959866632


### CLIP

In [None]:
# get_query_result takes the gallery features, their labels and the model explicitly.
query_results_per_query_clip = get_query_result(50, dataset_features_clip, dataset_labels_clip, clip_model, find_top_k_similar_clip, None)
# The voting section reads query_results_per_query_clip; the generic name is
# kept for the evaluation cell that follows.
query_results_per_query = query_results_per_query_clip

get query results: 100%|██████████| 2336/2336 [03:43<00:00, 10.46it/s]


In [None]:
# evaluation needs the ground-truth label of every query, in query order
evaluation(query_results_per_query, dataset_labels_clip)

Mean mAP@1 across all queries: 1.0
Mean mAP@5 across all queries: 0.9881892123287671
Mean mAP@10 across all queries: 0.9799605043215918
Mean mAP@50 across all queries: 0.9388124350536626


### BLIP

In [None]:
# get_query_result takes the gallery features, their labels and the model explicitly.
query_results_per_query_blip = get_query_result(50, dataset_features_blip, dataset_labels_blip, blip_model, find_top_k_similar_blip, None)
# The voting section reads query_results_per_query_blip; the generic name is
# kept for the evaluation cell that follows.
query_results_per_query = query_results_per_query_blip

get query results: 100%|██████████| 2336/2336 [01:56<00:00, 19.97it/s]


In [None]:
# evaluation needs the ground-truth label of every query, in query order
evaluation(query_results_per_query, dataset_labels_blip)

Mean mAP@1 across all queries: 1.0
Mean mAP@5 across all queries: 0.984761700913242
Mean mAP@10 across all queries: 0.9739221162209177
Mean mAP@50 across all queries: 0.9159213280886457


### voting model (vit, clip, blip)

In [None]:
from collections import defaultdict

The voting score takes into account both how many of the rankings an image appears in and its rank within each of them.

In [None]:
def voting(rankings, k):
    """Fuse several rankings into one with a rank-weighted vote.

    An item at 0-based position ``p`` of a ranking earns ``k - p`` points;
    points are summed over all rankings.

    Returns:
        The ``k`` items with the most points, highest first. Ties keep the
        order in which the items were first seen.
    """
    tally = defaultdict(int)
    for ranked_items in rankings:
        for position, candidate in enumerate(ranked_items):
            tally[candidate] += k - position
    # Stable sort on the point totals; dict iteration follows insertion order
    ordered = sorted(tally, key=tally.get, reverse=True)
    return ordered[:k]

In [None]:
def get_ranking_list(query_idx,k,vit_result, clip_result, blip_result):
    """Collect the top-``k`` gallery indices of one query from each model.

    Returns:
        Three lists of indices, in the order ViT, CLIP, BLIP.
    """
    per_model_hits = (vit_result[query_idx], clip_result[query_idx], blip_result[query_idx])
    # Element 0 of every result tuple is the gallery index
    return [[hit[0] for hit in hits[:k]] for hits in per_model_hits]

In [None]:
# Compute mAP@K for a single query from the fused (voted) ranking
def compute_map_at_k_per_query_voting(voting_results, ground_truth, dataset_labels, k):
    """Score one fused ranking with this notebook's "mAP@K" measure.

    The score is the mean of the running precisions over the returned ranks,
    which is the same measure ``compute_map_at_k_per_query`` reports.

    Args:
        voting_results: Ranked gallery indices produced by ``voting``.
        ground_truth: Label of the query.
        dataset_labels: Labels indexable by gallery index.
        k: Cut-off the ranking was built for; only the first ``k`` entries
            are scored.

    Returns:
        The score as a float; 0.0 when the ranking is empty.
    """
    hits = [1 if ground_truth == dataset_labels[i] else 0 for i in voting_results[:k]]
    if not hits:
        return 0.0

    # Divide by the ranks that actually exist, so a ranking shorter than k is
    # scored instead of raising a shape-mismatch error.
    precision_at_rank = np.cumsum(hits) / np.arange(1, len(hits) + 1)
    return np.mean(precision_at_rank)


In [None]:
# NOTE(review): `temp` is not assigned in any visible cell; this looks like a
# leftover debugging call — confirm and define or remove it.
compute_map_at_k_per_query_voting(temp,'4',dataset_labels_vit,5)

['14', '4', '4', '4', '4']
[0, 1, 1, 1, 1]


0.5433333333333333

In [None]:
def calculate_map_voting(k,query_results_per_query_vit, query_results_per_query_clip, query_results_per_query_blip, dataset_labels):
    """Print the mean mAP@K of the ranking fused from the three models.

    For every query the top-``k`` indices of ViT, CLIP and BLIP are merged by
    ``voting`` and scored against ``dataset_labels[query_idx]``.
    """
    per_query_scores = []
    for query_idx in range(len(query_results_per_query_vit)):
        fused_ranking = voting(
            get_ranking_list(query_idx, k, query_results_per_query_vit, query_results_per_query_clip, query_results_per_query_blip),
            k,
        )
        # Score the fused ranking of the three models for this query
        per_query_scores.append(
            compute_map_at_k_per_query_voting(fused_ranking, dataset_labels[query_idx], dataset_labels, k)
        )
    mean_mAP_at_k = np.mean(per_query_scores)
    print(f"Mean mAP@{k} across all queries: {mean_mAP_at_k}")

In [None]:
def evaluation_voting(query_results_per_query_vit, query_results_per_query_clip, query_results_per_query_blip, dataset_labels_vit):
  """Report the fused-ranking mean mAP@K at the cut-offs 1, 5, 10 and 50."""
  cutoffs = (1, 5, 10, 50)
  for cutoff in cutoffs:
    calculate_map_voting(cutoff, query_results_per_query_vit, query_results_per_query_clip, query_results_per_query_blip, dataset_labels_vit)

In [None]:
# Evaluate the ranking fused from ViT, CLIP and BLIP.
# NOTE(review): the three query_results_per_query_* inputs are assigned by the
# cells that follow this one, so on a fresh top-to-bottom run they may not
# exist yet — confirm the intended cell order.
evaluation_voting(query_results_per_query_vit, query_results_per_query_clip, query_results_per_query_blip, dataset_labels_vit)

Mean mAP@1 across all queries: 1.0
Mean mAP@5 across all queries: 0.9872845319634703
Mean mAP@10 across all queries: 0.9803316447869102
Mean mAP@50 across all queries: 0.945615608066847


In [None]:
# Top-50 ViT results for every gallery image used as a query
query_results_per_query_vit = get_query_result(50,dataset_features_vit, dataset_labels_vit,vit_model, find_top_k_similar_vit, transform)

get query results: 100%|██████████| 2336/2336 [02:02<00:00, 19.12it/s]


In [None]:
# Cache the ViT query results in the Kaggle working directory
torch.save(query_results_per_query_vit, '/kaggle/working/query_results_vit.pt')

In [None]:
# Top-50 CLIP results for every gallery image used as a query (the CLIP finder ignores `transform`)
query_results_per_query_clip = get_query_result(50,dataset_features_clip, dataset_labels_clip,clip_model, find_top_k_similar_clip, None)

get query results: 100%|██████████| 2336/2336 [03:46<00:00, 10.33it/s]


In [None]:
# Cache the CLIP query results in the Kaggle working directory
torch.save(query_results_per_query_clip, '/kaggle/working/query_results_clip.pt')

In [None]:
# Top-50 BLIP results for every gallery image used as a query (the BLIP finder ignores `transform`)
query_results_per_query_blip = get_query_result(50,dataset_features_blip, dataset_labels_blip,blip_model, find_top_k_similar_blip, None)

get query results: 100%|██████████| 2336/2336 [01:50<00:00, 21.16it/s]


In [None]:
# Cache the BLIP query results in the Kaggle working directory
torch.save(query_results_per_query_blip, '/kaggle/working/query_results_blip.pt')

## recall

In [None]:
# Number of gallery images per label; read as a global by compute_recall_at_k_per_query
label_counts = df_combined.groupby('label').size()
type(label_counts)

In [None]:
# Display the per-label image counts
label_counts

Unnamed: 0_level_0,0
label,Unnamed: 1_level_1
0,117
1,111
10,112
11,112
12,117
13,117
14,117
15,122
16,113
17,117


In [None]:
# Example lookup: labels are strings, so the key is '7' rather than 7
label_counts.loc['7']

110

In [None]:
def compute_recall_at_k_per_query(query_results, ground_truth, k):
    """Compute Recall@K for a single query.

    Args:
        query_results: Ranked ``(index, label, similarity)`` tuples.
        ground_truth: Label of the query.
        k: Number of top results to consider.

    Returns:
        The share of all gallery images carrying ``ground_truth`` (as counted
        in the global ``label_counts``) that appear in the top ``k``; 0.0 when
        the label does not occur in ``label_counts``.
    """
    relevant_count = sum(1 for _, label, _ in query_results[:k] if label == ground_truth)

    # .get avoids a KeyError for a label missing from the gallery; the guard
    # below then reports a recall of 0.0 for it.
    total_relevant = label_counts.get(ground_truth, 0)

    return relevant_count / total_relevant if total_relevant > 0 else 0.0

In [None]:
def get_query_result(k, find_top_k_similar_img, transform=None):
    """Query the gallery with every image listed in ``df_combined``.

    This definition replaces the earlier ``get_query_result`` from the mAP
    section: the gallery features and the model are read from the globals
    ``all_image_features`` and ``model`` instead of being passed in.

    Args:
        k: Number of results to retrieve per query.
        find_top_k_similar_img: One of the ``find_top_k_similar_*`` functions.
        transform: Transform forwarded to the finder.

    Returns:
        One list of ``(index, label, similarity)`` tuples per query image.
    """
    # The finders look labels up by row position. A plain list guarantees
    # that; indexing the Series would go through the DataFrame index, which no
    # longer matches row position once the frame has been shuffled.
    gallery_labels = df_combined['label'].tolist()

    query_results_per_query = []
    for image_path in tqdm(df_combined["image_path"], 'get query results: '):
        top_k_results = find_top_k_similar_img(image_path, all_image_features, gallery_labels, model, transform, k)
        query_results_per_query.append([(idx, label, similarity) for idx, label, similarity in top_k_results])
    return query_results_per_query

In [None]:
def calculate_recall(k, query_results_per_query):
    """Print the mean Recall@K over all queries.

    Query number ``n`` is scored against the label in row ``n`` of
    ``df_combined``.
    """
    # Read the labels by row position: df_combined['label'][i] would look up
    # index label i, which is a different row once the frame has been shuffled.
    query_labels = df_combined['label'].tolist()
    recall_at_k_per_query = [
        compute_recall_at_k_per_query(query_results, query_labels[position], k)
        for position, query_results in enumerate(query_results_per_query)
    ]
    mean_recall_at_k = np.mean(recall_at_k_per_query)

    print(f"Mean Recall@{k} across all queries: {mean_recall_at_k}")

In [None]:
def recall_evaluation(find_top_k_similar_img, transform=None):
    """Retrieve the top 50 results per query once, then report Recall@1/5/10/50."""
    cutoffs = (1, 5, 10, 50)
    # One retrieval at the largest cut-off serves every smaller cut-off as well
    ranked_results = get_query_result(max(cutoffs), find_top_k_similar_img, transform)
    for cutoff in cutoffs:
        calculate_recall(cutoff, ranked_results)

In [None]:
# The recall-section get_query_result reads the globals `all_image_features`
# and `model`. Bind them to the CLIP gallery and the CLIP model so they match
# the CLIP finder; the last cells to set them were the BLIP ones.
all_image_features = dataset_features_clip
model = clip_model
recall_evaluation(find_top_k_similar_clip)

In [None]:
import pickle  # not imported in any other visible cell

# NOTE(review): `array` is not assigned in any visible cell — confirm which
# object is meant to be saved here.
with open('array.pkl', 'wb') as f:
    pickle.dump(array, f)