# Funzioni globali / comuni a tutti i modelli

## AverageValueMeter

In [1]:
class AverageValueMeter:
    """Accumulate a weighted running average of a scalar (e.g. loss or accuracy)."""

    def __init__(self):
        # reset() is the single place where the accumulators are initialised.
        self.reset()

    def reset(self):
        """Discard everything accumulated so far."""
        self.sum = 0
        self.num = 0

    def add(self, val, n=1):
        """Add ``val`` to the average with weight ``n``.

        Args:
            val: Value to accumulate (typically a per-batch mean).
            n: Weight of the value (typically the batch size).
        """
        self.sum += val * n
        self.num += n

    def value(self):
        """Return the weighted average, or ``None`` if nothing was accumulated."""
        try:
            return self.sum / self.num
        except ZeroDivisionError:
            # Only the "empty meter" case is expected; any other error
            # (e.g. a non-numeric value was added) must surface to the caller.
            return None

## Split Dataset train, validation, test

In [5]:
from sklearn.model_selection import train_test_split
def split_train_val_test(dataset, perc=None):
    """Randomly split ``dataset`` into train, validation and test subsets.

    The split is done with two successive ``train_test_split`` calls: first
    train vs. the rest, then the rest into validation and test.

    Args:
        dataset: Collection accepted by ``train_test_split``.
        perc: Three fractions ``[train, val, test]``. Defaults to
            ``[0.6, 0.1, 0.3]``. Only the validation and test fractions are
            read; the train share is whatever remains.

    Returns:
        Tuple ``(train, val, test)``.
    """
    fractions = [0.6, 0.1, 0.3] if perc is None else perc
    holdout_fraction = fractions[1] + fractions[2]
    # Test share expressed relative to the held-out part, not the whole set.
    test_within_holdout = fractions[2] / holdout_fraction

    train, holdout = train_test_split(dataset, test_size=holdout_fraction)
    val, test = train_test_split(holdout, test_size=test_within_holdout)
    return train, val, test

## Classe dataset che permette di caricare immagini dal disco a partire da un csv

In [None]:
import torch
import pandas as pd
from torch.utils import data
from os.path import join
from PIL import Image

class CSVImageDataset(data.Dataset):
    """Dataset that loads images from disk using a CSV index.

    The CSV is read with ``pandas.read_csv`` and must provide a ``path``
    column (image path relative to ``data_root``) and a ``label`` column.
    """

    def __init__(self, data_root, csv, transform=None):
        """Store the configuration and load the CSV index.

        Args:
            data_root: Directory that the ``path`` column is relative to.
            csv: CSV location (anything ``pandas.read_csv`` accepts).
            transform: Optional callable applied to each loaded PIL image.
        """
        self.data_root = data_root
        self.data = pd.read_csv(csv)
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the CSV index."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``."""
        row = self.data.iloc[idx]  # single positional lookup for both fields
        im_path, im_label = row['path'], row['label']
        # Open inside a context manager so the file handle is released right
        # away; convert() returns a new, fully loaded RGB image.
        with Image.open(join(self.data_root, im_path)) as raw:
            im = raw.convert('RGB')
        if self.transform is not None:
            im = self.transform(im)
        return im, im_label

# Funzione per scaricare Squeezenet (adattabile ad altri modelli pre-trained)

In [6]:
from torch import nn
from torchvision.models import squeezenet1_0
from torchvision.models import SqueezeNet1_0_Weights

def get_squeezenet_model(num_classes):
    """Build a SqueezeNet 1.0 with default pretrained weights and a new output layer.

    Args:
        num_classes: Number of output classes of the replaced classifier layer.

    Returns:
        The model with ``classifier[1]`` replaced by a 1x1 convolution that
        maps 512 channels to ``num_classes``, and ``model.num_classes``
        updated accordingly.
    """
    model = squeezenet1_0(weights=SqueezeNet1_0_Weights.DEFAULT)
    # Honour the caller's num_classes (it used to be overwritten with 101).
    model.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1, 1), stride=(1, 1))
    model.num_classes = num_classes
    return model

## Funzione per training, print epoche e generazione grafici tensorboard (semplice)

In [3]:
import torch
import torch.nn as nn
from torch.optim import SGD
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import accuracy_score
from os.path import join
import tqdm
import numpy as np


def train_classifier(model, train_loader, test_loader, exp_name='experiment', lr=0.01, epochs=10, momentum=0.99, logdir='logs'):
    """Train ``model`` with SGD and cross-entropy, evaluating on ``test_loader`` every epoch.

    Each epoch runs a 'train' phase followed by a 'test' phase. Running loss
    and accuracy are shown in the progress bar and written to TensorBoard
    under ``join(logdir, exp_name)``. After every epoch the model's
    ``state_dict`` is saved to ``'<exp_name>-<epoch>.pth'``.

    Args:
        model: Network to train; moved to CUDA when available, else CPU.
        train_loader: Iterable of batches where ``batch[0]`` holds the inputs
            and ``batch[1]`` the labels.
        test_loader: Same batch layout, used for the evaluation phase.
        exp_name: Experiment name used for the log directory and checkpoints.
        lr: SGD learning rate.
        epochs: Number of epochs.
        momentum: SGD momentum.
        logdir: Root directory of the TensorBoard logs.

    Returns:
        The same ``model`` object after training.
    """
    optimizer = SGD(model.parameters(), lr=lr, momentum=momentum)
    criterion = nn.CrossEntropyLoss()
    writer = SummaryWriter(join(logdir, exp_name))
    loss_meter = AverageValueMeter()
    acc_meter = AverageValueMeter()
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)

    loader = {
        'train': train_loader,
        'test': test_loader
    }

    # Number of samples processed so far; incremented in both phases and used
    # as the x-axis of every TensorBoard scalar.
    global_step = 0

    try:
        for e in range(epochs):
            for phase in ['train', 'test']:
                if phase == 'train':
                    model.train()
                else:
                    model.eval()

                # Meters hold per-phase, per-epoch running averages.
                loss_meter.reset()
                acc_meter.reset()

                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(phase == 'train'):
                    with tqdm.tqdm(loader[phase], total=len(loader[phase]), desc=f"{phase.capitalize()} Epoch {e+1}/{epochs}", unit="batch") as pbar:
                        for batch in pbar:
                            x = batch[0].to(device)
                            y = batch[1].to(device)
                            output = model(x)

                            n = x.shape[0]
                            global_step += n
                            loss = criterion(output, y)

                            if phase == 'train':
                                loss.backward()
                                optimizer.step()
                                optimizer.zero_grad()

                            # Predicted class = index of the largest score along dim 1.
                            accuracy = accuracy_score(y.to('cpu'), output.to('cpu').max(1)[1])
                            loss_meter.add(loss.item(), n)
                            acc_meter.add(accuracy, n)

                            pbar.set_postfix(loss=loss_meter.value(), accuracy=acc_meter.value())

                            # Per-batch logging of the running training metrics.
                            if phase == 'train':
                                writer.add_scalar('loss/train', loss_meter.value(), global_step=global_step)
                                writer.add_scalar('accuracy/train', acc_meter.value(), global_step=global_step)
                # End of phase: log the averages over the whole phase.
                writer.add_scalar('loss/' + phase, loss_meter.value(), global_step=global_step)
                writer.add_scalar('accuracy/' + phase, acc_meter.value(), global_step=global_step)

            torch.save(model.state_dict(), '%s-%d.pth' % (exp_name, e+1))
    finally:
        # Flush pending events and release the log file even if training fails.
        writer.close()

    return model

In [4]:
import tqdm

def test_classifier(model, loader):
    """Run ``model`` over ``loader`` in evaluation mode and collect its predictions.

    Args:
        model: Network to evaluate; moved to CUDA when available, else CPU.
        loader: Iterable of batches where ``batch[0]`` holds the inputs and
            ``batch[1]`` the labels.

    Returns:
        Tuple ``(predictions, labels)`` of NumPy arrays, where each prediction
        is the index of the largest model output along dimension 1.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    model.eval()

    all_predictions = []
    all_labels = []

    # Inference only: no gradients are needed.
    with torch.no_grad(), tqdm.tqdm(loader, desc="Testing", unit="batch") as progress:
        for batch in progress:
            inputs = batch[0].to(device)
            targets = batch[1].to(device)
            scores = model(inputs)

            _, predicted = scores.to("cpu").max(1)
            all_predictions.extend(predicted.numpy())
            all_labels.extend(targets.to("cpu").numpy())

    return np.array(all_predictions), np.array(all_labels)

## Funzione per training, print epoche e generazione grafici tensorboard (più complessa)

In [None]:
import os
import torch
import torch.nn as nn
from torch.optim import SGD
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import accuracy_score
from os.path import join
import tqdm


def trainval_classifier(model, train_loader, test_loader, exp_name='experiment', lr=0.01, epochs=10, momentum=0.99, logdir='logs'):
    """Train ``model`` with SGD and cross-entropy, evaluating on ``test_loader`` every epoch.

    Each epoch runs a 'train' phase followed by a 'test' phase. Running loss
    and accuracy are shown in the progress bar and written to TensorBoard
    under ``join(logdir, exp_name)``. After every epoch the model's
    ``state_dict`` is saved to ``weights/<exp_name>-<epoch>.pth`` (the
    ``weights`` directory is created if missing).

    Args:
        model: Network to train; moved to CUDA when available, else CPU.
        train_loader: Iterable of batches where ``batch[0]`` holds the inputs
            and ``batch[1]`` the labels.
        test_loader: Same batch layout, used for the evaluation phase.
        exp_name: Experiment name used for the log directory and checkpoints.
        lr: SGD learning rate.
        epochs: Number of epochs.
        momentum: SGD momentum.
        logdir: Root directory of the TensorBoard logs.

    Returns:
        The same ``model`` object after training.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = SGD(model.parameters(), lr=lr, momentum=momentum)
    # Running averages, reset at the start of every phase.
    loss_meter = AverageValueMeter()
    acc_meter = AverageValueMeter()
    # TensorBoard writer.
    writer = SummaryWriter(join(logdir, exp_name))
    # Device selection.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)
    loader = {'train': train_loader, 'test': test_loader}

    # Number of samples processed so far; incremented in both phases and used
    # as the x-axis of every TensorBoard scalar.
    global_step = 0
    os.makedirs("weights", exist_ok=True)

    try:
        for e in range(epochs):
            print(f'Epoch {e + 1} of {epochs}')
            # Alternate between the training and the test phase.
            for phase in ['train', 'test']:
                loss_meter.reset()
                acc_meter.reset()
                if phase == 'train':
                    model.train()
                else:
                    model.eval()
                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(phase == 'train'):
                    with tqdm.tqdm(loader[phase], total=len(loader[phase]), desc=f"{phase.capitalize()} Epoch {e + 1}/{epochs}", unit="batch") as pbar:
                        for batch in pbar:
                            x = batch[0].to(device)
                            y = batch[1].to(device)
                            output = model(x)

                            # Global step update.
                            batch_elements = x.shape[0]
                            global_step += batch_elements
                            # Loss.
                            loss = criterion(output, y)

                            if phase == 'train':
                                loss.backward()
                                optimizer.step()
                                optimizer.zero_grad()

                            # Predicted class = index of the largest score along dim 1.
                            acc = accuracy_score(y.to('cpu'), output.to('cpu').max(1)[1])
                            loss_meter.add(loss.item(), batch_elements)
                            acc_meter.add(acc, batch_elements)

                            pbar.set_postfix(loss=loss_meter.value(), accuracy=acc_meter.value())

                            # Per-batch logging of the running training metrics.
                            if phase == 'train':
                                writer.add_scalar('Loss/train', loss_meter.value(), global_step)
                                writer.add_scalar('Accuracy/train', acc_meter.value(), global_step)
                # End of phase: log the averages over the whole phase.
                writer.add_scalar('loss/' + phase, loss_meter.value(), global_step=global_step)
                writer.add_scalar('accuracy/' + phase, acc_meter.value(), global_step=global_step)
            torch.save(model.state_dict(), f'weights/{exp_name}-{e + 1}.pth')
    finally:
        # Flush pending events and release the log file even if training fails.
        writer.close()
    return model

In [7]:
def test_classifier_v2(model, loader):
    """Run ``model`` over ``loader`` in evaluation mode and collect its predictions.

    Args:
        model: Network to evaluate; moved to CUDA when available, else CPU.
        loader: Iterable of batches where ``batch[0]`` holds the inputs and
            ``batch[1]`` the labels.

    Returns:
        Tuple ``(predictions, labels)`` of NumPy arrays, where each prediction
        is the index of the largest model output along dimension 1.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    model.to(device)
    model.eval()

    predictions, labels = [], []

    # torch.no_grad() is the correct context manager (torch.nograd does not exist).
    with torch.no_grad():
        with tqdm.tqdm(loader, desc="Testing", unit="batch") as pbar:
            for batch in pbar:
                x = batch[0].to(device)
                y = batch[1].to(device)
                output = model(x)
                # Convert to NumPy so predictions and labels have the same element type.
                preds = output.to("cpu").max(dim=1)[1].numpy()
                labs = y.to("cpu").numpy()
                predictions.extend(preds)
                labels.extend(labs)

    return np.array(predictions), np.array(labels)

# MiniAlexNet + DropOut (dal lab)
- Nota: out_classes=100, nel nostro caso dovrebbe essere 22 per le lettere del dataset

In [None]:
from torch import nn
class MiniAlexNet_Dropout(nn.Module):
    def __init__(self, input_channels=3, out_classes=100):
        super(MiniAlexNet_Dropout, self).__init__()
        self.feature_extractor = nn.Sequential(
            nn.Conv2d(input_channels, 16, 5, padding=2), # 3 canali in input, 16 mappe di feature in output, kernel 5x5 | input: 3x32x32 -> output: 16x32x32
            nn.MaxPool2d(2), # Max pooling 2x2 | input: 32x32x32 -> output: 32x16x16
            nn.ReLU(),

            nn.Conv2d(16, 32, 5, padding=2), # 16 canali in input, 32 mappe di feature in output, kernel 5x5 | input: 16x16x16 -> output: 32x16x16
            nn.MaxPool2d(2), # Max pooling 2x2 | input: 64x16x16 -> output: 64x8x8
            nn.ReLU(),

            nn.Conv2d(32, 64, 3, padding=1), # 32 canali in input, 64 mappe di feature in output, kernel 3x3 | input: 32x8x8 -> output: 64x8x8
            nn.ReLU(),

            nn.Conv2d(64, 128, 3, padding=1), # 64 canali in input, 128 mappe di feature in output, kernel 3x3 | input: 64x8x8 -> output: 128x8x8
            nn.ReLU(),

            nn.Conv2d(128, 256, 3, padding=1), # 128 canali in input, 256 mappe di feature in output, kernel 3x3 | input: 128x8x8 -> output: 256x8x8
            nn.MaxPool2d(2), # Max pooling 2x2 | input: 128x8x8 -> output: 128x4x4
            nn.ReLU()
        )

        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256*4*4, 2048), #input 256*4*4=4096, output 2048
            nn.ReLU(),

            nn.Dropout(),
            nn.Linear(2048, 1024), #input 2048, output 1024
            nn.ReLU(),

            nn.Linear(1024, out_classes) #input 1024, output 100
        )

    def forward(self, x):
        x = self.feature_extractor(x)
        x = self.classifier(x.view(x.shape[0], -1))
        return x

In [None]:
# Usage example: build the network with its default arguments.
mini_alexnet_dropout = MiniAlexNet_Dropout()
# To train it, pass the training and testing DataLoaders to train_classifier:
# mini_alexnet_dropout = train_classifier(mini_alexnet_dropout, <training DataLoader>, <testing DataLoader>, exp_name='mini_alexnet_v2', lr=0.01, epochs=10)