In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import json
from numpy import asarray
from PIL import Image

In [None]:
# Switch the working directory to the project's CODE folder.
# NOTE(review): presumably required so that the local `my_resnet50` module imported
# further down can be found — confirm. The absolute path is machine-specific and
# has to be adapted before running the notebook elsewhere.
os.chdir(r'/home/gusripama@GU.GU.SE/machine_learning_2/project/CODE')

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet50
import torchvision.models as models
import torchmetrics
from my_resnet50 import my_ResNet50
import tqdm
from sklearn.model_selection import ParameterGrid
import pandas as pd
import time
from thop import profile

In [None]:
# Change these directory paths to match where the data JSON file and the images are stored.
# Alternative (other machine) locations, kept for reference:
# dir_data_json= r'/home/matteo/Desktop/GU/2nd_year/machine_learning_advanced/project_course/CODE/data'
# dir_img= r'/home/matteo/Desktop/GU/2nd_year/machine_learning_advanced/project_course/CODE/images'
dir_data_json= r'/home/gusripama@GU.GU.SE/machine_learning_2/project/CODE/data'
dir_img= r'/home/gusripama@GU.GU.SE/machine_learning_2/project/CODE/images'


## **1. Dataset generation**

The dataset creation happens through the file dataset_creation.py ; run it and it will create the dataset needed to work with!

Here is the list of probability distributions for which images will be created:

- Beta, Chi-squared, Exponential, Gamma, Laplace, Normal, Uniform, Weibull

The size of the dataset is about to be: ----

## **2. Load Data**

#### **2.1 Open file**

In [None]:
# Move into the folder that holds the metadata and read data.json into memory.
os.chdir(dir_data_json)

with open('data.json', 'r') as json_file:
    data = json.load(json_file)

#### **2.2 Data Structure**

##### 2.2.1 Mapping distribution names with integers labels

In [None]:
# Collect the distinct distribution labels found in data.json.
# The labels are sorted so that the label <-> integer mapping is deterministic:
# iterating a set of strings follows hash order, which can differ between
# kernel sessions, so without sorting the class ids could change from run to run.
# (Assumes every label is of the same, orderable type — presumably strings.)
distr_list = sorted({val['label'] for val in data.values()})

# Map each distribution name to an integer class id, and the other way round.
distr_to_int = {name: class_id for class_id, name in enumerate(distr_list)}
int_to_distr = {class_id: name for class_id, name in enumerate(distr_list)}

##### 2.2.2 Data structure

In [None]:
# Dictionary from image file name to integer class id,
# e.g. my_data = {'img_0.png': 1, 'img_1.png': 4, ...}
my_data = {
    name_img: distr_to_int[vals['label']]
    for name_img, vals in data.items()
}

#### **2.3 Dataset creation**

In [None]:
# Change the working directory to the image folder. Relative paths used from
# here on (e.g. the default `temp_path` of measure_model_size_mb) resolve against it.
os.chdir(dir_img)

In [None]:
# Pre-processing applied to every image: resize, crop the central 224x224 region,
# convert to a tensor and normalize each channel.
# NOTE(review): the mean/std values are presumably the ImageNet channel statistics
# expected by the pre-trained ResNet — confirm.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])

transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    normalize,
])

In [None]:
# Build the in-memory dataset: a list of (image tensor, integer class id) pairs.
my_dt = []

# os.listdir returns entries in arbitrary order. Sorting and then shuffling with a
# fixed seed gives an order that is reproducible across runs and machines while
# still mixing the classes, which matters because the train/val/test split is
# taken by slicing this list.
file_names = sorted(os.listdir(dir_img))
np.random.default_rng(42).shuffle(file_names)

skipped_files = []
for idx, name_file in enumerate(file_names):
    if idx % 100 == 0:
        print(idx)

    # Ignore directory entries that have no label in data.json (hidden files,
    # temporary files, ...) instead of failing with a KeyError.
    if name_file not in my_data:
        skipped_files.append(name_file)
        continue

    # Open through the full path so the loop does not depend on the current
    # working directory; the context manager releases the file handle.
    with Image.open(os.path.join(dir_img, name_file)) as img:
        np_img = transform(img.convert('RGB'))

    my_dt.append((np_img, my_data[name_file]))

if skipped_files:
    print(f'Skipped {len(skipped_files)} file(s) without a label: {skipped_files[:5]}')

## **3. Models**

#### **3.1 Preparing data**

In [None]:
# Number of target classes (one per distribution family).
dt_number_classes = len(distr_list)

# Baseline hyperparameters
batch_size = 4
epochs = 10

# Split in dataset order: first 80% train, next 10% validation, remainder test.
n_samples = len(my_dt)
s_80 = int(80 * n_samples / 100)
s_90 = s_80 + int(10 * n_samples / 100)

my_train, my_val, my_test = my_dt[:s_80], my_dt[s_80:s_90], my_dt[s_90:]

#### **3.2 Models**

##### **3.2.1 Setting criterion and device**

In [None]:
# Loss shared by both models (multi-class classification).
criterion = nn.CrossEntropyLoss()

# Use the first GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# File names intended for saving the trained models.
models_name = [
    'resnet50_scratch_baseline.pth',
    'resnet50_scratch_preTrained.pth',
]

##### **Scratch ResNet 50**

In [None]:
# ResNet50 from the local implementation, with randomly initialised weights
# and one output per class.
model_rn50_scratch = my_ResNet50(img_channels=3, num_classes=dt_number_classes)

# SGD with momentum over all of its parameters, then move the network to the device.
optimizer_scratch = optim.SGD(
    model_rn50_scratch.parameters(),
    lr=0.001,
    momentum=0.9,
)
model_rn50_scratch = model_rn50_scratch.to(device)

##### **Pre-trained ResNet 50**

In [None]:
# torchvision ResNet50 with pre-trained weights.
# NOTE(review): recent torchvision releases deprecate `pretrained=` in favour of
# `weights=` — confirm against the installed version.
model_preTrain = resnet50(pretrained=True)

# Swap the final fully connected layer for one with as many outputs as we have classes.
num_features = model_preTrain.fc.in_features
model_preTrain.fc = nn.Linear(num_features, dt_number_classes)

# Every parameter is optimised (nothing is frozen), then the network goes to the device.
optimizer_preTrain = optim.SGD(
    model_preTrain.parameters(),
    lr=0.001,
    momentum=0.9,
)
model_preTrain = model_preTrain.to(device)

## **4. Training and Testing**

#### **4.1 Training and Test**

For the baseline model we use only 500 images.

In [None]:
def _macro_classification_metrics(labels, preds, number_of_classes):
    """Return macro-averaged (recall, precision, F1) for integer class tensors.

    The metrics are derived from a confusion matrix (rows = true class,
    columns = predicted class). The average is taken over the classes that
    actually occur in `labels`, so classes absent from the evaluated subset do
    not pull the score towards zero.
    """
    flat_index = labels * number_of_classes + preds
    confusion = torch.bincount(flat_index, minlength=number_of_classes ** 2)
    confusion = confusion.reshape(number_of_classes, number_of_classes).double()

    true_positives = confusion.diag()
    support = confusion.sum(dim=1)      # samples per true class
    predicted = confusion.sum(dim=0)    # samples per predicted class

    # clamp() avoids 0/0 for classes that were never seen / never predicted.
    recall = true_positives / support.clamp(min=1)
    precision = true_positives / predicted.clamp(min=1)
    f1 = 2 * precision * recall / (precision + recall).clamp(min=1e-12)

    present = support > 0
    return (recall[present].mean().item(),
            precision[present].mean().item(),
            f1[present].mean().item())


def train_test_model(model, dt_train, dt_test, criterion, optimizer, device, epochs, batch_size, number_of_classes,
                     max_samples=500):
    """Train `model` and evaluate it on the test data after every epoch.

    Parameters
    ----------
    model, criterion, optimizer : the network, loss and optimizer to use.
        The loss is assumed to return the mean over the batch (the default
        reduction of nn.CrossEntropyLoss).
    dt_train, dt_test : sequences of (input tensor, integer label) pairs.
    device : device the batches are moved to (the model must already be there).
    epochs, batch_size : training length and mini-batch size.
    number_of_classes : number of classes, used to build the confusion matrix.
    max_samples : only the first `max_samples` items of each set are used
        (default 500, the baseline subset size); pass None to use everything.

    Returns
    -------
    dict of lists with one entry per epoch: 'Epoch', 'Train Loss', 'Test Loss',
    'Test Accuracy', 'Test Recall', 'Test Precision', 'Test F1' (macro averages)
    and 'Labels_Preds' (list of (label, prediction) pairs for that epoch).

    Raises
    ------
    ValueError if the (possibly truncated) train or test set is empty.
    """
    df_results = {'Epoch': [],
                  'Train Loss': [],
                  'Test Loss': [],
                  'Test Accuracy': [],
                  'Test Recall': [],
                  'Test Precision': [],
                  'Test F1': [],
                  'Labels_Preds': []}

    if max_samples is not None:
        dt_train = dt_train[:max_samples]
        dt_test = dt_test[:max_samples]

    n_train, n_test = len(dt_train), len(dt_test)
    if n_train == 0 or n_test == 0:
        raise ValueError('train_test_model needs non-empty training and test data')

    dataloader_train = DataLoader(dt_train, batch_size=batch_size, shuffle=True)
    # Evaluation does not benefit from shuffling.
    dataloader_test = DataLoader(dt_test, batch_size=batch_size, shuffle=False)

    for epoch in range(epochs):
        print(f'\nEpoch: {epoch+1}')

        # ---- training pass ----
        model.train()
        train_loss = 0.0
        for inputs, labels in tqdm.tqdm(dataloader_train):
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight the batch mean by the batch size so that a smaller last
            # batch does not distort the epoch average.
            train_loss += loss.item() * inputs.size(0)

        # ---- evaluation pass ----
        model.eval()
        test_loss = 0.0
        epoch_labels, epoch_preds = [], []

        with torch.no_grad():
            for inputs, labels in tqdm.tqdm(dataloader_test):
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                preds = outputs.argmax(dim=1)

                test_loss += loss.item() * inputs.size(0)
                epoch_labels.append(labels.cpu())
                epoch_preds.append(preds.cpu())

        all_labels = torch.cat(epoch_labels)
        all_preds = torch.cat(epoch_preds)

        # Normalize by the number of samples, not by batches * batch_size, which
        # over-counts whenever the last batch is incomplete.
        train_loss = train_loss / n_train
        test_loss = test_loss / n_test
        test_acc = (all_preds == all_labels).double().mean().item()
        test_recall, test_precision, test_f1 = _macro_classification_metrics(
            all_labels, all_preds, number_of_classes)

        df_results['Epoch'].append(epoch+1)
        df_results['Train Loss'].append(round(train_loss, 4))
        df_results['Test Loss'].append(round(test_loss, 4))
        df_results['Test Accuracy'].append(round(test_acc, 4))
        df_results['Test Recall'].append(round(test_recall, 4))
        df_results['Test Precision'].append(round(test_precision, 4))
        df_results['Test F1'].append(round(test_f1, 4))
        # Keep true label and prediction together so per-class errors can be inspected.
        df_results['Labels_Preds'].append(list(zip(all_labels.tolist(), all_preds.tolist())))

    return df_results

In [None]:
# Both baseline runs share the same training settings.
baseline_kwargs = dict(epochs=10, batch_size=4, number_of_classes=dt_number_classes)

result_scratch_ResNet = train_test_model(
    model_rn50_scratch, my_train, my_test,
    criterion, optimizer_scratch, device,
    **baseline_kwargs)

result_preTrain_ResNet = train_test_model(
    model_preTrain, my_train, my_test,
    criterion, optimizer_preTrain, device,
    **baseline_kwargs)

#### **4.2 Visual Results**

In [None]:
# Per-epoch comparison of the two baseline runs, one panel per metric.
# The grid is sized from the number of metrics (the original 1x4 grid left an
# empty fourth slot) and the x axis shows the real epoch number.
metrics_to_plot = [
    ('Train Loss', (0, 3)),
    ('Test Loss', (0, 3)),
    ('Test Accuracy', (0, 1)),
]
baseline_runs = [
    ('Scratch ResNet50', result_scratch_ResNet),
    ('Pre-trained ResNet50', result_preTrain_ResNet),
]

fig, axes = plt.subplots(1, len(metrics_to_plot),
                         figsize=(5 * len(metrics_to_plot), 5),
                         squeeze=False)

for ax, (metric, y_limits) in zip(axes[0], metrics_to_plot):
    for run_label, run_results in baseline_runs:
        ax.plot(run_results['Epoch'], run_results[metric], label=run_label)
    ax.set_ylim(*y_limits)
    ax.set_xlabel('Epoch')
    ax.set_title(metric)
    ax.legend()

fig.tight_layout()
plt.show()

## **5. Fine-tuning hyperparameters**

Due to computational cost, only the first 500 training images and the first 500 validation images are used for the hyperparameter search.

#### **5.1 Fine-tuning function**

In [None]:
def hyperparam_finetune(epochs, batch_size, model, optimizer, dt_train, dt_val,
                        criterion=None, device=None, max_samples=500):
    """Train `model` with one hyperparameter combination and score it on validation data.

    Parameters
    ----------
    epochs, batch_size : the combination being evaluated.
    model, optimizer : freshly created network and its optimizer.
    dt_train, dt_val : sequences of (input tensor, integer label) pairs.
    criterion : loss to minimise; defaults to nn.CrossEntropyLoss(), so the
        function no longer depends on a module-level `criterion`. The loss is
        assumed to return the mean over the batch.
    device : device for the batches; defaults to the device the model's
        parameters already live on, instead of a module-level `device`.
    max_samples : only the first `max_samples` items of each set are used
        (default 500); pass None to use everything.

    Returns
    -------
    dict with 'params' (batch_size, epochs), 'val_loss' and 'val_accuracy' of
    the last epoch, and 'epochs_trained'.

    Raises
    ------
    ValueError if `epochs` < 1 or if one of the data sets is empty.
    """
    if epochs < 1:
        raise ValueError('epochs must be at least 1')
    if criterion is None:
        criterion = nn.CrossEntropyLoss()
    if device is None:
        device = next(model.parameters()).device

    if max_samples is not None:
        dt_train = dt_train[:max_samples]
        dt_val = dt_val[:max_samples]

    n_train, n_val = len(dt_train), len(dt_val)
    if n_train == 0 or n_val == 0:
        raise ValueError('hyperparam_finetune needs non-empty training and validation data')

    dataloader_train = DataLoader(dt_train, batch_size=batch_size, shuffle=True)
    # Evaluation does not benefit from shuffling.
    dataloader_val = DataLoader(dt_val, batch_size=batch_size, shuffle=False)

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        train_loss = 0.0

        for inputs, labels in tqdm.tqdm(dataloader_train):
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight by batch size so an incomplete last batch is averaged correctly.
            train_loss += loss.item() * inputs.size(0)

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        val_correct = 0

        with torch.no_grad():
            for inputs, labels in tqdm.tqdm(dataloader_val):
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                preds = outputs.argmax(dim=1)

                val_loss += loss.item() * inputs.size(0)
                val_correct += (preds == labels).sum().item()

        # Normalize by the number of samples; batches * batch_size over-counts
        # when the last batch is incomplete and so under-reports the accuracy.
        train_loss = train_loss / n_train
        val_loss = val_loss / n_val
        val_acc = val_correct / n_val

        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    # Scores of the final epoch represent this hyperparameter combination.
    result_dic = {
        'params': (batch_size, epochs),
        'val_loss': val_loss,
        'val_accuracy': val_acc,
        'epochs_trained': epochs
    }

    return result_dic

def best_score_update(results, best_score, params, best_params):
    """Return the (best score, best params) pair after considering one more result.

    `results` is a result dictionary containing 'val_accuracy'; `params` is the
    hyperparameter combination that produced it. The candidate replaces the
    current best when its validation accuracy is strictly higher, or when no
    best parameters have been recorded yet (`best_params is None`). The latter
    guarantees that a parameter set is always selected, even if every
    combination scores 0. The result dictionary is printed as a progress log.
    """
    print(results)
    val_acc = results['val_accuracy']

    if best_params is None or val_acc > best_score:
        return val_acc, params

    return best_score, best_params

#### **5.2 Fine-tuning process**

In [None]:
# Search space: every (batch size, epochs) combination is evaluated for both models.
my_param_grid = {
    'batch_size': [4, 8, 16],
    'epochs': [10, 20, 50],
}

criterion = nn.CrossEntropyLoss()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Per-combination results and running best (score, params) for each model.
results_scratch = []
results_pretrain = []
best_score_scratch = 0
best_score_pretrain = 0
best_params_scratch = None
best_params_pretrain = None

for params in ParameterGrid(my_param_grid):
    print(f"\nTesting combination: {params}")

    # Fresh networks and optimizers for each combination, so runs do not share weights.
    model_scratch = my_ResNet50(img_channels=3, num_classes=dt_number_classes)
    optimizer_scratch = optim.SGD(model_scratch.parameters(), lr=0.001, momentum=0.9)
    model_scratch = model_scratch.to(device)

    model_preTrain = resnet50(pretrained=True)
    num_features = model_preTrain.fc.in_features
    model_preTrain.fc = nn.Linear(num_features, dt_number_classes)
    optimizer_preTrain = optim.SGD(model_preTrain.parameters(), lr=0.001, momentum=0.9)
    model_preTrain = model_preTrain.to(device)

    # Arguments that are identical for the two runs of this combination.
    shared_args = dict(epochs=params['epochs'], batch_size=params['batch_size'],
                       dt_train=my_train, dt_val=my_val)

    result_dic_scratch = hyperparam_finetune(
        model=model_scratch, optimizer=optimizer_scratch, **shared_args)
    result_dic_pretrain = hyperparam_finetune(
        model=model_preTrain, optimizer=optimizer_preTrain, **shared_args)

    results_scratch.append(result_dic_scratch)
    results_pretrain.append(result_dic_pretrain)

    best_score_scratch, best_params_scratch = best_score_update(
        result_dic_scratch, best_score_scratch, params, best_params_scratch)
    best_score_pretrain, best_params_pretrain = best_score_update(
        result_dic_pretrain, best_score_pretrain, params, best_params_pretrain)

## **6. Train best models**

Train the best models on datasets of different sizes.

#### **6.1 Define best hyperparameters**

In [None]:
# Unpack the winning hyperparameters of the grid search for each model.
batch_size_best_scratch = best_params_scratch['batch_size']
epoch_best_scratch = best_params_scratch['epochs']

batch_size_best_preTrain = best_params_pretrain['batch_size']
epoch_best_preTrain = best_params_pretrain['epochs']

# Dataset sizes (number of samples) to compare.
dataset_size = [100, 1000, 10000]

#### **6.2 Functions to compute computational complexity: time, number of operations (FLOPs), and model size**

In [None]:
def measure_time_train_test(model, dt_train, dt_test, criterion, optimizer, device,
                            epochs, batch_size, number_of_classes):
    """Run train_test_model and measure how long it takes.

    All arguments are forwarded unchanged to train_test_model.

    Returns
    -------
    (results, elapsed) : the dictionary returned by train_test_model and the
    elapsed wall-clock time in seconds.
    """
    def _wait_for_gpu():
        # CUDA kernels run asynchronously; wait for pending work so that it is
        # attributed to the right side of the timer.
        if torch.cuda.is_available() and torch.device(device).type == 'cuda':
            torch.cuda.synchronize(device)

    _wait_for_gpu()
    # perf_counter is monotonic and high-resolution, unlike time.time(), which
    # can jump when the system clock is adjusted.
    start = time.perf_counter()
    results = train_test_model(model, dt_train, dt_test, criterion, optimizer,
                               device, epochs, batch_size, number_of_classes)
    _wait_for_gpu()
    elapsed = time.perf_counter() - start  # seconds
    return results, elapsed

def measure_flops(model, input_size, device):
    """
    Profile one forward pass of `model` on a random input with thop.

    input_size: tuple (1, 3, H, W) for a single input

    The model is moved to `device` and switched to eval mode before profiling.
    Returns the pair reported by thop's `profile`, unpacked here as
    (MACs, parameter count).
    """
    model = model.to(device)
    model.eval()

    probe = torch.randn(*input_size).to(device)
    total_macs, total_params = profile(model, inputs=(probe,), verbose=False)
    return total_macs, total_params

def measure_model_size_mb(model, temp_path="temp_model.pth"):
    """Return the on-disk size, in MiB, of the model's serialized state_dict.

    The state_dict is written to `temp_path` (relative paths resolve against
    the current working directory), measured, and the file is removed again.
    The removal happens in a `finally` clause so that no partial or leftover
    file stays behind if saving or measuring fails.
    """
    try:
        torch.save(model.state_dict(), temp_path)
        return os.path.getsize(temp_path) / (1024 * 1024)
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)


#### **6.3 Training and testing**

In [None]:
# Train and evaluate both models, with their best hyperparameters, for every
# requested dataset size, recording accuracy metrics, wall-clock time, MACs per
# forward pass, parameter count and serialized model size.
#
# NOTE(review): train_test_model slices its inputs to the first 500 samples by
# default, so dataset sizes above 500 currently train on the same 500 samples.
# The slices below are also capped by the actual lengths of my_train / my_test.
criterion = nn.CrossEntropyLoss()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Results keyed by the dataset size as a string, e.g. '100'.
results_by_dt_size_scratch= {}
results_by_dt_size_preTrain= {}

# Shape of the dummy input used for profiling: a single 3x224x224 image.
input_size= (1, 3, 224, 224)


for dt_size in dataset_size:

    # Fresh models and optimizers for each dataset size, so runs do not share weights.
    model_rn50_scratch= my_ResNet50(img_channels=3, num_classes= dt_number_classes)
    optimizer_scratch = optim.SGD(model_rn50_scratch.parameters(), lr=0.001, momentum=0.9)
    model_rn50_scratch = model_rn50_scratch.to(device)

    model_preTrain = resnet50(pretrained=True)
    num_features = model_preTrain.fc.in_features
    model_preTrain.fc = nn.Linear(num_features, dt_number_classes)
    optimizer_preTrain = optim.SGD(model_preTrain.parameters(), lr=0.001, momentum=0.9)
    model_preTrain = model_preTrain.to(device)

    # First dt_size samples of the train and test splits.
    dt_train_sized= my_train[:dt_size]
    dt_test_sized= my_test[:dt_size]



    # --- scratch model: timed train/test run, then complexity measurements ---
    scratch_results, scratch_time = measure_time_train_test(
        model_rn50_scratch, dt_train_sized, dt_test_sized,
        criterion, optimizer_scratch, device,
        epochs=epoch_best_scratch, batch_size=batch_size_best_scratch,
        number_of_classes=dt_number_classes)

    scratch_macs, scratch_params = measure_flops(model_rn50_scratch, input_size, device)

    scratch_size_mb = measure_model_size_mb(model_rn50_scratch)

    results_by_dt_size_scratch[f'{dt_size}'] = {
        'results': scratch_results,
        'time_sec': scratch_time,
        'MACs_per_forward': scratch_macs,
        'params_count': scratch_params,
        'model_size_MB': scratch_size_mb,
    }



    # --- pre-trained model: same procedure ---
    pretrain_results, pretrain_time = measure_time_train_test(
        model_preTrain, dt_train_sized, dt_test_sized,
        criterion, optimizer_preTrain, device,
        epochs=epoch_best_preTrain, batch_size=batch_size_best_preTrain,
        number_of_classes=dt_number_classes
    )

    pretrain_macs, pretrain_params = measure_flops(model_preTrain, input_size, device)
    pretrain_size_mb = measure_model_size_mb(model_preTrain)

    results_by_dt_size_preTrain[f'{dt_size}'] = {
        'results': pretrain_results,
        'time_sec': pretrain_time,
        'MACs_per_forward': pretrain_macs,
        'params_count': pretrain_params,
        'model_size_MB': pretrain_size_mb,
    }
    



#### **6.4 Visual representation results**

In [None]:
# Dataset sizes as integers in ascending order (the result keys are strings).
sizes = sorted(map(int, results_by_dt_size_scratch))

# extract metrics
def get_metric_from_results(results_dict, metric_key, epoch=-1, dataset_sizes=None):
    """
    Collect one metric across dataset sizes, in ascending size order.

    results_dict: mapping from dataset size (as a string) to an entry holding
                  the scalar measurements and the per-epoch 'results' dictionary.
    metric_key: 'time_sec', 'model_size_MB', 'MACs_per_forward', 'params_count',
                or one of the per-epoch keys inside ['results'] such as
                'Test Accuracy' or 'Test Loss'.
    epoch: which epoch index to use for per-epoch metrics (-1 = last epoch)
    dataset_sizes: sizes to extract, in order; defaults to the integer-sorted
                   keys of results_dict, so the function does not depend on a
                   module-level variable.

    Returns a list with one value per dataset size.
    Raises IndexError when a per-epoch metric has no recorded values or `epoch`
    is out of range, and KeyError for unknown sizes or metric names.
    """
    if dataset_sizes is None:
        dataset_sizes = sorted(int(key) for key in results_dict)

    scalar_keys = ('time_sec', 'model_size_MB', 'MACs_per_forward', 'params_count')

    values = []
    for sz in dataset_sizes:
        entry = results_dict[str(sz)]
        if metric_key in scalar_keys:
            values.append(entry[metric_key])
            continue

        metric_list = entry['results'][metric_key]
        if not metric_list:
            # Say which metric is empty instead of a bare "list index out of range".
            raise IndexError(f"metric {metric_key!r} has no recorded values for dataset size {sz}")
        values.append(metric_list[epoch])
    return values


# Final-epoch test accuracy of each model for every dataset size.
scratch_acc = get_metric_from_results(results_by_dt_size_scratch, 'Test Accuracy')
pretrain_acc = get_metric_from_results(results_by_dt_size_preTrain, 'Test Accuracy')

fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(sizes, scratch_acc, marker='o', label='Scratch ResNet50')
ax.plot(sizes, pretrain_acc, marker='s', label='Pretrained ResNet50')
ax.set_xscale('log')
ax.set_xlabel('Dataset size')
ax.set_ylabel('Test accuracy')
ax.set_ylim(0, 1)
ax.set_title('Test accuracy vs dataset size')
ax.legend()
ax.grid(True, alpha=0.3)
fig.tight_layout()
plt.show()



In [None]:
def compute_total_macs(results_dict, epochs, batch_size):
    """Approximate the total training MACs for every dataset size.

    results_dict: mapping from dataset size (as a string) to an entry with a
                  'MACs_per_forward' value.
    epochs: number of training epochs.
    batch_size: kept for interface compatibility; it does not influence the
                estimate (see below).

    'MACs_per_forward' is assumed to be the cost of a forward pass on ONE
    sample (it is profiled with an input of shape (1, 3, H, W)). The cost of an
    epoch is therefore MACs * number of samples; multiplying by the number of
    batches instead would under-count by roughly a factor of `batch_size`.
    The factor 2 treats the backward pass as costing as much as the forward
    pass, which is a rough approximation.

    Assumes each size key equals the number of training samples actually used.
    Returns a list of totals ordered by ascending dataset size.
    """
    total_macs_list = []
    for sz in sorted(int(key) for key in results_dict):
        macs_per_forward = results_dict[str(sz)]['MACs_per_forward']
        total_macs = macs_per_forward * 2 * sz * epochs  # forward + backward
        total_macs_list.append(total_macs)
    return total_macs_list

# Estimate the training compute of each model with the best epochs / batch size found earlier.
scratch_total_macs = compute_total_macs(results_by_dt_size_scratch,
                                        epochs=epoch_best_scratch,
                                        batch_size=batch_size_best_scratch)
pretrain_total_macs = compute_total_macs(results_by_dt_size_preTrain,
                                         epochs=epoch_best_preTrain,
                                         batch_size=batch_size_best_preTrain)

# Log-log plot of the estimate against dataset size.
fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(sizes, scratch_total_macs, marker='o', label='Scratch ResNet50')
ax.plot(sizes, pretrain_total_macs, marker='s', label='Pretrained ResNet50')
ax.set_xscale('log')
ax.set_yscale('log')
ax.set_xlabel('Dataset size (number of samples)')
ax.set_ylabel('Total MACs (approx. training compute)')
ax.set_title('Approximate Total MACs vs Dataset Size')
ax.legend()
ax.grid(True, which='both', alpha=0.3)
fig.tight_layout()
plt.show()

In [None]:
# Wall-clock time of the combined training + testing run for every dataset size.
scratch_time = get_metric_from_results(results_by_dt_size_scratch, 'time_sec')
pretrain_time = get_metric_from_results(results_by_dt_size_preTrain, 'time_sec')

fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(sizes, scratch_time, marker='o', label='Scratch ResNet50')
ax.plot(sizes, pretrain_time, marker='s', label='Pretrained ResNet50')
ax.set_xscale('log')
ax.set_xlabel('Dataset size')
ax.set_ylabel('Time (s)')
ax.set_title('Total train+test time vs dataset size')
ax.legend()
ax.grid(True, alpha=0.3)
fig.tight_layout()
plt.show()
