# <p style="text-align: center;">Base Model for TP2 - Do you need more signs?</p>

### **1. Import the Required Libraries**

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
from PIL import Image # Install Pillow -> conda install anaconda::pillow or pip install pillow
import os
from skimage.io import  imread, imshow # Install scikit-image -> conda install scikit-image or pip install scikit-image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import re
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.model_selection import train_test_split

### **2. Load the Image Training and Test Datasets**

#### **i. Get the Image Dataset Paths**

In [None]:
# Real data: the labelled training images and the images to run inference on.
test_dataset_path = 'data-students/TEST'
train_dataset_path = 'data-students/TRAIN/'

# Generated images used to augment the training set. Exactly one folder under
# GEN_DATASETS/ is active at a time; edit the assignment below to switch to one of:
#   DCGAN_50, DCGAN_100, DCGAN_500,
#   DDPM_50,  DDPM_100,  DDPM_500,
#   VAE_50,   VAE_100,   VAE_500
# (the numeric suffix presumably is the number of generated images - confirm).
gen_dataset_path = 'GEN_DATASETS/DDPM_50'

#### **ii. Load the Image Datasets**

We are going for the tiny version of the dataset!

In [None]:
# Target size (in pixels) that every image is resized to before entering the network.
IMG_WIDTH, IMG_HEIGHT = 75, 75

# NOTE(review): the DataLoaders in this notebook are created with literal batch
# sizes rather than this constant - confirm which value is intended.
BATCH_SIZE = 8

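Load the training dataset with `ImageFolder`, optionally augment it with the generated images, and wrap the train/validation splits in `DataLoader`s.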

In [None]:
from torch.utils.data import ConcatDataset

# Seed for the train/validation split so the same split is produced on every run.
SPLIT_SEED = 42

# Define the transform. torchvision's Resize takes the size as (height, width).
transform = transforms.Compose([transforms.Resize((IMG_HEIGHT, IMG_WIDTH)), transforms.ToTensor()])

# Load datasets (ImageFolder assigns one class index per sub-folder)
generated_dataset = datasets.ImageFolder(root=gen_dataset_path, transform=transform)
traffic_signals_dataset = datasets.ImageFolder(root=train_dataset_path, transform=transform)

AUGMENT_DATASET = True

if AUGMENT_DATASET:
    # Build ONE class-name -> index mapping shared by both datasets: classes of
    # the real dataset keep their index, classes that only exist in the
    # generated dataset are appended after them.
    combined_class_to_idx = dict(traffic_signals_dataset.class_to_idx)
    next_idx = max(combined_class_to_idx.values(), default=-1) + 1
    for class_name in generated_dataset.classes:
        if class_name not in combined_class_to_idx:
            combined_class_to_idx[class_name] = next_idx
            next_idx += 1

    # The generated ImageFolder numbers its classes independently, so translate
    # its local indices to the shared ones. The same translation is applied to
    # the labels the dataset yields (target_transform) and to the target list
    # used for stratification, keeping the two consistent.
    generated_to_combined = {
        local_idx: combined_class_to_idx[class_name]
        for class_name, local_idx in generated_dataset.class_to_idx.items()
    }
    generated_dataset.target_transform = generated_to_combined.__getitem__

    combined_dataset = ConcatDataset([traffic_signals_dataset, generated_dataset])

    # Combine targets (same order as ConcatDataset: real samples first, then generated)
    combined_targets = list(traffic_signals_dataset.targets) + [
        generated_to_combined[local_idx] for local_idx in generated_dataset.targets
    ]
else:
    combined_dataset = traffic_signals_dataset
    combined_class_to_idx = traffic_signals_dataset.class_to_idx
    combined_targets = traffic_signals_dataset.targets

# Reverse the mapping: class index -> class (folder) name
labels = {value: key for key, value in combined_class_to_idx.items()}
the_real_labels = dict(labels)

# Perform the train/validation split over sample indices, stratified by class
train_idx, valid_idx = train_test_split(
    range(len(combined_dataset)),
    test_size=0.1,
    shuffle=True,
    stratify=combined_targets,
    random_state=SPLIT_SEED,
)

# Create subsets
train_subset = Subset(combined_dataset, train_idx)
valid_subset = Subset(combined_dataset, valid_idx)

# Create data loaders (only the training loader is shuffled)
train_dataset_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
validation_dataset_loader = DataLoader(valid_subset, batch_size=64, shuffle=False)

In [None]:
# Count the samples per class index.
# NOTE(review): combined_targets is built before the train/validation split, so
# these counts appear to cover both subsets - confirm that is what is wanted.
training_targets = combined_targets
t_targets = dict.fromkeys(training_targets, 0)
for class_idx in training_targets:
    t_targets[class_idx] = t_targets[class_idx] + 1
print('Training class distribution:', t_targets)

Loading the test dataset.

In [None]:
class TestDataset(Dataset):
    """Unlabelled image dataset over the regular files directly inside a folder.

    Files are ordered with a natural sort (so ``2.png`` comes before ``10.png``),
    sub-directories are ignored, and every item is returned as an RGB image
    with the optional ``transform`` applied. No label is returned.
    """

    def get_int(self, text):
        """Return a natural-sort key for ``text``.

        The string is split on runs of digits; digit runs become ``int`` and
        everything else stays ``str``, e.g. ``"img12.png" -> ["img", 12, ".png"]``.
        """
        # Raw string: '\d' in a normal string literal is an invalid escape sequence.
        return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', text)]

    def __init__(self, images_folder, transform=None):
        """
        Args:
            images_folder: directory whose files are the dataset items.
            transform: optional callable applied to each loaded PIL image.
        """
        self.images_folder = images_folder
        self.image_files = [f for f in os.listdir(images_folder) if os.path.isfile(os.path.join(images_folder, f))]
        self.image_files.sort(key=self.get_int)
        self.transform = transform

    def __len__(self):
        """Number of image files found in the folder."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load the ``idx``-th file (in natural-sort order) as RGB and transform it."""
        img_name = os.path.join(self.images_folder, self.image_files[idx])
        # The context manager closes the underlying file as soon as the pixel
        # data has been copied into the converted image.
        with Image.open(img_name) as img:
            image = img.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image

# Images to predict on, served one image per batch in the dataset's own order.
inference_dataset = TestDataset(test_dataset_path, transform)
test_dataset_loader = DataLoader(dataset=inference_dataset, shuffle=False, batch_size=1)

#### **iii. Get the Label Mappings**

The `labels` dictionary maps the label indices used to train the model back to their class names, so that predictions can be reported by name.

In [None]:
### subset version
# Class index -> class (folder) name for the classes present in the training data.
labels = {value: key for key, value in combined_class_to_idx.items()}
print(labels)

# Numeric class id -> human-readable description, read from the ';'-separated
# labels file. The first line is a header and is skipped.
the_real_labels = {}
with open("data-students/labels.csv","r") as label_f:
    next(label_f, None)  # skip the header row (no error on an empty file)
    for line in label_f:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines, e.g. a trailing newline at the end of the file
        # Split on the first ';' only, so a description may itself contain ';'.
        label_value, label_description = line.split(";", 1)
        the_real_labels[int(label_value)] = label_description

print(the_real_labels)

print("Label Mappings for classes present in the training and validation datasets\n")
for key, value in labels.items():
    # `value` is the class folder name; it is parsed as the numeric id used in labels.csv.
    print(f"{key} : {value} - {the_real_labels[int(value)]}")
the_labels_map = dict(combined_class_to_idx)
print(len(labels))

In [None]:
class CustomCNN(nn.Module):
    """Small three-stage convolutional classifier.

    Each stage is ``Conv2d`` (no padding) -> ``BatchNorm2d`` -> ReLU -> 2x2
    max-pool. The pooled feature map is flattened and passed through
    ``Linear(256)`` -> ReLU -> ``Dropout(0.5)`` -> ``Linear(num_classes)``.
    ``forward`` returns raw, un-normalised class scores.

    Args:
        img_width: width in pixels of the input images.
        img_height: height in pixels of the input images.
        num_classes: number of output classes.

    Raises:
        ValueError: if the images are too small to survive the three
            conv/pool stages.
    """

    def __init__(self, img_width, img_height, num_classes):
        super(CustomCNN, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=128, kernel_size=5, padding='valid')
        self.bn1 = nn.BatchNorm2d(num_features=128)

        self.conv2 = nn.Conv2d(in_channels=128, out_channels=64, kernel_size=3, padding='valid', bias=False)
        self.bn2 = nn.BatchNorm2d(num_features=64)

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=32, kernel_size=3, padding='valid', bias=False)
        self.bn3 = nn.BatchNorm2d(num_features=32)

        # Size the first dense layer from the actual image size instead of a
        # hard-coded constant (75x75 inputs give 32 * 7 * 7 = 1568 features).
        self.fc1 = nn.Linear(self._flattened_features(img_width, img_height), 256)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(256, num_classes)

        # L2 regularization is not directly included in layers in PyTorch,
        # it's typically added to the optimizer during the training step.

    def forward(self, x):
        """Map a ``(batch, 3, img_height, img_width)`` tensor to ``(batch, num_classes)`` scores."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.max_pool2d(x, kernel_size=2)

        x = F.relu(self.bn2(self.conv2(x)))
        x = F.max_pool2d(x, kernel_size=2)

        x = F.relu(self.bn3(self.conv3(x)))
        x = F.max_pool2d(x, kernel_size=2)

        x = torch.flatten(x, 1)  # Flatten all dimensions except batch

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

    def _flattened_features(self, img_width, img_height):
        """Number of features left after the three conv + pool stages in ``forward``.

        Every stage applies an unpadded, stride-1 convolution (each side
        shrinks by ``kernel - 1``) followed by a 2x2 max-pool whose stride
        equals its kernel (each side is floor-divided by 2).
        """
        width, height = img_width, img_height
        for conv in (self.conv1, self.conv2, self.conv3):
            kernel_h, kernel_w = conv.kernel_size
            height = (height - kernel_h + 1) // 2
            width = (width - kernel_w + 1) // 2
        if height < 1 or width < 1:
            raise ValueError(
                f"Input images of {img_width}x{img_height} pixels are too small for this network"
            )
        return self.conv3.out_channels * height * width

    def _conv_output_shape(self, img_width, img_height, kernel_size=3, stride=1, padding=0, dilation=1):
        """Legacy size estimate; not used to build the network.

        Applies the output-size formula of a single convolution and then
        divides each side by 8. This does not reproduce the conv/pool sequence
        in ``forward`` (for 75x75 inputs it yields 81, while the real feature
        map is 7x7 = 49); ``_flattened_features`` is what sizes ``fc1``.
        """
        h = ((img_height + (2 * padding) - (dilation * (kernel_size - 1)) - 1) / stride) + 1
        w = ((img_width + (2 * padding) - (dilation * (kernel_size - 1)) - 1) / stride) + 1
        return int(h/8) * int(w/8)  # Considering three max pooling layers with kernel_size=2, stride=2


In [None]:
from sklearn.metrics import balanced_accuracy_score

def train_model(model, criterion, optimizer, train_dataset_loader, num_epochs=10, device='cpu'):
    """Train ``model`` on ``train_dataset_loader`` and return it on the CPU.

    For every batch the loss is printed; at the end of each epoch the mean
    batch loss and the balanced accuracy of the predictions made during that
    epoch are printed.

    Args:
        model: network to train (updated in place).
        criterion: loss called as ``criterion(output, target)``.
        optimizer: optimizer that updates the model parameters.
        train_dataset_loader: iterable of ``(data, target)`` batches; must
            support ``len()``.
        num_epochs: number of passes over the loader.
        device: device the model and the batches are moved to for training.

    Returns:
        The trained model, moved back to the CPU.
    """
    model.to(device)
    for epoch in range(num_epochs):
        # Make sure dropout and batch-norm are in training mode even if the
        # model was switched to eval mode earlier (e.g. by an evaluation run).
        model.train()

        cumulative_loss = 0
        all_targets = []
        all_predictions = []

        for batch_idx, (data, target) in enumerate(train_dataset_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # Predicted class = index of the highest score per sample
            _, predicted = output.max(1)
            all_targets.extend(target.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

            batch_loss = loss.item()
            print(f'Epoch {epoch+1}/{num_epochs}, Step {batch_idx+1}/{len(train_dataset_loader)}, Loss: {batch_loss:.4f}')
            cumulative_loss += batch_loss

        # Calculate balanced accuracy for the epoch
        balanced_acc = balanced_accuracy_score(all_targets, all_predictions)
        print(f"Epoch {epoch+1} average loss: {cumulative_loss/len(train_dataset_loader):.4f}")
        print(f"Epoch {epoch+1} balanced accuracy: {balanced_acc:.4f}")

    return model.to("cpu")


In [None]:
# Pick the compute device: Apple MPS if available, otherwise CUDA, otherwise CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# One output unit per class index known to the label mapping.
num_classes = len(labels)
print(num_classes)
print(device)

# Model, loss and optimizer used for training.
num_epochs = 15
model = CustomCNN(IMG_WIDTH, IMG_HEIGHT, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
)


In [None]:
# Set to False to reuse a previously saved model instead of training a new one.
TRAIN_MODEL = True

if not TRAIN_MODEL:
    # NOTE(review): torch.load unpickles the file, which can execute arbitrary
    # code - only load checkpoints that come from a trusted source.
    ccnn = torch.load('baseline_model.pth')
else:
    ccnn = train_model(model, criterion, optimizer, train_dataset_loader, num_epochs, device)

In [None]:
from sklearn.metrics import balanced_accuracy_score, confusion_matrix, ConfusionMatrixDisplay
import torch
import matplotlib.pyplot as plt

def test_model(model, dataset_loader, device='cpu'):
    """Evaluate ``model`` on a labelled loader and report the results.

    Prints the plain accuracy (in percent) and the balanced accuracy, then
    plots the confusion matrix. The model is moved to ``device`` and left in
    eval mode. Nothing is returned.

    Args:
        model: network to evaluate.
        dataset_loader: iterable of ``(images, labels)`` batches.
        device: device used for inference.
    """
    model.to(device)
    model.eval()

    y_real, y_pred = [], []
    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for batch_images, batch_labels in dataset_loader:
            # Record the ground truth before the labels leave the CPU
            y_real.extend(batch_labels.numpy())
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            scores = model(batch_images)
            _, predicted = torch.max(scores.data, 1)
            y_pred.extend(predicted.cpu().numpy())

            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()

    total = n_seen
    accuracy = 100 * n_correct / total
    balanced_acc = balanced_accuracy_score(y_real, y_pred)
    print(f'Accuracy of the model on the {total} test images: {accuracy:.2f} %')
    print(f'Balanced accuracy of the model on the {total} test images: {balanced_acc:.4f}')

    # Build and show the confusion matrix of true vs. predicted class indices
    ConfusionMatrixDisplay(confusion_matrix=confusion_matrix(y_real, y_pred)).plot()
    plt.show()


In [None]:
# test_model prints its own report and returns nothing, so it is called
# directly; wrapping it in print() only added a stray "None" to the output.
test_model(ccnn, validation_dataset_loader)

In [None]:
# Class name written to the CSV for each predicted class index. The entries are
# in string-sorted order (so "6" comes last), which is how ImageFolder numbers
# class folders - verify against the training dataset's class_to_idx.
label_str_id = [
    "12",
    "13",
    "24",
    "38",
    "39",
    "44",
    "46",
    "49",
    "50",
    "6"
]

import csv

def createCSV(model, test_dataset_loader, name, device='cpu', save_dir=None):
    """Predict every image of ``test_dataset_loader`` and write the result to a CSV.

    The file has the columns ``ID`` (1-based position of the image in the
    loader) and ``Class`` (the ``label_str_id`` entry of the predicted index).
    Every image of every batch gets a row, whatever the loader's batch size.

    Args:
        model: trained network; it is moved to ``device`` and put in eval mode.
        test_dataset_loader: iterable of image batches (no labels).
        name: file name of the CSV to create.
        device: device used for inference.
        save_dir: directory to write into. When ``None`` the original default
            is used: a Google Drive folder when running inside Colab,
            otherwise a fixed local folder. The directory must already exist.
    """
    # Move model to the appropriate device
    model.to(device)
    model.eval()

    # Determine the directory to save the CSV file
    if save_dir is None:
        try:
            import google.colab
            IN_COLAB = True
        except ImportError:
            IN_COLAB = False

        if IN_COLAB:
            save_dir = "/content/drive/MyDrive/Project1-AML/Nic/"
        else:
            save_dir = "/home/stefanotrenti/AML/project2/TP2/CSVs"

    # Define the file name and path
    csv_file = os.path.join(save_dir, name)

    data = []
    with torch.no_grad():
        for images in test_dataset_loader:
            images = images.to(device)

            # Forward pass
            outputs = model(images)
            test_predictions = torch.argmax(outputs, dim=1)

            # One row per image in the batch; IDs keep counting across batches
            for predicted_class in test_predictions.cpu().tolist():
                data.append({"ID": len(data) + 1, "Class": label_str_id[predicted_class]})

    # Define the field names
    fields = ["ID", "Class"]

    # Write header and data rows to the CSV file
    with open(csv_file, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fields)
        writer.writeheader()
        writer.writerows(data)

    print("CSV file created successfully.")
    return

In [None]:
#createCSV(ccnn, test_dataset_loader,"ccnn_baseline_with_training.csv")

#createCSV(ccnn, test_dataset_loader,"VAE_50.csv")
#createCSV(ccnn, test_dataset_loader,"VAE_100.csv")
#createCSV(ccnn, test_dataset_loader,"VAE_500.csv")
#createCSV(ccnn, test_dataset_loader,"DCGAN_50.csv")
#createCSV(ccnn, test_dataset_loader,"DCGAN_100.csv")
#createCSV(ccnn, test_dataset_loader,"DCGAN_500.csv")
#createCSV(ccnn, test_dataset_loader,"DDPM_50.csv")
#createCSV(ccnn, test_dataset_loader,"DDPM_100.csv")
#createCSV(ccnn, test_dataset_loader,"DDPM_500.csv")
