# INITIALIZATION

In [1]:
import matplotlib.pyplot as plt
#%matplotlib inline
from PIL import Image 
import os
from skimage.io import  imread, imshow 
import torch
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.metrics import confusion_matrix, balanced_accuracy_score,  mean_squared_error, r2_score, f1_score
from sklearn.model_selection import KFold, train_test_split
import numpy as np

#import torch
import torch.nn as nn
import torch.optim as optim
from torch.functional import F
from torch.nn import Dropout

import warnings
warnings.filterwarnings("ignore")

# DATA LOADING PIPELINE

In [2]:
# Check if the code is running on Google Colab
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    # Mount Google Drive
    from google.colab import drive
    drive.mount('/content/drive')

    train_dataset_path = '/content/drive/MyDrive/Nicoli/data-students/TRAIN'
    test_dataset_path = '/content/drive/MyDrive/Nicoli/data-students/TEST'
else:
    # Load data from local file
    train_dataset_path = 'data-students/TRAIN'
    test_dataset_path = 'data-students/TEST'

# Now you can use file_path to load your data
#print("File path:", file_path)

#FIXED VARIABLES
IMG_WIDTH = 75
IMG_HEIGHT = 75
BATCH_SIZE = 32

#testing variables
seed = 42

In [3]:
#!pip install rembg #needed for colab notebook
from rembg import remove
import io

def remove_background_pil(image):
    # Convert PIL image to bytes
    with io.BytesIO() as output_buffer:
        image.save(output_buffer, format='PNG')
        image_bytes = output_buffer.getvalue()

    # Use rembg to remove the background
    result = remove(image_bytes)

    # Convert the result binary data back to a PIL image
    result_image = Image.open(io.BytesIO(result))

    # Fill transparent pixels with black
    result_image = fill_transparent_pixels_with_black(result_image)

    # Convert the image to RGB mode if it's not already
    if result_image.mode != 'RGB':
        result_image = result_image.convert('RGB')

    return result_image

def fill_transparent_pixels_with_black(image):
    # Convert image to RGBA mode if it's not already
    if image.mode != 'RGBA':
        image = image.convert('RGBA')

    # Get the image data as a pixel access object
    pixel_data = image.load()

    # Iterate over each pixel
    width, height = image.size
    for x in range(width):
        for y in range(height):
            # Check if the pixel is transparent
            if pixel_data[x, y][3] == 0:
                # Set the pixel color to black (RGB: 0, 0, 0, Alpha: 255)
                pixel_data[x, y] = (0, 0, 0, 255)

    return image

In [4]:
from torchvision.transforms import v2

# Define your custom transformation function
augmentation = transforms.Compose([v2.RandomAffine(degrees=(-20, 20), translate=(0.1, 0.2), scale=(0.5, 0.9))])

def removeBackground(image):
    # Apply your custom background removal function
    processed_image = remove_background_pil(image) # remove_background(removal_model, image)

    # Apply other transformations if needed
    transform = transforms.Compose([
        transforms.Resize((IMG_WIDTH, IMG_HEIGHT)),
        #transforms.ToTensor(),
        #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    return transform(processed_image)

def Augment(image):
    # Apply your custom background removal function
    transform = transforms.Compose([
        augmentation,
        transforms.Resize((IMG_WIDTH, IMG_HEIGHT)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    return transform(image)

def input_transform(image):
    # Apply your custom background removal function
    transform = transforms.Compose([
        transforms.Resize((IMG_WIDTH, IMG_HEIGHT)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    return transform(image)

def input_transform_noBG(image):
    # Apply your custom background removal function
    transform = transforms.Compose([
        transforms.Resize((IMG_WIDTH, IMG_HEIGHT)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    return transform(image)

normal_transform = transforms.Compose([
        transforms.Resize((IMG_WIDTH, IMG_HEIGHT)),
        transforms.ToTensor(),
        #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])


In [5]:
#SAVE THE PREPROCESSED IMAGES IN A FOLDER
torch.cuda.is_available()
True
from torchvision.transforms import v2
import shutil
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#background_removed_path = '/content/drive/MyDrive/Nicoli/data-students/TRAIN-NOBG'

background_removed_path = 'data-students/TRAIN-NOBG'


'''
#only run to generate the folder with images with no background

#------------------------------ONLY RUN ONCE--------------------------------------
 
# Assuming you already have your original dataset and transformed dataset
original_dataset = datasets.ImageFolder(root=train_dataset_path, transform=normal_transform)

# Create a DataLoader
dataloader = DataLoader(original_dataset, batch_size=None, shuffle=False)

transform_to_tensor = transforms.ToTensor()

# Iterate through the DataLoader
for image_path, target in original_dataset.imgs:
    class_name = original_dataset.classes[target]
    class_path = os.path.join(background_removed_path, class_name)
    os.makedirs(class_path, exist_ok=True)

    # Load the image
    image = Image.open(image_path)

    # Apply the transformation
    
    transformed_image = transform_to_tensor(removeBackground(image))

    # Save the transformed image in the corresponding class folder
    image_filename = os.path.basename(image_path)
    image_save_path = os.path.join(class_path, image_filename)
    transformed_image_pil = transforms.ToPILImage()(transformed_image)
    transformed_image_pil.save(image_save_path)  # Save the transformed image

'''


'\n#only run to generate the folder with images with no background\n#------------------------------ONLY RUN ONCE--------------------------------------\n \n# Assuming you already have your original dataset and transformed dataset\noriginal_dataset = datasets.ImageFolder(root=train_dataset_path, transform=normal_transform)\n\n# Create a DataLoader\ndataloader = DataLoader(original_dataset, batch_size=None, shuffle=False)\n\ntransform_to_tensor = transforms.ToTensor()\n\n# Iterate through the DataLoader\nfor image_path, target in original_dataset.imgs:\n    class_name = original_dataset.classes[target]\n    class_path = os.path.join(background_removed_path, class_name)\n    os.makedirs(class_path, exist_ok=True)\n\n    # Load the image\n    image = Image.open(image_path)\n\n    # Apply the transformation\n    \n    transformed_image = transform_to_tensor(removeBackground(image))\n\n    # Save the transformed image in the corresponding class folder\n    image_filename = os.path.basename(im

In [6]:
from torch.utils.data import ConcatDataset
nobg_dataset = datasets.ImageFolder(root=background_removed_path, transform=normal_transform)


# Define how many times you want to enlarge the dataset
enlarge_factor = 1

# Create a list to hold the datasets
combined_datasets = [nobg_dataset]

# Add the transformed dataset to the list multiple times
for _ in range(enlarge_factor):
    transformed_dataset = datasets.ImageFolder(root=background_removed_path, transform=Augment)
    combined_datasets.append(transformed_dataset)

# Concatenate the datasets into a single dataset
enlarged_dataset = ConcatDataset(combined_datasets)


test_set_size = 0.4
#get train & test for K-MEANS
train_val_dataset, test_dataset = train_test_split(enlarged_dataset, test_size = test_set_size, random_state=seed) #random_state = randomizer seed
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [7]:
#to print the label (AUXILIAR)
label_str = [
    "12 - Don't Go Left or Right",
    "13 - Don't Go Right",
    "24 - Go Right",
    "37 - Children crossing",
    "38 - Dangerous curve to the right",
    "39 - Dangerous curve to the left",
    "44 - Go left or straight",
    "50 - Fences",
    "6 - Speed limit (70km/h)"
]
label_str_id = [
    "12",
    "13",
    "24",
    "37",
    "38",
    "39",
    "44",
    "50",
    "6"
]

# AUXILIAY FUNCTIONS

In [8]:
def createCSV(model, test_dataset_loader,type, name, directory):
    import csv

    data = []

    #directory where you want to save the CSV file
    try:
        import google.colab
        IN_COLAB = True
    except ImportError:
        IN_COLAB = False

    if IN_COLAB:
        save_dir = '/content/drive/MyDrive/'
    else:
        save_dir = directory


    #file name
    csv_file = os.path.join(save_dir, name)

    if type == "voting":
      most_voted_classes = voting(model, test_dataset_loader,1)
      for i in range(len(most_voted_classes)):
        predicted_class = int(most_voted_classes[i])  # Extract the integer value
        data.append({"ID": i+1, "Class": label_str_id[predicted_class]}) #, "Name": label_str[test_predictions]})

    else:
      for i, images in enumerate(test_dataset_loader):

          images = images.to(device)
          # Forward pass
          outputs = model(images)
          test_predictions = torch.argmax(outputs, dim=1)

          images = images.cpu().numpy()
          predicted_classes = test_predictions.cpu().numpy()

          # Iterate over the batch
          predicted_class = int(predicted_classes[0])  # Extract the integer value
          data.append({"ID": i+1, "Class": label_str_id[predicted_class]}) #, "Name": label_str[test_predictions]})


    # Define the field names
    fields = ["ID", "Class"]#, "Name"]

    # Write data to CSV file
    with open(csv_file, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fields)

        # Write the header
        writer.writeheader()

        # Write the data rows
        for row in data:
            writer.writerow(row)

    print("CSV file created successfully.")
    return

In [9]:
def evaluate_network(model,dataset_loader, to_device=True):
    # X given input data
    # y corresponding target labels
    full_dataset = []
    for batch in dataset_loader:
        # Assuming each batch is a tuple (inputs, labels)
        inputs, labels = batch
        full_dataset.append((inputs, labels))

        # Concatenate all data points into a single tensor
        X = torch.cat([inputs for inputs, _ in full_dataset], dim=0)
        y = torch.cat([labels for _, labels in full_dataset], dim=0)


    # Set the model to evaluation mode
    model.eval()
    if to_device:
      # Assuming you're using GPU (if available)
        #device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        X = X.to(device)
        y = y.to(device)
        #model = model.to(device)

    # Run the model on the test data
    with torch.no_grad():
        outputs = model(X)
        _, predicted = torch.max(outputs.data, 1)

    # Convert tensors to numpy arrays
    if to_device:
        predicted = predicted.to("cpu")

    predicted_np = predicted.cpu().numpy()
    test_target_np = y.cpu().numpy()


    # Compute confusion matrix and F1 score
    conf_mat = confusion_matrix(test_target_np, predicted_np)
    f1 = f1_score(test_target_np, predicted_np, average='weighted')
    bal_acc = balanced_accuracy_score(y.cpu(), predicted_np)

    #print('Confusion Matrix:\n', conf_mat)
    print('F1 Score: ', f1)
    print('B_acc: ', bal_acc)

    return conf_mat, f1, bal_acc

In [10]:
import os,sys

def saveResults(model, directory, name, modelName, optim, loss):
    # Create the directory if it doesn't exist
    os.makedirs(directory, exist_ok=True)
    
    # Join the directory path with the file name
    results_path = os.path.join(directory, name)
    
    # Check if the file exists
    file_exists = os.path.isfile(results_path)
    
    # Open the file in append mode or write mode depending on whether the file exists
    with open(results_path, 'a' if file_exists else 'w') as f:
        # Redirect stdout to the file
        sys.stdout = f
        print(" ---------------------------------------------------------------------------------------------------------------------- ")
        print(" ----------------------------- Model parameters ------------------------------")
        print("model name:" + modelName)
        print('learning rate  = 0.001')
        print('epochs = 50')
        print('folds = 5')
        print('batch size = 64')
        print('Test set size  = ',test_set_size)
        print("enlarge_factor =", enlarge_factor)
        print('loss = '+ loss)
        print('optimizer = '+ optim)
        print(" ------------------------------- Evaluation  ---------------------------------")
        evaluate_network(model, test_loader)
        print(" ---------------------------------------------------------------------------------------------------------------------- ")
 
    # Reset stdout to its default value (console)
    sys.stdout = sys.__stdout__


In [11]:
def fit(train_loader, val_loader, model, criterion, optimizer, n_epochs, l2 ,to_device=True):
    #Train the network
    loss_values = []
    balanced_accuracy_values = []
    l2_lambda=l2
    for epoch in range(n_epochs):

        model.train()
        running_loss = 0.0

        for X, y in train_loader:
            # Move data to GPU
            X, y = X.to(device), y.to(device)

            #Forward pass
            y_pred = model(X)

            #if criterion -> MSELoss
            #y_one_hot = F.one_hot(y, num_classes=9).float()
            if isinstance(criterion, nn.MSELoss):
                y_one_hot = F.one_hot(y, num_classes=9).float()
                loss = criterion(y_pred, y_one_hot)
            else:
                loss = criterion(y_pred, y)


            # L2 regularization
            l2_reg = 0
            for param in model.parameters():
                l2_reg += torch.norm(param)**2

            loss += l2_lambda * l2_reg

            running_loss += loss.item()

            #Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        train_loss = running_loss / len(train_loader)
        train_acc = balanced_accuracy_score(y.tolist(),y_pred.argmax(dim=1).tolist())

        #Validation
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
          for X, y in val_loader:
            X, y = X.to(device), y.to(device)

            y_pred = model(X)
            if isinstance(criterion, nn.MSELoss):
                y_one_hot = F.one_hot(y, num_classes=9).float()
                loss = criterion(y_pred, y_one_hot)
            else:
                loss = criterion(y_pred, y)
            val_loss += loss.item()

        val_loss = val_loss/len(val_loader)
        val_acc = balanced_accuracy_score(y.tolist(),y_pred.argmax(dim=1).tolist())

        print(f'Epoch [{epoch + 1}/{n_epochs}], '
              f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc*100:.2f}%, '
              f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc*100:.2f}%')

    return train_loss, balanced_accuracy_values, model

In [12]:
def Train_K_FOLDS(k_model, epochs, lr, folds, batch_size, l2, input_size, hidden_size, output_size, optim_type, loss_type):

    kf = KFold(n_splits=folds, shuffle=True, random_state=seed)

    models = []
    index=0
    i = 0
    best_accuracy = 0
    best_model = k_model(input_size, hidden_size, output_size)
    second_best_model = k_model(input_size, hidden_size, output_size)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    for fold, (train_index, val_index) in enumerate(kf.split(train_val_dataset)):

        model = k_model(input_size, hidden_size, output_size)
        model.to(device)
        second_best_model.to(device)

        #Creating DataLoaders for training and validation
        train_sampler = torch.utils.data.SubsetRandomSampler(train_index)
        val_sampler = torch.utils.data.SubsetRandomSampler(val_index)
        #
        train_loader = DataLoader(train_val_dataset, batch_size=batch_size, sampler=train_sampler)
        val_loader = DataLoader(train_val_dataset, batch_size=batch_size, sampler=val_sampler)

        #CHOOSE LOSS
        if loss_type == 'MSELoss' or loss_type == 'BCELoss':
            criterion = nn.MSELoss()
            
        elif loss_type == 'cross_entropy':
            criterion = nn.CrossEntropyLoss()
        elif loss_type == 'BCELoss':
            criterion = nn.BCELoss()
        else:
            raise ValueError(f"Unsupported loss type: {loss_type}")
            
        #CHOOSE OPTIMIZER
        if optim_type == 'adam':
            optimizer = optim.Adam(model.parameters(), lr=lr)
        elif optim_type == 'SGD':
            optimizer = optim.SGD(model.parameters(), lr=lr)
        elif optim_type == 'RMSprop':
            optimizer = optim.RMSprop(model.parameters(), lr=lr)
        else:
            raise ValueError(f"Unsupported optimizer type: {optim_type}")

        # Train the model
        train_loss, train_acc,  trained_model = fit(train_loader, val_loader, model, criterion, optimizer, epochs, l2, to_device=False)
        #models.append(trained_model)
        conf_matrix, F_score, bal_acc = evaluate_network(trained_model, val_loader)
        if(bal_acc > best_accuracy):
            best_accuracy = bal_acc
            index = i
            if(fold >1):second_best_model = best_model
            best_model = model

        print("Best model has index ", index , "\n")


    models.append(best_model)
    models.append(second_best_model)

    return best_model, models

# MLP


In [13]:
class MLP_v4(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP_v4, self).__init__()
        self.lin1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()

        #hidden
        self.lin2 = nn.Linear(hidden_size, hidden_size)

        self.lin3 = nn.Linear(hidden_size, hidden_size)

        self.lin4 = nn.Linear(hidden_size, hidden_size)

        self.lin5 = nn.Linear(hidden_size, hidden_size)

        #hidden
        self.lin6 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # Flatten input tensor
        x = self.lin1(x)
        x = self.relu(x)

        #hidden
        x = self.lin2(x)
        x = self.relu(x)

        x = self.lin3(x)
        x = self.relu(x)

        x = self.lin4(x)
        x = self.relu(x)

        x = self.lin4(x)
        x = self.relu(x)

        #hidden
        x = self.lin6(x)
        return x
    
    def get_architecture_name(self):
        name = 'MLP_v4'
        return name

# TRAINING

In [14]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
#print(device)

#Initialize model, loss function, and optimizer
input_size = IMG_WIDTH * IMG_HEIGHT * 3 #RGB
#n of classes
output_size = 9

#save_dir = '/home/stefanotrenti/AML/project/AOL_test'
#name = 'AOL_function_tests_MLP.txt'

optimizers = ['adam','SGD','RMSprop']
losses = ['cross_entropy','MSELoss','BCELoss']


best_model_v4, _ = Train_K_FOLDS(MLP_v4, 50, 0.0001, 10, 32, 0.01, input_size, 128, output_size, 'adam', 'cross_entropy')
filename_v4 = best_model_v4.get_architecture_name() + '.txt'
#saveResults(best_model_v4,save_dir,name,filename_v4, 'adam', 'cross_entropy')

cuda:0
Epoch [1/50], Train Loss: 4.2384, Train Acc: 12.50%, Val Loss: 2.1843, Val Acc: 10.00%
Epoch [2/50], Train Loss: 4.0015, Train Acc: 12.50%, Val Loss: 2.1518, Val Acc: 12.87%
Epoch [3/50], Train Loss: 3.8019, Train Acc: 4.17%, Val Loss: 2.1162, Val Acc: 10.00%
Epoch [4/50], Train Loss: 3.6354, Train Acc: 17.78%, Val Loss: 2.0377, Val Acc: 15.43%
Epoch [5/50], Train Loss: 3.4860, Train Acc: 28.57%, Val Loss: 1.9314, Val Acc: 14.32%
Epoch [6/50], Train Loss: 3.3246, Train Acc: 40.00%, Val Loss: 1.8145, Val Acc: 17.46%
Epoch [7/50], Train Loss: 3.1868, Train Acc: 25.00%, Val Loss: 1.7027, Val Acc: 34.26%
Epoch [8/50], Train Loss: 3.0486, Train Acc: 54.76%, Val Loss: 1.5756, Val Acc: 33.33%
Epoch [9/50], Train Loss: 2.8919, Train Acc: 41.67%, Val Loss: 1.5115, Val Acc: 37.04%
Epoch [10/50], Train Loss: 2.7488, Train Acc: 50.00%, Val Loss: 1.3158, Val Acc: 35.71%
Epoch [11/50], Train Loss: 2.6123, Train Acc: 55.56%, Val Loss: 1.2052, Val Acc: 41.67%
Epoch [12/50], Train Loss: 2.5226, 

## LOAD TEST FOLDER 

In [15]:
import re
from torch.utils.data import Dataset

class TestDataset(Dataset):
    def get_int(self, text):
        return [int(c) if c.isdigit() else c for c in re.split('(\d+)', text)]

    def __init__(self, images_folder, transform=None):
        self.images_folder = images_folder
        self.image_files = [f for f in os.listdir(images_folder) if os.path.isfile(os.path.join(images_folder, f))]
        self.image_files.sort(key=self.get_int)
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = os.path.join(self.images_folder, self.image_files[idx])
        image = Image.open(img_name).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image
    
    
'''
#run if we want to use new test folder every time we use csv

transform_test_set = transforms.Compose([removeBackground,input_transform])

inference_dataset = TestDataset(images_folder=test_dataset_path, transform=transform_test_set)

test_dataset_loader = DataLoader(inference_dataset, batch_size=1, shuffle=False)
'''

#IF YOU WANT TO TAKE IMAGES FROM THE TEST FLDER WITH NO BG

test_nobg_path = 'data-students/TEST-NOBG'

inference_dataset = TestDataset(images_folder=test_nobg_path, transform=input_transform)

test_dataset_loader = DataLoader(inference_dataset, batch_size=1, shuffle=False)



# PREDICT AND CREATE CSV FILE FOR THE RESULTS

In [16]:

directory = "/home/stefanotrenti/AML/project/CSVs"

createCSV(best_model_v4, test_dataset_loader,[],"PREDICTIONS_best_model_v4.csv", directory)

CSV file created successfully.
