# Digital Signal and Image Processing Project - Image Retrieval

# Imports and Dataset

## Imports

In [19]:
from google.colab import drive
from ipyfilechooser import FileChooser
from IPython.display import display
import threading
import time
import zipfile
import shutil
import os
import numpy as np
import random as python_random
import matplotlib.pyplot as plt
import pandas as pd
import json

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.utils import load_img, img_to_array
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from sklearn.neighbors import KDTree
from sklearn.metrics import accuracy_score, classification_report

from keras.layers import Input, Lambda
from keras.optimizers import Adam
from keras import backend as K
from keras.models import Model
from keras.layers import Dense, GlobalAveragePooling2D, Dropout, Activation
from keras.models import Sequential, Model
from keras.layers import ZeroPadding2D, Convolution2D, MaxPooling2D, Dropout, Flatten, Activation
from keras.optimizers import SGD
from keras.utils import to_categorical
from keras.preprocessing import image
from keras import applications
import numpy as np
from IPython.core.display import Image
from tensorflow.keras.callbacks import EarlyStopping

from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras import layers, Model
from tensorflow.keras.metrics import Metric
from scipy.spatial.distance import cdist
from sklearn.metrics import confusion_matrix

# Seed every random number generator used in the notebook with the same
# value, then attach Google Drive (the dataset archive, the saved models and
# the cached features are read from it in the cells below).
SEED = 0
np.random.seed(SEED)
python_random.seed(SEED)
tf.random.set_seed(SEED)
keras.utils.set_random_seed(SEED)
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Dataset Import and Split

The dataset is imported, extracted and divided into train-val-test sets, with a proportion of 60-20-20.

The folder "dataset" is created which contains 3 sub folders for the 3 splits. The images are still divided by category.

Dictionaries are used to map label indices to their corresponding category names and vice versa.

In [2]:
# Copy the dataset archive from Drive to the local runtime and extract it
# into the current working directory.
shutil.copy('/content/drive/My Drive/MammalsDataset.zip', 'MammalsDataset.zip')

# The context manager closes the archive even when extraction fails part-way.
with zipfile.ZipFile('MammalsDataset.zip') as zipf:
    zipf.extractall()

os.remove('MammalsDataset.zip')  # Delete the copied zip file

In [3]:
dataset_origin_path = "mammals"
dataset_path = "dataset"
# test_ratio is never read below: the test split receives whatever is left
# once the train and val portions have been taken.
train_ratio, val_ratio, test_ratio = 0.6, 0.2, 0.2

# Create destination directories
for split in ['train', 'val', 'test']:
    split_path = os.path.join(dataset_path, split)
    os.makedirs(split_path, exist_ok=True)

# Iterate through each category (sub-folder)
for category in sorted(os.listdir(dataset_origin_path)):
    category_path = os.path.join(dataset_origin_path, category)

    # Stray files next to the category folders would make os.listdir() below
    # raise NotADirectoryError; leave them where they are.
    if not os.path.isdir(category_path):
        continue

    # Sort before shuffling and re-seed per category so the split does not
    # depend on the order in which the file system lists the images.
    images = sorted(os.listdir(category_path))
    python_random.seed(0)
    python_random.shuffle(images)

    train_end = int(train_ratio * len(images))
    val_end = train_end + int(val_ratio * len(images))

    splits = {
        'train': images[:train_end],
        'val': images[train_end:val_end],
        'test': images[val_end:]
    }

    # Move images to respective directories
    for split, split_images in splits.items():
        split_category_path = os.path.join(dataset_path, split, category)
        os.makedirs(split_category_path, exist_ok=True)

        for img in split_images:
            shutil.move(os.path.join(category_path, img), os.path.join(split_category_path, img))

    # Remove empty category folder
    os.rmdir(category_path)

# Remove the original dataset folder if empty
if not os.listdir(dataset_origin_path):
    os.rmdir(dataset_origin_path)

print("Dataset split completed successfully.")


Dataset split completed successfully.


Dictionaries are created in order to map label indices to their corresponding category names and vice versa, whenever this will be needed later in the project.

In [4]:
# Category names are the sub-folders of the train split, in alphabetical
# order; a category's label index is its position in that order.
category_names = []
for entry in os.listdir(dataset_path + "/train"):
    if os.path.isdir(os.path.join(dataset_path + "/train", entry)):
        category_names.append(entry)
category_names.sort()

category_to_index = {name: position for position, name in enumerate(category_names)}
index_to_category = dict(enumerate(category_names))

## Add some demo-testing images

In [5]:
# Copy the demo query images from Drive into the local runtime
src = "/content/drive/My Drive/IR Saves/demo-testing-images"
dst = "/content/demo-testing-images"

# Remove the destination folder if it already exists, so the copy starts clean
if os.path.exists(dst):
    shutil.rmtree(dst)

shutil.copytree(src, dst)

# Delete every ".npy" file located directly inside the copied folder
for filename in os.listdir(dst):
    if not filename.endswith(".npy"):
        continue
    os.remove(os.path.join(dst, filename))

List of images used in demo:

In [6]:
# Full paths of the regular files stored in the "alpaca" demo folder, in the
# order returned by os.listdir(). Positions in this list are what
# get_file_index() returns and what later cells use as query indices.
folder_path = "/content/demo-testing-images"
full_folder = os.path.join(folder_path, "alpaca")
file_list = []
for entry in os.listdir(full_folder):
    candidate = os.path.join(full_folder, entry)
    if os.path.isfile(candidate):
        file_list.append(candidate)

In [7]:
def get_file_index(filename):
    """
    Return the position of a demo image inside `file_list`.

    `filename` is joined to `full_folder`; the resulting path is printed and
    then looked up in `file_list`. Returns the index of the first match, or
    None when the file is not in the list.
    """
    full_path = os.path.join(full_folder, filename)
    print(full_path)
    try:
        return file_list.index(full_path)
    except ValueError:
        # The file was not found: return None
        return None

In [8]:
# Sanity check: look up a known demo image and print its path and index
filename = "tigre.jpg"
index = get_file_index(filename)
if index is None:
    print("File non trovato!")
else:
    print(f"{file_list[index]} = {index}")

/content/demo-testing-images/alpaca/tigre.jpg
/content/demo-testing-images/alpaca/tigre.jpg = 0


# Import models & Features

In this section we load our best-performing models and set up some helper functions for retrieval.

## Functions

In [20]:
def triplet_loss(margin=0.1):
    """
    Build the triplet loss function; needed to load the saved siamese model.

    The returned function ignores `y_true` and reads three embeddings from
    `y_pred` along axis 1 (anchor, positive, negative). Per sample it computes
    max(d(anchor, positive) - d(anchor, negative) + margin, 0), where d is the
    squared Euclidean distance, and returns the mean over the batch.

    The inner function must keep the name `loss`: the model-loading cell
    registers it under that key in `custom_objects`.
    """
    def loss(y_true, y_pred):
        anchor = y_pred[:, 0, :]
        positive = y_pred[:, 1, :]
        negative = y_pred[:, 2, :]

        def squared_distance(other):
            return tf.reduce_sum(tf.square(anchor - other), axis=1)

        hinge = tf.maximum(squared_distance(positive) - squared_distance(negative) + margin, 0.0)
        return tf.reduce_mean(hinge)

    return loss

In [21]:
def extract_features(model, root_folder, category_to_index, preprocess_function, image_size=(224, 224)):
    """
    Run `model` on every image found under `root_folder/<category>/`.

    Entries of `root_folder` are visited in alphabetical order and each one is
    announced with a "Processing folder" message. Entries that are not
    directories, or whose name is missing from `category_to_index`, contribute
    no images. Inside a category folder, files whose lower-cased name ends
    with 'jpg', 'jpeg' or 'png' are loaded at `image_size`, passed through
    `preprocess_function`, and predicted one at a time.

    Returns
    -------
    (features, labels, image_paths)
        `features`: array with one flattened model output per image,
        `labels`: array with the category index of each image,
        `image_paths`: list with the path of each image, in the same order.
    """
    image_suffixes = ('jpg', 'jpeg', 'png')
    features, labels, image_paths = [], [], []

    for category in sorted(os.listdir(root_folder)):
        category_path = os.path.join(root_folder, category)
        print("Processing folder", category_path)

        if not os.path.isdir(category_path):
            continue

        if category not in category_to_index:
            print(f"Skipping folder '{category}' as it is not in the category mapping.")
            continue

        # Convert the category name to its label index
        label = category_to_index[category]

        for fname in os.listdir(category_path):
            if not fname.lower().endswith(image_suffixes):
                continue

            image_path = os.path.join(category_path, fname)

            pixels = img_to_array(load_img(image_path, target_size=image_size))
            # The model expects a batch, so add a leading axis of size 1
            batch = np.expand_dims(preprocess_function(pixels), axis=0)
            embedding = model.predict(batch, verbose=0)

            features.append(embedding.flatten())
            labels.append(label)
            image_paths.append(image_path)

    return np.array(features), np.array(labels), image_paths

In [22]:
def load_or_compute_features(model, features_path, labels_path, paths_path, images_folder, category_to_index, preprocess_function=preprocess_input):
    """
    Load cached features, labels and image paths, or compute and cache them.

    The cache is used only when all three files exist; if any of them is
    missing, everything is recomputed with `extract_features` and all three
    files are written again, so they always describe the same images.

    Parameters
    ----------
    model : object passed to `extract_features` to produce the features.
    features_path, labels_path, paths_path : str
        ".npy" files holding the features, the labels and the image paths.
    images_folder : str
        Root folder given to `extract_features` when the cache is incomplete.
    category_to_index : dict
        Mapping from category (folder) name to label index.
    preprocess_function : callable
        Preprocessing forwarded to `extract_features`.

    Returns
    -------
    (features, labels, image_paths) with `image_paths` as a list.
    """
    cache_files = (features_path, labels_path, paths_path)

    if all(os.path.exists(path) for path in cache_files):
        train_features = np.load(features_path)
        train_labels = np.load(labels_path)
        # NOTE(review): allow_pickle=True can execute code embedded in the
        # file; kept for compatibility with existing caches, so only load
        # files you created yourself.
        train_image_paths = list(np.load(paths_path, allow_pickle=True))
        print("Loaded existing files.")
    else:
        print("Files missing, computing features...")
        train_features, train_labels, train_image_paths = extract_features(model, images_folder, category_to_index, preprocess_function)

        # Create the parent folder of every cache file. A bare file name has
        # no parent component, and os.makedirs('') would raise.
        for path in cache_files:
            parent_folder = os.path.dirname(path)
            if parent_folder:
                os.makedirs(parent_folder, exist_ok=True)

        np.save(features_path, train_features)
        np.save(labels_path, train_labels)
        np.save(paths_path, train_image_paths)
        print("Computed and saved new features.")

    return train_features, train_labels, train_image_paths

## Loading MobileNet model & features

In the following section we load the MobileNet model and retrieve its previously computed features, so that input images can later be processed as queries.

### Loading model

In [23]:
# Load MobileNetV2 with ImageNet weights and without the top layer;
# pooling="avg" is requested so the model can be used as a feature extractor.
mobilenet = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=False,
    weights="imagenet",
    pooling="avg",
)

### Loading features

In [24]:
# MobileNetV2 features of the train split: loaded from the Drive cache when
# present, otherwise computed from the images and saved there.
dataset_train_path = dataset_path + "/train"
prefix = "/content/drive/My Drive/"
train_features_path = f"{prefix}IR Saves/Pre-Trained Approach/train_features.npy"
train_labels_path = f"{prefix}IR Saves/Pre-Trained Approach/train_labels.npy"
train_image_names_path = f"{prefix}IR Saves/Pre-Trained Approach/train_image_names.npy"

train_features, train_labels, train_image_paths = load_or_compute_features(
    model=mobilenet,
    features_path=train_features_path,
    labels_path=train_labels_path,
    paths_path=train_image_names_path,
    images_folder=dataset_train_path,
    category_to_index=category_to_index,
)

Loaded existing files.


In [25]:
# MobileNetV2 features of the demo query images (cached on Drive).
# NOTE(review): demo_path is relative while the demo images were copied to
# "/content/demo-testing-images" — presumably the working directory is
# /content; confirm.
demo_path = "demo-testing-images"
test_features_path = f"{prefix}IR Saves/demo-testing-images/test_features_1.npy"
test_labels_path = f"{prefix}IR Saves/demo-testing-images/test_labels_1.npy"
test_image_names_path = f"{prefix}IR Saves/demo-testing-images/test_image_names_1.npy"

test_features, test_labels, test_image_paths = load_or_compute_features(
    model=mobilenet,
    features_path=test_features_path,
    labels_path=test_labels_path,
    paths_path=test_image_names_path,
    images_folder=demo_path,
    category_to_index=category_to_index,
)

Loaded existing files.


## Loading best siamese model & features

In the following section we load the Siamese model trained with triplet loss and retrieve its previously computed features, so that input images can later be processed as queries.

### Loading model

In [26]:
# Folder on Drive that holds the saved siamese models
model_folder = "/content/drive/My Drive/IR Saves/SiameseNetwork Approach"

# Create the folder if it doesn't exist
os.makedirs(model_folder, exist_ok=True)

# Define the full file paths
siamese_model_file = os.path.join(model_folder, "siamese_model2.keras")
encoder_model_file = os.path.join(model_folder, "encoder_model2.keras")

# The following cells need `encoder2`. This notebook does not train the
# models, so stop here with a clear error instead of printing a message and
# failing later with a NameError.
missing_model_files = [path for path in (siamese_model_file, encoder_model_file) if not os.path.exists(path)]
if missing_model_files:
    raise FileNotFoundError(
        "Models not found. Please upload files to Drive: " + ", ".join(missing_model_files)
    )

# safe_mode=False allows Keras to deserialize Lambda layers, i.e. to run code
# stored inside the model file: only load model files you trust.
# NOTE(review): the recorded output of a later cell shows
# "name 'tf' is not defined" raised inside a Lambda layer the first time
# encoder2 is called — presumably the Lambda stored in the encoder cannot
# resolve `tf` after loading; TODO confirm and fix where the model is defined.
siamese_model2 = keras.models.load_model(siamese_model_file, custom_objects={'loss': triplet_loss()}, safe_mode=False)
encoder2 = keras.models.load_model(encoder_model_file, safe_mode=False)
print("Models loaded successfully from Google Drive.")

Models loaded successfully from Google Drive.


### Loading features

In [27]:
prefix = "/content/drive/My Drive/"  # Drive root prepended to every cached-features path below

In [28]:
# Siamese-encoder features of the train split (cached on Drive).
# NOTE(review): the models are read from "SiameseNetwork Approach" while
# these caches live under "SiameseNetwork2 Approach" — confirm this is intended.
# The *_path variables reuse the names of the MobileNet section and now point
# to the siamese files.
dataset_train_path = dataset_path + "/train"
train_features_path = f"{prefix}IR Saves/SiameseNetwork2 Approach/train_features.npy"
train_labels_path = f"{prefix}IR Saves/SiameseNetwork2 Approach/train_labels.npy"
train_image_names_path = f"{prefix}IR Saves/SiameseNetwork2 Approach/train_image_names.npy"

train_features_siamese2, train_labels_siamese2, train_image_paths_siamese2 = load_or_compute_features(
    model=encoder2,
    features_path=train_features_path,
    labels_path=train_labels_path,
    paths_path=train_image_names_path,
    images_folder=dataset_train_path,
    category_to_index=category_to_index,
)

Loaded existing files.


In [29]:
# Siamese-encoder features of the demo query images (cached on Drive).
# NOTE(review): in the recorded run the cache was missing and computing the
# features failed inside a Lambda layer of encoder2 ("name 'tf' is not
# defined"), so test_features_siamese2 was never created in that run.
dataset_test_path = "demo-testing-images"
test_features_path = f"{prefix}IR Saves/demo-testing-images/test_features_2.npy"
test_labels_path = f"{prefix}IR Saves/demo-testing-images/test_labels_2.npy"
test_image_names_path = f"{prefix}IR Saves/demo-testing-images/test_image_names_2.npy"

test_features_siamese2, test_labels_siamese2, test_image_paths_siamese2 = load_or_compute_features(
    model=encoder2,
    features_path=test_features_path,
    labels_path=test_labels_path,
    paths_path=test_image_names_path,
    images_folder=dataset_test_path,
    category_to_index=category_to_index,
)

Files missing, computing features...
Processing folder demo-testing-images/alpaca


NameError: Exception encountered when calling Lambda.call().

[1mname 'tf' is not defined[0m

Arguments received by Lambda.call():
  • inputs=tf.Tensor(shape=(1, 128), dtype=float32)
  • mask=None
  • training=False

# Image Retrieval
This section creates the KD-trees, which are used to navigate the feature space and find the stored images nearest to each query image. In this way we can see how the features computed by each model for an input image relate to the previously extracted features.

Select here which image you would like to load:

In [None]:
# File names of the demo images to use as queries.
demo_image_names = ["tigre.jpg"]  # CHANGE IMAGES HERE
test_image_indices = [get_file_index(name) for name in demo_image_names]

# get_file_index returns None for a file it cannot find. Fail here with a
# clear message instead of letting None be used as an array index below.
missing_demo_images = [name for name, idx in zip(demo_image_names, test_image_indices) if idx is None]
if missing_demo_images:
    raise FileNotFoundError(f"Demo images not found in {full_folder}: {missing_demo_images}")

In [None]:
# Display the selected query indices as a quick check
test_image_indices

## Functions

In [None]:
def _show_image(ax, image, title):
    """Draw `image` on `ax` with `title` and hide the axis ticks and frame."""
    ax.imshow(image)
    ax.set_title(title)
    ax.axis('off')


def print_retrieved_images(train_image_paths, test_image_paths, test_image_indices, query_indexes, query_distances):
    """
    Plot each query image and its k nearest neighbours, one row per query.

    Parameters
    ----------
    train_image_paths : sequence of str
        Paths of the indexed images; the values in `query_indexes` index into it.
    test_image_paths : sequence of str
        Paths of the candidate query images.
    test_image_indices : sequence of int
        Positions in `test_image_paths` of the queries to plot. Row i of
        `query_indexes` / `query_distances` belongs to `test_image_indices[i]`.
    query_indexes, query_distances : 2-D sequences
        Neighbour indices and distances, one row per query; k is taken from
        the length of the first row.

    Returns
    -------
    None. With no query images a message is printed and nothing is plotted.
    """
    num_test_images = len(test_image_indices)
    if num_test_images == 0:
        print("No query images to display.")
        return

    # Number of nearest neighbours to display
    k = len(query_indexes[0])

    # One row per query, (k + 1) columns: the query followed by its neighbours.
    # squeeze=False keeps `axes` two-dimensional even for a single query, so
    # the same indexing works for any number of rows.
    fig, axes = plt.subplots(
        num_test_images,
        k + 1,
        figsize=(3 * (k + 1), 3 * num_test_images),
        squeeze=False,
    )

    for row, test_image_index in enumerate(test_image_indices):
        test_image_path = test_image_paths[test_image_index]
        _show_image(
            axes[row, 0],
            load_img(test_image_path),
            f"Query:\n {os.path.basename(test_image_path)}",
        )

        neighbours = zip(query_indexes[row], query_distances[row])
        for col, (index, dist) in enumerate(neighbours, start=1):
            neighbor_image_path = train_image_paths[index]
            _show_image(
                axes[row, col],
                load_img(neighbor_image_path),
                f"{os.path.basename(neighbor_image_path)}\n(Dist: {dist:.2f})",
            )

    plt.tight_layout()
    plt.show()


In [None]:
def calculate_anmrr(query_indexes, test_labels, train_labels, k=3):
    """
    Compute an ANMRR-style score (Average Normalized Modified Retrieval Rank).

    For every query, the ranks (1-based) of the retrieved neighbours whose
    train label equals the query label are averaged; when none matches, the
    average rank is set to k + 1. The per-query score is
        (average_rank - 0.5 * (1 + NG)) / (1.25 * k - 0.5 * (1 + NG))
    with NG fixed to 1, and the result is the mean over all queries.

    NOTE(review): this is a simplified variant. Because NG is fixed to 1 and
    a miss is ranked k + 1, a single hit at rank 1 scores 0, a query whose k
    neighbours are all relevant scores above 0, and a miss can score above 1.
    Confirm this is the intended metric before comparing with published ANMRR
    values.

    Parameters
    ----------
    query_indexes : 2-D array-like
        Indices into `train_labels` of the retrieved neighbours, one row per query.
    test_labels : sequence
        True label of each query, aligned with the rows of `query_indexes`.
    train_labels : numpy array
        Labels of the indexed items (indexed with each row of `query_indexes`).
    k : int
        Number of neighbours retrieved per query.

    Returns
    -------
    float: the mean score, or NaN when there are no queries.
    """
    tests_number = len(query_indexes)  # Number of queries
    if tests_number == 0:
        # Nothing to average; NaN makes the absence of data visible
        return float("nan")

    # Both terms are the same for every query, so compute them once
    number_relevant_documents = 1
    rank_offset = 0.5 * (1 + number_relevant_documents)
    denominator = 1.25 * k - rank_offset
    if denominator == 0:
        # No query can be scored; the accumulated total would stay at zero
        return 0.0

    total_score = 0
    for i, query_index in enumerate(query_indexes):
        true_label = test_labels[i]
        neighbor_labels = train_labels[query_index]

        # 1-based ranks of the neighbours that share the query's label
        relevant_ranks = [
            rank
            for rank, neighbor_label in enumerate(neighbor_labels, start=1)
            if neighbor_label == true_label
        ]
        average_rank_relevant_documents = np.mean(relevant_ranks) if relevant_ranks else k + 1

        total_score += (average_rank_relevant_documents - rank_offset) / denominator

    return total_score / tests_number

In [None]:
def retrieve_and_evaluate_category(kd_tree, category_name, num_images, root_test_folder, category_to_index, test_features,
                                   k=3, test_labels=None, train_labels=None, train_image_paths=None, test_image_paths=None):
    """
    Retrieve, plot and score the nearest neighbours of one category's queries.

    Parameters
    ----------
    kd_tree : object with a `query(features, k=...)` method returning
        (distances, indices), e.g. a scikit-learn KDTree.
    category_name : str
        Category to evaluate; must be a key of `category_to_index`.
    num_images : int
        Maximum number of query images of that category to use.
    root_test_folder : str
        Unused; kept so existing calls keep working.
    category_to_index : dict
        Mapping from category name to label index.
    test_features : numpy array
        Feature vectors of the query images, aligned with `test_labels`.
    k : int, default 3
        Number of neighbours to retrieve per query.
    test_labels, train_labels, train_image_paths, test_image_paths : optional
        Labels and paths matching `test_features` and the data indexed by
        `kd_tree`. When left as None, the notebook-level variables with the
        same names are used (the original behaviour). Pass them explicitly
        when evaluating a model whose arrays are stored under other names.

    Raises
    ------
    ValueError if `category_name` is not in `category_to_index`.
    """
    print(f"Retrieving images for category {category_name}...")

    if category_name not in category_to_index:
        raise ValueError(f"Category '{category_name}' not found in the category_to_index mapping.")

    category_index = category_to_index[category_name]

    # Fall back to the notebook-level variables for anything not supplied
    notebook_vars = globals()
    if test_labels is None:
        test_labels = notebook_vars["test_labels"]
    if train_labels is None:
        train_labels = notebook_vars["train_labels"]
    if train_image_paths is None:
        train_image_paths = notebook_vars["train_image_paths"]
    if test_image_paths is None:
        test_image_paths = notebook_vars["test_image_paths"]
    test_labels = np.asarray(test_labels)

    # Indices of the query images of this category, limited to num_images
    test_image_indices = [i for i, label in enumerate(test_labels) if label == category_index][:num_images]
    if not test_image_indices:
        print(f"No test images found for category {category_name}.")
        return

    # Features of the selected query images
    test_images = test_features[test_image_indices]

    # k-NN search using the KD-tree
    query_distances, query_indexes = kd_tree.query(test_images, k=k)

    print_retrieved_images(train_image_paths, test_image_paths, test_image_indices, query_indexes, query_distances)

    anmrr_value = calculate_anmrr(query_indexes, test_labels[test_image_indices], train_labels, k=k)
    print(f"ANMRR for category {category_name}: {anmrr_value:.4f}")

## KD-Tree for MobileNet

The KD-tree is built on the train features that were extracted earlier; it will be used to retrieve the features most similar to those extracted from the query image using MobileNet.

In [None]:
# Index the MobileNetV2 train features so nearest neighbours can be queried
kd_tree = KDTree(train_features)

The query images are the ones selected above by file name, and their respective feature arrays are retrieved through their indices.

In [None]:
# Feature vectors of the selected query images, one row per selected index.
# NOTE(review): the indices come from file_list while test_features may have
# been loaded from a cache — this assumes row i of test_features describes
# file_list[i]; TODO confirm.
test_images = test_features[test_image_indices]
test_images.shape

5 images are retrieved for each of the selected test images:

In [None]:
# For each query, fetch the 5 closest train features: distances and their train indices
distances, indices = kd_tree.query(test_images, k=5)

This function takes the result of the KD-tree query and dynamically displays the original image and all of the retrieved images, with useful information displayed in the titles.

In [None]:
# Plot each query (looked up in file_list) next to its MobileNetV2 neighbours
print_retrieved_images(train_image_paths, file_list, test_image_indices, indices, distances)

## KD-Tree for Triplet loss Siamese Network

In [None]:
# Index the siamese-encoder train features so nearest neighbours can be queried
kd_tree_siamese2 = KDTree(train_features_siamese2)

The same query image indices as before are used, and their respective feature arrays are retrieved.

In [None]:
# Siamese-encoder feature vectors of the same selected query images.
# NOTE(review): assumes row i of test_features_siamese2 describes file_list[i]; TODO confirm.
test_images_siamese2 = test_features_siamese2[test_image_indices]
test_images_siamese2.shape

5 images are retrieved for each of the selected test images.

In [None]:
# For each query, fetch the 5 closest siamese train features: distances and their train indices
distances_siamese2, indices_siamese2 = kd_tree_siamese2.query(test_images_siamese2, k=5)

This function takes the result of the KD-tree query and dynamically displays the original image and all of the retrieved images, with useful information displayed in the titles.

In [None]:
# Plot each query (looked up in file_list) next to its siamese-encoder neighbours
print_retrieved_images(train_image_paths_siamese2, file_list, test_image_indices, indices_siamese2, distances_siamese2)

## Small comparison evaluation

Here we can see that most of the time the top-3 / top-2 retrieved images are the same for both models, which makes perfect sense because:
- the first model is the raw MobileNet
- the second model is MobileNet fine-tuned as the encoder of a Siamese model with triplet loss.

It is interesting to note that, even though the models are basically the same, some differences appear after fine-tuning. For example, the same top-3 / top-2 images have different distances from the query image.