In [None]:
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import timm
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, ConfusionMatrixDisplay, confusion_matrix
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision import models, transforms


In [None]:
def balance_data(data, target_length):
    """Oversample ``data`` by random duplication until it holds ``target_length`` items.

    The original items always come first, in their original order; the extra
    items are drawn with ``random.sample`` from the original items, one round
    at a time, so no item is duplicated twice before every item has been
    duplicated once.

    Args:
        data (list): Items to oversample. The list is NOT modified.
        target_length (int): Exact length of the returned list.

    Returns:
        list: A new list with exactly ``target_length`` items (a plain
        truncation of ``data`` when it is already long enough).

    Raises:
        ValueError: If ``data`` is empty while ``target_length`` is positive;
            there is nothing to duplicate, and looping would never terminate.
    """
    if target_length <= len(data):
        return data[:target_length]

    if not data:
        raise ValueError(
            f"Cannot oversample an empty list to {target_length} items."
        )

    # Work on a copy so the caller's list is left untouched.
    balanced = list(data)
    while len(balanced) < target_length:
        missing = target_length - len(balanced)
        balanced.extend(random.sample(data, min(len(data), missing)))
    return balanced


class BalancedCellDataset(Dataset):
    """Dataset of ``.npy`` cell images listed in two CSV files, one per class.

    Every sample is loaded with ``np.load``, zero-padded up to ``min_size`` if
    it is smaller, cropped to a centered ``2 * crop_radius`` square and finally
    passed through ``transform`` (when one is given).
    """

    def __init__(self, csv_files, dataset_type, image_column="AllChannels", root_dir="", transform=None, balance=True, crop_radius=50):
        """
        Build the list of ``(image_path, label)`` samples for one split.

        Args:
        - csv_files (list): CSV files, documented as [benign, malignant]; rows of
          the first file get label 0, rows of the second get label 1.
        - dataset_type (str): Split to keep, compared case-insensitively with the
          ``Set`` column ("train", "validation" or "test").
        - image_column (str): CSV column holding the image file name.
        - root_dir (str): Base directory of the images.
        - transform (callable, optional): Applied to the cropped NumPy array.
        - balance (bool): If True, oversample the smaller class; only applied
          when ``dataset_type`` is "train".
        - crop_radius (int): Radius ``r`` of the centered crop (output is 2r x 2r).

        Raises:
        - ValueError: If balancing is requested for the train split and exactly
          one of the two classes has no samples (it cannot be oversampled).
        """
        self.transform = transform
        self.root_dir = root_dir
        self.crop_radius = crop_radius
        # Pad to at least 100x100 as before, but never to less than the crop
        # window, so a crop_radius above 50 does not fail on padded images.
        side = max(100, 2 * crop_radius)
        self.min_size = (side, side)

        dataset_type = dataset_type.lower()
        benign_data = []
        malignant_data = []

        for csv_file, label, target_list in zip(csv_files, [0, 1], [benign_data, malignant_data]):
            df = pd.read_csv(csv_file)
            split = df["Set"].str.strip().str.lower()
            df = df[split == dataset_type]

            for _, row in df.iterrows():
                img_path = os.path.join(
                    self.root_dir,
                    str(row["TypeOfCell"]),
                    str(row["Nome Acquisizione"]),
                    str(int(row["Numero cellula"])),  # avoids "12.0"-style names when the column is float
                    row[image_column]
                )
                target_list.append((img_path, label))

        if balance and dataset_type == "train":
            max_length = max(len(benign_data), len(malignant_data))
            if max_length > 0 and not (benign_data and malignant_data):
                raise ValueError(
                    "Cannot balance the training set: one of the two classes has no samples."
                )
            self.data = balance_data(benign_data, max_length) + balance_data(malignant_data, max_length)
            random.shuffle(self.data)  # avoid class-ordered patterns in the sample list
        else:
            self.data = benign_data + malignant_data

    def pad_image_to_min_size(self, image, min_size=(100, 100)):
        """Zero-pad ``image`` so its first two axes are at least ``min_size``.

        The padding is split as evenly as possible between the two sides of
        each axis (the extra pixel, if any, goes to the bottom/right). Axes
        beyond the first two are never padded. Images that are already large
        enough are returned unchanged.
        """
        h, w = image.shape[:2]
        pad_h = max(0, min_size[0] - h)
        pad_w = max(0, min_size[1] - w)

        if pad_h == 0 and pad_w == 0:
            return image

        top = pad_h // 2
        left = pad_w // 2
        pad_width = [(top, pad_h - top), (left, pad_w - left)]
        pad_width += [(0, 0)] * (image.ndim - 2)

        # np.pad keeps the dtype and works for any number of trailing axes.
        return np.pad(image, pad_width, mode="constant", constant_values=0)

    def crop_around_center(self, image, r):
        """Return the centered ``2r x 2r`` window of ``image``.

        Raises:
            ValueError: If either of the first two axes is shorter than ``2 * r``.
        """
        h, w = image.shape[:2]
        if h < 2 * r or w < 2 * r:
            raise ValueError(f"L'immagine è troppo piccola ({h}x{w}) per il ritaglio con r={r}")
        x1, x2 = (w // 2 - r, w // 2 + r)
        y1, y2 = (h // 2 - r, h // 2 + r)
        return image[y1:y2, x1:x2]

    def __len__(self):
        """Number of samples (after optional balancing)."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load, pad, crop and transform sample ``idx``; returns ``(image, label)``.

        Raises:
            FileNotFoundError: If the image file does not exist.
            RuntimeError: If ``np.load`` fails on the file.
        """
        img_path, label = self.data[idx]

        if not os.path.exists(img_path):
            raise FileNotFoundError(f"File non trovato: {img_path}")

        try:
            image = np.load(img_path)  # loaded as a NumPy array
        except Exception as e:
            raise RuntimeError(f"Errore nel caricamento dell'immagine {img_path}: {e}") from e

        # Step 1: pad if the image is smaller than the minimum size
        image = self.pad_image_to_min_size(image, self.min_size)

        # Step 2: centered crop
        image = self.crop_around_center(image, self.crop_radius)

        # Step 3: transforms (they receive the NumPy array)
        if self.transform:
            image = self.transform(image)

        return image, label


class Normalize01(torch.nn.Module):
    """Min-max rescale a tensor so that its values span [0, 1]."""

    def forward(self, img):
        """Return ``(img - min) / (max - min)``, or all zeros for a constant tensor."""
        lowest = img.min()
        highest = img.max()
        if highest > lowest:
            return (img - lowest) / (highest - lowest)
        # A constant tensor has no range to stretch; map it to zeros.
        return torch.zeros_like(img)

    def __repr__(self):
        return type(self).__name__ + '()'


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score

def train_one_epoch(model, loader, criterion, optimizer, device, epoch, num_epochs):
    """Run one optimisation pass over ``loader`` and report loss and accuracy.

    Args:
        model: Network to train; switched to training mode here.
        loader: Iterable of ``(imgs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.
        epoch (int): Zero-based epoch index, used only for logging.
        num_epochs (int): Total number of epochs, used only for logging.

    Returns:
        tuple: ``(epoch_loss, train_accuracy)`` where ``epoch_loss`` is the
        per-batch loss weighted by batch size and divided by the sample count,
        and ``train_accuracy`` is the fraction of correctly predicted samples.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    print(f"\nEpoch {epoch + 1}/{num_epochs} - Training...")

    for batch_idx, (imgs, labels) in enumerate(loader):
        # Single move + cast, same idiom as evaluate_model.
        imgs = imgs.to(device, dtype=torch.float32)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so a smaller last batch is not over-counted.
        running_loss += loss.item() * imgs.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        if (batch_idx + 1) % 10 == 0:
            print(f"Batch {batch_idx + 1}/{len(loader)} - Loss: {loss.item():.4f}")

    if total == 0:
        raise ValueError("train_one_epoch received an empty loader: no samples to train on.")

    epoch_loss = running_loss / total
    train_accuracy = correct / total

    print(f"[TRAIN] Loss: {epoch_loss:.6f}, Accuracy: {train_accuracy:.4f}")
    return epoch_loss, train_accuracy


def validate(model, loader, criterion, device, epoch, num_epochs):
    """Evaluate ``model`` on ``loader`` without updating it.

    Args:
        model: Network to evaluate; switched to eval mode here.
        loader: Iterable of ``(imgs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        epoch (int): Zero-based epoch index, used only for logging.
        num_epochs (int): Total number of epochs, used only for logging.

    Returns:
        tuple: ``(epoch_loss, val_accuracy)`` where ``epoch_loss`` is the
        per-batch loss weighted by batch size and divided by the sample count,
        and ``val_accuracy`` is the fraction of correctly predicted samples.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    print(f"\nEpoch {epoch + 1}/{num_epochs} - Validation...")

    with torch.no_grad():
        for batch_idx, (imgs, labels) in enumerate(loader):
            # Single move + cast, same idiom as evaluate_model.
            imgs = imgs.to(device, dtype=torch.float32)
            labels = labels.to(device)

            outputs = model(imgs)
            loss = criterion(outputs, labels)

            # Weight the batch loss by batch size so a smaller last batch is not over-counted.
            running_loss += loss.item() * imgs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            if (batch_idx + 1) % 10 == 0:
                print(f"Batch {batch_idx + 1}/{len(loader)} - Loss: {loss.item():.4f}")

    if total == 0:
        raise ValueError("validate received an empty loader: no samples to evaluate.")

    epoch_loss = running_loss / total
    val_accuracy = correct / total

    print(f"[VALIDATION] Loss: {epoch_loss:.6f}, Accuracy: {val_accuracy:.4f}")
    return epoch_loss, val_accuracy


def train_loop(model, train_loader, val_loader, best_val_loss, optimizer, criterion, save_path, device, num_epochs=50):
    """Train for ``num_epochs`` epochs, logging metrics and saving weights.

    After every epoch the four metrics are appended (one value per line) to
    text files inside ``save_path``, the current weights are written to
    ``training_weights.pth`` and, whenever the validation loss drops below
    ``best_val_loss``, also to ``best_weights.pth``.

    Returns:
        tuple: ``(model, train_losses, val_losses, train_accuracies,
        val_accuracies)``; the four lists hold one entry per epoch.
    """
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    if not os.path.exists(save_path):
        os.makedirs(save_path)

    latest_weights_file = os.path.join(save_path, 'training_weights.pth')
    best_weights_file = os.path.join(save_path, 'best_weights.pth')

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch + 1}/{num_epochs}")

        epoch_loss_train, train_accuracy = train_one_epoch(
            model, train_loader, criterion, optimizer, device, epoch, num_epochs
        )
        epoch_loss_val, val_accuracy = validate(
            model, val_loader, criterion, device, epoch, num_epochs
        )

        # (log file, value of this epoch, in-memory history) for each metric
        epoch_records = (
            ('classification_loss_train.txt', epoch_loss_train, train_losses),
            ('classification_acc_train.txt', train_accuracy, train_accuracies),
            ('classification_loss_val.txt', epoch_loss_val, val_losses),
            ('classification_acc_val.txt', val_accuracy, val_accuracies),
        )
        for _, value, history in epoch_records:
            history.append(value)

        # Metric files are opened in append mode: one line per epoch.
        for filename, value, _ in epoch_records:
            with open(os.path.join(save_path, filename), 'a') as file:
                file.write(str(value) + '\n')

        # Always keep the most recent weights
        torch.save(model.state_dict(), latest_weights_file)

        # Keep a separate copy of the weights with the lowest validation loss
        if epoch_loss_val < best_val_loss:
            best_val_loss = epoch_loss_val
            torch.save(model.state_dict(), best_weights_file)
            print("[INFO] Miglior modello salvato.")

    print("\n[INFO] Training completato.")
    return model, train_losses, val_losses, train_accuracies, val_accuracies


In [None]:
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision import models
import torch.nn as nn
import torch
# NOTE (MICHI): only CHECK whether the values after ToTensor really lie in [0, 1].
# If they do not, nothing is changed: Normalize01 rescales them afterwards anyway.
import torchvision.transforms as transforms

def check_tensor_range(img):
    """Print a warning when any value of ``img`` lies outside [0, 1].

    The tensor is only inspected, never modified; nothing is returned.
    """
    too_low = img.min() < 0
    too_high = img.max() > 1
    if too_low or too_high:
        print("Warning: Tensor values are out of range [0,1] after ToTensor")

def pad_to_100x100(img):
    """Zero-pad a ``(C, H, W)`` tensor so that H and W are at least 100.

    The padding is split as evenly as possible between the two sides of each
    axis; when the missing amount is odd, the extra pixel goes to the
    bottom/right so the result is exactly 100 (a symmetric floor-division
    split would leave it at 99). Axes already 100 or larger are left alone.

    Args:
        img (torch.Tensor): Tensor with exactly three dimensions.

    Returns:
        torch.Tensor: The padded tensor, or ``img`` itself if no padding is needed.
    """
    _, h, w = img.shape
    pad_h = max(0, 100 - h)
    pad_w = max(0, 100 - w)
    if pad_h == 0 and pad_w == 0:
        return img

    top = pad_h // 2
    left = pad_w // 2
    # torch's pad order is (left, right, top, bottom) for the last two axes.
    return torch.nn.functional.pad(
        img, (left, pad_w - left, top, pad_h - top), mode="constant", value=0
    )

def _range_checked(x):
    """Run the [0, 1] range check on ``x`` and hand it back unchanged."""
    check_tensor_range(x)
    return x


def _preprocessing_steps():
    """Fresh instances of the steps shared by every pipeline (tensor, check, resize)."""
    return [
        transforms.ToTensor(),
        transforms.Lambda(_range_checked),  # range check only, values are not modified
        transforms.Resize((224, 224)),  # input size expected by the model
    ]


# Training pipeline: shared preprocessing + data augmentation + [0, 1] rescaling
train_transform = transforms.Compose(
    _preprocessing_steps()
    + [
        transforms.RandomRotation(degrees=90),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.5),
        Normalize01(),
    ]
)

# Validation / test pipeline: shared preprocessing + [0, 1] rescaling, no augmentation
val_test_transform = transforms.Compose(_preprocessing_steps() + [Normalize01()])

# Paths to the CSV files and base directory of the images.
# NOTE(review): BalancedCellDataset assigns label 0 to the first file and label 1
# to the second, and documents the expected order as [benign, malignant]. Here
# the mcf7 file comes first — confirm this is the intended class mapping.
csv_files = [
    "/content/Dataset Finale/csv/labeled/mcf7ControlCells_wo_mask_labeled.csv",
    "/content/Dataset Finale/csv/labeled/mcf10aControlCells_wo_mask_labeled.csv"
]
root_dir = "/content/Dataset Finale"

# CSV files of the second ("cross") test set, listed in the same class order as csv_files.
csv_files_cross_test = [
    "/content/Dataset Finale/csv/labeled/mcf7CdExposed_wo_mask_labeled.csv",
    "/content/Dataset Finale/csv/labeled/mcf10aCdExposed_wo_mask_labeled.csv"
]

# Parameters: CSV column that holds the image file name
image_column = "CellNucleus"

# Dataset creation. Every split uses the same image column, root directory and
# crop radius (r = 50 pixels -> 100x100 crops); only the CSV files, the split
# name and the transform change.
_shared_dataset_args = {
    "image_column": image_column,
    "root_dir": root_dir,
    "crop_radius": 50,
}

# Training split: class balancing requested explicitly
train_dataset = BalancedCellDataset(
    csv_files, "train", transform=train_transform, balance=True, **_shared_dataset_args
)

# Validation split
val_dataset = BalancedCellDataset(
    csv_files, "validation", transform=val_test_transform, **_shared_dataset_args
)

# Test split of the same CSV files used for training
test_dataset_1 = BalancedCellDataset(
    csv_files, "test", transform=val_test_transform, **_shared_dataset_args
)

# Test split of the cross-test CSV files
test_dataset_2 = BalancedCellDataset(
    csv_files_cross_test, "test", transform=val_test_transform, **_shared_dataset_args
)

batch_size = 32

# DataLoader creation: identical settings everywhere, only training is shuffled
_shared_loader_args = dict(batch_size=batch_size, num_workers=4, pin_memory=True)

train_loader = DataLoader(train_dataset, shuffle=True, **_shared_loader_args)
val_loader = DataLoader(val_dataset, shuffle=False, **_shared_loader_args)
test_loader_1 = DataLoader(test_dataset_1, shuffle=False, **_shared_loader_args)
test_loader_2 = DataLoader(test_dataset_2, shuffle=False, **_shared_loader_args)

# Model configuration
num_input_channels = 2  # number of image channels fed to the network
num_classes = 2  # number of classes in the dataset

# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("Caricamento del modello EfficientNetB0 con timm...")
model = timm.create_model(
    'efficientnet_b0',
    pretrained=True,
    in_chans=num_input_channels,
    num_classes=num_classes,
).to(device)

print(f"Modello EfficientNetB0 caricato e adattato per {num_input_channels} canali di input.")


In [None]:
# ===========================
#  Loop di Training & Validazione
# ===========================

import os
save_path = "/content/drive/MyDrive/DeepLearningResults/"

# Create the output directory if it does not exist yet
if not os.path.exists(save_path):
    os.makedirs(save_path)

# Training length and starting point for the "best validation loss" comparison
num_epochs = 50
best_val_loss = float('inf')  # any first validation loss will be lower than this

# Loss: cross-entropy for classification
criterion = nn.CrossEntropyLoss()

# Optimizer: Adam with an explicit learning rate and weight decay
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=5e-4,
    betas=(0.9, 0.999),
    weight_decay=1e-4,
)

# Run the training and keep the per-epoch metrics for plotting
model, train_losses, val_losses, train_accuracies, val_accuracies = train_loop(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    best_val_loss=best_val_loss,
    optimizer=optimizer,
    criterion=criterion,
    save_path=save_path,
    device=device,
    num_epochs=num_epochs,
)

In [None]:
import matplotlib.pyplot as plt

def plot_training_curves(train_losses, val_losses, train_accuracies, val_accuracies):
    """Show loss and accuracy curves side by side, one point per epoch.

    Each argument is a sequence with one value per epoch; the x axis is
    numbered from 1 to ``len(train_losses)``.
    """
    epochs = range(1, len(train_losses) + 1)

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # (axes, train values, validation values, train label, validation label, y label, title)
    panels = (
        (loss_ax, train_losses, val_losses,
         "Train Loss", "Validation Loss", "Loss", "Training & Validation Loss"),
        (acc_ax, train_accuracies, val_accuracies,
         "Train Accuracy", "Validation Accuracy", "Accuracy", "Training & Validation Accuracy"),
    )

    for ax, train_values, val_values, train_label, val_label, y_label, title in panels:
        ax.plot(epochs, train_values, label=train_label, linewidth=2, color='blue')
        ax.plot(epochs, val_values, label=val_label, linewidth=2, color='orange')
        ax.set_xlabel("Epochs")
        ax.set_ylabel(y_label)
        ax.set_title(title)
        ax.legend()
        ax.grid(alpha=0.3)

    # Display the figure
    plt.tight_layout()
    plt.show()


# Plot the raw per-epoch curves (no smoothing applied)
plot_training_curves(train_losses, val_losses, train_accuracies, val_accuracies)

In [None]:
# Evaluation function
def evaluate_model(model, test_loader, device, save_path, test_set_name):
    """Load the best weights and report binary-classification metrics on a test set.

    Args:
        model: Network whose weights are replaced by ``best_weights.pth``.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device used for inference; the weights are mapped onto it.
        save_path (str): Directory containing ``best_weights.pth``.
        test_set_name (str): Name of the sub-directory of ``save_path`` that is
            created for this test set.

    Returns:
        tuple: ``(accuracy, precision, recall, f1, auc)``. ``auc`` is NaN when
        the test labels contain fewer than two classes, because ROC AUC is
        undefined in that case.
    """
    # os.path.join works with or without a trailing slash in save_path;
    # map_location lets weights saved on GPU be loaded on a CPU-only machine.
    weights_file = os.path.join(save_path, 'best_weights.pth')
    model.load_state_dict(torch.load(weights_file, map_location=device))
    test_save_path = os.path.join(save_path, test_set_name)
    os.makedirs(test_save_path, exist_ok=True)
    model.to(device)
    model.eval()

    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():  # inference only, no gradients needed
        for images, labels in test_loader:
            images = images.to(device, dtype=torch.float32)

            outputs = model(images)
            probs = torch.softmax(outputs, dim=1)
            predicted = torch.argmax(probs, dim=1)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs[:, 1].cpu().numpy())  # probability of class 1

    all_labels = np.array(all_labels)
    all_preds = np.array(all_preds)
    all_probs = np.array(all_probs)

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average="binary", zero_division=0)
    recall = recall_score(all_labels, all_preds, average="binary", zero_division=0)
    f1 = f1_score(all_labels, all_preds, average="binary", zero_division=0)

    if np.unique(all_labels).size < 2:
        # roc_auc_score raises when only one class is present in the labels.
        print("Warning: AUC is undefined because the test labels contain fewer than two classes")
        auc = float("nan")
    else:
        auc = roc_auc_score(all_labels, all_probs)

    print("\nMetriche di valutazione:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-score: {f1:.4f}")
    print(f"AUC: {auc:.4f}")

    # Fix the label set so the matrix stays 2x2 even if a class never appears.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot(cmap="Blues", values_format="d")
    plt.title("Confusion Matrix")
    plt.show()

    return accuracy, precision, recall, f1, auc

In [None]:
# Run inference on both test sets
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print("Eseguo l'inferenza sul primo test set...")
test1_accuracy, test1_precision, test1_recall, test1_f1, test1_auc = evaluate_model(
    model=model,
    test_loader=test_loader_1,
    device=device,
    save_path=save_path,
    test_set_name="test_set_1",
)

print("\nEseguo l'inferenza sul secondo test set...")
test2_accuracy, test2_precision, test2_recall, test2_f1, test2_auc = evaluate_model(
    model=model,
    test_loader=test_loader_2,
    device=device,
    save_path=save_path,
    test_set_name="test_set_2",
)
