In [7]:
# @title import dependencies

from typing import Mapping, Union, Optional
from pathlib import Path

import numpy as np
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import plotly.graph_objects as go
import plotly.express as px
import matplotlib.pyplot as plt
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, Dataset
import os
import pickle
from tqdm.notebook import tqdm
from torchvision.utils import save_image
import random
import pandas as pd
from typing import Callable, Optional
from torchsummary import summary

from __future__ import print_function, division

# Seed the random number generators used by the notebook (torch, numpy, Python's
# `random`, and torch's CUDA generator) and configure cuDNN for repeatable runs.
import random
torch.manual_seed(42)
np.random.seed(42)
random.seed(0)  # NOTE(review): seeded with 0 while torch/numpy use 42 - confirm this is intended

# NOTE(review): if more than one GPU is used, confirm whether every device needs seeding.
torch.cuda.manual_seed(0)
torch.backends.cudnn.deterministic = True  # Note that this Deterministic mode can have a performance impact
torch.backends.cudnn.benchmark = False

from sklearn.datasets import load_iris
from sklearn.datasets import fetch_covtype
from sklearn.metrics import auc, roc_curve
from sklearn.model_selection import train_test_split


# Dataset functions

In [8]:
# Use it on the global dataset 

class MyData(Dataset):
    """Map-style dataset wrapping a complete data bundle.

    ``data`` must support key lookup for ``'target_names'``, ``'feature_names'``,
    ``'data'`` (the samples) and ``'target'`` (the labels).
    Indexing returns the pair ``(sample, label)``.
    """

    def __init__(self, data):
        super().__init__()
        # Metadata first, then the samples with their labels.
        self.target_names, self.feature_names = data['target_names'], data['feature_names']
        self.data, self.target = data['data'], data['target']

    def __len__(self):
        # The number of samples is the size of the first axis of the sample container.
        n_samples = self.data.shape[0]
        return n_samples

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]
    
# Use it on the single subsets (train, test, validation)

class MyDataSet(Dataset):
    """Map-style dataset for one subset (train, validation or test).

    ``x`` holds the samples and ``y`` the labels of the subset; ``data`` is only
    read for its ``'target_names'`` and ``'feature_names'`` entries.
    Indexing returns the pair ``(sample, label)``.
    """

    def __init__(self, x, y, data):
        super().__init__()
        # Metadata comes from the original bundle, samples/labels from the split.
        self.target_names, self.feature_names = data['target_names'], data['feature_names']
        self.data, self.target = x, y

    def __len__(self):
        # The number of samples is the size of the first axis of ``x``.
        n_samples = self.data.shape[0]
        return n_samples

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]

# Architecture (net)

In [9]:
class MLP_iris(nn.Module):
    """Small perceptron: 4 inputs -> 32 -> 16 -> 3 outputs.

    Each hidden linear layer (no bias) is followed by batch normalisation and
    ReLU. ``forward`` returns the raw output of the last linear layer;
    ``actout`` (softmax over dim 1) is created but not applied.
    """

    def __init__(self) -> None:
        super().__init__()
        # Sub-modules are registered in this exact order: it determines the
        # ordering of parameters() and of the state_dict keys.
        self.lin1 = nn.Linear(4, 32, bias=False)
        self.lin2 = nn.Linear(32, 16, bias=False)
        self.linout = nn.Linear(16, 3, bias=False)

        self.bn1 = nn.BatchNorm1d(32)
        self.bn2 = nn.BatchNorm1d(16)

        self.acthid = nn.ReLU()
        self.actout = nn.Softmax(1)

    def forward(self, x):
        hidden = x.float()
        for linear, norm in ((self.lin1, self.bn1), (self.lin2, self.bn2)):
            hidden = self.acthid(norm(linear(hidden)))
        # Softmax is intentionally skipped: a loss such as nn.CrossEntropyLoss
        # expects raw scores. Apply self.actout when another loss is used.
        return self.linout(hidden)

class MLP_forest(nn.Module):
    """Perceptron: 54 inputs -> 500 -> 300 -> 150 -> 70 -> 7 outputs.

    Each hidden linear layer (no bias) is followed by batch normalisation and
    ReLU. ``forward`` returns the raw output of the last linear layer;
    ``actout`` (softmax over dim 1) is created but not applied.
    """

    def __init__(self) -> None:
        super().__init__()
        # Sub-modules are registered in this exact order: it determines the
        # ordering of parameters() and of the state_dict keys.
        self.lin1 = nn.Linear(54, 500, bias=False)
        self.lin2 = nn.Linear(500, 300, bias=False)
        self.lin3 = nn.Linear(300, 150, bias=False)
        self.lin4 = nn.Linear(150, 70, bias=False)
        self.linout = nn.Linear(70, 7, bias=False)

        self.bn1 = nn.BatchNorm1d(500)
        self.bn2 = nn.BatchNorm1d(300)
        self.bn3 = nn.BatchNorm1d(150)
        self.bn4 = nn.BatchNorm1d(70)

        self.acthid = nn.ReLU()
        self.actout = nn.Softmax(1)

    def forward(self, x):
        hidden = x.float()
        stages = (
            (self.lin1, self.bn1),
            (self.lin2, self.bn2),
            (self.lin3, self.bn3),
            (self.lin4, self.bn4),
        )
        for linear, norm in stages:
            hidden = self.acthid(norm(linear(hidden)))
        # Softmax is intentionally skipped: a loss such as nn.CrossEntropyLoss
        # expects raw scores. Apply self.actout when another loss is used.
        return self.linout(hidden)

    
class MLP_forest_noBN(nn.Module):
    """Perceptron without batch normalisation: 54 -> 500 -> 300 -> 150 -> 70 -> 7.

    Each hidden linear layer (no bias) is followed by ReLU. ``forward`` returns
    the raw output of the last linear layer; ``actout`` (softmax over dim 1) is
    created but not applied.
    """

    def __init__(self) -> None:
        super().__init__()
        # Registration order fixes the ordering of parameters() / state_dict keys.
        self.lin1 = nn.Linear(54, 500, bias=False)
        self.lin2 = nn.Linear(500, 300, bias=False)
        self.lin3 = nn.Linear(300, 150, bias=False)
        self.lin4 = nn.Linear(150, 70, bias=False)
        self.linout = nn.Linear(70, 7, bias=False)

        self.acthid = nn.ReLU()
        self.actout = nn.Softmax(1)

    def forward(self, x):
        hidden = x.float()
        for linear in (self.lin1, self.lin2, self.lin3, self.lin4):
            hidden = self.acthid(linear(hidden))
        # Softmax is intentionally skipped: a loss such as nn.CrossEntropyLoss
        # expects raw scores. Apply self.actout when another loss is used.
        return self.linout(hidden)
    
class MyLinearLayer(nn.Module):
    """Custom bias-free linear layer with a sparse set of extra weights.

    The effective weight matrix used in ``forward`` is ``weight + w1`` where
    ``w1`` is zero everywhere except at the (row, column) positions listed in
    ``index``, which hold the entries of ``tr_weights``.

    :param size_in: number of input features
    :param size_out: number of output features
    :param num_layer: 1-based position of this layer; selects which slice of
                      the index table stored in 'memory.pth' belongs to it
    """
    def __init__(self, size_in, size_out, num_layer):
        # num_layer goes from 1 to 5 depending on the layer that comes before this one
        super().__init__()

        # 'memory.pth' is read from the current working directory.
        # NOTE(review): presumably it stores (index table with one (row, col) pair
        # per entry, number of entries per layer) - confirm against the writer.
        memory, num_weight_lin = torch.load('memory.pth')
        # Offset of this layer's slice = number of entries of all previous layers.
        self.start = 0
        for i in range(num_layer-1):
            self.start += num_weight_lin[i]
        self.num_weights = num_weight_lin[num_layer-1]
        self.index = memory[self.start : self.start + self.num_weights] # Indices of the randomly selected trainable parameters

        self.size_in, self.size_out = size_in, size_out
        weights = torch.Tensor(size_out, size_in)

        # Dense weight matrix.
        # NOTE(review): nothing in this class freezes it; presumably the caller
        # sets requires_grad=False so only tr_weights are trained - confirm.
        self.weight = nn.Parameter(weights)
        self.tr_weights = nn.Parameter(torch.Tensor(self.num_weights)) # Trainable parameters
        # Kept for backward compatibility only; forward() no longer relies on it.
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        # Initialise the dense weight, then the sparse weights with a uniform
        # distribution bounded by 1/sqrt(fan_in).
        nn.init.kaiming_uniform_(self.weight, a=np.sqrt(5))
        fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.weight)
        bound = 1 / np.sqrt(fan_in)
        nn.init.uniform_(self.tr_weights, -bound, bound)

    def forward(self, x):
        # Build the sparse matrix next to the dense weight (same device and dtype).
        # Using a device cached at construction time breaks as soon as the module
        # lives on a different device (e.g. CUDA available but model kept on CPU).
        w1 = torch.zeros_like(self.weight)
        w1[self.index[:,0], self.index[:,1]] = self.tr_weights
        w_times_x = torch.mm(x, (self.weight + w1).t())
        return w_times_x
    
class MLP_forest_RandWeights(nn.Module):
    """54 -> 500 -> 300 -> 150 -> 70 -> 7 perceptron built from ``MyLinearLayer``.

    Each hidden ``MyLinearLayer`` (numbered 1 to 4) is followed by batch
    normalisation and ReLU; the output layer is layer number 5. ``forward``
    returns the raw output of the last layer; ``actout`` (softmax over dim 1)
    is created but not applied.
    """

    def __init__(self) -> None:
        super().__init__()
        # Registration order fixes the ordering of parameters() / state_dict keys.
        self.lin1 = MyLinearLayer(54, 500, 1)
        self.lin2 = MyLinearLayer(500, 300, 2)
        self.lin3 = MyLinearLayer(300, 150, 3)
        self.lin4 = MyLinearLayer(150, 70, 4)
        self.linout = MyLinearLayer(70, 7, 5)

        self.bn1 = nn.BatchNorm1d(500)
        self.bn2 = nn.BatchNorm1d(300)
        self.bn3 = nn.BatchNorm1d(150)
        self.bn4 = nn.BatchNorm1d(70)

        self.acthid = nn.ReLU()
        self.actout = nn.Softmax(1)

    def forward(self, x):
        hidden = x.float()
        stages = (
            (self.lin1, self.bn1),
            (self.lin2, self.bn2),
            (self.lin3, self.bn3),
            (self.lin4, self.bn4),
        )
        for linear, norm in stages:
            hidden = self.acthid(norm(linear(hidden)))
        # Softmax is intentionally skipped: a loss such as nn.CrossEntropyLoss
        # expects raw scores. Apply self.actout when another loss is used.
        return self.linout(hidden)
    

# Training functions

In [10]:
def make_averager() -> Callable[[Optional[float]], float]:
    """ Build a closure that keeps a running mean of the values fed to it

    :returns: running average function
    """
    # Mutable state shared with the closure: number of values and their sum.
    state = {"count": 0, "total": 0}

    def averager(new_value: Optional[float]) -> float:
        """ Update (or just query) the running mean

        :param new_value: value to include in the mean; pass None to read
                          the current mean without changing it
        :returns: the current mean, NaN when no value has been added yet
        """
        if new_value is not None:
            state["count"] += 1
            state["total"] += new_value
        if not state["count"]:
            return float("nan")
        return state["total"] / state["count"]

    return averager

def refresh_bar(bar, desc):
    """Set ``desc`` as the description of the progress bar ``bar`` and redraw it.

    :param bar: object exposing ``set_description`` and ``refresh``
    :param desc: text to show next to the bar
    """
    bar.set_description(desc)
    bar.refresh()

def plot_loss(losses, title= "Train loss", axis = "Loss"):
    """Build a plotly figure showing one curve with a value per epoch.

    :param losses: sequence of values, one per epoch (plotted against its index)
    :param title: figure title
    :param axis: label of the y axis
    :returns: the plotly figure (not shown here; the caller displays it)
    """
    epochs = list(range(len(losses)))
    curve = go.Scatter(x=epochs, y=losses)
    text_font = dict(family="Courier New, monospace", size=18, color="#7f7f7f")

    fig = go.Figure()
    fig.add_trace(curve)
    fig.update_layout(
        title=title,
        xaxis_title="Epoch",
        yaxis_title=axis,
        font=text_font,
    )
    return fig

class SaveBestModel:
    """Write a checkpoint to 'best_model.pth' whenever the model improves.

    An epoch counts as an improvement only when BOTH the validation loss is
    lower than the best one seen so far AND the training loss is lower than the
    training loss stored with the last saved checkpoint.
    """

    def __init__(self, best_valid_loss=float('inf'), last_model_train_loss=float('inf')):
        # Both thresholds start at +infinity, so the first call always saves.
        self.best_valid_loss = best_valid_loss
        self.last_model_train_loss = last_model_train_loss

    def __call__(
        self, current_valid_loss,
        epoch, model, optimizer, criterion, loss, acc,
    ):
        improved = current_valid_loss < self.best_valid_loss and loss < self.last_model_train_loss
        if not improved:
            return

        self.best_valid_loss, self.last_model_train_loss = current_valid_loss, loss
        print(f"\nBest validation loss: {self.best_valid_loss}")
        print(f"\nSaving best model for epoch: {epoch+1}\n")

        # The checkpoint stores the state_dict (mapping from layer name to its
        # parameter tensor) plus bookkeeping values, see
        # https://pytorch.org/tutorials/beginner/saving_loading_models.html
        # Note that the 'loss' entry holds the criterion object, not a number.
        checkpoint = {
            'epoch': epoch+1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': criterion,
            'train_loss_value': loss,
            'val_loss_value' : current_valid_loss,
            'accuracy': acc,
        }
        torch.save(checkpoint, 'best_model.pth')

def Train(opt, inputs, classes, device, net, loss_func, rand = False):
    """Run one optimisation step on a single batch.

    :param opt: optimizer holding the parameters of ``net``
    :param inputs: batch of samples
    :param classes: batch of labels, shifted down by one before use
    :param device: device the batch is moved to
    :param net: model to train
    :param loss_func: callable ``loss_func(predictions, targets)``
    :param rand: currently unused
    :returns: the loss tensor of this batch
    """
    batch = inputs.to(device)
    # Labels are defined in [1, 7]; the loss needs them in [0, 6].
    targets = (classes.to(device) - 1).long()

    opt.zero_grad()
    loss = loss_func(net(batch), targets)
    loss.backward()
    opt.step()
    return loss
    
def Validation(inputs, classes, device, net, loss_func):
    """Evaluate one batch: loss and number of wrong predictions.

    Gradient tracking is not disabled here; the caller decides that.

    :param inputs: batch of samples
    :param classes: batch of labels, shifted down by one before use
    :param device: device the batch is moved to
    :param net: model to evaluate
    :param loss_func: callable ``loss_func(predictions, targets)``
    :returns: ``(loss, missclassified)`` where ``missclassified`` counts the
              samples whose predicted class differs from the target
    """
    batch = inputs.to(device)
    # Labels are defined in [1, 7]; the loss needs them in [0, 6].
    targets = (classes.to(device) - 1).long()

    scores = net(batch)
    loss = loss_func(scores, targets)

    predicted = torch.max(scores, 1).indices
    # |predicted - target| clipped to [0, 1] is 1 for a wrong prediction, else 0.
    wrong = torch.abs(predicted - targets).clamp(0, 1)
    return loss, wrong.sum()

def lr_schedule(epoch):
    """Step learning-rate schedule.

    :param epoch: current epoch number
    :returns: 1e-3 up to epoch 10, 1e-4 for epochs 11 to 20, 1e-5 afterwards
    """
    lr = 1e-3
    # The larger threshold must be tested first: every epoch above 20 is also
    # above 10, so testing "> 10" first makes the second decay unreachable.
    if epoch > 20:
        lr *= 0.01
    elif epoch > 10:
        lr *= 0.1
    return lr

def InitializeWeight (rete, file = 'weight_initialization.pth', is_dict = True):
    """Overwrite the weights of ``rete`` with the ones stored in ``file``.

    No verification is performed here; ``load_state_dict`` itself raises when
    the stored keys do not match the model.

    :param rete: model whose parameters are overwritten in place
    :param file: path of a file written with ``torch.save``
    :param is_dict: True when the file contains the state_dict itself, False
                    when it contains a checkpoint dictionary holding the
                    state_dict under the key 'model_state_dict'
    """
    # map_location='cpu' lets weights saved from a GPU be read on a machine
    # without CUDA; load_state_dict then copies the values onto whatever device
    # each parameter of ``rete`` already lives on.
    # NOTE(review): checkpoints written by SaveBestModel also pickle the
    # criterion object; recent PyTorch releases may refuse such files with the
    # default torch.load settings - confirm against the installed version.
    init_weight_dict = torch.load(file, map_location='cpu')
    if not is_dict:
        init_weight_dict = init_weight_dict['model_state_dict']

    # load_state_dict copies without tracking gradients, so no explicit
    # torch.no_grad() block is needed.
    rete.load_state_dict(init_weight_dict)

def HaveSameWeight(rete, file, device, is_dict=True):
    """Print, for every state_dict entry of ``rete``, whether it equals the one in ``file``.

    One line ``<entry name> <True|False>`` is printed per entry.

    :param rete: model to check
    :param file: path of a file written with ``torch.save``
    :param device: device on which the comparison is carried out
    :param is_dict: True when the file contains the state_dict itself, False
                    when it holds it under the key 'model_state_dict'
    """
    # Load straight onto the comparison device so files saved from a GPU can be
    # checked on a CPU-only machine.
    init_weight_dict = torch.load(file, map_location=device)
    if not is_dict:
        init_weight_dict = init_weight_dict['model_state_dict']

    with torch.no_grad():
        # Build the state_dict once instead of twice per entry.
        for name, current in rete.state_dict().items():
            # Reduce on the tensor itself: converting to numpy fails for CUDA tensors.
            same = bool((current.to(device) == init_weight_dict[name].to(device)).all())
            print(name, same)

# Initial attempt to select just some random weights
def RandomWeights(forest_net, device):
    """Reset every weight of ``forest_net`` except the selected ones.

    The positions listed in 'memory.pth' keep their current value; all other
    weights are put back to the values stored by ``InitializeWeight``'s
    default file.

    :param forest_net: model exposing 'lin1' ... 'lin4' and 'linout' weights
    :param device: device the preserved values are moved to before being
                   written back
    """
    # NOTE(review): presumably 'memory.pth' stores (index table with one
    # (row, col) pair per entry, number of entries per layer) - confirm.
    memory, num_weight_layers = torch.load('memory.pth')
    layers = ['1','2','3','4','out']

    # Save the current value of the selected weights, one tensor per layer.
    # A list is used instead of a fixed 2040-element buffer so any number of
    # selected weights is supported.
    saved_weights = []
    res = 0
    current = forest_net.state_dict()
    for i, num in enumerate(num_weight_layers):
        rows, cols = memory[res : res+num,0], memory[res : res+num,1]
        saved_weights.append(current[f'lin{layers[i]}.weight'][rows, cols].clone())
        res += num

    # Reset all the weights (selected or not) to their initial value
    InitializeWeight(forest_net)

    # Write the saved (i.e. updated by training) values back at the selected
    # positions; state_dict tensors share storage with the parameters, so the
    # in-place assignment changes the model.
    res = 0
    current = forest_net.state_dict()
    for i, num in enumerate(num_weight_layers):
        rows, cols = memory[res : res+num,0], memory[res : res+num,1]
        current[f'lin{layers[i]}.weight'][rows, cols] = saved_weights[i].to(device)
        res += num

def ConvList(l):
    """Pack the values of ``l`` into one tensor on the CPU and return its elements as a list.

    :param l: sequence of values accepted by ``torch.Tensor``
    :returns: list with one tensor element per entry of ``l``
    """
    packed = torch.Tensor(l)
    on_cpu = packed.to('cpu')
    return list(on_cpu)

def InitializeCheckRandWieght(rete):
    """Replace the dense weights of ``rete`` with the stored initial ones and report the result.

    The weights are read from 'weight_initialization.pth' in the current
    working directory. For every entry of that file one line
    ``<entry name> <True|False>`` is printed, telling whether the model now
    holds the same values.

    :param rete: model exposing ``lin1`` ... ``lin4`` and ``linout`` sub-modules
    """
    wi = torch.load('weight_initialization.pth')

    for layer_name in ('lin1', 'lin2', 'lin3', 'lin4', 'linout'):
        getattr(rete, layer_name).weight = nn.Parameter(wi[f'{layer_name}.weight'])

    current = rete.state_dict()
    for i in wi:
        # Compare on the CPU: the loaded tensors and the model may live on
        # different devices, and numpy conversion fails for CUDA tensors.
        j = bool((current[i].cpu() == wi[i].cpu()).all())
        print(i,j)


# Compare

In [11]:
def _evaluate_split(loader, n_samples, desc, device, net, loss_func):
    """Run ``Validation`` on every batch of ``loader``; return (mean batch loss, accuracy)."""
    loss_avg = make_averager()
    miss_sum = 0
    for inputs, classes in tqdm(loader, leave=False, desc=desc, total=len(loader)):
        loss, miss = Validation(inputs, classes, device, net, loss_func)
        miss_sum += miss
        loss_avg(loss.item())
    return loss_avg(None), 1 - miss_sum/n_samples


def ValueModel(file, train, val, test, device, loss_func, len_train, len_val, len_test, net = None):
    """Load the weights saved in ``file`` into ``net`` and evaluate it on three datasets.

    :param file: checkpoint written by SaveBestModel (the state_dict is read
                 from its 'model_state_dict' entry). The original note said the
                 file must be in a folder called BestModels inside the current
                 directory; the directory change is commented out, so the path
                 is used as given.
    :param train: dataloader of the training set
    :param val: dataloader of the validation set
    :param test: dataloader of the test set
    :param device: device used for the evaluation
    :param loss_func: callable ``loss_func(predictions, targets)``
    :param len_train: number of samples in the training set
    :param len_val: number of samples in the validation set
    :param len_test: number of samples in the test set
    :param net: model to evaluate; a fresh ``MLP_forest`` when omitted
    :returns: ``(train_loss, train_acc), (val_loss, val_acc), (test_loss, test_acc)``
    """
    # Build the default model per call: a default argument is created only once,
    # when the function is defined, and would be shared by every call.
    if net is None:
        net = MLP_forest()
    #%cd BestModels
    InitializeWeight(net, file, is_dict = False)
    net.to(device)  # the batches are moved to ``device``, the model must follow

    # Evaluation mode: batch normalisation must use its running statistics and
    # must not update them while measuring the model.
    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            tr_loss, tr_acc = _evaluate_split(train, len_train, 'train', device, net, loss_func)
            val_loss, val_acc = _evaluate_split(val, len_val, 'validation', device, net, loss_func)
            test_loss, test_acc = _evaluate_split(test, len_test, 'test', device, net, loss_func)
    finally:
        net.train(was_training)  # leave the model in the mode it was given in

    print(f'Model loss and acc: Train: {tr_loss:.4f}     {tr_acc:.4f} \tVal: {val_loss:.4f}      {val_acc:.4f} \tTest: {test_loss:.4f}     {test_acc:.4f}')
    #%cd ..
    return (tr_loss, tr_acc), (val_loss, val_acc), (test_loss, test_acc)

def ShowTrainFile(name):
    """Print the training history stored in the pickle files of model ``name``.

    Reads 'train_loss<name>.pkl', 'val_loss<name>.pkl' and 'val_acc<name>.pkl'
    from the current working directory. Nothing is printed when the three
    histories do not have the same length.

    :param name: suffix identifying the model in the file names
    """
    # pickle.load can execute arbitrary code: only use it on trusted files.
    paths = (f'train_loss{name}.pkl', f'val_loss{name}.pkl', f'val_acc{name}.pkl')
    histories = []
    for path in paths:
        with open(path, 'rb') as handle:
            histories.append(pickle.load(handle))
    train_loss, val_loss, val_acc = histories

    a = len(val_loss)
    if len(train_loss) != a or len(val_acc) != a:
        return

    print(f'The training phase of the model {name} is:\n')
    for i in range(a):
        print(f'Epochs: {i} \tTrain Loss: {train_loss[i]:.4f} \tValidation Loss: {val_loss[i]:.4f} \tValidation Accuracy: {val_acc[i]:.4f}\n')
            
def ShowTrain(train_loss,val_loss,val_acc):
    """Print one line per epoch with training loss, validation loss and validation accuracy.

    Nothing is printed when the three histories do not have the same length.

    :param train_loss: training loss per epoch
    :param val_loss: validation loss per epoch
    :param val_acc: validation accuracy per epoch
    """
    a = len(val_loss)
    if len(train_loss) != a or len(val_acc) != a:
        return

    print(f'The training phase of the model is:\n')
    for i in range(a):
        print(f'Epochs: {i} \tTrain Loss: {train_loss[i]:.4f} \tValidation Loss: {val_loss[i]:.4f} \tValidation Accuracy: {val_acc[i]:.4f}')

        
def ROC_nets(truth, score, titles=None):
    """Plot the ROC curve of several models in one matplotlib figure.

    :param truth: list with, for each model, the true labels
    :param score: list with, for each model, the scores given to ``roc_curve``
    :param titles: optional list of names used in the legend; defaults to the
                   three names used originally. Models without a name are
                   labelled 'model <index>'.
    """
    if titles is None:
        titles = ['Particle Net Lite', 'Particle Net Very Lite ch3', 'Particle Net Very Lite ch2']

    for i in range(len(truth)):
        fpr, tpr, _ = roc_curve(truth[i], score[i])
        roc_auc = auc(fpr, tpr)
        # Fall back to a generic name instead of failing when there are more
        # curves than names.
        name = titles[i] if i < len(titles) else f'model {i}'
        plt.plot(fpr, tpr, label = ('AUC = %0.2f of the ' % roc_auc) + name)

    plt.title('ROC')
    plt.legend(loc = 'lower right')
    plt.plot([0, 1], [0, 1],'r--')  # diagonal reference line
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.show()

def plot_loss_nets(losses, name_title = 'Train loss', name_axis = 'Loss', titles = None):
    """Build a plotly figure with one curve per network.

    :param losses: list of lists, one history (a value per epoch) per network
    :param name_title: first part of the figure title
    :param name_axis: label of the y axis
    :param titles: optional legend names, one per history; defaults to
                   ['Train', 'BN', 'Rand']. Histories without a name are
                   labelled 'Net <index>'.
    :returns: the plotly figure (not shown here; the caller displays it)
    """
    if titles is None:
        titles = ['Train', 'BN', 'Rand']

    fig = go.Figure()
    for i,los in enumerate(losses):
        # Fall back to a generic name instead of failing when there are more
        # curves than names.
        name = titles[i] if i < len(titles) else f'Net {i}'
        fig.add_trace(go.Scatter(
            x=list(range(len(los))),
            y=los,
            name=name
        ))

    fig.update_layout(
        title= name_title + ' for all the trained networks',
        xaxis_title="Epoch",
        yaxis_title=name_axis,
        font=dict(
            family="Courier New, monospace",
            size=18,
            color="#7f7f7f"
        )
    )
    return fig
