# Deep Learning Applications: Laboratory #1

In this first laboratory we will work with relatively simple architectures to get a feel for working with Deep Models. This notebook is designed to work with PyTorch, but as I said in the introductory lecture: please feel free to use and experiment with whatever tools you like.

**Important Notes**:
1. Be sure to **document** all of your decisions, as well as your intermediate and final results. Make sure your conclusions and analyses are clearly presented. Don't make us dig into your code or walls of printed results to try to draw conclusions from your code.
2. If you use code from someone else (e.g. GitHub, Stack Overflow, ChatGPT, etc) you **must be transparent about it**. Document your sources and explain how you adapted any partial solutions to create **your** solution.



## Exercise 1: Warming Up
In this series of exercises I want you to try to duplicate (on a small scale) the results of the ResNet paper:

> [Deep Residual Learning for Image Recognition](https://arxiv.org/abs/1512.03385), Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun, CVPR 2016.

We will do this in steps using a Multilayer Perceptron on MNIST.

Recall that the main message of the ResNet paper is that **deeper** networks do not **guarantee** a lower training loss (or a higher validation accuracy). Below you will incrementally build a sequence of experiments to verify this for an MLP. A few guidelines:

+ I have provided some **starter** code at the beginning. **NONE** of this code should survive in your solutions. Not only is it **very** badly written, it is also written in my functional style that also obfuscates what it's doing (in part to **discourage** your reuse!). It's just to get you *started*.
+ These exercises ask you to compare **multiple** training runs, so it is **really** important that you factor this into your **pipeline**. Using [Tensorboard](https://pytorch.org/tutorials/recipes/recipes/tensorboard_with_pytorch.html) is a **very** good idea -- or, even better [Weights and Biases](https://wandb.ai/site).
+ You may work and submit your solutions in **groups of at most two**. Share your ideas with everyone, but the solutions you submit *must be your own*.

First some boilerplate to get you started, then on to the actual exercises!

### Preface: Some code to get you started

What follows is some **very simple** code for training an MLP on MNIST. The point of this code is to get you up and running (and to verify that your Python environment has all needed dependencies).

**Note**: As you read through my code and execute it, this would be a good time to think about *abstracting* **your** model definition, and training and evaluation pipelines in order to make it easier to compare performance of different models.

In [None]:
# Start with some standard imports.
import numpy as np
import matplotlib.pyplot as plt
from functools import reduce
import torch
from torchvision.datasets import MNIST
from torch.utils.data import Subset
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms

#### Data preparation

Here is some basic dataset loading, validation splitting code to get you started working with MNIST.
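
As a minimal sketch of the splitting idea, the function below builds a seeded, reproducible train/validation index split using only the standard library. The name `split_indices` and the seed value are illustrative assumptions, not part of the starter code; the resulting index lists could be passed to `torch.utils.data.Subset`.

```python
import random


def split_indices(n_samples, val_size, seed=0):
    """Shuffle range(n_samples) with a fixed seed and cut off a validation part."""
    indices = list(range(n_samples))
    # A private Random instance keeps the split independent of global RNG state.
    random.Random(seed).shuffle(indices)
    return indices[val_size:], indices[:val_size]


# MNIST has 60000 training images; 5000 are held out for validation.
train_idx, val_idx = split_indices(60000, 5000, seed=123)
print(len(train_idx), len(val_idx))
```

Fixing the seed means every run compares models on the same validation samples, which matters when several training runs are compared against each other.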

In [None]:
# Standard MNIST transform.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load MNIST train and test.
ds_train = MNIST(root='./data', train=True, download=True, transform=transform)
ds_test = MNIST(root='./data', train=False, download=True, transform=transform)

# Split train into train and validation.
val_size = 5000
I = np.random.permutation(len(ds_train))
ds_val = Subset(ds_train, I[:val_size])
ds_train = Subset(ds_train, I[val_size:])

#### Boilerplate training and evaluation code

This is some **very** rough training, evaluation, and plotting code. Again, just to get you started. I will be *very* disappointed if any of this code makes it into your final submission.

In [None]:
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report

# Function to train a model for a single epoch over the data loader.
def train_epoch(model, dl, opt, epoch='Unknown', device='cpu'):
    model.train()
    losses = []
    for (xs, ys) in tqdm(dl, desc=f'Training epoch {epoch}', leave=True):
        xs = xs.to(device)
        ys = ys.to(device)
        opt.zero_grad()
        logits = model(xs)
        loss = F.cross_entropy(logits, ys)
        loss.backward()
        opt.step()
        losses.append(loss.item())
    return np.mean(losses)

# Function to evaluate model over all samples in the data loader.
def evaluate_model(model, dl, device='cpu'):
    model.eval()
    predictions = []
    gts = []
    for (xs, ys) in tqdm(dl, desc='Evaluating', leave=False):
        xs = xs.to(device)
        preds = torch.argmax(model(xs), dim=1)
        gts.append(ys)
        predictions.append(preds.detach().cpu().numpy())
        
    # Return accuracy score and classification report.
    return (accuracy_score(np.hstack(gts), np.hstack(predictions)),
            classification_report(np.hstack(gts), np.hstack(predictions), zero_division=0, digits=3))

# Simple function to plot the loss curve and validation accuracy.
def plot_validation_curves(losses_and_accs):
    losses = [x for (x, _) in losses_and_accs]
    accs = [x for (_, x) in losses_and_accs]
    plt.figure(figsize=(16, 8))
    plt.subplot(1, 2, 1)
    plt.plot(losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Average Training Loss per Epoch')
    plt.subplot(1, 2, 2)
    plt.plot(accs)
    plt.xlabel('Epoch')
    plt.ylabel('Validation Accuracy')
    plt.title(f'Best Accuracy = {np.max(accs)} @ epoch {np.argmax(accs)}')

#### A basic, parameterized MLP

This is a very basic implementation of a Multilayer Perceptron. Don't waste too much time trying to figure out how it works -- the important detail is that it allows you to pass in a list of input, hidden layer, and output *widths*. **Your** implementation should also support this for the exercises to come.
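
To make the list-of-widths convention concrete, here is a small sketch that counts the trainable parameters implied by such a list. The helper name `mlp_param_count` is hypothetical; it assumes every layer is a plain fully connected layer with a bias.

```python
def mlp_param_count(layer_sizes):
    """Count weights and biases of a fully connected net given its layer widths."""
    total = 0
    for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:]):
        total += n_in * n_out + n_out  # weight matrix plus bias vector
    return total


# Same convention as the notebook: input width, hidden widths, output width.
input_size, width, depth = 28 * 28, 16, 2
layer_sizes = [input_size] + [width] * depth + [10]
print(layer_sizes, mlp_param_count(layer_sizes))
```

A count like this is a cheap sanity check when comparing networks of different depths, since it shows how much of the capacity sits in the first layer.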

In [None]:
class MLP(nn.Module):
    def __init__(self, layer_sizes):
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(nin, nout) for (nin, nout) in zip(layer_sizes[:-1], layer_sizes[1:])])
    
    def forward(self, x):
        return reduce(lambda f, g: lambda x: g(F.relu(f(x))), self.layers, lambda x: x.flatten(1))(x)

#### A *very* minimal training pipeline.

Here is some basic training and evaluation code to get you started.

**Important**: I cannot stress enough that this is a **terrible** example of how to implement a training pipeline. You can do better!

In [None]:
## Training hyperparameters.
#device = 'cuda' if torch.cuda.is_available() else 'cpu'
#epochs = 100
#lr = 0.0001
#batch_size = 128
#
## Architecture hyperparameters.
#input_size = 28*28
#width = 16
#depth = 2
#
## Dataloaders.
#dl_train = torch.utils.data.DataLoader(ds_train, batch_size, shuffle=True, num_workers=4)
#dl_val   = torch.utils.data.DataLoader(ds_val, batch_size, num_workers=4)
#dl_test  = torch.utils.data.DataLoader(ds_test, batch_size, shuffle=True, num_workers=4)
#
## Instantiate model and optimizer.
#model_mlp = MLP([input_size] + [width]*depth + [10]).to(device)
#opt = torch.optim.Adam(params=model_mlp.parameters(), lr=lr)
#
## Training loop.
#losses_and_accs = []
#for epoch in range(epochs):
#    loss = train_epoch(model_mlp, dl_train, opt, epoch, device=device)
#    (val_acc, _) = evaluate_model(model_mlp, dl_val, device=device)
#    losses_and_accs.append((loss, val_acc))
#
## And finally plot the curves.
#plot_validation_curves(losses_and_accs)
#print(f'Accuracy report on TEST:\n {evaluate_model(model_mlp, dl_test, device=device)[1]}')

### Exercise 1.1: A baseline MLP

Implement a *simple* Multilayer Perceptron to classify the 10 digits of MNIST (e.g. two *narrow* layers). Use my code above as inspiration, but implement your own training pipeline -- you will need it later. Train this model to convergence, monitoring (at least) the loss and accuracy on the training and validation sets for every epoch. Below I include a basic implementation to get you started -- remember that you should write your *own* pipeline!

**Note**: This would be a good time to think about *abstracting* your model definition, and training and evaluation pipelines in order to make it easier to compare performance of different models.

**Important**: Given the *many* runs you will need to do, and the need to *compare* performance between them, this would **also** be a great point to study how **Tensorboard** or **Weights and Biases** can be used for performance monitoring.
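
One way to keep many runs comparable is to generate their names and log directories from a single grid, so that each run gets its own folder in the logging tool. The sketch below uses only the standard library; the function `make_runs`, the root folder, and the depth and variant values are placeholders for illustration.

```python
from itertools import product


def make_runs(depths, variants, root="tensorboard/demo"):
    """Return one config dict per (depth, variant) pair with a unique log directory."""
    runs = []
    for depth, variant in product(depths, variants):
        name = f"{variant}_depth{depth}"
        runs.append({
            "name": name,
            "depth": depth,
            "variant": variant,
            "logdir": f"{root}/{name}",
        })
    return runs


for run in make_runs([2, 6, 10], ["plain", "residual"]):
    print(run["name"], "->", run["logdir"])
```

Each `logdir` could then be handed to a separate `SummaryWriter`, so the curves of all runs can be overlaid in one view.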

In [None]:
# Start with some standard imports.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from functools import reduce
import random
import torch
from torchvision.datasets import MNIST
from torch.utils.data import Subset
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import tensorboard
import os
import csv
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader
import datetime
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report, precision_score, recall_score, f1_score
import json
from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from IPython.display import clear_output

In [None]:
seed= 123
random.seed(seed)             
np.random.seed(seed)          
torch.manual_seed(seed)       
torch.cuda.manual_seed(seed)  
torch.cuda.manual_seed_all(seed)

In [None]:
class SkipBlock(nn.Module):
    def __init__(self, in_dim, out_dim):
        super(SkipBlock,self).__init__()
        self.fc1 = nn.Linear(in_dim, out_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(out_dim, out_dim)
        if in_dim != out_dim:
            self.projection = nn.Linear(in_dim, out_dim)
        else:
            self.projection = nn.Identity()

    def forward(self, x):
        identity = self.projection(x)
        out = self.relu(self.fc1(x))
        out = self.fc2(out)
        return self.relu(out + identity)


In [None]:
class My_MLP(nn.Module):
    def __init__(self,layer_sizes,use_skip=False):
        super(My_MLP, self).__init__()
        layers=[]
        layers.append(nn.Flatten()) 
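        n_layers = len(layer_sizes) - 1
        for i, (in_dim, out_dim) in enumerate(zip(layer_sizes[:-1], layer_sizes[1:])):
            if i == n_layers - 1:
                # The final layer always emits raw logits: no ReLU and no skip block.
                layers.append(nn.Linear(in_dim, out_dim))
            elif use_skip:
                layers.append(SkipBlock(in_dim, out_dim))
            else:
                layers.append(nn.Linear(in_dim, out_dim))
                layers.append(nn.ReLU())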

        self.model = nn.Sequential(*layers)
        
    def forward(self, x):
        x=self.model(x)
        return x

In [None]:
def Load_Data():
    transform= transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    ds_train= MNIST(root='./data', train= True, download=True, transform=transform)
    ds_test=MNIST(root='./data', train= False, download=True, transform=transform)

    
    return ds_train, ds_test

In [None]:
def Validation_Model(model, dl_val,device, batch_size):
    # Note: despite its name, `dl_val` is a Dataset; it is wrapped in a DataLoader here.
    model.eval()
    predictions=[]
    ground_truth=[]
    criterion= torch.nn.CrossEntropyLoss()
    losses=[]
    dl_validation= DataLoader(dl_val, batch_size=batch_size, shuffle=False)
    
    # Gradients are not needed for evaluation, so skip building the autograd graph.
    with torch.no_grad():
        for (data, labels) in tqdm(dl_validation, desc="Evaluating", leave=False):
            data= data.to(device)
            labels= labels.to(device)
            logits= model(data)
            loss= criterion(logits, labels)
            prediction= torch.argmax(logits, dim=1)
            losses.append(loss.item())
            ground_truth.append(labels.detach().cpu().numpy())
            predictions.append(prediction.detach().cpu().numpy())
    # Returns (accuracy, text classification report, mean loss).
    return (accuracy_score(np.hstack(ground_truth), np.hstack(predictions)),
            classification_report(np.hstack(ground_truth), np.hstack(predictions), zero_division=0, digits=3),
            np.mean(losses))


In [None]:
def get_grad_norms(model):
    grad_norms = {}
    for name, param in model.named_parameters():
        if param.grad is not None:
            grad_norms[name] = param.grad.norm().item()
    return grad_norms


In [None]:
def Training_Model(model,X, file_writer,device,optimizer=None,epochs=50,batch_size=8, learning_rate=0.001, weight_decay=0.001,study_grad=False):
    total_size = len(X)
    val_size = int(0.2 * total_size)  
    train_size = total_size - val_size

    indices = np.random.permutation(total_size)
    val_indices = indices[:val_size]
    train_indices = indices[val_size:]

    ds_train = Subset(X, train_indices)
    ds_val = Subset(X, val_indices)
    dl_train=DataLoader(ds_train, batch_size=batch_size,shuffle=True )
    
    if optimizer is None:
        optimizer= torch.optim.Adam(params=model.parameters(), lr= learning_rate, weight_decay=weight_decay)
    criterion= torch.nn.CrossEntropyLoss()

    for epoch in tqdm(range(epochs), desc="Model Training"):
        model.train()
        losses=[]
        count=0
        for (data,labels) in tqdm(dl_train, desc=f'Training epoch {epoch}', leave=True):
            data= data.to(device)
            labels= labels.to(device)
            optimizer.zero_grad()
            output= model(data)
            loss= criterion(output, labels)
            loss.backward()
            if count == 0 and study_grad==True:
                grad_norms = get_grad_norms(model)

                # Example: log the gradient norms to TensorBoard
                for name, norm in grad_norms.items():
                    file_writer.add_scalar(f"GradNorms/{name}", norm, epoch)
            count=1
            optimizer.step()
            losses.append(loss.item())
        
        loss_average= np.mean(losses)
        print(f"Training Loss: {loss_average} of Epoch {epoch}")
        
        # Validation_Model returns (accuracy, text report, mean loss).
        accuracy, report, losses_val= Validation_Model(model, ds_val, device, batch_size)
        print(f"Validation Loss: {losses_val}")
        file_writer.add_scalars(
                "Loss",
                {
                    "Train": loss_average,
                    "Validation": losses_val
                },
                epoch
            )
        # These metrics come from the validation split, so tag them accordingly.
        file_writer.add_scalar("Validation/Accuracy", accuracy, epoch)
        # The report is already a formatted string; json.dumps would escape its newlines.
        file_writer.add_text("Validation/Classification Report", f"<pre>{report}</pre>", epoch)

    return model
    

In [None]:
# Here the CNN is used as a feature extractor whose outputs are fed to a scikit-learn classifier.
# We therefore keep all layers up to the pooling layer and drop the fully connected one.
class Feature_Extractor(nn.Module):
    def __init__(self,model):
        super(Feature_Extractor,self).__init__()
        # Drop the last fc layer while keeping the pooled activations.
        self.backbone= nn.Sequential(*list(model.children())[:-1])

    def forward(self, x):
        x= self.backbone(x)
        return torch.flatten(x,1)


In [None]:
def features_extractor(dataloader, model,device,file_writer):
    features=[]
    labels= []
    # Use inference behavior for layers such as BatchNorm while extracting features.
    model.eval()
    with torch.no_grad():
        for data, label in dataloader:
            data= data.to(device)

            feature= model(data)
            features.append(feature.cpu().numpy())
            labels.append(label.numpy())

    features = np.concatenate(features, axis=0)
    labels = np.concatenate(labels, axis=0)
    return features,labels


In [None]:
def custom_classifier(model,train_loader, test_loader, device, file_writer,type_of_classifier="svm"):
    model= Feature_Extractor(model)
    model= model.to(device)
    features_train, labels_train=features_extractor(train_loader, model,device, file_writer)
    features_test, labels_test=features_extractor(test_loader, model,device, file_writer)

    if type_of_classifier=="svm":
        clf= LinearSVC(max_iter=2000)
    elif type_of_classifier=="knn":
        clf = KNeighborsClassifier(n_neighbors=5)
    else:
        clf= GaussianNB()
    clf.fit(features_train,labels_train)
    # Predict once and derive all metrics from the same predictions.
    y_pred = clf.predict(features_test)
    acc = accuracy_score(labels_test, y_pred)
    precision = precision_score(labels_test, y_pred, average="macro")
    recall = recall_score(labels_test, y_pred, average="macro")
    f1 = f1_score(labels_test, y_pred, average="macro")

    file_writer.add_scalar("Accuracy", acc * 100, 0)
    file_writer.add_scalar("Precision_macro", precision * 100, 0)
    file_writer.add_scalar("Recall_macro", recall * 100, 0)
    file_writer.add_scalar("F1_macro", f1 * 100, 0)


    return acc



In [None]:
# Unfreeze the last two layers and, of course, the fully connected layer.
# The pooling layer needs no handling: it only applies a fixed operation, has no weights, and is not trainable.
def fine_tuning(model, device, num_classes, optim, learning_rate,weight_decay, momentum, block_unfreeze):
    in_features = model.fully_connected.in_features
    model.fully_connected= nn.Linear(in_features,num_classes)
    model = model.to(device)
    # Freeze all parameters.
    for param in model.parameters():
        param.requires_grad = False
    # Unfreeze the requested blocks; the freshly initialized head must always be trained.
    for name, param in model.named_parameters():
        if name.startswith("fully_connected") or any(b in name for b in block_unfreeze):
            param.requires_grad= True 
    
    trainable_params = filter(lambda p: p.requires_grad, model.parameters())
    if optim=="adam":
        optimizer = torch.optim.Adam(trainable_params, lr=learning_rate, weight_decay=weight_decay)
    elif optim=="adamw":
        optimizer = torch.optim.AdamW(trainable_params, lr=learning_rate, weight_decay=weight_decay)
    elif optim=="sgd":
        optimizer= torch.optim.SGD(trainable_params,lr=learning_rate,momentum=momentum, weight_decay=weight_decay)
    else:
        optimizer=torch.optim.RMSprop(trainable_params,lr=learning_rate,momentum=momentum, weight_decay=weight_decay)
    
    return model, optimizer
               

In [None]:
# Fine-tuning requires changing something: here the task is to take the model, use it as a feature extractor, and also vary settings such as the optimizer.
# Step 1: use the network as a feature extractor for a new (classical) model.
# Step 2: unfreeze some layers and retrain them together with a new classifier.
# I would like to try SVM, KNN, or other classifiers,
# and also AdamW and SGD for completeness.
# When unfreezing, pick the deepest layers, because the first layers extract generic image characteristics;
# since the CIFAR-100 images are very similar, those layers do not need retraining.
# The deeper layers instead produce features that are more specific to the dataset at hand.
def Customize_model(model, X_train, X_test, file_writer,num_classes, device, 
                    lr, weight_decay, batch_size,freeze_layers,cl,optim, momentum,
                    block_unfreeze):
    
    train_loader = DataLoader(X_train, batch_size=batch_size, shuffle=False) 
    test_loader  = DataLoader(X_test, batch_size=batch_size, shuffle=False)

    if freeze_layers==False:
        acc_cl=custom_classifier(model, train_loader,test_loader, device, file_writer,cl )
        print(f'Accuracy of classifier {cl}: {acc_cl}\n')
        return acc_cl, "classifier"
    
    else:
        model, optimizer=fine_tuning(model,device, num_classes,optim,lr, weight_decay,momentum,block_unfreeze)
        return (model,optimizer), "fine_tuning"
    
    

In [None]:
class Trainer(nn.Module):
    def __init__(self,model,
                 logdir,
                 date, 
                 num_classes,
                 depth=None,
                 epochs=50, 
                 batch_size=8, 
                 learning_rate=0.001, 
                 weight_decay=0.001,
                 path_exp= "Simple_MLP",
                 study_grad=False,
                 freeze_layers=False,
                 classificator= None,
                 optimizer= None,
                 momentum=None,
                 block_to_unfreeze= None
                 ):
        super(Trainer,self).__init__()
        self.model=model
        self.study_grad=study_grad
        self.epochs=epochs
        self.batch_size=batch_size
        self.learning_rate= learning_rate
        self.weight_decay= weight_decay
        self.best_model= None
        self.device= 'cuda' if torch.cuda.is_available() else 'cpu'
        self.path_experiments=f'{path_exp}/Run_{date}'
        self.num_classes= num_classes
        self.file_writer= SummaryWriter(logdir)
        self.depth=depth
        self.optimizer=optimizer
        self.freeze_layers= freeze_layers
        self.classificator=classificator
        self.momentum= momentum
        self.block_unfreeze= block_to_unfreeze


    def get_hyperparamtres_dict(self):
        result={
            'Epochs': self.epochs,
            'Batch size':self.batch_size,
            'Learning Rate': self.learning_rate,
            'Weight Decay':self.weight_decay,
            'Num Classes': self.num_classes,
            'Freeze_layers': self.freeze_layers
            
        }
        if self.depth is not None:
            result["Depth"]= self.depth
        if self.momentum is not None:
            result["Momentum"]= self.momentum
        if self.classificator is not None:
            result["Classificator"]= self.classificator
        if self.optimizer is not None:
            result["Optimizer"]= self.optimizer
        if self.block_unfreeze is not None:
            result["Block_Unfreeze"]= self.block_unfreeze
        
        return result

    def save_hyperparametres(self):
        hyperparametres_dict= self.get_hyperparamtres_dict()
        path= os.path.join(self.path_experiments, 'hyperparametres.csv')
        file_exists = os.path.isfile(path)
        is_empty = not file_exists or os.stat(path).st_size == 0
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, mode='a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=hyperparametres_dict.keys())
            if is_empty:
                writer.writeheader()
            writer.writerow(hyperparametres_dict)

    def Train(self,X):
        if not os.path.exists(self.path_experiments):
            os.makedirs(self.path_experiments)
        self.save_hyperparametres()
        self.model.to(self.device)
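        # Forward study_grad so that gradient norms are actually logged when requested.
        self.best_model= Training_Model(self.model,X, self.file_writer,self.device, self.optimizer,self.epochs,self.batch_size, self.learning_rate, self.weight_decay, study_grad=self.study_grad)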
        torch.save(self.best_model.state_dict(), os.path.join(self.path_experiments,'best_model.pt'))

    def Fine_Tuning(self,X_train,X_test):
        result, type=Customize_model(self.model, 
                                     X_train, X_test, 
                                     self.file_writer,
                                     self.num_classes, 
                                     self.device,
                                     self.learning_rate,
                                     self.weight_decay, 
                                     self.batch_size,
                                     self.freeze_layers,
                                     self.classificator,
                                     self.optimizer, 
                                     self.momentum,
                                     self.block_unfreeze)
        
        if type=="fine_tuning":
            self.model, self.optimizer= result
            self.Train(X_train)
            self.Test(X_test)

    def Test(self,X, model=None):
        # Fall back to the model from the last Train() call when none is given.
        if model is None:
            model= self.best_model
        acc, report, loss= Validation_Model(model, X, self.device,self.batch_size)
        self.file_writer.add_scalar("Test/Accuracy", acc, 0)
        self.file_writer.add_text("Test/Classification Report", f"<pre>{report}</pre>", 1)
        self.file_writer.close()
            

        


In [None]:
now= datetime.datetime.now()
data_ora_formattata = now.strftime("%d_%m_%yT%H_%M")
name= f'run_{data_ora_formattata}'
logdir= f'tensorboard/Sample_MLP/{name}'
print(f"Train Model Sample MLP on MNIST")
input_size = 28*28
width = 16
depth = 2
channels= [input_size] + [width]*depth + [10]

minist_train, minist_test= Load_Data()
model= My_MLP(channels)
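# Keyword arguments avoid mixing up the many positional parameters of Trainer.
trainer= Trainer(model, logdir, data_ora_formattata, num_classes=len(minist_train.classes), depth=depth, epochs=100, batch_size=128, learning_rate=0.001)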

trainer.Train(minist_train)
trainer.Test(minist_test)



### Exercise 1.2: Adding Residual Connections

Implement a variant of your parameterized MLP network to support **residual** connections. Your network should be defined as a composition of **residual MLP** blocks that have one or more linear layers and add a skip connection from the block input to the output of the final linear layer.

**Compare** the performance (in training/validation loss and test accuracy) of your MLP and ResidualMLP for a range of depths. Verify that deeper networks **with** residual connections are easier to train than a network of the same depth **without** residual connections.

**For extra style points**: See if you can explain by analyzing the gradient magnitudes on a single training batch *why* this is the case. 

In [None]:
now= datetime.datetime.now()
data_ora_formattata = now.strftime("%d_%m_%yT%H_%M")
name= f'run_{data_ora_formattata}'
print("Training Residual Net vs Simple MLP")
input_size = 28*28
width = 16
depths = [2,6,10]

minist_train, minist_test= Load_Data()

for depth in depths:
    for use_skip in [True,False]:
        channels= [input_size] + [width]*depth + [10]
        model= My_MLP(channels,use_skip=use_skip )
        if use_skip:
            print(f'Run Training of Residual_depth{depth} ')
            logdir= f'tensorboard/Residual_vs_Simple_MLP/{name}/Residual_depth{depth}'
            path=f"Residual_vs_Simple_MLP/Residual_depth{depth}"
            
        else:
            print(f'Run Training of Simple_depth{depth} ')
            logdir= f'tensorboard/Residual_vs_Simple_MLP/{name}/Simple_depth{depth}'
            path=f"Residual_vs_Simple_MLP/Simple_depth{depth}"
            
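        # Keyword arguments make sure the depth and the class count are recorded correctly.
        trainer= Trainer(model,logdir,data_ora_formattata,num_classes=len(minist_train.classes),depth=depth,epochs=100,batch_size=128,learning_rate=0.001,weight_decay=0.001,path_exp=path,study_grad=True)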

        trainer.Train(minist_train)
        trainer.Test(minist_test)


Classic MLP (without skip connections):
In the gradient plots, a typical phenomenon of deep networks can be observed: the gradient norms shrink rapidly toward the early layers, those closest to the input. This is the vanishing gradient problem: the error signal is progressively attenuated as it backpropagates through the network. As a result, the early layers receive very small gradients, update their weights slowly, and contribute little to learning. This explains why a classic MLP takes longer to converge and may get stuck at relatively low performance.

MLP with skip connections:
Skip connections let gradients bypass individual layers and reach the early layers more easily. In the gradient-norm plots, the gradients remain more balanced across all layers, including those closest to the input. This lets the early layers update more effectively, which improves training stability and accelerates convergence. Skip connections thus mitigate the vanishing gradient problem and make deep networks easier to train.
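
The argument above can be made concrete with a toy calculation. This is a deliberately simplified sketch, not the networks trained here: every layer is treated as a scalar function with an assumed constant local derivative, so the gradient at the input is a plain product of per-layer factors.

```python
def input_gradient(local_derivs, residual=False):
    """Gradient of the output w.r.t. the input for a chain of scalar layers.

    Plain chain:     x_{l+1} = f_l(x_l)        -> factor f_l'
    Residual chain:  x_{l+1} = x_l + f_l(x_l)  -> factor 1 + f_l'
    """
    grad = 1.0
    for d in local_derivs:
        grad *= (1.0 + d) if residual else d
    return grad


for depth in (2, 6, 10):
    derivs = [0.5] * depth  # assumed constant local derivative per layer
    plain = input_gradient(derivs)
    skip = input_gradient(derivs, residual=True)
    print(f"depth={depth:2d}  plain={plain:.6f}  residual={skip:.3f}")
```

With a local derivative of 0.5, the plain product falls below 0.001 at depth 10, while the residual product never drops below 1 because of the identity term. The residual product can also grow, so the toy model only illustrates why the signal does not vanish, not that it stays perfectly balanced.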

### Exercise 1.3: Rinse and Repeat (but with a CNN)

Repeat the verification you did above, but with **Convolutional** Neural Networks. If you were careful about abstracting your model and training code, this should be a simple exercise. Show that **deeper** CNNs *without* residual connections do not always work better, and that **even deeper** ones *with* residual connections do.

**Hint**: You probably should do this exercise using CIFAR-10, since MNIST is *very* easy (at least up to about 99% accuracy).

**Tip**: Feel free to reuse the ResNet building blocks defined in `torchvision.models.resnet` (e.g. [BasicBlock](https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py#L59) which handles the cascade of 3x3 convolutions, skip connections, and optional downsampling). This is an excellent exercise in code diving. 

**Spoiler**: Depending on the optional exercises you plan to do below, you should think *very* carefully about the architectures of your CNNs here (so you can reuse them!).
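
When stacking 3x3 convolutions with skip connections, the block output must have the same spatial size as its input, otherwise the addition fails. The sketch below applies the standard output-size formula for a convolution with dilation 1; the helper name `conv_out_size` is hypothetical, and 32 is the CIFAR-10 image side.

```python
def conv_out_size(size, kernel, padding=0, stride=1):
    """Spatial output size of a convolution with dilation 1."""
    return (size + 2 * padding - kernel) // stride + 1


print(conv_out_size(32, kernel=3, padding=1))            # 32: size preserved, skip addition works
print(conv_out_size(32, kernel=3, padding=0))            # 30: shrinks, shapes no longer match
print(conv_out_size(32, kernel=3, padding=1, stride=2))  # 16: downsampling, the skip path needs it too
```

The last case is the situation where a residual block needs a downsampling step on the skip path as well, so that both branches end up with the same shape.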

In [None]:
import torchvision
from torchvision.models.resnet import BasicBlock

In [None]:
def Load_data_Cifar10():
    transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
    ])

    train_set = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    test_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
    
    return train_set, test_set

In [None]:
class Residual_Block_CNN(nn.Module):
    def __init__(self, in_channels, use_resnet):
        super(Residual_Block_CNN,self).__init__()
        self.use_resnet= use_resnet

        if use_resnet:
            self.block_res= BasicBlock(in_channels, in_channels, stride=1, downsample=None)
        else:
            self.first_layer= nn.Sequential(
                nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(in_channels)
            )
            # padding=1 keeps the spatial size, so the output can be added to the identity.
            self.second_layer=nn.Sequential(
                nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(in_channels)
            )
            self.relu= nn.ReLU()

    def forward(self,x):
        if self.use_resnet:
            return self.block_res(x)
        else:
            identity= x
            # Chain the layers: each step consumes the previous output, not the raw input.
            out= self.first_layer(x)
            out= self.relu(out)
            out= self.second_layer(out)
            out= out  + identity
            return self.relu(out)


In [None]:
class CNN_Block(nn.Module):
    def __init__(self, in_channels):
        super(CNN_Block, self).__init__()
        self.block= nn.Sequential(
            nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(in_channels),
            nn.ReLU()
        )
    
    def forward(self,x):
        return self.block(x)
    

In [None]:
class CNN_Customize(nn.Module):
    def __init__(self,depth, in_channels,out_channels, num_classes, use_skip, use_resnet ):
        super(CNN_Customize, self).__init__()
        
        self.head= nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        )

        blocks=[]

        for i in range(depth):
            if use_skip:
                blocks.append(Residual_Block_CNN(out_channels, use_resnet))
            else:
                blocks.append(CNN_Block(out_channels))

        self.blocks= nn.Sequential(*blocks)
        self.pooling= nn.AdaptiveAvgPool2d((1,1))
        self.fully_connected= nn.Linear(out_channels, num_classes)

    def forward(self, x):
        out= self.head(x)
        out= self.blocks(out)
        out= self.pooling(out)
        out= torch.flatten(out, 1)
        return self.fully_connected(out)

In [None]:
now= datetime.datetime.now()
data_ora_formattata = now.strftime("%d_%m_%yT%H_%M")
name= f'run_{data_ora_formattata}'

in_channels = 3
out_channels= 64
depths = [2 ,6 ,10]
num_classes=10
cifar_train, cifartest= Load_data_Cifar10()

for depth in depths:
    for use_skip in [True,False]:

        if use_skip:
            print(f'Run Training of Residual_CNN{depth} ')
            logdir= f'tensorboard/CNN_Residual_vs_Base/{name}/Residual_depth{depth}'
            path=f"CNN_Residual_vs_Base/Residual_depth{depth}"
            
            use_res=True
        else:
            print(f'Run Training of Simple_depth{depth} ')
            logdir= f'tensorboard/CNN_Residual_vs_Base/{name}/Simple_depth{depth}'
            path=f"CNN_Residual_vs_Base/Simple_depth{depth}"
            
            use_res=False
        model= CNN_Customize(depth,in_channels,out_channels,num_classes,use_skip,use_res)
        trainer= Trainer(model,logdir,data_ora_formattata,num_classes,0,depth,100,128,0.001,0.001,path)

        trainer.Train(cifar_train)
        trainer.Test(cifartest)


-----
## Exercise 2: Choose at Least One

Below are **three** exercises that ask you to deepen your understanding of Deep Networks for visual recognition. You must choose **at least one** of the below for your final submission -- feel free to do **more**, but at least **ONE** you must submit. Each exercise is designed to require you to dig your hands **deep** into the guts of your models in order to do new and interesting things.

**Note**: These exercises are designed to use your small, custom CNNs and small datasets. This is to keep training times reasonable. If you have a decent GPU, feel free to use pretrained ResNets and larger datasets (e.g. the [Imagenette](https://pytorch.org/vision/0.20/generated/torchvision.datasets.Imagenette.html#torchvision.datasets.Imagenette) dataset at 160px).

### Exercise 2.1: *Fine-tune* a pre-trained model
Train one of your residual CNN models from Exercise 1.3 on CIFAR-10. Then:
1. Use the pre-trained model as a **feature extractor** (i.e. to extract the feature activations of the layer input into the classifier) on CIFAR-100. Use a **classical** approach (e.g. Linear SVM, K-Nearest Neighbor, or Bayesian Generative Classifier) from scikit-learn to establish a **stable baseline** performance on CIFAR-100 using the features extracted using your CNN.
2. Fine-tune your CNN on the CIFAR-100 training set and compare with your stable baseline. Experiment with different strategies:
    - Unfreeze some of the earlier layers for fine-tuning.
    - Test different optimizers (Adam, SGD, etc.).
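
As a rough illustration of step 1, the sketch below fits three scikit-learn classifiers on pre-extracted feature vectors. The random arrays are placeholders for the real CNN features (an assumption made only to keep the example runnable), `GaussianNB` is one possible choice for the generative classifier, and the names `baselines` and `scores` are hypothetical.

```python
import numpy as np
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC

rng = np.random.default_rng(0)

# Placeholder "features": in practice these would be the pooled CNN activations.
num_classes, dim = 5, 64
centers = rng.normal(size=(num_classes, dim))
y_train = rng.integers(0, num_classes, size=500)
y_test = rng.integers(0, num_classes, size=200)
X_train = centers[y_train] + 0.5 * rng.normal(size=(500, dim))
X_test = centers[y_test] + 0.5 * rng.normal(size=(200, dim))

# Standardizing the features first usually helps the SVM and k-NN.
baselines = {
    "svm": make_pipeline(StandardScaler(), LinearSVC()),
    "knn": make_pipeline(StandardScaler(), KNeighborsClassifier(n_neighbors=5)),
    "gaussian": make_pipeline(StandardScaler(), GaussianNB()),
}

scores = {}
for name, clf in baselines.items():
    clf.fit(X_train, y_train)
    scores[name] = clf.score(X_test, y_test)  # mean accuracy on the test split

for name, acc in scores.items():
    print(f"{name}: {acc:.3f}")
```

Because these classifiers have no stochastic training loop to tune, their accuracy on fixed features gives a reference point that the fine-tuned network can be compared against.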

Each of these steps will require you to modify your model definition in some way. For 1, you will need to return the activations of the last fully-connected layer (or the global average pooling layer). For 2, you will need to replace the original, 10-class classifier with a new, randomly-initialized 100-class classifier.
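
One way to make both modifications is sketched here, under assumptions: `TinyCNN` is a hypothetical stand-in that only reuses the attribute names of `CNN_Customize` (`head`, `blocks`, `pooling`, `fully_connected`), and the helpers `extract_features`, `replace_classifier` and `freeze_except` are illustrative names, not part of the notebook's `Trainer`.

```python
import torch
import torch.nn as nn


class TinyCNN(nn.Module):
    """Hypothetical stand-in mirroring the attribute names of CNN_Customize."""

    def __init__(self, in_channels=3, width=8, num_classes=10):
        super().__init__()
        self.head = nn.Sequential(
            nn.Conv2d(in_channels, width, kernel_size=3, padding=1),
            nn.BatchNorm2d(width),
            nn.ReLU(),
        )
        self.blocks = nn.Sequential(
            nn.Conv2d(width, width, kernel_size=3, padding=1),
            nn.ReLU(),
        )
        self.pooling = nn.AdaptiveAvgPool2d((1, 1))
        self.fully_connected = nn.Linear(width, num_classes)

    def forward(self, x):
        out = self.pooling(self.blocks(self.head(x)))
        return self.fully_connected(torch.flatten(out, 1))


@torch.no_grad()
def extract_features(model, x):
    """Return the pooled activations that feed the classifier."""
    model.eval()
    out = model.pooling(model.blocks(model.head(x)))
    return torch.flatten(out, 1)


def replace_classifier(model, num_classes):
    """Swap the final linear layer for a randomly initialized one."""
    in_features = model.fully_connected.in_features
    model.fully_connected = nn.Linear(in_features, num_classes)
    return model


def freeze_except(model, trainable_prefixes):
    """Leave trainable only the parameters under the given module names."""
    for name, param in model.named_parameters():
        param.requires_grad = any(
            name.startswith(prefix + ".") for prefix in trainable_prefixes
        )


model = TinyCNN()
images = torch.randn(4, 3, 32, 32)           # fake CIFAR-sized batch
features = extract_features(model, images)   # one feature vector per image
model = replace_classifier(model, num_classes=100)
freeze_except(model, ["fully_connected"])    # fine-tune the new head only
logits = model(images)
print(features.shape, logits.shape)
```

Matching on `prefix + "."` rather than on the bare prefix avoids accidentally unfreezing `blocks.10` when only `blocks.1` was requested.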

## Best model
Residual10 was the best model found in the previous training runs.

In [None]:
def Load_data_Cifar100():
    # A single mean/std value is broadcast across the three RGB channels.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    train_cifar100 = torchvision.datasets.CIFAR100(root='./data', train=True, download=True, transform=transform)
    test_cifar100 = torchvision.datasets.CIFAR100(root='./data', train=False, download=True, transform=transform)

    return train_cifar100,test_cifar100


In [None]:
# Load Residual10 from its checkpoint path.
# To use the model as a feature extractor we can keep everything up to the last layer;
# the classifier itself must then be changed to 100 outputs, because there are now 100 classes to predict.
# Having inspected the architecture, we know that the final fully-connected layer is the one to replace.
def Load_model(path,in_channels, out_channels, verbose=False):
    hyperparam= pd.read_csv(path+"/hyperparametres.csv")
    model = CNN_Customize(
        hyperparam["Depth"][0],
        in_channels,
        out_channels,
        hyperparam["Num Classes"][0],
        True,
        True
    )
    model.load_state_dict(torch.load(path+"/best_model.pt", map_location="cpu"))
    if verbose:
        print(model)
    return model, hyperparam
    

In [None]:
model ,h=Load_model("CNN_Residual_vs_Base/Residual_depth6/Run_10_09_25T09_41",3, 64)

In [None]:
model

In [None]:
def Load_configuration():
    return {
    "adam": {
        "lr": 5e-4,
        "weight_decay": 5e-5,
        "momentum": None  # not used by Adam
    },
    "adamw": {
        "lr": 5e-4,
        "weight_decay": 5e-5,
        "momentum": None  # not used by AdamW
    },
    "sgd": {
        "lr": 1e-4,
        "weight_decay": 5e-4,
        "momentum": 0.9
    },
    "rmsprop": {
        "lr": 1e-5,
        "weight_decay": 5e-4,
        "momentum": 0.9
    }
}
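
A configuration dictionary like the one above could be turned into an actual optimizer roughly as follows. This is a sketch under assumptions: `build_optimizer` is a hypothetical helper, not the code used inside `Trainer`, and only the parameters with `requires_grad=True` are handed to the optimizer so that frozen layers are skipped.

```python
import torch
import torch.nn as nn


def build_optimizer(name, params, cfg):
    """Map one entry of the configuration dict to a torch.optim optimizer."""
    lr, wd, momentum = cfg["lr"], cfg["weight_decay"], cfg["momentum"]
    if name == "adam":
        return torch.optim.Adam(params, lr=lr, weight_decay=wd)
    if name == "adamw":
        return torch.optim.AdamW(params, lr=lr, weight_decay=wd)
    if name == "sgd":
        return torch.optim.SGD(params, lr=lr, weight_decay=wd, momentum=momentum)
    if name == "rmsprop":
        return torch.optim.RMSprop(params, lr=lr, weight_decay=wd, momentum=momentum)
    raise ValueError(f"Unknown optimizer: {name}")


# Two entries copied from the configuration above.
config = {
    "adamw": {"lr": 5e-4, "weight_decay": 5e-5, "momentum": None},
    "sgd": {"lr": 1e-4, "weight_decay": 5e-4, "momentum": 0.9},
}

layer = nn.Linear(4, 2)
layer.weight.requires_grad = False  # pretend the weight is frozen
trainable = [p for p in layer.parameters() if p.requires_grad]

opt = build_optimizer("sgd", trainable, config["sgd"])
print(type(opt).__name__, opt.param_groups[0]["lr"])
```

Note that `Adam` applies `weight_decay` as an L2 penalty added to the gradient, while `AdamW` decouples it from the gradient update, so the same numeric value does not regularize equally in the two cases.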

In [None]:
now= datetime.datetime.now()
data_ora_formattata = now.strftime("%d_%m_%yT%H_%M")
name= f'run_{data_ora_formattata}'

path_model_CNN= "CNN_Residual_vs_Base/Residual_depth6/Run_10_09_25T09_41"
in_channels = 3
out_channels= 64
depth =6
num_classes= 100

model, hyperparametres= Load_model(path_model_CNN, in_channels, out_channels)
block_unfreeze = [ "blocks.1","blocks.2","blocks.3", "fully_connected"]
optimizer=["adamw", "sgd", "rmsprop","adam"]
classificator=["svm", "knn", "gaussian"]

cifar_train ,cifar_test= Load_data_Cifar100()

config_optim=Load_configuration()


for freeze_layers in [False,True]:
    if not freeze_layers:
        for cl in classificator:
            clear_output(wait=True)
            print(f'Baseline: CNN feature extractor with classifier: {cl}')
            logdirs= f'tensorboard/Reusing_Model/Classification_{name}/Classificator_{cl}'
            path= f'Reusing_Model/Classification_{name}/Classificator_{cl}'
            trainer= Trainer(model,logdirs,data_ora_formattata,num_classes,depth,0,128,freeze_layers=freeze_layers,classificator=cl)
            trainer.Fine_Tuning(cifar_train,cifar_test)
    else:
        for optim in optimizer:
            clear_output(wait=True)
            model, hyperparametres= Load_model(path_model_CNN, in_channels, out_channels)
            if optim=="adam":
                print('Fine-tuning the CNN with only the last layers unfrozen')
                logdirs= f'tensorboard/Reusing_Model/Fine_Tuning_{name}/Unfreeze_last_layers'
                path=f'Reusing_Model/Fine_Tuning_{name}/Unfreeze_last_layers'
            else:
                print(f'Fine-tuning the model with unfrozen layers using optimizer {optim}')
                logdirs= f'tensorboard/Reusing_Model/Fine_Tuning_{name}/Optimizer_{optim}'
                path=f'Reusing_Model/Fine_Tuning_{name}/Optimizer_{optim}'
            trainer= Trainer(model,logdirs,data_ora_formattata,num_classes,depth,30,256,
                             config_optim[optim]["lr"],config_optim[optim]["weight_decay"],
                             path,False,freeze_layers,None,optim,config_optim[optim]["momentum"],block_unfreeze)
            trainer.Fine_Tuning(cifar_train,cifar_test)

print("Finished fine-tuning")