# Image processing and feature build-up

This notebook loads and processes the different images used in this project. Using a Siamese network with contrastive loss, produces optimal features for downstream tasks

In [None]:
import os
import numpy as np
from array import array
import matplotlib.pyplot as plt
import importlib
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
from sklearn.metrics import accuracy_score



# Custom functions and modules
from auxFuns.process_images import *
from auxFuns.build_features import LeNet, ContrastiveLoss, compute_pairwise_distances

In [None]:
# Reload the packages
import auxFuns.process_images 
importlib.reload(auxFuns.process_images)

import auxFuns.build_features 
importlib.reload(auxFuns.build_features)

In [None]:
# import os
# os.chdir('..')
os.getcwd()

## 1. Load the MNIST images

In [None]:
# Loading of all MNIST images in training, testing and validation sets
processed_data = os.getcwd() + '/data/MNIST/raw'

# Training data
training_images_mnist = load_mnist_images(processed_data + '/train-images-idx3-ubyte')
training_labels_mnist = load_mnist_labels(processed_data + '/train-labels-idx1-ubyte')

# Testing and validation
all_test_images_mnist = load_mnist_images(processed_data + '/t10k-images-idx3-ubyte')
all_test_labels_mnist = load_mnist_labels(processed_data + '/t10k-labels-idx1-ubyte')


testing_images_mnist = all_test_images_mnist[:all_test_images_mnist.shape[0]//2 ,:,:]
valid_images_mnist = all_test_images_mnist[all_test_images_mnist.shape[0]//2 : ,:,:]

testing_labels_mnist = all_test_labels_mnist[:all_test_images_mnist.shape[0]//2]
valid_labels_mnist = all_test_labels_mnist[all_test_images_mnist.shape[0]//2 : ]


In [None]:
print('Shapes of the images data : ')
print(training_images_mnist.shape, testing_images_mnist.shape, valid_images_mnist.shape)

print('\nShapes of the labels : ')
print((training_labels_mnist).shape, testing_labels_mnist.shape, valid_labels_mnist.shape)

In [None]:
# As an example, show the examples of images in the training, testing and validation sets
index = 2 
show_image_MNIST(training_images_mnist[index], title = 'Training image')
show_image_MNIST(testing_images_mnist[index], title = 'Testing image')
show_image_MNIST(valid_images_mnist[index], title = 'Validation image')



In [None]:
# Now, again for visualization purposes, let us show sequence of images in training, test and validation sets

sequence_length = 5

#Training 
sequence_images_training = training_images_mnist[:sequence_length]
sequence_labels_training = training_labels_mnist[:sequence_length]
show_sequence_MNIST(sequence_images_training, sequence_labels_training)

# Testing
sequence_images_testing = testing_images_mnist[:sequence_length]
sequence_labels_testing = testing_labels_mnist[:sequence_length]
show_sequence_MNIST(sequence_images_testing, sequence_labels_testing)

# Validation
sequence_images_valid = valid_images_mnist[:sequence_length]
sequence_labels_valid = valid_labels_mnist[:sequence_length]
show_sequence_MNIST(sequence_images_valid, sequence_labels_valid)

Need to preprocess the images to make them "Pytorch-friendly". We need all of the following steps : 
1. Normalize the Images

2. Reshape for Convolutional Layer, i.e. add the channel dimension

3. Create Image Pairs (label 0 for similar, 1 for dissimilar)

4. Dataloader: Use a dataloader that can handle the pairs of images and labels for batching and shuffling during training.

In [None]:
# Normalize the images and add the channel dimension
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

training_images_mnist = np.array([transform(image) for image in training_images_mnist])
testing_images_mnist = np.array([transform(image) for image in testing_images_mnist])
valid_images_mnist = np.array([transform(image) for image in valid_images_mnist])

In [None]:
# Create image pairs
training_pairs, training_labels = create_pairs_MNIST(training_images_mnist, training_labels_mnist, num_pairs=3, p=0.4)
testing_pairs, testing_labels = create_pairs_MNIST(testing_images_mnist, testing_labels_mnist, num_pairs=3, p=0.4)
valid_pairs, valid_labels = create_pairs_MNIST(valid_images_mnist, valid_labels_mnist, num_pairs=3, p=0.4)

In [None]:
# Transform the pairs of tuples into a DataLoader to feed the network
batch_size = 64
train_dataset, train_loader = fromtuple2loader(training_pairs, training_labels, batch_size=batch_size)
test_dataset, test_loader = fromtuple2loader(testing_pairs, testing_labels, batch_size=batch_size)
valid_dataset, valid_loader = fromtuple2loader(valid_pairs, valid_labels, batch_size=batch_size)

In [None]:
# Ensure the Dataloader contains the correct information
visualize_loader_pairs_MNIST(train_loader)

## 2. Train the Siamese network (contrastive loss)

Build-up of an initial Siamese network that will bring closer similar images (similar images are defined as those whose class is the same and move away dissimilar ones). The training should follow the next steps : 
1. Forward Pass: Pass the image pairs through the network. Each leg of the Siamese network processes one image of the pair, and it should return two embeddings

2. Contrastive Loss: Use the contrastive loss function to calculate the loss based on the embeddings from the Siamese network and the label indicating whether the pair is similar or dissimilar

In [None]:
def compute_validation_loss(model, valid_loader, criterion):
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # No need to track gradients for validation
        valid_loss = 0.0
        for data, labels in valid_loader:
            image1, image2 = data[:,0], data[:,1]
            labels = labels.float()

            # Forward pass
            output1 = model(image1)
            output2 = model(image2)

            # Compute loss
            loss = criterion.forward(output1, output2, labels)
            valid_loss += loss.item()

    return valid_loss / len(valid_loader)

In [None]:
# Create the net
siamese_net = LeNet(out_dim = 4)

# Key hyperparameter : the margin. It refers to the maximum allowed distance between dissimilar classes
criterion = ContrastiveLoss(margin=2)

optimizer = torch.optim.Adam(siamese_net.parameters(), lr=0.01)  

num_epochs = 10 

train_losses = []
test_losses = []
# Parameters for early stopping
patience = 2  
best_loss = float('inf')
trigger_times = 0

for epoch in range(num_epochs):
    for batch_idx, (data, labels) in enumerate(train_loader):
        image1, image2 = data[:,0], data[:,1]
        labels = labels.float()

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output1 = siamese_net(image1)
        output2 = siamese_net(image2)

        # Calculate loss
        loss = criterion.forward(output1, output2, labels)

        # Backward pass
        loss.backward()

        # Optimize the weights
        optimizer.step()

        if batch_idx % 350 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item()}')


    train_loss = compute_validation_loss(siamese_net, train_loader, criterion)
    test_loss = compute_validation_loss(siamese_net, test_loader, criterion)

    train_losses.append(train_loss)
    test_losses.append(test_loss)    

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss}')
    print(f'Epoch [{epoch+1}/{num_epochs}], Test Loss: {test_loss}')

    # Early stopping
    if test_loss < best_loss:
        best_loss = test_loss
        trigger_times = 0
    else:
        trigger_times += 1
        if trigger_times >= patience:
            print(f"Early stopping! Best Test Loss: {best_loss}")
            break

torch.save(siamese_net.state_dict(), os.getcwd() + '/models/siamese_embedder.pth')

## 3. Visualize the resulting features

The whole goal of this notebook was to build a NNet capable of building relevant features. In this section we want to visualizae in a 3-D space that these are relevant embeddings of the images

### Evaluation metrics (accuracy per class and so on)

In [None]:
def evaluate_thresholds(distances, true_labels):
    thresholds = np.linspace(min(distances), max(distances), num=100)
    # thresholds = np.linspace(0,1, num = 100)
    accuracies = []
    
    for threshold in thresholds:
        predictions = (distances > threshold).astype(int)
        accuracy = accuracy_score(true_labels, predictions)
        accuracies.append(accuracy)
    
    return thresholds, accuracies

distances, true_labels = compute_pairwise_distances(siamese_net, valid_loader)
thresholds, accuracies = evaluate_thresholds(distances, true_labels)

plt.figure(figsize=(10, 6))
plt.plot(thresholds, accuracies, label='Accuracy')
plt.xlabel('Threshold / allowed distances to cluster')
plt.ylabel('Accuracy')
plt.title('Accuracy vs. Pairwise distance for Siamese Network')
plt.legend()
plt.grid(True)
plt.show()


In [None]:
plt.figure(figsize=(10, 6))
plt.plot(thresholds, accuracies, label='Accuracy')
plt.xlabel('Threshold / allowed distances to cluster')
plt.ylabel('Accuracy')
plt.title('Accuracy vs. Pairwise distance for Siamese Network')
plt.legend()
plt.grid(True)
plt.show()

### Visualization in the 2D space


THIS PART NEEDS TO BE REPEATED AGAIN AS THE RESULTS ARE MISLEADING

In [None]:
valid_dataset, valid_loader = fromtuple2loader(valid_pairs, valid_labels, batch_size=5000)

def get_embeddings_every_3(model, data_loader):
    model.eval()
    embeddings = []
    
    with torch.no_grad():
        for i, (images, _) in enumerate(data_loader):
            if i % 3 == 0:  # Process only every third image
                image1 = images[:,0]  # Assuming you're interested in embeddings of image1
                emb = model(image1).numpy()  # Get embeddings
                embeddings.append(emb)
    
    embeddings = np.concatenate(embeddings, axis=0)
    return embeddings
embeddings_valid = get_embeddings_every_3(siamese_net, valid_loader)

In [None]:
embeddings_valid.shape, valid_labels_mnist.shape

In [None]:
embeddings_valid.shape

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Assuming `embeddings` is your [5000, 3] array and `labels` is your (5000,) array
def plot_embeddings_3D_and_2D(embeddings, labels):
    fig = plt.figure(figsize=(20, 5))
    
    # 3D plot
    ax = fig.add_subplot(141, projection='3d')
    scatter = ax.scatter(embeddings[:, 0], embeddings[:, 1], embeddings[:, 2], c=labels, cmap='tab10', s=2)
    ax.set_title('3D Embeddings')
    legend1 = ax.legend(*scatter.legend_elements(), title="Classes")
    ax.add_artist(legend1)

    # XY projection
    ax2 = fig.add_subplot(142)
    scatter = ax2.scatter(embeddings[:, 0], embeddings[:, 1], c=labels, cmap='tab10', s=2)
    ax2.set_title('XY plane')
    legend2 = ax2.legend(*scatter.legend_elements(), title="Classes")
    ax2.add_artist(legend2)

    # XZ projection
    ax3 = fig.add_subplot(143)
    scatter = ax3.scatter(embeddings[:, 0], embeddings[:, 2], c=labels, cmap='tab10', s=2)
    ax3.set_title('XZ plane')
    legend3 = ax3.legend(*scatter.legend_elements(), title="Classes")
    ax3.add_artist(legend3)

    # YZ projection
    ax4 = fig.add_subplot(144)
    scatter = ax4.scatter(embeddings[:, 1], embeddings[:, 2], c=labels, cmap='tab10', s=2)
    ax4.set_title('YZ plane')
    legend4 = ax4.legend(*scatter.legend_elements(), title="Classes")
    ax4.add_artist(legend4)

    plt.show()

plot_embeddings_3D_and_2D(embeddings_valid[:5000], valid_labels_mnist)
