In [None]:
from __future__ import print_function
import glob
from itertools import chain
import os
import random
import zipfile
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from tqdm import tqdm,tqdm_notebook
import albumentations as A
from albumentations.pytorch import ToTensorV2



In [None]:
print(f"Torch: {torch.__version__}")

In [None]:
# Training settings
batch_size = 32
epochs = 30
lr = 3e-4
gamma = 0.7
seed = 42
re_mean_sd=False
debug_mode=False
image_size = 224


In [None]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

seed_everything(seed)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Data stream

In [None]:
os.makedirs('data', exist_ok=True)
images_dir = 'images/images'
train_list = glob.glob(os.path.join(images_dir,'*/*.jpg'))
if debug_mode:
    # keep only the first 100 images
    train_list = train_list[:100]
print(f"Number of total images: {len(train_list)}")

#split train and validation and test
train_list, test_list = train_test_split(train_list, test_size=0.1, random_state=seed)
train_list, val_list = train_test_split(train_list, test_size=0.1, random_state=seed)

print(f"Number of train images: {len(train_list)}")
print(f"Number of validation images: {len(val_list)}")
print(f"Number of test images: {len(test_list)}")

labels = [os.path.split(os.path.split(path)[0])[1] for path in train_list]
# kkep only unique labels
labels = list(set(labels))
print("The labels are:" ,labels)


In [None]:
if not debug_mode:
    print(len(train_list))
    random_idx = np.random.randint(1, len(train_list), size=9)
    print(random_idx)
    fig, axes = plt.subplots(3, 3, figsize=(16, 12))

    for idx, ax in enumerate(axes.ravel()):
        img = Image.open(train_list[idx])
        ax.set_title(labels[idx])
        ax.imshow(img)

In [None]:
if re_mean_sd:
    #extract mean and std from train set
    train_transform = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.ToTensor(),
    ])
    train_dataset = datasets.ImageFolder(images_dir, transform=train_transform)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False)

    mean = 0.
    std = 0.
    nb_samples = 0.
    for data, _ in tqdm(train_loader):
        batch_samples = data.size(0)
        data = data.view(batch_samples, data.size(1), -1)
        mean += data.mean(2).sum(0)
        std += data.std(2).sum(0)
        nb_samples += batch_samples

    mean /= nb_samples
    std /= nb_samples
else:
    mean = [0.5035, 0.4448, 0.3722]
    std =  [0.2078, 0.1935, 0.1769]
print(f"mean: {mean}")
print(f"std: {std}")


In [None]:
import cv2 
train_transforms = A.Compose(
    [
        A.LongestMaxSize(max_size=224),
        A.PadIfNeeded(min_height=224, min_width=224, border_mode=cv2.BORDER_CONSTANT),
        A.augmentations.Normalize(mean=mean, std=std),
        A.augmentations.geometric.rotate.RandomRotate90(),
        A.LongestMaxSize(max_size=224),
        A.PadIfNeeded(min_height=224, min_width=224, border_mode=cv2.BORDER_CONSTANT),
        ToTensorV2(),
    ]
)

val_transforms = A.Compose(
    [
        A.LongestMaxSize(max_size=224),
        A.PadIfNeeded(min_height=224, min_width=224, border_mode=cv2.BORDER_CONSTANT),
        A.augmentations.Normalize(mean=mean, std=std),
        ToTensorV2(),
    ]
)


test_transforms = A.Compose(
    [
        A.LongestMaxSize(max_size=224),
        A.PadIfNeeded(min_height=224, min_width=224, border_mode=cv2.BORDER_CONSTANT),
        A.augmentations.Normalize(mean=mean, std=std),
        ToTensorV2(),
    ]
)


In [None]:
class Create_dataset(Dataset):
    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform

    def __len__(self):
        self.filelength = len(self.file_list)
        return self.filelength

    def __getitem__(self, idx):
        img_path = self.file_list[idx]
        img = Image.open(img_path)
        # convert image to numpy array
        img = np.array(img)
        #if it is a grayscale image, repeat it 3 times
        if len(img.shape) == 2:
            img = np.repeat(img[:, :, np.newaxis], 3, axis=2)
        # apply transforms
        img = self.transform(image=img)
        img = img['image']
        #cast to tensor
        #if it is a grayscale image, repeat it 3 times

        label = os.path.split(os.path.split(img_path)[0])[1]
        # map label to index of labels
        label = labels.index(label)
        return img, label


In [None]:
# create dataset
train_dataset = Create_dataset(train_list, transform=train_transforms)
val_dataset = Create_dataset(val_list, transform=val_transforms)
test_dataset = Create_dataset(test_list, transform=test_transforms)

# create dataloader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# print the sizes of train, validation and test
print(f"Train size: {len(train_loader.dataset)}")
print(f"Validation size: {len(val_loader.dataset)}")
print(f"Test size: {len(test_loader.dataset)}")

In [None]:
def train(model, device, train_loader,val_loader, epochs, output_folder):
    # loss function with label smoothing
    criterion = nn.CrossEntropyLoss(label_smoothing=0.0)
    # optimizer
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    # scheduler
    scheduler = StepLR(optimizer, step_size=1, gamma=gamma)    
    # training loop
    best_accuracy = 0
    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        epoch_loss = 0
        epoch_accuracy = 0
        for data, label in tqdm(train_loader):

            data = data.to(device)
            label = label.to(device)

            output = model(data)
            loss = criterion(output, label)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            acc = (output.argmax(dim=1) == label).float().mean()
            epoch_accuracy += acc / len(train_loader)
            epoch_loss += loss / len(train_loader)

        with torch.no_grad():
            epoch_val_accuracy = 0
            epoch_val_loss = 0
            for data, label in tqdm(val_loader):

                data = data.to(device)
                label = label.to(device)

                val_output = model(data)
                val_loss = criterion(val_output, label)

                acc = (val_output.argmax(dim=1) == label).float().mean()
                epoch_val_accuracy += acc / len(val_loader)
                epoch_val_loss += val_loss / len(val_loader)

        print(
            f"Epoch : {epoch+1} - loss : {epoch_loss:.4f} - acc: {epoch_accuracy:.4f} - val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n"
        )
        train_losses.append(epoch_loss)
        val_losses.append(epoch_val_loss)
        filename="./trained_models/"+str(output_folder)
        # create folder if it does not exist
        if not os.path.exists(filename):
            os.makedirs(filename)
        if epoch_val_accuracy > best_accuracy:
            best_accuracy = epoch_val_accuracy
            filename="./trained_models/"+str(output_folder)+"/best.pt"
            torch.save(model.state_dict(), filename)
        filename="./trained_models/"+str(output_folder)+"/last.pt"
        torch.save(model.state_dict(), filename)
    # bring train_losses and val_losses to cpu
    train_losses = torch.stack(train_losses).cpu().detach().numpy()
    val_losses = torch.stack(val_losses).cpu().detach().numpy()
    return train_losses, val_losses

In [None]:
from sklearn.metrics import classification_report
def test_model(model_name, model, device, test_loader):
    model.load_state_dict(torch.load(model_name))
    print("Predictions: ")
    # load the best model
    model.load_state_dict(torch.load(str(model_name)))
    model.eval()
    # make predictions and extract the classificaiton report
    y_pred = []
    y_true = []
    for data, label in tqdm(test_loader):
            data = data.to(device)
            label = label.to(device)
            output = model(data)
            y_pred.extend(output.argmax(dim=1).cpu().numpy())
            y_true.extend(label.cpu().numpy())
    print(classification_report(y_true, y_pred, target_names=labels))


### Ora si va ad addestrare i diversi modelli proposti via mail 
#### Quello che vogliamo fare è:
 - Addestrare prima su Resnet50 senza alcun pretraining
 - Addestrare prima su Resnet50 con pretraining
 - Addestrare Resnet50 eseguendo freezing layer di convoluzione e globalpooling
 - Addestrare Resnet50 eseguendo freezing layer dei primi layer di convoluzione
 - Addestrare Resnet50 modificata senza alcun pretraining
 - Addestrare Resnet50 modificata con pretraining
 - Addestrare Resnet50 modificata eseguendo freezing layer di convoluzione e globalpooling
 - Addestrare Resnet50 modificata eseguendo freezing layer dei primi layer di convoluzione

In [None]:
#import the needed libraries
import torchvision.models as models
from resnet import *
from torchsummary import summary
num_classes = len(labels)


#### Resnet50 scratch

In [None]:
#empty gpu cache
torch.cuda.empty_cache()
# create model 
model = models.resnet50(weights=None)
# change the last layer
model.fc = nn.Linear(2048, num_classes)
# print the model summary
summary(model.to('cpu'),  (3, 224, 224), device='cpu')
# move model to GPU if available and compile it thanks to torch 2.0
model = model.to(device)
model = torch.compile(model)
MODEL_PATH = "resnet50_from_scratch"
# train the model
train_losses, val_losses =train(model, device, train_loader, val_loader, epochs, MODEL_PATH)
#plot the losses
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.legend()
plt.show()
# test the model
BEST_MODEL_PATH = "./trained_models/"+str(MODEL_PATH)+"/best.pt"
test_model(BEST_MODEL_PATH, model, device, test_loader)
del model


#### Resnet50 pretrained

In [None]:
#empty gpu cache
torch.cuda.empty_cache()
# create model 
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
# change the last layer
model.fc = nn.Linear(2048, num_classes)
# print the model summary
summary(model.to('cpu'),  (3, 224, 224), device='cpu')
# move model to GPU if available and compile it thanks to torch 2.0
model = model.to(device)
model = torch.compile(model)
MODEL_PATH = "resnet50_from_scratch"
# train the model
train_losses, val_losses =train(model, device, train_loader, val_loader, epochs, MODEL_PATH)
#plot the losses
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.legend()
plt.show()
# test the model
BEST_MODEL_PATH = "./trained_models/"+str(MODEL_PATH)+"/best.pt"
test_model(BEST_MODEL_PATH, model, device, test_loader)
del model

#### Resnet50 pretrained freeze all

In [None]:
#empty gpu cache
torch.cuda.empty_cache()
# create model 
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
# change the last layer
model.fc = nn.Linear(2048, num_classes)
#freeze all layers except the fully connected
for param in model.parameters():
    param.requires_grad = False
for param in model.fc.parameters():
    param.requires_grad = True
# print the model summary
summary(model.to('cpu'),  (3, 224, 224), device='cpu')
# move model to GPU if available and compile it thanks to torch 2.0
model = model.to(device)
model = torch.compile(model)
MODEL_PATH = "resnet50_pretrained_freeze_all"
# train the model
train_losses, val_losses =train(model, device, train_loader, val_loader, epochs, MODEL_PATH)
#plot the losses
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.legend()
plt.show()
# test the model
BEST_MODEL_PATH = "./trained_models/"+str(MODEL_PATH)+"/best.pt"
test_model(BEST_MODEL_PATH, model, device, test_loader)
del model


In [None]:
#empty gpu cache
torch.cuda.empty_cache()
# create model 
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
# change the last layer
model.fc = nn.Linear(2048, num_classes)
#freeze all layers except the layers from the last block
for param in model.parameters():
    param.requires_grad = False
for param in model.layer4.parameters():
    param.requires_grad = True
for param in model.fc.parameters():
    param.requires_grad = True
# print the model summary
summary(model.to('cpu'),  (3, 224, 224), device='cpu')
# move model to GPU if available and compile it thanks to torch 2.0
model = model.to(device)
model = torch.compile(model)
MODEL_PATH = "resnet50_pretrained_freeze_softer"
# train the model
train_losses, val_losses =train(model, device, train_loader, val_loader, epochs, MODEL_PATH)
#plot the losses
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.legend()
plt.show()
# test the model
BEST_MODEL_PATH = "./trained_models/"+str(MODEL_PATH)+"/best.pt"
test_model(BEST_MODEL_PATH, model, device, test_loader)
del model

In [None]:
#empty gpu cache
torch.cuda.empty_cache()
# create model 
model = ResNet50(num_classes=num_classes)
# print the model summary
summary(model.to('cpu'),  (3, 224, 224), device='cpu')
# move model to GPU if available and compile it thanks to torch 2.0
model = model.to(device)
model = torch.compile(model)
MODEL_PATH = "resnet50_custom_from_scratch"
# train the model
train_losses, val_losses =train(model, device, train_loader, val_loader, epochs, MODEL_PATH)
#plot the losses
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.legend()
plt.show()
# test the model
BEST_MODEL_PATH = "./trained_models/"+str(MODEL_PATH)+"/best.pt"
test_model(BEST_MODEL_PATH, model, device, test_loader)
del model

In [None]:
#empty gpu cache
torch.cuda.empty_cache()
# create model 
model = ResNet50(num_classes=num_classes)
PATH = "models/custom_resnet50.pt"
model.load_state_dict(torch.load(PATH), strict=True)
# print the model summary
summary(model.to('cpu'),  (3, 224, 224), device='cpu')
# move model to GPU if available and compile it thanks to torch 2.0
model = model.to(device)
model = torch.compile(model)
MODEL_PATH = "resnet50_custom_pretrained"
# train the model
train_losses, val_losses =train(model, device, train_loader, val_loader, epochs, MODEL_PATH)
#plot the losses
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.legend()
plt.show()
# test the model
BEST_MODEL_PATH = "./trained_models/"+str(MODEL_PATH)+"/best.pt"
test_model(BEST_MODEL_PATH, model, device, test_loader)
del model

In [None]:
#empty gpu cache
torch.cuda.empty_cache()
# create model 
model = ResNet50(num_classes=num_classes)
PATH = "models/custom_resnet50.pt"
model.load_state_dict(torch.load(PATH), strict=True)
#freeze all layers except the fully connected
for param in model.parameters():
    param.requires_grad = False
for param in model.fc.parameters():
    param.requires_grad = True
# print the model summary
summary(model.to('cpu'),  (3, 224, 224), device='cpu')
# move model to GPU if available and compile it thanks to torch 2.0
model = model.to(device)
model = torch.compile(model)
MODEL_PATH = "resnet50_custom_pretrained"
# train the model
train_losses, val_losses =train(model, device, train_loader, val_loader, epochs, MODEL_PATH)
#plot the losses
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.legend()
plt.show()
# test the model
BEST_MODEL_PATH = "./trained_models/"+str(MODEL_PATH)+"/best.pt"
test_model(BEST_MODEL_PATH, model, device, test_loader)
del model

In [None]:
#empty gpu cache
torch.cuda.empty_cache()
# create model 
model = ResNet50(num_classes=num_classes)
PATH = "models/custom_resnet50.pt"
model.load_state_dict(torch.load(PATH), strict=True)
#freeze all layers except the layers from the last block
for param in model.parameters():
    param.requires_grad = False
for param in model.layer4.parameters():
    param.requires_grad = True
for param in model.fc.parameters():
    param.requires_grad = True
# print the model summary
summary(model.to('cpu'),  (3, 224, 224), device='cpu')
# move model to GPU if available and compile it thanks to torch 2.0
model = model.to(device)
model = torch.compile(model)
MODEL_PATH = "resnet50_custom_pretrained"
# train the model
train_losses, val_losses =train(model, device, train_loader, val_loader, epochs, MODEL_PATH)
#plot the losses
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.legend()
plt.show()
# test the model
BEST_MODEL_PATH = "./trained_models/"+str(MODEL_PATH)+"/best.pt"
test_model(BEST_MODEL_PATH, model, device, test_loader)
del model