# NN implemetation with Pythorch

# Imports

In [6]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import torch.nn as nn
import itertools
import torch
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

## Utils functions and classes

In [30]:
class CustomDataset(Dataset):
    def __init__(self, X, Y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.Y = torch.tensor(Y, dtype=torch.float32)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.Y[idx]

In [8]:
# Funzione per normalizzare i dati
def normalize(data):
    scaler = StandardScaler()
    normalized_data = scaler.fit_transform(data)
    return normalized_data

In [9]:
def split_data(data, labels, k=5):
    """
    Divide i dati in k fold.
    
    Args:
        data (np.ndarray | pd.DataFrame): Dati di input.
        labels (np.ndarray | pd.Series): Etichette.
        k (int): Numero di fold.
    
    Returns:
        list: Lista di tuple (fold_data, fold_labels).
    """
    if not isinstance(data, np.ndarray):
        data = data.to_numpy()  # Converte DataFrame in NumPy array
    if not isinstance(labels, np.ndarray):
        labels = labels.to_numpy()  # Converte Series in NumPy array
    
    data = normalize(data)

    fold_size = len(data) // k
    indices = np.arange(len(data))
    np.random.shuffle(indices)
    folds = []

    for i in range(k):
        fold_indices = indices[i * fold_size: (i + 1) * fold_size]
        fold_data = data[fold_indices]
        fold_labels = labels[fold_indices]
        folds.append((fold_data, fold_labels))
    
    return folds

In [10]:
def generate_hyperparameter_combinations(param_ranges):
    """
    Genera tutte le combinazioni di iperparametri basate su range e step specificati.

    :param param_ranges: Dizionario con i nomi degli iperparametri come chiavi.
                         Ogni valore è una tupla (start, stop, step).
    :return: Lista di dizionari con tutte le combinazioni possibili.
    """
    param_values = {
        key: np.arange(start, stop + step, step)
        for key, (start, stop, step) in param_ranges.items()
    }
    
    param_combinations = list(itertools.product(*param_values.values()))
    return [
        dict(zip(param_values.keys(), combination))
        for combination in param_combinations
    ]

## Model creation, train and evaluation

In [11]:
class NN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, hidden_layers, alpha):
        super(NN, self).__init__()
        self.fc_input = nn.Linear(input_size, hidden_size)
        self.fc_hidden = [None] * (hidden_layers-1)
        for i in range(hidden_layers-1):
            self.fc_hidden[i] = nn.Linear(hidden_size, hidden_size)
        self.fc_output = nn.Linear(hidden_size, output_size)
        self.leacky_relu = nn.LeakyReLU(alpha)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.leacky_relu(self.fc_input(x))
        for i in range(len(self.fc_hidden)):
            x = self.leacky_relu(self.fc_hidden[i](x))
        x = self.sigmoid(self.fc_output(x))
        return x

In [33]:
def fit(data_loader, model, learning_rate, momentum, weight_decay, epochs, val_data=[], val_labels=[]):
    criterion = nn.BCELoss()  # Binary Cross Entropy Loss
    optimizer = optim.SGD(
        model.parameters(), 
        lr=learning_rate, 
        momentum=momentum, 
        nesterov=True, 
        weight_decay=weight_decay
    )
    history = {'train_loss': [], 'train_accuracy': [], 'val_loss': [], 'val_accuracy': []}
    # Addestramento
    for epoch in range(epochs):
        model.train()
        correct_predictions = 0  
        total_samples = 0  
        last_loss = 0 
        for inputs, labels in data_loader:
            optimizer.zero_grad()  # Reset dei gradienti
            outputs = model(inputs)  # Forward pass
            loss = criterion(outputs, labels.unsqueeze(1))  # Calcolo della perdita
            loss.backward()  # Backpropagation
            optimizer.step()  # Aggiornamento dei pesi

            last_loss = loss.item()  # Memorizziamo l'ultima perdita

            # Calcolo dell'accuratezza
            predicted = (outputs >= 0.5).float()  # La soglia 0.5 per classificazione binaria
            correct_predictions += (predicted == labels.unsqueeze(1)).sum().item()  # Somma delle previsioni corrette
            total_samples += labels.size(0)  # Numero totale di esempi nel batch


        # Calcolo dell'accuratezza del train per epoca
        accuracy = 100 * correct_predictions / total_samples
        history['train_loss'].append(last_loss)
        history['train_accuracy'].append(accuracy)

        #validazione di quell'epoca
        if val_data is not None and val_labels is not None and len(val_data) > 0 and len(val_labels) > 0:
            result = evaluation(model, val_data, val_labels, criterion)
            history['val_loss'].append(result[0])
            history['val_accuracy'].append(result[1])
    return history

def evaluation(model, X, Y, criterion):
    model.eval()
    with torch.no_grad():
        inputs = torch.tensor(X, dtype=torch.float32)
        labels = torch.tensor(Y, dtype=torch.float32)
        outputs = model(inputs)
        loss = criterion(outputs, labels.unsqueeze(1))
        predictions = (outputs.squeeze() > 0.5).float()  # Soglia 0.5 per classificazione
        accuracy = (predictions == labels.unsqueeze(1)).float().mean()
        return [loss,accuracy]


## Double-k-fold cross validation

In [25]:
def double_k_fold_cross_validation(data, labels, in_size = 6, out_size = 1, outer_k = 5, inner_k = 5, param_grid=None):
    """
    Implementa una Double K-Fold Cross-Validation

    Args:
        data (np.ndarray): Caratteristiche del dataset.
        labels (np.ndarray): Etichette del dataset.
        outer_k (int): Numero di fold per la validazione esterna.
        inner_k (int): Numero di fold per l'ottimizzazione iperparametri.
        param_grid (list): Lista di dizionari con gli iperparametri da provare.
    
    Returns:
        list: Lista dei punteggi ottenuti per ogni fold esterno.
    """
    outer_scores = []
    outer_params = []
    outer_folds = split_data(data, labels, k=outer_k)
    
    for i in range(outer_k):
        print("Outer fold", i + 1)
        outer_test_data, outer_test_labels = outer_folds[i]
        outer_train_data = np.concatenate([fold[0] for j, fold in enumerate(outer_folds) if j != i])
        outer_train_labels = np.concatenate([fold[1] for j, fold in enumerate(outer_folds) if j != i])
        
        best_params = {}
        best_score = -np.inf
        
        # Validazione interna per ottimizzazione iperparametri
        inner_folds = split_data(outer_train_data, outer_train_labels, k=inner_k)

        for params in param_grid:
            inner_scores = []
            
            for j in range(inner_k):
                inner_val_data, inner_val_labels = inner_folds[j]
                inner_train_data = np.concatenate([fold[0] for l, fold in enumerate(inner_folds) if l != j])
                inner_train_labels = np.concatenate([fold[1] for l, fold in enumerate(inner_folds) if l != j])
                dataset = CustomDataset(inner_train_data, inner_train_labels)
                data_loader = DataLoader(dataset, batch_size=int(params['batch_size']), shuffle=False)
                # Creazione della rete neurale
                model = NN(input_size = in_size, hidden_size = params['hidden_size'], output_size = out_size, 
                           hidden_layers = params['hidden_layers'], alpha=params['alpha'])
                # Allena il modello e ottieni la cronologia (history)
                history = fit(data_loader=data_loader, model=model, learning_rate=params['learning_rate'],
                              weight_decay=params['regularization'], momentum=params['momentum'],
                              epochs=params['epochs'], val_data=inner_val_data, val_labels=inner_val_labels)
    
                inner_scores.append(max(history['val_accuracy'])
)
            
            avg_score = np.mean(inner_scores)
            if avg_score > best_score:
                best_score = avg_score
                best_params = params
        
        # Addestramento finale sul set di train esterno
        dataset = CustomDataset(outer_train_data, outer_train_labels)
        data_loader = DataLoader(dataset, batch_size=int(params['batch_size']), shuffle=False)
        final_model = NN(input_size = in_size, hidden_size = params['hidden_size'], output_size = out_size, 
                         hidden_layers = params['hidden_layers'], alpha=params['alpha'])
        history = fit(data_loader=data_loader, model=final_model, learning_rate=params['learning_rate'],
                      weight_decay=params['regularization'], momentum=params['momentum'],
                      epochs=params['epochs'], val_data=outer_test_data, val_labels=outer_test_labels)
        outer_scores.append(max(history['val_accuracy']))
        outer_params.append(best_params)
    
    return outer_scores, outer_params

## K-fold cross validation

In [24]:
def k_fold_cross_validation(data, labels, in_size, out_size, params, k=5):
    """
    Implementa una K-Fold Cross-Validation
    """
    scores = []
    folds = split_data(data, labels, input_size = 6, output_size = 1, k=k)


    for i in range(k):
        # Creazione della rete neurale
        model = NN(input_dim = in_size, hidden_size = params['hidden_size'], output_dim = out_size,
                   hidden_layers = params['hidden_layers'], alpha=params['alpha'])
        test_data, test_labels = folds[i]
        train_data = np.concatenate([fold[0] for j, fold in enumerate(folds) if j != i])
        train_labels = np.concatenate([fold[1] for j, fold in enumerate(folds) if j != i])
        dataset = CustomDataset(train_data, train_labels)
        data_loader = DataLoader(dataset, batch_size=int(params['batch_size']), shuffle=False)
        history = fit(data_loader=data_loader, model=model, learning_rate=params['learning_rate'], 
                      weight_decay=params['regularization'], momentum=params['momentum'],
                      epochs=params['epochs'], val_data=test_data, val_labels=test_labels)
        score = max(history['val_accuracy'])
        scores.append(score)
            
    avg_score = np.mean(scores)

    train_data = np.concatenate([fold[0] for fold in folds])
    train_labels = np.concatenate([fold[1] for fold in folds])
    dataset = CustomDataset(train_data, train_labels)
    data_loader = DataLoader(dataset, batch_size=int(params['batch_size']), shuffle=False)
    model = NN(input_size = in_size, hidden_size = params['hidden_size'], output_size = out_size,
               hidden_layers = params['hidden_layers'], alpha=params['alpha'])
        
    history = fit(data_loader=data_loader, model=model, learning_rate=params['learning_rate'], 
                  weight_decay=params['regularization'], momentum=params['momentum'],
                  epochs=params['epochs'])


    return avg_score, model

## Data loading

In [15]:
# Carica i file di addestramento e test per ciascun dataset dal percorso specificato
monk1_train = pd.read_csv('../Datasets/Monks/monks-1.train', sep='\s+', header=None)
monk1_test = pd.read_csv('../Datasets/Monks/monks-1.test', sep='\s+', header=None)

monk2_train = pd.read_csv('../Datasets/Monks/monks-2.train', sep='\s+', header=None)
monk2_test = pd.read_csv('../Datasets/Monks/monks-2.test', sep='\s+', header=None)

monk3_train = pd.read_csv('../Datasets/Monks/monks-3.train', sep='\s+', header=None)
monk3_test = pd.read_csv('../Datasets/Monks/monks-3.test', sep='\s+', header=None)


# Separazione tra features e labels per monk1
X1_train = monk1_train.iloc[:, 1:7].values  # Caratteristiche (features)
y1_train = monk1_train.iloc[:, 0].values   # Etichette (labels)

X1_test = monk1_test.iloc[:, 1:7].values
y1_test = monk1_test.iloc[:, 0].values

# Separazione tra features e labels per monk2
X2_train = monk2_train.iloc[:, 1:7].values
y2_train = monk2_train.iloc[:, 0].values

X2_test = monk2_test.iloc[:, 1:7].values
y2_test = monk2_test.iloc[:, 0].values

# Separazione tra features e labels per monk3
X3_train = monk3_train.iloc[:, 1:7].values
y3_train = monk3_train.iloc[:, 0].values

X3_test = monk3_test.iloc[:, 1:7].values
y3_test = monk3_test.iloc[:, 0].values

In [34]:
# Definisce i parametri della rete neurale
input_size = 6
output_size = 1

# Definizione dei range degli iperparametri
param_ranges = {
    "learning_rate": (0.001, 0.001, 0.005),  # Da 0.001 a 0.01 con step di 0.005
    "epochs": (50, 50, 1),                   # Da 0 a 10 con step di 1
    "batch_size": (8, 8, 8),             # Da 8 a 32 con step di 8
    "hidden_size": (3, 3, 1),           # Da 32 a 128 con step di 32
    "hidden_layers": (1, 1, 1),             # Da 1 a 3 con step di 1
    "momentum": (0.9, 0.92, 0.01),          # Da 0.9 a 0.99 con step di 0.01
    "regularization": (0.01, 0.01, 0.05),             # Da 0.0 a 0.1 con step di 0.05
    "alpha": (0.01, 0.01, 0.01)              # Da 0.01 a 0.1 con step di 0.05
}

#start_time = time.time()
print("Generazione delle combinazioni di iperparametri...")
param_grid = []
param_grid = generate_hyperparameter_combinations(param_ranges)
#end_time = time.time()

#execution_time = end_time - start_time
#print(f"Tempo di esecuzione di generate_hyperparameter_combinations: {execution_time} secondi")

scores,params = double_k_fold_cross_validation(X1_train, y1_train, input_size, output_size, outer_k=5, inner_k=3, param_grid=param_grid)
for i in range(len(scores)):
    print(f"Fold {i + 1}: {scores[i]}")
    print(f"  params: {params[i]}")
print("Punteggio medio:", np.mean(scores))

Generazione delle combinazioni di iperparametri...
Outer fold 1
Outer fold 2
Outer fold 3
Outer fold 4
Outer fold 5
Fold 1: 0.4444444477558136
  params: {'learning_rate': 0.001, 'epochs': 50, 'batch_size': 8, 'hidden_size': 3, 'hidden_layers': 1, 'momentum': 0.9, 'regularization': 0.01, 'alpha': 0.01}
Fold 2: 0.4722222089767456
  params: {'learning_rate': 0.001, 'epochs': 50, 'batch_size': 8, 'hidden_size': 3, 'hidden_layers': 1, 'momentum': 0.92, 'regularization': 0.01, 'alpha': 0.01}
Fold 3: 0.4652777910232544
  params: {'learning_rate': 0.001, 'epochs': 50, 'batch_size': 8, 'hidden_size': 3, 'hidden_layers': 1, 'momentum': 0.91, 'regularization': 0.01, 'alpha': 0.01}
Fold 4: 0.5243055820465088
  params: {'learning_rate': 0.001, 'epochs': 50, 'batch_size': 8, 'hidden_size': 3, 'hidden_layers': 1, 'momentum': 0.92, 'regularization': 0.01, 'alpha': 0.01}
Fold 5: 0.4652777910232544
  params: {'learning_rate': 0.001, 'epochs': 50, 'batch_size': 8, 'hidden_size': 3, 'hidden_layers': 1, 'm