In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import matthews_corrcoef
import numpy as np
import pandas as pd

In [2]:
def create_one_hot_sets(dataset):
    sets = ["1","2","3","4","5","Benchmark"]
    set_results=[]
    for i in sets:
        st=dataset.query(f"Set == '{i}'")
        tmp_x=[]
        tmp_y=[]
        for _ , row in st.iterrows():
            seq=row["Sequence"]
            if len(seq) < 90:
                seq=_increase_lenseq(seq)
            else:
                seq=seq[:90]
            encoded_seq = one_hot_encoding(seq)
            tmp_x.append(encoded_seq)
            
            if row["Class"] == "Positive":
                y=1
            else:
                y=0
            tmp_y.append(y)

        tmp_x=np.array(tmp_x, dtype=np.float32)
        tmp_y=np.array(tmp_y, dtype=np.float32)
        set_results.append((tmp_x , tmp_y))
    return set_results
            
def _increase_lenseq(seq):
    x=len(seq)
    num_of_X= 90-x
    seq=seq+("X"*num_of_X)
    return seq
            
def one_hot_encoding(sequence):
    M = []
    aa_alph = ['A', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V', 'W', 'Y','X']
    for aa in sequence:
        one_hot = np.zeros(21)
        try:
            index = aa_alph.index(aa)
            one_hot[index] = 1
        except:
            pass
        M.append(one_hot)
    M = np.array(M)
    return M

In [3]:

class SP_NN(nn.Module):
    def __init__(self, input_size, hidden_sizes,lstm_hidden_size, num_lstm_layers, output_size, dropout_p=0.5):
        super(SP_NN, self).__init__()
# 1. LSTM (Parte fissa che estrae le features temporali)
        # input_size = 21 (One-Hot)
        self.cnn_out_channels = 64
        self.conv1 = nn.Conv1d(input_size, self.cnn_out_channels, kernel_size=17, padding='same')
        self.lstm = nn.LSTM(self.cnn_out_channels, lstm_hidden_size, num_lstm_layers, 
                            batch_first=True, dropout=dropout_p if num_lstm_layers > 1 else 0)
        
        # 2. Batch Norm (Stabilizza l'output dell'LSTM)
        self.bn = nn.BatchNorm1d(lstm_hidden_size)
        
        # 3. COSTRUZIONE DINAMICA DELL'MLP (La parte che hai chiesto)
        mlp_layers = []
        
        # ATTENZIONE: L'input dell'MLP è l'output dell'LSTM!
        current_input_size = lstm_hidden_size 
        
        # Ciclo dinamico preso dal tuo vecchio codice
        for hidden_size in hidden_sizes:
            # Layer Lineare
            mlp_layers.append(nn.Linear(current_input_size, hidden_size))
            # Attivazione
            mlp_layers.append(nn.ReLU())
            # Dropout
            mlp_layers.append(nn.Dropout(p=dropout_p))
            # Aggiorna dimensione
            current_input_size = hidden_size
            
        # Layer finale di output (riduzione a 1)
        mlp_layers.append(nn.Linear(current_input_size, output_size))
        
        # Sigmoide finale (Se usi BCELoss. Se usi BCEWithLogitsLoss, toglilo!)
        mlp_layers.append(nn.Sigmoid()) 
        
        # Impacchetta tutto nel Sequential
        self.mlp = nn.Sequential(*mlp_layers)

    def forward(self, x):
        x = x.permute(0, 2, 1) 
        x = self.conv1(x)       # Esce [Batch, 64, 90]
        
        # --- PASSAGGIO LSTM ---
        # L'LSTM vuole [Batch, Lunghezza, Canali] -> permutiamo indietro
        x = x.permute(0, 2, 1)
        # Forward propagate LSTM
        out, _ = self.lstm(x)
        out = out[:, -1, :] # Prendi solo l'output finale (il "riassunto" dopo aver letto tutta la sequenza durante l'lstm) in pratica trasforma il vettore Batch,90,Hidden in Batch,Hidden. In sintesi fa si che l'output sia definito al 90-esimo timestamp, ovvero quando lstm ha letto tutti e 90 gli aminaocidi e ha formulato l'ipotesi con la conoscenza dei 90 precedenti aminaoccidi.
        out = self.bn(out)

        # Decode the hidden state of each time step
        out = self.mlp(out)
        return out

# Define a custom dataset
class SignalDataset(Dataset): #prepara i dati convertendoli in tensori
    def __init__(self, X, y):
        # Tieni i dati come sono (Numpy o Liste). NON convertirli subito.
        # Questo non occupa memoria extra.
        self.X = X 
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        # 1. Converti in Tensore SOLO quando il dato viene richiesto
        #    Questo risparmia tantissima RAM.
        x_out = torch.from_numpy(self.X[idx]).float() 
        
        # 2. Gestione Etichetta (Label)
        #    .view(1) o .unsqueeze(0) serve per trasformare lo scalare "0" in un vettore "[0]"
        #    Questo evita errori con la BCELoss che si aspetta dimensioni compatibili.
        y_out = torch.tensor(self.y[idx], dtype=torch.float32).view(1)
        
        return x_out, y_out


def train_val(model, #è il modello da addestrare
              train_loader, #i dati da studiare, diviso in batch
              val_loader, #il test da fare a fine  di ogni studio
              optimizer, #metodo di studio (adam, RMSprop ecc.. dice al modlelo come aggiornare le sue consocenze).
              criterion, #il correttore, che dice al modello di quanto ha sbagliato
              epochs, #quante volte il modello rileggerà i dati per impararne
              patience, #quante volte il modello può fare un esame di prova peggiore del precedente prima di interrompere le epoche in anticipo
              scorer = matthews_corrcoef,
              init_best_score = -1,
              output_transform = lambda x: (x > 0.5).float()): #come tradurre la probabilità del modello, praticamente trasforma i valori in 1 e 0
  best_val_score = init_best_score #inizializza il miglior punteggio
  epochs_without_improvement = 0 #contatore della patience utile per vedere quante volte di fila non migliora
  best_model_state_dict = None #prepara il cassetto dove inserirci il modello che ha performato meglio

  for epoch in range(epochs): #ripeti il processo per epoche volte. 
      # Training
      model.train()  #inizializzi il modello vuoto da allenare
      loss = 0 #inizializzi la variabile per la loss
      for batch_X, batch_y in train_loader: #questo for itera su tutti i batches
          batch_X, batch_y = batch_X.to(device), batch_y.to(device) #sposta eventualmente i dati del batch sulla gpu se disponibile per fare i calcoli piu velocemente
          optimizer.zero_grad() #azzera l'optimizer che era stato utilizzato per il batch precedente
          outputs = model(batch_X) #il modello legge il batch x e produce le risposte
          loss = criterion(outputs, batch_y) # il correttore calcola il singolo numero di errore confrontando le risposte date dal modello con quelle del batch y
          loss.backward() #funzione di pytorch che  si guarda quanto ogni peso ha contribuito a quell'errore  tramite il calcolo del gradiente quindi dice di quanto un peso deve scendere o salire.
          torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
          optimizer.step() #prende i calcoli della backward e aggiorna fisicamente i pesi del cervello per ridurre l'errore

      # Validation
      model.eval() #è cruciale perche mette il modello in fase di valutazione, spegnendo il dropout, ovvero quello che spegneva neuroni a caso per evitare overfitting
      val_preds = []
      val_labels = []
      with torch.no_grad(): #dice a pytorch di non calcolare gradienti, poiche siamo in fase di valutazione, rendendo il tutto piu veloce e consumando meno memoria
          for batch_X, batch_y in val_loader: #itera su tutti i batch del validation
              batch_X, batch_y = batch_X.to(device), batch_y.to(device)
              outputs = model(batch_X)
              #preds = (outputs > 0.5).float() #qui invece utilizzi direttamente questo modo per trasformare gli output in 0 e 1
              preds = output_transform(outputs) #utilizza il metodo di traformazione conenuto in output _transform permettendolo di variare a piacimento
              val_preds.extend(preds.cpu().numpy().flatten()) #aggiunge le risposte alle liste
              val_labels.extend(batch_y.cpu().numpy().flatten())
      val_score = scorer(val_labels, val_preds) #calcola il punteggio MCC alla fine di ogni test

      if val_score > best_val_score:
          best_val_score = val_score
          epochs_without_improvement = 0
          best_model_state_dict = model.state_dict()
          print('Validation score improved to {:.4f}'.format(best_val_score))
      else:
          epochs_without_improvement += 1
          if epochs_without_improvement >= patience:
              print('Early stopping at epoch {}'.format(epoch+1))
              break

      print('Epoch [{}/{}], Loss: {:.4f}, Val score: {:.4f}'.format(epoch+1, epochs, loss.item(), val_score))
  return best_model_state_dict

def test(model, test_loader, scorer = matthews_corrcoef, output_transform = lambda x: (x > 0.5).float()):
  model.eval()
  all_preds = []
  all_labels = []
  with torch.no_grad():
      for batch_X, batch_y in test_loader:
          batch_X, batch_y = batch_X.to(device), batch_y.to(device)
          outputs = model(batch_X)
          preds = output_transform(outputs)
          all_preds.extend(preds.cpu().numpy().flatten())
          all_labels.extend(batch_y.cpu().numpy().flatten())

  score = scorer(all_labels, all_preds)
  return score


if torch.cuda.is_available():
    device = torch.device("cuda")
    print("GPU is available")
else:
    device = torch.device("cpu")
    print("GPU is not available")

GPU is available


In [4]:
config= {'num_layers': 4, 'hidden_sizes': [256, 128, 64, 1024], 'dropout': 0.4980673167779849, 'lr': 0.00028585527498522286, 'batch_size': 20, 'num_lstm_layers': 2, 'lstm_hidden_size': 128}

In [5]:
# 1. Carica tutto il CSV
dataset = pd.read_csv("../Data_Preparation/train_bench.tsv", sep="\t")

# 2. Elabora tutto in una volta (La lista conterrà 6 elementi ordinati)
all_data = create_one_hot_sets(dataset)

In [7]:
# 2. PREPARAZIONE TRAINING (Concatenare 3 set)
training_indices=[0,1,2,3]
validation_index=4
testing_index=5
# Raccogliamo le X dei 3 set di training
train_x_list = [all_data[j][0] for j in training_indices]
# Raccogliamo le y dei 3 set di training
train_y_list = [all_data[j][1] for j in training_indices]

# Uniamo tutto in un unico array gigante
x_train_conc = np.concatenate(train_x_list, axis=0)
y_train_conc = np.concatenate(train_y_list, axis=0)

# 3. PREPARAZIONE VALIDATION & TEST (Singoli set)
x_val = all_data[validation_index][0]
y_val = all_data[validation_index][1]

x_test = all_data[testing_index][0]
y_test = all_data[testing_index][1]

# 4. CREAZIONE DATASET (Usa la tua classe SignalDataset lazy)
# Nota: Non serve trasformare in tensori qui, lo fa il Dataset dentro __getitem__
train_dataset = SignalDataset(x_train_conc, y_train_conc)
val_dataset = SignalDataset(x_val, y_val)
test_dataset = SignalDataset(x_test, y_test)

# 5. DATALOADERS
train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=config["batch_size"])
test_loader = DataLoader(test_dataset, batch_size=config["batch_size"])
# --- MODELLO ---
model = SP_NN(
input_size=21, 
hidden_sizes=config["hidden_sizes"],
lstm_hidden_size=config["lstm_hidden_size"],
num_lstm_layers=config["num_lstm_layers"],
output_size=1,
dropout_p=config["dropout"]
).to(device)

optimizer = optim.Adam(model.parameters(), lr=config["lr"])
criterion = nn.BCELoss()

# Training (Verbose False per pulizia)
best_state = train_val(model, train_loader, val_loader, optimizer, criterion,
                   epochs=100, patience=20)

model.load_state_dict(best_state)

mcc = test(model, test_loader)
print("MCC on benchmark set:", mcc)


Validation score improved to 0.0000
Epoch [1/100], Loss: 0.5829, Val score: 0.0000
Epoch [2/100], Loss: 0.1066, Val score: 0.0000
Epoch [3/100], Loss: 0.2617, Val score: 0.0000
Epoch [4/100], Loss: 0.2689, Val score: 0.0000
Epoch [5/100], Loss: 0.1968, Val score: 0.0000
Epoch [6/100], Loss: 0.4944, Val score: 0.0000
Epoch [7/100], Loss: 0.1981, Val score: 0.0000
Epoch [8/100], Loss: 0.3146, Val score: 0.0000
Epoch [9/100], Loss: 0.2615, Val score: 0.0000
Epoch [10/100], Loss: 0.1170, Val score: 0.0000
Epoch [11/100], Loss: 0.1376, Val score: 0.0000
Validation score improved to 0.7968
Epoch [12/100], Loss: 0.1510, Val score: 0.7968
Epoch [13/100], Loss: 0.0895, Val score: 0.7873
Validation score improved to 0.8740
Epoch [14/100], Loss: 0.0393, Val score: 0.8740
Epoch [15/100], Loss: 0.0566, Val score: 0.8670
Epoch [16/100], Loss: 0.0476, Val score: 0.8596
Epoch [17/100], Loss: 0.0273, Val score: 0.8133
Epoch [18/100], Loss: 0.1226, Val score: 0.7029
Validation score improved to 0.9024
E