# 🔍 Image Similarity with Dynamic k-Nearest Neighbors

In [1]:
import torch
import torch.nn as nn
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import json
import os
from PIL import Image
from tqdm import tqdm


In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])


In [4]:
def get_model(num_classes):
    model = models.resnet50(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model


In [5]:
def train_model(model, dataloader, epochs=5, lr=1e-4):
    model = model.to(device)
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}"):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Loss: {running_loss/len(dataloader):.4f}")
    return model


In [6]:
def get_feature_extractor(trained_model):
    feature_extractor = nn.Sequential(*list(trained_model.children())[:-1])
    feature_extractor.eval()
    return feature_extractor.to(device)


In [7]:
def extract_embeddings_from_folder(folder_path, model):
    image_paths = sorted([os.path.join(folder_path, fname)
                          for fname in os.listdir(folder_path)
                          if fname.lower().endswith(('.jpg', '.jpeg', '.png'))])

    all_embeddings = []
    filenames = []

    with torch.no_grad():
        for i in range(0, len(image_paths), 32):
            batch_paths = image_paths[i:i+32]
            imgs = [transform(Image.open(p).convert("RGB")) for p in batch_paths]
            imgs = torch.stack(imgs).to(device)
            vecs = model(imgs).squeeze(-1).squeeze(-1)
            all_embeddings.append(vecs.cpu())
            filenames.extend(batch_paths)

    return torch.cat(all_embeddings, dim=0).numpy(), filenames


In [11]:
def retrieve_query_vs_gallery_dynamic_cutoff(query_embs, query_files, gallery_embs, gallery_files, max_k=10, drop_threshold=0.10):
    """
    Include i vicini finché la differenza di similarità con il precedente non supera la soglia `drop_threshold`.
    """
    results = []
    sim_matrix = cosine_similarity(query_embs, gallery_embs)

    for i, query_path in enumerate(query_files):
        similarities = sim_matrix[i]
        sorted_indices = np.argsort(similarities)[::-1]
        sorted_sims = similarities[sorted_indices]

        selected = []
        for j in range(1, min(max_k, len(sorted_sims))):
            sim_prev = sorted_sims[j - 1]
            sim_curr = sorted_sims[j]
            selected.append(gallery_files[sorted_indices[j - 1]].replace("\\", "/"))

            if sim_prev - sim_curr > drop_threshold:
                break  # salto netto → ci fermiamo

        # Aggiungiamo l'ultimo solo se non ha causato il break
        if len(selected) < max_k:
            selected.append(gallery_files[sorted_indices[len(selected)]].replace("\\", "/"))

        results.append({
            "filename": query_path.replace("\\", "/"),
            "gallery_images": selected
        })

    return results


In [12]:
def save_submission(results, output_path):
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(results, f, indent=2)


In [14]:
# === CONFIGURA I PERCORSI QUI ===
training_path = "C:/Users/utente/Desktop/UNITN/Intro to ML/ML-project/Examples/training"
query_path = "C:/Users/utente/Desktop/UNITN/Intro to ML/ML-project/Examples/test/query"
gallery_path = "C:/Users/utente/Desktop/UNITN/Intro to ML/ML-project/Examples/test/gallery"
submission_path = "C:/Users/utente/Desktop/UNITN/Intro to ML/ML-project/submission/submission.json"

# Step 1: Fine-tuning
train_dataset = datasets.ImageFolder(training_path, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
model = get_model(num_classes=len(train_dataset.classes))
model = train_model(model, train_loader, epochs=5)

# Step 2: Feature extraction
feature_extractor = get_feature_extractor(model)
query_embeddings, query_files = extract_embeddings_from_folder(query_path, feature_extractor)
gallery_embeddings, gallery_files = extract_embeddings_from_folder(gallery_path, feature_extractor)

# Step 3: Retrieval con soglia dinamica
submission = retrieve_query_vs_gallery_dynamic_cutoff(
    query_embeddings, query_files,
    gallery_embeddings, gallery_files,
    max_k=10,
    drop_threshold=0.05  # 10% di calo di similarità
)


# Step 4: Salvataggio nella repo
save_submission(submission, submission_path)
print(f"✅ Submission salvata in: {submission_path}")


Epoch 1/5: 100%|██████████| 2/2 [00:07<00:00,  3.90s/it]


Loss: 0.9085


Epoch 2/5: 100%|██████████| 2/2 [00:08<00:00,  4.27s/it]


Loss: 0.3457


Epoch 3/5: 100%|██████████| 2/2 [00:08<00:00,  4.12s/it]


Loss: 0.0995


Epoch 4/5: 100%|██████████| 2/2 [00:08<00:00,  4.04s/it]


Loss: 0.1160


Epoch 5/5: 100%|██████████| 2/2 [00:08<00:00,  4.06s/it]


Loss: 0.0303
✅ Submission salvata in: C:/Users/utente/Desktop/UNITN/Intro to ML/ML-project/submission/submission.json
