### Imports for model

In [None]:
import pandas as pd
import torch
import numpy as np
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.transforms import v2
from torchvision import transforms
from tqdm.notebook import tqdm
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, Dataset


### Load data and convert to tensors

In [None]:
# Load the data from the project directory (the CSV files have no header row)
train_data = pd.read_csv('./data/train_data.csv', header=None)
train_target = pd.read_csv('./data/train_target.csv', header=None)
test_data = pd.read_csv('./data/test_data.csv', header=None)

# The raw pixels are expected on the 0-255 scale. ToPILImage (first step of the
# transform pipeline) treats float tensors as [0, 1] and multiplies them by 255
# before casting to uint8, so feeding it unscaled floats overflows the pixel values.
# Fail loudly here if the files are not on the assumed scale.
PIXEL_MAX = 255.0
assert 1.0 < float(train_data.to_numpy().max()) <= PIXEL_MAX, \
    "expected raw pixel intensities on the 0-255 scale"

# Divide into training and validation; test_size=0.2 keeps 20% of the rows for
# validation, random_state fixes the split so reruns are reproducible
train_data, valid_data, train_target, valid_target = train_test_split(
    train_data, train_target, test_size=.2, random_state=42
)

# Convert to float tensors shaped (N, 1, 48, 48) and rescale the pixels to [0, 1]
X_train_tensor = torch.tensor(train_data.values, dtype=torch.float32).view(-1, 1, 48, 48) / PIXEL_MAX
X_val_tensor = torch.tensor(valid_data.values, dtype=torch.float32).view(-1, 1, 48, 48) / PIXEL_MAX
X_test_tensor = torch.tensor(test_data.values, dtype=torch.float32).view(-1, 1, 48, 48) / PIXEL_MAX

# Labels become 1-D int64 tensors; reshape(-1) (unlike squeeze()) keeps a 1-D
# tensor even when there is only a single row
Y_train_tensor = torch.tensor(train_target.values).reshape(-1).long()
Y_val_tensor = torch.tensor(valid_target.values).reshape(-1).long()

# Mean and std of the (already [0, 1]-scaled) training pixels, used for
# normalization in the later transformations
training_mean = X_train_tensor.mean()
training_std = X_train_tensor.std()

### Create custom datasets

In [None]:
# Dataset wrapper pairing samples with their targets
class ClassifierDataset(Dataset):
    """Index-aligned (sample, target) dataset with an optional per-sample transform.

    Args:
        data: indexable collection of samples; its length is the dataset length.
        target: indexable collection of targets, indexed with the same positions as ``data``.
        transform: optional callable applied to the sample (never to the target)
            each time an item is fetched.
    """

    def __init__(self, data, target, transform=None):
        self.transform = transform
        self.target = target
        self.data = data

    def __len__(self):
        """Number of samples, taken from ``data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(sample, target)`` at ``idx``, transforming the sample if a transform is set."""
        sample = self.data[idx]
        label = self.target[idx]

        # No (truthy) transform configured: hand back the stored sample untouched
        if not self.transform:
            return sample, label

        return self.transform(sample), label

In [None]:
# Augmentation pipeline applied per sample:
#   tensor -> PIL image -> random flip / rotation / resized crop / colour jitter
#   -> tensor -> normalization with the training-set mean and std
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.RandomResizedCrop(size=48, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[training_mean], std=[training_std]),
])



### Custom datasets and dataloaders

In [None]:
# Deterministic counterpart of the training pipeline for evaluation: same
# tensor -> PIL -> tensor round trip and normalization, but no random augmentation.
# Validation metrics drive the LR scheduler and checkpoint selection, so they must
# not change from one pass to the next because of random flips/crops/jitter.
eval_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[training_mean], std=[training_std])
])

# Create custom datasets: augment + normalize for training, normalize only for validation
train_dataset = ClassifierDataset(X_train_tensor, Y_train_tensor, transform=transform)
val_dataset = ClassifierDataset(X_val_tensor, Y_val_tensor, transform=eval_transform)

# Shuffle only the training data; validation order is irrelevant to the metrics
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

### Define Model

In [None]:
# Define custom CNN model
class CNN(nn.Module):
    """Four conv/batch-norm/ReLU/max-pool stages followed by a three-layer classifier head.

    Expects input of shape (batch, 1, 48, 48): each of the four 2x2 poolings halves
    the spatial size (48 -> 24 -> 12 -> 6 -> 3), which is what sizes ``fc1``.

    Args:
        num_classes: number of output logits. Defaults to 3.
    """

    def __init__(self, num_classes=3):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool = nn.MaxPool2d(kernel_size=2, padding=0)
        self.fc1 = nn.Linear(256 * 3 * 3, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes).

        Raises:
            RuntimeError: if the input's spatial size does not reduce to 3x3,
                because the flattened features then no longer match ``fc1``.
        """
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, bn in stages:
            x = self.pool(F.relu(bn(conv(x))))

        # Flatten everything except the batch dimension. Unlike view(-1, 256*3*3),
        # this cannot silently spill extra spatial elements into the batch dimension
        # when the input is not 48x48; fc1 raises a shape error instead.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        return self.fc3(x)

### Begin Training

In [None]:
# hyperparameters
EPOCHS = 100
LEARNING_RATE = 0.001
threshold = 10          # epochs without improvement before rolling back to the best checkpoint
leeway = 0              # consecutive epochs without a validation-accuracy improvement
highest_accuracy = 0    # best validation accuracy seen in this run

# Create model, loss, and optimizer, use scheduler for adaptive LR
model = CNN()

# Inverse-frequency class weights, normalized to sum to 1.
# NOTE(review): the counts are hard-coded; presumably they were taken from the
# training labels -- confirm they still match the current data/split.
class_counts = torch.tensor([3995, 7215, 4965], dtype=torch.float)
class_weights = 1/class_counts
class_weights = class_weights / class_weights.sum()

criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

# Start training with TQDM for progress visualization
for epoch in tqdm(range(EPOCHS)):

    # ---- training pass ----
    model.train()
    train_loss_sum = 0.0
    for x_train_batch, y_train_batch in train_loader:
        optimizer.zero_grad()
        y_predictions = model(x_train_batch)
        loss = criterion(y_predictions, y_train_batch)
        loss.backward()
        optimizer.step()
        train_loss_sum += loss.item()

    # Mean of the per-batch losses over the whole epoch (same averaging as the
    # validation loss below), rather than whatever the last mini-batch happened to give
    train_epoch_loss = train_loss_sum / len(train_loader)

    # ---- validation pass ----
    model.eval()
    val_epoch_loss, val_epoch_accuracy = 0, 0
    total_samples = 0
    with torch.no_grad():
        for x_val_batch, y_val_batch in val_loader:
            y_val_pred = model(x_val_batch)
            val_epoch_loss += criterion(y_val_pred, y_val_batch).item()
            _, predicted = torch.max(y_val_pred, 1)
            val_epoch_accuracy += (predicted == y_val_batch).sum().item()
            total_samples += y_val_batch.size(0)

    val_accuracy = val_epoch_accuracy / total_samples
    val_epoch_loss /= len(val_loader)

    # The scheduler lowers the LR when the validation loss plateaus
    scheduler.step(val_epoch_loss)

    # Check for improvement; checkpoint both model and optimizer on a new best
    improved = val_accuracy > highest_accuracy
    if improved:
        highest_accuracy, leeway = val_accuracy, 0
        torch.save(model.state_dict(), 'experimentalModel.pt')
        torch.save(optimizer.state_dict(), 'experimentalOptimizer.pt')
    else:
        leeway += 1

    # Output progress per epoch (printed after the update so the best value
    # already includes the current epoch)
    print(f'Epoch {epoch + 1}/{EPOCHS}, Loss: {train_epoch_loss:.4f}, Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
    print(f'Highest val accuracy so far: {highest_accuracy}')

    if not improved and leeway >= threshold:
        # Too many epochs without improvement: roll back to the best checkpoint
        print("Reloading best model and resuming training...")
        model.load_state_dict(torch.load('experimentalModel.pt'))
        optimizer.load_state_dict(torch.load('experimentalOptimizer.pt'))

        # Fresh scheduler so plateau tracking restarts from the restored state
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

        # Reset the leeway counter and continue training
        leeway = 0


In [None]:
# Deterministic preprocessing for inference: same tensor -> PIL -> tensor round trip
# and normalization as in training, but without the random flips/rotations/crops/jitter,
# so the submitted predictions are reproducible.
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[training_mean], std=[training_std])
])

# Create custom dataset for test tensor, fill labels with 0's because we dont have test_target
test_dataset = ClassifierDataset(X_test_tensor, torch.zeros(len(X_test_tensor)), transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# begin evaluations, load best model
model.load_state_dict(torch.load('experimentalModel.pt', map_location='cpu'))
model.eval()
predictions = []

with torch.no_grad():
    # The placeholder labels are ignored; only the images are needed
    for x_test_batch, _ in test_loader:
        outputs = model(x_test_batch)
        # Index of the largest logit is the predicted class; collect as plain ints
        predictions.extend(outputs.argmax(dim=1).tolist())

# Save outputs to csv, one row per test sample in file order
submission_df = pd.DataFrame({'Id': range(len(predictions)), 'Category': predictions})
submission_df.to_csv('experimentalOutput.csv', index=False)