In [None]:
!pip install torchsummary

In [None]:
!pip install faiss-gpu

# Import Libraries

In [None]:
import glob
from collections import Counter
from itertools import chain
import os
import re
import random
import zipfile
import shutil
from tqdm.notebook import tqdm

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import cv2

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, CosineAnnealingLR, ReduceLROnPlateau
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, models
from torchsummary import summary

import faiss  

# Split Dataset

In [None]:
# Fractions of each class's images held out for the test and validation splits.
# Both are relative to the full per-class image count: the split cell rescales
# val_size by 1/(1 - test_size) before applying it to the remaining images.
test_size = 0.15
val_size = 0.15

In [None]:
data_dir = '/kaggle/input/ucmerced-landuse/UCMerced_LandUse/Images'
train_dir = 'train'
val_dir = 'val'
test_dir = 'test'


def _copy_images(image_names, src_dir, dst_dir):
    """Copy the files named in image_names from src_dir into dst_dir (created if missing)."""
    os.makedirs(dst_dir, exist_ok=True)
    for image_name in image_names:
        shutil.copy(os.path.join(src_dir, image_name), os.path.join(dst_dir, image_name))


# Create directories to store train, validation and test data
for split_dir in (train_dir, val_dir, test_dir):
    os.makedirs(split_dir, exist_ok=True)

# os.listdir returns entries in arbitrary order, so listings are sorted: with a
# fixed random_state the split is then identical on every run, and re-running
# this cell cannot copy the same image into a different split.
classes = sorted(os.listdir(data_dir))

# Iterate through each class
for clss in classes:
    # Define class path
    clss_path = os.path.join(data_dir, clss)
    # Get the names of all images in the class folder (sorted for reproducibility)
    images = sorted(os.listdir(clss_path))

    # Split into train, validation and test.
    # val_size is rescaled because the second split only sees the non-test remainder.
    train_images, test_images = train_test_split(images, test_size=test_size, random_state=42)
    train_images, val_images = train_test_split(train_images, test_size=val_size/(1-test_size), random_state=42)

    # Copy each subset into <split_dir>/<class name>/
    for split_dir, split_images in (
        (train_dir, train_images),
        (val_dir, val_images),
        (test_dir, test_images),
    ):
        _copy_images(split_images, clss_path, os.path.join(split_dir, clss))
    

# Define Dataloader

In [None]:
# Absolute locations of the train / validation / test image folders used by the
# datasets, the index build and the retrieval cells.
# NOTE(review): the split cell writes to the relative folders 'train', 'val' and
# 'test'; these paths match only when the working directory is /kaggle/working — confirm.
PATH_TRAIN = "/kaggle/working/train"
PATH_VALID = "/kaggle/working/val"
PATH_TEST = "/kaggle/working/test"

In [None]:
class TripletData(Dataset):
    """Dataset yielding random (anchor, positive, negative) image triplets.

    The data folder is expected to contain one sub-folder per class. For item
    ``idx`` the positive class is ``class_folders[idx % n_classes]``; two
    distinct images are drawn from it and a third image is drawn from a
    randomly chosen different class. Sampling uses the ``random`` module, so
    the same ``idx`` returns different triplets on different calls.

    :param path: Root folder containing one sub-folder per class.
    :param transforms: Callable applied to each PIL image.
    :param split: Stored on the instance; not used by the sampling logic.
    """

    def __init__(self, path, transforms, split="train"):
        self.path = path # path of data folder
        self.split = split    # train or valid
        self.transforms = transforms # data transformations
        # Sorted list of class folder names; non-directory entries are skipped
        # because listing them as a class folder would raise.
        self.class_folders = sorted(
            entry for entry in os.listdir(path)
            if os.path.isdir(os.path.join(path, entry))
        )
        # File names per class, listed once here instead of on every __getitem__.
        self.class_images = {
            class_name: sorted(os.listdir(os.path.join(path, class_name)))
            for class_name in self.class_folders
        }

    def _load(self, class_name, image_name):
        """Open one image, force it to 3-channel RGB and apply the transforms."""
        image_path = os.path.join(self.path, class_name, image_name)
        # The context manager closes the file; convert() returns an independent copy.
        with Image.open(image_path) as image:
            return self.transforms(image.convert("RGB"))

    def __getitem__(self, idx):
        # Select a positive class
        positive_class = self.class_folders[idx % len(self.class_folders)]
        # Choose a pair of distinct positive images (im1, im2)
        im1, im2 = random.sample(self.class_images[positive_class], 2)
        # Choose a negative class and an image from it (im3)
        negative_class = random.choice([c for c in self.class_folders if c != positive_class])
        im3 = random.choice(self.class_images[negative_class])

        return [
            self._load(positive_class, im1),
            self._load(positive_class, im2),
            self._load(negative_class, im3),
        ]

    def __len__(self):
        # Each class serves as the positive class 8 times per pass over the dataset.
        return len(self.class_folders) * 8

# Transforms
# Training pipeline: resize to 224x224, random horizontal flip, tensor conversion,
# then per-channel normalisation.
train_transforms = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010)),
    ]
)

# Validation pipeline: same as training, without the random flip.
val_transforms = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010)),
    ]
)

# Datasets and Dataloaders
train_data = TripletData(PATH_TRAIN, train_transforms)
val_data = TripletData(PATH_VALID, val_transforms)

# Only the training loader shuffles; both use batches of 32 and 4 worker processes.
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False, num_workers=4)

# Define Loss Function

In [None]:
class TripletLoss(nn.Module):
    """Triplet margin loss computed on squared Euclidean distances.

    For each triplet the loss is ``relu(d(a, p) - d(a, n) + margin)``, where
    ``d`` is the sum of squared differences along dim 1. The per-triplet
    losses are averaged over the batch.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def calc_euclidean(self, x1, x2):
        """Return the squared L2 distance between matching rows (no square root is taken)."""
        diff = x1 - x2
        return torch.sum(diff.pow(2), dim=1)

    def forward(self, anchor, positive, negative):
        """Return the mean hinge loss over the batch of (anchor, positive, negative) embeddings."""
        pos_dist = self.calc_euclidean(anchor, positive)
        neg_dist = self.calc_euclidean(anchor, negative)
        hinge = torch.relu(pos_dist - neg_dist + self.margin)
        return torch.mean(hinge)

# Train the ResNet-34 Feature Extractor with Triplet Loss

In [None]:
# Define no. epochs and device
epochs = 15
device = 'cuda'

# Base model: ResNet-34 built without a weights argument
model = models.resnet34()
# Change last FC layer to identity layer so the network outputs the pooled features
model.fc = torch.nn.Identity()

# For double GPU usage
model = nn.DataParallel(model)
model.to(device)

# ADAM optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)
triplet_loss = TripletLoss()

for epoch in range(epochs):
    # Training
    model.train() # Set the model in training mode
    epoch_loss = 0.0 # Running sum of the per-batch losses (plain Python float)
    for x1, x2, x3 in tqdm(train_loader):
        optimizer.zero_grad() # Set gradients to 0 for current mini-batch
        # Embed anchor, positive and negative batches on the GPU
        e1 = model(x1.to(device))
        e2 = model(x2.to(device))
        e3 = model(x3.to(device))

        loss = triplet_loss(e1, e2, e3) # Compute triplet loss
        loss.backward() # Backpropagate
        optimizer.step() # Update weights
        # Accumulate the scalar value only: adding the tensor itself would keep
        # every batch's autograd graph alive until the end of the epoch.
        epoch_loss += loss.item()
    print("Train Loss: {}".format(epoch_loss)) # Print epoch loss

    # Validation
    model.eval() # Put model in evaluation mode
    val_loss = 0.0 # Initialize the loss to 0
    with torch.no_grad():
        for x1, x2, x3 in val_loader:
            e1 = model(x1.to(device))
            e2 = model(x2.to(device))
            e3 = model(x3.to(device))

            loss = triplet_loss(e1, e2, e3) # Compute triplet loss
            val_loss += loss.item() # Add current loss to overall validation loss
    print("Validation Loss: {:.4f}".format(val_loss)) # Print validation loss

In [None]:
# Print a layer-by-layer summary of the model for a 3x224x224 input.
# NOTE(review): `model` is wrapped in nn.DataParallel at this point — confirm
# torchsummary reports the wrapped network as expected.
summary(model, (3, 224, 224))

# Index Images

In [None]:
faiss_index = faiss.IndexFlatL2(512)   # build the index (exact L2 search over 512-d vectors)

im_indices = []  # im_indices[i] is the image path whose embedding was added i-th
model.eval()  # make sure layers such as batch norm run in inference mode
with torch.no_grad(): # Set such that no gradients are computed
    # Sorted so the index order does not depend on the filesystem's listing order
    for f in tqdm(sorted(glob.glob(os.path.join(PATH_TRAIN, '*/*')))): # For each image in training dataset
        with Image.open(f) as im: # Open image (closed again on exit)
            im = im.convert('RGB').resize((224,224)) # Force 3 channels, then resize
        # unsqueeze(0) adds the batch dimension directly, avoiding the slow
        # tensor -> numpy -> list -> tensor round trip.
        batch = val_transforms(im).unsqueeze(0).cuda()

        preds = model(batch).cpu().numpy() # (1, 512) embedding on the CPU as a numpy array
        faiss_index.add(preds) # Add the representation to index
        im_indices.append(f)   # Store the image name to find it later on

# Retrieve Images

In [None]:
model.eval()  # make sure layers such as batch norm run in inference mode
with torch.no_grad(): # Set such that no gradients are computed
    for class_folder in sorted(os.listdir(PATH_TEST)): # For each class in test dataset
        class_folder_path = os.path.join(PATH_TEST, class_folder) # Define class folder path
        for f in sorted(os.listdir(class_folder_path)): # For each image in test dataset
            file_path = os.path.join(class_folder_path, f) # Get image path
            with Image.open(file_path) as im: # Open image (closed again on exit)
                im = im.convert('RGB').resize((224,224)) # Force 3 channels, then resize
            # unsqueeze(0) adds the batch dimension directly, avoiding the slow
            # tensor -> numpy -> list -> tensor round trip.
            batch = val_transforms(im).unsqueeze(0).cuda()

            test_embed = model(batch).cpu().numpy() # Embedding on the CPU as a numpy array
            _, I = faiss_index.search(test_embed, 5) # Get the 5 most similar images
            print("Retrieved Image: {}".format(im_indices[I[0][0]])) # Report the closest match

# Plot Examples

In [None]:
# Function to load and preprocess a single image for the model
def process_image(image_path, target_size=(224,224)):
    """
    Load an image and turn it into a one-image batch on the GPU.

    :param image_path: Path of the image file to open.
    :param target_size: (width, height) the image is resized to before
                        ``val_transforms`` is applied. ``val_transforms``
                        itself resizes to 224x224, so the returned tensor has
                        that spatial size regardless of this value.
    :return: Tensor with a leading batch dimension of 1, moved to CUDA.
    """
    with Image.open(image_path) as im:
        # convert() forces 3 channels for the 3-channel Normalize step
        im = im.convert('RGB').resize(target_size)

    # unsqueeze(0) adds the batch dimension without a numpy round trip
    return val_transforms(im).unsqueeze(0).cuda()

In [None]:
def get_similar_images(img_path, model, faiss_index, im_indices, num_similar=5):
    """
    Retrieve the indexed images closest to a query image.

    :param img_path: Path of the query image.
    :param model: Network used to embed the query image.
    :param faiss_index: FAISS index holding the embeddings of the indexed images.
    :param im_indices: List mapping FAISS ids to image paths.
    :param num_similar: Number of neighbours to request.
    :return: Tuple ``(similar_images_paths, I, distances)`` where ``I`` and
             ``distances`` are the raw arrays returned by ``faiss_index.search``.
    """
    im = process_image(img_path)

    # Inference only: skip building the autograd graph
    with torch.no_grad():
        test_embed = model(im).cpu().numpy()

    distances, I = faiss_index.search(test_embed, num_similar)
    # FAISS pads with -1 when fewer than num_similar vectors are available;
    # skip those so they do not silently map to im_indices[-1].
    similar_images_paths = [im_indices[i] for i in I[0] if i >= 0]

    return similar_images_paths, I, distances

In [None]:
# Select 10 random test images, each from a different class sub-directory
sampled_class_dirs = [
    os.path.join(PATH_TEST, class_folder)
    for class_folder in random.sample(os.listdir(PATH_TEST), 10)
]
# One randomly chosen image per sampled class directory
test_image_paths = [
    os.path.join(class_folder_path, random.choice(os.listdir(class_folder_path)))
    for class_folder_path in sampled_class_dirs
]

## Example 1: IndexFlatL2

In [None]:
# Plotting
# One row per query image: column 0 is the query, columns 1-5 its nearest neighbours.
fig, axs = plt.subplots(10, 6, figsize=(15, 15))
for row, test_image_path in enumerate(test_image_paths):
    similar_images, I, _ = get_similar_images(test_image_path, model, faiss_index, im_indices)

    # The class name is the name of the folder containing the image
    query_class = os.path.basename(os.path.dirname(test_image_path))

    # Collect (path, title) for every panel of this row, query first
    panels = [(test_image_path, f"Test: {query_class}")]
    panels += [
        (neighbour_path, f"Similar {rank} ({os.path.basename(os.path.dirname(neighbour_path))})")
        for rank, neighbour_path in enumerate(similar_images, 1)
    ]

    for col, (panel_path, panel_title) in enumerate(panels):
        ax = axs[row, col]
        ax.imshow(Image.open(panel_path))
        ax.set_title(panel_title)
        ax.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# Create the output folder for saved figures; exist_ok keeps the cell re-runnable.
os.makedirs("/kaggle/working/figures", exist_ok=True)

In [None]:
def plot_faiss_distances(distances, save_path="/kaggle/working/figures/Distances.png"):
    """
    Plots the evolution of distances from a FAISS search.

    :param distances: A 2D array-like where each row contains the distances
                      of the nearest neighbors for a particular query. A 1D
                      input is treated as a single query.
    :param save_path: File the figure is written to. Its parent directory is
                      created if missing. Pass None or "" to skip saving.
    """
    # Accept lists as well as arrays, and promote a single query to shape (1, k)
    distances = np.atleast_2d(np.asarray(distances))

    fig, ax = plt.subplots(figsize=(12, 8))

    # One line per query
    for query_distances in distances:
        ax.plot(query_distances)

    ax.set_title('Evolution of FAISS Distances')
    ax.set_xlabel('Rank of Neighbors')
    ax.set_ylabel('Distance')
    ax.grid(True)

    fig.tight_layout()
    if save_path:
        save_dir = os.path.dirname(save_path)
        if save_dir:
            # savefig does not create missing directories
            os.makedirs(save_dir, exist_ok=True)
        fig.savefig(save_path)

    plt.show()
    
# Collect the 50 nearest-neighbour distances of every sampled test image.
# Each search returns an array of shape (1, 50); stacking gives one row per query
# (the previous version kept only the last query's distances).
query_distances = []
for test_image_path in test_image_paths:
    similar_images, I, distances = get_similar_images(test_image_path, model, faiss_index, im_indices, 50)
    query_distances.append(distances)
distances = np.vstack(query_distances)

# Compare neighbouring ranks along axis 1. Slicing axis 0 of a (1, 50) array
# compares two empty arrays, which is always True.
print("Check if distance are sorted:", np.all(distances[:, :-1] <= distances[:, 1:]))

plot_faiss_distances(distances)

In [None]:
def plot_histograms(data_dicts, title_list, multiplier):
    """
    Draws up to 3 bar charts side by side in a single figure.

    :param data_dicts: List of dictionaries, each with class names as keys and numeric values.
    :param title_list: List of class names used in the subplot titles.
    :param multiplier: Offset into title_list of the first dictionary in
                       data_dicts; dictionary k is titled with
                       title_list[multiplier + k].
    """
    # One figure holding a 1x3 grid of subplots
    plt.figure(figsize=(12, 4))

    for position, counts in enumerate(data_dicts):
        # Subplot slots are numbered from 1
        plt.subplot(1, 3, position + 1)

        # Bar heights are the values, bar labels are the keys
        labels = list(counts.keys())
        heights = list(counts.values())
        plt.bar(labels, heights)

        # Vertical tick labels keep long class names readable
        plt.xticks(rotation=90)
        plt.title(f'Retrieved Images for class "{title_list[position + multiplier]}"')

    # Adjust the layout so that labels don't overlap
    plt.tight_layout()

    # Display the plot
    plt.show()
    
def split_filename(filename):
    """
    Extracts the non-digit prefix of a file name that ends in digits.

    The directory part, anything up to the last '__', and everything from the
    first '.' onwards are dropped first. If what remains ends in digits, the
    run of non-digit characters just before them is returned; otherwise the
    remaining name is returned unchanged.

    :param filename: The filename or path string.
    :return: The non-digit part as a string (e.g. 'beach' for 'x/beach12.tif').
    """
    stem = filename.split('/')[-1].split('__')[-1].split(".")[0]
    match = re.search(r'(\D+)(\d+)$', stem)
    return match.group(1) if match else stem


# Module-level accumulators: one retrieved-class count dictionary and one title
# per test class. dict_list must be defined at this level; indented under the
# function it sat after the return statements and was never executed.
dict_list = []
title_list = []
    
# Start from empty accumulators so the loop never appends to an undefined or
# stale list when this code is (re-)run.
dict_list = []   # one Counter of retrieved classes per test class
title_list = []  # test class names, aligned with dict_list

for class_test_path in os.listdir(PATH_TEST):
    title_list.append(class_test_path)
    classes_counts = Counter()
    class_dir = os.path.join(PATH_TEST, class_test_path)
    # Iterate through each test photo of the class
    for image_name in os.listdir(class_dir):
        image_path = os.path.join(class_dir, image_name)

        # Retrieve the 50 most similar indexed images for this test image
        similar_images, _, _ = get_similar_images(image_path, model, faiss_index, im_indices, 50)

        # Keep only the class name of each retrieved image and count occurrences
        classes_counts.update(map(split_filename, similar_images))
    dict_list.append(classes_counts)

for i in range(0, len(dict_list), 3):
    # Selecting 3 histograms at a time
    histogram_data_set = dict_list[i:i+3]
    # Plotting the set of up to 3 histograms; i is the offset into title_list
    plot_histograms(histogram_data_set, title_list, i)

## Custom Example

In [None]:
# Input folder for the custom example.
# NOTE(review): no visible cell reads this variable — confirm whether the custom
# plot below was meant to take its query images from here.
PATH_BUCURESTI = "/kaggle/input/test-bucuresti-uc"

In [None]:
# Plotting
# One row per query image: column 0 is the query, columns 1-5 its nearest neighbours.
# The grid is sized from the number of queries so every row index exists.
# NOTE(review): test_image_paths still holds the images sampled from PATH_TEST;
# PATH_BUCURESTI is never read — confirm which images this example should use.
fig, axs = plt.subplots(len(test_image_paths), 6, figsize=(15, 15), squeeze=False)
for row, test_image_path in enumerate(test_image_paths):
    # get_similar_images returns (paths, ids, distances); distances are not needed here
    similar_images, I, _ = get_similar_images(test_image_path, model, faiss_index, im_indices, 5)
    # Extracting class name from path: the file name, after the last '__' if present
    class_name = test_image_path.split('/')[-1].split('__')[-1]

    # Plot setup
    img = Image.open(test_image_path)
    img = img.resize((224, 224))
    axs[row, 0].imshow(img)
    axs[row, 0].set_title(f"Test: {class_name}")
    axs[row, 0].axis('off')

    for col, similar_image_path in enumerate(similar_images, 1):
        similar_class_name = os.path.basename(os.path.dirname(similar_image_path))  # Class of similar image
        axs[row, col].imshow(Image.open(similar_image_path))
        axs[row, col].set_title(f"Similar {col} ({similar_class_name})")
        axs[row, col].axis('off')

plt.tight_layout()
plt.show()