In [1]:
import os
import time

import torch #pytorch
from torchvision import datasets, transforms # importo da sets come MNIST e funzioni per trasformazioni di immagini
from torch.utils.data import TensorDataset, DataLoader #datset e per caricare dati in batch
import torch.nn as nn #classi per reti neurali
import torch.nn.functional as F #relu, Cross entropy loss etc
from sklearn.model_selection import train_test_split #split dei dati
from sklearn.decomposition import PCA #PCA per riduzione dimensionale
from sklearn.datasets import fetch_openml #scaricare i dati direttamente da OpenML
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm #color map
import pandas as pd

#K neighbors classifier e regressor
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn import neighbors

#random forest
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

#tsne
from sklearn.manifold import TSNE

In [3]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Computation device: {device}")

Computation device: cuda


In [None]:
# Template: show the first four entries of `images` on a 2x2 grid of subplots.
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(8, 8))

for idx, panel in enumerate(axs.flat):
    panel.imshow(images[idx], cmap='viridis')  # any matplotlib colormap works here
    panel.set_title(f'Immagine {idx+1}')
    panel.set_axis_off()  # hide ticks and frame

fig.suptitle("titolo del plot")
fig.tight_layout()  # tighten the spacing between subplots
plt.show()

In [None]:
# Template: 30-bin histogram of `data` with labelled axes.
plt.hist(data, bins=30, color='skyblue', edgecolor='black')
plt.gca().set(
    title='Istogramma dei dati',
    xlabel='Valori',
    ylabel='Frequenza',
)
plt.show()

In [None]:
# View every flattened sample as a 28x28 image: (n_samples, n_pixels) -> (n_samples, 28, 28).
images_reshaped = images.reshape((-1, 28, 28))

# Two-stage split with a fixed seed: 7000 samples for training first,
# then the remainder is halved into validation and test sets.
X_train, X_2, Y_train, Y_2 = train_test_split(
    X_1, Y_1,
    train_size=7000,
    random_state=0,
)
X_val, X_test, Y_val, Y_test = train_test_split(
    X_2, Y_2,
    test_size=0.5,
    random_state=0,
)


# Tensor conversion for the CNN, wrapped in a helper so it is written only once.
def _copy_as_tensor(values, dtype):
    """Return a fresh tensor of `dtype` holding a copy of `values`."""
    if isinstance(values, torch.Tensor):
        # Same copy semantics as torch.tensor(tensor), without the warning it emits.
        return values.detach().clone().to(dtype)
    # Go through NumPy first: pandas DataFrames/Series are converted positionally,
    # so a shuffled index (e.g. after train_test_split) cannot reorder or break rows.
    return torch.tensor(np.asarray(values), dtype=dtype)


def to_tensor(x, y):
    """Build a TensorDataset of single-channel 28x28 images and integer labels.

    Args:
        x: array-like, pandas object or tensor whose elements can be reshaped
            to (-1, 1, 28, 28).
        y: array-like, pandas object or tensor of integer class labels.

    Returns:
        TensorDataset with float32 images of shape (n, 1, 28, 28)
        (sample, channel, height, width) and int64 labels.
    """
    x_tensor = _copy_as_tensor(x, torch.float32).reshape(-1, 1, 28, 28)
    y_tensor = _copy_as_tensor(y, torch.long)  # 64-bit integer labels
    return TensorDataset(x_tensor, y_tensor)

# Wrap each split in a DataLoader; only the training batches are shuffled.
train_loader = DataLoader(dataset=to_tensor(X_train, Y_train), batch_size=64, shuffle=True)
val_loader, test_loader = (
    DataLoader(dataset=to_tensor(split_x, split_y), batch_size=256)
    for split_x, split_y in ((X_val, Y_val), (X_test, Y_test))
)

In [None]:
# k-nearest-neighbours classifier (k = 5), fitted on the pandas columns directly.
# Without pandas, pass a plain (n_samples, n_features) array instead.
model = KNeighborsClassifier(n_neighbors=5)
model.fit(train[features], train['Label'])

# Mean accuracy on the training split and on the test split.
accuracy_train, accuracy_test = (
    model.score(split[features], split['Label']) for split in (train, test)
)

# k-nearest-neighbours regressor (k = 5); `.values` hands NumPy arrays to sklearn.
model2 = neighbors.KNeighborsRegressor(n_neighbors=5).fit(
    train[features].values, train['Label'].values
)

pred_train, pred_test = (
    model2.predict(split[features].values) for split in (train, test)
)

In [None]:
# Random forest classifier evaluated on the held-out test split.
model = RandomForestClassifier(
    n_estimators=100,      # number of trees
    max_depth=None,        # no cap on tree depth
    min_samples_split=2,   # minimum samples required to split a node
    min_samples_leaf=1,    # minimum samples required in a leaf
    random_state=42,       # fixed seed for reproducible results
)
model.fit(X_train, y_train)  # train the forest
y_pred = model.predict(X_test)  # predicted labels
# Named rf_accuracy so the number does not hide the notebook's accuracy() helper.
rf_accuracy = accuracy_score(y_test, y_pred)

# Confusion matrix. It is stored as `conf_matrix` so the `matplotlib.cm as cm`
# import is not overwritten, and drawn with scikit-learn's own display so the
# cell needs no plotting package beyond what the notebook already imports.
conf_matrix = confusion_matrix(y_test, y_pred)
conf_display = ConfusionMatrixDisplay(confusion_matrix=conf_matrix)
conf_display.plot(values_format='d')  # annotate every cell with its integer count
conf_display.ax_.set_title("Confusion Matrix - Random Forest")
plt.show()

In [None]:
# Project X onto its first two principal components.
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)

# Scatter plot of the projection, coloured by the label vector y.
plt.figure(figsize=(10, 7))
pc_first, pc_second = X_pca[:, 0], X_pca[:, 1]
scatter = plt.scatter(pc_first, pc_second, c=y, cmap='viridis', alpha=0.7)
plt.colorbar(scatter)
plt.gca().set(
    title="PCA - (2 componenti principali)",
    xlabel="PCA[0]",
    ylabel="PCA[1]",
)
plt.show()


In [None]:
# 2-D t-SNE embedding of X (perplexity 30, fixed seed).
tsne = TSNE(n_components=2, perplexity=30, random_state=42)
X_tsne = tsne.fit_transform(X)

# Scatter plot of the embedding, coloured by the label vector y.
plt.figure(figsize=(10, 7))
emb_first, emb_second = X_tsne[:, 0], X_tsne[:, 1]
scatter = plt.scatter(emb_first, emb_second, c=y, cmap='viridis', alpha=0.7)
plt.colorbar(scatter)
plt.gca().set(
    title="t-SNE - Digits Dataset",
    xlabel="t-SNE 1",
    ylabel="t-SNE 2",
)
plt.show()

In [None]:
# CNN definition
class CNN(nn.Module):
    """Small two-block convolutional classifier for 1x28x28 inputs.

    Shape flow for a batch of N images:
        (N, 1, 28, 28) -conv1+ReLU-> (N, 32, 28, 28) -pool-> (N, 32, 14, 14)
        -conv2+ReLU-> (N, 64, 14, 14) -pool-> (N, 64, 7, 7)
        -flatten-> (N, 3136) -fc1+ReLU-> (N, 64) -fc2-> (N, num_classes)

    Args:
        num_classes: number of output logits. Defaults to 2 (even / odd).
    """

    def __init__(self, num_classes=2):
        super().__init__()
        # 3x3 convolutions with padding=1 keep the spatial size;
        # only the 2x2 max-pooling halves it (28 -> 14 -> 7).
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * 7 * 7, 64)  # flattened 64x7x7 feature map -> 64
        self.fc2 = nn.Linear(64, num_classes)  # 64 -> one logit per class

    def forward(self, x):
        """Return raw logits of shape (N, num_classes) for input of shape (N, 1, 28, 28)."""
        x = self.pool(F.relu(self.conv1(x)))  # conv -> ReLU -> pool
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten per sample and keep the batch dimension fixed. An input with the
        # wrong spatial size then fails in fc1 with a shape error, instead of being
        # silently regrouped into a different number of samples.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Instantiate the network and show its layer structure.
model = CNN()
print(model)

# Layer-by-layer summary for a single 1x28x28 input. When CUDA is available the
# model is moved to the GPU first (Module.cuda() moves the parameters in place).
from torchsummary import summary
if torch.cuda.is_available():
    model.cuda()
summary(model, input_size=(1, 28, 28))

In [None]:
def accuracy(y_pred, y_true):
    """Fraction of samples whose highest-scoring class matches the target.

    Args:
        y_pred: tensor of per-class scores with shape (N, C).
        y_true: targets, either one-hot / per-class scores with shape (N, C),
            or class indices with shape (N,).

    Returns:
        float in [0, 1]; 0.0 for an empty batch.
    """
    predicted = torch.argmax(y_pred, dim=1)
    # Index targets (1-D) are already class labels; only 2-D targets need argmax.
    if y_true.dim() > 1:
        expected = torch.argmax(y_true, dim=1)
    else:
        expected = y_true

    total = expected.numel()
    if total == 0:
        return 0.0  # avoid dividing by zero on an empty batch

    correct = (predicted == expected).sum().item()
    return correct / total

In [None]:
# Training with per-epoch validation, delayed LR scheduling and early stopping.
# Expects `model`, `criterion`, `optimizer`, `scheduler`, `train_loader`,
# `val_loader` and `accuracy` to be defined by earlier cells.
num_epochs = 120
patience = 20  # epochs without validation improvement tolerated before stopping

best_val_acc = 0.0  # best validation accuracy seen so far
epochs_since_best_val_acc = 0  # epochs elapsed since the last improvement

train_curve = []  # mean training loss per epoch
val_curve = []    # mean validation loss per epoch
lr_curve = []     # learning rate per epoch
pesi = []         # snapshot of the weights after every epoch

# Batches must live on the same device as the model (CPU or GPU).
model_device = next(model.parameters()).device

# torch.save does not create missing directories.
os.makedirs('results', exist_ok=True)

# Train loop
for epoch in range(num_epochs):
    model.train()
    tmp_loss = 0.0

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(model_device), target.to(model_device)
        output = model(data)
        loss = criterion(output, target)
        optimizer.zero_grad()  # clear the previous gradients
        loss.backward()        # compute gradient of loss
        optimizer.step()       # update the weights
        tmp_loss += loss.item()  # Python float; works for CPU and CUDA tensors alike

    # The scheduler only starts stepping after epoch index 20.
    if epoch > 20:
        scheduler.step()

    pesi.append({parametro: tensore.clone().detach() for parametro, tensore in model.state_dict().items()})
    lr_curve.append(optimizer.param_groups[0]['lr'])
    train_curve.append(tmp_loss / len(train_loader))

    # Validation: no gradients, parameters are not changed.
    model.eval()
    val_acc = 0.0
    val_loss = 0.0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(model_device), target.to(model_device)
            output = model(data)
            val_loss += criterion(output, target).item()
            val_acc += accuracy(output, target)

    # Both metrics are averaged over the number of validation batches.
    val_acc /= len(val_loader)
    val_loss /= len(val_loader)
    val_curve.append(val_loss)

    print(f"Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}, lr: {optimizer.param_groups[0]['lr']:.3e}")

    # Check if the validation accuracy has improved
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        epochs_since_best_val_acc = 0
        # state_dict() returns references to the live parameters, so clone them;
        # otherwise `best_weights` would keep changing as training continues.
        best_weights = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
        torch.save(best_weights, 'results/best_weights.pth')
        print("Best!")
        best_epoch = epoch
    else:
        epochs_since_best_val_acc += 1

    # Check if early stopping is necessary
    if epochs_since_best_val_acc >= patience:
        print("Early stopping!")
        break

# Plot the per-epoch loss curves recorded by the training loop in this cell
# (`train_curve` / `val_curve`).
plt.plot(train_curve, label='Train Loss')
plt.plot(val_curve, label='Val Loss')
plt.xlabel('Epoch')
# NOTE(review): the label says BCE, but `criterion` is not defined in this cell - confirm.
plt.ylabel('BCE Loss')
plt.legend()
# No `title` variable is defined in this notebook, so the model's class name is used.
plt.title(f'Training and Validation Loss - {type(model).__name__}')
plt.grid(True)
plt.show()



In [None]:
#train
def train_model(model, train_loader, val_loader, epochs=20, best_model_path="best_model.pt"):
    """Train `model` with Adam and cross-entropy, keeping the best checkpoint.

    After every epoch the model is evaluated on `val_loader`; whenever the
    validation loss improves, the weights are written to `best_model_path`.
    When training ends, the best checkpoint written during this call (if any)
    is loaded back into the model.

    Args:
        model: nn.Module to train. Batches are moved to the device its
            parameters live on.
        train_loader: DataLoader yielding (inputs, integer class targets).
        val_loader: DataLoader yielding (inputs, integer class targets).
        epochs: number of training epochs.
        best_model_path: file the best state_dict is saved to.

    Returns:
        (model, train_losses, val_losses). The two lists hold the per-sample
        mean loss for each epoch.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    # Use the model's own device for every batch, so the function works on CPU
    # and on GPU, and training and validation agree.
    device = next(model.parameters()).device

    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    checkpoint_saved = False  # becomes True once this call has written a checkpoint

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            output = model(xb)
            loss = criterion(output, yb)
            loss.backward()
            optimizer.step()
            # Weight each batch loss by its size so the epoch value is a per-sample mean.
            running_loss += loss.item() * xb.size(0)
        train_loss = running_loss / len(train_loader.dataset)

        model.eval()
        val_running = 0.0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                val_running += criterion(model(xb), yb).item() * xb.size(0)
        val_loss = val_running / len(val_loader.dataset)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        print(f"Epoch {epoch+1:2d}/{epochs} - Train Loss: {train_loss:.4f} - Val Loss: {val_loss:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_model_path)
            checkpoint_saved = True
            print("Model saved!")

    # Load the best model. This is skipped when no checkpoint was written in this
    # call (epochs == 0 or the loss never improved), so a missing or stale file
    # is never loaded.
    if checkpoint_saved:
        model.load_state_dict(torch.load(best_model_path, map_location=device))
    return model, train_losses, val_losses

In [None]:

# Train for 20 epochs, saving a snapshot of the weights after each one.
# Expects `model`, `criterion`, `optimizer`, `train_loader` and `val_loader`
# to be defined by earlier cells.
model_device = next(model.parameters()).device  # batches must match the model's device

pesi1 = []
for epoch in range(20):
    start_time = time.time()
    model.train()
    # Batch names (xb, yb) are deliberately different from the dataset-level
    # variables (X, y, X_val, Y_val, ...) so those are not overwritten.
    for xb, yb in train_loader:
        xb, yb = xb.to(model_device), yb.to(model_device)
        optimizer.zero_grad()
        output = model(xb)
        loss = criterion(output, yb)
        loss.backward()
        optimizer.step()
    elapsed = time.time() - start_time
    # Save the weights
    pesi1.append({parametro: tensore.clone().detach() for parametro, tensore in model.state_dict().items()})
    print(f"Epoch {epoch+1} completata in {elapsed:.2f} secondi")


# Validation: evaluate the snapshot saved after each training epoch.
val_loss_q2 = []  # mean validation loss per epoch, created once outside the loop
for epoch, snapshot in enumerate(pesi1):
    start_time = time.time()
    # Restore this epoch's weights so the metrics describe that epoch. The last
    # snapshot is the final model, so the model ends up in its trained state.
    model.load_state_dict(snapshot)
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for xb_val, yb_val in val_loader:
            xb_val, yb_val = xb_val.to(model_device), yb_val.to(model_device)
            output_val = model(xb_val)
            val_loss += criterion(output_val, yb_val).item()
            _, predicted = torch.max(output_val, 1)
            correct += (predicted == yb_val).sum().item()
            total += yb_val.size(0)

    # Named val_accuracy so it does not shadow the accuracy() helper function.
    val_accuracy = correct / total * 100
    avg_val_loss = val_loss / len(val_loader)
    val_loss_q2.append(avg_val_loss)

    elapsed = time.time() - start_time
    print(f"Epoch {epoch+1:02d} completata in {elapsed:.2f}s | Val Loss: {avg_val_loss:.4f} | Val Accuracy: {val_accuracy:.2f}%")
