In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm
import os
import csv
from PIL import Image
import glob

In [15]:
# Custom dataset class
class CustomDataset(Dataset):
    """Image dataset laid out as one sub-directory of ``root_dir`` per class.

    Class names are the sorted sub-directory names; a sample's target is the
    index of its class in that sorted list (0 or 1 for a two-class layout).

    Args:
        root_dir: Directory containing one sub-directory per class.
        transform: Optional callable applied to each RGB ``PIL.Image`` before
            it is returned.

    Attributes:
        classes: Sorted list of class (sub-directory) names.
        data: File path of every sample.
        targets: Integer class index of every sample, aligned with ``data``.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        # Only real, non-hidden sub-directories are classes. Stray entries such
        # as macOS ".DS_Store" files would otherwise shift the label indices
        # (or crash the os.listdir() call below).
        self.classes = sorted(
            entry for entry in os.listdir(self.root_dir)
            if not entry.startswith('.')
            and os.path.isdir(os.path.join(self.root_dir, entry))
        )
        self.data = []
        self.targets = []

        for idx, cls in enumerate(self.classes):
            class_dir = os.path.join(self.root_dir, cls)
            # Sorted so that sample order does not depend on the filesystem.
            for file in sorted(os.listdir(class_dir)):
                file_path = os.path.join(class_dir, file)
                # Skip hidden files and nested directories: they are not samples.
                if file.startswith('.') or not os.path.isfile(file_path):
                    continue
                self.data.append(file_path)
                # Target is the class index (0 or 1 for binary classification)
                self.targets.append(idx)

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        The image is loaded from disk, converted to RGB and passed through
        ``transform`` when one was given.
        """
        img_path = self.data[idx]
        # convert() returns a new, fully loaded image, so the underlying file
        # handle can be closed immediately instead of lingering until GC.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        target = self.targets[idx]

        if self.transform:
            image = self.transform(image)

        return image, target

In [16]:
# Preprocessing applied to every image before it reaches the network.
# The Normalize step uses the ImageNet channel statistics that torchvision's
# pretrained AlexNet weights were trained with; without it the frozen
# convolutional features receive inputs on a different scale than expected.
# NOTE(review): this changes the input distribution, so models/checkpoints
# trained without normalization should be retrained rather than resumed.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to 224x224
    transforms.ToTensor(),  # Convert to a PyTorch tensor scaled to [0, 1]
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),  # ImageNet statistics
])

In [17]:
# Dataset locations.
# The project root defaults to the original development machine's path but can
# be overridden with the UNDERWATER_SOUNDS_ROOT environment variable, so the
# notebook can run elsewhere without editing this cell.
project_root = os.environ.get(
    'UNDERWATER_SOUNDS_ROOT',
    '/Users/massimo/PycharmProjects/UnderwaterSoundsClassification',
)
train_dataset_path = os.path.join(project_root, 'Normalizzazione')
val_dataset_path = os.path.join(project_root, 'Modello', 'Valutazione')
test_dataset_path = os.path.join(project_root, 'Modello', 'Testing')

In [18]:
# Build the train / validation / test datasets, in that order, all sharing
# the same preprocessing transform.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(split_root, transform=transform)
    for split_root in (train_dataset_path, val_dataset_path, test_dataset_path)
)

In [19]:
# Batch iterators: only the training split is reshuffled every epoch;
# validation and test keep a fixed order.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=32, shuffle=reshuffle)
    for split, reshuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

In [20]:
# Start from AlexNet with pretrained weights
model = models.alexnet(pretrained=True)

# Freeze the convolutional feature extractor so its weights receive no gradients
model.features.requires_grad_(False)

# Swap the last classifier layer for a single-logit head
# (one output, as expected by BCEWithLogitsLoss)
model.classifier[6] = nn.Linear(
    in_features=model.classifier[6].in_features,
    out_features=1,
)


In [21]:
# Pick the compute device: GPU when CUDA is available, CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Binary cross-entropy on raw logits, and Adam over the classifier head only
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(params=model.classifier.parameters(), lr=0.001)

# Move the model to the chosen device (last expression, so the cell shows its repr)
model.to(device)


AlexNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (avgpool): AdaptiveAvgPool2d(output_size=(6, 6))
  (classifier): Sequential(
    (0): Dropout(p=0.5, inplace=False)
    (1): Linear(in_features=9216, out_features=4096, bias=True)
 

In [22]:
# Compute classification metrics
def calculate_metrics(model, data_loader):
    """Evaluate ``model`` on ``data_loader`` and return binary metrics.

    The model is switched to eval mode and run without gradient tracking.
    A sample is predicted positive when the sigmoid of its logit exceeds 0.5.

    Args:
        model: Network producing one logit per sample.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)`` as computed by the
        scikit-learn metric functions.

    Note:
        Uses the module-level ``device`` for the forward pass and leaves the
        model in eval mode; callers that continue training must call
        ``model.train()`` again.
    """
    model.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            # Only the inputs are needed on the device; labels are consumed on the host.
            outputs = model(inputs.to(device))
            # Flatten the (batch, 1) logits to a 1-D vector of 0/1 integers so
            # labels and predictions reach scikit-learn in the same 1-D shape.
            preds = (torch.sigmoid(outputs) > 0.5).long().reshape(-1)
            all_labels.extend(labels.reshape(-1).tolist())
            all_preds.extend(preds.cpu().tolist())

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    return accuracy, precision, recall, f1


In [23]:
# Save a checkpoint
def save_checkpoint(state, filename):
    """Serialize ``state`` to ``filename`` with ``torch.save``.

    When ``filename`` is a filesystem path the data is first written to a
    temporary sibling file and then moved into place, so an interrupted save
    (e.g. a kernel interrupt) cannot leave a truncated checkpoint behind.
    The parent directory is created if it does not exist.

    Args:
        state: Object to serialize (typically a dict of state dicts).
        filename: Destination path, or any file-like object accepted by
            ``torch.save``.
    """
    # File-like objects are passed straight through, exactly as before.
    if not isinstance(filename, (str, os.PathLike)):
        torch.save(state, filename)
        return

    target_path = os.fspath(filename)
    parent_dir = os.path.dirname(target_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    tmp_path = target_path + '.tmp'
    try:
        torch.save(state, tmp_path)
        # os.replace swaps the finished file into place in a single step.
        os.replace(tmp_path, target_path)
    except BaseException:
        # Do not leave a partial temp file behind; re-raise the original error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

# Load a checkpoint
def load_checkpoint(filename, model, optimizer):
    """Restore training state from ``filename`` if that file exists.

    The checkpoint is expected to be a dict with the keys
    ``'model_state_dict'``, ``'optimizer_state_dict'`` and ``'epoch'``, plus
    an optional ``'metrics'`` list.

    Args:
        filename: Path of the checkpoint file.
        model: Model whose weights are overwritten in place.
        optimizer: Optimizer whose state is overwritten in place.

    Returns:
        Tuple ``(model, optimizer, start_epoch, metrics)``. ``start_epoch`` is
        the stored epoch plus one; when no file exists it is ``0`` and
        ``metrics`` is an empty list.
    """
    if os.path.isfile(filename):
        print(f"Caricamento del checkpoint '{filename}'")
        # Deserialize onto the CPU so a checkpoint written on a GPU machine can
        # be restored where CUDA is unavailable; load_state_dict then copies the
        # values into the model's existing parameters on whatever device they use.
        checkpoint = torch.load(filename, map_location='cpu')
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_epoch = checkpoint['epoch'] + 1
        # Tolerate checkpoints that were written without a metrics history.
        metrics = checkpoint.get('metrics', [])
        print(f"Checkpoint caricato (epoca {checkpoint['epoch']})")
        return model, optimizer, start_epoch, metrics
    else:
        print(f"Nessun checkpoint trovato a '{filename}'")
        return model, optimizer, 0, []


In [24]:
# Checkpoint directory (created on first run)
checkpoint_dir = 'checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# Pick the checkpoint to resume from:
#   1. the most recently written per-epoch file 'checkpoint_epoch_*.pth', or
#   2. 'best_checkpoint.pth', which the training loop in this notebook saves
#      whenever the validation loss improves.
# Without the fallback, a run that only produced 'best_checkpoint.pth' would
# silently restart from scratch.
epoch_checkpoints = glob.glob(os.path.join(checkpoint_dir, 'checkpoint_epoch_*.pth'))
best_checkpoint_path = os.path.join(checkpoint_dir, 'best_checkpoint.pth')
if epoch_checkpoints:
    # Modification time reflects when the file content was last written.
    latest_checkpoint = max(epoch_checkpoints, key=os.path.getmtime)
elif os.path.isfile(best_checkpoint_path):
    latest_checkpoint = best_checkpoint_path
else:
    latest_checkpoint = None

if latest_checkpoint:
    model, optimizer, start_epoch, metrics = load_checkpoint(latest_checkpoint, model, optimizer)
else:
    # Fresh run: start at epoch 0 with an empty metrics history
    start_epoch, metrics = 0, []


In [None]:
# Training loop with early stopping
num_epochs = 25
checkpoint_interval = 1  # Write a resumable 'checkpoint_epoch_N.pth' every N epochs
patience = 5  # Epochs without validation-loss improvement before training stops
best_val_loss = float('inf')
patience_counter = 0

# When resuming, `metrics` already holds the history of the completed epochs.
# Rebuild the early-stopping state from it so the first resumed epoch does not
# overwrite the best checkpoint with a worse model.
if metrics:
    best_val_loss = min(entry['val_loss'] for entry in metrics)
    best_epoch_index = next(
        i for i, entry in enumerate(metrics) if entry['val_loss'] == best_val_loss
    )
    patience_counter = len(metrics) - 1 - best_epoch_index

for epoch in range(start_epoch, num_epochs):
    # --- Training ---
    model.train()
    running_train_loss = 0.0
    pbar_train = tqdm(train_loader, desc=f"Epoca {epoch + 1}/{num_epochs} - Addestramento")
    for batch_count, (inputs, labels) in enumerate(pbar_train, start=1):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        # Targets must be float and shaped (batch, 1) to match the single logit
        train_loss = criterion(outputs, labels.float().view(-1, 1))
        train_loss.backward()
        optimizer.step()
        running_train_loss += train_loss.item()
        # Running mean over the batches seen so far (not over the whole epoch)
        pbar_train.set_postfix({'Loss': running_train_loss / batch_count})

    # --- Validation ---
    model.eval()
    running_val_loss = 0.0
    pbar_val = tqdm(val_loader, desc=f"Epoca {epoch + 1}/{num_epochs} - Validazione")
    with torch.no_grad():
        for batch_count, (inputs, labels) in enumerate(pbar_val, start=1):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            val_loss = criterion(outputs, labels.float().view(-1, 1))
            running_val_loss += val_loss.item()
            pbar_val.set_postfix({'Val Loss': running_val_loss / batch_count})

    # Metrics on the validation set (this runs a second pass over val_loader)
    val_accuracy, val_precision, val_recall, val_f1 = calculate_metrics(model, val_loader)
    # Mean per-batch loss; max(..., 1) avoids a ZeroDivisionError on an empty loader
    epoch_train_loss = running_train_loss / max(len(train_loader), 1)
    epoch_val_loss = running_val_loss / max(len(val_loader), 1)

    # Close the progress bars before printing
    pbar_train.close()
    pbar_val.close()
    print(
        f"Epoca {epoch + 1}/{num_epochs} - Loss Addestramento: {epoch_train_loss:.4f}"
        f" - Loss Validazione: {epoch_val_loss:.4f}"
        f" - Accuratezza Validazione: {val_accuracy:.4f}"
        f" - Precisione Validazione: {val_precision:.4f}"
        f" - Recall Validazione: {val_recall:.4f}"
        f" - F1 Validazione: {val_f1:.4f}")

    # Record the epoch. Values are cast to plain Python floats so the history
    # stored inside the checkpoint contains only built-in types.
    metrics.append({
        "epoch": epoch + 1,
        "train_loss": float(epoch_train_loss),
        "val_loss": float(epoch_val_loss),
        "val_accuracy": float(val_accuracy),
        "val_precision": float(val_precision),
        "val_recall": float(val_recall),
        "val_f1": float(val_f1)
    })

    # Write the full metrics history collected so far to a per-epoch CSV
    metrics_path = os.path.join(checkpoint_dir, f'training_metrics_epoch_{epoch + 1}.csv')
    with open(metrics_path, 'w', newline='') as csvfile:
        fieldnames = ['epoch', 'train_loss', 'val_loss', 'val_accuracy', 'val_precision', 'val_recall', 'val_f1']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(metrics)

    print(f"Metriche per l'epoca {epoch + 1} salvate in: {metrics_path}")

    # Snapshot of everything needed to resume after this epoch
    checkpoint_state = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'metrics': metrics
    }

    # Early stopping bookkeeping
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        patience_counter = 0
        # Keep the best model seen so far
        checkpoint_path = os.path.join(checkpoint_dir, 'best_checkpoint.pth')
        save_checkpoint(checkpoint_state, checkpoint_path)
        print(f"Miglior checkpoint salvato in: {checkpoint_path}")
    else:
        patience_counter += 1

    # Periodic checkpoint, named so the resume cell's 'checkpoint_epoch_*.pth'
    # glob can find it. Each file holds the full model and optimizer state, so
    # raise checkpoint_interval if disk space is a concern.
    if (epoch + 1) % checkpoint_interval == 0:
        checkpoint_path = os.path.join(checkpoint_dir, f'checkpoint_epoch_{epoch + 1}.pth')
        save_checkpoint(checkpoint_state, checkpoint_path)
        print(f"Checkpoint salvato in: {checkpoint_path}")

    if patience_counter >= patience:
        print(f"Early stopping attivato dopo {epoch + 1} epoche")
        break

In [13]:
# Persist the weights currently held by `model` and report where they went.
# NOTE(review): after early stopping these are the weights of the last epoch
# run, not necessarily those stored in 'best_checkpoint.pth' — confirm which
# ones are meant to be exported.
model_path = os.path.abspath('alexnet_model_binary_classification.pth')
torch.save(model.state_dict(), model_path)

# Print the absolute path of the saved model
print(f"Modello salvato in: {model_path}")

Modello salvato in: /Users/massimo/PycharmProjects/UnderwaterSoundsClassification/alexnet_model_binary_classification.pth
