In [1]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset, Subset
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from itertools import product
from sklearn.model_selection import KFold, StratifiedKFold
import matplotlib.pyplot as plt
import lightning as L
import torch.nn.functional as F
from torchmetrics import Accuracy
from lightning.pytorch.callbacks.early_stopping import EarlyStopping, Callback
from joblib import Parallel, delayed

import warnings
# Blanket-suppress every Python warning for the rest of the session.
# NOTE(review): this also hides deprecation and shape-broadcast warnings.
warnings.filterwarnings('ignore')
import logging
# Quiet the trainer's INFO chatter. The notebook imports the unified
# `lightning` package, whose trainer logger is expected to live under the
# "lightning.pytorch" namespace; the legacy "pytorch_lightning" name is kept
# as well so either installation is covered.
logger = logging.getLogger("pytorch_lightning.trainer.trainer")
logger.setLevel(logging.WARNING)
logging.getLogger("lightning.pytorch.trainer.trainer").setLevel(logging.WARNING)

In [2]:
def plot_err_curve(result):
    """Plot accuracy and loss learning curves side by side.

    The left panel shows train/validation accuracy and the right panel shows
    train/validation loss, each sliced to the first ``b_ep`` entries.

    NOTE(review): ``result`` is currently unused. The curves are read from the
    notebook-level names ``models_train_acc``, ``models_val_acc``,
    ``models_train_mse``, ``models_val_mse``, ``i`` and ``b_ep``, none of which
    are defined in the cells visible here -- they must already exist in the
    kernel when this function is called. TODO: confirm and pass them in
    explicitly.

    Args:
        result: Unused placeholder kept for interface compatibility.
    """
    # `plt.subplots` takes (nrows, ncols); the previous call passed a third
    # positional argument as if it were `plt.subplot` and never selected a
    # second panel, so both curve sets ended up on the same axes.
    fig, (ax_acc, ax_loss) = plt.subplots(1, 2)

    # Accuracy plot
    ax_acc.plot(models_train_acc[i][0:b_ep], label='train_acc')
    ax_acc.plot(models_val_acc[i][0:b_ep], label='valid_acc', linestyle='dashed')
    ax_acc.set_xlabel('Epochs')
    ax_acc.set_ylabel('Accuracy')
    ax_acc.legend(loc='lower right')

    # Loss plot
    ax_loss.plot(models_train_mse[i][0:b_ep], label='Loss')
    ax_loss.plot(models_val_mse[i][0:b_ep], label='val_loss', linestyle='dashed')
    ax_loss.set_xlabel('Epochs')
    ax_loss.set_ylabel('Loss')
    ax_loss.legend(loc='upper right')

    # Keep the two panels' labels from overlapping.
    fig.tight_layout()
    plt.show()
    

# PyTorch Lightning MLP

In [3]:
class MyDataset(Dataset):
    """Map-style dataset that pairs each feature entry with its label.

    The containers are stored as given (no copy, no conversion) and are only
    required to support ``len`` and integer indexing.
    """

    def __init__(self, x, y):
        # Keep plain references to the caller's feature and label containers.
        self.x, self.y = x, y

    def __len__(self):
        """Return the number of samples, as reported by the feature container."""
        n_samples = len(self.x)
        return n_samples

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at position ``idx``."""
        features = self.x[idx]
        label = self.y[idx]
        return features, label

In [4]:
# Define the callback to track the metrics
class MetricsCallback(Callback):
    """Lightning callback that records the logged train/validation metrics.

    It reads the keys ``'tr_loss'``, ``'tr_acc'``, ``'val_loss'`` and
    ``'val_acc'`` from ``trainer.callback_metrics``, so the attached module
    must log metrics under exactly those names.

    Attributes:
        training_acc / training_loss: values captured once, when training ends
            (stored as returned by ``callback_metrics``).
        validation_acc / validation_loss: values captured at the end of every
            validation run (stored as returned by ``callback_metrics``).
        train_loss_history / train_acc_history: one Python float per training
            epoch.
        val_loss_history / val_acc_history: one Python float per validation
            epoch.
    """

    def __init__(self):
        super().__init__()
        self.training_acc = []
        self.validation_acc = []
        self.training_loss = []
        self.validation_loss = []

        self.train_loss_history = []
        self.train_acc_history = []
        self.val_loss_history = []
        self.val_acc_history = []

    def on_validation_end(self, trainer, pl_module):
        """Store the latest validation metrics (skipped during the sanity check)."""
        # The pre-training sanity check also triggers the validation hooks;
        # recording it would prepend an entry that matches no training epoch.
        if trainer.sanity_checking:
            return
        self.validation_acc.append(trainer.callback_metrics['val_acc'])
        self.validation_loss.append(trainer.callback_metrics['val_loss'])

    def on_validation_epoch_end(self, trainer, pl_module):
        """Append this epoch's validation loss/accuracy as plain floats."""
        # Same guard as above, so the validation history stays aligned
        # index-for-index with the training history.
        if trainer.sanity_checking:
            return
        self.val_loss_history.append(trainer.callback_metrics['val_loss'].item())
        self.val_acc_history.append(trainer.callback_metrics['val_acc'].item())

    def on_train_end(self, trainer, pl_module):
        """Print the final metrics and store the last training loss/accuracy."""
        print(trainer.callback_metrics)
        self.training_acc.append(trainer.callback_metrics['tr_acc'])
        self.training_loss.append(trainer.callback_metrics['tr_loss'])

    def on_train_epoch_end(self, trainer, pl_module):
        """Append this epoch's training loss/accuracy as plain floats."""
        self.train_loss_history.append(trainer.callback_metrics['tr_loss'].item())
        self.train_acc_history.append(trainer.callback_metrics['tr_acc'].item())

In [5]:
class MLP_Light_NN(L.LightningModule):
    """Fully connected network with a sigmoid output, trained with MSE loss.

    The layer stack is ``input -> hidden`` followed by ``num_h_layers - 1``
    further ``hidden -> hidden`` layers and a final ``hidden -> output`` layer.
    The same activation instance is applied after every layer except the last,
    whose output goes through a sigmoid.

    Args:
        input_size: Number of input features.
        hidden_size: Width of every hidden layer.
        output_size: Number of output units.
        num_h_layers: Number of hidden layers; values below 1 still yield one
            hidden layer because the first one is always created.
        f_act: Zero-argument callable returning the activation module
            (e.g. ``nn.ReLU``).
        lr: SGD learning rate.
        momentum: SGD momentum.
        weight_decay: SGD weight decay (L2 penalty).
    """

    def __init__(self,  input_size, hidden_size, output_size, num_h_layers, f_act, lr, momentum, weight_decay):
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(input_size, hidden_size)])
        self.func=f_act()
        # Remaining hidden layers (the first one was created above).
        for _ in range(1, num_h_layers):
            self.layers.append(nn.Linear(hidden_size, hidden_size))

        self.layers.append(nn.Linear(hidden_size, output_size))

        self.lr=lr
        self.momentum=momentum
        self.weight_decay=weight_decay

        # Metrics are created once and registered as submodules so they follow
        # the model across devices, instead of being rebuilt on every step.
        self.train_acc = Accuracy(task='binary')
        self.val_acc = Accuracy(task='binary')

    def forward(self, x):
        """Return the sigmoid-activated network output for ``x``."""
        for layer in self.layers[:-1]:
            x = self.func(layer(x))

        return torch.sigmoid(self.layers[-1](x))

    def _shared_step(self, batch, prefix, metric):
        """Compute the MSE loss for one batch and log ``<prefix>_loss`` / ``<prefix>_acc``."""
        x, y = batch
        y_hat = self(x)
        # Give the targets the exact shape of the predictions. The previous
        # `y.unsqueeze(0)` only matched for a single unbatched sample; for a
        # real batch it produced (1, B) against (B, 1) and `mse_loss` silently
        # broadcast the two to (B, B).
        y = y.reshape(y_hat.shape).to(y_hat.dtype)
        loss = F.mse_loss(y_hat, y)
        # Update the metric and let Lightning compute/reset it once per epoch.
        metric(y_hat, y)
        self.log(f"{prefix}_loss", loss, prog_bar=True, on_epoch=True, on_step=False)
        self.log(f"{prefix}_acc", metric, prog_bar=True, on_epoch=True, on_step=False)
        return loss

    def training_step(self, batch, batch_idx):
        """Run one training step; logs ``tr_loss`` and ``tr_acc`` per epoch."""
        return self._shared_step(batch, "tr", self.train_acc)

    def configure_optimizers(self):
        """Return an SGD optimizer built from the stored hyperparameters."""
        optimizer = torch.optim.SGD(self.parameters(), lr=self.lr,momentum=self.momentum,weight_decay=self.weight_decay)
        return optimizer

    def validation_step(self, batch, batch_idx):
        """Run one validation step; logs ``val_loss`` and ``val_acc`` per epoch."""
        return self._shared_step(batch, "val", self.val_acc)


In [6]:
def training_NN(tr_data,tr_labels,dataset,params,param_grid):
    """Evaluate one hyperparameter combination with 5-fold stratified CV.

    For every fold a fresh ``MLP_Light_NN`` is trained for up to 500 epochs
    with early stopping on ``val_loss`` (patience 25, ``min_delta`` 1e-4).
    The fold score is the ``val_acc`` found in ``trainer.callback_metrics``
    after fitting, i.e. the last logged value rather than the best epoch's.

    Args:
        tr_data: 2-D feature container. It is passed to ``StratifiedKFold`` to
            generate the fold indices, and ``tr_data.shape[1]`` sets the
            network input size.
        tr_labels: Labels used for stratification.
        dataset: Indexable dataset the fold indices are applied to; it must
            hold the same samples, in the same order, as ``tr_data``.
        params: Hyperparameter values, ordered like ``param_grid.keys()``.
        param_grid: Mapping whose keys name the ``MLP_Light_NN`` constructor
            arguments that ``params`` fills in.

    Returns:
        ``(params, mean_val_acc)`` where ``mean_val_acc`` is the mean of the
        per-fold validation accuracies.

    Raises:
        ValueError: If ``dataset`` and ``tr_data`` differ in length, since the
            fold indices would then select the wrong samples.
    """
    if len(dataset) != len(tr_data):
        raise ValueError(
            f"dataset has {len(dataset)} samples but tr_data has {len(tr_data)}; "
            "fold indices computed on tr_data would not match the dataset"
        )

    param_dict = dict(zip(param_grid.keys(), params))

    val_acc_per_fold = []

    for fold, (tr_idx, val_idx) in enumerate(StratifiedKFold(n_splits=5).split(tr_data,tr_labels)):
        print(fold)
        tr_dataset = Subset(dataset, tr_idx)
        val_dataset = Subset(dataset, val_idx)
        # batch_size=None turns off automatic batching: every dataset item is
        # yielded on its own.
        tr_loader = DataLoader(tr_dataset, batch_size=None, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=None, shuffle=False)

        metrics_callback = MetricsCallback()
        early_stopping = EarlyStopping(monitor='val_loss', patience=25, mode='min', verbose=False, min_delta=0.0001)

        trainer = L.Trainer(max_epochs=500,callbacks=[metrics_callback, early_stopping],enable_progress_bar=False)
        model=MLP_Light_NN(input_size=tr_data.shape[1], output_size=1, **param_dict)
        trainer.fit(model, tr_loader, val_loader)

        # Store a plain float so the average does not rely on NumPy converting
        # a list of (possibly non-CPU) tensors.
        val_acc_per_fold.append(float(trainer.callback_metrics['val_acc']))

    # The never-filled train-accuracy list was dropped: its mean was NaN and unused.
    mean_val_acc = np.mean(val_acc_per_fold)

    return params,mean_val_acc

In [7]:
# Load the MONK-1 train/test files and one-hot encode their attributes.
# NOTE(review): absolute, machine-specific path -- consider a configurable
# data directory so the notebook runs on other machines.
path=r'/home/ludovico/ML-project/data/monks-1'
# sep=r'\s+' replaces the deprecated delim_whitespace=True (same parsing).
monk1_train = pd.read_csv(path+'.train', header=None, sep=r'\s+', dtype=str)
monk1_test = pd.read_csv(path+'.test', header=None, sep=r'\s+', dtype=str)

# Column 0 holds the label; the first and last columns are excluded from the
# features (the last one is presumably a sample id -- TODO confirm).
feature_cols = monk1_train.columns[1:-1]

tr_labels=monk1_train[0].astype(np.float32)
tr_data=monk1_train[feature_cols]

test_labels=monk1_test[0].astype(np.float32)
test_data=monk1_test[feature_cols]

tr_data=pd.get_dummies(tr_data).astype(np.float32)
# Align the test encoding to the training columns: a category missing from one
# split would otherwise give the two matrices different widths/column orders.
# Categories seen only in the test split are dropped; absent ones become 0.
test_data=pd.get_dummies(test_data).reindex(columns=tr_data.columns, fill_value=0).astype(np.float32)

tr_data=torch.tensor(tr_data.values)
test_data=torch.tensor(test_data.values)
tr_labels=torch.tensor(tr_labels.values)
test_labels=torch.tensor(test_labels.values)


In [None]:
from set_extraction_monks import x_train_m1, y_train_m1, x_test_m1, y_test_m1, x_train_m2, y_train_m2, x_test_m2, y_test_m2, x_train_m3, y_train_m3, x_test_m3, y_test_m3
# Re-materialise every imported MONK split as a float32 NumPy array,
# one dataset (train/test features and labels) per statement.
x_train_m1, y_train_m1, x_test_m1, y_test_m1 = (
    np.array(split, dtype='float32')
    for split in (x_train_m1, y_train_m1, x_test_m1, y_test_m1)
)

x_train_m2, y_train_m2, x_test_m2, y_test_m2 = (
    np.array(split, dtype='float32')
    for split in (x_train_m2, y_train_m2, x_test_m2, y_test_m2)
)

x_train_m3, y_train_m3, x_test_m3, y_test_m3 = (
    np.array(split, dtype='float32')
    for split in (x_train_m3, y_train_m3, x_test_m3, y_test_m3)
)

class MyDataset(Dataset):
    """Map-style dataset returning each ``(features, label)`` pair as tensors.

    NOTE(review): a class with the same name is defined in an earlier cell;
    this definition replaces it once this cell has run.

    Args:
        x: Indexable feature container (e.g. NumPy array or tensor).
        y: Indexable label container aligned with ``x``.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        """Return the number of samples, taken from the feature container."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a ``(features, label)`` pair of tensors."""
        # `as_tensor` converts NumPy data without an extra copy and passes
        # existing tensors through unchanged; `torch.tensor` copied on every
        # access and warned when the stored data were already tensors.
        # The result may share memory with the stored data, so callers should
        # not modify it in place.
        return torch.as_tensor(self.x[idx]), torch.as_tensor(self.y[idx])

In [None]:
# Dataset used for the grid search. An earlier `MyDataset(tr_data, tr_labels)`
# / `DataLoader` pair that was overwritten immediately has been removed.
dataset = MyDataset(x_train_m1, y_train_m1)
# NOTE(review): `dataloader` is not used in this cell; kept in case a later
# cell relies on it.
dataloader = DataLoader(dataset, shuffle=True,num_workers=7)


# Hyperparameter grid: keys must match the MLP_Light_NN constructor arguments.
param_grid = {
    'num_h_layers': [1],
    'hidden_size': [2,4,8],
    'f_act': [nn.ReLU],
    'lr': [0.01,0.1,1],
    'momentum': [0.1],
    'weight_decay': [0],
}


# Generate all possible combinations of hyperparameters
param_combinations = list(product(*param_grid.values()))

# The arrays used to build the CV folds are taken from the dataset itself, so
# the fold indices always refer to the samples that are actually trained on
# (previously they came from the separately loaded `tr_data`/`tr_labels`).
results = Parallel(n_jobs=-1)(
    delayed(training_NN)(dataset.x, dataset.y, dataset, params, param_grid) for params in param_combinations
)

# Print the results
for params, val_acc in results:
    print(f"Parameters: {params}, Validation acc: {val_acc}")
