CLASSIFICATORE LESIONI PIGMENTATE

In [None]:
import pandas as pd
import os
import shutil
import zipfile
from sklearn.utils import shuffle
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from torch.utils.data import DataLoader
from torchvision import models, transforms, datasets
from sklearn.metrics import confusion_matrix, roc_curve
import numpy as np

In [2]:
# Chiede all'utente di inserire il percorso del file CSV da caricare
csv_path = input("Inserisci il percorso del file CSV: ")

# Legge il file CSV e lo salva in un DataFrame
df = pd.read_csv(csv_path)

# Filtra il dataset tenendo solo le righe in cui la diagnosi è 'Benign' o 'Malignant'
# (esclude quindi i casi con diagnosi 'Indifferent')
df_filtrato = df[df['diagnosis_1'].isin(['Benign', 'Malignant'])]

# Conta quante righe hanno la diagnosi 'Malignant'
n_malignant = df[df['diagnosis_1'] == 'Malignant'].shape[0]

# Campiona lo stesso numero di righe 'Benign'
benign_sample = df[df['diagnosis_1'] == 'Benign'].sample(n=n_malignant, random_state=42)

# Prende tutte le righe 'Malignant'
malignant_all = df[df['diagnosis_1'] == 'Malignant']

# Unisce i due insiemi (benigni campionati + tutti i maligni)
df_balanced = pd.concat([benign_sample, malignant_all]).sample(frac=1, random_state=42).reset_index(drop=True)

# Controlla quante righe di 'Benign' e 'Malignant' ci sono nel dataset bilanciato
print(df_balanced['diagnosis_1'].value_counts())

diagnosis_1
Malignant    2156
Benign       2156
Name: count, dtype: int64


In [3]:
# Chiede all'utente di inserire il percorso del file ZIP da caricare
zip_path = input("Inserisci il percorso della ZIP: ")

# Cartella di estrazione
extract_folder = "images"

# Cancella la cartella se esiste già
if os.path.exists(extract_folder):
    shutil.rmtree(extract_folder)

# Crea la cartella di estrazione
os.makedirs(extract_folder)

# Estrae lo ZIP
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_folder)

# Rimuove la colonna 'image_path' da df_balanced se esiste
if 'image_path' in df_balanced.columns:
    df_balanced = df_balanced.drop(columns=['image_path'])

# Crea la colonna 'image_path'
df_balanced['image_path'] = df_balanced['isic_id'].apply(lambda x: os.path.join(extract_folder, f"{x}.jpg"))

In [None]:
# Parametri
base_dir = "dataset_prepared"
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15
splits = ['train', 'validation', 'test']
classes = ['Benign', 'Malignant']

# Pulizia cartella
if os.path.exists(base_dir):
    shutil.rmtree(base_dir)

# Crea cartelle
for split in splits:
    for cls in classes:
        os.makedirs(os.path.join(base_dir, split, cls), exist_ok=True)

# Divisione 70/15/15 da df_balanced
for cls in classes:
    subset = shuffle(df_balanced[df_balanced['diagnosis_1'] == cls], random_state=42)  # prende solo le immagini della classe corrente e le mescola casualmente

    n_total = len(subset)
    n_train = int(n_total * train_ratio)
    n_val = int(n_total * val_ratio)
    n_test = n_total - n_train - n_val  # assicura che la somma dia n_total
    
    train_df = subset.iloc[:n_train]  # prime n_train righe → train
    val_df = subset.iloc[n_train:n_train + n_val] 
    test_df = subset.iloc[n_train + n_val:]

    # Copia immagini per TRAIN
    for _, row in train_df.iterrows():
        src = row['image_path']  # percorso dell'immagine originale
        dst = os.path.join(base_dir, 'train', cls, os.path.basename(src))  # percorso di destinazione
        shutil.copy(src, dst)

    # Copia immagini per VALIDATION
    for _, row in val_df.iterrows():
        src = row['image_path']
        dst = os.path.join(base_dir, 'validation', cls, os.path.basename(src))
        shutil.copy(src, dst)

    # Copia immagini per TEST
    for _, row in test_df.iterrows():
        src = row['image_path']
        dst = os.path.join(base_dir, 'test', cls, os.path.basename(src))
        shutil.copy(src, dst)

# Stampa quante immagini sono contenute in ogni classe di ogni split
for split in splits:
    print(f"{split.upper()}:")
    for cls in classes:
        path = os.path.join(base_dir, split, cls)
        print(f"{cls}: {len(os.listdir(path))} immagini")

TRAIN:
Benign: 1509 immagini
Malignant: 1509 immagini
VALIDATION:
Benign: 323 immagini
Malignant: 323 immagini
TEST:
Benign: 324 immagini
Malignant: 324 immagini


In [13]:
# Selezione del dispositivo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device selezionato:", device)


# Configurazione generale
batch_size = 32
num_epochs = 50
img_size = 224
momentum = 0.9
weight_decay = 1e-4
best_threshold = 0.5  # default iniziale
num_workers = 2
checkpoint_path = "best_model.pth"  #percorso dove salverà il modello migliore

#-----------------------------------------------------------------------

# Trasformazioni delle immagini

# Trasformazioni per il training
train_tf = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

# Trasformazioni per validation/test
val_test_tf = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])


# Dataset
train_ds = datasets.ImageFolder(os.path.join(base_dir, "train"), transform=train_tf)
val_ds   = datasets.ImageFolder(os.path.join(base_dir, "validation"), transform=val_test_tf)
test_ds  = datasets.ImageFolder(os.path.join(base_dir, "test"), transform=val_test_tf)

# DataLoader
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,
                          num_workers=num_workers, pin_memory=torch.cuda.is_available())
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False,
                        num_workers=num_workers, pin_memory=torch.cuda.is_available())
test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False,
                         num_workers=num_workers, pin_memory=torch.cuda.is_available())

#-----------------------------------------------------------------------

# Modello RESNET50
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
model.fc = nn.Linear(model.fc.in_features, 2)  # Sostituisco l'ultimo layer per adattare il modello a 2 classi
model = model.to(device)


# Learning rate differenziati

# ResNet50 è composta da:
# -layer iniziale
# -layer 1: 3 blocchi
# -layer 2: 4 blocchi
# -layer 3: 6 blocchi
# -layer 4: 3 blocchi
# -layer finale

# Prende tutti i blocchi residui della RESNET50
def count_residual_units_resnet50(model):
    units = []
    for lname in ['layer1','layer2','layer3','layer4']:
        for blk in getattr(model, lname):
            units.append(blk)
    return units

# Divide i parametri del modello in gruppi;
# asssegna a ciascun gruppo un learning rate differente
def make_param_groups(model):
    units = count_residual_units_resnet50(model)
    first6, next8, rest = [], [], []
    # Aggiunge i parametri dei primi 6 blocchi a first6
    for blk in units[:6]:
        first6 += list(blk.parameters())
    # Aggiunge i parametri dei successivi 10 blocchi a next8
    for blk in units[6:16]:
        next8 += list(blk.parameters())
    # Aggiunge i parametri del layer finale a fc_params
    fc_params = list(model.fc.parameters())
    # Inserisce eventuali parametri riamnenti in rest
    covered = set([id(p) for p in first6 + next8 + fc_params])
    for p in model.parameters():
        if id(p) not in covered:
            rest.append(p)
    return [
        {"params": first6, "lr": 0.009},
        {"params": next8 + rest, "lr": 0.003},
        {"params": fc_params, "lr": 0.01}
    ]


# Loss function
criterion = nn.CrossEntropyLoss()

#1. Ottimizza grazie a  SGD con momentum e weight decay
optimizer = optim.SGD(make_param_groups(model), momentum=momentum, weight_decay=weight_decay)

#2. Aggiorna il learning rate secondo una funzione cosine annealing con warm restart
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)

#1+2. SGDR


# Cerca la soglia che garantisce almeno la sensibilità target e minimizza i falsi positivi
def find_best_threshold(model, loader, device, target_sens=0.9):

    model.eval()  # Mette il modello in modalità test

    y_true, y_score = [], []  # y_true saranno le etichette, y_score le probabilità predette
    with torch.no_grad():  # non calcola i gradienti
        for imgs, labels in loader:
            imgs, labels = imgs.to(device), labels.to(device)
            logits = model(imgs)  # logits, tensor (batch_size, 2), rappresenta i valori grezzi prodotti dall’ultimo layer del modello, prima di essere trasformati in probabilità
            probs = torch.softmax(logits, dim=1)[:, 1]  # probabilità della classe Malignant
            y_true.extend(labels.cpu().numpy())
            y_score.extend(probs.cpu().numpy())
    y_true = np.array(y_true)
    y_score = np.array(y_score)

    if len(np.unique(y_true)) < 2:  # se le etichette sono tutti 0 o tutti 1
        return 0.5
    
    fpr, tpr, thresholds = roc_curve(y_true, y_score)  # vettore della percentuale di falsi positivi per ogni soglia, vettore della sensibilità per ogni soglia, vettore delle soglie
    idx_ok = np.where(tpr >= target_sens)[0]   # indice delle soglie che garantiscono la sensibilità minima
    if len(idx_ok) == 0:
        # Se non raggiunge la sensibilità desiderata, scegle la soglia con massimo tpr
        best_idx = np.argmax(tpr)
        return float(thresholds[best_idx])
    # Tra le soglie ok, scegle quella con minimo FPR
    best_idx = idx_ok[np.argmin(fpr[idx_ok])]
    return float(thresholds[best_idx])


def predict_with_threshold(logits, threshold=0.5):
    probs = torch.softmax(logits, dim=1)[:, 1] 
    return (probs >= threshold).long()  # ritorna 1 se maligno, 0 se benigno


def train_one_epoch(epoch):
    model.train()  # Mette il modello in modalità train
    total_loss, correct, total = 0.0, 0, 0
    for batch_idx, (imgs, labels) in enumerate(train_loader):
        imgs, labels = imgs.to(device), labels.to(device)
        optimizer.zero_grad()  # azzera gradienti
        logits = model(imgs)
        loss = criterion(logits, labels)
        loss.backward()  # backpropagation
        optimizer.step()  # aggiorna i pesi
        scheduler.step(epoch + batch_idx / max(1, len(train_loader)))  # aggiorna il learning rate in base al punto preciso dell’allenamento
        total_loss += loss.item() * imgs.size(0)
        preds = predict_with_threshold(logits, threshold=best_threshold)
        correct += (preds == labels).sum().item()
        total += imgs.size(0)
    avg_loss = total_loss / total if total > 0 else 0.0
    acc = correct / total if total > 0 else 0.0
    return avg_loss, acc


# Calcola sensibilità, specificità e bilanciamento
def compute_metrics(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0,1]).ravel()
    sens = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    spec = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    bal_acc = 0.5 * (sens + spec)
    return sens, spec, bal_acc

@torch.no_grad()
def results(model, loader, device, threshold=0.5):
    model.eval()
    y_true, y_pred = [], []
    for imgs, labels in loader:
        imgs, labels = imgs.to(device), labels.to(device)
        logits = model(imgs)
        y_true.extend(labels.cpu().numpy())
        preds = predict_with_threshold(logits, threshold)
        y_pred.extend(preds.cpu().numpy())
    try:
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0,1]).ravel()
    except ValueError:  #
        tn = fp = fn = tp = 0
        for yt, yp in zip(y_true, y_pred):
            if yt == 0 and yp == 0: tn += 1
            elif yt == 0 and yp == 1: fp += 1
            elif yt == 1 and yp == 0: fn += 1
            elif yt == 1 and yp == 1: tp += 1
    sens = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    spec = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    bal_acc = 0.5 * (sens + spec)
    return sens, spec, bal_acc

@torch.no_grad()
def print_results(model, loader, device, threshold=0.5):
    model.eval()
    y_true, y_pred = [], []
    for imgs, labels in loader:
        imgs, labels = imgs.to(device), labels.to(device)
        logits = model(imgs)
        y_true.extend(labels.cpu().numpy())
        preds = predict_with_threshold(logits, threshold)
        y_pred.extend(preds.cpu().numpy())
    try:
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0,1]).ravel()
    except ValueError:  #
        tn = fp = fn = tp = 0
        for yt, yp in zip(y_true, y_pred):
            if yt == 0 and yp == 0: tn += 1
            elif yt == 0 and yp == 1: fp += 1
            elif yt == 1 and yp == 0: fn += 1
            elif yt == 1 and yp == 1: tp += 1
    sens = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    spec = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    bal_acc = 0.5 * (sens + spec)
    print("Risultati Test:")
    print(f"Benigni corretti (TN): {tn} ({(tn/len(y_true)*100) if len(y_true)>0 else 0:.1f}%)")
    print(f"Maligni corretti (TP): {tp} ({(tp/len(y_true)*100) if len(y_true)>0 else 0:.1f}%)")
    print(f"Benigni sbagliati (FP): {fp} ({(fp/len(y_true)*100) if len(y_true)>0 else 0:.1f}%)")
    print(f"Maligni sbagliati (FN): {fn} ({(fn/len(y_true)*100) if len(y_true)>0 else 0:.1f}%)")
    print(f"Sensibilità: {sens:.3f} | Specificità: {spec:.3f} | Accuratezza bilanciata: {bal_acc:.3f}")
    return sens, spec, bal_acc

#----------------------------------------------------------------------------
#TRAIN
print("TRAIN:")

best_bal_acc = -1.0 
best_epoch = -1

for epoch in range(1, num_epochs + 1):
    tr_loss, tr_acc = train_one_epoch(epoch)
    best_threshold = find_best_threshold(model, val_loader, device, target_sens=0.9)
    sens, spec, bal_acc = results(model, val_loader, device, threshold=best_threshold)
    print(f"Epoch {epoch}/{num_epochs} | Train loss: {tr_loss:.4f} | Train acc: {tr_acc:.3f} |"
          f"New threshold: {best_threshold:.3f} | Sens: {sens:.3f} | Spec: {spec:.3f} | BalAcc: {bal_acc:.3f}")

    #Salva il modello migliore
    if bal_acc > best_bal_acc:
        best_bal_acc = bal_acc
        best_epoch = epoch
        torch.save({
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "best_threshold": best_threshold,
            "best_bal_acc": best_bal_acc
        }, checkpoint_path)

print(f"Accuratezza migliore ottenuto all'epoca: {best_epoch}")

#------------------------------------------------------------------------
#TEST
print("TEST:")

# Usa il modello migliore sul test set
if os.path.exists(checkpoint_path):
    ckpt = torch.load(checkpoint_path, map_location=device, weights_only=False)
    model.load_state_dict(ckpt["model_state_dict"])
    best_threshold = ckpt.get("best_threshold", best_threshold)
print_results(model, test_loader, device, threshold=best_threshold);

Device selezionato: cpu
TRAIN:
Epoch 1/50 | Train loss: 0.5216 | Train acc: 0.739 |New threshold: 0.462 | Sens: 0.907 | Spec: 0.700 | BalAcc: 0.803
Epoch 2/50 | Train loss: 0.4234 | Train acc: 0.796 |New threshold: 0.390 | Sens: 0.904 | Spec: 0.687 | BalAcc: 0.796
Epoch 3/50 | Train loss: 0.3768 | Train acc: 0.820 |New threshold: 0.467 | Sens: 0.901 | Spec: 0.737 | BalAcc: 0.819
Epoch 4/50 | Train loss: 0.3360 | Train acc: 0.849 |New threshold: 0.278 | Sens: 0.904 | Spec: 0.743 | BalAcc: 0.824
Epoch 5/50 | Train loss: 0.3083 | Train acc: 0.843 |New threshold: 0.406 | Sens: 0.910 | Spec: 0.737 | BalAcc: 0.824
Epoch 6/50 | Train loss: 0.2641 | Train acc: 0.888 |New threshold: 0.431 | Sens: 0.904 | Spec: 0.746 | BalAcc: 0.825
Epoch 7/50 | Train loss: 0.2257 | Train acc: 0.907 |New threshold: 0.444 | Sens: 0.901 | Spec: 0.780 | BalAcc: 0.841
Epoch 8/50 | Train loss: 0.2149 | Train acc: 0.917 |New threshold: 0.439 | Sens: 0.901 | Spec: 0.780 | BalAcc: 0.841
Epoch 9/50 | Train loss: 0.2063 |