We need to install some packages (pip install "package"):
- matplotlib
- numpy
- scikit-learn
- tensorboard
- torch

In [1]:
import random
import os

import numpy as np

from sklearn import datasets
from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, Subset
from torch.utils.tensorboard import SummaryWriter


In [2]:
# For reproducibility alwasy before starting a NN fix the seeds. 

def fix_random(seed: int) -> None:
    """Fix all the possible sources of randomness.

    Args:
        seed: the seed to use. 
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    
    torch.backends.cudnn.benchmark = False # Setta alcune operazioni interne a pytorch. (deterministca)
    torch.backends.cudnn.deterministic = True  # slower

seed = 42

In [3]:
# Define the Data Layer come i dati vengono chiesti e passati alla GPU

class MyDataset(Dataset):
    def __init__(self, X, y):
        
        # converto i dati in tensori 
        self.X = torch.FloatTensor(X)
        self.y = torch.LongTensor(y) # long perchè sono interi cioè il mio è un problema
                                     # di classificazione
        self.num_features = X.shape[1] # numero di colonne. 
        self.num_classes = len(np.unique(y)) # numero di classi (ultimo layer)
    
    # i dati possono essere processati anche qua 

    # quanti sono i campioni nel dataset. Quanto è lungo il dataset
    
    def __len__(self):
        return self.X.shape[0] 

    # definisce il metodo in cui i dati vengono presi e restituiti
    # mi ritorna le righe dell'X (self.X[idx, :]) e il label dell'y (self.y[idx])

    def __getitem__(self, idx):
        return self.X[idx, :], self.y[idx]

In [4]:
# Define a simple neural network. Qui è dove definiamo il mio NN. 
# estendo la classe nn.Module (feedforward è il nome dato da noi)

class FeedForward(nn.Module):
    def __init__(self, input_size, num_classes, hidden_size):
        super(FeedForward, self).__init__()
        
        #dimensione input 
        self.input_size = input_size
        self.hidden_size = hidden_size
        
        # due layer nn-Linear è fully connected
        
        self.input_layer = nn.Linear(self.input_size, self.hidden_size)
        
        self.output_layer = nn.Linear(self.hidden_size, num_classes)
        
        self.relu = nn.ReLU()
        
    # Qui definisco come fare il feedforward step e che definisce il grafo della computazione
    
    def forward(self, x):
        h = self.input_layer(x)  # passo all'input layer il dato di input
        h = self.relu(h)         # passo l'outpu dell'input layer ad una funzione non lineare
        output = self.output_layer(h) # qui non c'è la funzione esplicitamente ma è la softmax 
        return output                 # (classificazione multiclasse) che viene direttamente applicata
                                      # dalla cross-entropy

In [5]:
# Define a function for the training process
# criterion = loss
# optimizer = adam, sgd etc
# scheduler = per decrementare o incrementare il learning rate
# train_loader = mi restituisce il batch per il train. Prende i dati e crea il batch.
# val_loader = mi restituisce il batch per il val. Prende i dati e crea il batch.
# device = dove sto lavorando (CPU vs GPU).
# writer = tiene il log degli addesteramenti e li fa vedere graficamente
# log_name = il nome che diamo ai nostri modelli.

# iterazioni = numero di epoch / dimension batch

def train_model(model, criterion, optimizer, epoch, scheduler, train_loader, val_loader, device, writer, log_name="best_model"):
    n_iter = 0
    best_valid_loss = float('inf')  # la salvo per poter salvare il modello con la miglior loss
    
    # questo fa un'intera epoch
    for ep in range(epoch):
        model.train()  # dico al modello che è in training ad ogni epoca IMPORTANTE!
        
        #accedo ai mini batches, trail_loader genera numeri casuali e chiede al get_item di dargli i dati su quegli index
        for data, targets in train_loader:
            data, targets = data.to(device), targets.to(device)  # traferisce i dati in GPU.
            
            optimizer.zero_grad()

            # Forward pass
            y_pred = model(data)

            # Compute Loss
            loss = criterion(y_pred, targets)
            writer.add_scalar("Loss/train", loss, n_iter) #salvo e mostro la loss al training loss e n_iter servono per il grafico

            # Backward pass
            loss.backward() # calcola gli aggiornamenti di tutti i pesi
            optimizer.step() # aggiorno i pesi

            n_iter += 1
        
        labels, _, y_pred = test_model(model, val_loader, device)  # testo il modello attuale sul val set. 
        loss_val = criterion(y_pred, labels) 
        writer.add_scalar("Loss/val", loss_val, ep) #riplotto la loss per il val set. 
        
        # save best model
        if loss_val.item() < best_valid_loss:
            best_valid_loss = loss_val.item()
            if not os.path.exists('models'):
                os.makedirs('models')
            torch.save(model.state_dict(), 'models/'+log_name) # qui si caricano solo pesi e struttura state_dict()
        
        writer.add_scalar("Learning Rate", scheduler.get_last_lr()[0], ep) # salvo attuale learning rate. Step_lr è la funzione di stepper. 
        
        scheduler.step() # ad ogni epoch faccio uno step 
            
    return model

In [6]:
# Define a function to evaluate the performance on validation and test sets

def test_model(model, data_loader, device):
    model.eval() # metto il modello in evaluation IMPORTANTE!
    y_pred = []
    y_test = []
    
    for data, targets in data_loader:
        data, targets = data.to(device), targets.to(device)
        y_pred += model(data)
        y_test += targets
    
    # trasformo una lista di liste in un vettore di predizioni e lables
    y_test = torch.stack(y_test).squeeze() 
    y_pred = torch.stack(y_pred).squeeze()
    
    #calcolo l'accuratezza. 
    y_pred_c = y_pred.argmax(dim=1, keepdim=True).squeeze()
    
    return y_test, y_pred_c, y_pred

In [7]:
# look for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device = torch.device('mps') per mac
print("Device: {}".format(device))

Device: cpu


In [8]:
#load the data
data = datasets.load_iris()
X = data['data']
y = data['target']

# Separate indices (PyTorch lavora in indici)
indices = np.arange(X.shape[0])
train_idx, test_idx = train_test_split(indices, test_size=0.2, stratify=y, random_state=seed) # stratify gli dice che gli esempi per classe siano equilibrati
train_idx, val_idx = train_test_split(train_idx, test_size=0.1, stratify=y[train_idx], random_state=seed)

X_train = X[train_idx,:]
y_train = y[train_idx]
X_val = X[val_idx,:]
y_val = y[val_idx]
X_test = X[test_idx,:]
y_test = y[test_idx]

print(X_train.shape[0])
print(X_val.shape[0])
print(X_test.shape[0])


# Scale data
if False:
    train_min = np.min(X_train, axis=0)
    train_max = np.max(X_train, axis=0)
    X_train = (X_train - train_min) / (train_max - train_min)
    X_val = (X_val - train_min) / (train_max - train_min)
    X_test = (X_test - train_min) / (train_max - train_min)


# Create the dataset
train_subset = MyDataset(X_train,y_train)
val_subset = MyDataset(X_val,y_val)
test_subset = MyDataset(X_test,y_test)


108
12
30


In [9]:
# hyperparameters base
num_epochs = 100
learning_rate = 0.01
gamma = 0.5 # quanto aggiorno il learning rate 
step_size = 20 # ogni quanto voglio aggiornare il leanring rate.
batch_size = 16 # la dimensione determina gli aggiornamenti. Più piccola più aggiornamenti 
hidden_size = 4

Run Tensorboard from the command line:

"tensorboard --logdir runs/"

or 

"python -m tensorboard.main --logdir runs/"

In [10]:
# fix the seed for reproducibility
fix_random(seed)


# Start tensorboard
writer = SummaryWriter()


# Create relative dataloaders mi danno i batches
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True) # se non shaffle = True i batch so sempre gli stessi ad ogni epoca
val_loader = DataLoader(val_subset, batch_size=batch_size)
test_loader = DataLoader(test_subset, batch_size=batch_size)


# Define the architecture, loss and optimizer
model = FeedForward(train_subset.num_features, train_subset.num_classes, hidden_size)
model.to(device)  # muove la rete neurale in GPU
criterion = torch.nn.CrossEntropyLoss()
# optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)


# Test before the training
y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]
print("Accuracy before training:", acc.cpu().numpy())


# Train the model 
model = train_model(model, criterion, optimizer, num_epochs, scheduler, train_loader, val_loader, device, writer)


# Load best model
model.load_state_dict(torch.load("models/best_model", weights_only=True))
model.to(device)


# Test after the training
y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]
print("Accuracy after training:", acc.cpu().numpy())


# Close tensorboard writer after a training
writer.flush()
writer.close()

Accuracy before training: 0.6666667
Accuracy after training: 1.0


In [12]:
# Let's define a new architecture

class FeedForwardPlus(nn.Module):
    def __init__(self, input_size, num_classes, hidden_size, depth=1):
        super(FeedForwardPlus, self).__init__()
        
        model = [
            nn.Linear(input_size, hidden_size),
            nn.BatchNorm1d(hidden_size),  # batch norm dopo lineare prima dell'attivazione
            nn.ReLU()
        ]

        # Blocco generico che definisce la serie di layer 
        block = [
            nn.Linear(hidden_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU()
        ]

        for i in range(depth):
            model += block
        
        # casto una lista così che diventi una rete neurale    
        self.model = nn.Sequential(*model)
        
        self.output = nn.Linear(hidden_size, num_classes)
        

    def forward(self, x):
        h = self.model(x)
        out = self.output(h)
        return out


In [13]:
# hyperparameters
num_epochs = 100
learning_rate = 0.01
gamma = 0.5
step_size = 20  
batch_size = 16
hidden_size = 4
depth = 1         #  new hyperparameter

In [14]:
# fix the seed for reproducibility
fix_random(seed)


# Start tensorboard
writer = SummaryWriter()


# Create relative dataloaders
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=batch_size)
test_loader = DataLoader(test_subset, batch_size=batch_size)


# Define the architecture, loss and optimizer
model = FeedForwardPlus(train_subset.num_features, train_subset.num_classes, hidden_size, depth)
print(model)
model.to(device)

criterion = torch.nn.CrossEntropyLoss()
#optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)


# Test before the training
y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]
print("Accuracy before training:", acc.cpu().numpy())


# Train the model 
model = train_model(model, criterion, optimizer, num_epochs, scheduler, train_loader, val_loader, device, writer)


# Load best model
model.load_state_dict(torch.load("models/best_model", weights_only=True))
model.to(device)


# Test after the training
y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]
print("Accuracy after training:", acc.cpu().numpy())


# Close tensorboard writer after a training
writer.flush()
writer.close()

FeedForwardPlus(
  (model): Sequential(
    (0): Linear(in_features=4, out_features=4, bias=True)
    (1): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Linear(in_features=4, out_features=4, bias=True)
    (4): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
  )
  (output): Linear(in_features=4, out_features=3, bias=True)
)
Accuracy before training: 0.33333334
Accuracy after training: 1.0
