In [None]:
'''
Questo script esegue la K-Fold c.v. sul training-dataset
'''

In [None]:
# Import delle librerie
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from torchvision.models import resnet50
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader,SubsetRandomSampler
from torch.optim.lr_scheduler import CosineAnnealingLR,OneCycleLR

import numpy as np
from sklearn.model_selection import KFold

In [None]:
# Import funzioni da file utils.txt
import ipynb
from ipynb.fs.full.utils import fix_seed, train_one_epoch, test_one_epoch, reset_wgts, show_batch

In [None]:
# Inizializzazione generatore valori random
SEED = 123
fix_seed(SEED)

In [None]:
# Definizione iper-parametri
IMGS_PATH = '/media/users/cgambina/Progetto_6/Dati/Immagini'

NUM_EPOCHS = 7
BS = 128 
LR = 1e-3
LR_DEC = 0.75
K = 10

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
splits = KFold(n_splits=K, shuffle=True, random_state=SEED)

In [None]:
# Definizione trasformazioni da applicare al batch di immagini
transforms = T.Compose([
    T.RandomChoice(
        [
            T.RandomRotation((-5,5)),
            T.RandomRotation((85,95)),
            T.RandomRotation((175,195)),
            T.RandomRotation((265,275)),
    ]),
    T.ToTensor(),
    T.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

In [None]:
# Definizione del dataset, della loss function, e dell'archittettura della rete
dataset = ImageFolder(IMGS_PATH, transforms)
criterion = nn.BCEWithLogitsLoss() 

net = resnet50(weights='ResNet50_Weights.IMAGENET1K_V1').to(device)

n_filters = net.fc.in_features
net.fc = nn.Linear(n_filters, 1).to(device) 

In [None]:
# Riduzione del learning rate in base al layer (blocco)
layer_names = []
for name, param in net.named_parameters():
    layer_names.append(name)

layer_names.reverse()

parameters = []
prev_group_name = layer_names[0].split('.')[0]

for idx, name in enumerate(layer_names):
    
    cur_group_name = name.split('.')[0]
    if cur_group_name != prev_group_name:
        LR *= LR_DEC
    prev_group_name = cur_group_name

    #print(f'{idx}: lr = {LR:.6f}, {name}')
    parameters += [{'params': [p for n, p in net.named_parameters() if n == name and p.requires_grad],
                    'lr':     LR}]

In [None]:
OPTIMIZER = optim.Adam(net.parameters(), lr=LR)  
SCHEDULER = None

In [None]:
# Training della rete tramite ripetizione della k-fold c.v.
epoch_train_loss = []; epoch_train_acc = []
epoch_test_loss = []; epoch_test_acc = []
gen = torch.Generator().manual_seed(SEED)

for fold, (train_idx,test_idx) in enumerate(splits.split(dataset)):

    print(f'--- FOLD ---: {fold + 1}')
    
    train_sampler = SubsetRandomSampler(train_idx, gen)
    test_sampler = SubsetRandomSampler(test_idx, gen)
    train_loader = DataLoader(dataset, batch_size=BS, sampler=train_sampler)
    test_loader = DataLoader(dataset, batch_size=BS, sampler=test_sampler)
    
    # Reset parametri
    net.apply(reset_wgts)
    optimizer = OPTIMIZER 
    scheduler = SCHEDULER

    for epoch in range(NUM_EPOCHS):
        train_loss, train_correct = train_one_epoch(net,device,train_loader,criterion,optimizer,scheduler)
        test_loss, test_correct = test_one_epoch(net,device,test_loader,criterion)

        train_loss = train_loss / len(train_loader.sampler)
        train_acc = train_correct / len(train_loader.sampler) * 100
        test_loss = test_loss / len(test_loader.sampler)
        test_acc = test_correct / len(test_loader.sampler) * 100

        print(f'Epoca:{epoch + 1}/{NUM_EPOCHS} AVG Training Loss:{train_loss:.3f} AVG Test Loss:{test_loss:.3f}') 
        print(f'AVG Training Acc {train_acc:.2f} % AVG Test Acc {test_acc:.2f} %')
          
    epoch_train_loss.append(train_loss); epoch_train_acc.append(train_acc)  
    epoch_test_loss.append(test_loss); epoch_test_acc.append(test_acc)

In [None]:
# Calcolo di loss e accuracy
avg_train_loss = np.mean(epoch_train_loss)
avg_train_acc = np.mean(epoch_train_acc)
avg_test_loss = np.mean(epoch_test_loss)
avg_test_acc = np.mean(epoch_test_acc)

# Calcolo deviazione standard
std_train_loss = np.std(epoch_train_loss)
std_train_acc = np.std(epoch_train_acc)
std_test_loss = np.std(epoch_test_loss)
std_test_acc = np.std(epoch_test_acc)

print(f'Media Training Loss: {avg_train_loss:.4f} -- Media Test Loss: {avg_test_loss:.4f}')
print(f'Media Training Accuracy: {avg_train_acc:.3f} -- Media Test Accuracy: {avg_test_acc:.3f}')

print(f'Std Training Loss: {std_train_loss:.4f} -- Std Test Loss: {std_test_loss:.4f}')
print(f'Std Training Accuracy: {std_train_acc:.3f} -- Std Test Accuracy: {std_test_acc:.3f}')

In [None]:
# Mostra un campione di immagini 
show_tfms = T.Compose([
    T.RandomChoice(
        [
            T.RandomRotation((-5,5)),
            T.RandomRotation((85,95)),
            T.RandomRotation((175,195)),
            T.RandomRotation((265,275)),
    ]),
    T.ToTensor(),
])

foo_dataset = ImageFolder(IMGS_PATH, show_tfms)
show_batch(foo_dataset)