# Model 2
Implement the model without the implementation of the weight.

In [50]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from PIL import Image
from pathlib import Path
import random

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225])
])

train_dataset = datasets.ImageFolder("data_example_rota/train", transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

from collections import defaultdict

class TripletDataset(Dataset):
    def __init__(self, image_folder_dataset):
        self.transform = image_folder_dataset.transform
        self.class_to_paths = defaultdict(list)
        self.data = []

        for path, class_idx in image_folder_dataset.imgs:
            class_name = image_folder_dataset.classes[class_idx]
            self.class_to_paths[class_name].append(path)
            self.data.append((class_name, path))

    def __getitem__(self, index):
        cls, anchor_path = self.data[index]
        positive_path = random.choice([p for p in self.class_to_paths[cls] if p != anchor_path])
        negative_cls = random.choice([c for c in self.class_to_paths if c != cls])
        negative_path = random.choice(self.class_to_paths[negative_cls])

        def load_img(p): return self.transform(Image.open(p).convert("RGB"))

        return load_img(anchor_path), load_img(positive_path), load_img(negative_path)

    def __len__(self):
        return len(self.data)

triplet_dataset = TripletDataset(train_dataset)
triplet_loader = DataLoader(triplet_dataset, batch_size=32, shuffle=True)

class SimpleImageDataset(Dataset):
    def __init__(self, folder_path, transform):
        self.image_paths = list(Path(folder_path).glob("*.jpg"))
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = Image.open(self.image_paths[idx]).convert("RGB")
        img = self.transform(img)
        return img, self.image_paths[idx].name

query_dataset = SimpleImageDataset("data_example_rota/test/query", transform)
gallery_dataset = SimpleImageDataset("data_example_rota/test/gallery", transform)

query_loader = DataLoader(query_dataset, batch_size=1, shuffle=False)
gallery_loader = DataLoader(gallery_dataset, batch_size=32, shuffle=False)



In [48]:
len(gallery_dataset)

6

In [39]:
# Setting the Dataloader
# We prepeare the data for the ML also setting the first hyperparmeter batch size
batch_size = 32

## Training data
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

## Test data
query_loader = DataLoader(query_dataset, batch_size=batch_size, shuffle=False)
gallery_loader = DataLoader(gallery_dataset, batch_size=batch_size, shuffle=False)

In [40]:
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F
from torchvision.models import ResNet50_Weights

class ResNetEmbedder(nn.Module):
    def __init__(self, embedding_dim=128):
        super().__init__()

        # Carica ResNet-50 pre-addestrata
        self.backbone = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        in_features = self.backbone.fc.in_features

        # Rimuove il classificatore originale
        self.backbone.fc = nn.Identity()

        # Proiezione nello spazio degli embedding
        self.embedding = nn.Linear(in_features, embedding_dim)

    def forward(self, x):
        x = self.backbone(x)           # output: [batch_size, 2048]
        x = self.embedding(x)          # output: [batch_size, embedding_dim]
        x = F.normalize(x, p=2, dim=1) # L2-normalizzazione
        return x


In [42]:
import torch
from torch.optim import Adam
from tqdm import tqdm

# Triplet loss
def triplet_loss(anchor, positive, negative, margin=1.0):
    pos_dist = F.pairwise_distance(anchor, positive, p=2)
    neg_dist = F.pairwise_distance(anchor, negative, p=2)
    return F.relu(pos_dist - neg_dist + margin).mean()


# Inizializza modello
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNetEmbedder(embedding_dim=128).to(device)

# Compila il modello per ottimizzazioni (solo se torch >= 2.0)
try:
    model = torch.compile(model, backend="aot_eager")
    print("Modello compilato con torch.compile()")
except AttributeError:
    print("torch.compile non disponibile (serve PyTorch >= 2.0)")


optimizer = Adam(model.parameters(), lr=1e-4)
epochs = 10

for epoch in range(epochs):
    model.train() #metto il modello
    running_loss = 0.0
    for anchors, positives, negatives in tqdm(triplet_loader, desc=f"Epoch {epoch+1}/{epochs}"):
        anchors = anchors.to(device)
        positives = positives.to(device)
        negatives = negatives.to(device)

        anchor_emb = model(anchors)
        positive_emb = model(positives)
        negative_emb = model(negatives)

        loss = triplet_loss(anchor_emb, positive_emb, negative_emb)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1} - Avg Loss: {running_loss / len(triplet_loader):.4f}")


Modello compilato con torch.compile()


Epoch 1/10: 100%|██████████| 3/3 [00:36<00:00, 12.00s/it]


Epoch 1 - Avg Loss: 0.7499


Epoch 2/10: 100%|██████████| 3/3 [00:36<00:00, 12.02s/it]


Epoch 2 - Avg Loss: 0.5194


Epoch 3/10: 100%|██████████| 3/3 [00:35<00:00, 11.99s/it]


Epoch 3 - Avg Loss: 0.2668


Epoch 4/10: 100%|██████████| 3/3 [00:30<00:00, 10.13s/it]


Epoch 4 - Avg Loss: 0.1043


Epoch 5/10: 100%|██████████| 3/3 [00:29<00:00,  9.95s/it]


Epoch 5 - Avg Loss: 0.0657


Epoch 6/10: 100%|██████████| 3/3 [00:31<00:00, 10.51s/it]


Epoch 6 - Avg Loss: 0.0126


Epoch 7/10: 100%|██████████| 3/3 [00:30<00:00, 10.15s/it]


Epoch 7 - Avg Loss: 0.0486


Epoch 8/10: 100%|██████████| 3/3 [00:29<00:00,  9.95s/it]


Epoch 8 - Avg Loss: 0.0111


Epoch 9/10: 100%|██████████| 3/3 [00:29<00:00,  9.79s/it]


Epoch 9 - Avg Loss: 0.0000


Epoch 10/10: 100%|██████████| 3/3 [00:29<00:00,  9.87s/it]

Epoch 10 - Avg Loss: 0.0047





In [43]:
def extract_embeddings(dataloader, model, device):
    model.eval()
    embeddings = []
    filenames = []

    with torch.no_grad():
        for images, names in dataloader:
            images = images.to(device)
            embs = model(images)
            embeddings.append(embs.cpu())
            filenames.extend(names)

    embeddings = torch.cat(embeddings)
    return embeddings, filenames

In [59]:
import torch
import torch.nn.functional as F
from tqdm import tqdm
import json
import os

@torch.no_grad()
def evaluate(query_loader, gallery_embeddings, gallery_paths, model, device, top_k=5, mapping_file=None):
    model.eval()
    results = []

    # Carica il mapping query → immagini corrette
    if mapping_file:
        with open(mapping_file, 'r') as f:
            query_mapping = {os.path.basename(entry["filename"]): set(os.path.basename(p) for p in entry["gallery_images"])
                             for entry in json.load(f)}
    else:
        query_mapping = {}

    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(tqdm(query_loader, desc="Evaluating queries")):
            images = images.to(device)

            # Estrai embeddings del batch
            batch_embeddings = model(images)

            for i in range(images.size(0)):
                query_embedding = batch_embeddings[i]
                query_filename = os.path.basename(labels[i]) if isinstance(labels[i], str) else os.path.basename(labels[i][0])

                # Similarità con tutta la gallery
                similarities = F.cosine_similarity(query_embedding.unsqueeze(0), gallery_embeddings)

                # Ottieni top-k più simili
                top_indices = similarities.topk(top_k).indices
                top_paths = [os.path.basename(gallery_paths[idx]) for idx in top_indices]

                # Verifica immagini corrette secondo mapping
                true_gallery = query_mapping.get(query_filename, set())
                correct_count = sum([img in true_gallery for img in top_paths])
                total_true = len(true_gallery)

                print(f"Query #{len(results)+1} - {query_filename}")
                for j, path in enumerate(top_paths):
                    print(f"\tTop {j+1}: {path}")
                print(f"\tCorrect: {correct_count} / {total_true}")

                results.append({
                    "query": query_filename,
                    "top_k": top_paths,
                    "correct": correct_count,
                    "total_true": total_true
                })

    return results


evaluate(
    model=model,
    query_loader=query_loader,
    gallery_loader=gallery_loader,
    mapping_file="data_example_rota/query_to_gallery_mapping.json",
    device=device,
    topk=3
)


TypeError: evaluate() got an unexpected keyword argument 'gallery_loader'. Did you mean 'query_loader'?

In [52]:
for i, (img, label) in enumerate(query_loader):
    print(f"Query #{i} - shape: {img.shape}, label: {label}")

Query #0 - shape: torch.Size([1, 3, 224, 224]), label: ('n01855672_10973.jpg',)
Query #1 - shape: torch.Size([1, 3, 224, 224]), label: ('000002.jpg',)
