## Importing Required Libraries

In [None]:
# Standard libraries for data handling and visualization
import seaborn as sns  # Visualization
import matplotlib.pyplot as plt  # Plotting
import numpy as np  # Numerical operations
import os, glob, random  # File handling and randomness
import pandas as pd  # Tabular data processing

# TensorFlow and Keras for deep learning
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint  # To save model checkpoints during training
from keras.saving import register_keras_serializable  # To register custom components

# Check available GPU devices
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))


## Set Hyperparameters and Data Path

In [None]:
BATCH_SIZE = 32  # Batch size for training

target_shape = (224, 224) # Target image size (height, width)

IMG_SHAPE = target_shape + (3,)  # The full shape of input images

# Path to the image dataset (update this as needed for your environment).
pathData = r'./Samples'


## Preparing Data for Siamese Network Training

In [None]:
# This class is responsible for managing the dataset directory structure and splitting it into training, validation, and test sets.

class subset_manager:

    def __init__(self, datasetPath, selectedImgs=20, split_ratio=(0.75, 0.2, 0.05)):
        """
        Initialize the subset manager.

        Args:
            datasetPath (str): Path to the dataset where each subfolder represents a class or identity.
            selectedImgs (int): Minimum number of images required to include a folder in the dataset.
            split_ratio (tuple): Train/validation/test split ratio.
        """
        self.sampleNames = list()  # List to store valid sample folder paths
        self.selectedImgs = selectedImgs  # Minimum image count threshold
        self.split_ratio = split_ratio  # Proportions for splitting

        # Iterate through each subdirectory (i.e., identity or class folder)
        for folderName in os.listdir(datasetPath):
            absoluteFolderName = os.path.join(datasetPath, folderName)
            numImages = len(os.listdir(absoluteFolderName))

            # Include only those folders with more than `selectedImgs` images
            if numImages > self.selectedImgs:
                self.sampleNames.append(absoluteFolderName)

        # Shuffle the sample list to ensure randomness in splitting
        random.shuffle(self.sampleNames)


    def get_split_folders(self):
        """
        Splits the collected sample folders into train, validation, and test sets.

        Returns:
            tuple: Lists of folder paths for train, validation, and test splits.
        """
        n_total = len(self.sampleNames)
        n_train = round(self.split_ratio[0] * n_total)
        n_val = round(self.split_ratio[1] * n_total)

        # Slice the shuffled list to create train/val/test splits
        train_folders = self.sampleNames[:n_train]
        val_folders = self.sampleNames[n_train:n_train + n_val]
        test_folders = self.sampleNames[n_train + n_val:]

        # Print the distribution of the dataset
        print(f"Train instance: {len(train_folders)} \nValidation instance: {len(val_folders)} \nTest instance: {len(test_folders)}")

        return train_folders, val_folders, test_folders


In [None]:
# This class is used to read image files from disk, decode, resize, and normalize them for use in the Siamese Network.

class MapFunction():

    def __init__(self, imageSize):
        """
        Args:
            imageSize (tuple): Target size to which all images will be resized (height, width).
        """
        self.imageSize = imageSize

    def decode_and_resize(self, imagePath):
        """
        Reads an image from disk, decodes it from JPEG, normalizes pixel values to [0,1],
        and resizes it to the target image size.

        Args:
            imagePath (tf.Tensor): File path to the image.

        Returns:
            tf.Tensor: Preprocessed image tensor.
        """
        image = tf.io.read_file(imagePath)  # Load raw image from path
        image = tf.image.decode_jpeg(image, channels=3)  # Decode JPEG to tensor
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)  # Normalize to [0,1]
        image = tf.image.resize(image, self.imageSize)  # Resize to target shape

        return image

    def __call__(self, anchor, positive, negative):
        """
        Applies preprocessing to the anchor, positive, and negative images.

        Args:
            anchor (str): File path of anchor image.
            positive (str): File path of positive image (same class as anchor).
            negative (str): File path of negative image (different class from anchor).

        Returns:
            Tuple: ((anchor_tensor, positive_tensor, negative_tensor), dummy_label)
        """
        anchor = self.decode_and_resize(anchor)
        positive = self.decode_and_resize(positive)
        negative = self.decode_and_resize(negative)

        return (anchor, positive, negative), 0.0  # Output dummy label (needed for Keras training)


In [None]:
# TripletGenerator: Class to Dynamically Generate Triplets for Siamese Network Training
# This class loads images from folders and generates anchor, positive, and negative triplet samples for training.

class TripletGenerator:

    def __init__(self, datasetPath, selectedImgs=20, mode='Train'):
        """
        Initialize the triplet generator.

        Args:
            datasetPath (list): List of folder paths, where each folder contains images of one identity.
            selectedImgs (int): Number of images to sample per identity.
            mode (str): String label to indicate whether this is 'Train', 'Validation', or 'Test' mode.
        """
        self.idsNames = list()  # List to store folders (identities) with sufficient images
        self.allImages = 0  # Counter for total number of images used
        self.selectedImgs = selectedImgs
        self.mode = mode

        # Filter folders with at least `selectedImgs` images
        for absoluteFolderName in datasetPath:
            numImages = len(os.listdir(absoluteFolderName))
            if numImages > self.selectedImgs:
                self.idsNames.append(absoluteFolderName)

        # Generate dictionary mapping each identity to its selected image paths
        self.allIds = self.generate_all_ids_dict()

        # Print summary
        self.__str__()

    def count_number_images(self):
        """Returns the total number of images used across all identities."""
        return self.allImages

    def count_number_samples(self):
        """Returns the number of identities used."""
        return len(self.allIds)

    def generate_all_ids_dict(self):
        """
        Creates a dictionary where keys are identity folder paths and values are lists of selected image paths.

        Returns:
            dict: Mapping from identity name to list of selected image paths.
        """
        allIds = dict()

        for idName in self.idsNames:
            imageNames = os.listdir(idName)
            idPhotos = [os.path.join(idName, imageName) for imageName in imageNames]
            self.allImages += len(idPhotos)
            idPhotos = self.get_random_sample(idPhotos)  # Sample a fixed number of images per ID
            allIds[idName] = idPhotos

        return allIds

    def get_random_sample(self, imgs):
        """
        Uniformly selects `self.selectedImgs` samples from the list of images.

        Args:
            imgs (list): List of image paths.

        Returns:
            list: Sampled image paths.
        """
        indices = np.linspace(0, len(imgs) - 1, self.selectedImgs, dtype=int)
        return [imgs[i] for i in indices]  # Alternative: random.sample(imgs, self.selectedImgs)

    def get_next_element(self):
        """
        Generator function that continuously yields triplets (anchor, positive, negative).

        Yields:
            tuple: (anchor_path, positive_path, negative_path)
        """
        while True:
            # Choose a random identity for anchor and positive
            anchorName = random.choice(self.idsNames)

            # Choose a different identity for negative
            temporaryNames = self.idsNames.copy()
            temporaryNames.remove(anchorName)
            negativeName = random.choice(temporaryNames)

            # Select two different images from the same identity
            (anchorPhoto, positivePhoto) = np.random.choice(
                a=self.allIds[anchorName], size=2, replace=False
            )

            # Select one image from a different identity
            negativePhoto = random.choice(self.allIds[negativeName])

            yield (anchorPhoto, positivePhoto, negativePhoto)

    def __str__(self):
        """Print a summary of the current dataset status."""
        print(f"[INFO] {self.mode} Dataset:")
        print(f"TripletGenerator instance with value: {self.count_number_samples()} \n"
              f"and total imgs with value: {self.count_number_images()}")


In [None]:
# Create and Split the Dataset Using subset_manager

# Initialize the subset manager with the root dataset path
subset = subset_manager(pathData)

# Split the dataset folders into training, validation, and test sets
train_folders, val_folders, test_folders = subset.get_split_folders()


In [None]:
# Build tf.data Pipeline for Training

# Initialize the TripletGenerator with the training folders
TrainGenerator = TripletGenerator(train_folders)

# Create a tf.data.Dataset from the generator that yields (anchor, positive, negative) image paths
TfDatasetTrain = tf.data.Dataset.from_generator(
    generator=TrainGenerator.get_next_element,  # Custom generator yielding image path triplets
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string),  # Anchor path
        tf.TensorSpec(shape=(), dtype=tf.string),  # Positive path
        tf.TensorSpec(shape=(), dtype=tf.string),  # Negative path
    )
)

# Number of triplets to use per epoch (controls how many samples are taken from the generator)
k = 20000

# Create a MapFunction instance to decode and preprocess images
mapFunctionTrain = MapFunction(imageSize=target_shape)

# Map preprocessing function to the dataset
train_dataset = TfDatasetTrain.map(mapFunctionTrain, num_parallel_calls=tf.data.AUTOTUNE)

# Final data pipeline:
# - Take `k` triplets
# - Cache for performance
# - Shuffle for randomness
# - Batch for training
# - Prefetch for optimized GPU utilization
train_dataset = (
    train_dataset
    .take(k)
    .cache()
    .shuffle(256)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)


In [None]:
# Build tf.data Pipeline for Validation

# Initialize the TripletGenerator with validation folders
ValGenerator = TripletGenerator(val_folders, mode='Validation')

# Create a tf.data.Dataset from the generator that yields validation triplets
TfDatasetVal = tf.data.Dataset.from_generator(
    generator=ValGenerator.get_next_element,  # Custom generator yielding triplets
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string),  # Anchor path
        tf.TensorSpec(shape=(), dtype=tf.string),  # Positive path
        tf.TensorSpec(shape=(), dtype=tf.string),  # Negative path
    )
)

# Create a MapFunction instance for preprocessing validation images
mapFunctionVal = MapFunction(imageSize=target_shape)

# Map the preprocessing function to the dataset
val_dataset = TfDatasetVal.map(mapFunctionVal, num_parallel_calls=tf.data.AUTOTUNE)

# Final validation dataset pipeline:
# - Take the same number of triplets as training (k)
# - Cache for performance
# - No shuffle (important for consistent evaluation)
# - Batch for model input
# - Prefetch for efficient data loading
val_dataset = (
    val_dataset
    .take(k)
    .cache()
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)


In [None]:

def visualize(anchor_batch, positive_batch, negative_batch, num_triplets=3):
    """
    Visualize a few triplets from the supplied batches.

    Parameters:
      anchor_batch, positive_batch, negative_batch: batches of images,
          each of shape (batch_size, H, W, C)
      num_triplets: how many triplets to show
    """

    fig, axs = plt.subplots(num_triplets, 3, figsize=(9, 3 * num_triplets))

    def show(ax, image, title=None):
        ax.imshow(image)
        ax.axis('off')
        if title:
            ax.set_title(title)

    for i in range(num_triplets):
        show(axs[i, 0], anchor_batch[i], 'Anchor' if i == 0 else None)
        show(axs[i, 1], positive_batch[i], 'Positive' if i == 0 else None)
        show(axs[i, 2], negative_batch[i], 'Negative' if i == 0 else None)

    plt.tight_layout()
    plt.show()


# Usage example:
for batch in train_dataset.take(1):
    inputs, targets = batch
    anchor_batch, positive_batch, negative_batch = inputs  # inputs is a tuple/list of 3 batches
    visualize(anchor_batch, positive_batch, negative_batch, num_triplets=3)


## Model Definition & Training with Distributed Strategy

In [None]:
# Setup for Distributed Training

# Create a MirroredStrategy to distribute the model across multiple GPUs
strategy = tf.distribute.MirroredStrategy()

# Print the number of replicas (i.e., GPUs) being used
print("Number of devices: ", strategy.num_replicas_in_sync)


In [None]:
# Custom Triplet Margin Loss Function
# This loss encourages the network to place the anchor closer to the positive than to the negative by at least a margin.

@register_keras_serializable()  # Makes the custom loss serializable for model saving/loading
class TripletMarginLoss(tf.keras.losses.Loss):
    def __init__(self, margin=0.5,
                 reduction=tf.keras.losses.Reduction.SUM_OVER_BATCH_SIZE,
                 name="triplet_margin_loss"):
        """
        Args:
            margin (float): Minimum distance by which a negative sample should be farther than a positive one.
            reduction (str): How to reduce loss across batches.
            name (str): Name for the loss function.
        """
        super().__init__(reduction=reduction, name=name)
        self.margin = margin

    def call(self, _, y_pred):
        """
        Computes the triplet margin loss.

        Args:
            _ (unused): Ground truth labels (not needed for triplet loss).
            y_pred (tf.Tensor): Predicted distances of shape (batch_size, 2),
                                where [:,0] is anchor-positive and [:,1] is anchor-negative.

        Returns:
            tf.Tensor: Scalar loss value.
        """
        ap_distance = y_pred[:, 0]  # Distance between anchor and positive
        an_distance = y_pred[:, 1]  # Distance between anchor and negative

        # Loss = max(ap - an + margin, 0)
        loss = tf.maximum(ap_distance - an_distance + self.margin, 0.0)

        return tf.reduce_mean(loss)  # Mean loss over the batch

    def get_config(self):
        """
        Required for Keras serialization.
        """
        config = super().get_config()
        config.update({
            "margin": self.margin
        })
        return config



In [None]:
# Custom Distance Layer for Triplet Loss
# This layer computes the squared L2 distances between:
# - Anchor and Positive embeddings
# - Anchor and Negative embeddings

class DistanceLayer(tf.keras.layers.Layer):

    def __init__(self, **kwargs):
        """
        Initialize the custom Keras layer.
        Inherits from keras.layers.Layer and accepts optional keyword arguments.
        """
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        """
        Compute the pairwise distances between anchor-positive and anchor-negative.

        Args:
            anchor (tf.Tensor): Embedding vector for the anchor.
            positive (tf.Tensor): Embedding vector for the positive sample.
            negative (tf.Tensor): Embedding vector for the negative sample.

        Returns:
            tf.Tensor: A stacked tensor of shape (batch_size, 2) with
                       [AP_distance, AN_distance] for each triplet.
        """
        # Squared L2 distance between anchor and positive
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), axis=-1)

        # Squared L2 distance between anchor and negative
        an_distance = tf.reduce_sum(tf.square(anchor - negative), axis=-1)

        # Stack the distances into a single tensor for use in loss computation
        return tf.stack([ap_distance, an_distance], axis=1)


In [None]:
# Custom layer to perform L2 normalization

class L2Normalization(tf.keras.layers.Layer):
    def __init__(self, axis=1, **kwargs):
        """
        Custom layer to perform L2 normalization on inputs along a specified axis.

        Args:
            axis (int): The axis along which to perform normalization. Default is 1.
            **kwargs: Additional keyword arguments for Layer superclass.
        """
        super(L2Normalization, self).__init__(**kwargs)
        self.axis = axis  # Store the axis to normalize along

    def call(self, inputs):
        """
        Forward pass: apply L2 normalization to the input tensor.

        Args:
            inputs (tf.Tensor): Input tensor to normalize.

        Returns:
            tf.Tensor: L2-normalized tensor along the specified axis.
        """
        return tf.math.l2_normalize(inputs, axis=self.axis)

    def get_config(self):
        """
        Returns the config of the layer, useful for saving/loading the model.

        Returns:
            dict: Configuration dictionary with the axis parameter included.
        """
        config = super().get_config()
        config.update({"axis": self.axis})
        return config

In [None]:
def create_model(checkpoint_filepath, FineTuningMode=False):
    """
    Creates and returns a Siamese network model using ResNet50 as the backbone.

    Args:
        checkpoint_filepath (str): Path to load model weights if fine-tuning.
        FineTuningMode (bool): If True, enables partial fine-tuning of ResNet50.

    Returns:
        tf.keras.Model: A Siamese network model that outputs anchor-positive and anchor-negative distances.

    Why normalize embeddings before computing distance?
    - Normalization (e.g., L2 normalization) projects embeddings onto the unit hypersphere.
    - This makes Euclidean distance equivalent to cosine similarity in terms of ranking.
    - It improves stability and convergence of training.
    - Prevents embeddings from growing arbitrarily large, stabilizing triplet loss.
    """

    base_cnn = tf.keras.applications.ResNet50(
        weights="imagenet", input_shape=IMG_SHAPE, include_top=False
    )

    if FineTuningMode:
        trainable = False
        for layer in base_cnn.layers:
            if layer.name == "conv5_block2_out":
                trainable = True
            layer.trainable = trainable
    else:
        base_cnn.trainable = False

    flatten = tf.keras.layers.Flatten()(base_cnn.output)
    dense1 = tf.keras.layers.Dense(512, activation="relu")(flatten)
    dense1 = tf.keras.layers.BatchNormalization()(dense1)

    dense2 = tf.keras.layers.Dense(256, activation="relu")(flatten)
    dense2 = tf.keras.layers.BatchNormalization()(dense2)

    output = tf.keras.layers.Dense(128)(dense1)
    output = L2Normalization(axis=1)(output)

    embedding = tf.keras.Model(base_cnn.input, output, name="Embedding")

    anchor_input = tf.keras.layers.Input(name="anchor", shape=target_shape + (3,))
    positive_input = tf.keras.layers.Input(name="positive", shape=target_shape + (3,))
    negative_input = tf.keras.layers.Input(name="negative", shape=target_shape + (3,))

    distances = DistanceLayer()(
        embedding(tf.keras.applications.resnet.preprocess_input(anchor_input)),
        embedding(tf.keras.applications.resnet.preprocess_input(positive_input)),
        embedding(tf.keras.applications.resnet.preprocess_input(negative_input)),
    )

    siamese_network = tf.keras.Model(
        inputs=[anchor_input, positive_input, negative_input], outputs=distances
    )

    if FineTuningMode:
        siamese_network.load_weights(checkpoint_filepath)

    return siamese_network


In [None]:
# Path to save/load model weights (checkpoint) - (update this as needed for your environment)
checkpoint_filepath = r'./Siamese.keras'

# Training schedule parameters
initial_epochs = 10          # Number of epochs for initial training (frozen backbone)
fine_tune_epochs = 190       # Number of epochs for fine-tuning (unfreeze backbone)
total_epochs = initial_epochs + fine_tune_epochs  # Total epochs to train

In [None]:
# Build and compile the Siamese network inside the distribution strategy scope
with strategy.scope():
    # Create the model, passing the checkpoint path (freeze mode assumed here)
    siamese_network = create_model(checkpoint_filepath=checkpoint_filepath, FineTuningMode=False)

    # Compile the model with Adam optimizer and custom Triplet Margin Loss
    siamese_network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=TripletMarginLoss()
    )

    # Instantiate optimizer separately if needed later (e.g., for fine-tuning)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)


In [None]:
# ModelCheckpoint callback to save the best model during training

checkpoint_cb = ModelCheckpoint(
    filepath=checkpoint_filepath,       # Where to save the model file
    monitor='val_loss',                 # Metric to monitor
    save_best_only=True,                # Save only when val_loss improves
    save_weights_only=False,            # Save full model (architecture + weights)
    mode='min',                        # 'val_loss' should be minimized
    verbose=1                         # Show messages when saving
)

In [None]:
# Training Phase 1: Train with Frozen Base Model

# In this phase, the backbone (ResNet50) layers are frozen (non-trainable),
# so only the newly added dense layers and batch norm layers get updated.
# This helps the model stabilize initial embedding learning without
# disturbing pretrained feature extraction.

history = siamese_network.fit(
    train_dataset,           # Training data (triplet batches)
    validation_data=val_dataset,  # Validation data for monitoring
    epochs=initial_epochs,   # Train for the initial number of epochs (frozen backbone)
    callbacks=[checkpoint_cb]  # Save best model based on validation loss
)


In [None]:
# Plot training and validation loss over epochs to monitor performance

loss = history.history['loss']
val_loss = history.history['val_loss']

plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Loss')
plt.ylim([0, 1.0])  # Limit y-axis for better visualization
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.show()


In [None]:
# Fine-tuning phase: Unfreeze part of the base model and continue training

with strategy.scope():
    # Create model with fine-tuning enabled (unfreeze last ResNet blocks)
    siamese_network = create_model(checkpoint_filepath=checkpoint_filepath, FineTuningMode=True)

    # Compile with optimizer and custom triplet margin loss
    siamese_network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=TripletMarginLoss()
    )

    # Keep an optimizer instance if needed later (optional)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)


In [None]:
# Fine-tuning training: Continue training the model with unfrozen layers

history_fine = siamese_network.fit(
    train_dataset,           # Training data (triplets)
    validation_data=val_dataset,  # Validation data for monitoring
    epochs=total_epochs,     # Total epochs including initial + fine-tuning
    callbacks=[checkpoint_cb]  # Save the best model checkpoint during fine-tuning
)

In [None]:
# Combine loss history from initial training and fine-tuning phases

loss += history_fine.history['loss']
val_loss += history_fine.history['val_loss']

# Find the epoch with the lowest validation loss (best model)
BestWeights = val_loss.index(min(val_loss))

print(f"[INFO] the siamese network training results:")
print(f'Saved Best Weights at epoch {BestWeights}.')
print(f'Training loss at best epoch: {loss[BestWeights]:.4f}')
print(f'Validation loss at best epoch: {val_loss[BestWeights]:.4f}')


In [None]:
# Plot combined training and validation loss with markers for key events

plt.subplot(2, 1, 2)  # Use second subplot if plotting multiple figures above

plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.ylim([0, 1.1])
plt.xlim([1, BestWeights + 2])  # Focus x-axis around the best epoch

# Set x-axis ticks every 10 epochs
plt.xticks(np.arange(0, total_epochs + 1, 10))

# Vertical line marking start of fine-tuning phase
plt.axvline(x=initial_epochs, color='orange', linestyle='--', label='Start Fine Tuning')

# Vertical line marking epoch with best validation loss
plt.axvline(x=BestWeights, color='blue', linestyle='--', label='Save Best Weights')

plt.legend()
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.show()


In [None]:
# Save training and validation loss history to an Excel file for later analysis

df = pd.DataFrame()
df['Train'] = loss           # Training loss per epoch
df['Validation'] = val_loss  # Validation loss per epoch

df.to_excel('Loss.xlsx', index=False)  # Export DataFrame to 'Loss.xlsx' without row indices


## Evaluation of the Trained Model

## Test Set Evaluation Logic

- For each identity (ID), **select 21 images** evenly spaced from the available images.
- From these 21 images, **choose the first image as the "indicator"** embedding representing that ID.
- The remaining 20 images are used as **test samples** for that ID.
- Extract embeddings for all indicators and test samples using the trained embedding model.
- For each indicator:
  - Calculate distances to its own test samples (**anchor-positive distances**).
  - Calculate distances to test samples from **all other IDs** (**anchor-negative distances**).
- Using these distances, build a **confusion matrix** that reflects how often the model correctly matches the anchor to its positives versus negatives.
- This setup simulates the triplet structure used during training:
  - **Anchor** = indicator embedding
  - **Positive** = test sample from the same ID
  - **Negative** = test sample from a different ID
- The confusion matrix and distance comparisons help evaluate how well the model separates different classes based on the embeddings.

---


In [None]:
# Load the best saved weights into the Siamese network model before evaluation
siamese_network.load_weights(checkpoint_filepath)


# Extract the embedding sub-model (named 'Embedding') from the trained Siamese network
for layer in siamese_network.layers:
    if layer.name == 'Embedding':
        trainedEmbedding = layer
        break  # Optional: stop once found

In [None]:
# Save the trained model to a file named "embedding_model.keras"
# This saves the entire model architecture, weights, and optimizer state (if any).
trainedEmbedding.save("./embedding_model.keras")

In [None]:
# Load the saved model from disk.
# Because the model includes a custom layer (L2Normalization),
# we need to provide it in the `custom_objects` dictionary
# so Keras knows how to reconstruct that layer when loading.
trainedEmbedding = tf.keras.models.load_model(
    "./embedding_model.keras",
    custom_objects={"L2Normalization": L2Normalization}
)

In [None]:
# Prepare and extract embeddings for the test set samples

def decode_and_resize(imagePath, imageSize):
    # Read image from disk, decode JPEG, convert to float32, and resize to target shape
    image = tf.io.read_file(imagePath)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    image = tf.image.resize(image, imageSize)
    return image

TestSamples, indSamples = [], []

for folder in test_folders:
    # Collect image file paths and shuffle them
    imgs = glob.glob(os.path.join(folder, '*.jpg'))
    random.shuffle(imgs)

    # Select 21 evenly spaced images from the folder
    indices = np.linspace(0, len(imgs)-1, 21, dtype=int)
    imgs = [imgs[i] for i in indices]

    allImgs = []
    for img_path in imgs:
        img = decode_and_resize(img_path, target_shape)
        allImgs.append(img)

    # Convert list to numpy array and preprocess for ResNet
    allImgs = np.array(allImgs)
    allImgs = tf.keras.applications.resnet.preprocess_input(allImgs)

    # Get embeddings from the trained embedding model
    features = trainedEmbedding(allImgs)

    # First embedding as indicator, rest as test samples
    indSamples.append(features[0])
    TestSamples.append(features[1:])

# Convert lists to numpy arrays
indSamples = np.array(indSamples)
TestSamples = np.array(TestSamples)

print(f'Indicators shape: {indSamples.shape} and Test Samples shape: {TestSamples.shape}')


In [None]:
# Calculate a confusion matrix based on triplet distances between embeddings

confusionMatrix = []

for i in range(indSamples.shape[0]):
    ind = indSamples[i]               # Anchor indicator embedding for class i
    samples = TestSamples[i, ...]     # Positive samples embeddings for class i

    # Compute anchor-positive distances (within same class)
    ap_distance = tf.reduce_sum(tf.square(ind - samples), axis=-1).numpy()

    # Initialize confusion vector for class i with zeros
    confusionVector = [0] * indSamples.shape[0]

    for j in range(indSamples.shape[0]):
        if j == i:
            continue  # Skip same class comparison

        # Negative samples embeddings from class j
        samples_neg = TestSamples[j, ...]

        # Compute anchor-negative distances (between classes)
        an_distance = tf.reduce_sum(tf.square(ind - samples_neg), axis=-1).numpy()

        # Stack distances for comparison: shape (2, number_of_samples)
        pair = np.array([ap_distance, an_distance])

        # Find which distance is smaller for each sample (0: ap_distance, 1: an_distance)
        min_indices = tf.argmin(pair, axis=0)

        # Count how many times negative distance is smaller (hard negatives)
        count = int(tf.reduce_sum(min_indices).numpy())

        # Update confusion vector counts
        confusionVector[i] += min_indices.shape[0] - count  # Correct matches (AP smaller)
        confusionVector[j] = count                          # Confused with class j (AN smaller)

    confusionMatrix.append(confusionVector)

confusionMatrix = np.array(confusionMatrix)


## Assessing Model Accuracy and Metrics

Confusion Matrix Visualization

This function plots the confusion matrix as a heatmap using Seaborn, where:

- The x-axis corresponds to predicted classes.
- The y-axis corresponds to true classes.
- Cell values represent the number of samples classified from true class (row) to predicted class (column).
- Color intensity helps quickly identify where the model performs well or struggles.

Use this to visually inspect the model's classification performance across different identities.

---

Per-Class Evaluation Metrics Calculation

For each class, we compute the following metrics using the confusion matrix:

- **True Positives (TP):** Correctly predicted samples of this class.
- **False Positives (FP):** Samples incorrectly predicted as this class.
- **False Negatives (FN):** Samples of this class incorrectly predicted as others.
- **True Negatives (TN):** Samples correctly predicted as not belonging to this class.

From these, we calculate:

- **Accuracy:** Overall correctness of predictions.
- **Precision:** How many predicted positives are actually correct.
- **Sensitivity (Recall):** How many actual positives were correctly identified.
- **Specificity:** How many actual negatives were correctly identified.
- **Approximate AUC:** Average of sensitivity and specificity to estimate the Area Under the Curve.

These metrics help understand the model's performance on each individual class.

---

Adding Average Metrics Row

- After calculating per-class metrics, we compute the average of each metric across all classes.
- This average row is appended to the results matrix to provide an overall summary.
- The `class_names` list is updated accordingly to label this summary row as "Average per class".

---

Visualization of Per-Class Performance Metrics

- The heatmap displays the evaluation metrics (Accuracy, Precision, Sensitivity, Specificity, AUC) for each class.
- Scores are shown in percentage form for better readability.
- The color intensity indicates higher or lower metric values, providing a quick visual assessment.
- The last row corresponds to the average metrics across all classes.


In [None]:
def show_confusion_matrix(cm, labels):
    """
    Display a confusion matrix as a heatmap using Seaborn.

    Args:
        cm (np.array): Confusion matrix (2D array).
        labels (list): List of label names for the axes.
    """
    plt.figure(figsize=(20, 20))
    sns.heatmap(cm, xticklabels=labels, yticklabels=labels,
                annot=True, fmt='g', cmap='Greens')
    plt.xlabel('Prediction')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix Heatmap')
    plt.show()

In [None]:
# Create class names based on the number of identities in the confusion matrix
class_names = [f'S{i}' for i in range(confusionMatrix.shape[0])]

# Visualize the confusion matrix using the class names as labels
show_confusion_matrix(confusionMatrix, class_names)

In [None]:
result_matrix = []
num_class = confusionMatrix.shape[0]

for i in range(num_class):
    # True Positives: correctly predicted samples for class i
    n_tp = confusionMatrix[i][i]

    # False Positives: samples predicted as class i but belonging to other classes
    n_fp = sum([confusionMatrix[i][j] for j in range(num_class)]) - n_tp

    # False Negatives: samples belonging to class i but predicted as other classes
    n_fn = sum([confusionMatrix[j][i] for j in range(num_class)]) - n_tp

    # True Negatives: samples neither predicted as class i nor belonging to class i
    n_tn = confusionMatrix.sum() - n_tp - n_fp - n_fn

    # Compute evaluation metrics for class i
    acc = (n_tp + n_tn) / (n_tp + n_tn + n_fp + n_fn)  # Accuracy
    Pr = n_tp / (n_tp + n_fp)                          # Precision
    Sen = n_tp / (n_tp + n_fn)                         # Sensitivity / Recall
    Spe = n_tn / (n_tn + n_fp)                         # Specificity
    AUC = (Sen + Spe) / 2                              # Approximate AUC

    result_matrix.append([acc, Pr, Sen, Spe, AUC])

result_matrix = np.asarray(result_matrix)


In [None]:
# Add label for the average metrics row
class_names.append('Average per class')

# Calculate average metrics across all classes
average = result_matrix.sum(axis=0) / num_class

# Expand dims to make average compatible for appending as a new row
average = np.expand_dims(average, axis=0)

# Append the average metrics as a new row to the result matrix
result_matrix = np.append(result_matrix, average, axis=0)

In [None]:
metrics_names = ['Accuracy', 'Precision', 'Sensitivity', 'Specificity', 'AUC']

# Plot per-class evaluation metrics as a heatmap
plt.figure(figsize=(12, 14))
sns.heatmap(result_matrix * 100, annot=True, fmt=".2f", cmap="Greens",
            xticklabels=metrics_names, yticklabels=class_names,
            linewidths=0.5, cbar_kws={'label': 'Score (%)'})
plt.title("Per-Class Performance Metrics")
plt.xlabel("Metrics")
plt.ylabel("Classes")
plt.tight_layout()
plt.show()
