<a href="https://colab.research.google.com/github/Dr3dre/Bachelor-Degree/blob/main/Anomaly_Detection_on_Wine_Grapes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive

# Mount Google Drive so the dataset and model paths under /content/drive are reachable.
drive.mount('/content/drive')

### Data

In [None]:
from PIL import Image
from torch.utils.data import Dataset
import numpy as np

class CanopiesDataset(Dataset):
    """Dataset of image files labelled from their path.

    Each item is an ``(image, label)`` pair. The image is loaded as RGB and
    converted to a NumPy array before the optional ``transform`` is applied.
    The label is ``0`` when the substring ``"good"`` occurs anywhere in the
    image path and ``1`` otherwise.

    NOTE(review): the label test is a substring match on the whole path, so a
    parent directory or file name containing "good" also yields label 0.
    """

    def __init__(self, image_names, transform):
        # image_names: sequence of image file paths.
        # transform: callable applied to the loaded array, or a falsy value.
        self.image_names = image_names
        self.transform = transform

    def __len__(self):
        """Return the number of image paths held by the dataset."""
        return len(self.image_names)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` and return ``(image, label)``."""
        path = self.image_names[idx]
        image = np.array(Image.open(path).convert("RGB"))

        if self.transform:
            image = self.transform(image)

        label = 0 if "good" in path else 1
        return image, label

In [None]:
from torch.utils.data import DataLoader

import os
import random

def get_image_paths(data_dir):
    """Return the paths of the ``.jpg`` files directly inside ``data_dir``.

    The names are sorted because ``os.listdir`` returns entries in arbitrary
    order and the train/val/test split downstream is positional; sorting makes
    the split reproducible across runs and file systems.

    Args:
        data_dir: Directory to scan (not recursive).

    Returns:
        A list of ``os.path.join(data_dir, name)`` strings, in sorted name
        order, for every entry whose name ends with ``".jpg"``
        (case-sensitive).
    """
    return [
        os.path.join(data_dir, image_name)
        for image_name in sorted(os.listdir(data_dir))
        if image_name.endswith(".jpg")
    ]


def get_images_by_scale(data_dir, scale):
    """Collect good and bad image paths for one scale, or for all scales.

    The expected layout is ``data_dir/{good,bad}/scale_{1,1.5,2}/``; each
    selected sub-directory is listed with ``get_image_paths``.

    Args:
        data_dir: Root directory containing the ``good`` and ``bad`` folders.
        scale: ``1``, ``1.5`` or ``2`` to select a single scale, or ``None``
            to include all three. Any other value selects nothing.

    Returns:
        ``(good_images, bad_images)``: two lists of paths, each ordered by
        scale (1, then 1.5, then 2).
    """
    scale_dirs = ((1, "scale_1"), (1.5, "scale_1.5"), (2, "scale_2"))

    def collect(label):
        # Gather the paths of every selected scale folder under data_dir/label.
        label_dir = os.path.join(data_dir, label)
        paths = []
        for value, dir_name in scale_dirs:
            if scale is None or scale == value:
                paths += get_image_paths(os.path.join(label_dir, dir_name))
        return paths

    good_images = collect("good")
    bad_images = collect("bad")

    return good_images, bad_images


def split_images_aux(images, train_percentage, val_percentage, train_limit, random_seed):
    """Split ``images`` positionally into train, validation and test slices.

    Args:
        images: Sequence of items to split (order is preserved).
        train_percentage: Fraction of ``images`` assigned to training.
        val_percentage: Fraction of ``images`` assigned to validation; the
            remainder goes to the test slice.
        train_limit: Optional cap on the number of training items, applied
            after the split. ``None`` means no cap.
        random_seed: Unused here; kept so the signature matches
            ``split_images``.

    Returns:
        ``(train_images, val_images, test_images)``.
    """
    total = len(images)

    # Fractions such as 0.7 + 0.1 evaluate to 0.7999999999999999, so a plain
    # int() would truncate e.g. 10 * 0.8 down to 7. The small tolerance
    # absorbs that representation error while keeping floor semantics.
    tolerance = 1e-9
    train_end = int(total * train_percentage + tolerance)
    val_end = int(total * (train_percentage + val_percentage) + tolerance)

    train_images = images[:train_end]
    if train_limit is not None:
        train_images = train_images[:train_limit]

    val_images = images[train_end:val_end]
    test_images = images[val_end:]

    return train_images, val_images, test_images


def split_images(good_images, bad_images, train_percentage, val_percentage, train_limit, random_seed):
    """Split good and bad images separately and merge the matching slices.

    Each class is split with ``split_images_aux`` using the same fractions
    and limit, so ``train_limit`` caps each class independently.

    Args:
        good_images: Paths of the good images.
        bad_images: Paths of the bad images.
        train_percentage: Fraction of each class used for training.
        val_percentage: Fraction of each class used for validation.
        train_limit: Optional per-class cap on training items.
        random_seed: When not ``None``, seeds the global ``random`` module
            and shuffles the merged training list. Validation and test lists
            are never shuffled.

    Returns:
        ``(train_images, val_images, test_images)``, each holding the good
        items followed by the bad items (training shuffled if seeded).
    """
    train_images_good, val_images_good, test_images_good = split_images_aux(
        good_images, train_percentage, val_percentage, train_limit, random_seed
    )
    train_images_bad, val_images_bad, test_images_bad = split_images_aux(
        bad_images, train_percentage, val_percentage, train_limit, random_seed
    )

    train_images = train_images_good + train_images_bad
    if random_seed is not None:
        random.seed(random_seed)
        random.shuffle(train_images)

    val_images = val_images_good + val_images_bad
    test_images = test_images_good + test_images_bad

    # Report the per-class counts; the "Good" lines previously printed the
    # combined good+bad totals.
    print(f"Good images for training: {len(train_images_good)}")
    print(f"Good images for validation: {len(val_images_good)}")
    print(f"Good images for testing: {len(test_images_good)}")

    print(f"Bad images for training: {len(train_images_bad)}")
    print(f"Bad images for validation: {len(val_images_bad)}")
    print(f"Bad images for testing: {len(test_images_bad)}")

    return train_images, val_images, test_images

def get_datasets(train_images, val_images, test_images, transform):
    """Wrap the three path lists in ``CanopiesDataset`` objects.

    The same ``transform`` is attached to every dataset.

    Returns:
        ``(train_dataset, val_dataset, test_dataset)``.
    """
    return tuple(
        CanopiesDataset(image_names=paths, transform=transform)
        for paths in (train_images, val_images, test_images)
    )

def get_loaders_(train_dataset, val_dataset, test_dataset, batch_size, num_workers):
    """Create one ``DataLoader`` per dataset with shared batch settings.

    The training and test loaders shuffle their samples; the validation
    loader keeps the dataset order.

    Returns:
        ``(train_loader, val_loader, test_loader)``.
    """
    shared = {"batch_size": batch_size, "num_workers": num_workers}

    train_loader = DataLoader(train_dataset, shuffle=True, **shared)
    val_loader = DataLoader(val_dataset, shuffle=False, **shared)
    test_loader = DataLoader(test_dataset, shuffle=True, **shared)

    return train_loader, val_loader, test_loader


def get_loaders(data_dir,
                batch_size,
                train_percentage = 0.7,
                val_percentage = 0.2,
                train_limit = None,
                transform = None,
                num_workers = 2,
                random_seed = 42,
                scale = None):
    """Build loaders and datasets from the image folders under ``data_dir``.

    Pipeline: list the images for ``scale``, split them per class, wrap the
    splits in datasets, then wrap the datasets in data loaders.

    Returns:
        ``(train_loader, val_loader, test_loader, train_dataset,
        val_dataset)``. The test dataset itself is not returned.
    """
    good_paths, bad_paths = get_images_by_scale(data_dir, scale)

    splits = split_images(
        good_paths, bad_paths, train_percentage, val_percentage, train_limit, random_seed
    )

    train_dataset, val_dataset, test_dataset = get_datasets(*splits, transform)

    loaders = get_loaders_(train_dataset, val_dataset, test_dataset, batch_size, num_workers)

    return (*loaders, train_dataset, val_dataset)

In [None]:
# Build the scale-1.5 loaders (ToTensor only) used to compute the dataset statistics

import torch
import torchvision

# Paths and runtime settings shared by the following cells.
DATA_DIR = "/content/drive/MyDrive/first_dataset"
MODELS_DIR = "/content/drive/MyDrive/canopies_models"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

BATCH_SIZE = 32
NUM_WORKERS = 2

# Only convert to tensors here; normalization statistics are computed later.
transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

# 70 % train / 10 % validation (remainder test), scale 1.5 only, no train cap.
train_loader, val_loader, test_loader, train_dataset, valid_dataset = get_loaders(
    data_dir=DATA_DIR,
    batch_size=BATCH_SIZE,
    train_percentage=0.7,
    val_percentage=0.1,
    train_limit=None,
    transform=transform,
    num_workers=NUM_WORKERS,
    random_seed=42,
    scale=1.5,
)

#### Normalization

In [None]:
# Compute the per-channel mean and standard deviation used for normalization

def batch_mean_and_sd(loader):
    """Compute the per-channel mean and standard deviation over a loader.

    Running first and second moments are updated batch by batch, weighted by
    the number of pixels seen so far, so the whole dataset never has to be
    held in memory.

    Args:
        loader: Iterable yielding ``(images, _)`` pairs where ``images`` is a
            4-D tensor unpacked as ``(batch, channels, height, width)``.

    Returns:
        ``(mean, std)``: two 1-D tensors with one entry per channel. For an
        empty loader both are zero tensors of length 3.
    """
    cnt = 0
    # Start from zeros: torch.empty returns uninitialised memory, and
    # 0 * garbage can be NaN/inf on the first update.
    fst_moment = torch.zeros(3)
    snd_moment = torch.zeros(3)

    for images, _ in loader:
        b, c, h, w = images.shape
        nb_pixels = b * h * w
        sum_ = torch.sum(images, dim=[0, 2, 3])
        sum_of_square = torch.sum(images ** 2, dim=[0, 2, 3])

        if cnt == 0:
            # Match the channel count, dtype and device of the data.
            fst_moment = torch.zeros_like(sum_)
            snd_moment = torch.zeros_like(sum_of_square)

        fst_moment = (cnt * fst_moment + sum_) / (cnt + nb_pixels)
        snd_moment = (cnt * snd_moment + sum_of_square) / (cnt + nb_pixels)
        cnt += nb_pixels

    # Var = E[x^2] - E[x]^2; rounding can push it slightly below zero for
    # near-constant channels, which would make sqrt return NaN.
    variance = torch.clamp(snd_moment - fst_moment ** 2, min=0)
    mean, std = fst_moment, torch.sqrt(variance)
    return mean, std

# Per-channel statistics of the training loader; they feed the Normalize transforms defined in later cells.
mean, std = batch_mean_and_sd(train_loader)
print("mean and std: \n", mean, std)

In [None]:
# Python code to visualize an image

import matplotlib.pyplot as plt

# Normalization-only pipeline: it is applied to batches that are already tensors.
transform_NORMALIZED = torchvision.transforms.Compose(
    [torchvision.transforms.Normalize(mean=mean, std=std)]
)

# Take the first batch from the validation loader for visual inspection.
images, labels = next(iter(val_loader))

def display_image(images, limit=0):
    """Show the images of a batch one by one with matplotlib.

    Args:
        images: Tensor indexed as ``(batch, channels, height, width)``; it is
            transposed to channels-last before plotting.
        limit: Maximum number of images to show. ``0`` (or any non-positive
            value) shows the whole batch. Values larger than the batch are
            clamped to the batch size instead of raising ``IndexError``.
    """
    # detach/cpu make the conversion work for CUDA tensors and for tensors
    # that track gradients; both are no-ops for plain CPU tensors.
    images_np = images.detach().cpu().numpy()
    img_plt = images_np.transpose(0, 2, 3, 1)

    available = images.shape[0]
    num_images = min(limit, available) if limit > 0 else available

    for img in img_plt[:num_images]:
        plt.imshow(img)
        plt.show()

# Show the first five images of the validation batch fetched above.
display_image(images, limit=5)

In [None]:
# Normalize the same batch with the training-set statistics and display the first five results.
image_norm = transform_NORMALIZED(images)

display_image(image_norm, 5)

#### Data Augmentation


In [None]:
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, ConcatDataset

# Settings for the augmented data pipeline.
data_dir = "/content/drive/MyDrive/first_dataset"
scale = 1.5
train_percentage = 0.7
val_percentage = 0.2
train_limit = None
random_seed = 42
batch_size = 32
num_workers = 2

# The original training set is concatenated with three rotated copies of the
# bad training images.

normalize = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=mean, std=std)]
)

# Image rotation transforms: each uses a +/-0.1 degree range around the target angle.
rotate_90 = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.RandomRotation([89.9, 90.1]),
        transforms.Normalize(mean=mean, std=std),
    ]
)

rotate_180 = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.RandomRotation([179.9, 180.1]),
        transforms.Normalize(mean=mean, std=std),
    ]
)

rotate_270 = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.RandomRotation([269.9, 270.1]),
        transforms.Normalize(mean=mean, std=std),
    ]
)

# Amount moved from the bad training fraction to the bad validation fraction.
offset = 0.1

good_images, bad_images = get_images_by_scale(data_dir, scale)

# Good and bad images are split with different fractions.
train_images_good, val_images_good, test_images_good = split_images_aux(
    good_images, train_percentage, 0.1, train_limit, random_seed
)
train_images_bad, val_images_bad, test_images_bad = split_images_aux(
    bad_images, train_percentage - offset, val_percentage + offset, train_limit, random_seed
)

train_dataset_good = CanopiesDataset(train_images_good, normalize)
train_dataset_bad = CanopiesDataset(train_images_bad, normalize)
bad90_dataset = CanopiesDataset(train_images_bad, rotate_90)
bad180_dataset = CanopiesDataset(train_images_bad, rotate_180)
bad270_dataset = CanopiesDataset(train_images_bad, rotate_270)

train_dataset = ConcatDataset(
    [
        train_dataset_good,
        train_dataset_bad,
        bad90_dataset,
        bad180_dataset,
        bad270_dataset,
    ]
)

val_images = val_images_good + val_images_bad
test_images = test_images_good + test_images_bad

# Only the validation and test datasets are taken from this call; the
# training dataset is the concatenation built above.
_, val_dataset, test_dataset = get_datasets(train_images_good, val_images, test_images, normalize)

train_loader, val_loader, test_loader = get_loaders_(
    train_dataset, val_dataset, test_dataset, batch_size, num_workers
)

print(len(val_images_good), len(val_images_bad))
print(len(test_images_good), len(test_images_bad))

In [None]:
# Sub-datasets 1..4 of the concatenation hold the bad images (original plus
# the three rotated copies); sub-dataset 0 holds the good images.
len_bad = 0
for i in range(1, 5):
    len_bad += len(train_dataset.datasets[i])

print("Good images for training: ", len(train_dataset.datasets[0]))
print("Bad images for training: ", len_bad)

In [None]:
# Total number of training samples (cell output).
len(train_dataset)

### Modello

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF

from typing import Sequence

class LeNet(nn.Module):
    """LeNet-style CNN: two conv/ReLU/max-pool stages and two linear layers.

    ``forward`` ends with a softmax over the class dimension, so the network
    returns probabilities, not logits.

    NOTE(review): because of that softmax, the output should be paired with a
    loss that expects probabilities (e.g. ``nn.BCELoss``) rather than one that
    applies its own sigmoid (``nn.BCEWithLogitsLoss``).

    Args:
        num_classes: Number of output classes.
        input_size: Spatial size of the input images, either a single int for
            square images or a ``(height, width)`` pair. The default of 448
            reproduces the previously hard-coded ``50 * 109 * 109`` input
            width of the first linear layer.

    Raises:
        ValueError: If ``input_size`` is too small to survive both
            convolution/pooling stages.
    """

    def __init__(self, num_classes=2, input_size=448):
        super().__init__()

        if isinstance(input_size, int):
            height = width = input_size
        else:
            height, width = input_size

        feat_h = self._feature_map_side(height)
        feat_w = self._feature_map_side(width)
        if feat_h < 1 or feat_w < 1:
            raise ValueError(f"input_size {input_size} is too small for LeNet")

        # First convolutional layer
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=20, kernel_size=5)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Second convolutional layer
        self.conv2 = nn.Conv2d(in_channels=20, out_channels=50, kernel_size=5)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.flat = nn.Flatten(1)

        # Fully connected layers
        self.fc1 = nn.Linear(50 * feat_h * feat_w, 500)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(500, num_classes)
        self.softmax = nn.Softmax(dim=1)

        # Xavier-initialise the linear layers only; the convolutions keep the
        # PyTorch default initialisation.
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

    @staticmethod
    def _feature_map_side(size):
        """Return one spatial side after both conv(k=5) + pool(2) stages."""
        # Each unpadded 5x5 convolution removes 4 pixels; each 2x2 pooling
        # with stride 2 halves the size, rounding down.
        after_first = (size - 4) // 2
        return (after_first - 4) // 2

    def forward(self, x):
        """Return per-class probabilities for a batch of images."""
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.flat(x)

        x = self.relu3(self.fc1(x))
        x = self.fc2(x)
        x = self.softmax(x)

        return x

## Training

#### funzioni

In [None]:
import torch.optim as optim
import torch.nn.functional as F

from sklearn.metrics import accuracy_score

def train(
    loader,
    model,
    optimizer,
    criterion,
    device,
    verbose = False,
    show_preds = False
):
    """Run one training epoch and return its metrics.

    Class 0 is treated as "unharmed" and class 1 as "damaged"; precision and
    recall refer to the damaged class. Precision and recall are printed along
    with the confusion matrix.

    Args:
        loader: Iterable of ``(data, targets)`` batches; ``targets`` holds
            integer class indices (they are one-hot encoded with 2 classes).
        model: Network to train; called as ``model(data)``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(predictions, one_hot_targets)``.
        device: Device the data and one-hot targets are moved to.
        verbose: Print per-batch loss/accuracy and the epoch accuracy.
        show_preds: Print raw predictions and one-hot targets for each batch.

    Returns:
        ``(acc_epoch, loss_avg, cm, precision, recall)`` where ``acc_epoch``
        is the fraction of correctly classified samples over the whole epoch,
        ``loss_avg`` is the mean of the per-batch losses and ``cm`` is
        ``[[true_unharmed, false_damaged], [false_unharmed, true_damaged]]``.
        An empty loader yields zero accuracy and loss instead of raising.
    """
    model.train()

    loss_sum = 0
    num_batches = 0
    num_samples = 0
    num_correct = 0

    true_unharmed = 0
    true_damaged = 0
    false_unharmed = 0
    false_damaged = 0

    for batch_idx, (data, targets) in enumerate(loader):
        data = data.to(device=device)
        targets_hot = F.one_hot(targets, 2).float()
        targets_hot = targets_hot.to(device=device)

        predictions = model(data)
        loss = criterion(predictions, targets_hot)

        if show_preds:
            print(predictions)
            print(targets_hot)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Compare on the CPU with whole-tensor operations; indexing tensors
        # element by element across devices forces one sync per sample.
        preds = predictions.argmax(dim=1).cpu()
        labels = targets.cpu()

        batch_size = labels.numel()
        batch_correct = int((preds == labels).sum())
        acc_cur = batch_correct / batch_size if batch_size else 0.0

        if verbose:
            print(f"Batch {batch_idx + 1}/{len(loader)}, Train Loss: {loss.item()}, Accuracy del batch: {acc_cur}")

        loss_sum += loss.item()
        num_batches += 1
        num_samples += batch_size
        num_correct += batch_correct

        unharmed = labels == 0
        damaged = labels == 1
        pred_unharmed = preds == 0
        pred_damaged = preds == 1

        true_unharmed += int((unharmed & pred_unharmed).sum())
        false_damaged += int((unharmed & pred_damaged).sum())
        false_unharmed += int((damaged & pred_unharmed).sum())
        true_damaged += int((damaged & pred_damaged).sum())

    cm = [[true_unharmed, false_damaged],
          [false_unharmed, true_damaged]]

    precision = 0
    recall = 0
    if (true_damaged + false_damaged) != 0:
        precision = true_damaged / (true_damaged + false_damaged)
    if (true_damaged + false_unharmed) != 0:
        recall = true_damaged / (true_damaged + false_unharmed)

    print(f"Precision: {precision}, Recall: {recall}")

    print(cm[0][0], cm[0][1])
    print(cm[1][0], cm[1][1])

    # Accuracy is computed over samples, not averaged over batches, so a
    # smaller final batch does not get extra weight.
    acc_epoch = num_correct / num_samples if num_samples else 0.0
    loss_avg = loss_sum / num_batches if num_batches else 0.0

    if verbose: print(f"Epoch accuracy: {acc_epoch}")

    return acc_epoch, loss_avg, cm, precision, recall


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

def test(
    loader,
    model,
    criterion,
    device
):
    """Evaluate the model on a loader without updating its weights.

    Class 0 is treated as "unharmed" and class 1 as "damaged"; precision and
    recall refer to the damaged class. Precision and recall are printed along
    with the confusion matrix.

    Args:
        loader: Iterable of ``(data, targets)`` batches; ``targets`` holds
            integer class indices (they are one-hot encoded with 2 classes).
        model: Network to evaluate; it is switched to eval mode.
        criterion: Loss called as ``criterion(predictions, one_hot_targets)``.
        device: Device the data and one-hot targets are moved to.

    Returns:
        ``(acc, loss, cm, precision, recall)`` where ``acc`` is the fraction
        of correctly classified samples over the whole loader, ``loss`` is
        the mean of the per-batch losses and ``cm`` is
        ``[[true_unharmed, false_damaged], [false_unharmed, true_damaged]]``.
        An empty loader yields zero accuracy and loss instead of raising.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    num_samples = 0
    num_correct = 0

    true_unharmed = 0
    true_damaged = 0
    false_unharmed = 0
    false_damaged = 0

    with torch.no_grad():
        for data, targets in loader:
            data = data.to(device=device)
            targets_hot = F.one_hot(targets, 2).float()
            targets_hot = targets_hot.to(device=device)

            predictions = model(data)

            loss = criterion(predictions, targets_hot)
            total_loss += loss.item()
            num_batches += 1

            # Whole-tensor comparisons on the CPU replace the per-sample
            # Python loop.
            preds = predictions.argmax(dim=1).cpu()
            labels = targets.cpu()

            num_samples += labels.numel()
            num_correct += int((preds == labels).sum())

            # Confusion matrix counts
            unharmed = labels == 0
            damaged = labels == 1
            pred_unharmed = preds == 0
            pred_damaged = preds == 1

            true_unharmed += int((unharmed & pred_unharmed).sum())
            false_damaged += int((unharmed & pred_damaged).sum())
            false_unharmed += int((damaged & pred_unharmed).sum())
            true_damaged += int((damaged & pred_damaged).sum())

    # Accuracy is computed over samples, not averaged over batches, so a
    # smaller final batch does not get extra weight.
    acc = num_correct / num_samples if num_samples else 0.0
    loss = total_loss / num_batches if num_batches else 0.0

    cm = [[true_unharmed, false_damaged],
          [false_unharmed, true_damaged]]

    precision = 0
    recall = 0
    if (true_damaged + false_damaged) != 0:
        precision = true_damaged / (true_damaged + false_damaged)
    if (true_damaged + false_unharmed) != 0:
        recall = true_damaged / (true_damaged + false_unharmed)

    print(f"Precision: {precision}, Recall: {recall}")

    print(cm[0][0], cm[0][1])
    print(cm[1][0], cm[1][1])

    return acc, loss, cm, precision, recall

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def train_model(
    train_loader,
    val_loader,
    model,
    optimizer,
    criterion,
    epochs,
    model_path,
    device,
    save_model = False
):
    """Train for ``epochs`` epochs, validate after each one, and plot curves.

    After every epoch the model is evaluated on ``val_loader``. Whenever the
    validation loss improves and ``save_model`` is true, a checkpoint with
    the model and optimizer state is written to ``model_path``. At the end a
    three-panel figure (loss, accuracy, precision/recall) is shown.

    Returns:
        ``(train_losses, val_losses, train_accs, val_accs)``: one value per
        epoch in each list.
    """
    lowest_val_loss = float("inf")
    metric_names = (
        "train_loss", "val_loss",
        "train_acc", "val_acc",
        "train_precision", "val_precision",
        "train_recall", "val_recall",
    )
    history = {name: [] for name in metric_names}

    for epoch in range(epochs):
        print(f"\nEpoca n°{epoch}\n")
        train_acc, train_loss, _, train_precision, train_recall = train(
            train_loader, model, optimizer, criterion, device, verbose=False
        )
        val_acc, val_loss, _, val_precision, val_recall = test(
            val_loader, model, criterion, device
        )

        if val_loss < lowest_val_loss:
            lowest_val_loss = val_loss

            # Checkpoint only on a validation-loss improvement.
            if save_model:
                torch.save(
                    {
                        "model_state_dict": model.state_dict(),
                        "optimizer_state_dict": optimizer.state_dict(),
                        "train_loss": train_loss,
                        "val_loss": val_loss
                    },
                    model_path,
                )

        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["train_acc"].append(train_acc)
        history["val_acc"].append(val_acc)
        history["train_precision"].append(train_precision)
        history["val_precision"].append(val_precision)
        history["train_recall"].append(train_recall)
        history["val_recall"].append(val_recall)

    def draw_panel(position, curves, title, y_label):
        # Draw one subplot: every curve is plotted against 1-based epochs.
        plt.subplot(1, 3, position)
        for values, label, color in curves:
            plt.plot(range(1, len(values) + 1), values, label=label, color=color)
        plt.title(title)
        plt.legend(loc="best")
        plt.xlabel('Epoche')
        plt.ylabel(y_label)

    plt.figure(figsize=(12, 6))

    # Panel 1 - loss
    draw_panel(
        1,
        [
            (history["train_loss"], "Training loss", "blue"),
            (history["val_loss"], "Validation loss", "orange"),
        ],
        f"Optimizer: {optimizer}\nLoss durante addestramento",
        'Loss',
    )

    # Panel 2 - accuracy
    draw_panel(
        2,
        [
            (history["train_acc"], "Training accuracy", "green"),
            (history["val_acc"], "Validation accuracy", "red"),
        ],
        "Accuratezza durante l'addestramento",
        'Accuracy',
    )

    # Panel 3 - precision and recall
    draw_panel(
        3,
        [
            (history["train_precision"], "Training precision", "purple"),
            (history["val_precision"], "Validation precision", "pink"),
            (history["train_recall"], "Training recall", "brown"),
            (history["val_recall"], "Validation recall", "gray"),
        ],
        "Precision e Recall durante l'addestramento",
        'Precision/Recall',
    )

    plt.tight_layout()  # avoid overlapping panels
    plt.show()

    return history["train_loss"], history["val_loss"], history["train_acc"], history["val_acc"]

### Hyperparameters tuning

#### SGD

In [None]:
import torch.optim as optim
import torch.nn as nn


DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
MODELS_DIR = "/content/drive/MyDrive/canopies_models"
RANDOM_SEED = 42

SAVE_MODELS = False

# LeNet.forward already ends with a softmax, so the loss receives
# probabilities. BCEWithLogitsLoss would apply a sigmoid on top of them;
# BCELoss is the variant that expects probabilities.
criterion = nn.BCELoss(reduction='mean')

In [None]:
# SGD grid search over learning rate, momentum and weight decay.

NUM_EPOCHS = 5

lrs = [0.1, 0.01, 0.001]              # learning rates
momentums = [0, 0.9, 0.95, 0.99]      # momentum values
weight_decays = [0.0, 0.001, 0.01]    # weight decay (L2 regularization)

# Final-epoch metrics of every configuration, in the order they were run.
train_losses_ = []
val_losses_ = []
train_accs_ = []
val_accs_ = []

best_params = {
    "lr" : 0,
    "momentum" : 0,
    "weight_decay" : 0,
    "scale" : 0,
    "val_loss" : 10,
    "val_acc" : 0
}

for lr in lrs:
    for momentum in momentums:
        for weight_decay in weight_decays:

            print(f"lr: {lr}, momentum: {momentum}, wd: {weight_decay}")

            torch.cuda.empty_cache()

            # A fresh model and optimizer for every configuration.
            model = LeNet(num_classes=2).to(DEVICE)
            optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay, momentum=momentum)

            # The checkpoint path is only needed when saving. It must be a
            # file path: passing MODELS_DIR (a directory) to train_model
            # would make torch.save fail.
            model_path = None
            if SAVE_MODELS:
                # NOTE(review): TRAIN_LIMIT is assigned in later cells of
                # this notebook; it must be defined before enabling SAVE_MODELS.
                model_id = f"LeNet_sc{None}_lr{lr}_mom{momentum}_wd{weight_decay}_limit{TRAIN_LIMIT}.pth"

                os.makedirs(os.path.join(MODELS_DIR, f"seed_{RANDOM_SEED}"), exist_ok=True)
                model_path = os.path.join(MODELS_DIR, f"seed_{RANDOM_SEED}", model_id)

            train_losses, val_losses, train_accs, val_accs = train_model(
                train_loader,
                val_loader,
                model,
                optimizer,
                criterion,
                NUM_EPOCHS,
                model_path,
                DEVICE,
                SAVE_MODELS
            )

            train_losses_.append(train_losses[-1])
            val_losses_.append(val_losses[-1])
            train_accs_.append(train_accs[-1])
            val_accs_.append(val_accs[-1])

            # A configuration becomes the best one only if it is at least as
            # good on both validation loss and validation accuracy.
            if (val_losses[-1] <= best_params["val_loss"] and val_accs[-1] >= best_params["val_acc"]):
                best_params["lr"] = lr
                best_params["momentum"] = momentum
                best_params["weight_decay"] = weight_decay
                best_params["scale"] = None   # scale
                best_params["val_loss"] = val_losses[-1]
                best_params["val_acc"] = val_accs[-1]

print(best_params)

In [None]:
TRAIN_LIMIT = 800
NUM_EPOCHS = 15

SAVE_MODELS = False

# LeNet.forward already ends with a softmax, so the loss receives
# probabilities. BCEWithLogitsLoss would apply a sigmoid on top of them;
# BCELoss is the variant that expects probabilities.
criterion = nn.BCELoss()

# Each entry is [lr, momentum, weight_decay].
params15 = [
    [0.01, 0, 0.001],
    [0.01, 0, 0.01],
    [0.001, 0, 0.01],
    [0.001, 0.9, 0.001],
    [0.001, 0.95, 0.0],
    [0.001, 0.99, 0.01]
]

params = [
    [0.1, 0, 0.0],
    [0.01, 0, 0.0],
    [0.01, 0, 0.01],
    [0.001, 0, 0.0],
    [0.001, 0, 0.001],
    [0.001, 0, 0.01],
    [0.001, 0.9, 0.001],
    [0.001, 0.95, 0.0],
    [0.001, 0.95, 0.001],
    [0.001, 0.99, 0.0]
]

best_params = {
    "lr" : 0,
    "momentum" : 0,
    "weight_decay" : 0,
    "scale" : 0,
    "val_loss" : 10,
    "val_acc" : 0
}

# Final-epoch metrics of every configuration, in the order they were run.
train_losses_ = []
val_losses_ = []
train_accs_ = []
val_accs_ = []

# Only the params15 configurations are run; the longer list above is unused.
params = params15


for lr, momentum, weight_decay in params:

    print(f"lr: {lr}, momentum: {momentum}, wd: {weight_decay}")

    torch.cuda.empty_cache()

    # A fresh model and optimizer for every configuration.
    model = LeNet(num_classes=2).to(DEVICE)
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)

    # NOTE(review): MODELS_DIR is a directory, not a checkpoint file; this
    # is harmless only while SAVE_MODELS is False.
    train_losses, val_losses, train_accs, val_accs = train_model(
        train_loader,
        val_loader,
        model,
        optimizer,
        criterion,
        NUM_EPOCHS,
        MODELS_DIR,
        DEVICE,
        SAVE_MODELS
    )

    train_losses_.append(train_losses[-1])
    val_losses_.append(val_losses[-1])
    train_accs_.append(train_accs[-1])
    val_accs_.append(val_accs[-1])

    # A configuration becomes the best one only if it is at least as good on
    # both validation loss and validation accuracy.
    if (val_losses[-1] <= best_params["val_loss"] and val_accs[-1] >= best_params["val_acc"]):
        best_params["lr"] = lr
        best_params["momentum"] = momentum
        best_params["weight_decay"] = weight_decay
        best_params["scale"] = None   # scale
        best_params["val_loss"] = val_losses[-1]
        best_params["val_acc"] = val_accs[-1]

print(best_params)

In [None]:
import torch
import torch.optim as optim

NUM_EPOCHS = 40

# Each entry is [lr, momentum, weight_decay].
params = [
    [0.001, 0, 0.0],
    [0.001, 0, 0.001],
    [0.001, 0, 0.01],
    [0.001, 0.9, 0.001],
    [0.001, 0.95, 0.0],
    [0.001, 0.95, 0.001],
    [0.001, 0.99, 0.0]
]

best_params = {
    "lr" : 0,
    "momentum" : 0,
    "weight_decay" : 0,
    "scale" : 0,
    "val_loss" : 10,
    "val_acc" : 0
}

# Final-epoch metrics of every configuration, in the order they were run.
train_losses_ = []
val_losses_ = []
train_accs_ = []
val_accs_ = []

for i, (lr, momentum, weight_decay) in enumerate(params):

    print(f"lr: {lr}, momentum: {momentum}, wd: {weight_decay}")

    torch.cuda.empty_cache()

    # A fresh model and optimizer for every configuration.
    model = LeNet(num_classes=2).to(DEVICE)
    optimizer = optim.SGD(
        model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay
    )

    train_losses, val_losses, train_accs, val_accs = train_model(
        train_loader, val_loader, model, optimizer, criterion,
        NUM_EPOCHS, MODELS_DIR, DEVICE, SAVE_MODELS,
    )

    train_losses_.append(train_losses[-1])
    val_losses_.append(val_losses[-1])
    train_accs_.append(train_accs[-1])
    val_accs_.append(val_accs[-1])

    # Keep the configuration only if it is at least as good on both
    # validation loss and validation accuracy.
    if (val_losses[-1] <= best_params["val_loss"] and val_accs[-1] >= best_params["val_acc"]):
        best_params.update({
            "lr": lr,
            "momentum": momentum,
            "weight_decay": weight_decay,
            "scale": None,   # scale
            "val_loss": val_losses[-1],
            "val_acc": val_accs[-1],
        })

print(best_params)

In [None]:
import torch
import torch.optim as optim

NUM_EPOCHS = 70

# Each entry is [lr, momentum, weight_decay].
params15 = [
    [0.001, 0, 0.001],
    [0.001, 0.9, 0.001],
]

params = [
    [0.001, 0, 0.0],
    [0.001, 0, 0.001],
    [0.001, 0, 0.01],
    [0.001, 0.95, 0.001],
]

# Only the params15 configurations are run.
params = params15

best_params = {
    "lr" : 0,
    "momentum" : 0,
    "weight_decay" : 0,
    "scale" : 0,
    "val_loss" : 10,
    "val_acc" : 0
}

# Final-epoch metrics of every configuration, in the order they were run.
train_losses_ = []
val_losses_ = []
train_accs_ = []
val_accs_ = []

for i, (lr, momentum, weight_decay) in enumerate(params):

    print(f"lr: {lr}, momentum: {momentum}, wd: {weight_decay}")

    torch.cuda.empty_cache()

    # A fresh model and optimizer for every configuration.
    model = LeNet(num_classes=2).to(DEVICE)
    optimizer = optim.SGD(
        model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay
    )

    train_losses, val_losses, train_accs, val_accs = train_model(
        train_loader, val_loader, model, optimizer, criterion,
        NUM_EPOCHS, MODELS_DIR, DEVICE, SAVE_MODELS,
    )

    train_losses_.append(train_losses[-1])
    val_losses_.append(val_losses[-1])
    train_accs_.append(train_accs[-1])
    val_accs_.append(val_accs[-1])

    # Keep the configuration only if it is at least as good on both
    # validation loss and validation accuracy.
    if (val_losses[-1] <= best_params["val_loss"] and val_accs[-1] >= best_params["val_acc"]):
        best_params.update({
            "lr": lr,
            "momentum": momentum,
            "weight_decay": weight_decay,
            "scale": None,   # scale
            "val_loss": val_losses[-1],
            "val_acc": val_accs[-1],
        })

print(best_params)

In [None]:
import torch
import torch.optim as optim

NUM_EPOCHS = 100

# Each entry is [lr, momentum, weight_decay].
params = [
    [0.001, 0, 0.001],
    [0.001, 0, 0.01],
]

best_params = {
    "lr" : 0,
    "momentum" : 0,
    "weight_decay" : 0,
    "scale" : 0,
    "val_loss" : 10,
    "val_acc" : 0
}

# Final-epoch metrics of every configuration, in the order they were run.
train_losses_ = []
val_losses_ = []
train_accs_ = []
val_accs_ = []

for i, (lr, momentum, weight_decay) in enumerate(params):

    print(f"lr: {lr}, momentum: {momentum}, wd: {weight_decay}")

    torch.cuda.empty_cache()

    # A fresh model and optimizer for every configuration.
    model = LeNet(num_classes=2).to(DEVICE)
    optimizer = optim.SGD(
        model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay
    )

    train_losses, val_losses, train_accs, val_accs = train_model(
        train_loader, val_loader, model, optimizer, criterion,
        NUM_EPOCHS, MODELS_DIR, DEVICE, SAVE_MODELS,
    )

    train_losses_.append(train_losses[-1])
    val_losses_.append(val_losses[-1])
    train_accs_.append(train_accs[-1])
    val_accs_.append(val_accs[-1])

    # Keep the configuration only if it is at least as good on both
    # validation loss and validation accuracy.
    if (val_losses[-1] <= best_params["val_loss"] and val_accs[-1] >= best_params["val_acc"]):
        best_params.update({
            "lr": lr,
            "momentum": momentum,
            "weight_decay": weight_decay,
            "scale": None,   # scale
            "val_loss": val_losses[-1],
            "val_acc": val_accs[-1],
        })

print(best_params)

#### Adam

In [None]:
# ADAM

from itertools import product

DATA_DIR = "/content/drive/MyDrive/first_dataset"
BATCH_SIZE = 40
TRAIN_LIMIT = 800
NUM_EPOCHS = 15
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
MODELS_DIR = "/content/drive/MyDrive/canopies_models"

SAVE_MODELS = False

# Class weights kept for experiments; they are not passed to the loss below.
pos_weight = torch.tensor([2, 1]).to(device=DEVICE)

# LeNet.forward already ends with a softmax, so the loss receives
# probabilities. BCEWithLogitsLoss would apply a sigmoid on top of them;
# BCELoss is the variant that expects probabilities.
criterion = nn.BCELoss()

lrs = [1e-5, 1e-6, 1e-7, 1e-8, 1e-9, 1e-10]
weight_decays = [0] # [0, 1e-4, 1e-3, 1e-2]


best_params = {
    "lr" : 0,
    "momentum" : 0,
    "weight_decay" : 0,
    "scale" : 0,
    "val_loss" : 10,
    "val_acc" : 0
}

# Final-epoch metrics of every configuration, in the order they were run.
train_losses_ = []
val_losses_ = []
train_accs_ = []
val_accs_ = []


# product() is iterated directly; there is no need to materialise the list.
for lr, weight_decay in product(lrs, weight_decays):

    print(f"lr: {lr}, wd: {weight_decay}")

    torch.cuda.empty_cache()

    # A fresh model and optimizer for every configuration.
    model = LeNet(num_classes=2).to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # NOTE(review): MODELS_DIR is a directory, not a checkpoint file; this
    # is harmless only while SAVE_MODELS is False.
    train_losses, val_losses, train_accs, val_accs = train_model(
        train_loader,
        val_loader,
        model,
        optimizer,
        criterion,
        NUM_EPOCHS,
        MODELS_DIR,
        DEVICE,
        SAVE_MODELS
    )

    train_losses_.append(train_losses[-1])
    val_losses_.append(val_losses[-1])
    train_accs_.append(train_accs[-1])
    val_accs_.append(val_accs[-1])

    # A configuration becomes the best one only if it is at least as good on
    # both validation loss and validation accuracy. "momentum" keeps its
    # initial value because Adam is not configured with one here.
    if (val_losses[-1] <= best_params["val_loss"] and val_accs[-1] >= best_params["val_acc"]):
        best_params["lr"] = lr
        best_params["weight_decay"] = weight_decay
        best_params["scale"] = None   # scale
        best_params["val_loss"] = val_losses[-1]
        best_params["val_acc"] = val_accs[-1]

print(best_params)

### main

In [None]:
# Final training run on the scale-1.5 data with the selected SGD settings.

BATCH_SIZE = 40
NUM_EPOCHS = 40

SAVE_MODELS = False

# LeNet.forward already ends with a softmax, so the loss receives
# probabilities. BCEWithLogitsLoss would apply a sigmoid on top of them;
# BCELoss is the variant that expects probabilities.
criterion = nn.BCELoss()

lr = 0.001
momentum = 0
weight_decay =  0.001

# Final-epoch metrics of this run.
train_losses_ = []
val_losses_ = []
train_accs_ = []
val_accs_ = []

torch.cuda.empty_cache()

model = LeNet(num_classes=2).to(DEVICE)
optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay, momentum=momentum)

# NOTE(review): MODELS_DIR is a directory, not a checkpoint file; this is
# harmless only while SAVE_MODELS is False.
train_losses, val_losses, train_accs, val_accs = train_model(
    train_loader,
    val_loader,
    model,
    optimizer,
    criterion,
    NUM_EPOCHS,
    MODELS_DIR,
    DEVICE,
    SAVE_MODELS
)

train_losses_.append(train_losses[-1])
val_losses_.append(val_losses[-1])
train_accs_.append(train_accs[-1])
val_accs_.append(val_accs[-1])

In [None]:
save_path = '/content/drive/MyDrive/canopies_models/main/modello.pth'

# torch.save does not create missing parent directories.
os.makedirs(os.path.dirname(save_path), exist_ok=True)

# Only the weights (state dict) of the trained model are stored.
torch.save(model.state_dict(), save_path)

## Evaluation

In [None]:
def eval_model(model, data_loaders, criterion, device):
    """Evaluate ``model`` on the train, validation and test loaders and print a report.

    Args:
        model: Network to evaluate; it is put in eval mode before testing.
        data_loaders: Indexable sequence whose items 0, 1 and 2 are the
            train, validation and test loaders, in that order. Any further
            items are ignored.
        criterion: Loss function forwarded unchanged to ``test``.
        device: Device forwarded unchanged to ``test``.

    Returns:
        Tuple ``(cm_train, cm_val, cm_test)`` holding the third value that
        ``test`` returns for each split (the confusion matrix, going by the
        names used in this notebook).

    Raises:
        IndexError: If ``data_loaders`` has fewer than three items. This is
            raised before any split is evaluated.
    """
    model.eval()

    # Resolve all three loaders up front so a short sequence fails immediately
    # instead of after one or two (possibly slow) evaluation passes.
    splits = (
        ("Train", data_loaders[0]),
        ("Validation", data_loaders[1]),
        ("Test", data_loaders[2]),
    )

    # Evaluate every split before printing, so the report lines stay grouped
    # together even if ``test`` produces output of its own.
    results = [
        (split_name, test(loader, model, criterion, device))
        for split_name, loader in splits
    ]

    confusion_matrices = []
    for split_name, (acc, loss, cm, precision, recall) in results:
        print(f"Accuracy on {split_name}: {acc*100:.2f}%, Loss: {loss:.2f}")
        print(f"Precision on {split_name}: {precision:.2f}, Recall on {split_name}: {recall:.2f}")
        confusion_matrices.append(cm)

    cm_train, cm_val, cm_test = confusion_matrices
    return cm_train, cm_val, cm_test

In [None]:
%%script false --no-raise-error

# Test all saved models.
# The cell magic above keeps this cell from running; remove it to re-run the
# evaluation of every checkpoint in `models_folder`.

RANDOM_SEED = 42
IMAGE_HEIGHT = IMAGE_WIDTH = 300

DATA_DIR = "/content/drive/MyDrive/first_dataset"
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

BATCH_SIZE = 32
NUM_WORKERS = 2
TRAIN_LIMIT = 500

criterion = nn.BCEWithLogitsLoss()

# First pass: un-normalized data, used only to measure the per-channel
# mean and std of the training split.
transform = torchvision.transforms.Compose(
    [
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Resize((IMAGE_HEIGHT, IMAGE_WIDTH), antialias=True),
    ]
)
# Bind train_loader here: previously only the fourth return value was kept, so
# get_mean_std() below read whatever `train_loader` an earlier cell had left
# in the kernel (possibly already normalized), or failed with NameError on a
# fresh kernel.
train_loader, _, _, train_dataset = get_loaders(
    data_dir = DATA_DIR,
    batch_size = BATCH_SIZE,
    train_percentage=0.7,
    val_percentage=0.1,
    transform=transform,
    num_workers=NUM_WORKERS,
    train_limit=TRAIN_LIMIT,
    random_seed=RANDOM_SEED,
    scale = None
)

mean, std = get_mean_std(train_loader)

# Second pass: same pipeline plus normalization with the statistics above.
transform = torchvision.transforms.Compose(
    [
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Resize((IMAGE_HEIGHT, IMAGE_WIDTH), antialias=True),
        torchvision.transforms.Normalize(mean=mean, std=std)
    ]
)

train_loader, val_loader, test_loader, _ = get_loaders(
    data_dir = DATA_DIR,
    batch_size = BATCH_SIZE,
    train_percentage=0.7,
    val_percentage=0.1,
    transform=transform,
    num_workers=NUM_WORKERS,
    train_limit=TRAIN_LIMIT,
    random_seed=RANDOM_SEED,
    scale = None
)

models_folder = "/content/drive/MyDrive/canopies_models/seed_42"
model_files = os.listdir(models_folder)

# Iterate over the model files, skipping anything that is not a checkpoint.
for model_file in model_files:
    if not model_file.endswith(".pth"):
        continue

    model_path = os.path.join(models_folder, model_file)

    model = LeNet().to(DEVICE)  # Replace with your own model definition
    # These checkpoints are dicts that store the weights under 'model_state_dict'.
    model.load_state_dict(torch.load(model_path, map_location=DEVICE)['model_state_dict'])

    # Run eval_model on the loaded model.
    print("\n", model_file)
    cm_train, cm_val, cm_test = eval_model(model, [train_loader, val_loader, test_loader], criterion, DEVICE)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Inverse of Normalize(mean, std) for 3-channel images, done in two steps:
#   1. Normalize(mean=0, std=1/std)   computes x / (1/std) = x * std
#   2. Normalize(mean=-mean, std=1)   computes x - (-mean) = x + mean
# Usage: inv_tensor = invTrans(inp_tensor)
invTrans = transforms.Compose(
    [
        transforms.Normalize(mean=[0.0] * 3, std=[1 / std[c] for c in range(3)]),
        transforms.Normalize(mean=[-mean[c] for c in range(3)], std=[1.0] * 3),
    ]
)

def visualize_predictions(loader, model, device, limit=150):
    """Display images from ``loader`` titled with their true and predicted labels.

    Each image is un-normalized with the module-level ``invTrans`` transform
    and shown in its own matplotlib figure. The predicted label is the argmax
    of the model output along dimension 1.

    Args:
        loader: Iterable yielding ``(data, targets)`` batches; ``targets``
            must support ``.numpy()``.
        model: Network used for prediction; it is put in eval mode.
        device: Device the input batch is moved to before the forward pass.
        limit: Stop after this many images have been shown.
    """
    model.eval()

    shown = 0
    with torch.no_grad():
        for batch, labels in loader:
            batch = batch.to(device=device)
            true_labels = labels.numpy()
            predicted_labels = model(batch).argmax(dim=1).cpu().numpy()

            for sample, true_label, predicted_label in zip(batch, true_labels, predicted_labels):
                # Undo the normalization, then move the first axis last for
                # imshow (assumes each sample is channels-first -- TODO confirm).
                image = invTrans(sample).permute(1, 2, 0).cpu().numpy()
                plt.imshow(image)
                plt.title(f"True Label: {true_label}, Predicted Label: {predicted_label}")
                plt.show()

                shown += 1
                if shown >= limit:
                    return

# Show test-set images with their true and predicted labels (default limit of 150 images).
visualize_predictions(test_loader, model, DEVICE)

In [None]:
import seaborn as sns

# Evaluate the current model on all three splits; eval_model prints the
# accuracy / loss / precision / recall report and returns one matrix per split.
cm_train, cm_val, cm_test = eval_model(model, [train_loader, val_loader, test_loader], criterion, DEVICE)

# Quick annotated heatmap of the test-split matrix only.
sns.heatmap(cm_test, annot=True)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Re-evaluate the current model and draw the three matrices side by side.
cm_train, cm_val, cm_test = eval_model(model, [train_loader, val_loader, test_loader], criterion, DEVICE)

# Tick labels in label order: 0 = "good" images, 1 = everything else.
class_names = ["Unharmed", "Damaged"]

fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# One (matrix, title) pair per subplot, left to right.
panels = (
    (cm_train, 'Confusion Matrix - Train'),
    (cm_val, 'Confusion Matrix - Validation'),
    (cm_test, 'Confusion Matrix - Test'),
)
for axis, (matrix, title) in zip(axes, panels):
    sns.heatmap(matrix, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names, ax=axis)
    axis.set_title(title)

# Show the figure.
plt.show()

In [None]:
# Show validation-set images with their true and predicted labels (default limit of 150 images).
visualize_predictions(val_loader, model, DEVICE)