In [23]:
import torch
import torch.nn as nn
from torchvision import models, datasets, transforms
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.utils.data import Dataset
from PIL import Image
import os
from utils.training import prepare_model_for_training

In [24]:
train_dir = '../../data/training'
gallery_dir = '../../data/test/gallery'
query_dir = '../../data/test/query'

In [25]:
class ImageOnlyDataset(Dataset):
    # class constructor
    def __init__(self, image_dir, transform=None):
        # get image directory
        self.image_dir = image_dir
        # get filenames by joining the image_dir path with fname where fname is every 
        self.image_paths = [os.path.join(image_dir, fname) for fname in os.listdir(image_dir)
                            if fname.lower().endswith(('.png', '.jpg', '.jpeg'))]
        # specify the transformations to be applied
        self.transform = transform

    def __len__(self):
        # specify this to then be able to use the len() function on objects of this class
        return len(self.image_paths)

    def __getitem__(self, idx):
        # specify how to get the path of the image by extacting image from the above-specified list
        img_path = self.image_paths[idx]
        # open image and convert it to RGB mode (not grayscale, not anything else)
        image = Image.open(img_path).convert("RGB")
        # if transformations have been specified at the step above (i.e., not None)
        if self.transform:
            # apply transformations to the image
            image = self.transform(image)
        # at the end, return both the image as a PIL instance and an img_path to find it later in the folders
        return image, img_path  # Return path so you can match later


In [26]:
# Load the pretrained ResNet-18 model
model = models.resnet18(weights="IMAGENET1K_V1") 

In [27]:
model = prepare_model_for_training(model, finetune=False)

In [28]:
# move the model to the mps to use GPU
device = torch.device("mps" if torch.mps.is_available() else "cpu")
model = model.to(device)

# Data preparation

In [29]:
# Define the transformations: resize, normalize, and convert to tensor
transform = transforms.Compose([
    # perform data augmentation: flip the image horizontally
    transforms.RandomHorizontalFlip(),
    # rotate the image by 45 degrees
    transforms.RandomRotation(45),
    # convert the image to a tensor
    transforms.ToTensor(),
    # reshape the tensor to have two dimensions
    transforms.Resize((224, 224)),  # Adjust to your image size
    # transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained weights normalization
])


In [30]:
# Load the dataset using ImageFolder
# train_dataset = datasets.ImageFolder(root = train_dir, transform=transform)

In [31]:
from torchvision import transforms
from torch.utils.data import DataLoader

# create an instance of the ImageOnlyDataset by specifying the transformations
gallery_dataset = ImageOnlyDataset(gallery_dir, transform=transform)
query_dataset = ImageOnlyDataset(query_dir, transform=transform)

# creates a data loader to load images in batches
gallery_loader = DataLoader(gallery_dataset, batch_size=32, shuffle=False)
query_loader = DataLoader(query_dataset, batch_size=32, shuffle=False)


In [32]:
# # Create DataLoader for batching
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# # val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Model

In [33]:

# class ResNetEmbedding(nn.Module):
#     # class constructor taking the resnet model and the dimension of the embeddings as input
#     def __init__(self, resnet_model, embedding_dim):
#         super(ResNetEmbedding, self).__init__()
#         self.resnet = resnet_model

#         # Get the number of input features to the last fully connected layer
#         in_features = self.resnet.fc.in_features

#         # Replace the final fully connected layer with an identity layer
#         self.resnet.fc = nn.Identity()

#         # Define a new fully connected layer for embedding
#         self.fc = nn.Linear(in_features, embedding_dim)

#     def forward(self, x):
#         x = self.resnet(x)  # Forward pass through ResNet backbone (up to before the classification layer)
#         x = self.fc(x)      # Pass through the embedding layer
#         return x

# # Define the embedding dimension (e.g., 128)
# embedding_dim = 128

# # Load ResNet18 with pretrained weights
# model = models.resnet18(weights="IMAGENET1K_V1")

# # Create the custom model
# model = ResNetEmbedding(model, embedding_dim)

# # Move to device (GPU or CPU)
# device = torch.device("mps" if torch.mps.is_available() else "cpu")
# model = model.to(device)

In [34]:
import torch
from torch.utils.data import Dataset
import random
from torchvision import datasets, transforms
from PIL import Image

class TripletDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with all the class folders.
            transform (callable, optional): Optional transform to be applied on an image.
        """
        self.dataset = datasets.ImageFolder(root_dir, transform=transform)
        self.transform = transform
        self.class_to_idx = self.dataset.class_to_idx
        self.imgs = self.dataset.imgs  # List of (image_path, class_index)
        self.class_indices = {class_name: [] for class_name in self.class_to_idx.keys()}

        for idx, (img_path, class_idx) in enumerate(self.imgs):
            class_name = list(self.class_to_idx.keys())[list(self.class_to_idx.values()).index(class_idx)]
            self.class_indices[class_name].append(idx)

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, idx):
        anchor_img_path, anchor_label = self.imgs[idx]
        anchor_image = Image.open(anchor_img_path)

        # Apply the transform (e.g., resizing, normalization)
        if self.transform:
            anchor_image = self.transform(anchor_image)

        # Positive: A random image from the same class
        positive_idx = random.choice(self.class_indices[list(self.class_to_idx.keys())[anchor_label]])
        positive_img_path, positive_label = self.imgs[positive_idx]
        positive_image = Image.open(positive_img_path)

        if self.transform:
            positive_image = self.transform(positive_image)

        # Negative: A random image from a different class
        negative_class = random.choice(list(self.class_to_idx.keys()))
        while negative_class == list(self.class_to_idx.keys())[anchor_label]:  # Ensure it's not the same class
            negative_class = random.choice(list(self.class_to_idx.keys()))

        negative_idx = random.choice(self.class_indices[negative_class])
        negative_img_path, negative_label = self.imgs[negative_idx]
        negative_image = Image.open(negative_img_path)

        if self.transform:
            negative_image = self.transform(negative_image)

        # Return the triplet
        return anchor_image, positive_image, negative_image


In [35]:
from torch.utils.data import DataLoader
from torchvision import transforms

# Define the transformation (resize, normalization, etc.)
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(45),
    transforms.ToTensor(),
    transforms.Resize((224, 224)),  # Adjust to your image size
    # transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained weights normalization
])

# Create the triplet dataset and DataLoader
train_dataset = TripletDataset(root_dir=train_dir, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)

# Now you can use this `train_loader` in your training loop


In [36]:
import torch.optim as optim
import torch.nn.functional as F

torch.seed()

# Define the TripletMarginLoss (you can adjust the margin parameter)
triplet_loss = nn.TripletMarginLoss(margin=0.000001, p=2)

# Set up the optimizer
optimizer = optim.Adam(model.parameters(), lr=0.000001)
# optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for anchor, positive, negative in train_loader:
        # Move the data to the GPU (if available)
        anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Get embeddings
        anchor_emb = model(anchor)
        positive_emb = model(positive)
        negative_emb = model(negative)

        # Compute the triplet loss
        loss = triplet_loss(anchor_emb, positive_emb, negative_emb)
        
        # Backpropagate and optimize
        loss.backward()
        optimizer.step()

        # Track the loss
        running_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss}")


Epoch 1/10, Loss: 0.498185396194458
Epoch 2/10, Loss: 1.0565192699432373
Epoch 3/10, Loss: 0.0
Epoch 4/10, Loss: 0.20272397994995117
Epoch 5/10, Loss: 5.170644521713257
Epoch 6/10, Loss: 1.0551159381866455
Epoch 7/10, Loss: 2.9503393173217773
Epoch 8/10, Loss: 0.535742998123169
Epoch 9/10, Loss: 0.7651948928833008
Epoch 10/10, Loss: 1.1809091567993164


In [37]:
model.eval()
with torch.no_grad():
    gallery_embeddings = []
    query_embeddings = []
    gallery_paths = []
    query_paths = []

    # Extract gallery embeddings
    for images, paths in gallery_loader:
        images = images.to(device)
        emb = model(images)
        gallery_embeddings.append(emb.cpu().numpy())
        gallery_paths.extend(paths)

    # Extract query embeddings
    for images, paths in query_loader:
        images = images.to(device)
        emb = model(images)
        query_embeddings.append(emb.cpu().numpy())
        query_paths.extend(paths)

In [38]:
# Convert to numpy arrays
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

gallery_embeddings = np.vstack(gallery_embeddings)
query_embeddings = np.vstack(query_embeddings)

# Compute cosine similarity
similarity_matrix = cosine_similarity(query_embeddings, gallery_embeddings)

# For each query image, get the most similar gallery image
retrieved_indices = np.argmax(similarity_matrix, axis=1)

# Print results
for i, idx in enumerate(retrieved_indices):
    print(f"Query image: {query_paths[i]}")
    print(f"Retrieved gallery image: {gallery_paths[idx]}")
    print()

Query image: ../../data/test/query/4597118805213184.jpg
Retrieved gallery image: ../../data/test/gallery/painting_085_000045.jpg

Query image: ../../data/test/query/n01855672_10973.jpg
Retrieved gallery image: ../../data/test/gallery/n01855672_1037.jpg



In [39]:
submission = dict()

In [40]:
# Convert to numpy arrays
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

gallery_embeddings = np.vstack(gallery_embeddings)
query_embeddings = np.vstack(query_embeddings)

top_k = 5  # You can change this to any value (e.g., 1, 3, 10)

# Compute top-k most similar gallery indices for each query
topk_indices = np.argsort(-similarity_matrix, axis=1)[:, :top_k]

# Display results
for i, indices in enumerate(topk_indices):
    print(f"\nQuery image: {query_paths[i]}")
    print("Top {} retrieved gallery images:".format(top_k))
    idx_last_slash = str(query_paths[i]).rfind("/")
    submission[str(query_paths[i][idx_last_slash+1:])] = list()
    for rank, gallery_idx in enumerate(indices):
        print(f"  Rank {rank+1}: {gallery_paths[gallery_idx]}")
        idx_last_slash_res = str(gallery_paths[gallery_idx]).rfind("/")
        submission[str(query_paths[i][idx_last_slash+1:])].append(gallery_paths[gallery_idx][idx_last_slash_res+1:])



Query image: ../../data/test/query/4597118805213184.jpg
Top 5 retrieved gallery images:
  Rank 1: ../../data/test/gallery/painting_085_000045.jpg
  Rank 2: ../../data/test/gallery/painting_085_000118.jpg
  Rank 3: ../../data/test/gallery/painting_085_000084.jpg
  Rank 4: ../../data/test/gallery/n01855672_4393.jpg
  Rank 5: ../../data/test/gallery/n01855672_1037.jpg

Query image: ../../data/test/query/n01855672_10973.jpg
Top 5 retrieved gallery images:
  Rank 1: ../../data/test/gallery/n01855672_1037.jpg
  Rank 2: ../../data/test/gallery/n01855672_4197.jpg
  Rank 3: ../../data/test/gallery/n01855672_4393.jpg
  Rank 4: ../../data/test/gallery/painting_085_000045.jpg
  Rank 5: ../../data/test/gallery/painting_085_000084.jpg


In [41]:
submission

{'4597118805213184.jpg': ['painting_085_000045.jpg',
  'painting_085_000118.jpg',
  'painting_085_000084.jpg',
  'n01855672_4393.jpg',
  'n01855672_1037.jpg'],
 'n01855672_10973.jpg': ['n01855672_1037.jpg',
  'n01855672_4197.jpg',
  'n01855672_4393.jpg',
  'painting_085_000045.jpg',
  'painting_085_000084.jpg']}

In [42]:
from utils.submission import create_dict_submission

In [43]:
dictionary = create_dict_submission(gallery_embeddings, query_embeddings, similarity_matrix, query_paths, gallery_paths)


Query image: ../../data/test/query/4597118805213184.jpg
Top 5 retrieved gallery images:
  Rank 1: ../../data/test/gallery/painting_085_000045.jpg
  Rank 2: ../../data/test/gallery/painting_085_000118.jpg
  Rank 3: ../../data/test/gallery/painting_085_000084.jpg
  Rank 4: ../../data/test/gallery/n01855672_4393.jpg
  Rank 5: ../../data/test/gallery/n01855672_1037.jpg

Query image: ../../data/test/query/n01855672_10973.jpg
Top 5 retrieved gallery images:
  Rank 1: ../../data/test/gallery/n01855672_1037.jpg
  Rank 2: ../../data/test/gallery/n01855672_4197.jpg
  Rank 3: ../../data/test/gallery/n01855672_4393.jpg
  Rank 4: ../../data/test/gallery/painting_085_000045.jpg
  Rank 5: ../../data/test/gallery/painting_085_000084.jpg


In [44]:
dictionary

{'4597118805213184.jpg': ['painting_085_000045.jpg',
  'painting_085_000118.jpg',
  'painting_085_000084.jpg',
  'n01855672_4393.jpg',
  'n01855672_1037.jpg'],
 'n01855672_10973.jpg': ['n01855672_1037.jpg',
  'n01855672_4197.jpg',
  'n01855672_4393.jpg',
  'painting_085_000045.jpg',
  'painting_085_000084.jpg']}