In [1]:
import os
import random
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import glob
from scipy.signal import butter, filtfilt
from scipy.signal import resample_poly
import librosa
import pickle
from collections import Counter

In [3]:
#riduzione dataset seed
import os
import numpy as np
from scipy.io import loadmat
import random

# Percorsi
input_path = '/kaggle/input/seed-dataset/SEED/SEED_EEG/Preprocessed_EEG'
output_path = '/kaggle/working/reduced_dataset'

# Crea la cartella di output
os.makedirs(output_path, exist_ok=True)

# Parametri per la riduzione
channel_reduction_ratio = 3  # Dividi il numero di canali per 3
sample_reduction_ratio = 2   # Dividi il numero di campioni per 2

# Elenca tutti i file .mat
mat_files = [f for f in os.listdir(input_path) if f.endswith('.mat') and f != 'label.mat']
print(f"Trovati {len(mat_files)} file .mat nella directory.")

# Itera su tutti i file
for file_name in mat_files:
    file_path = os.path.join(input_path, file_name)
    print(f"Processando il file: {file_name}")
    
    # Carica il file .mat
    mat_data = loadmat(file_path)
    
    # Trova tutte le chiavi che rappresentano trial (chiavi con strutture comuni come djc_eeg, ys_eeg, ecc.)
    trial_keys = [key for key in sorted(mat_data.keys()) if not key.startswith('__') and isinstance(mat_data[key], np.ndarray)]
    print(f"Trial trovati in {file_name}: {trial_keys}")
    
    if not trial_keys:
        print(f"Nessun trial trovato in {file_name}, salto il file.")
        continue
    
    # Seleziona casualmente un terzo dei trial
    random.seed(42)  # Per replicabilità
    selected_keys = random.sample(trial_keys, len(trial_keys) // 3)
    
    for key in selected_keys:  # Itera SOLO sui trial selezionati
        data = mat_data[key]
        
        # Riduzione del numero di canali
        num_channels = data.shape[0]
        reduced_channels = num_channels // channel_reduction_ratio
        reduced_data = data[:reduced_channels, :]
        
        # Riduzione del numero di campioni
        num_samples = reduced_data.shape[1]
        reduced_samples = num_samples // sample_reduction_ratio
        reduced_data = reduced_data[:, :reduced_samples]
        
        # Salva i dati ridotti
        output_file = os.path.join(output_path, f"{file_name.replace('.mat', '')}_{key}.npz")
        np.savez(output_file, data=reduced_data)

print(f"Dataset ridotto salvato in: {output_path}")


Trovati 45 file .mat nella directory.
Processando il file: 14_20140601.mat
Trial trovati in 14_20140601.mat: ['ys_eeg1', 'ys_eeg10', 'ys_eeg11', 'ys_eeg12', 'ys_eeg13', 'ys_eeg14', 'ys_eeg15', 'ys_eeg2', 'ys_eeg3', 'ys_eeg4', 'ys_eeg5', 'ys_eeg6', 'ys_eeg7', 'ys_eeg8', 'ys_eeg9']
Processando il file: 3_20140611.mat
Trial trovati in 3_20140611.mat: ['jj_eeg1', 'jj_eeg10', 'jj_eeg11', 'jj_eeg12', 'jj_eeg13', 'jj_eeg14', 'jj_eeg15', 'jj_eeg2', 'jj_eeg3', 'jj_eeg4', 'jj_eeg5', 'jj_eeg6', 'jj_eeg7', 'jj_eeg8', 'jj_eeg9']
Processando il file: 10_20131204.mat
Trial trovati in 10_20131204.mat: ['ww_eeg1', 'ww_eeg10', 'ww_eeg11', 'ww_eeg12', 'ww_eeg13', 'ww_eeg14', 'ww_eeg15', 'ww_eeg2', 'ww_eeg3', 'ww_eeg4', 'ww_eeg5', 'ww_eeg6', 'ww_eeg7', 'ww_eeg8', 'ww_eeg9']
Processando il file: 1_20131030.mat
Trial trovati in 1_20131030.mat: ['djc_eeg1', 'djc_eeg10', 'djc_eeg11', 'djc_eeg12', 'djc_eeg13', 'djc_eeg14', 'djc_eeg15', 'djc_eeg2', 'djc_eeg3', 'djc_eeg4', 'djc_eeg5', 'djc_eeg6', 'djc_eeg7', 'dj

In [4]:
import os
import glob
import numpy as np
import warnings
import matplotlib.pyplot as plt

# Percorsi
input_folder = '/kaggle/working/reduced_dataset'
output_folder = '/kaggle/working/preprocessed_1_sec'
os.makedirs(output_folder, exist_ok=True)

# Ignora i warning
warnings.filterwarnings("ignore")

# Parametri per la segmentazione e il filtro
fs = 200  # Frequenza di campionamento in Hz
epoch_length = fs  # Campioni in 1 secondo (200 campioni per 200 Hz)
upper_limit = 500  # Valore massimo per la pulizia del segnale
lower_limit = -500  # Valore minimo per la pulizia del segnale

# Etichette definite
labels = [1, 0, -1, -1, 0, 1, -1, 0, 1, 1, 0, -1, 0, 1, -1]  # 15 etichette

# Funzione per il filtro band-pass Butterworth
def butter_bandpass(lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return b, a

# Funzione per plottare i segnali
def plot_signals(original_signal, cleaned_signal, channel_idx, file_name):
    plt.figure(figsize=(15, 10))

    plt.subplot(2, 1, 1)
    plt.plot(original_signal, label='Originale', color='blue')
    plt.title(f"{file_name} - Canale {channel_idx} (Originale)")
    plt.xlabel("Campioni")
    plt.ylabel("Ampiezza")
    plt.legend()

    plt.subplot(2, 1, 2)
    plt.plot(cleaned_signal, label='Pulito (±500 µV)', color='red')
    plt.title(f"{file_name} - Canale {channel_idx} (Pulito)")
    plt.xlabel("Campioni")
    plt.ylabel("Ampiezza")
    plt.legend()

    plt.tight_layout()
    plt.show()

# Elaborazione file
file_paths = glob.glob(f"{input_folder}/*.npz")  # File di input

for file_path in file_paths:
    try:
        # Carica il file .npz
        file_name = os.path.basename(file_path).replace('.npz', '')
        print(f"Processando il file: {file_name}")

        npz_data = np.load(file_path)
        
        # Itera sulle chiavi del file .npz
        for key in npz_data.keys():
            data = npz_data[key]  # Forma: (n_channels, n_samples)
            print(f"Elaborazione della chiave: {key} - Forma: {data.shape}")
            
            # Epoche di 1 secondo
            n_channels, n_samples = data.shape
            n_epochs = n_samples // epoch_length  # Numero di epoche complete
            epochs = []

            for i in range(n_epochs):
                start_idx = i * epoch_length
                end_idx = start_idx + epoch_length
                epoch = data[:, start_idx:end_idx]  # Estrai epoca

                # Pulizia del segnale (valori superiori a ±500 µV)
                epoch[epoch > upper_limit] = 0
                epoch[epoch < lower_limit] = 0

                epochs.append(epoch)

            epochs = np.array(epochs)  # Forma: (n_epochs, n_channels, epoch_length)
            print(f"Dati suddivisi in epoche per la chiave {key}: {epochs.shape}")

            # Estrazione del numero per la label
            file_number = int(file_name.split('_')[0])  # Prendi il numero del file
            print(file_number)
            label_index = (file_number - 1) % len(labels)
            label = labels[label_index]

            # Salva le epoche come file .npz con la label inclusa
            output_path = os.path.join(output_folder, f"processed_{file_name}_{key}.npz")
            np.savez(output_path, data=epochs, label=label)
            print(f"File salvato con etichetta {label} in: {output_path}")

    except Exception as e:
        print(f"Errore durante il preprocessing del file {file_path}: {e}")


Processando il file: 2_20140413_jl_eeg5
Elaborazione della chiave: data - Forma: (20, 18500)
Dati suddivisi in epoche per la chiave data: (92, 20, 200)
2
File salvato con etichetta 0 in: /kaggle/working/preprocessed_1_sec/processed_2_20140413_jl_eeg5_data.npz
Processando il file: 3_20140603_jj_eeg5
Elaborazione della chiave: data - Forma: (20, 18500)
Dati suddivisi in epoche per la chiave data: (92, 20, 200)
3
File salvato con etichetta -1 in: /kaggle/working/preprocessed_1_sec/processed_3_20140603_jj_eeg5_data.npz
Processando il file: 6_20131113_mhw_eeg5
Elaborazione della chiave: data - Forma: (20, 18500)
Dati suddivisi in epoche per la chiave data: (92, 20, 200)
6
File salvato con etichetta 1 in: /kaggle/working/preprocessed_1_sec/processed_6_20131113_mhw_eeg5_data.npz
Processando il file: 10_20131130_ww_eeg6
Elaborazione della chiave: data - Forma: (20, 19500)
Dati suddivisi in epoche per la chiave data: (97, 20, 200)
10
File salvato con etichetta 1 in: /kaggle/working/preprocessed

In [5]:
import os
import glob
import numpy as np
import warnings
import random
from sklearn.model_selection import train_test_split

# Percorsi
input_folder = '/kaggle/working/preprocessed_1_sec'
train_folder = '/kaggle/working/train_set'
test_folder = '/kaggle/working/test_set'
os.makedirs(train_folder, exist_ok=True)
os.makedirs(test_folder, exist_ok=True)

# Ignora i warning
warnings.filterwarnings("ignore")

# Recupera tutti i file elaborati
file_paths = glob.glob(f"{input_folder}/*.npz")
print(f"Trovati {len(file_paths)} file nel dataset elaborato.")

# Suddivisione in train e test
train_files, test_files = train_test_split(file_paths, test_size=0.2, random_state=42)
print(f"File di train: {len(train_files)}, File di test: {len(test_files)}")

# Funzione per spostare i file nelle rispettive cartelle
def save_files(file_list, destination_folder):
    for file_path in file_list:
        file_name = os.path.basename(file_path)
        destination_path = os.path.join(destination_folder, file_name)
        os.rename(file_path, destination_path)
        print(f"File spostato: {file_name} -> {destination_folder}")

# Salva i file
save_files(train_files, train_folder)
save_files(test_files, test_folder)

print("Divisione in train e test completata.")


Trovati 225 file nel dataset elaborato.
File di train: 180, File di test: 45
File spostato: processed_3_20140603_jj_eeg10_data.npz -> /kaggle/working/train_set
File spostato: processed_15_20131105_zjy_eeg13_data.npz -> /kaggle/working/train_set
File spostato: processed_12_20131127_wyw_eeg5_data.npz -> /kaggle/working/train_set
File spostato: processed_9_20140627_wk_eeg6_data.npz -> /kaggle/working/train_set
File spostato: processed_6_20131113_mhw_eeg10_data.npz -> /kaggle/working/train_set
File spostato: processed_1_20131030_djc_eeg10_data.npz -> /kaggle/working/train_set
File spostato: processed_7_20131106_phl_eeg10_data.npz -> /kaggle/working/train_set
File spostato: processed_4_20140702_lqj_eeg5_data.npz -> /kaggle/working/train_set
File spostato: processed_10_20131204_ww_eeg1_data.npz -> /kaggle/working/train_set
File spostato: processed_8_20140521_sxy_eeg13_data.npz -> /kaggle/working/train_set
File spostato: processed_13_20140603_xyl_eeg13_data.npz -> /kaggle/working/train_set
Fi

**FASE DI PREPROCESSING E REORDERING DEI FILE IN COLONNE**

***

In [6]:
# ------------------- Funzioni per Salvare e Caricare Modelli -------------------
save_path = "/kaggle/working/models"
os.makedirs(save_path, exist_ok=True)

def save_model(model, path):
    torch.save(model.state_dict(), path)
    print(f"Modello salvato in {path}")


#tolgo il module 
def load_model(model, path, device="cuda"):
    state_dict = torch.load(path, map_location=device)
    if "module." in list(state_dict.keys())[0]:
        state_dict = {k.replace("module.", ""): v for k, v in state_dict.items()}
    model.load_state_dict(state_dict)
    print(f"Modello caricato da {path}")

**FASE DI TRAINING DELL'ENCODER RICORRENTE E CONVOLUZIONALE**

********

In [6]:
#AUGMENTATION E CHUNK DATI
def min_max_amplitude_scale(data, scale_min=0.5, scale_max=2):
    scale_factor = random.uniform(scale_min, scale_max)
    return data * scale_factor

def time_shift(data, shift_min=-50, shift_max=50):
    shift_samples = random.randint(shift_min, shift_max)
    return np.roll(data, shift_samples)

def dc_shift(data, shift_min=-10, shift_max=10):
    shift_value = random.uniform(shift_min, shift_max)
    return data + shift_value

def zero_masking(data, mask_min=0, mask_max=150):
    mask_size = random.randint(mask_min, mask_max)
    start_idx = random.randint(0, len(data) - mask_size)
    data[start_idx:start_idx+mask_size] = 0
    return data

def add_gaussian_noise(data, sigma_min=0, sigma_max=0.2):
    sigma = random.uniform(sigma_min, sigma_max)
    noise = np.random.normal(0, sigma, len(data))
    return data + noise

def apply_random_transformations_class(channel_data):
    transformations = [min_max_amplitude_scale, time_shift, dc_shift, zero_masking, add_gaussian_noise]
    selected_transform = random.choice(transformations)
    transformed_data = selected_transform(channel_data.copy())
    return transformed_data
    
def apply_random_transformations(channel_data):
    transformations = [min_max_amplitude_scale, time_shift, dc_shift, zero_masking, add_gaussian_noise]
    selected_transforms = random.sample(transformations, 2)
    transformed_data_1 = selected_transforms[0](channel_data.copy())
    transformed_data_2 = selected_transforms[1](channel_data.copy())
    return transformed_data_1, transformed_data_2
    
    
def chunk_data(data, chunk_size=4000):
    """Divide i dati in chunk della dimensione specificata."""
    chunks = []
    num_chunks = len(data) // chunk_size

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = (i + 1) * chunk_size
        chunks.append(data[start_idx:end_idx])
    
    return chunks

In [7]:
class ConvolutionalEncoder(nn.Module):
    def __init__(self, input_channels=1, output_dim=4, repeat_blocks=4):
        super(ConvolutionalEncoder, self).__init__()
        
        # Parallel convolutional paths
        self.conv1d_128 = nn.Sequential(
            nn.ReflectionPad1d((63, 64)),
            nn.Conv1d(input_channels, 100, kernel_size=128, stride=1)
        )
        self.conv1d_64 = nn.Sequential(
            nn.ReflectionPad1d((31, 32)),
            nn.Conv1d(input_channels, 100, kernel_size=64, stride=1)
        )
        self.conv1d_16 = nn.Sequential(
            nn.ReflectionPad1d((7, 8)),
            nn.Conv1d(input_channels, 50, kernel_size=16, stride=1)
        )

        # Dense layer to merge paths
        self.concat_dense = nn.Linear(100 + 100 + 50, 250)

        # Repeat N=4 blocks
        self.repeat_blocks = nn.ModuleList([
            nn.Sequential(
                nn.ReLU(),
                nn.BatchNorm1d(250),
                nn.ReflectionPad1d((31, 32)),
                nn.Conv1d(250, 250, kernel_size=64, stride=1)
            ) for _ in range(repeat_blocks)
        ])

        # Final block
        self.final_relu = nn.ReLU()
        self.final_bn = nn.BatchNorm1d(250)
        self.final_conv = nn.Sequential(
            nn.ReflectionPad1d((31, 32)),
            nn.Conv1d(250, output_dim, kernel_size=64, stride=1)
        )
        
    def forward(self, x):
        # Input shape: [batch_size, sequence_length, channels] modify

        
        x = x.permute(0, 2, 1)  #[batch_size, channels, sequence_length]
        
        # Parallel convolutional paths
        x1 = self.conv1d_128(x)
        x2 = self.conv1d_64(x)
        x3 = self.conv1d_16(x)

        # Concatenate paths
        x_cat = torch.cat([x1, x2, x3], dim=1)  # [batch_size, 250, sequence_length]
        
        # Dense layer
        x_dense = self.concat_dense(x_cat.permute(0, 2, 1)).permute(0, 2, 1)

        # Repeated blocks
        x_repeated = x_dense
        for block in self.repeat_blocks:
            x_repeated = x_repeated + block(x_repeated)  # Residual connection

        # Final block
        x_final = self.final_relu(x_repeated)
        x_final = self.final_bn(x_final)
        x_final = self.final_conv(x_final)
        x_final = x_final.permute(0, 2, 1)
        #print("xfinal:", x_final.shape)
        return x_final  # [batch_size, output_dim]

In [8]:
class RecurrentEncoder(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dim=128, repeat_n=2):
        super(RecurrentEncoder, self).__init__()

        self.gru_256 = nn.GRU(input_dim, 256, batch_first=True)
        self.downsample_256_128 = nn.Linear(256, 128)
        self.gru_128 = nn.GRU(128, 128, batch_first=True)
        self.downsample_128_64 = nn.Linear(128, 64)
        self.gru_64 = nn.GRU(64, 64, batch_first=True)

        self.upsample_64_128 = nn.Linear(64, 128)
        self.upsample_128_256 = nn.Linear(128, 256)

        self.concat_dense = nn.Linear(256 + 128 + 64, hidden_dim)

        self.rru = nn.ModuleList([
            nn.Sequential(
                nn.LayerNorm(hidden_dim),
                nn.GRU(hidden_dim, hidden_dim, batch_first=True)
            ) for _ in range(repeat_n)
        ])

        self.output_dense = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x_256, _ = self.gru_256(x)
        x_128_input = F.relu(self.downsample_256_128(x_256))
        x_128, _ = self.gru_128(x_128_input)
        x_64_input = F.relu(self.downsample_128_64(x_128))
        x_64, _ = self.gru_64(x_64_input)

        x_128_up = F.relu(self.upsample_64_128(x_64))
        x_256_up = F.relu(self.upsample_128_256(x_128_up))

        x_concat = torch.cat([x_256, x_128, x_64], dim=-1)
        x_hidden = F.relu(self.concat_dense(x_concat))

        for i, rru_layer in enumerate(self.rru):
            residual, _ = rru_layer(x_hidden)
            x_hidden = x_hidden + residual

        output = self.output_dense(x_hidden)
        return output


In [9]:
class Projector(nn.Module):
    def __init__(self, input_dim, output_dim=32):
        super(Projector, self).__init__()

        self.downsample_1 = nn.Linear(input_dim, 256)
        self.bilstm_256 = nn.LSTM(256, 128, batch_first=True, bidirectional=True)

        self.downsample_2 = nn.Linear(256, 128)
        self.bilstm_128 = nn.LSTM(128, 64, batch_first=True, bidirectional=True)

        self.downsample_3 = nn.Linear(128, 64)
        self.bilstm_64 = nn.LSTM(64, 32, batch_first=True, bidirectional=True)

        self.concat_dense_1 = nn.Linear(256 + 128 + 64, 128)
        self.concat_dense_2 = nn.Linear(128, output_dim)

    def forward(self, x):
        #print("Input x:", x.shape)

        x_256 = F.relu(self.downsample_1(x))
        #print("Output x_256 (Downsample):", x_256.shape)

        x_256, (h_256, _) = self.bilstm_256(x_256)
        flo_256 = torch.cat([h_256[0], h_256[1]], dim=-1)
        #print("Output flo_256 (BiLSTM):", flo_256.shape)

        x_128 = F.relu(self.downsample_2(x_256))
        #print("Output x_128 (Downsample):", x_128.shape)

        x_128, (h_128, _) = self.bilstm_128(x_128)
        flo_128 = torch.cat([h_128[0], h_128[1]], dim=-1)
        #print("Output flo_128 (BiLSTM):", flo_128.shape)

        x_64 = F.relu(self.downsample_3(x_128))
        #print("Output x_64 (Downsample):", x_64.shape)

        x_64, (h_64, _) = self.bilstm_64(x_64)
        flo_64 = torch.cat([h_64[0], h_64[1]], dim=-1)
        #print("Output flo_64 (BiLSTM):", flo_64.shape)

        x_concat = torch.cat([flo_256, flo_128, flo_64], dim=-1)
        #print("Output x_concat (Concat):", x_concat.shape)

        x_hidden = F.relu(self.concat_dense_1(x_concat))
        #print("Output x_hidden (Dense):", x_hidden.shape)

        output = self.concat_dense_2(x_hidden)
        #print("Final Output projector:", output.shape)
        return output
        
class NTXentLoss(nn.Module):
    def __init__(self, temperature=0.05):
        super(NTXentLoss, self).__init__()
        self.temperature = temperature

    def forward(self, z_i, z_j, z_neg):
        z_i = F.normalize(z_i, p=2, dim=-1)
        z_j = F.normalize(z_j, p=2, dim=-1)
        z_neg = F.normalize(z_neg, p=2, dim=-1)

        # Similarità
        sim_ij = torch.matmul(z_i, z_j.T) / self.temperature  # Positiva
        sim_neg = torch.matmul(z_i, z_neg.T) / self.temperature  # Negative

        # Calcolo della loss
        numerator = torch.exp(sim_ij.diag())
        denominator = numerator + torch.sum(torch.exp(sim_neg), dim=1)

        loss = -torch.log(numerator / denominator)
        return loss.mean()

*****


In [12]:
class EEGClassificationDataset(Dataset):
    def __init__(self, file_paths, seq_len=200):
        self.samples = []
        self.labels = []
        self.seq_len = seq_len

        for file_path in file_paths:
            npz_data = np.load(file_path)
            data = npz_data['data']  # (num_samples, num_channels, seq_len)
            label = npz_data['label']  # scalare

            if isinstance(label, np.ndarray):
                label = label.item()

            # label potrebbe essere -1, 0 o 1. Li rimappiamo in 0, 1, 2.
            # Verifica che i tuoi dati seguano effettivamente questa logica.
            if label == -1:
                label = 0
            elif label == 0:
                label = 1
            elif label == 1:
                label = 2

            labels = np.full((data.shape[0],), label)

            # Cambia la forma a (num_samples, seq_len, num_channels)
            data = data.transpose(0, 2, 1)

            self.samples.extend(data)
            self.labels.extend(labels)

        class_counts = Counter(self.labels)
        print("Distribuzione delle classi nel dataset:", class_counts)
        print("Dataset creato con", len(self.samples), "segmenti.")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        data = torch.tensor(self.samples[idx], dtype=torch.float32)  # (seq_len, num_channels)
        label = int(self.labels[idx])
        return data, label


class Classifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(Classifier, self).__init__()

        self.downsample_1 = nn.Linear(input_dim, 256)
        self.bilstm_256 = nn.LSTM(256, 128, batch_first=True, bidirectional=True)

        self.downsample_2 = nn.Linear(256, 128)
        self.bilstm_128 = nn.LSTM(128, 64, batch_first=True, bidirectional=True)

        self.downsample_3 = nn.Linear(128, 64)
        self.bilstm_64 = nn.LSTM(64, 32, batch_first=True, bidirectional=True)

        self.concat_dense_1 = nn.Linear(256 + 128 + 64, 128)
        self.concat_dense_2 = nn.Linear(128, num_classes)

        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        x_256 = F.relu(self.downsample_1(x))
        x_256, (h_256, _) = self.bilstm_256(x_256)
        flo_256 = torch.cat([h_256[0], h_256[1]], dim=-1)

        x_128 = F.relu(self.downsample_2(x_256))
        x_128, (h_128, _) = self.bilstm_128(x_128)
        flo_128 = torch.cat([h_128[0], h_128[1]], dim=-1)

        x_64 = F.relu(self.downsample_3(x_128))
        x_64, (h_64, _) = self.bilstm_64(x_64)
        flo_64 = torch.cat([h_64[0], h_64[1]], dim=-1)

        x_concat = torch.cat([flo_256, flo_128, flo_64], dim=-1)
        x_hidden = F.relu(self.concat_dense_1(x_concat))
        output = self.concat_dense_2(x_hidden)
        return self.log_softmax(output)

def load_model(model, path, device="cuda"):
    state_dict = torch.load(path, map_location=device)
    # Rimuoviamo eventuale prefisso 'module.' se salvato con DataParallel
    if any(k.startswith('module.') for k in state_dict.keys()):
        state_dict = {k.replace('module.', ''): v for k, v in state_dict.items()}
    model.load_state_dict(state_dict)
    model.to(device)

def save_model(model, path):
    if isinstance(model, nn.DataParallel):
        model = model.module
    torch.save(model.state_dict(), path)

def train_classifier(model_type, file_paths, encoder, classifier, optimizer, epochs=1, batch_size=40, device="cuda"):
    dataset = EEGClassificationDataset(file_paths)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=True)

    encoder.eval()
    for param in encoder.parameters():
        param.requires_grad = False
    encoder = nn.DataParallel(encoder).to(device)

    classifier = nn.DataParallel(classifier).to(device)
    classifier.train()
    loss_fn = nn.NLLLoss()

    save_path = "./model_checkpoints"
    os.makedirs(save_path, exist_ok=True)

    for epoch in range(epochs):
        print(f"=== Inizio Epoca {epoch + 1}/{epochs} ===")
        total_loss = 0
        correct_predictions = 0
        total_predictions = 0

        for batch_idx, (data, labels) in enumerate(dataloader):
            print(f"Batch {batch_idx + 1}: Shape data: {data.shape}, Shape labels: {labels.shape}")
            data, labels = data.to(device), labels.to(device)

            batch_outputs = []
            # Itera sui canali
            for channel_idx in range(data.shape[2]):
                channel_data = data[:, :, channel_idx].unsqueeze(-1)  # (batch_size, seq_len, 1)
                with torch.no_grad():
                    channel_output = encoder(channel_data)  # (batch_size, seq_len, 4)
                batch_outputs.append(channel_output)

            # Concateniamo le feature di tutti i canali
            embeddings = torch.cat(batch_outputs, dim=-1)  # (batch_size, seq_len, 4*num_channels)
            print("Embeddings shape:", embeddings.shape)

            logits = classifier(embeddings)
            loss = loss_fn(logits, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, predicted = torch.max(logits, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)

            if batch_idx % 10 == 0:
                accuracy = 100 * correct_predictions / total_predictions
                print(f"Epoca {epoch + 1}/{epochs}, Batch {batch_idx + 1}/{len(dataloader)}, Perdita: {loss.item():.4f}, Accuratezza: {accuracy:.2f}%")

        epoch_accuracy = 100 * correct_predictions / total_predictions
        print(f"Epoca {epoch + 1}/{epochs}, Perdita Totale: {total_loss:.4f}, Accuratezza: {epoch_accuracy:.2f}%")
        save_model(classifier, os.path.join(save_path, f"classifier_{model_type}_epoch_{epoch + 1}.pt"))


Distribuzione delle classi nel dataset: Counter({2: 6805, 0: 6450, 1: 6242})
Dataset creato con 19497 segmenti.
=== Inizio Epoca 1/1 ===
Batch 1: Shape data: torch.Size([40, 200, 20]), Shape labels: torch.Size([40])
Embeddings shape: torch.Size([40, 200, 80])
Epoca 1/1, Batch 1/487, Perdita: 1.1038, Accuratezza: 32.50%
Batch 2: Shape data: torch.Size([40, 200, 20]), Shape labels: torch.Size([40])
Embeddings shape: torch.Size([40, 200, 80])
Batch 3: Shape data: torch.Size([40, 200, 20]), Shape labels: torch.Size([40])
Embeddings shape: torch.Size([40, 200, 80])
Batch 4: Shape data: torch.Size([40, 200, 20]), Shape labels: torch.Size([40])
Embeddings shape: torch.Size([40, 200, 80])
Batch 5: Shape data: torch.Size([40, 200, 20]), Shape labels: torch.Size([40])
Embeddings shape: torch.Size([40, 200, 80])
Batch 6: Shape data: torch.Size([40, 200, 20]), Shape labels: torch.Size([40])
Embeddings shape: torch.Size([40, 200, 80])
Batch 7: Shape data: torch.Size([40, 200, 20]), Shape labels: to

In [None]:
if __name__ == "__main__":
    folder_path = "/kaggle/working/train_set"
    file_paths = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.npz')]

    # Assicurati che la classe RecurrentEncoder sia definita altrove o importata
    encoder = RecurrentEncoder(input_dim=1, output_dim=4)
    load_model(encoder, "/kaggle/input/rrrrrrrrr/pytorch/default/1/Recurrent_epoch_1.pth", device="cuda")

    # Il numero di canali è 20 e l'encoder produce 4 feature per canale: input_dim=4*num_channels=4*20=80
    classifier = Classifier(input_dim=4 * 20, num_classes=3)
    optimizer = torch.optim.Adam(classifier.parameters(), lr=1e-4)

    train_classifier("Recurrent", file_paths, encoder, classifier, optimizer, epochs=1, batch_size=40, device="cuda")

In [None]:
#Esecuzione Convolutional Encoder CLASSIFICATORE
# ------------------- Esecuzione -------------------
if __name__ == "__main__":
    folder_path = "/kaggle/working/train_set"
    file_paths = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.npz')]

    # Modelli
    encoder = ConvolutionalEncoder(input_channels=1, output_dim=4)
    load_model(encoder, "/kaggle/working/models/Convolutional_epoch_40.pth", device="cuda")
    
    classifier = Classifier(input_dim=4 * 20, num_classes=3)
    optimizer = torch.optim.Adam(classifier.parameters(), lr=1e-4)

    # Addestramento del classificatore
    train_classifier("Convolutional", file_paths, encoder, classifier, optimizer, epochs=120, batch_size=4, device="cuda")

**EVALUATION**

In [14]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from collections import Counter

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

class EEGClassificationDataset(Dataset):
    def __init__(self, file_paths, seq_len=200):
        self.samples = []
        self.labels = []
        self.seq_len = seq_len

        for file_path in file_paths:
            npz_data = np.load(file_path)
            data = npz_data['data']  # (num_samples, num_channels, seq_len)
            label = npz_data['label']  # scalare

            if isinstance(label, np.ndarray):
                label = label.item()

            # Mappiamo i label: -1 -> 0, 0 -> 1, 1 -> 2 (adatta se necessario)
            if label == -1:
                label = 0
            elif label == 0:
                label = 1
            elif label == 1:
                label = 2

            labels = np.full((data.shape[0],), label)

            # Cambia la forma a (num_samples, seq_len, num_channels)
            data = data.transpose(0, 2, 1)

            self.samples.extend(data)
            self.labels.extend(labels)

        class_counts = Counter(self.labels)
        print("Distribuzione delle classi nel dataset (test):", class_counts)
        print("Test set creato con", len(self.samples), "segmenti.")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        data = torch.tensor(self.samples[idx], dtype=torch.float32)  # (seq_len, num_channels)
        label = int(self.labels[idx])
        return data, label

class Classifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(Classifier, self).__init__()

        self.downsample_1 = nn.Linear(input_dim, 256)
        self.bilstm_256 = nn.LSTM(256, 128, batch_first=True, bidirectional=True)

        self.downsample_2 = nn.Linear(256, 128)
        self.bilstm_128 = nn.LSTM(128, 64, batch_first=True, bidirectional=True)

        self.downsample_3 = nn.Linear(128, 64)
        self.bilstm_64 = nn.LSTM(64, 32, batch_first=True, bidirectional=True)

        self.concat_dense_1 = nn.Linear(256 + 128 + 64, 128)
        self.concat_dense_2 = nn.Linear(128, num_classes)

        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        x_256 = F.relu(self.downsample_1(x))
        x_256, (h_256, _) = self.bilstm_256(x_256)
        flo_256 = torch.cat([h_256[0], h_256[1]], dim=-1)

        x_128 = F.relu(self.downsample_2(x_256))
        x_128, (h_128, _) = self.bilstm_128(x_128)
        flo_128 = torch.cat([h_128[0], h_128[1]], dim=-1)

        x_64 = F.relu(self.downsample_3(x_128))
        x_64, (h_64, _) = self.bilstm_64(x_64)
        flo_64 = torch.cat([h_64[0], h_64[1]], dim=-1)

        x_concat = torch.cat([flo_256, flo_128, flo_64], dim=-1)
        x_hidden = F.relu(self.concat_dense_1(x_concat))
        output = self.concat_dense_2(x_hidden)
        return self.log_softmax(output)

def load_model(model, path, device="cuda"):
    state_dict = torch.load(path, map_location=device)
    if any(k.startswith('module.') for k in state_dict.keys()):
        state_dict = {k.replace('module.', ''): v for k, v in state_dict.items()}
    model.load_state_dict(state_dict)
    model.to(device)

def evaluate_classifier(file_paths, encoder, classifier, batch_size=40, device="cuda"):
    dataset = EEGClassificationDataset(file_paths)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, drop_last=False)

    encoder.eval()
    classifier.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for data, labels in dataloader:
            data, labels = data.to(device), labels.to(device)

            batch_outputs = []
            for channel_idx in range(data.shape[2]):
                channel_data = data[:, :, channel_idx].unsqueeze(-1)  # (batch_size, seq_len, 1)
                channel_output = encoder(channel_data)  # (batch_size, seq_len, 4)
                batch_outputs.append(channel_output)

            embeddings = torch.cat(batch_outputs, dim=-1)  # (batch_size, seq_len, 4*num_channels)
            logits = classifier(embeddings)
            _, predicted = torch.max(logits, 1)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Calcolo metriche
    acc = accuracy_score(all_labels, all_preds)
    cm = confusion_matrix(all_labels, all_preds)
    report = classification_report(all_labels, all_preds, digits=4)

    print("=== Risultati sul Test Set ===")
    print("Accuratezza:", acc)
    print("Matrice di Confusione:")
    print(cm)
    print("Report di Classificazione (Precision, Recall, F1-Score):")
    print(report)


Distribuzione delle classi nel dataset (test): Counter({1: 1873, 0: 1665, 2: 1310})
Test set creato con 4848 segmenti.
=== Risultati sul Test Set ===
Accuratezza: 0.5233085808580858
Matrice di Confusione:
[[711 258 696]
 [444 835 594]
 [ 45 274 991]]
Report di Classificazione (Precision, Recall, F1-Score):
              precision    recall  f1-score   support

           0     0.5925    0.4270    0.4963      1665
           1     0.6108    0.4458    0.5154      1873
           2     0.4345    0.7565    0.5519      1310

    accuracy                         0.5233      4848
   macro avg     0.5459    0.5431    0.5212      4848
weighted avg     0.5569    0.5233    0.5187      4848



In [None]:
if __name__ == "__main__":
    test_folder_path = "/kaggle/working/test_set"
    test_file_paths = [os.path.join(test_folder_path, f) for f in os.listdir(test_folder_path) if f.endswith('.npz')]
    
    # Creazione del dataset
    print("Creazione del dataset...")
    evaluation_dataset = EEGEvaluationDataset(file_paths, chunk_size=4000)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Carica i modelli già addestrati
    encoder = RecurrentEncoder(input_dim=1, output_dim=4)  # Definito altrove
    classifier = Classifier(input_dim=4 * 20, num_classes=3)  # Adatta se necessario

    load_model(encoder, "/kaggle/input/rrrrrrrrr/pytorch/default/1/Recurrent_epoch_1.pth", device=device)
    load_model(classifier, "/kaggle/working/model_checkpoints/classifier_Recurrent_epoch_1.pt", device=device)

    evaluate_classifier(test_file_paths, encoder, classifier, batch_size=40, device=device)

In [None]:




    # Percorsi per i modelli convolutional
    conv_classifier_path = "/kaggle/input/con_classify_120/pytorch/default/1/classifier_Convolutional_epoch_120.pth"
    conv_encoder_path = "/kaggle/input/encoder/pytorch/default/1/Convolutional_epoch_40.pth"

    # Percorsi per i modelli recurrent
    recurrent_classifier_path = "/kaggle/input/encoder_classifier_100/pytorch/default/1/classifier_Recurrent_epoch_100.pth"
    recurrent_encoder_path = "/kaggle/input/encoder/pytorch/default/1/Recurrent_epoch_40.pth"
        
    # Valutazione del modello convolutional
    print("\n=== Valutazione del modello Convolutional ===")
    conv_encoder = ConvolutionalEncoder(input_channels=1, output_dim=4)
    conv_classifier = Classifier(input_dim=4, num_classes=5)
    load_model(conv_encoder, conv_encoder_path, device="cuda")
    load_model(conv_classifier, conv_classifier_path, device="cuda")
    evaluate_classifier(evaluation_dataset, conv_encoder, conv_classifier, batch_size=30, device="cuda")

    # Valutazione del modello recurrent
    print("\n=== Valutazione del modello Recurrent ===")
    recurrent_encoder = RecurrentEncoder(input_dim=1, output_dim=4)
    recurrent_classifier = Classifier(input_dim=4, num_classes=5)
    load_model(recurrent_encoder, recurrent_encoder_path, device="cuda")
    load_model(recurrent_classifier, recurrent_classifier_path, device="cuda")
    evaluate_classifier(evaluation_dataset, recurrent_encoder, recurrent_classifier, batch_size=10, device="cuda")