In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchsummary import summary

import my_utility as mu
from my_utility import ImageDataSet, ContrastiveLoss

PyTorch training pipeline
1. Design model (input size, output size, forward pass and operations/layers)
2. Construct loss and optimizer
3. Training loop
    - forward pass: compute prediction
    - backward pass: get gradients
    - update weights
    - iterate until "done"

# Setup

In [None]:
# Network / training hyperparameters
epochs = 10  # number of epochs handed to train()
batch_size = 16  # batch size shared by the three DataLoaders
learning_rate = 0.01  # learning rate handed to the Adam optimizer
# NOTE(review): `margin` is not passed to ContrastiveLoss() anywhere in the
# visible notebook — confirm the loss default matches this value.
margin = 1  # Margin for contrastive loss.

In [None]:
# Choose the device used for training: CUDA when available, otherwise
# Apple's MPS backend when available, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

In [None]:
# NOTE(review): leftover TensorFlow setting; nothing else in the visible
# notebook uses TensorFlow — consider removing.
#tf.config.run_functions_eagerly(True)

n = 384 # Size of the fractal encoding

# Dataset parameters
# NOTE(review): these three counts are not referenced by the visible cells —
# confirm whether they are still needed.
num_samples_train = 8000
num_samples_val = 2000
num_samples_test = 2000

In [None]:
# Initialisation
# Paths to the image folders and to the CSV files holding the embeddings.
# They are assembled with os.path.join so that the separator matches the host
# OS and no backslash ends up inside a string literal, where "\i" and "\c"
# are invalid escape sequences (a SyntaxWarning on recent Python versions).
root_dir = os.path.join(".", "img_celeba_10000")

train_img_dir = os.path.join(root_dir, "img_celeba_10000_train")
train_codify_dir = os.path.join(root_dir, "codify_celeba_10000_train.csv")

valid_img_dir = os.path.join(root_dir, "img_celeba_10000_valid")
valid_codify_dir = os.path.join(root_dir, "codify_celeba_10000_valid.csv")

test_img_dir = os.path.join(root_dir, "img_celeba_2000_test")
test_codify_dir = os.path.join(root_dir, "codify_celeba_2000_test.csv")


# Load the datasets (one ImageDataSet per split: image folder + embeddings CSV)
train_set = ImageDataSet(img_dir=train_img_dir, codify_dir=train_codify_dir)
valid_set = ImageDataSet(img_dir=valid_img_dir, codify_dir=valid_codify_dir)
test_set = ImageDataSet(img_dir=test_img_dir, codify_dir=test_codify_dir)

In [None]:
# DataLoaders
# Only the training split is shuffled. Validation and test batches are served
# in dataset order: shuffling does not change an evaluation loss, it only makes
# evaluation runs harder to reproduce batch by batch.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [None]:
# Sanity check: pull a single batch from the training loader and inspect it
train_features, train_labels = next(iter(train_loader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")
# First sample of the batch; squeeze() drops size-1 dimensions before plotting
# (assumes a single-channel image so that the result is 2-D — TODO confirm)
img = train_features[0].squeeze()
label = train_labels[0]
plt.imshow(img, cmap="gray")
plt.show()
print(f"Label: {label}")

# Network architecture 

## Appunti

Architettura rete
    seq_modules = nn.Sequential(
        nn.BatchNorm2d(num_features=128),
        nn.Conv2d(in_channels=1, out_channels=4, kernel_size=(5, 5)),
        nn.Tanh(),
        nn.AvgPool2d(kernel_size=(2, 2)),
        nn.Conv2d(in_channels=4, out_channels=16, kernel_size=(5, 5)),
        nn.Tanh(),
        nn.AvgPool2d(kernel_size=(2, 2)),
        nn.Flatten(),
        nn.BatchNorm2d(),
        nn.Linear(),
        nn.Tanh()            
    )

nn.Conv2d():
- in_channels: numero di canali in input. Dato che le immagini vengono convertite in scala di grigio, avremo un solo canale
- out_channels: numero di canali in output.


La formula per calcolare l'output del pooling è data da
$$output_{width} = \frac{W-F+2*P}{S}+1$$

dove
- W è la dimensione dell'immagine (es. W=124 se l'immagine è 124x124)
- F è la dimensione del kernel (es. F=2 se il kernel è 2x2)
- P è il padding (dalla documentazione, "implicit zero padding to be added on both sides")
- S è lo stride, di default uguale al kernel size F

Se S è 2, la dimensione del tensore viene dimezzata (es. da 124x124 a 62x62)

## Model class

In [None]:
# TEST, DA RIMUOVERE
#img_tensor_1 = test_images[0:batch_size]
#emb_tensor_1 = test_embeddings[0:batch_size]

In [None]:
class SiameseNeuralNetwork(nn.Module):
    """Convolutional encoder mapping single-channel images to a fixed-size vector.

    Despite the name, one forward call processes a single batch of images; the
    module itself does not pair inputs.

    Layers, in forward order: BatchNorm2d -> Conv2d(1->4, 5x5) -> tanh ->
    AvgPool2d(2x2) -> Conv2d(4->16, 5x5) -> tanh -> AvgPool2d(2x2) -> flatten ->
    BatchNorm1d -> Linear -> tanh.

    Args:
        embedding_dim: Number of features produced per image (default 384).
        input_size: Spatial size of the input images, either an int for square
            images or a ``(height, width)`` pair (default 128). It is only used
            to size the flattened feature vector; with the default that vector
            has 16 * 29 * 29 = 13456 entries.

    Raises:
        ValueError: If ``input_size`` is too small to survive both
            convolution + pooling stages.
    """

    @staticmethod
    def _reduced_side(side, kernel=5, pool=2):
        """Return the length of one spatial side after the two conv + pool stages."""
        for _ in range(2):
            # An unpadded conv removes (kernel - 1) pixels; the pooling layer
            # (stride equal to its kernel) then floor-divides by `pool`.
            side = (side - (kernel - 1)) // pool
        return side

    def __init__(self, embedding_dim=384, input_size=128):
        super().__init__()

        if isinstance(input_size, int):
            input_size = (input_size, input_size)
        in_height, in_width = input_size
        out_height = self._reduced_side(in_height)
        out_width = self._reduced_side(in_width)
        if out_height < 1 or out_width < 1:
            raise ValueError(
                f"input_size {tuple(input_size)} is too small for two 5x5 "
                "convolutions each followed by 2x2 average pooling"
            )
        # 16 is the number of channels produced by the second convolution.
        flat_features = 16 * out_height * out_width

        # Batch normalization over the single input channel
        self.bn1 = nn.BatchNorm2d(num_features=1)

        # Batch normalization over the flattened feature vector
        self.bn2 = nn.BatchNorm1d(num_features=flat_features)

        # Convolutional layer 1
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=4, kernel_size=(5, 5))

        # Average pooling layer (shared by both pooling steps; it has no parameters)
        self.avgp2_2 = nn.AvgPool2d(kernel_size=(2, 2))

        # Convolutional layer 2
        self.conv2 = nn.Conv2d(in_channels=4, out_channels=16, kernel_size=(5, 5))

        # Fully connected layer producing the embedding
        self.fc = nn.Linear(in_features=flat_features, out_features=embedding_dim)

    def forward(self, x):
        """Encode a batch of images.

        Args:
            x: Tensor of shape ``(bs, 1, H, W)`` where ``(H, W)`` matches the
                ``input_size`` given at construction.

        Returns:
            Tensor of shape ``(bs, embedding_dim)`` with values in ``[-1, 1]``
            (the last operation is a tanh).
        """
        # Shapes in the comments are for the default 128x128 input; bs = batch size.

        # bs x 1 x 128 x 128 -> same shape, normalized values
        out = self.bn1(x)

        # -> bs x 4 x 124 x 124
        # torch.tanh is used because nn.functional.tanh emits a deprecation
        # warning on some older PyTorch releases.
        out = torch.tanh(self.conv1(out))

        # -> bs x 4 x 62 x 62
        out = self.avgp2_2(out)

        # -> bs x 16 x 58 x 58
        out = torch.tanh(self.conv2(out))

        # -> bs x 16 x 29 x 29
        out = self.avgp2_2(out)

        # -> bs x 13456 (keep the batch dimension, flatten everything else)
        out = torch.flatten(out, start_dim=1)

        # -> bs x 13456, normalized
        out = self.bn2(out)

        # -> bs x 384: the fractal-encoding-sized embedding
        out = torch.tanh(self.fc(out))

        return out

## Training function

In [None]:
def train(model, optimizer, loss_fn, train_loader, val_loader, epochs, device, scheduler=None):
    """Train ``model`` and report the mean training/validation loss per epoch.

    Each epoch runs one optimisation pass over ``train_loader`` followed by an
    evaluation pass over ``val_loader``. The evaluation pass runs in eval mode
    and under ``torch.no_grad()``, so no autograd graph is built for it.

    Args:
        model: Module to optimise; it must already live on ``device``.
        optimizer: Optimizer built over ``model``'s parameters.
        loss_fn: Callable ``loss_fn(output, labels)`` returning a scalar tensor.
        train_loader: Iterable yielding ``(inputs, labels)`` training batches.
        val_loader: Iterable yielding ``(inputs, labels)`` validation batches.
        epochs: Number of epochs to run.
        device: Device the batches are moved to before the forward pass.
        scheduler: Optional learning-rate scheduler whose ``step()`` takes no
            arguments; when given it is stepped once at the end of every epoch.

    Returns:
        None. A summary line with both losses is printed after every epoch.
        The losses are averaged over the number of batches so that the two
        numbers are comparable even when the loaders differ in length.
    """
    for epoch in range(epochs):
        training_loss = 0.0
        valid_loss = 0.0
        # Batches are counted while iterating so that loaders without __len__
        # are supported as well.
        train_batches = 0
        valid_batches = 0

        model.train()
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            output = model(inputs)
            loss = loss_fn(output, labels)
            loss.backward()
            optimizer.step()

            training_loss += loss.item()
            train_batches += 1

        model.eval()
        # Gradients are not needed for validation; disabling autograd saves
        # memory and time.
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                output = model(inputs)
                valid_loss += loss_fn(output, labels).item()
                valid_batches += 1

        if scheduler is not None:
            scheduler.step()

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        training_loss /= max(train_batches, 1)
        valid_loss /= max(valid_batches, 1)
        print(f"Epoch [{epoch+1}/{epochs}] ----> \nTraining loss: {training_loss:.4f} \nValidation loss: {valid_loss:.4f}\n")

## Model instance

In [None]:
# Build the network with its default configuration and move it to the selected device
model = SiameseNeuralNetwork().to(device)

In [None]:
# Summarise the model for an input of shape (1, 128, 128), i.e. one channel
# of 128x128 pixels as described in the network's forward pass.
# NOTE(review): torchsummary creates its own dummy input — confirm it ends up
# on the same device as `model` when `device` is not "cpu".
summary(model, input_size=(1, 128, 128))

## Optimizer & learning rate scheduler

In [None]:
# Optimizer: Adam over all model parameters with the configured learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Learning-rate scheduler (StepLR with a period of 10 scheduler steps)
# NOTE(review): no visible cell calls scheduler.step() and the train(...) call
# does not pass the scheduler, so the learning rate appears to stay constant —
# confirm whether this is intended.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=10)

## Loss function

In [None]:
# Loss function: custom loss implementations can be plugged in here
# NOTE(review): the `margin` hyperparameter defined earlier is not passed, so
# ContrastiveLoss is built with its own defaults — confirm this is intended.
criterion = ContrastiveLoss()
# Alternative kept for reference: plain mean-squared error
#criterion = nn.MSELoss()


# Model training

In [None]:
# Launch the training/validation loop with the objects configured above
train(
    model=model,
    optimizer=optimizer,
    loss_fn=criterion,
    train_loader=train_loader,
    val_loader=valid_loader,
    epochs=epochs,
    device=device,
)
