<a href="https://colab.research.google.com/github/GiX7000/action-recognition-with-CNNs/blob/main/create_src_dir.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Create the parent src directory.

In [1]:
import os
# Create the "src" directory that the following %%writefile cells write into;
# exist_ok=True makes re-running this cell harmless when the directory already exists.
os.makedirs("src", exist_ok=True)

### config.py: defines some essential global variables.

In [2]:
%%writefile src/config.py
""" Contains all global variables. """
import torch.nn as nn

batch_size = 32 # number of samples per batch
num_channels = 3 # RGB
num_classes = 11 # len(label_dict): number of classes for big datasets
num_classes_small = 15 # len(small_label_list): number of classes for the small dataset
init_hidden_units = [16, 32, 64] # [block1, block2, block3]
init_dropout_probs = [0, 0, 0, 0] # [block1, block2, block3, classifier]
# NOTE: these are module *instances*; any model that uses this list as a default receives the same two objects
init_activation_functions = [nn.ReLU(), nn.SELU()] # [feature extractor, classifier]

Writing src/config.py


### data_utils.py: contains functions for data management and processing.

In [3]:
%%writefile src/data_utils.py
""" Contains functions for data management. """

from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import shutil
import os


def select_images_by_labels(image_list, label_dict):
  """ Returns the images (of the three big datasets) whose filename-encoded class id is contained in label_dict. """

  def has_known_label(path):
    # the class id is the last two characters of the third-from-last dot-separated field of the filename
    code = path.split('/')[-1].split('.')[-3][-2:]
    # important: the id is read as 'str' but label_dict holds 'int', so convert before the lookup
    return code.isdigit() and int(code) in label_dict

  return [path for path in image_list if has_known_label(path)]


def check_image_type(image_list, type):
  """ Prints whether every path in image_list ends with the given type suffix (nothing is returned). """
  mismatch_found = any(not image.endswith(type) for image in image_list)
  if mismatch_found:
    print(f"Not all images in the {image_list} are of type {type}!")
    return
  print(f"All images in the {image_list} are of type {type}.")


def extract_labels(image_list):
  """ Returns, for every image path, the two-character label encoded in its filename (as a string). """
  # label = last two characters of the third-from-last dot-separated field of the filename
  return [image.split('/')[-1].split('.')[-3][-2:] for image in image_list]


def count_class_occurrences(label_list):
  """ Returns a dictionary mapping each label to the number of times it appears in label_list. """
  tally = {}
  for label in label_list:
    # a label seen for the first time starts from 0
    tally[label] = tally.get(label, 0) + 1
  return tally


def dataset_splits(data_source, split_ratios, is_combined=False):
  """ Splits the data into train, validation, and test sets.

  Args:
    data_source: sequence of items (e.g. image paths) to split. It is NOT modified.
    split_ratios: sequence whose first two entries are the train and validation fractions.
      The test set is simply the remainder, so a third entry is never read.
    is_combined: if True, no test split is produced.

  Returns:
    (train_data, val_data) if is_combined, else (train_data, val_data, test_data).
  """

  # shuffle a copy, so the data is randomly distributed without reordering the caller's list
  shuffled = list(data_source)
  random.shuffle(shuffled)

  # define the sizes (the validation split gets at least 2 items, when that many are available)
  train_size = int(split_ratios[0] * len(shuffled))
  val_size = max(2, int(split_ratios[1] * len(shuffled)))

  # split the shuffled data
  train_data = shuffled[:train_size]
  val_data = shuffled[train_size:train_size + val_size]

  if is_combined:
    return train_data, val_data # no test split if combined datasets

  test_data = shuffled[train_size + val_size:]
  return train_data, val_data, test_data


def dataset_splits_small(data_source, label_list):
  """ Splits the small dataset into train, validation, and test sets, class by class.

  Splitting per class ensures that each class has at least 2 images in each split, if possible.
  Note that label_list is accepted but not used by the split itself.
  """

  # bucket the images by class; the class name is the file's base name minus its last 3 characters
  per_class = {}
  for image in data_source:
    class_name = os.path.basename(image).split('.')[0][:-3]
    per_class.setdefault(class_name, []).append(image)

  train_data, val_data, test_data = [], [], []

  for members in per_class.values():
    # randomize the order inside each class before slicing
    random.shuffle(members)

    # each class is expected to contain 20 images: 16 train / 2 validation / remaining (2) test
    train_data += members[:16]
    val_data += members[16:18]
    test_data += members[18:]

  return train_data, val_data, test_data


def copy_images(image_paths, split, is_small_dataset, is_combined_dataset, is_test_set, label_dict, small_label_list, datasets_paths):
  """ Copies any dataset's images to the destination directory.

  Each image is copied to <dataset root>/<split>/<class name>/, where the dataset root(s) are
  picked from datasets_paths according to the mode flags and the source folder named in the path.

  Args:
    image_paths: iterable of source image paths ('/'-separated).
    split: name of the split sub-directory (e.g. 'train').
    is_small_dataset: if True, images go to the small dataset and the class name is the file's
      base name minus its last 3 characters; it must be in small_label_list.
    is_combined_dataset: if True, every M/R/L image is copied into the two combined datasets
      that contain it (M -> MR, ML; R -> MR, RL; L -> ML, RL).
    is_test_set: if True (and not combined), images go to the M/R/L test-set datasets.
    label_dict: maps the integer class id encoded in the filename to the class name.
    small_label_list: valid class names for the small dataset.
    datasets_paths: list of the 10 dataset roots, in the order shown below.

  Raises:
    ValueError: for a small-dataset label that is not in small_label_list, or for a path that
      contains none of the M/R/L source-folder markers (in every mode, combined included).
  """
  # datasets_paths = [M_dataset_path, L_dataset_path, R_dataset_path, small_dataset_path,
  #                   MR_dataset_path, ML_dataset_path, RL_dataset_path,
  #                   test_M_dataset_path, test_L_dataset_path, test_R_dataset_path]

  # source-folder marker -> indices into datasets_paths, one table per mode
  combined_targets = {'ResActionsImagesM': (4, 5),  # M -> MR, ML
                      'ResActionsImagesR': (4, 6),  # R -> MR, RL
                      'ResActionsImagesL': (5, 6)}  # L -> ML, RL
  test_targets = {'ResActionsImagesM': (7,),
                  'ResActionsImagesR': (9,),
                  'ResActionsImagesL': (8,)}
  single_targets = {'ResActionsImagesM': (0,),
                    'ResActionsImagesR': (2,),
                    'ResActionsImagesL': (1,)}

  for image_path in image_paths:

    if is_small_dataset:
      # retrieve the label and ensure it is a known one
      label = os.path.basename(image_path).split('.')[0][:-3]
      if label not in small_label_list:
        raise ValueError(f"Invalid label: {label}")
      dest_dirs = [os.path.join(datasets_paths[3], split, label)]

    else: # individual, test-set or combined datasets
      # get the image's id and class name
      img_id = int(image_path.split('/')[-1].split('.')[-3][-2:])
      img_name = label_dict[img_id]

      if is_combined_dataset:
        targets = combined_targets
      elif is_test_set:
        targets = test_targets
      else:
        targets = single_targets

      # the first marker (checked in M, R, L order) found in the path decides the source dataset
      marker = next((m for m in targets if m in image_path), None)
      if marker is None:
        raise ValueError(f"Invalid dataset path: {image_path}.")

      dest_dirs = [os.path.join(datasets_paths[i], split, img_name) for i in targets[marker]]

    for dest_dir in dest_dirs:
      # create the destination directory if it doesn't exist, then copy the image into it
      os.makedirs(dest_dir, exist_ok=True)
      shutil.copy(image_path, dest_dir)


def image_count(dir):
  """ Counts the '.png' files under the given directory (recursively), printing a per-directory breakdown. """

  grand_total = 0
  for folder, _subdirs, filenames in os.walk(dir):
    # only files with a '.png' suffix are considered images
    n_png = sum(1 for filename in filenames if filename.endswith('.png'))
    print(f"For the directory '{folder}', counted {n_png} image(s).")
    grand_total += n_png

  print(f"Total images in '{dir}': {grand_total}")
  return grand_total


def mean_std_calculation(image_paths):
  """ Calculates the per-channel mean and standard deviation of a dataset.

  The statistics are taken over all pixels of all images: std = sqrt(E[x^2] - E[x]^2), with both
  expectations accumulated on the fly. (Summing each image's deviations from its *own* mean would
  ignore the variation between images and underestimate the dataset std.)

  Pixel values are used on whatever scale plt.imread returns them; no rescaling is applied here.

  Args:
    image_paths: iterable of paths to 3-channel images, read as (H, W, C) arrays.

  Returns:
    (mean, std): two numpy arrays of shape (3,), one value per color channel.

  Raises:
    ValueError: if no pixels were read (e.g. image_paths is empty).
  """

  # running per-channel sums over the whole dataset
  channel_sum = np.zeros(3) # 3-d for 3 color channels
  channel_sq_sum = np.zeros(3)
  n_pixels = 0  # total number of pixels across all images

  for image_path in image_paths:
    # read the image, shape of (H, W, C); float64 keeps the squared sums from overflowing integer images
    image = plt.imread(image_path).astype(np.float64)

    n_pixels += image.shape[0] * image.shape[1]
    channel_sum += image.sum(axis=(0, 1))
    channel_sq_sum += (image ** 2).sum(axis=(0, 1))

  if n_pixels == 0:
    raise ValueError("Cannot compute mean/std: no image pixels were provided.")

  # mean and std calculations (clip tiny negative variances caused by floating-point rounding)
  total_mean = channel_sum / n_pixels
  total_std = np.sqrt(np.maximum(channel_sq_sum / n_pixels - total_mean ** 2, 0.0))

  print(f"Dataset Mean (per channel): {total_mean}")
  print(f"Dataset Std (per channel): {total_std}")

  return total_mean, total_std


def create_dataloaders(train_dir, val_dir, test_dir, transform, batch_size, num_workers):
  """ Builds ImageFolder datasets for the three directories and wraps them into DataLoaders.

  Returns (train_dataloader, val_dataloader, test_dataloader, class_names), where class_names are
  the classes of the training dataset. Only the training loader shuffles its data.
  """

  # one ImageFolder per split, all sharing the same transform
  train_data, val_data, test_data = (
      datasets.ImageFolder(folder, transform=transform)
      for folder in (train_dir, val_dir, test_dir)
  )

  class_names = train_data.classes

  # warn about splits without any class (added after an error was hit)
  if not train_data.classes:
    print("No classes found in the training dataset!")
  if not val_data.classes:
    print("No classes found in the validation dataset!")
  if not test_data.classes:
    print("No classes found in the test dataset!")

  def make_loader(data, shuffle):
    # pin_memory=True speeds things up a little
    return DataLoader(data, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, pin_memory=True)

  train_dataloader = make_loader(train_data, True)
  val_dataloader = make_loader(val_data, False)
  test_dataloader = make_loader(test_data, False)

  return train_dataloader, val_dataloader, test_dataloader, class_names


def create_combined_dataloaders(train_dir, val_dir, test_dir, train_val_transform, test_transform, batch_size, num_workers):
  """ Creates train/val DataLoaders for combined datasets and a test DataLoader for the respective full dataset.

  The train and validation datasets share train_val_transform; the test dataset uses test_transform.
  Only the training loader shuffles its data.
  """

  # settings shared by all three loaders
  loader_kwargs = dict(batch_size=batch_size, num_workers=num_workers, pin_memory=True)

  train_data = datasets.ImageFolder(train_dir, transform=train_val_transform)
  val_data = datasets.ImageFolder(val_dir, transform=train_val_transform)
  test_data = datasets.ImageFolder(test_dir, transform=test_transform)

  return (
      DataLoader(train_data, shuffle=True, **loader_kwargs),
      DataLoader(val_data, shuffle=False, **loader_kwargs),
      DataLoader(test_data, shuffle=False, **loader_kwargs),
  )

# module-level statement: this confirmation message is printed when the module is imported
print("Module data_utils imported successfully!")

Writing src/data_utils.py


### models.py: contains class implementations for various CNN architectures.

In [4]:
%%writefile src/models.py
""" Contains all Pytorch CNN versions. """

import torch
import torch.nn as nn
from project.src.config import batch_size, num_channels, num_classes, num_classes_small, init_hidden_units, init_dropout_probs, init_activation_functions


# baseline CNN model
class CNN(nn.Module):
  """ CNN model architecture based on the structure proposed in the given notebook.

  Three convolutional blocks (feature extractor) followed by a two-layer classifier.

  Args:
    in_features: number of input channels.
    out_features: number of output classes (size of the logits vector).
    hidden_units: output channels of [block1, block2, block3].
    input_size: (height, width) of the images the model will receive. It is only used to size the
      first linear layer; with the default (25, 149) and hidden_units[2] == 64 this gives 1024.
  """
  def __init__(self, in_features: int, out_features: int, hidden_units: list, input_size: tuple = (25, 149)):
    super().__init__() # initialize the initializer!

    # A. define the components of nn

    # 1. feature extractor blocks
    self.block1 = nn.Sequential(
        nn.Conv2d(in_channels=in_features, out_channels=hidden_units[0], kernel_size=3, padding='valid'),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2)
    )

    self.block2 = nn.Sequential(
        nn.Conv2d(in_channels=hidden_units[0], out_channels=hidden_units[1], kernel_size=3, padding='same'),
        nn.ReLU(),
        nn.Conv2d(in_channels=hidden_units[1], out_channels=hidden_units[1], kernel_size=3, padding='valid'),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2)
    )

    self.block3 = nn.Sequential(
        nn.Conv2d(in_channels=hidden_units[1], out_channels=hidden_units[2], kernel_size=3, padding='valid'),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2)
    )

    # size the classifier from the feature extractor's real output instead of a hard-coded value,
    # so other hidden_units / input sizes work too. A zero tensor is used (not a random one) so
    # the probe does not advance the random generator before the classifier weights are drawn.
    with torch.no_grad():
      probe = self.block3(self.block2(self.block1(torch.zeros(1, in_features, *input_size))))
    classifier_input_size = probe.shape[1] * probe.shape[2] * probe.shape[3]

    # 2. classifier
    self.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(in_features=classifier_input_size, out_features=128),
        nn.SELU(),
        nn.Linear(in_features=128, out_features=out_features)
    )

  # B. define how the above components are combined into a CNN (feature extractor + classifier)
  def forward(self, x):
    """ Returns the raw class logits for a batch of images x of shape (batch, channels, height, width). """
    x = self.block3(self.block2(self.block1(x)))
    x = self.classifier(x)
    return x

# CNN model version 2
class CNN_v2(nn.Module):
  """ Baseline CNN architecture extended with dropout layers and configurable activation functions.

  hidden_units = [block1, block2, block3] output channels, dropout_probs = [block1, block2, block3,
  classifier] and activation_functions = [feature extractor, classifier].
  """
  def __init__(self, out_features, in_features=num_channels, hidden_units=init_hidden_units, dropout_probs=init_dropout_probs, activation_functions=init_activation_functions):
    super().__init__()

    def conv3x3(n_in, n_out, padding):
      # every convolution of the feature extractor uses a 3x3 kernel
      return nn.Conv2d(in_channels=n_in, out_channels=n_out, kernel_size=3, padding=padding)

    def pool_and_drop(prob):
      # 2x2 max pooling followed by spatial dropout (the dropout flavour meant for conv2d outputs)
      return [nn.MaxPool2d(kernel_size=2, stride=2), nn.Dropout2d(p=prob)]

    # A. feature extractor blocks
    self.block1 = nn.Sequential(
        conv3x3(in_features, hidden_units[0], 'valid'),
        activation_functions[0],
        *pool_and_drop(dropout_probs[0])
    )

    self.block2 = nn.Sequential(
        conv3x3(hidden_units[0], hidden_units[1], 'same'),
        activation_functions[0],
        conv3x3(hidden_units[1], hidden_units[1], 'valid'),
        activation_functions[0],
        *pool_and_drop(dropout_probs[1])
    )

    self.block3 = nn.Sequential(
        conv3x3(hidden_units[1], hidden_units[2], 'valid'),
        activation_functions[0],
        *pool_and_drop(dropout_probs[2])
    )

    # pass a dummy (1, C, 25, 149) input through the blocks to find out how many features reach the classifier
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = self._extract_features(probe) # shape of (batch_size, channels, height, width)
    flat_size = feature_map.shape[1] * feature_map.shape[2] * feature_map.shape[3]

    # B. classifier
    self.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(in_features=flat_size, out_features=128),  # in_features comes from the dummy pass above
        activation_functions[1],
        nn.Dropout(p=dropout_probs[3]), # standard dropout for fully connected layers
        nn.Linear(in_features=128, out_features=out_features) # number of classes set by the caller
    )

  def _extract_features(self, x):
    # block1 -> block2 -> block3
    return self.block3(self.block2(self.block1(x)))

  # C. combine the components into a CNN (feature extractor + classifier)
  def forward(self, x):
    return self.classifier(self._extract_features(x))

# module-level statement: this confirmation message is printed when the module is imported
print("Module models imported successfully!")

Writing src/models.py


### engine.py: contains functions for training and testing models.









In [5]:
%%writefile src/engine.py
""" Contains functions for training and testing a Pytorch model. """

import torch
from tqdm.auto import tqdm

# train step function
def train_step(model, dataloader, loss_fn, optimizer, device, regularization=None, reg_lambda=0.0, max_norm=2.0):
  """ Trains a PyTorch model for a single epoch.

  With regularization 'L1' or 'L2', reg_lambda times the sum of the 1- or 2-norms of all trainable
  parameters is added to each batch loss. After every optimizer step, all parameters whose name
  contains 'weight' are clamped element-wise to [-max_norm, max_norm].

  Returns (train_loss, train_acc), both averaged over the batches of the dataloader.
  """

  model.train()

  # which norm (if any) is added to the loss; it does not change between batches
  if regularization == 'L1':
    norm_order = 1
  elif regularization == 'L2':
    norm_order = 2
  else:
    norm_order = None

  running_loss, running_acc = 0, 0

  for X, y in dataloader:
    # move the batch to the target device
    X, y = X.to(device), y.to(device)

    # forward pass and loss
    y_pred = model(X)
    loss = loss_fn(y_pred, y)

    # optional penalty over the trainable parameters only
    if norm_order is not None:
      penalty = sum(torch.norm(param, p=norm_order) for param in model.parameters() if param.requires_grad)
      loss += reg_lambda * penalty

    running_loss += loss.item()

    # backward pass and parameter update
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # weight constraint: applied in place, to weights only (biases are left untouched)
    for name, param in model.named_parameters():
      if 'weight' not in name:
        continue
      param.data.clamp_(-max_norm, max_norm)

    # accuracy of this batch; argmax over the raw logits equals argmax over the softmax probabilities
    predicted = torch.argmax(y_pred, dim=1)
    running_acc += ((predicted == y).sum().item()/len(predicted))

  # average loss and accuracy per batch
  n_batches = len(dataloader)
  return running_loss / n_batches, running_acc / n_batches

# test set function
def test_step(model, dataloader, loss_fn, device, regularization=None, reg_lambda=0.0):
  """ Evaluates a PyTorch model on every batch of the dataloader (no parameter updates).

  With regularization 'L1' or 'L2', the same penalty as in training (reg_lambda times the sum of
  the 1- or 2-norms of the trainable parameters) is added to each batch loss.

  Returns (test_loss, test_acc, all_y_true, all_y_preds): loss and accuracy averaged over the
  batches, plus the actual and predicted labels of every sample.
  """

  model.eval()

  # which norm (if any) is added to the loss; it does not change between batches
  if regularization == 'L1':
    norm_order = 1
  elif regularization == 'L2':
    norm_order = 2
  else:
    norm_order = None

  running_loss, running_acc = 0, 0

  # actual and predicted labels, collected across all batches
  all_y_true = []
  all_y_preds = []

  with torch.inference_mode():
    for X, y in dataloader:
      # move the batch to the target device
      X, y = X.to(device), y.to(device)

      # forward pass and loss
      logits = model(X)
      loss = loss_fn(logits, y)

      # optional penalty over the trainable parameters only
      if norm_order is not None:
        penalty = sum(torch.norm(param, p=norm_order) for param in model.parameters() if param.requires_grad)
        loss += reg_lambda * penalty

      running_loss += loss.item()

      # accuracy of this batch: index of the largest logit is the predicted class
      predicted = torch.argmax(logits, dim=1)
      running_acc += ((predicted == y).sum().item()/len(predicted))

      # keep the labels for later analysis
      all_y_true.extend(y.cpu().numpy())
      all_y_preds.extend(predicted.cpu().numpy())

  # average loss and accuracy per batch
  n_batches = len(dataloader)
  return running_loss / n_batches, running_acc / n_batches, all_y_true, all_y_preds

# train function: train step for epochs
def train(model, train_dataloader, test_dataloader, optimizer, loss_fn, epochs, device, regularization=None, reg_lambda=0.0, early_stopping=False, patience=5, delta=1e-5):
  """ Trains and tests a PyTorch model with optional regularization and early stopping.

  Args:
    model: the model to train (modified in place).
    train_dataloader: batches used by train_step.
    test_dataloader: batches used by test_step after every epoch (reported as validation).
    optimizer, loss_fn, device: passed on to the step functions.
    epochs: maximum number of epochs.
    regularization, reg_lambda: passed on to both step functions.
    early_stopping: if True, stop once the validation accuracy has not improved by more than
      delta for `patience` consecutive epochs, and restore the weights of the best epoch.
    patience, delta: early stopping settings (see above).

  Returns:
    A results dictionary with the per-epoch lists "train_loss", "train_acc", "val_loss", "val_acc",
    plus "best_epoch" / "stopped_epoch" (1-based; 0 / -1 when never set) and "y_true" / "y_preds",
    the labels of the last epoch that was run.
  """

  # create a results dictionary
  results = {"train_loss": [],
             "train_acc": [],
             "val_loss": [],
             "val_acc": [],
             "best_epoch": 0,
             "stopped_epoch":-1}

  # early stopping variables
  best_acc = 0
  epochs_without_improvement = 0
  best_model_weights = None

  # loop through training and test steps for a number of epochs
  for epoch in tqdm(range(epochs)):
    train_loss, train_acc = train_step(model=model,
                                       dataloader=train_dataloader,
                                       loss_fn=loss_fn,
                                       optimizer=optimizer,
                                       device=device,
                                       regularization=regularization,
                                       reg_lambda=reg_lambda)
    test_loss, test_acc, y_true, y_preds = test_step(model=model,
                                                     dataloader=test_dataloader,
                                                     loss_fn=loss_fn,
                                                     device=device,
                                                     regularization=regularization,
                                                     reg_lambda=reg_lambda)

    # print what's happening
    print(
      f"Epoch: {epoch+1} | "
      f"train_loss: {train_loss:.4f} | "
      f"train_acc: {train_acc:.4f} | "
      f"val_loss: {test_loss:.4f} | "
      f"val_acc: {test_acc:.4f}"
    )

    # update results dictionary
    results["train_loss"].append(train_loss)
    results["train_acc"].append(train_acc)
    results["val_loss"].append(test_loss)
    results["val_acc"].append(test_acc)

    # also save actual and predicted labels (overwritten every epoch)
    results["y_true"] = y_true
    results["y_preds"] = y_preds

    # early stopping process
    if early_stopping:
      if test_acc > best_acc + delta: # or it could be loss: (test_loss < best_loss - delta)
        best_acc = test_acc
        # state_dict() returns references to the live tensors, which later epochs keep updating;
        # clone them so the snapshot really holds the weights of this epoch
        best_model_weights = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
        epochs_without_improvement = 0
        results["best_epoch"] = epoch + 1 # (epoch is zero based)
      else:
        epochs_without_improvement += 1
        if epochs_without_improvement >= patience:
          print(f"Early stopping triggered at epoch {epoch+1}.")
          results["stopped_epoch"] = epoch + 1
          break

  # restore best weights, if early stopping is requested
  if early_stopping:
    if best_model_weights is not None:
      model.load_state_dict(best_model_weights)
      print(f"Model restored from epoch {results['best_epoch']}.")
    else:
      # no epoch ever improved on the initial best_acc, so there is no snapshot to restore
      print("No improvement during training. Use of final model's weights.")

  # return the filled results at the end of the epochs
  return results


def eval_model(model, dataloader, loss_fn, device):
  """ Makes predictions on the given (test) dataloader and returns a dictionary with the results.

  The dictionary holds the model's class name, the loss and accuracy averaged over the batches,
  and the actual / predicted labels of every sample.
  """

  running_loss, running_acc = 0, 0

  # actual and predicted labels, collected across all batches
  y_true, y_hat = [], []

  model.eval()
  with torch.inference_mode():
    for X, y in tqdm(dataloader):

      # move the batch to the target device
      X, y = X.to(device), y.to(device)

      logits = model(X)

      # the loss is accumulated as a tensor and converted to a number only at the end
      running_loss += loss_fn(logits, y)

      # to predict: logits -> probabilities -> labels
      probabilities = torch.softmax(logits, dim=1)
      batch_preds = torch.argmax(probabilities, dim=1)
      running_acc += (batch_preds == y).sum().item()/len(batch_preds)

      y_true.extend(y.cpu().numpy())
      y_hat.extend(batch_preds.cpu().numpy())

    # average loss and accuracy per batch
    n_batches = len(dataloader)
    mean_loss = running_loss / n_batches
    mean_acc = running_acc / n_batches

  return {"model_name": model.__class__.__name__, # only works when model was created as a class
          "test_loss": mean_loss.item(),
          "test_acc": mean_acc,
          "y_true": y_true,
          "y_preds": y_hat}

# module-level statement: this confirmation message is printed when the module is imported
print("Module engine imported successfully!")

Writing src/engine.py


### experiments.py: includes all functions for conducting experiments.

In [6]:
%%writefile src/experiments.py
""" Contains functions for running experiments. """

import torch
import torch.nn as nn
import torchvision.transforms as transforms
from project.src.engine import train
from project.src.models import CNN, CNN_v2
from project.src.data_utils import create_dataloaders, create_combined_dataloaders
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Subset
import itertools
import random
import numpy as np
import time

# experiment 1
def experiment_l1_regularization(train_dataloader, val_dataloader, num_epochs, device, lambda_values_list, num_channels, num_classes, init_hidden_units):
  """ Experiment with L1 Regularization with different lambda regularization values.

  For every lambda a fresh baseline CNN is trained (RMSprop, lr=0.001, cross-entropy loss) with the
  L1 penalty added to the loss by the training loop.

  Args:
    train_dataloader, val_dataloader: data used for training / validation.
    num_epochs: epochs per run.
    device: device the model is moved to.
    lambda_values_list: lambda values to try, e.g. [1e-2, 1e-3, 1e-4, 1e-5, 1e-6]. May be empty.
    num_channels, num_classes, init_hidden_units: passed to the CNN constructor.

  Returns:
    Dictionary mapping each lambda to the results dictionary of its training run.
  """
  print("Experimenting with L1 Regularization")

  # lambda regularization values to experiment with
  lambda_values = lambda_values_list
  print("Lambda regularization values to try:", lambda_values)

  # storage dictionary for all results
  model_all_results = {}

  # stays None when there is nothing to try, so the summary below can be skipped safely
  model = None

  # loop over all lambda_values
  start_time = time.time()
  for decay in lambda_values:
    print(f"\nTesting with lambda: {decay}")

    # create a new model's instance for the current lambda
    model = CNN(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units).to(device)

    # setup loss function and optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.RMSprop(params=model.parameters(), lr=0.001)

    # start training and save the results for the current lambda
    model_all_results[decay] = train(model=model,
                                     train_dataloader=train_dataloader,
                                     test_dataloader=val_dataloader,
                                     optimizer=optimizer,
                                     loss_fn=loss_fn,
                                     epochs=num_epochs,
                                     device=device,
                                     regularization='L1',
                                     reg_lambda=decay)

  # calculate and print the total time for this experiment
  end_time = time.time()
  print(f"\nTotal experiment's time for the model with L1 regularization: {end_time - start_time:.3f} seconds")

  # calculate the total trainable parameters (taken from the last model that was built)
  if model is not None:
    model_total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total trainable parameters: {model_total_params}")

  return model_all_results


# experiment 2
def experiment_l2_regularization(train_dataloader, val_dataloader, num_epochs, device, weight_decay_values_list, num_channels, num_classes, init_hidden_units):
  """ Experiment with L2 Regularization with different weight decay values.

  For every value a fresh baseline CNN is trained (cross-entropy loss) with RMSprop, lr=0.001 and
  the value passed as the optimizer's weight_decay; no extra penalty is requested from the
  training loop.

  Args:
    train_dataloader, val_dataloader: data used for training / validation.
    num_epochs: epochs per run.
    device: device the model is moved to.
    weight_decay_values_list: weight decay values to try, e.g. [1e-2, 1e-3, 1e-4, 1e-5, 1e-6]. May be empty.
    num_channels, num_classes, init_hidden_units: passed to the CNN constructor.

  Returns:
    Dictionary mapping each weight decay value to the results dictionary of its training run.
  """
  print("Experimenting with L2 Regularization")

  # weight decay values to experiment with
  weight_decay_values = weight_decay_values_list
  print("Weight decay values to try:", weight_decay_values)

  # storage dictionary for all results
  model_all_results = {}

  # stays None when there is nothing to try, so the summary below can be skipped safely
  model = None

  # loop over all weight decay values
  start_time = time.time()
  for decay in weight_decay_values:
    print(f"\nTesting with weight decay: {decay}")

    # create model instance
    model = CNN(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units).to(device)

    # setup loss function and optimizer (L2 is applied through the optimizer's weight decay)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.RMSprop(params=model.parameters(), lr=0.001, weight_decay=decay)

    # start training and save the results for the current weight decay
    model_all_results[decay] = train(model=model,
                                     train_dataloader=train_dataloader,
                                     test_dataloader=val_dataloader,
                                     optimizer=optimizer,
                                     loss_fn=loss_fn,
                                     epochs=num_epochs,
                                     device=device)

  # calculate and print the total time for this experiment
  end_time = time.time()
  print(f"\nTotal experiment's time for the model_1 with L2 regularization: {end_time - start_time:.3f} seconds")

  # calculate the total trainable parameters (taken from the last model that was built)
  if model is not None:
    model_total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total trainable parameters: {model_total_params}")

  return model_all_results


# experiment 3
def experiment_dropout_regularization(train_dataloader, val_dataloader, num_epochs, device, num_classes, best_decay_value, dropout_probs_combos_list):
  """ Experiment with Dropout Regularization Layers.

  For every combination a fresh CNN_v2 is trained (cross-entropy loss) with RMSprop, lr=0.001 and
  weight_decay=best_decay_value.

  Args:
    train_dataloader, val_dataloader: data used for training / validation.
    num_epochs: epochs per run.
    device: device the model is moved to.
    num_classes: number of output classes of the model.
    best_decay_value: weight decay given to the optimizer.
    dropout_probs_combos_list: combinations to try, each [block1, block2, block3, classifier],
      e.g. [[0.5, 0.5, 0.5, 0.5], [0.5, 0.5, 0.4, 0.5]]. May be empty.

  Returns:
    Dictionary mapping each combination (as a tuple) to the results dictionary of its training run.
  """
  print("Experimenting with Dropout Regularization Layers")

  # reference grid of probabilities; it is only used to report how large an exhaustive search would be
  dropout_probs = [0.1, 0.3, 0.5, 0.6, 0.7, 0.9]
  print("Dropout probabilities to try:", dropout_probs)

  # all possible combinations
  all_dropout_probs_combinations = list(itertools.product(dropout_probs, repeat=4))
  print("Total number of combinations to ideally try:", len(all_dropout_probs_combinations))

  # the combinations that are actually tried are the ones given by the caller
  all_good_dropout_probs_combinations = dropout_probs_combos_list
  print("\nSome possible 'good' combinations to try:", all_good_dropout_probs_combinations)

  # storage dictionary for results
  all_results = {}

  # stays None when there is nothing to try, so the summary below can be skipped safely
  model = None

  # loop over all good dropout combinations
  start_time = time.time()
  for combo in all_good_dropout_probs_combinations:
    conv_dropout_prob = combo[:3]
    classifier_dropout_prob = combo[3]
    print(f"\nTesting with conv_dropout_prob: {conv_dropout_prob} and classifier dropout probability: {classifier_dropout_prob}")

    # create model instance
    model = CNN_v2(out_features=num_classes, dropout_probs=combo).to(device)

    # setup loss function and optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.RMSprop(params=model.parameters(), lr=0.001, weight_decay=best_decay_value)

    # start training and save the results (tuple key, since a list cannot be a dictionary key)
    all_results[tuple(combo)] = train(model=model,
                                      train_dataloader=train_dataloader,
                                      test_dataloader=val_dataloader,
                                      optimizer=optimizer,
                                      loss_fn=loss_fn,
                                      epochs=num_epochs,
                                      device=device)

  # calculate and print total experiment time
  end_time = time.time()
  print(f"\nTotal experiment's time: {end_time - start_time:.3f} seconds")

  # calculate total trainable parameters (taken from the last model that was built)
  if model is not None:
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total trainable params: {trainable_params}")

  return all_results


# experiment 4
def experiment_avgpool_instead_of_maxpool(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 4: train the three-block CNN with AvgPool2d in place of every MaxPool2d.

  The model is optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with AveragePooling instead of MaxPooling layers")

  def cnn_with_avg_pooling(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) using average pooling. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1
      nn.Conv2d(in_features, hidden_units[0], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.AvgPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2: two convolutions before pooling
      nn.Conv2d(hidden_units[0], hidden_units[1], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.Conv2d(hidden_units[1], hidden_units[1], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.AvgPool2d(2, 2),
      nn.Dropout2d(dropout_probs[1]),
      # block 3
      nn.Conv2d(hidden_units[1], hidden_units[2], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.AvgPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # dense head: flatten -> 128 hidden units -> out_features outputs
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 128),
      activation_functions[1],
      nn.Dropout(dropout_probs[3]),
      nn.Linear(128, out_features),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_with_avg_pooling().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 5
def experiment_batch_norm(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 5: train the three-block CNN with a BatchNorm2d after every convolution.

  The model is optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with BatchNorm layers")

  # model function with BatchNorm layers
  def cnn_with_batch_norm(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) with conv -> BatchNorm -> activation ordering. """
    feature_extractor = nn.Sequential(
        # block 1
        nn.Conv2d(in_channels=in_features, out_channels=hidden_units[0], kernel_size=3, padding='valid'),
        nn.BatchNorm2d(hidden_units[0]),
        activation_functions[0],
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Dropout2d(p=dropout_probs[0]),

        # block 2: two convolutions before pooling
        nn.Conv2d(in_channels=hidden_units[0], out_channels=hidden_units[1], kernel_size=3, padding='same'),
        nn.BatchNorm2d(hidden_units[1]),
        activation_functions[0],
        nn.Conv2d(in_channels=hidden_units[1], out_channels=hidden_units[1], kernel_size=3, padding='valid'),
        nn.BatchNorm2d(hidden_units[1]),
        activation_functions[0],
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Dropout2d(p=dropout_probs[1]),

        # block 3
        nn.Conv2d(in_channels=hidden_units[1], out_channels=hidden_units[2], kernel_size=3, padding='valid'),
        nn.BatchNorm2d(hidden_units[2]),
        activation_functions[0],
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Dropout2d(p=dropout_probs[2])
        )

    # Calculate the feature extractor's output shape with a dummy forward pass.
    # The probe runs in eval mode: in training mode this pass would fold the
    # statistics of random noise into every BatchNorm2d running_mean/running_var
    # before the network has seen any real data.
    dummy_input = torch.randn(1, in_features, 25, 149)
    feature_extractor.eval()
    with torch.no_grad():
      output = feature_extractor(dummy_input)
    feature_extractor.train()  # restore the default (training) mode
    classifier_input_size = output.shape[1] * output.shape[2] * output.shape[3]

    # classifier: flatten -> 128 hidden units -> out_features outputs
    classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(in_features=classifier_input_size, out_features=128),
        activation_functions[1],
        nn.Dropout(p=dropout_probs[3]),
        nn.Linear(in_features=128, out_features=out_features)
        )

    return nn.Sequential(feature_extractor, classifier)

  # create model instance
  model = cnn_with_batch_norm().to(device)

  # set loss function and optimizer
  loss_fn = nn.CrossEntropyLoss()
  optimizer = torch.optim.RMSprop(params=model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # start training
  start_time = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=optimizer,
                  loss_fn=loss_fn,
                  epochs=num_epochs,
                  device=device)

  # calculate and print total training time
  end_time = time.time()
  print(f"\nTotal training time: {end_time - start_time:.3f} seconds")

  # calculate total trainable parameters
  trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 6
def experiment_extra_block(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, new_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 6: train the CNN with a fourth block appended to the feature extractor.

  `new_hidden_units` is indexed at positions 0..3, one entry per block. The model is
  optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with extra block in feature extractor")

  def cnn_with_extra_block(in_features=num_channels, out_features=num_classes, hidden_units=new_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) with four convolutional blocks. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1
      nn.Conv2d(in_features, hidden_units[0], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2: two convolutions before pooling
      nn.Conv2d(hidden_units[0], hidden_units[1], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.Conv2d(hidden_units[1], hidden_units[1], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[1]),
      # block 3
      nn.Conv2d(hidden_units[1], hidden_units[2], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
      # block 4 (the extra one): 'same' convolution, a 1x1/stride-1 pool that keeps the
      # spatial size, and the block-3 dropout probability reused
      nn.Conv2d(hidden_units[2], hidden_units[3], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.MaxPool2d(1, 1),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # dense head: flatten -> 128 hidden units -> out_features outputs
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 128),
      activation_functions[1],
      nn.Dropout(dropout_probs[3]),
      nn.Linear(128, out_features),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_with_extra_block().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 7
def experiment_extra_layers_classifier(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 7: train the three-block CNN with a deeper classifier head.

  The model is optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with extra layers in classifier")

  def cnn_with_enhanced_classifier(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) with one more Linear stage in the head. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1
      nn.Conv2d(in_features, hidden_units[0], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2: two convolutions before pooling
      nn.Conv2d(hidden_units[0], hidden_units[1], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.Conv2d(hidden_units[1], hidden_units[1], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[1]),
      # block 3
      nn.Conv2d(hidden_units[1], hidden_units[2], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # dense head: flatten -> 128 -> out_features -> out_features, with the classifier
    # activation and dropout applied after each of the first two Linear layers
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 128),
      activation_functions[1],
      nn.Dropout(dropout_probs[3]),
      nn.Linear(128, out_features),
      activation_functions[1],
      nn.Dropout(dropout_probs[3]),
      nn.Linear(out_features, out_features),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_with_enhanced_classifier().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 8
def experiment_enhanced_blocks(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 8: train a CNN in which every block holds two convolutions, like block 2.

  The model is optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with enhanced blocks like block 2")

  def cnn_enhanced_blocks(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) with double-convolution blocks. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1: 'valid' convolution followed by a 'same' convolution
      nn.Conv2d(in_features, hidden_units[0], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.Conv2d(hidden_units[0], hidden_units[0], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2: 'same' convolution followed by a 'valid' convolution
      nn.Conv2d(hidden_units[0], hidden_units[1], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.Conv2d(hidden_units[1], hidden_units[1], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[1]),
      # block 3: 'valid' convolution followed by a 'same' convolution
      nn.Conv2d(hidden_units[1], hidden_units[2], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.Conv2d(hidden_units[2], hidden_units[2], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # dense head: flatten -> 128 hidden units -> out_features outputs
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 128),
      activation_functions[1],
      nn.Dropout(dropout_probs[3]),
      nn.Linear(128, out_features),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_enhanced_blocks().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 9
def experiment_simplified_block2(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 9: train the CNN with block 2 reduced to a single convolution.

  The model is optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with simplified block2")

  def cnn_simplified_block2(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) with one convolution per block. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1
      nn.Conv2d(in_features, hidden_units[0], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2: only the 'same' convolution is kept
      nn.Conv2d(hidden_units[0], hidden_units[1], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[1]),
      # block 3
      nn.Conv2d(hidden_units[1], hidden_units[2], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # dense head: flatten -> 128 hidden units -> out_features outputs
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 128),
      activation_functions[1],
      nn.Dropout(dropout_probs[3]),
      nn.Linear(128, out_features),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_simplified_block2().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 10
def experiment_simplified_block_and_classifier(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 10: single-convolution block 2 plus a classifier cut down to one Linear layer.

  The model is optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with simplified block2 and classifier")

  def cnn_simplified_block_and_classifier(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) with a one-layer head. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1
      nn.Conv2d(in_features, hidden_units[0], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2: only the 'same' convolution is kept
      nn.Conv2d(hidden_units[0], hidden_units[1], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[1]),
      # block 3
      nn.Conv2d(hidden_units[1], hidden_units[2], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # one-layer head: flatten -> 128 outputs
    # NOTE(review): the head emits 128 values and never reads `out_features`,
    # `activation_functions[1]` or `dropout_probs[3]` -- confirm this is intended
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 128),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_simplified_block_and_classifier().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 11
def experiment_simplified_block_and_classifier_enhanced(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 11: one-layer classifier whose single Linear layer has 256 outputs.

  The model is optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with simplified block 2 and classifier plus out_features=256")

  def cnn_simplified_block_and_classifier_enhanced(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) with a one-layer, 256-output head. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1
      nn.Conv2d(in_features, hidden_units[0], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2
      # NOTE(review): both convolutions are present here although the experiment is
      # labelled "simplified block 2" -- confirm which variant was meant
      nn.Conv2d(hidden_units[0], hidden_units[1], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.Conv2d(hidden_units[1], hidden_units[1], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[1]),
      # block 3
      nn.Conv2d(hidden_units[1], hidden_units[2], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # one-layer head: flatten -> 256 outputs
    # NOTE(review): the head emits 256 values and never reads `out_features`,
    # `activation_functions[1]` or `dropout_probs[3]` -- confirm this is intended
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 256),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_simplified_block_and_classifier_enhanced().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 12
def experiment_simplified_block_and_classifier_without_block3(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo):
  """ Experiment 12: two-block feature extractor (block 3 removed) with a one-layer classifier.

  The model is optimised with RMSprop (lr=0.001, weight_decay=best_decay_value) against
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. Prints the elapsed training time and the trainable-parameter count,
  then returns the value produced by `train`.
  """
  print("Experimenting with simplified block2 and classifier and dropped block 3")

  def cnn_simplified_block_and_classifier_without_block3(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo):
    """ Build Sequential(feature_extractor, classifier) from two conv blocks and a one-layer head. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1
      nn.Conv2d(in_features, hidden_units[0], kernel_size=3, padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2: goes straight to hidden_units[2] channels and uses dropout_probs[2];
      # index 1 of both lists is skipped
      nn.Conv2d(hidden_units[0], hidden_units[2], kernel_size=3, padding='same'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # one-layer head: flatten -> 128 outputs
    # NOTE(review): the head emits 128 values and never reads `out_features`,
    # `activation_functions[1]` or `dropout_probs[3]` -- confirm this is intended
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 128),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_simplified_block_and_classifier_without_block3().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 13
def experiment_kernel_sizes(train_dataloader, val_dataloader, num_epochs, device, num_channels, num_classes, init_hidden_units, init_activation_functions, best_decay_value, best_dropout_probs_combo, kernel_sizes_list):
  """ Experiment 13: two-block CNN whose convolution kernel sizes come from `kernel_sizes_list`.

  `kernel_sizes_list[0]` sizes the block-1 convolution and `kernel_sizes_list[1]` the
  block-2 convolution. The model is optimised with RMSprop (lr=0.001,
  weight_decay=best_decay_value) against CrossEntropyLoss through `train`, with
  `val_dataloader` handed over as its test_dataloader. Prints the elapsed training time
  and the trainable-parameter count, then returns the value produced by `train`.
  """
  print("Experimenting with modifying kernel sizes")

  def cnn_kernel_sizes(in_features=num_channels, out_features=num_classes, hidden_units=init_hidden_units, activation_functions=init_activation_functions, dropout_probs=best_dropout_probs_combo, kernel_sizes=kernel_sizes_list):
    """ Build Sequential(feature_extractor, classifier) with configurable kernel sizes. """
    # modules are instantiated in network order, exactly as they are stacked
    conv_stack = [
      # block 1
      nn.Conv2d(in_features, hidden_units[0], kernel_size=kernel_sizes[0], padding='valid'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[0]),
      # block 2: goes straight to hidden_units[2] channels and uses dropout_probs[2];
      # index 1 of both lists is skipped
      nn.Conv2d(hidden_units[0], hidden_units[2], kernel_size=kernel_sizes[1], padding='same'),
      activation_functions[0],
      nn.MaxPool2d(2, 2),
      nn.Dropout2d(dropout_probs[2]),
    ]
    feature_extractor = nn.Sequential(*conv_stack)

    # push one random (1, in_features, 25, 149) tensor through to learn the flattened size
    probe = torch.randn(1, in_features, 25, 149)
    with torch.no_grad():
      feature_map = feature_extractor(probe)
    _, n_maps, map_h, map_w = feature_map.shape
    classifier_input_size = n_maps * map_h * map_w

    # one-layer head: flatten -> 128 outputs
    # NOTE(review): the head emits 128 values and never reads `out_features`,
    # `activation_functions[1]` or `dropout_probs[3]` -- confirm this is intended
    classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(classifier_input_size, 128),
    )

    return nn.Sequential(feature_extractor, classifier)

  # model, loss and optimizer
  model = cnn_kernel_sizes().to(device)
  criterion = nn.CrossEntropyLoss()
  rmsprop = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

  # timed training run
  tic = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=rmsprop,
                  loss_fn=criterion,
                  epochs=num_epochs,
                  device=device)
  elapsed = time.time() - tic
  print(f"\nTotal training time: {elapsed:.3f} seconds")

  # count the parameters that receive gradients
  trainable_params = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      trainable_params += tensor.numel()
  print(f"Total trainable params: {trainable_params}")

  return results


# experiment 14
def experiment_hidden_layer_sizes(train_dataloader, val_dataloader, num_epochs, device, num_classes, best_decay_value, best_dropout_probs_combo, hidden_layer_sizes_list):
  """ Experiment 14: train one CNN_v2 per hidden-layer-size combination and collect the results.

  Each entry of `hidden_layer_sizes_list` is passed to CNN_v2 as `hidden_units`; every
  model is trained with RMSprop (lr=0.001, weight_decay=best_decay_value) and
  CrossEntropyLoss through `train`, with `val_dataloader` handed over as its
  test_dataloader. The last-epoch difference train_acc - val_acc is stored in each
  result under 'generalization_gap'.

  Returns a dict mapping tuple(combination) -> result of `train` for that combination.
  Combinations may be given as tuples or lists; an empty list yields an empty dict.
  """
  print("Experimenting with Hidden Layer Sizes")

  # combinations to try, e.g. [(32, 64, 64), (32, 32, 128), (64, 32, 64), (32, 32, 64), (64, 64, 32), (32, 64, 32)]
  hidden_layer_sizes_combinations = hidden_layer_sizes_list
  print("Hidden layer sizes combinations to try:", hidden_layer_sizes_combinations)

  # storage dictionary for results
  all_results = {}

  # stays None when there is nothing to train, so the summary below can be skipped
  model = None

  # loop over all combinations
  start_time = time.time()
  for combo in hidden_layer_sizes_combinations:
    print(f"\nTesting hidden layer sizes combination: {combo}...")

    # initialize model for the current combination
    model = CNN_v2(out_features=num_classes, hidden_units=combo, dropout_probs=best_dropout_probs_combo).to(device)

    # set loss function and optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.RMSprop(model.parameters(), lr=0.001, weight_decay=best_decay_value)

    # start training
    start_time_2 = time.time()
    results = train(model=model,
                    train_dataloader=train_dataloader,
                    test_dataloader=val_dataloader,
                    optimizer=optimizer,
                    loss_fn=loss_fn,
                    epochs=num_epochs,
                    device=device)
    end_time_2 = time.time()

    # compute generalization gap from the last epoch
    generalization_gap = results['train_acc'][-1] - results['val_acc'][-1]

    # save the results; tuple() keeps the key hashable when a combination is a list
    results['generalization_gap'] = generalization_gap
    all_results[tuple(combo)] = results

    # print final-epoch results for the current combination
    print(f"\nHidden layer sizes combination: {combo}")
    print(f"Validation accuracy: {results['val_acc'][-1]:.2f}")
    print(f"Generalization gap: {generalization_gap:.2f}") # Want this negative (not overfitting)
    print(f"Total training time: {end_time_2 - start_time_2:.3f} seconds")

  # calculate and print total experiment time
  end_time = time.time()
  print(f"\nTotal experiment's time: {end_time - start_time:.3f} seconds")

  # trainable parameters of the last model trained (skipped when no model was built)
  if model is not None:
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total trainable params: {trainable_params}")

  return all_results


# experiment 15
def experiment_optimizers_and_learning_rates(train_dataloader, val_dataloader, num_epochs, device, num_classes, best_hidden_units_combo, best_decay_value, best_dropout_probs_combo, learning_rates_list):
  """ Experiment with optimizers and learning rates.

  For every optimizer in the table below, a fresh CNN_v2 is trained once per learning rate
  and the learning rate with the highest final-epoch validation accuracy is kept.

  Args:
    train_dataloader: dataloader handed to train() as the training data.
    val_dataloader: dataloader handed to train() as `test_dataloader`.
    num_epochs: number of epochs of every training run.
    device: device every model is moved to and trained on.
    num_classes: passed to CNN_v2 as `out_features`.
    best_hidden_units_combo: passed to CNN_v2 as `hidden_units`.
    best_decay_value: `weight_decay` given to every optimizer.
    best_dropout_probs_combo: passed to CNN_v2 as `dropout_probs`.
    learning_rates_list: non-empty sequence of learning rates to try, e.g. [1e-2, 1e-3, 1e-4].

  Returns:
    A list with one (optimizer_name, best_learning_rate, best_validation_accuracy) tuple per optimizer.

  Raises:
    ValueError: if learning_rates_list is empty.
  """
  # without any learning rate no model would be trained and the parameter count at the
  # end would have nothing to inspect, so reject the call up front
  if len(learning_rates_list) == 0:
    raise ValueError("learning_rates_list must contain at least one learning rate")

  print("Experimenting with Optimizers and Learning Rates")

  # define learning rates to try
  learning_rates = learning_rates_list # e.g. [1e-2, 1e-3, 1e-4, 1e-5, 1e-6]
  print("Learning rates to try:", learning_rates)

  # define optimizers to try (display name -> optimizer factory)
  optimizers = {
      'SGD': torch.optim.SGD,
      #'SGD + momentum': lambda params, lr: torch.optim.SGD(params, lr=lr, momentum=0.9, nesterov=True, weight_decay=best_decay_value),
      #'Adagrad': torch.optim.Adagrad,
      'RMSprop': torch.optim.RMSprop,
      'Adam': torch.optim.Adam,
      'AdamW': torch.optim.AdamW,
      'NAdam': torch.optim.NAdam
    }
  print("Optimizers to try:", list(optimizers.keys()))

  # store learning rate for best validation accuracy for each optimizer
  optim_results_list = []

  # loop over optimizers
  start_time = time.time()
  for optimizer_name, optimizer_fn in optimizers.items():
    print(f"\nTesting {optimizer_name} optimizer...")

    # track best learning rate and accuracy for the current optimizer
    best_lr = None
    best_optim_val_accuracy = 0

    # loop over learning rates
    for lr in learning_rates:
      print(f"\nLearning rate: {lr}... \n")

      # initialize a fresh model for each optimizer and learning rate
      model = CNN_v2(out_features=num_classes, hidden_units=best_hidden_units_combo, dropout_probs=best_dropout_probs_combo).to(device)

      # set loss function and optimizer
      loss_fn = nn.CrossEntropyLoss()
      if optimizer_name == 'SGD + momentum':
        # the (currently disabled) momentum entry already bakes weight decay into its lambda
        optimizer = optimizer_fn(model.parameters(), lr=lr)
      else:
        optimizer = optimizer_fn(model.parameters(), lr=lr, weight_decay=best_decay_value)

      # start training
      results = train(model=model,
                      train_dataloader=train_dataloader,
                      test_dataloader=val_dataloader,
                      optimizer=optimizer,
                      loss_fn=loss_fn,
                      epochs=num_epochs,
                      device=device)

      # retrieve validation accuracy of the last epoch for the current combination
      val_accuracy = results['val_acc'][-1]

      # keep track of best validation accuracy; the first run always counts, so best_lr
      # is never left as None even when every run ends with an accuracy of 0
      if best_lr is None or val_accuracy > best_optim_val_accuracy:
        best_optim_val_accuracy = val_accuracy
        best_lr = lr

    # save the results in a list
    optim_results_list.append((optimizer_name, best_lr, best_optim_val_accuracy))

    # print best results for the current optimizer
    print(f"\nBest learning rate for {optimizer_name}: {best_lr}")
    print(f"Best validation accuracy for {optimizer_name}: {best_optim_val_accuracy:.2f}%")

  # calculate and print total experiment time
  end_time = time.time()
  print(f"\nTotal experiment's time: {end_time - start_time:.3f} seconds")

  # calculate total trainable parameters (taken from the last model that was trained)
  trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
  print(f"Total trainable params: {trainable_params}")

  return optim_results_list


# experiment 16
def experiment_activation_functions(train_dataloader, val_dataloader, num_epochs, device, num_classes, best_hidden_units_combo, best_decay_value, best_dropout_probs_combo, best_optimizer, best_learning_rate):
  """ Experiment with activation functions.

  Every ordered pair (feature-extractor activation, classifier activation) drawn from the
  candidate list is used to build and train a fresh CNN_v2.

  Args:
    train_dataloader: dataloader handed to train() as the training data.
    val_dataloader: dataloader handed to train() as `test_dataloader`.
    num_epochs: number of epochs of every training run.
    device: device every model is moved to and trained on.
    num_classes: passed to CNN_v2 as `out_features`.
    best_hidden_units_combo: passed to CNN_v2 as `hidden_units`.
    best_decay_value: `weight_decay` given to the optimizer.
    best_dropout_probs_combo: passed to CNN_v2 as `dropout_probs`.
    best_optimizer: callable building the optimizer from (parameters, lr=..., weight_decay=...).
    best_learning_rate: learning rate given to the optimizer.

  Returns:
    A list of (feature_extractor_activation, classifier_activation, validation_accuracy) tuples,
    where the accuracy is the one of the last epoch.
  """
  print("Experimenting with Activation Functions")

  # candidate activations as (display name, module) pairs; PReLU and Mish are left out for now
  candidates = [
      ('ReLU', nn.ReLU()),
      ('LeakyReLU', nn.LeakyReLU(negative_slope=0.01)),
      ('ELU', nn.ELU()),
      ('GELU', nn.GELU()),
      ('SiLU', nn.SiLU()),
    ]
  candidate_names = []
  for candidate_name, _ in candidates:
    candidate_names.append(candidate_name)
  print("Activation functions to try:", candidate_names)

  # one entry per trained combination
  outcomes = []

  # outer loop picks the feature extractor activation, inner loop the classifier one
  experiment_started = time.time()
  for extractor_name, extractor_fn in candidates:
    print(f"\nTesting {extractor_name} activation function as activation function in feature extractor...")

    for classifier_name, classifier_fn in candidates:
      print(f"\nTesting {classifier_name} activation function as activation function in classifier...")

      # build a fresh model for this pair and move it to the target device
      model = CNN_v2(out_features=num_classes,
                     hidden_units=best_hidden_units_combo,
                     dropout_probs=best_dropout_probs_combo,
                     activation_functions=[extractor_fn, classifier_fn])
      model = model.to(device)

      # loss function and optimizer for this run
      loss_fn = nn.CrossEntropyLoss()
      optimizer = best_optimizer(model.parameters(), lr=best_learning_rate, weight_decay=best_decay_value)

      # train and keep the validation accuracy of the last epoch
      history = train(model=model,
                      train_dataloader=train_dataloader,
                      test_dataloader=val_dataloader,
                      optimizer=optimizer,
                      loss_fn=loss_fn,
                      epochs=num_epochs,
                      device=device)
      final_val_accuracy = history['val_acc'][-1]
      outcomes.append((extractor_name, classifier_name, final_val_accuracy))

      # report the current combination
      print(f"\nActivation function for feature extractor: {extractor_name}")
      print(f"Activation function for classifier: {classifier_name}")
      print(f"Validation accuracy: {final_val_accuracy:.2f}%")

  # total duration of the whole grid
  experiment_finished = time.time()
  print(f"\nTotal experiment's time: {experiment_finished - experiment_started:.3f} seconds")

  # count the trainable parameters of the last model that was built
  trainable_params = 0
  for parameter in model.parameters():
    if parameter.requires_grad:
      trainable_params += parameter.numel()
  print(f"Total trainable params: {trainable_params}")

  return outcomes


# experiment 17
def experiment_batch_sizes(train_data, val_data, test_data, num_epochs, device, num_classes, best_hidden_units_combo, best_activation_functions, best_decay_value, best_dropout_probs_combo, best_optimizer, best_learning_rate, batch_sizes_list, num_workers, train_val_transform, is_combined=False, test_transform=None):
  """ Experiment with batch sizes.

  For each batch size the dataloaders are rebuilt, a fresh CNN_v2 is trained and the
  validation accuracy of the last epoch is recorded.

  Args:
    train_data, val_data, test_data: forwarded to create_dataloaders / create_dataloaders_combined.
    num_epochs: number of epochs of every training run.
    device: device every model is moved to and trained on.
    num_classes: passed to CNN_v2 as `out_features`.
    best_hidden_units_combo: passed to CNN_v2 as `hidden_units`.
    best_activation_functions: passed to CNN_v2 as `activation_functions`.
    best_decay_value: `weight_decay` given to the optimizer.
    best_dropout_probs_combo: passed to CNN_v2 as `dropout_probs`.
    best_optimizer: callable building the optimizer from (params=..., lr=..., weight_decay=...).
    best_learning_rate: learning rate given to the optimizer.
    batch_sizes_list: batch sizes to try, e.g. [16, 32, 64, 128].
    num_workers: forwarded to the dataloader factory.
    train_val_transform: transform forwarded to the dataloader factory.
    is_combined: when True, create_dataloaders_combined is used instead of create_dataloaders.
    test_transform: extra transform forwarded only to create_dataloaders_combined.

  Returns:
    A list of (batch_size, validation_accuracy) tuples.
  """
  print("Experimenting with Batch Sizes")

  # batch sizes to try
  batch_sizes = batch_sizes_list
  print("Batch sizes to try:", batch_sizes)

  # one (batch size, accuracy) pair per run
  outcomes = []

  for current_batch_size in batch_sizes:
    print(f"\nTesting batch size: {current_batch_size}...")

    # rebuild the dataloaders for this batch size; only the train/val loaders are used here
    if not is_combined:
      train_loader, val_loader, _, _ = create_dataloaders(train_data, val_data, test_data, train_val_transform, current_batch_size, num_workers)
    else:
      train_loader, val_loader, _, _ = create_dataloaders_combined(train_data, val_data, test_data, train_val_transform, test_transform, current_batch_size, num_workers)

    # fresh model for the current batch size
    model = CNN_v2(out_features=num_classes,
                   hidden_units=best_hidden_units_combo,
                   dropout_probs=best_dropout_probs_combo,
                   activation_functions=best_activation_functions)
    model = model.to(device)

    # loss function and optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = best_optimizer(params=model.parameters(), lr=best_learning_rate, weight_decay=best_decay_value)

    # timed training run
    training_started = time.time()
    history = train(model=model,
                    train_dataloader=train_loader,
                    test_dataloader=val_loader,
                    optimizer=optimizer,
                    loss_fn=loss_fn,
                    epochs=num_epochs,
                    device=device)
    training_finished = time.time()
    print(f"\nTotal training time: {training_finished - training_started:.3f} seconds")

    # count the trainable parameters of the current model
    trainable_params = 0
    for parameter in model.parameters():
      if parameter.requires_grad:
        trainable_params += parameter.numel()
    print(f"Total trainable params: {trainable_params}")

    # record and report the validation accuracy of the last epoch
    final_val_accuracy = history['val_acc'][-1]
    outcomes.append((current_batch_size, final_val_accuracy))
    print(f"\nBatch size: {current_batch_size}")
    print(f"Validation accuracy: {final_val_accuracy:.4f}\n")

  return outcomes


# experiment 18
def cross_validation(dataset, num_epochs, device, num_classes, num_folds, best_config, num_workers, load_weights=False, best_weights=None):
  """ Applies k-fold cross-validation to a given dataset, training a fresh CNN_v2 per fold.

  The shuffled dataset indices are cut into `num_folds` disjoint validation folds that
  together cover every sample exactly once. When the dataset size is not divisible by
  `num_folds`, the first `len(dataset) % num_folds` folds hold one extra sample.

  Args:
    dataset: indexable dataset with a length; wrapped in Subset objects per fold.
    num_epochs: number of epochs, used only when best_config has no 'epochs' entry.
    device: device every model is moved to and trained on.
    num_classes: passed to CNN_v2 as `out_features`.
    num_folds: number of folds; must be at least 2 and at most len(dataset).
    best_config: dictionary providing 'best_batch_size', 'best_hidden_units_combo',
      'best_dropout_probs_combo', 'best_optimizer', 'best_learning_rate',
      'best_activation_functions', 'best_decay_value', 'loss_fn' and optionally 'epochs'.
    num_workers: number of workers of both dataloaders.
    load_weights: when True, `best_weights` is loaded into every freshly built model.
    best_weights: state dict to load; must match the CNN_v2 architecture built here.

  Returns:
    A dictionary mapping the zero-based fold index to the results returned by train(),
    extended with 'training_time', 'trainable_params' and 'model_weights'.

  Raises:
    ValueError: if num_folds is smaller than 2 or larger than the dataset size.
  """
  print("Cross-Validation training ...")

  # a single fold leaves nothing to train on and more folds than samples leaves empty folds
  num_samples = len(dataset)
  if num_folds < 2 or num_folds > num_samples:
    raise ValueError(f"num_folds must be between 2 and the dataset size ({num_samples}), got {num_folds}")

  # unpack the dictionary
  best_batch_size = best_config['best_batch_size']
  best_hidden_units_combo = best_config['best_hidden_units_combo']
  best_dropout_probs_combo = best_config['best_dropout_probs_combo']
  best_optimizer = best_config['best_optimizer']
  best_learning_rate = best_config['best_learning_rate']
  best_activation_functions = best_config['best_activation_functions']
  best_decay_value = best_config['best_decay_value']
  loss_fn = best_config['loss_fn']
  epochs = best_config.get('epochs', num_epochs) # the config value wins, num_epochs is the fallback

  # storage dictionary for results
  cross_valid_results = {}

  # shuffle dataset indices
  indices = list(range(num_samples))
  random.shuffle(indices)

  # define fold size
  fold_size, remainders = divmod(num_samples, num_folds)
  print(f"Dataset size: {num_samples}")
  print(f"Number of folds: {num_folds}")
  print(f"Fold size: {fold_size}")
  if remainders:
    print(f"The first {remainders} folds hold one extra sample")

  # loop over the folds
  fold_start = 0
  for fold in range(num_folds):
    print(f"\nFold {fold + 1}/{num_folds}")

    # the validation fold is the next contiguous slice of the shuffled indices; spreading the
    # remainder over the first folds keeps the folds disjoint (no sample is validated twice)
    fold_end = fold_start + fold_size + (1 if fold < remainders else 0)
    val_indices = indices[fold_start:fold_end]
    # everything outside the slice is training data (linear time, order preserved)
    train_indices = indices[:fold_start] + indices[fold_end:]
    fold_start = fold_end

    # create dataset subsets
    train_subset = Subset(dataset, train_indices)
    val_subset = Subset(dataset, val_indices)

    # create dataloaders
    train_dataloader = DataLoader(train_subset, batch_size=best_batch_size, shuffle=True, num_workers=num_workers)
    val_dataloader = DataLoader(val_subset, batch_size=best_batch_size, shuffle=False, num_workers=num_workers)

    # initialize the model for the current fold
    model = CNN_v2(out_features=num_classes, hidden_units=best_hidden_units_combo, activation_functions=best_activation_functions, dropout_probs=best_dropout_probs_combo).to(device)

    # in case we load another model's best weights
    if load_weights:
      print("Loading best weights ...")
      model.load_state_dict(best_weights) # must be of the same form/architecture, e.g. CNN_v2!

    # set optimizer (the loss function comes from best_config)
    optimizer = best_optimizer(params=model.parameters(), lr=best_learning_rate, weight_decay=best_decay_value)

    # start training
    start_time = time.time()
    model_results = train(model=model,
                          train_dataloader=train_dataloader,
                          test_dataloader=val_dataloader,
                          optimizer=optimizer,
                          loss_fn=loss_fn,
                          epochs=epochs,
                          device=device)
    end_time = time.time()

    # calculate training time and store results
    model_results['training_time'] = end_time - start_time
    model_results['trainable_params'] = sum(p.numel() for p in model.parameters() if p.requires_grad)
    model_results['model_weights'] = model.state_dict()

    # save results
    cross_valid_results[fold] = model_results

    # print validation accuracy for the current fold
    print(f"Validation accuracy: {model_results['val_acc'][-1]:.4f}\n")

  # compute final validation accuracy (mean of all final validation accuracies)
  val_accuracies = [results['val_acc'][-1] for results in cross_valid_results.values()]
  final_val_acc = sum(val_accuracies) / len(val_accuracies)
  final_std = np.std(val_accuracies)
  print("\nCross-validation results:")
  print(f"Final validation accuracy Mean: {final_val_acc:.4f}") # want high mean val accuracy!
  print(f"Final validation accuracy Standard Deviation: {final_std:.4f}") # want small std=>high confidence that model generalizes well!

  return cross_valid_results


# experiment 19
def experiment_balanced_dataset(model, train_dataloader, val_dataloader, num_epochs, device, best_config, class_weights):
  """ Experiment with a training on the initial, but balanced dataset.

  The balancing is done through a class-weighted cross-entropy loss.

  Args:
    model: model instance handed to train().
    train_dataloader: dataloader handed to train() as the training data.
    val_dataloader: dataloader handed to train() as `test_dataloader`.
    num_epochs: maximum number of epochs (early stopping is enabled).
    device: device handed to train(); the class weights are moved to it as well.
    best_config: dictionary providing 'best_decay_value', 'best_optimizer' and 'best_learning_rate'.
    class_weights: weight tensor for CrossEntropyLoss, or None for an unweighted loss.

  Returns:
    The results returned by train().
  """
  print("Experimenting with Balanced Dataset")

  # unpack what's useful from best_config dictionary
  best_decay_value = best_config['best_decay_value']
  best_optimizer = best_config['best_optimizer']
  best_learning_rate = best_config['best_learning_rate']

  # CrossEntropyLoss needs its weight tensor on the same device as the logits, which are
  # presumably produced on `device` by train(); .to() is a no-op when it is already there
  if class_weights is not None:
    class_weights = class_weights.to(device)

  # set the loss function with class weights and optimizer
  loss_fn = nn.CrossEntropyLoss(weight=class_weights)
  optimizer = best_optimizer(params=model.parameters(), lr=best_learning_rate, weight_decay=best_decay_value)

  # start training with early stopping
  start_time = time.time()
  results = train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=val_dataloader,
                  optimizer=optimizer,
                  loss_fn=loss_fn,
                  epochs=num_epochs,
                  device=device,
                  early_stopping=True,  # enabled early stopping
                  patience=5,  # epochs to wait
                  delta=0.005)  # minimum improvement
  end_time = time.time()

  # calculate and print total training time
  training_time = end_time - start_time
  print(f"\nTotal training time: {training_time:.3f} seconds")

  # calculate and print total trainable parameters
  total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
  print("Total trainable params:", total_params)

  return results


# experiment 20
def experiment_data_augmentation(model, train_dir, val_dataloader, num_epochs, device, best_config, class_weights, num_workers, mean, std, is_Trivial=True):
  """ Experiment with a training on a balanced dataset with data augmentation.

  Args:
    model: model instance handed to train().
    train_dir: root directory given to ImageFolder to build the augmented training set.
    val_dataloader: preloaded validation dataloader, handed to train() as `test_dataloader`.
    num_epochs: maximum number of epochs (early stopping is enabled).
    device: device handed to train(); the class weights are moved to it as well.
    best_config: dictionary providing 'best_decay_value', 'best_optimizer',
      'best_learning_rate' and 'best_batch_size'.
    class_weights: weight tensor for CrossEntropyLoss, or None for an unweighted loss.
    num_workers: number of workers of the training dataloader.
    mean: per-channel mean passed to transforms.Normalize.
    std: per-channel standard deviation passed to transforms.Normalize.
    is_Trivial: True uses TrivialAugmentWide; False uses a random horizontal flip plus a
      random rotation of up to 10 degrees.

  Returns:
    The results returned by train().
  """
  print("Experimenting with Data Augmentation")

  # unpack the best_config dictionary
  best_decay_value = best_config['best_decay_value']
  best_optimizer = best_config['best_optimizer']
  best_learning_rate = best_config['best_learning_rate']
  best_batch_size = best_config['best_batch_size']

  if is_Trivial:
    # create augmented transform
    augmented_transform = transforms.Compose([
        transforms.TrivialAugmentWide(),  # automatically applies a random augmentation
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
        ])
  else:
    # another very simple augmented transform; vertical flip, resized crop, affine and
    # color jitter were tried by the author and are left disabled
    augmented_transform = transforms.Compose([
        transforms.RandomHorizontalFlip(), # random horizontal flip
        transforms.RandomRotation(degrees=10), # random rotation +/- 10
        transforms.ToTensor(),  # convert images to tensors
        transforms.Normalize(mean=mean, std=std)
        ])

  # create new train, transformed dataset
  train_aug_data = ImageFolder(root=train_dir, transform=augmented_transform)

  # create new train dataloader
  train_aug_dataloader = DataLoader(dataset=train_aug_data, batch_size=best_batch_size, shuffle=True, num_workers=num_workers)

  # set loss function and optimizer; CrossEntropyLoss needs its weight tensor on the same
  # device as the logits, which are presumably produced on `device` by train()
  if class_weights is not None:
    loss_fn = nn.CrossEntropyLoss(weight=class_weights.to(device))
  else:
    loss_fn = nn.CrossEntropyLoss()
  optimizer = best_optimizer(params=model.parameters(), lr=best_learning_rate, weight_decay=best_decay_value)

  # start training with early stopping
  start_time = time.time()
  results = train(model=model,
                  train_dataloader=train_aug_dataloader,
                  test_dataloader=val_dataloader, # validation dataloader is preloaded
                  optimizer=optimizer,
                  loss_fn=loss_fn,
                  epochs=num_epochs,
                  device=device,
                  early_stopping=True, # enabled early stopping
                  patience=5,
                  delta=1e-5)
  end_time = time.time()

  # calculate and print total training time
  training_time = end_time - start_time
  print(f"\nTotal training time: {training_time:.3f} seconds")

  # calculate total trainable parameters
  total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
  print(f"Total trainable params: {total_params}")

  return results

# module-level statement: this confirmation message is printed every time the module is executed/imported
print("Module experiments imported successfully!")

Writing src/experiments.py


### utils.py: contains various utility functions.

In [7]:
%%writefile src/utils.py
""" Contains various utility functions. """

import torch
import torchmetrics
from torchmetrics import ConfusionMatrix
from torchmetrics.classification import Precision, Recall, F1Score
from sklearn.metrics import classification_report
from mlxtend.plotting import plot_confusion_matrix
import matplotlib.pyplot as plt
from pathlib import Path
from tqdm.auto import tqdm
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import time
import os


def set_seeds(seed=42):
  """ Set random seeds for reproducibility.

  Seeds Python's `random` module, NumPy and PyTorch (CPU and CUDA) with the same value, so
  that sampling done with any of them is repeatable.

  Args:
    seed: integer seed; defaults to 42, the value that used to be hard-coded.
  """
  random.seed(seed)
  np.random.seed(seed)
  torch.manual_seed(seed)
  torch.cuda.manual_seed(seed) # silently ignored when CUDA is not available


def plot_images_per_class_big(all_M_images, all_L_images, all_R_images, label_dict):
  """ Plots 5 random images per class for M, L, and R datasets, with labels.

  Every class of `label_dict` owns three consecutive subplot rows (M, L, R) of five images.
  A class is left blank when any of the three datasets has fewer than five images of it.
  Paths whose file name carries no numeric label are ignored.

  Args:
    all_M_images: image paths of the M dataset.
    all_L_images: image paths of the L dataset.
    all_R_images: image paths of the R dataset.
    label_dict: mapping of numeric label id -> label name.
  """
  samples_per_class = 5
  datasets = [("M", all_M_images), ("L", all_L_images), ("R", all_R_images)]

  # parse every path once and bucket it by label id, instead of re-parsing all paths per label
  grouped = [(tag, _group_paths_by_label_id(paths)) for tag, paths in datasets]

  plt.figure(figsize=(15, 20))
  rows, columns = len(label_dict) * len(datasets), samples_per_class

  for i, (label_id, label_name) in enumerate(label_dict.items()):

    # paths of the current label in each dataset
    per_dataset = [(tag, groups.get(label_id, [])) for tag, groups in grouped]

    # draw the class only when every dataset can supply enough images
    if any(len(paths) < samples_per_class for _, paths in per_dataset):
      continue

    # pick the random images for all three datasets before plotting anything
    picks = [(tag, random.sample(paths, samples_per_class)) for tag, paths in per_dataset]

    for row_offset, (tag, chosen_paths) in enumerate(picks):
      # each class spans len(datasets) rows: M first, then L, then R
      first_position = (i * len(datasets) + row_offset) * columns + 1
      for j, image_path in enumerate(chosen_paths):
        plt.subplot(rows, columns, first_position + j)
        image = plt.imread(image_path)
        plt.imshow(image)
        plt.title(f"{tag}: {label_name}")
        plt.axis("off")

  plt.tight_layout()
  plt.show()


def _label_id_from_path(path):
  """ Returns the label id encoded in an image file name as an int, or None when there is none. """
  try:
    # the label is the last two characters of the third-from-last dot-separated part of the file name
    return int(path.split('/')[-1].split('.')[-3][-2:])
  except (IndexError, ValueError):
    return None


def _group_paths_by_label_id(image_paths):
  """ Buckets image paths by their label id, keeping the input order inside every bucket. """
  groups = {}
  for path in image_paths:
    label_id = _label_id_from_path(path)
    if label_id is not None:
      groups.setdefault(label_id, []).append(path)
  return groups


def plot_images_per_class_small(all_small_images, small_label_list):
  """ Plots 5 random images per class for small dataset, with labels.

  Each label of `small_label_list` gets one subplot row; a label with fewer than five
  matching images leaves its row empty.

  Args:
    all_small_images: image paths of the small dataset.
    small_label_list: label names; an image belongs to a label when its file name up to
      the first dot, minus the last three characters, equals the label name.
  """
  samples_per_class = 5

  plt.figure(figsize=(15, 20))
  n_rows, n_cols = len(small_label_list), samples_per_class

  for row, label_name in enumerate(small_label_list):

    # collect the paths whose file name encodes the current label (basename drops the directories)
    matching_paths = []
    for path in all_small_images:
      name_before_dot = os.path.basename(path).split('.')[0]
      if name_before_dot[:-3] == label_name:
        matching_paths.append(path)

    # not enough images for this label: leave the row blank
    if len(matching_paths) < samples_per_class:
      continue

    for col, image_path in enumerate(random.sample(matching_paths, samples_per_class)):
      plt.subplot(n_rows, n_cols, row * n_cols + col + 1)
      plt.imshow(plt.imread(image_path))
      plt.title(f"{label_name}")
      plt.axis("off")

  plt.tight_layout()
  plt.show()


def plot_loss_curves(results):
  """ Plots training curves of a results dictionary.

  Draws two side-by-side panels: train/validation loss on the left and train/validation
  accuracy on the right.

  Args:
    results: dictionary with the per-epoch lists 'train_loss', 'val_loss', 'train_acc' and 'val_acc'.
  """
  # each panel is described by its title and the (legend label, values) curves it shows
  panels = [
      ('Loss', [('train_loss', results['train_loss']), ('val_loss', results['val_loss'])]),
      ('Accuracy', [('train_acc', results['train_acc']), ('val_acc', results['val_acc'])]),
  ]

  # x axis: one tick per recorded epoch, starting at 0
  epoch_axis = range(len(results['train_loss']))

  plt.figure(figsize=(15, 5))

  for position, (panel_title, curves) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    for curve_label, curve_values in curves:
      plt.plot(epoch_axis, curve_values, label=curve_label)
    plt.title(panel_title)
    plt.xlabel('Epochs')
    plt.legend()
    plt.grid(True)

  plt.tight_layout()
  plt.show()


def plot_hidden_layer_results(hidden_size_df):
  """ Plots a bar chart for hidden layer sizes vs. validation accuracy.

  Args:
    hidden_size_df: dataframe with the columns 'Hidden Layer Sizes' and 'Validation Accuracy'.
  """
  plt.figure(figsize=(12, 6))

  # bar labels must be strings, so the size tuples are stringified
  size_labels = [str(sizes) for sizes in hidden_size_df['Hidden Layer Sizes']]
  accuracies = hidden_size_df['Validation Accuracy']

  # horizontal bars; inverting the y-axis puts the first dataframe row at the top
  plt.barh(size_labels, accuracies)
  plt.xlabel("Validation Accuracy")
  plt.ylabel("Hidden Layer Sizes")
  plt.title("Validation Accuracy for Different Hidden Layer Sizes")
  plt.gca().invert_yaxis()

  # write each accuracy at the end of its bar
  for bar_position, accuracy in enumerate(accuracies):
    plt.text(accuracy, bar_position, f"{accuracy:.4f}")

  plt.tight_layout()
  plt.show()


def plot_optimizers_results(results_df):
  """ Plots the results of the experiment_optimizers_and_learning_rates function.

  Args:
    results_df: dataframe with the columns 'Optimizer', 'Best Learning Rate' and 'Validation Accuracy'.
  """
  # rank the rows from best to worst validation accuracy
  ranked_df = results_df.sort_values(by='Validation Accuracy', ascending=False)

  # one "<optimizer> - <learning rate>" label per row, plus the matching accuracies as a plain list
  bar_labels = [f"{row['Optimizer']} - {row['Best Learning Rate']}" for _, row in ranked_df.iterrows()]
  accuracies = ranked_df['Validation Accuracy'].tolist()

  # barh draws the first entry at the bottom, so flip both lists to get the best one on top
  bar_labels = bar_labels[::-1]
  accuracies = accuracies[::-1]

  plt.figure(figsize=(12,  6))
  plt.barh(bar_labels, accuracies)
  plt.xlabel('Validation Accuracy')
  plt.ylabel('Optimizer and Learning Rate')
  plt.title('Validation Accuracy by Optimizer and Learning Rate')

  # write each accuracy at the end of its bar
  for bar_position, accuracy in enumerate(accuracies):
    plt.text(accuracy, bar_position, f'{accuracy:.4f}')

  plt.tight_layout()
  plt.show()


def plot_activation_function_results(results_df):
  """ Plots a bar chart for activation function combinations vs. validation accuracy.

  Args:
    results_df: dataframe with the columns 'Activation Function 1', 'Activation Function 2'
      and 'Validation Accuracy'.
  """
  # rank the rows from best to worst validation accuracy
  ranked_df = results_df.sort_values(by='Validation Accuracy', ascending=False)

  # label every bar with "<activation 1> + <activation 2>"
  bar_labels = []
  for first_fn, second_fn in zip(ranked_df['Activation Function 1'], ranked_df['Activation Function 2']):
    bar_labels.append(f"{first_fn} + {second_fn}")
  accuracies = ranked_df['Validation Accuracy']

  plt.figure(figsize=(12, 6))
  plt.barh(bar_labels, accuracies)
  plt.xlabel('Validation Accuracy')
  plt.ylabel('Activation Functions Combination')
  plt.title('Activation Functions Combinations vs. Validation Accuracy')

  # inverting the y-axis moves the first (best) row to the top
  plt.gca().invert_yaxis()

  # write each accuracy at the end of its bar
  for bar_position, accuracy in enumerate(accuracies):
    plt.text(accuracy, bar_position, f'{accuracy:.4f}')

  plt.tight_layout()
  plt.show()


def plot_cv_results(cv_results_df):
  """ Plots a bar chart for cross-validation folds vs. validation accuracy.

  Args:
    cv_results_df: dataframe with the columns 'Fold' and 'Validation Accuracy'.
  """
  # ascending order: barh draws the first entry at the bottom, so the best fold ends up on top
  ordered_df = cv_results_df.sort_values(by='Validation Accuracy', ascending=True)

  # label every bar with the fold number stored in the dataframe (not with the row position)
  fold_labels = []
  for fold_number in ordered_df['Fold']:
    fold_labels.append(f"Fold {fold_number}")
  accuracies = ordered_df['Validation Accuracy']

  plt.figure(figsize=(12, 6))
  plt.barh(fold_labels, accuracies)
  plt.xlabel('Validation Accuracy')
  plt.ylabel('Fold')
  plt.title('Cross-Validation Results')

  # write each accuracy at the end of its bar
  for bar_position, accuracy in enumerate(accuracies):
    plt.text(accuracy, bar_position, f'{accuracy:.4f}')

  plt.tight_layout()
  plt.show()


def print_best_epoch_and_val_acc(results):
  """ Prints the best epoch and its corresponding validation accuracy.

  Args:
    results: dictionary with 'best_epoch' and the per-epoch list 'val_acc'.

  Returns:
    The tuple (best_epoch, validation accuracy read at list position best_epoch - 1).
  """
  epoch_number = results['best_epoch']
  print(f"Best epoch: {epoch_number}")

  # the accuracy list is zero-based, hence the shift by one
  accuracy_history = results['val_acc']
  accuracy_at_best = accuracy_history[epoch_number - 1]
  print(f"Best validation accuracy: {accuracy_at_best:.4f}")

  return epoch_number, accuracy_at_best


def print_and_plot_models_accuracies(df):
  """ Prints and plots the dataframe of models accuracies.

  Args:
    df: dataframe with the columns 'Model' and 'Validation Accuracy'.
  """
  # textual report, one line per model
  for _, model_row in df.iterrows():
    print(f"Model {model_row['Model']}: {model_row['Validation Accuracy']:.8f}")

  # horizontal bar chart of the same numbers
  plt.figure(figsize=(12, 6))
  model_names = df['Model']
  accuracies = df['Validation Accuracy']
  plt.barh(model_names, accuracies)
  plt.ylabel('Model')
  plt.xlabel('Validation Accuracy')
  plt.title('Validation Accuracy Comparison')

  # annotate every bar with its accuracy scaled by 100
  for bar_position, accuracy in enumerate(accuracies):
    plt.text(accuracy, bar_position, f'{accuracy * 100:.2f}')

  plt.tight_layout()
  plt.show()


def store_best_model(df, dataset_name, best_models_dict):
  """ Finds the model with the highest validation accuracy in df and stores it in best_models_dict.

  Args:
    df: dataframe with the columns 'Model' and 'Validation Accuracy'.
    dataset_name: key under which the best model is stored.
    best_models_dict: dictionary that is updated in place.

  Returns:
    The same best_models_dict, now holding {'Model': ..., 'Validation Accuracy': ...} under dataset_name.
  """
  # index label of the row with the highest validation accuracy
  winner_index = df['Validation Accuracy'].idxmax()

  # copy the two columns of interest from that row into the dictionary entry
  best_models_dict[dataset_name] = {
      column: df.loc[winner_index, column] for column in ('Model', 'Validation Accuracy')
  }

  return best_models_dict


def predict_from_dataloader(model, test_dataloader, device, index=0):
  """ Extract a single sample from the test dataloader and predict its label.

  Only the first batch yielded by the dataloader is used. The model is switched to eval
  mode and the logits, probabilities, predicted label and actual label are printed.

  Args:
    model: model to run; it must already live on `device`.
    test_dataloader: iterable yielding (images, labels) batches.
    device: device the selected image is moved to.
    index: position of the sample inside the first batch.
  """
  # take the first batch and split it into images and labels
  first_batch = next(iter(test_dataloader))
  images, labels = first_batch

  # add a batch dimension of size one so the sample matches the model's expected input
  sample = images[index].unsqueeze(0).to(device)
  target = labels[index]

  # forward pass without gradient tracking
  model.eval()
  with torch.inference_mode():
    logits = model(sample)

  # report logits -> probabilities -> label
  probabilities = torch.softmax(logits, dim=1)
  predicted_label = torch.argmax(probabilities, dim=1)
  print(f"Image shape: {sample.shape} -> [batch_size, color_channels, height, width]\n")
  print(f"Output logits:\n{logits}\n")
  print(f"Output prediction probabilities:\n{probabilities}\n")
  print(f"Output prediction label:\n{predicted_label}\n")
  print(f"Actual label:\n{target}")


def save_model(model, target_dir, model_name):
  """ Saves a PyTorch model's state_dict to a target directory.

  Args:
    model: model whose state_dict() is saved.
    target_dir: directory to save into; created (with parents) when missing.
    model_name: file name of the saved model; must end with '.pt' or '.pth'.

  Raises:
    AssertionError: if model_name does not end with '.pt' or '.pth'.
  """
  # validate the file name first so a rejected name leaves the filesystem untouched; an
  # explicit raise (unlike an assert statement) is still executed under `python -O`, and
  # AssertionError is kept so callers see the same exception type as before
  if not model_name.endswith((".pth", ".pt")):
    raise AssertionError("model_name should end with '.pt' or '.pth'")

  # create target directory, if not exists
  target_dir_path = Path(target_dir)
  target_dir_path.mkdir(parents=True, exist_ok=True)

  # create model save path
  model_save_path = target_dir_path / model_name

  # save the model state_dict()
  torch.save(obj=model.state_dict(), f=model_save_path)
  print(f"Model saved to: {model_save_path}")


def compute_multi_generalization_gaps(results_dict):
  """ Computes the generalization gap (train acc - val acc) for every experiment of a results dictionary.

  Args:
    results_dict: mapping of experiment setting -> results dictionary with the per-epoch
      lists 'train_acc' and 'val_acc'; only the last epoch is used.

  Returns:
    The tuple (setting with the smallest signed gap, that gap). On ties the setting that
    comes first in the dictionary wins.
  """
  gaps = {}
  for setting in results_dict:
    run = results_dict[setting]
    gap = run["train_acc"][-1] - run["val_acc"][-1]
    gaps[setting] = gap
    print(f"Difference between train and val acc for {setting}: {gap:.4f}")

  # smallest signed gap, and the first setting that reaches it
  smallest_gap = min(gaps.values())
  settings_with_smallest_gap = [setting for setting, gap in gaps.items() if gap == smallest_gap]
  best_setting = settings_with_smallest_gap[0]
  print(f"\nThe best (smallest) generalization gap is for {best_setting} and is {smallest_gap:.4f}")

  return best_setting, smallest_gap


def compute_generalization_gap(results):
  """ Computes the generalization gap (train acc - val acc) given a model's training results dictionary.

  Args:
    results: dictionary with the per-epoch lists 'train_acc' and 'val_acc'; only the last epoch is used.

  Returns:
    The last training accuracy minus the last validation accuracy.
  """
  gap = results["train_acc"][-1] - results["val_acc"][-1]
  print(f"Generalization gap (train acc - val acc): {gap:.4f}")
  return gap


def compute_avg_cv_results(cross_validation_results):
  """ Computes the learning curves (train_loss, val_loss, train_acc, val_acc) averaged over all cross-validation folds.

  Args:
    cross_validation_results: mapping of fold index -> results dictionary; fold 0 must
      exist, because its 'train_loss' length defines the number of epochs.

  Returns:
    A dictionary with the keys 'train_loss', 'val_loss', 'train_acc' and 'val_acc', each
    holding a NumPy array with one averaged value per epoch.
  """
  metric_names = ('train_loss', 'val_loss', 'train_acc', 'val_acc')

  # number of folds, and number of epochs as recorded by fold 0
  num_folds = len(cross_validation_results)
  num_epochs = len(cross_validation_results[0]['train_loss'])

  # running per-epoch sums, one array per metric
  totals = {metric: np.zeros(num_epochs) for metric in metric_names}

  # add every fold's curves to the running sums
  for fold_results in cross_validation_results.values():
    for metric in metric_names:
      totals[metric] += np.array(fold_results[metric])

  # divide the sums by the number of folds to get the averages
  return {metric: totals[metric] / num_folds for metric in metric_names}


def evaluate_model_performance_using_torchmetrics(model_y_pred, model_y_true, test_data_classes):
  """ Evaluates model performance by computing a confusion matrix, precision, recall, and F1-score.

  Prints and plots the confusion matrix, prints the macro-averaged precision, recall and F1-score,
  and prints a classification report.

  Args:
    model_y_pred: tensor with the model predictions (presumably predicted class indices, since it is
      passed unchanged to classification_report - confirm against the caller).
    model_y_true: tensor with the ground-truth labels.
    test_data_classes: sequence of class names; its length is used as num_classes and its items label the plot.

  Returns:
    A dictionary with the keys "precision", "recall" and "f1_score" holding the macro-averaged scores as tensors.
  """

  # Detach both inputs and move them to the CPU once. The metric objects below are never moved to another
  # device and Tensor.numpy() only works on CPU tensors, so this keeps every step on the same device.
  y_pred = model_y_pred.detach().cpu()
  y_true = model_y_true.detach().cpu()
  num_classes = len(test_data_classes)

  # setup confusion matrix instance and compute confusion matrix
  confmat = ConfusionMatrix(num_classes=num_classes, task='multiclass')
  confmat_tensor = confmat(y_pred, y_true)

  # display the confusion matrix
  print("Confusion Matrix:")
  print(confmat_tensor)
  print(" ")

  # plot the confusion matrix
  plot_confusion_matrix(conf_mat=confmat_tensor.numpy(), class_names=test_data_classes, figsize=(10, 6))
  plt.show()

  # compute classification metrics (macro average: every class contributes equally)
  precision = Precision(average='macro', num_classes=num_classes, task='multiclass')
  recall = Recall(average='macro', num_classes=num_classes, task='multiclass')
  f1 = F1Score(average='macro', num_classes=num_classes, task='multiclass')

  prec = precision(y_pred, y_true)
  rec = recall(y_pred, y_true)
  f1_score = f1(y_pred, y_true)

  # print the calculated scores
  print(f"\nPrecision: {prec:.4f}\nRecall: {rec:.4f}\nF1-Score: {f1_score:.4f}")

  # print classification report
  print("\nClassification Report:")
  print(classification_report(y_true=y_true.numpy(), y_pred=y_pred.numpy()))

  # return results as tensors in a dictionary
  return {
      "precision": prec, # .item() for just scalars
      "recall": rec,
      "f1_score": f1_score
      }


def get_sorted_cv_results(cv_results):
  """ Builds a dataframe with one row per fold (1-based fold number, last validation accuracy, model weights), sorted by validation accuracy in descending order.

  The sorted dataframe is printed and then returned. """

  # one record per fold: the fold key is shifted by one and only the last val_acc entry is kept
  rows = [
      {
          'Fold': fold + 1,
          'Validation Accuracy': fold_results['val_acc'][-1],
          'Model Weights': fold_results['model_weights'],
      }
      for fold, fold_results in cv_results.items()
  ]

  # best fold first
  results_df = pd.DataFrame(rows).sort_values(by='Validation Accuracy', ascending=False)
  print(results_df) # print for visualization
  return results_df


# Module-level side effect: this message is printed every time the module's top-level code runs.
print("Module utils imported successfully!")

Writing src/utils.py


### Zip the parent src directory and save it to Google Drive.

In [8]:
# zip the src directory (recursively) into src.zip in the current working directory
!zip -r src.zip src

# mount Google Drive at /content/drive (Colab-only API)
from google.colab import drive
drive.mount('/content/drive')

# destination of the archive inside the mounted drive
destination_path = '/content/drive/MyDrive/src.zip'

# move the archive to the drive; IPython substitutes {destination_path} before running the shell command
!mv src.zip '{destination_path}'
# confirm only when the archive is found at the destination (nothing is printed otherwise)
if os.path.exists(destination_path):
  print(f"Saved to Google Drive at {destination_path}")

  adding: src/ (stored 0%)
  adding: src/models.py (deflated 76%)
  adding: src/data_utils.py (deflated 74%)
  adding: src/engine.py (deflated 73%)
  adding: src/experiments.py (deflated 89%)
  adding: src/config.py (deflated 42%)
  adding: src/utils.py (deflated 73%)
Mounted at /content/drive
Saved to Google Drive at /content/drive/MyDrive/src.zip
