# Import e path

In [1]:
results_save_path = "../../Results/"
dataset_path = "../../Datasets/"

In [2]:
import torchvision
import torch
import os
import time
import csv
import matplotlib.pyplot as plt
import numpy as np
import shutil
from random import randint
import pandas as pd
from PIL import Image
import random
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as functional
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, TensorDataset, Dataset
from torchvision.datasets import ImageFolder
from einops import rearrange
from einops.layers.torch import Rearrange
import torchvision.transforms as transforms
from torchvision.utils import make_grid
import torch.optim.lr_scheduler as lr_scheduler

from vit_pytorch import SimpleViT
from PatchMerger import PatchMerger
from TRAM import TRAM

## Validi per ogni modello

In [3]:
def set_seed(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

set_seed(42)

In [4]:
# Verifica se la GPU è disponibile
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# device = torch.device('cpu')

## Funzioni per train e validation del modello RAW

In [5]:
def train_iter(model, optimz, data_load, loss_val, device, scheduler):
    samples = len(data_load.dataset)
    model.train()

    for i, (data, target) in enumerate(data_load):
        data = data.to(device)
        target = target.to(device)

        optimz.zero_grad()
        out = functional.log_softmax(model(data), dim=1)
        loss = functional.nll_loss(out, target)
        loss.backward()
        optimz.step()
    
        if i % 100 == 0:
            print('[' +  '{:5}'.format(i * len(data)) + '/' + '{:5}'.format(samples) +
                  ' (' + '{:3.0f}'.format(100 * i / len(data_load)) + '%)]  Loss: ' +
                  '{:6.4f}'.format(loss.item()))
    scheduler.step()
    print(scheduler.get_last_lr())
    loss_val.append(loss.item())

def evaluate(model, optimizer, data_load, loss_val, device):
    model.eval()

    samples = len(data_load.dataset)
    # predizioni corrette
    csamp = 0
    tloss = 0

    with torch.no_grad():
        for data, target in data_load:

            data = data.to(device)
            target = target.to(device)

            output = functional.log_softmax(model(data), dim=1)
            loss = functional.nll_loss(output, target, reduction='sum')
            _, pred = torch.max(output, dim=1)

            tloss += loss.item()
            csamp += pred.eq(target).sum()

    aloss = tloss / samples
    loss_val.append(aloss)
    acc = (100.0 * csamp / samples).cpu()

    print('\nAverage test loss: ' + '{:.4f}'.format(aloss) +
          '  Accuracy:' + '{:5}'.format(csamp) + '/' +
          '{:5}'.format(samples) + ' (' +
          '{:4.2f}'.format(acc) + '%)\n')

    return acc

def train_validation(model, optimizer, train_loader, validation_loader, dataset_name, epoche,scheduler, device):
  tr_loss, ts_loss, ts_acc, epoch_time_list = [], [], [], []

  for epoch in range(1, epoche + 1):

      start_time = time.time()

      print(f'Epoch: {epoch}/{epoche}')
      print("INIZIO TRAINING")
      train_iter(model, optimizer, train_loader, tr_loss, device, scheduler= scheduler)
      print("INIZIO VALIDATION")
      acc = evaluate(model, optimizer, validation_loader, ts_loss, device)

      ts_acc.append(acc)


      epoch_time = time.time() - start_time
      epoch_time_list.append(epoch_time)

      print('Execution time:', '{:5.2f}'.format(epoch_time), 'seconds')
      print("#"*40)

  return tr_loss, ts_loss, ts_acc, epoch_time_list

## Funzioni per train e validation del modello SAMPLING

In [6]:
def train_iter_sampling(model, optimz, data_load, loss_val, device, n_patch,scheduler):
    samples = len(data_load.dataset)
    model.train()

    for i, (data, target) in enumerate(data_load):
        data = data.to(device)
        target = target.to(device)

        optimz.zero_grad()
        out = functional.log_softmax(model(data, n_patch), dim=1)
        loss = functional.nll_loss(out, target)
        loss.backward()
        optimz.step()

        if i % 100 == 0:
            print('[' +  '{:5}'.format(i * len(data)) + '/' + '{:5}'.format(samples) +
                  ' (' + '{:3.0f}'.format(100 * i / len(data_load)) + '%)]  Loss: ' +
                  '{:6.4f}'.format(loss.item()))
    scheduler.step()
    print(scheduler.get_last_lr())
    loss_val.append(loss.item())

In [7]:
def evaluate_sampling(model, optimizer, data_load, loss_val, device, n_patch):
    model.eval()

    samples = len(data_load.dataset)
    # predizioni corrette
    csamp = 0
    tloss = 0

    with torch.no_grad():
        for data, target in data_load:

            data = data.to(device)
            target = target.to(device)

            output = functional.log_softmax(model(data, n_patch), dim=1)
            loss = functional.nll_loss(output, target, reduction='sum')
            _, pred = torch.max(output, dim=1)

            tloss += loss.item()
            csamp += pred.eq(target).sum()

    aloss = tloss / samples
    loss_val.append(aloss)
    acc = (100.0 * csamp / samples).cpu()

    print('\nAverage test loss: ' + '{:.4f}'.format(aloss) +
          '  Accuracy:' + '{:5}'.format(csamp) + '/' +
          '{:5}'.format(samples) + ' (' +
          '{:4.2f}'.format(acc) + '%)\n')

    return acc

In [8]:
def train_validation_sampling(model, optimizer, train_loader, validation_loader, dataset_name, epoche, device, n_patch, scheduler):
  tr_loss, ts_loss, ts_acc, epoch_time_list = [], [], [], []

  for epoch in range(1, epoche + 1):

      start_time = time.time()

      print(f'Epoch: {epoch}/{epoche}')
      print("INIZIO TRAINING")
      train_iter_sampling(model, optimizer, train_loader, tr_loss, device, n_patch, scheduler)
      print("INIZIO VALIDATION")
      acc = evaluate_sampling(model, optimizer, validation_loader, ts_loss, device, n_patch)

      ts_acc.append(acc)


      epoch_time = time.time() - start_time
      epoch_time_list.append(epoch_time)

      print('Execution time:', '{:5.2f}'.format(epoch_time), 'seconds')
      print("#"*40)

  return tr_loss, ts_loss, ts_acc, epoch_time_list

In [9]:
def ViT_train_test_save(dataset_name, model, n_patch = None):
    
    model.to(device)
    # definiamo l'ottimizzatore
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = lr_scheduler.StepLR(optimizer,step_size=10, gamma=0.5)
    initial = time.time()
    if n_patch == None: 
        _, _, validation_acc, epoch_time = train_validation(model, optimizer, train_loader, val_loader, dataset_name, epoche, device = device, scheduler = scheduler)
    else:
        _, _, validation_acc, epoch_time = train_validation_sampling(model, optimizer, train_loader, val_loader, dataset_name, epoche, device = device, n_patch = n_patch, scheduler = scheduler)
    print(f'Total Time: {time.time()-initial}')
    
    df = pd.DataFrame({'validation_acc': [tensor.item() for tensor in validation_acc],
                       'epoch_time': epoch_time
                       })
    
    if not os.path.exists(results_save_path):
        os.makedirs(results_save_path)
        
    df.to_csv(f'{results_save_path}{dataset_name}.csv', index=False)

## Dataset

In [10]:
batch_size = 64
img_size = 160
patch_size = 16

mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)

trans = transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0)==1 else x)


transform_train = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.RandomRotation(10),  # Random rotation by 10 degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # Color Jitter
    transforms.RandomVerticalFlip(p=0.5),  # Vertical flip with 50% probability
    transforms.RandomResizedCrop(img_size),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    trans,
    transforms.Normalize(mean, std),
])

transform_validation = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
    trans,
    transforms.Normalize(mean, std),
])


# Prepare dataset
trainset = torchvision.datasets.Imagenette(root=dataset_path, split='train', transform=transform_train) #download=True,
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)

validationset = torchvision.datasets.Imagenette(root=dataset_path, split='val', transform=transform_validation) #download=True
val_loader = torch.utils.data.DataLoader(validationset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

classes = trainset.classes
print(classes)

[('tench', 'Tinca tinca'), ('English springer', 'English springer spaniel'), ('cassette player',), ('chain saw', 'chainsaw'), ('church', 'church building'), ('French horn', 'horn'), ('garbage truck', 'dustcart'), ('gas pump', 'gasoline pump', 'petrol pump', 'island dispenser'), ('golf ball',), ('parachute', 'chute')]


# RUN

### Parameters

In [11]:
dim_base = 768
dim_small = 384

heads_base = 12
heads_small = 6

n_patch_75 = [100, 100, 100, 75, 75, 75, 56, 56, 56, 42, 42, 42]
n_patch_50 = [100, 100, 100, 50, 50, 50, 25, 25, 25, 12, 12, 12]

n_patch_75_PM = [(2,75),(5,56),(8,42)]
n_patch_50_PM = [(2,50),(5,25),(8,12)]

epoche = 1
learning_rate = 0.0001

### SimpleViT

In [12]:
ViTnet_base = SimpleViT(
        image_size = img_size,
        patch_size = patch_size,
        num_classes = 10,
        dim = dim_base,
        depth = 12,
        heads = heads_base,
        mlp_dim = dim_base*4,
)


ViTnet_small = SimpleViT(
        image_size = img_size,
        patch_size = patch_size,
        num_classes = 10,
        dim = dim_small,
        depth = 12,
        heads = heads_small,
        mlp_dim = dim_small*4,
)

In [None]:
ViT_train_test_save(dataset_name = "imaginette_SimpleViTBase_75%", model = ViTnet_base)
ViT_train_test_save(dataset_name = "imaginette_SimpleViTBase_50%", model = ViTnet_base)


ViT_train_test_save(dataset_name = "imaginette_SimpleViTSmall_75%", model = ViTnet_small)
ViT_train_test_save(dataset_name = "imaginette_SimpleViTSmall_50%", model = ViTnet_small)

### TRAM

In [13]:
ViTnet_base = TRAM(
        image_size = img_size,
        patch_size = patch_size,
        num_classes = 10,
        dim = dim_base,
        depth = 12,
        heads = heads_base,
        mlp_dim = dim_base*4,
)


ViTnet_small = TRAM(
        image_size = img_size,
        patch_size = patch_size,
        num_classes = 10,
        dim = dim_small,
        depth = 12,
        heads = heads_small,
        mlp_dim = dim_small*4,
)

In [14]:
ViT_train_test_save(dataset_name = "imaginette_SimpleViTBase_TRAM_75%", model = ViTnet_base, n_patch = n_patch_75)
ViT_train_test_save(dataset_name = "imaginette_SimpleViTBase_TRAM_50%", model = ViTnet_base, n_patch = n_patch_50)


ViT_train_test_save(dataset_name = "imaginette_SimpleViTSmall_TRAM_75%", model = ViTnet_small, n_patch = n_patch_75)
ViT_train_test_save(dataset_name = "imaginette_SimpleViTSmall_TRAM_50%", model = ViTnet_small, n_patch = n_patch_50)

Epoch: 1/1
INIZIO TRAINING
[0.0001]
INIZIO VALIDATION

Average test loss: 1.9280  Accuracy: 1228/ 3925 (31.29%)

Execution time: 115.46 seconds
########################################
Total Time: 115.45905828475952
Epoch: 1/1
INIZIO TRAINING
[0.0001]
INIZIO VALIDATION

Average test loss: 1.8440  Accuracy: 1379/ 3925 (35.13%)

Execution time: 85.03 seconds
########################################
Total Time: 85.03131008148193
Epoch: 1/1
INIZIO TRAINING
[0.0001]
INIZIO VALIDATION

Average test loss: 1.9115  Accuracy: 1259/ 3925 (32.08%)

Execution time: 38.62 seconds
########################################
Total Time: 38.6196825504303
Epoch: 1/1
INIZIO TRAINING
[0.0001]
INIZIO VALIDATION


KeyboardInterrupt: 

### PatchMerger

In [15]:
ViTnet_base = PatchMerger(
        image_size = img_size,
        patch_size = patch_size,
        num_classes = 10,
        dim = dim_base,
        depth = 12,
        heads = heads_base,
        mlp_dim = dim_base*4,
)


ViTnet_small = PatchMerger(
        image_size = img_size,
        patch_size = patch_size,
        num_classes = 10,
        dim = dim_small,
        depth = 12,
        heads = heads_small,
        mlp_dim = dim_small*4,
)

NameError: name 'Transformer_PatchMerger' is not defined

In [None]:
ViT_train_test_save(dataset_name = "imaginette_SimpleViTBase_PatchMerger_75%", model = ViTnet_base, n_patch = n_patch_75_PM)
ViT_train_test_save(dataset_name = "imaginette_SimpleViTBase_PatchMerger_50%", model = ViTnet_base, n_patch = n_patch_50_PM)

ViT_train_test_save(dataset_name = "imaginette_SimpleViTSmall_PatchMerger_75%", ViTnet_small, n_patch = n_patch_75_PM)
ViT_train_test_save(dataset_name = "imaginette_SimpleViTSmall_PatchMerger_50%", ViTnet_small, n_patch = n_patch_50_PM)