# Setup

In [None]:
# Install/upgrade the Kaggle CLI, Weights & Biases and ONNX (-U: upgrade, -q: quiet output).
!pip install kaggle wandb onnx -Uq
# Mount Google Drive so that files under /content/drive are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!mkdir ~/.kaggle
!echo "{\"username\":\"dachisuramelashvili\",\"key\":\"4202ec60e20b612a9947450bb8aeebb5\"}" > ~/.kaggle
!cp /content/drive/MyDrive/ML/kaggle.json ~/.kaggle/kaggle.json
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the competition archive into the current working directory (needs the credentials set up above).
!kaggle competitions download -c challenges-in-representation-learning-facial-expression-recognition-challenge
# Extract it in place; later cells read train.csv from the current directory.
!unzip challenges-in-representation-learning-facial-expression-recognition-challenge.zip

In [None]:
import torch # Main PyTorch Library
from torch import nn # Used for creating the layers and loss function
from torch.optim import Adam # Adam Optimizer
import torchvision.transforms as transforms # Transform function used to modify and preprocess all the images
from torch.utils.data import Dataset, DataLoader # Dataset class and DataLoader for creating the objects
from sklearn.preprocessing import LabelEncoder # Label Encoder to encode the classes from strings to numbers
import matplotlib.pyplot as plt # Used for visualizing the images and plotting the training progress
from PIL import Image # Used to read the images from the directory
import pandas as pd # Used to read/create dataframes (csv) and process tabular data
import numpy as np # preprocessing and numerical/mathematical operations
import os # Used to read the images path from the directory
import random
import wandb
from sklearn.metrics import classification_report, confusion_matrix

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# (On a Mac, replace "cuda" with "mps".) `device` is a plain string that the
# dataset, model factory and training cells below all read as a global.
device = "cuda" if torch.cuda.is_available() else "cpu"

print("Device available: ", device)

# Get Data

In [None]:
# Load the training table; later cells read its 'pixels' and 'emotion' columns.
data = pd.read_csv("train.csv")

In [None]:
# Preview the first rows of the table.
data.head()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image

def display_faces(image_data_list, num_images_to_display, image_shape=(48, 48)):
  """
  Displays up to a specified number of images side by side.

  Args:
    image_data_list: Either a pandas DataFrame / dict holding the images under
      the key 'pixels', or a plain sequence of images. Each image is a
      space-separated pixel string or a NumPy array.
    num_images_to_display: The maximum number of images to display.
    image_shape: A tuple representing the shape of each image (height, width).

  Entries of any other type are reported and skipped; they do not take up a
  slot in the figure. Nothing is drawn when there is no image to show.
  """
  # The notebook passes the whole DataFrame, so pull out the pixel column when
  # the input is table-like; plain sequences are used as they are.
  if hasattr(image_data_list, 'columns') or isinstance(image_data_list, dict):
    image_data_list = image_data_list['pixels']
  image_data_list = list(image_data_list)

  # Ensure we don't try to display more images than available
  actual_images_to_display = min(num_images_to_display, len(image_data_list))

  print(f"Attempting to display {num_images_to_display} images.")
  if actual_images_to_display < num_images_to_display:
    print(f"Warning: Only {actual_images_to_display} images available in the list.")

  # Decode everything first so that skipped entries do not leave empty axes.
  image_arrays = []
  for i, image_data in enumerate(image_data_list[:max(actual_images_to_display, 0)]):
    if isinstance(image_data, str):
      pixel_list = [int(pixel) for pixel in image_data.split()]
      image_array = np.array(pixel_list).reshape(image_shape)
    elif isinstance(image_data, np.ndarray):
      image_array = image_data.reshape(image_shape)
    else:
      print(f"Warning: Unsupported image data type at index {i}. Skipping.")
      continue
    image_arrays.append(image_array.astype(np.uint8))

  # A zero-width figure is rejected by matplotlib, so stop before creating one.
  if not image_arrays:
    return

  plt.figure(figsize=(len(image_arrays) * 3, 3))
  for slot, image_array in enumerate(image_arrays, start=1):
    plt.subplot(1, len(image_arrays), slot)
    plt.imshow(Image.fromarray(image_array), cmap='gray')
    plt.axis('off')

  plt.tight_layout()
  plt.show()

In [None]:
# Show the first 15 faces of the training table.
display_faces(data, 15)

In [None]:
# First 16 rows of the table (note: overfit() further down slices train/val itself).
overfit_data=data[:16]
# 70% of the rows, sampled with a fixed seed, form the training split.
train=data.sample(frac=0.7,random_state=42)
# Half of the remaining rows (again with a fixed seed) form the validation split...
val=data.drop(train.index).sample(frac=0.5, random_state=42)
# ...and whatever is in neither train nor val is the test split.
test=data.drop(train.index).drop(val.index)

In [None]:
# (rows, columns) of each split, as a quick check of the 70/15/15 partition.
overfit_data.shape, train.shape , val.shape , test.shape

In [None]:
import matplotlib.pyplot as plt
import numpy as np
def compute_mean_image(dataframe, image_shape=(48, 48)):
  """
  Computes the pixel-wise mean image from a DataFrame containing image pixel data.

  Args:
    dataframe: A pandas DataFrame with a 'pixels' column containing space-separated
               string representations of pixel values.
    image_shape: A tuple representing the shape of each image (height, width).

  Returns:
    A floating-point NumPy array of shape `image_shape` holding the mean image.

  Raises:
    ValueError: If the DataFrame holds no images (the mean would be undefined).
  """
  # No per-row printing here: the table holds thousands of long pixel strings.
  all_images = [
      np.array([int(pixel) for pixel in pixel_string.split()]).reshape(image_shape)
      for pixel_string in dataframe['pixels']
  ]

  if not all_images:
    raise ValueError("cannot compute a mean image from an empty 'pixels' column")

  # Averaging over axis 0 averages across images, keeping the (height, width) layout.
  return np.mean(all_images, axis=0)

# Mean face of the training split only (validation/test rows are not included).
mean_image = compute_mean_image(train)

plt.figure(figsize=(5, 5))
# The mean is cast to uint8 for display, which drops the fractional part.
plt.imshow(mean_image.astype(np.uint8), cmap='gray')
plt.title('Mean Image')
plt.axis('off')
plt.show()

# Preprocess

In [None]:
# Preprocessing applied to every image by the dataset below. ToTensor is the
# only step; per torchvision it turns a PIL image into a float tensor with a
# leading channel axis (uint8 inputs are scaled to [0, 1]).
transform = transforms.Compose([
    transforms.ToTensor(),
])

In [None]:
from torch.utils.data import Dataset
import torch
import numpy as np
from PIL import Image

class FacialExpressionDataset(Dataset):
    """Dataset over a table with 'pixels' and 'emotion' columns.

    Each row's 'pixels' value is a space-separated string of 48*48 grayscale
    values and 'emotion' is the integer class label. Samples are returned on
    the CPU; the training and evaluation loops move whole batches to the
    target device, which is cheaper than per-sample transfers and keeps the
    dataset usable with DataLoader worker processes.
    """

    def __init__(self, dataframe, transform=None):
        """Store the table and the optional transform applied to each PIL image."""
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the table."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return `(image, label)` for the row at position `idx`.

        `image` is the output of `transform` when one was given, otherwise a
        PIL image in mode 'L'. `label` is a scalar `torch.long` tensor.
        """
        # Look the row up once instead of once per column.
        row = self.dataframe.iloc[idx]
        label = torch.tensor(int(row['emotion']), dtype=torch.long)

        pixel_list = [int(pixel) for pixel in row['pixels'].split()]
        image_array = np.array(pixel_list).reshape(48, 48).astype(np.uint8)
        image = Image.fromarray(image_array, mode='L')

        if self.transform:
            # Deliberately no .to(device) here: the sample stays on the CPU.
            image = self.transform(image)

        return image, label


In [None]:
def to_loader(dataframe, transform, batch_size=16, shuffle=True):
  """Wrap a table of faces in a DataLoader.

  Args:
    dataframe: Table with 'pixels' and 'emotion' columns.
    transform: Transform handed to FacialExpressionDataset.
    batch_size: Number of samples per batch.
    shuffle: Whether to reshuffle the samples every epoch. Defaults to True,
      which is what every existing caller gets; pass False for a fixed order
      (e.g. when evaluating).

  Returns:
    A DataLoader yielding `(images, labels)` batches.
  """
  dataset = FacialExpressionDataset(dataframe, transform)
  return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

In [None]:
def show_image(image, title="Mean Image"):
  """Display a single-channel image tensor in grayscale.

  Args:
    image: Tensor whose leading axis is a singleton channel axis, e.g. a
      sample produced by the dataset (assumes a torch tensor).
    title: Text shown above the image. Defaults to the previously hard-coded
      "Mean Image" so existing calls look the same.
  """
  # detach()/cpu() make this work for tensors that live on the GPU or track
  # gradients; .numpy() alone raises for both.
  plt.imshow(image.detach().cpu().squeeze(0).numpy(), cmap='gray')
  plt.title(title)
  plt.axis("off")
  plt.show()

In [None]:
# Dataset over the whole table (all rows, not one of the splits).
dataset = FacialExpressionDataset(data, transform)

In [None]:
# Fetch one sample to eyeball the (image, label) pair the dataset returns.
dataset.__getitem__(3)

# Model

In [None]:
class BaseNN(nn.Module):
  """Fully connected classifier: Flatten -> [Linear -> norm -> ReLU -> Dropout]* -> Linear.

  Args:
    hidden_dims: Width of each hidden layer, in order. None or an empty list
      gives a single linear layer from input to output.
    normalization: 'batch' inserts BatchNorm1d and 'layer' inserts LayerNorm
      after every hidden Linear; any other value (e.g. '' or 'none') adds no
      normalization.
    dropout: Dropout probability after every hidden ReLU; 0 adds no Dropout.
    input_dim: Number of features after flattening (default 48*48).
    num_classes: Number of output logits (default 7).

  The modules live in `self.layers` (an nn.ModuleList) in execution order, so
  the indices there are the indices used in state-dict keys.
  """

  def __init__(self, hidden_dims: 'list[int] | None' = None, normalization: str = '', dropout: float = 0,
               input_dim: int = 48*48, num_classes: int = 7):
    super().__init__()

    # None instead of a shared mutable [] default; copy so the caller's list is never aliased.
    hidden_dims = [] if hidden_dims is None else list(hidden_dims)
    dims = [input_dim] + hidden_dims + [num_classes]

    self.layers = nn.ModuleList()
    self.layers.append(nn.Flatten())
    # Pair up consecutive sizes for every hidden layer (all but the last transition).
    for in_features, out_features in zip(dims[:-2], dims[1:-1]):
      self.layers.append(nn.Linear(in_features, out_features))
      if normalization == 'batch':
        self.layers.append(nn.BatchNorm1d(out_features))
      elif normalization == 'layer':
        self.layers.append(nn.LayerNorm(out_features))
      self.layers.append(nn.ReLU())
      if dropout > 0:
        self.layers.append(nn.Dropout(dropout))
    # Output layer: raw logits, no activation (CrossEntropyLoss applies log-softmax itself).
    self.layers.append(nn.Linear(dims[-2], dims[-1]))

  def forward(self, x):
    """Run `x` through every module in order and return the logits."""
    for layer in self.layers:
      x = layer(x)
    return x

In [None]:
def get_model(config):
  """Build a BaseNN from the 'hidden_dims', 'normalization' and 'dropout'
  entries of `config` and place it on the global `device`."""
  network = BaseNN(
      hidden_dims=config['hidden_dims'],
      normalization=config['normalization'],
      dropout=config['dropout'],
  )
  return network.to(device)

In [None]:
# Hyper-parameters for the quick experiments below.
config = {
    'learning_rate': 1e-3,
    # Read by make() when it builds the Adam optimizer; 0 disables weight decay.
    'weight_decay': 0,
    'epochs': 10,
    'batch_size': 16,
    'hidden_dims': [1024],
    # Anything other than 'batch' or 'layer' means no normalization layers.
    'normalization': 'none',
    'dropout': 0.0,
    'architecture': 'Simple NN'
}

In [None]:
# Instantiate the network described by `config` on the selected device.
model = get_model(config)

In [None]:
# NOTE(review): torchsummary is not in the pip install line of the setup cell;
# presumably it is preinstalled on the runtime — confirm.
from torchsummary import summary
# Print a layer-by-layer summary for an already-flattened 48*48 input.
summary(model, input_size = (48*48, ), device=device)

# Training Definitions

In [None]:
import torch
import time

def _grad_to_weight_ratio(model):
  """Return 100 * (sum of gradient norms) / (sum of weight norms) for the latest backward pass."""
  grad_norm = 0.0
  weight_norm = 0.0
  for param in model.parameters():
    if param.grad is not None:
      grad_norm += param.grad.norm().item()
      weight_norm += param.data.norm().item()
  return (grad_norm / weight_norm) * 100 if weight_norm > 0 else 0.0


def _evaluate(model, loader, criterion, device):
  """Return (mean loss, accuracy) of `model` over `loader`, in eval mode and without gradients."""
  model.eval()
  loss_sum = 0.0
  correct = 0
  seen = 0

  with torch.no_grad():
    for images, labels in loader:
      images, labels = images.to(device), labels.to(device)
      outputs = model(images)
      # Weight each batch loss by its size so a smaller last batch is not over-counted.
      loss_sum += criterion(outputs, labels).item() * images.size(0)
      _, predicted = outputs.max(1)
      seen += labels.size(0)
      correct += predicted.eq(labels).sum().item()

  if seen == 0:
    raise ValueError("validation loader yielded no samples")
  return loss_sum / seen, correct / seen


def _log_epoch_to_wandb(epoch, history, metrics):
  """Log one epoch's scalars to W&B, plus cumulative curves once there are two or more epochs."""
  if epoch > 0:
    epochs_so_far = list(range(epoch + 1))
    metrics.update({
        "loss_plot": wandb.plot.line_series(
            xs=epochs_so_far,
            ys=[history['train_loss'], history['val_loss']],
            keys=["Train", "Val"],
            title="Loss Progress",
            xname="Epoch"
        ),
        "acc_plot": wandb.plot.line_series(
            xs=epochs_so_far,
            ys=[history['train_acc'], history['val_acc']],
            keys=["Train", "Val"],
            title="Accuracy Progress",
            xname="Epoch"
        ),
        "grad_plot": wandb.plot.line(
            wandb.Table(
                data=[[x, y] for x, y in zip(epochs_so_far, history['grad_to_weight_ratio'])],
                columns=["Epoch", "Grad/Weight Ratio"]
            ),
            "Epoch",  # x-axis
            "Grad/Weight Ratio",  # y-axis
            title="Gradient-to-Weight Ratio Over Time"
        )
    })

  wandb.log(metrics)


def train_model(model, data, criterion, optimizer, num_epochs=10, device='cuda', print_report=True, log_wandb=False):
  """Train `model` and evaluate it on the validation data after every epoch.

  Args:
    model: The network to train; it is moved to `device`.
    data: Mapping with a 'train' and a 'val' iterable of `(images, labels)` batches.
    criterion: Loss taking `(outputs, labels)` and returning a scalar tensor.
    optimizer: Optimizer over the model's parameters.
    num_epochs: Number of passes over the training data.
    device: Device the model and every batch are moved to.
    print_report: Print progress every 200 batches, per-layer gradient means
      every 400 batches (for models exposing `.layers`) and a summary per epoch.
    log_wandb: Log the per-epoch metrics and curves to the active W&B run.

  Returns:
    A dict of per-epoch lists: 'train_loss', 'val_loss', 'train_acc',
    'val_acc', 'grad_to_weight_ratio' (mean of the per-batch ratios, already
    expressed in percent) and 'weight_norm' (sum of parameter norms).

  Raises:
    ValueError: If the training or validation data yields no samples.
  """
  train_loader = data['train']
  val_loader = data['val']

  history = {
      'train_loss': [],
      'val_loss': [],
      'train_acc': [],
      'val_acc': [],
      'grad_to_weight_ratio': [],
      'weight_norm': []
  }

  model.to(device)

  for epoch in range(num_epochs):
    start_time = time.time()

    # ---- TRAINING ----
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    batch_grad_ratios = []

    for batch_idx, (images, labels) in enumerate(train_loader):
      # Forward/Backward
      images, labels = images.to(device), labels.to(device)

      optimizer.zero_grad()
      outputs = model(images)
      loss = criterion(outputs, labels)
      loss.backward()

      # Grad ratio calculation: measured per batch, from this batch's gradients
      # only, and before the optimizer changes the weights.
      current_grad_ratio = _grad_to_weight_ratio(model)
      batch_grad_ratios.append(current_grad_ratio)

      # Update
      optimizer.step()

      running_loss += loss.item() * images.size(0)
      _, predicted = outputs.max(1)
      total += labels.size(0)
      correct += predicted.eq(labels).sum().item()

      if print_report and batch_idx % 200 == 0:
        # The ratio is already in percent, so it is printed with a literal '%'
        # rather than the '%' format spec (which would scale by 100 again).
        print(f"[Epoch {epoch+1}/{num_epochs}] "
              f"Batch {batch_idx}/{len(train_loader)} - "
              f"Loss: {loss.item():.4f} - "
              f"Processed: {total} images - "
              f"Grad_to_weight_ratio: {current_grad_ratio:.2f}%")
      if print_report and batch_idx % 400 == 0:
        # Print what is happening to grads
        for i, layer in enumerate(getattr(model, 'layers', ())):
          if hasattr(layer, 'weight'):
            grad = layer.weight.grad
            if grad is not None:
              print(f"Layer {i} ({layer}): grad mean = {grad.mean().item():.6f}")
            else:
              print(f"Layer {i} ({layer}): grad = None")

    if total == 0:
      raise ValueError("training loader yielded no samples")

    train_loss = running_loss / total
    train_acc = correct / total
    avg_grad_ratio = sum(batch_grad_ratios) / len(batch_grad_ratios)

    # ---- VALIDATION ----
    val_loss, val_acc = _evaluate(model, val_loader, criterion, device)

    # Epoch time covers both the training and the validation pass.
    epoch_time = time.time() - start_time

    weight_norm = sum(p.data.norm().item() for p in model.parameters())

    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['train_acc'].append(train_acc)
    history['val_acc'].append(val_acc)
    history['grad_to_weight_ratio'].append(avg_grad_ratio)
    history['weight_norm'].append(weight_norm)

    if print_report:
      print(f"\nEpoch [{epoch+1}/{num_epochs}] completed in {epoch_time:.1f}s")
      print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2%}")
      print(f"Val   Loss: {val_loss:.4f} | Val   Acc: {val_acc:.2%}")
      print(f"Grad to Weight Ratio: {avg_grad_ratio:.2f}%")
      print(f"Weight Norm: {weight_norm:.2f}\n")
    if log_wandb:
      _log_epoch_to_wandb(epoch, history, {
          "epoch": epoch + 1,
          "train_loss": train_loss,
          "val_loss": val_loss,
          "train_acc": train_acc,
          "val_acc": val_acc,
          "grad_ratio": avg_grad_ratio,
          "weight_norm": weight_norm,
          "epoch_time": epoch_time
      })

  return history


# Training

In [None]:
def make(config):
  """Build everything one training run needs from `config`.

  Args:
    config: Dict with 'hidden_dims', 'normalization', 'dropout' (model),
      'learning_rate' and optionally 'weight_decay' (optimizer), and
      'batch_size' (loaders).

  Returns:
    `(model, training_data, criterion, optimizer)`, where `training_data`
    maps 'train', 'val' and 'test' to loaders over the global splits.
  """
  model = get_model(config)
  # 'weight_decay' is optional; 0 is Adam's own default, so configs without
  # the key behave as "no weight decay" instead of raising KeyError.
  optimizer = Adam(
      model.parameters(),
      lr=config['learning_rate'],
      weight_decay=config.get('weight_decay', 0),
  )
  criterion = nn.CrossEntropyLoss()

  training_data = {
      'train': to_loader(train, transform=transform, batch_size=config['batch_size']),
      'val': to_loader(val, transform=transform, batch_size=config['batch_size']),
      'test': to_loader(test, transform=transform, batch_size=config['batch_size'])
  }

  return model, training_data, criterion, optimizer


In [None]:
def overfit(config, num_epochs=100, num_samples=16):
  """Sanity-check the model by training it on a handful of samples.

  A network that cannot drive the training loss down on a few samples has a
  bug somewhere in the model or the training loop.

  Args:
    config: Same dict that make() takes.
    num_epochs: Number of epochs to train for (default 100).
    num_samples: How many leading rows of the train and val splits to use
      (default 16).

  Returns:
    The history dict produced by train_model (no printing, no W&B logging).
  """
  # Only the model, loss and optimizer are needed; the full-size loaders that
  # make() also builds are discarded (and no longer shadow the global `data`).
  model, _, criterion, optimizer = make(config)

  tiny_data = {
      'train': to_loader(train[:num_samples], transform=transform, batch_size=config['batch_size']),
      'val': to_loader(val[:num_samples], transform=transform, batch_size=config['batch_size'])
  }

  return train_model(model, tiny_data, criterion, optimizer, num_epochs, device, False, False)

In [None]:
# Overfitting sanity check: 100 silent epochs on the first 16 train/val rows.
overfit_history = overfit(config)

In [None]:
# Fresh model, loaders, loss and optimizer for a short run on the full splits.
model, training_data, criterion, optimizer = make(config)

# 3 epochs with progress printing; W&B logging stays off (its default).
history = train_model(model, training_data, criterion, optimizer, 3, device, True)

# Testing Definitions

In [None]:
def test_model(model, test_loader, criterion=None, device='cpu', print_report=True, log_wandb=False):
    """
    Evaluate `model` on `test_loader` and optionally print/log a per-class report.

    Args:
        model: Network to evaluate; it is switched to eval mode. The caller is
            expected to have placed it on `device` already.
        test_loader: Iterable of `(inputs, labels)` batches.
        criterion: Optional loss; when given, the sample-weighted mean loss is
            reported under 'loss'.
        device: Device every batch is moved to.
        print_report: Print accuracy, loss, the sklearn classification report
            and the confusion matrix.
        log_wandb: Log the results, per-class precision/recall/F1 and a
            confusion-matrix plot to the active W&B run.

    Returns:
        Dict with 'accuracy', 'true_labels' and 'pred_labels' (NumPy arrays),
        plus 'loss' when `criterion` is given.

    Raises:
        ValueError: If `test_loader` yields no samples.
    """
    model.eval()
    total_correct = 0
    total_samples = 0
    total_loss = 0.0
    true_labels = []
    pred_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)

            if criterion is not None:
                # Weight by batch size so a short final batch is not over-counted.
                total_loss += criterion(outputs, labels).item() * inputs.size(0)

            _, predictions = torch.max(outputs, 1)
            total_correct += (predictions == labels).sum().item()
            total_samples += labels.size(0)

            # Plain Python ints: serialisable for W&B and accepted by sklearn.
            true_labels.extend(labels.cpu().tolist())
            pred_labels.extend(predictions.cpu().tolist())

    if total_samples == 0:
        raise ValueError("test_loader yielded no samples; nothing to evaluate")

    accuracy = total_correct / total_samples
    results = {
        'accuracy': accuracy,
        'true_labels': np.array(true_labels),
        'pred_labels': np.array(pred_labels)
    }

    if criterion is not None:
        results['loss'] = total_loss / total_samples

    # Report only on classes that actually occur in the ground truth.
    present_labels = np.unique(true_labels)

    if log_wandb:
        # The dict form of the report is only needed for logging, so it is
        # computed here rather than on every call.
        clf_report = classification_report(
            true_labels, pred_labels,
            labels=present_labels,
            zero_division=0,
            output_dict=True
        )

        wandb.log(results)

        # One log call for all per-class metrics keeps them on the same step.
        per_class_metrics = {}
        for label in present_labels:
            label = str(label)
            per_class_metrics[f'test/precision_{label}'] = clf_report[label]['precision']
            per_class_metrics[f'test/recall_{label}'] = clf_report[label]['recall']
            per_class_metrics[f'test/f1_{label}'] = clf_report[label]['f1-score']
        wandb.log(per_class_metrics)

        # NOTE(review): wandb appears to index class_names by label value, so
        # the names cover every index up to the largest true or predicted
        # label; identical to before when labels 0..k-1 are all present.
        highest_label = max(max(true_labels), max(pred_labels))
        wandb.log({
            "confusion_matrix": wandb.plot.confusion_matrix(
                y_true=true_labels,
                preds=pred_labels,
                class_names=[str(x) for x in range(highest_label + 1)]
            )
        })

    if print_report:
        print(f"Test Accuracy: {accuracy:.4f}")
        if criterion is not None:
            print(f"Test Loss: {results['loss']:.4f}")

        print("\nClassification Report (subset of classes present in test set):")
        print(classification_report(
            true_labels,
            pred_labels,
            labels=present_labels,
            zero_division=0  # Silences the warning by defining 0/0 = 0
        ))

        print("\nConfusion Matrix (subset):")
        print(confusion_matrix(true_labels, pred_labels, labels=present_labels))

    return results

In [None]:
# Evaluate the model trained above on the held-out test loader (report is printed, nothing is logged to W&B).
test_model(model, training_data['test'], criterion, device)

# Plot

In [None]:
import matplotlib.pyplot as plt

def plot_training_history(history, is_overfit=False):
    """
    Draw four side-by-side panels from a training history: loss, accuracy,
    gradient-to-weight ratio and weight norm, each against the epoch number.

    The figure is saved as "training_plot.png", or "training_plot_overfit.png"
    when `is_overfit` is true, and then shown.
    """
    epochs = range(1, len(history['train_loss']) + 1)

    plt.figure(figsize=(24, 5))

    # One entry per panel: (title, y-axis label, [(history key, plot kwargs), ...]).
    panels = [
        ('Loss over Epochs', 'Loss', [
            ('train_loss', {'label': 'Train Loss'}),
            ('val_loss', {'label': 'Val Loss'}),
        ]),
        ('Accuracy over Epochs', 'Accuracy', [
            ('train_acc', {'label': 'Train Acc'}),
            ('val_acc', {'label': 'Val Acc'}),
        ]),
        ('Grad-to-Weight Ratio', 'Ratio (%)', [
            ('grad_to_weight_ratio', {'label': 'Grad/Weight Ratio', 'color': 'purple'}),
        ]),
        ('Weight Norm over Epochs', 'Norm (L2)', [
            ('weight_norm', {'label': 'Weight Norm', 'color': 'darkgreen'}),
        ]),
    ]

    for position, (panel_title, y_label, curves) in enumerate(panels, start=1):
        plt.subplot(1, 4, position)
        for history_key, line_kwargs in curves:
            plt.plot(epochs, history[history_key], marker='o', **line_kwargs)
        plt.title(panel_title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.savefig(f"training_plot{'_overfit' if is_overfit else ''}.png")
    plt.show()


In [None]:
# Curves of the overfitting check; is_overfit is left False, so this saves to training_plot.png.
plot_training_history(overfit_history)

# Model Pipeline

In [None]:
# Authenticate with Weights & Biases before starting a run.
wandb.login()

In [None]:
# Hyper-parameters of the tracked run; the whole dict is also passed to wandb.init below.
config = {
    'learning_rate': 1e-3,
    'weight_decay': 0,  # passed to Adam by make()
    'epochs': 10,
    'batch_size': 256,
    'hidden_dims': [1024],  # one hidden layer of 1024 units
    'normalization': 'none',  # only 'batch' or 'layer' add normalization layers
    'dropout': 0.0,  # 0 means no Dropout layers are added
    'architecture': '1-1024-Layer-NN'  # used as the W&B run name and the checkpoint file name
}

In [None]:
# The checkpoint is named after the architecture and written next to the notebook.
checkpoint_path = config['architecture'] + '.pt'

with wandb.init(project='emotion-recognition', config=config, name=config['architecture']):
  # Stage 1: overfitting sanity check, with its curves uploaded as an image.
  overfit_history = overfit(config)
  plot_training_history(overfit_history, True)
  wandb.log({"Overfit Training Curves": wandb.Image("training_plot_overfit.png")})

  print('\ntraining\n')

  # Stage 2: full training run on fresh weights, tracked by W&B.
  model, training_data, criterion, optimizer = make(config)
  summary(model, input_size = (48, 48, ), device=device)
  wandb.watch(model, criterion, log='all', log_freq=10)

  history = train_model(model, training_data, criterion, optimizer, config['epochs'], device, True, True)
  plot_training_history(history)

  # Stage 3: held-out evaluation, then upload the training curves saved in stage 2.
  test_model(model, training_data['test'], criterion, device, True, True)
  wandb.log({"Training Curves": wandb.Image("training_plot.png")})

  # Stage 4: persist the weights locally and attach them to the run.
  torch.save(model.state_dict(), checkpoint_path)
  wandb.save(checkpoint_path)  # upload to W&B