<div class="markdown-google-sans">
  <h1>Architecture design in neural networks</h1>
</div>

Content:
1. Writing a generic architectural constructor class (revision / extension of last week)
2. Inspecting our training: TensorBoard intro (New!)
3. Diagnosing Vanishing Gradients!
4. Hyperparameter Tuning with Optuna lib

# Loading the data

Again, we will be using the PyTorch library/framework for this lab, following on from last week.

In [23]:
# import basic libs
import pandas as pd, numpy as np
import matplotlib.pyplot as plt, seaborn as sns
from sklearn.model_selection import train_test_split
from IPython.display import display, clear_output
import time
import datetime

# import torch (whole lib & specific modules)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

torch.set_default_dtype(torch.double)

seed = 42

In [24]:
# import dataset
from torchvision import datasets
from torchvision.transforms import ToTensor

training_data = datasets.FashionMNIST( # MNIST for image classification
    root="data", # specifies directory
    train=True,
    download=True,
    transform=ToTensor(), # converts images from PIL format (or numpy array) to PyTorch Tensor (fundamental data structure for PyTorch)
)

test_data = datasets.FashionMNIST(
    root="data",
    train=False, # NB
    download=True,
    transform=ToTensor(),
)

In [25]:
# there is an unnecessary amount of data!
print(f"len of training dataset: {len(training_data)}, len of testing dataset: {len(test_data)}")

len of training dataset: 60000, len of testing dataset: 10000


In [26]:
# extract a subset of the datasets
from torch.utils.data import random_split

training_data, _ = random_split(training_data, # arg: original dataset to split
                                [1/10, 9/10], # split props
                                generator=torch.manual_seed(seed)) # reproducibility

test_data, _ = random_split(test_data, [1/10, 9/10], generator=torch.manual_seed(seed))

print(f"len of training dataset: {len(training_data)}, len of testing dataset: {len(test_data)}")

len of training dataset: 6000, len of testing dataset: 1000


# Constructing the model using a generic class

As last week, we construct a `LazySequential` model with a customisable number of hidden layers, whose dimensions are given by the list `hidden_sizes`.

Last week's class was predefined for a classification task, so it only required `input_dims` as an arg. Here we define a more generic architecture that can be used for either regression or classification, so it also requires `out_feat_size`, the dimension of the output layer.

In [28]:
class LazySequential(nn.Sequential):

    def __init__(self,
                 in_feat_size: int,
                 out_feat_size: int,
                 hidden_sizes: list,
                 activation_fn: str ='ReLU'):

        """
        Builds the network in two steps:
            1) collect the layer modules, in order, in a plain Python list called `layers`
            2) only then call the constructor of the parent class (nn.Sequential), unpacking `layers` as its arguments

        """

        layers = [nn.Flatten()] # defines FIRST layer that flattens input image (channels * height * width) into vector

        # loop adds the hidden layers according to hidden_sizes list
        for i, size in enumerate(hidden_sizes):

            # linear layer: input width is in_feat_size for the first layer,
            # otherwise the width of the previous hidden layer
            hidden_layer = nn.Linear(in_features=(in_feat_size if i == 0
                                                  else hidden_sizes[i-1]),
                                     out_features=size)

            # append the linear layer followed by its activation (looked up by name in torch.nn)
            layers.extend([hidden_layer, getattr(nn, activation_fn)()])

        # finally the OUTPUT layer (no activation here: the loss function handles it);
        # falls back to in_feat_size if hidden_sizes is empty
        last_size = hidden_sizes[-1] if hidden_sizes else in_feat_size
        layers.append(nn.Linear(in_features=last_size,
                                out_features=out_feat_size))

        # now we unpack `layers` as individual arguments to the nn.Sequential constructor
        super().__init__(*layers) # the * is the unpacking operator

So now we have a generic class for defining the architecture up to (but not including) the output activation function! We haven't applied a final activation function, because in PyTorch this is handled by the selected loss function (here, `CrossEntropyLoss`, which applies log-softmax to the raw outputs internally).

Next, we need to define the training function; here our loss function will contain the final activation function. This modularity allows us to reuse the same generic architecture for multiple end use cases.
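
To make the point about the final activation concrete, here is a minimal sketch with a hypothetical batch of random logits (the tensor values and shapes are assumptions for illustration, not part of the lab data). It checks that `CrossEntropyLoss` on raw outputs gives the same value as applying log-softmax manually and then the negative log-likelihood loss.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)

# Hypothetical batch: 4 samples, 10 classes (raw scores, no softmax applied)
logits = torch.randn(4, 10)
targets = torch.tensor([3, 0, 9, 1])

# CrossEntropyLoss consumes the raw logits directly ...
ce = nn.CrossEntropyLoss()(logits, targets)

# ... and matches log-softmax followed by negative log-likelihood
manual = F.nll_loss(F.log_softmax(logits, dim=1), targets)

print(ce.item(), manual.item())
```

This is why the model class can stop at the last `nn.Linear`: adding a softmax layer before `CrossEntropyLoss` would apply the normalisation twice.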

We will also feed our per-batch training loss (already calculated in the training loop) to a TensorBoard for visualisation.

In [None]:
# define the training function

from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter

# NB: getattr gets attribute (fn/class/method) from object; syntax (object, attribute_str)

def torch_train(model,
                data,
                optimizer='Adam',
                loss='CrossEntropyLoss', # classification
                batch_size=2**6,
                epochs=1,
                shuffle=False,
                logdir=None): # for tensorboard


  criterion = getattr(nn, loss)() # returns torch's CrossEntropyLoss() fn

  optimizer = getattr(optim, optimizer)(model.parameters()) # returns 'Adam'() optimiser from torch.optim lib -> passes the parameter attr of the model

  dataloader = DataLoader(data, # wraps our Dataset object in a dataloader
                          batch_size=batch_size,
                          shuffle=shuffle)

  model.train()

  # unique dir path for each training run
  current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
  logdir = '/'.join(['runs', current_time, 'train']) if logdir is None else logdir

  # SummaryWriter object (Torch class) for capturing data to be passed to TensorBoard
  logger = SummaryWriter(logdir) # constructor takes str of dir to write events to

  for epoch in range(epochs):
    # init values:
    i = 0

    for (X, Y) in tqdm(dataloader):
        optimizer.zero_grad()
        # Forward pass
        pred = model(X)
        loss = criterion(
            pred.squeeze(-1), # removes a dim -> passes to loss fn
            Y.long()) # ensure Y is a long tensor for CrossEntropyLoss
        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        i+=1 # as python starts 0

        # here log training loss fed to TensorBoard
        logger.add_scalar(tag='Training loss per batch', # --> name of scalar point added
                          scalar_value=loss, # --> y (value of the loss)
                          global_step=i + len(dataloader) * epoch) # --> x (batch number)

  return model
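
The `global_step` passed to `add_scalar` in `torch_train` is what keeps the x axis running continuously across epochs. A small sketch of that arithmetic, assuming a hypothetical run of 2 epochs with 94 batches each (6000 samples at batch size 64, rounded up):

```python
# Hypothetical run: 94 batches per epoch, 2 epochs
batches_per_epoch = 94
epochs = 2

steps = []
for epoch in range(epochs):
    i = 0
    for _ in range(batches_per_epoch):
        i += 1  # same counter as in torch_train
        steps.append(i + batches_per_epoch * epoch)

# first and last step of epoch 0, then first and last step of epoch 1
print(steps[0], steps[93], steps[94], steps[-1])  # 1 94 95 188
```

Because `i` restarts at 0 every epoch, the `len(dataloader) * epoch` offset is what prevents the second epoch from overwriting the points of the first.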

### Instantiate our models

In [None]:
# define 2 architectures
hidden_layers1 = [2**10, 2**5, 2**3]
hidden_layers2 = [2**10, 2**7, 2**5]

# model1
model1 = LazySequential(in_feat_size=28*28,
                        out_feat_size=10,
                        hidden_sizes=hidden_layers1)
model1 = torch_train(model1,
                     training_data,
                     logdir='runs/models1_2/model1')

# model 2
model2 = LazySequential(in_feat_size=28*28, out_feat_size=10, hidden_sizes=hidden_layers2)
model2 = torch_train(model2, training_data, logdir='runs/models1_2/model2')

# TensorBoard(s)

[TensorBoard](https://www.tensorflow.org/tensorboard) is a visualisation tool provided by TensorFlow that allows us to visualise our ML training process -> helps us understand how our model is training and to diagnose issues. It can include metrics like:
- loss,
- accuracy,
- model graphs,
- other data (all customisable).

TensorBoard is just the visualisation dashboard; we need to generate and send the metrics to it. TensorBoard integrates with PyTorch (here: `SummaryWriter`) to log the specific metrics we want to see during our training process, but it does not compute them for us (i.e. we need to calculate the logged metrics in our training / eval loops).

In [None]:
# colab magic command -> loads IPython extension
# tells tensorboard to look for log files in the `runs/models1_2` dir

%load_ext tensorboard
%tensorboard --logdir runs/models1_2

This TensorBoard shows us the Training Loss that we calculated as a by-product of our training process; but it might be more informative to know how the average epoch loss changed over time.

Let's develop our TensorBoard to visualise this. We'll need to calculate this new metric and pass it to Torch's `SummaryWriter` object within the training function.

In [None]:
# @title Modifying training to log avg epoch loss and total elapsed time per epoch.

import warnings
import time
warnings.filterwarnings('ignore')


def torch_train_mod(model, data, optimizer='Adam', loss='CrossEntropyLoss', batch_size=2**6, epochs=1, shuffle=False, logdir=None):

    criterion = getattr(nn, loss)()
    optimizer = getattr(optim, optimizer)(model.parameters())
    dataloader = DataLoader(data, batch_size=batch_size, shuffle=shuffle)

    model.train()

    current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    logdir = '/'.join(['runs', current_time, 'train']) if logdir is None else logdir
    logger = SummaryWriter(logdir)


    start = time.time() # <- NEW: returns current time as float

    for epoch in range(epochs):
        epoch_loss = 0 # <- NEW: init epoch loss
        i = 0
        for (X, Y) in tqdm(dataloader):
            optimizer.zero_grad()
            pred = model(X)
            loss = criterion(pred.squeeze(-1), Y.long()) # Y is already a tensor, so no torch.tensor() re-wrap is needed
            loss.backward()
            optimizer.step()
            epoch_loss+=loss.item() # <- NEW: sum of epoch loss
            i+=1
            logger.add_scalar('Training loss per batch', loss, i + len(dataloader) * epoch )

        # new:
        logger.add_scalar('Avg epoch loss',  # <- NEW comp avg
                        epoch_loss / len(dataloader),
                        epoch) # before was batch
        logger.add_scalar('Total training time', time.time() - start, epoch)

    logger.flush() # forces buffered data to disk
    logger.close()

    return model
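
One detail worth knowing about the logged "Avg epoch loss": `CrossEntropyLoss` returns the mean loss over a batch by default, so `epoch_loss / len(dataloader)` is an unweighted mean of batch means. When the last batch is smaller than the others, this differs slightly from the mean over all samples. A sketch with made-up numbers (the losses and batch sizes below are hypothetical):

```python
# Hypothetical per-batch mean losses for 3 batches; the last batch is smaller
batch_losses = [0.50, 0.40, 0.90]
batch_sizes = [64, 64, 48]

# What torch_train_mod logs: unweighted mean of the batch means
unweighted = sum(batch_losses) / len(batch_losses)

# Per-sample mean: weight each batch mean by its number of samples
weighted = sum(l * n for l, n in zip(batch_losses, batch_sizes)) / sum(batch_sizes)

print(round(unweighted, 4), round(weighted, 4))
```

For monitoring purposes the difference is usually negligible, which is presumably why the simpler form is used here.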

# Online learning, minibatch and batch

With our training function set up to visualise how the loss changes over epochs, let's compare three common batch learning strategies.

- **Online learning**: Processes one sample at a time, updating model weights after each individual example.
- **Minibatch learning**: Processes a small subset of the data (here, 64 samples) at a time, updating model weights after each minibatch.
- **Batch learning** (or Full-batch learning): Processes the entire dataset at once, computing gradients over all samples before updating weights.
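
The practical difference between the three strategies is how many weight updates happen per epoch. A quick sketch for the 6000-sample training subset used in this lab, assuming the `DataLoader` default of keeping the last, smaller batch (the dictionary name `strategies` is just for illustration):

```python
import math

n_samples = 6000  # size of the training subset used in this lab
strategies = {"online": 1, "minibatch": 2**6, "fullbatch": n_samples}

# One weight update per batch, so updates per epoch = ceil(n_samples / batch_size)
updates_per_epoch = {name: math.ceil(n_samples / bs) for name, bs in strategies.items()}
print(updates_per_epoch)  # {'online': 6000, 'minibatch': 94, 'fullbatch': 1}
```

Online learning makes many noisy updates, full-batch learning makes a single smooth one, and minibatch sits in between.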

In [None]:
import warnings
warnings.filterwarnings('ignore')

# define our learning types
names = ['online', 'minibatch', 'fullbatch']
batch_sizes = [1, 2**6, len(training_data)]

# make one timestamped parent directory for this batch of runs
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
parent_logdir = f"runs/batches/{timestamp}"

# first instantiate the model, then feed that model to the training function
for name, batch_size in zip(names, batch_sizes): # zip combines our two lists into an iterator of tuples
    batch_model = LazySequential(
        in_feat_size=28*28, # model is generic across batch types
        out_feat_size=10,
        hidden_sizes=hidden_layers1 # Using hidden_layers1 for consistency
    )
    logdir = f"{parent_logdir}/{name}"   # <- now grouped under timestamp
    batch_model = torch_train_mod(
        batch_model,
        training_data,
        batch_size=batch_size, # this changes within the loop
        epochs=2,
        logdir=logdir
    )

print("Logs saved to:", parent_logdir)

In [None]:
%tensorboard --logdir {parent_logdir}

# The Vanishing Gradient Problem:

## A quick bit of theoretical context:

In short, think of backpropagation as sending a **signal** backwards:
* Each layer attenuates it by multiplying it by a factor smaller than 1.
* By the time it reaches the start, it’s almost silence (vanishing gradient).

---

### NNs learn via **backpropagation**:

  * Forward pass: compute activations $h^{(l)}$ layer by layer.
  * Backward pass: compute gradients of the loss with respect to weights, using the chain rule.

Formally, for layer $l$:

$$
\frac{\partial Loss}{\partial W^{(l)}} = \frac{\partial Loss}{\partial h^{(L)}} \cdot
\frac{\partial h^{(L)}}{\partial h^{(L-1)}} \cdots
\frac{\partial h^{(l+1)}}{\partial h^{(l)}} \cdot
\frac{\partial h^{(l)}}{\partial W^{(l)}}
$$

This involves multiplying many Jacobians (derivatives of each layer's activations with respect to its inputs).


### Chain rule amplification / attenuation

Each activation-derivative term in the product is typically a number in $(0,1]$ for common saturating activation functions (sigmoid, tanh).

* Example: derivative of sigmoid $\sigma(z) = \frac{1}{1+e^{-z}}$ is

  $$
  \sigma'(z) = \sigma(z)(1-\sigma(z)) \in (0, 0.25]
  $$
* For tanh, the derivative is $\in (0,1]$: it reaches its maximum of 1 only at the origin and is smaller everywhere else.

So in backpropagation, the gradient passed backwards through each layer is repeatedly multiplied by numbers that are at most 1, and usually well below it.

$$
\text{Gradient at layer } l \approx \prod_{k=l+1}^L f'(z^{(k)})
$$

If $L$ is large, this product shrinks **exponentially** with depth.
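
To get a feel for how fast this happens, here is a small sketch of the best case for sigmoid, where every factor takes its maximum value of 0.25. It ignores the weight matrices, as the approximation above does, and the depths are chosen for illustration.

```python
# Upper bound on the product of sigmoid derivatives: each factor is at most 0.25
bounds = {}
for depth in (5, 10, 50):
    bounds[depth] = 0.25 ** depth
    print(f"depth={depth:>2}: product <= {bounds[depth]:.3e}")
```

Even in this best case, the bound at depth 50 is below $10^{-30}$, far too small to produce a meaningful weight update.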


### Vanishing gradient phenomenon

In deep networks, gradients at the **earlier layers** (close to the input) become *vanishingly small*. This has consequences:
  * Early layers barely update during training → they stay close to their initialisation.
  * Network learns “shortcuts” with later layers, but fails to capture hierarchical representations from raw input.

## Solution & Historical context

In the 1990s, this problem made it nearly impossible to train deep networks (> 2–3 layers). Then, breakthroughs:

  * **ReLU activations** (avoid most saturation).
  * **Batch normalisation** (keep activations in stable ranges).
  * **Careful initialisation** (this is built into most frameworks like PyTorch).
  * **Residual connections (ResNets)**: skip connections let gradients flow more directly (see future classes).

These techniques revived deep learning in the 2010s (woo!)


# BONUS:

## Why it’s worse for sigmoid/tanh

* Sigmoid squashes input to $(0,1)$. Inputs with large magnitude fall in saturated regions ($\sigma'(z) \approx 0$).
* Tanh squashes to $(-1,1)$. Better (its derivative can reach 1 rather than 0.25), but still saturates.
* ReLU partially fixes it ($f'(z)=1$ when active, 0 when inactive).
  * That prevents vanishing *for active neurons*, but “dead ReLUs” still give zero gradient.
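
The three derivatives can be compared directly. This is a plain-Python sketch using the closed-form derivatives; the helper names and the sample points are chosen for illustration.

```python
import math

def sigmoid_grad(z):
    s = 1.0 / (1.0 + math.exp(-z))
    return s * (1.0 - s)

def tanh_grad(z):
    return 1.0 - math.tanh(z) ** 2

def relu_grad(z):
    return 1.0 if z > 0 else 0.0

# Compare the local gradient in the saturated regions (|z| = 5) and near the origin
for z in (-5.0, 0.0, 0.5, 5.0):
    print(f"z={z:+.1f}  sigmoid'={sigmoid_grad(z):.4f}  "
          f"tanh'={tanh_grad(z):.4f}  relu'={relu_grad(z):.0f}")
```

At $|z| = 5$ both sigmoid and tanh pass back less than 1% of the incoming gradient, while ReLU passes all of it for positive inputs and none for negative ones.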

---

## Exploding gradient (the sibling problem)

If the derivatives or weight magnitudes are > 1, the product **explodes exponentially**.

* Early gradients become enormous → unstable updates.
* Training loss oscillates or diverges.

Vanishing and exploding are two sides of the same coin: *repeated multiplication through depth*.
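
A rough numerical illustration of the "two sides of the same coin" point: with a hypothetical per-layer factor only slightly below or above 1, 50 layers are enough to push the product in opposite directions.

```python
depth = 50
shrinking = 0.9 ** depth   # per-layer factor slightly below 1
growing = 1.1 ** depth     # per-layer factor slightly above 1
print(f"{shrinking:.2e}  {growing:.2e}")
```

The factors 0.9 and 1.1 are assumptions picked for illustration; in a real network the per-layer factor depends on both the weights and the activation derivatives.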


---

## Back to our lab implementation:

The exercise below aims to demonstrate the vanishing gradient problem. By (i) initialising the weights to zero and (ii) using a Sigmoid activation function in a deep network, we observe how gradients diminish during training, hindering effective learning.

In [29]:
def init_weights_zero(module):
    if isinstance(module, nn.Linear):
        # init weights & biases to 0
        torch.nn.init.zeros_(module.weight)
        torch.nn.init.zeros_(module.bias)

# instantiate custom layers
zero_grad_layers = [2**5] * 50
zero_grad_model = LazySequential(in_feat_size=28*28,
                              out_feat_size=10,
                              hidden_sizes=zero_grad_layers,
                              activation_fn='Sigmoid')

# recursively apply to all modules/layers in model
_ = zero_grad_model.apply(init_weights_zero)

# apply our train function (as per usual)
zero_grad_model = torch_train(zero_grad_model, training_data, epochs=1, logdir='runs/zero_grad')

100%|██████████| 94/94 [00:02<00:00, 45.36it/s]


In [None]:
%tensorboard --logdir runs/zero_grad

We can see no downward trend in the loss time series in the TensorBoard; it just fluctuates from batch to batch. There's no meaningful learning going on when we encounter the vanishing gradient problem.

Now we are going to modify our training function to output a heatmap of the weights to the TensorBoard (same model as before, only a different training function).

In [None]:
def weight_heatmaps(model, cmap='Reds', **fig_kwargs):
    mat, titles = [], []
    vmin, vmax = +np.inf, -np.inf

    for name, param in model.named_parameters():
      if len(param.squeeze().shape) == 2:
        param = param.detach().numpy()
        param = np.abs(param)
        mat.append(param)
        titles.append(name)

        if param.max() > vmax:
          vmax = param.max()
        if param.min() < vmin:
          vmin = param.min()

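    fig, axes = plt.subplots(2, 3, **fig_kwargs) # fig_kwargs (e.g. figsize) are forwarded to matplotlib
    top, bottom = [1,2,3], [-4,-3,-2] # indices into `mat`: skips the input layer (0) and the output layer (-1)
    for i, weights in enumerate((top, bottom)):
      for j, w in enumerate(weights):
        axes[i,j].imshow(mat[w], vmin=vmin, vmax=vmax, cmap=cmap)
        axes[i,j].set_title(titles[w], fontsize=8) # parameter name, e.g. '3.weight'


    fig.suptitle('First 3 (top row) vs. last 3 (bottom row) hidden weights')
    return fig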

This is going to show the weight matrices for selected layers.
- Each panel in the heatmap corresponds to a single weight matrix, connecting input features to output features in that layer; each cell is a single weight.
  - x axis: input features (units from prev layer)
  - y axis: output features (units in current layer)
- The colour of each cell indicates the magnitude of that weight.
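
The axis orientation follows from how `nn.Linear` stores its weights. A quick check, using a layer with the same input size as our images and a hypothetical width of 32:

```python
import torch.nn as nn

layer = nn.Linear(in_features=28 * 28, out_features=32)

# Weight is stored as (out_features, in_features): rows index outputs, columns index inputs
print(tuple(layer.weight.shape))  # (32, 784)
```

Since `imshow` draws rows along the y axis and columns along the x axis, output units end up on y and input units on x.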

Next, we add it to our training function.

In [None]:
# train function to log heatmaps of the parameters at every batch

def torch_train_heatmap(model, data,
                optimizer='Adam', loss='CrossEntropyLoss',
                batch_size=2**6, epochs=1, shuffle=False, logdir=None,
                cmap='Reds', **fig_kwargs):

  criterion = getattr(nn, loss)()
  optimizer = getattr(optim, optimizer)(model.parameters())
  dataloader = DataLoader(data, batch_size=batch_size, shuffle=shuffle)

  model.train()

  current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
  logdir = '/'.join(['runs', current_time, 'train']) if logdir is None else logdir
  logger = SummaryWriter(logdir)
  start = time.time()

  for epoch in range(epochs):
    avg_loss = 0
    i = 0

    for (X, Y) in tqdm(dataloader):

        # NEW:
        fig = weight_heatmaps(model, cmap=cmap, **fig_kwargs)
        logger.add_figure('Weights', fig, global_step=i + len(dataloader) * epoch)

        optimizer.zero_grad()

        pred = model(X)
        loss = criterion(pred.squeeze(-1), Y.long()) # Y is already a tensor, so no torch.tensor() re-wrap is needed

        loss.backward()
        optimizer.step()

        avg_loss+=loss.item()
        i+=1
        logger.add_scalar('Training loss', loss, i + len(dataloader) * epoch )

    logger.add_scalar('Avg epoch loss', avg_loss / len(dataloader), epoch)
    logger.add_scalar('Total training time', time.time() - start, epoch)

  logger.flush() # forces buffered data to disk
  logger.close()

  return model

In [None]:
zero_grad_model = torch_train_heatmap(zero_grad_model, training_data, epochs=1, logdir='runs/heatmap')

In [None]:
%tensorboard --logdir runs/heatmap

Here we see weights/activations shrink layer by layer → gradient norms get smaller and smaller → earlier layers barely update → their weights stay close to initialisation (negligible coefficients).
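
The shrinking gradient norms can also be measured directly rather than read off a heatmap. The sketch below is a hypothetical small stand-in for the lab model (20 sigmoid layers of width 32, random inputs, PyTorch's default initialisation rather than zeros); it runs one backward pass and compares the gradient norm of the first and last linear layers.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Hypothetical small stand-in for the lab model: 20 sigmoid layers of width 32
depth, width = 20, 32
blocks = []
for _ in range(depth):
    blocks += [nn.Linear(width, width), nn.Sigmoid()]
net = nn.Sequential(*blocks, nn.Linear(width, 10))

# One backward pass on random data (default PyTorch initialisation, not zeros)
X = torch.randn(64, width)
Y = torch.randint(0, 10, (64,))
nn.CrossEntropyLoss()(net(X), Y).backward()

# Gradient norm of each Linear layer's weight, ordered from input to output
grad_norms = [m.weight.grad.norm().item() for m in net if isinstance(m, nn.Linear)]
print(f"first layer: {grad_norms[0]:.3e}   last layer: {grad_norms[-1]:.3e}")
```

The first-layer gradient should come out many orders of magnitude smaller than the last-layer one, which is the vanishing gradient in numbers. Swapping `nn.Sigmoid()` for `nn.ReLU()` is an easy way to compare.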

# Hyperparameter Tuning
Hyperparameter tuning is the process of systematically searching for the optimal set of hyperparameters (e.g. number of layers, size of the layers, learning rate, batch size, dropout ...) of an ML model. There are four common methods of hyperparameter optimization:

*   Manual
*   Grid search
*   Random search
*   Bayesian search
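
As a rough sketch of how the two simplest automated methods differ, the snippet below enumerates candidate configurations for a small hypothetical search space (the same values as the lists in the following cell). Grid search tries every combination, while random search draws a fixed budget of combinations at random. No model is trained here.

```python
import itertools
import random

# Hypothetical search space
space = {"n_layers": [1, 2, 3], "learning_rate": [0.1, 0.001, 0.00001]}

# Grid search: evaluate every combination
grid = [dict(zip(space, combo)) for combo in itertools.product(*space.values())]

# Random search: evaluate a fixed budget of randomly drawn combinations
rng = random.Random(42)
random_trials = [{k: rng.choice(v) for k, v in space.items()} for _ in range(4)]

print(len(grid), len(random_trials))  # 9 4
```

The grid grows multiplicatively with every added hyperparameter, whereas the random-search budget stays whatever we set it to. Bayesian search additionally uses the results of past trials to decide what to try next.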

In [None]:
n_layers = [1,2,3]
learning_rate = [0.1, 0.001, 0.00001]

In [None]:
%pip install optuna torcheval --quiet

### Optuna
Optuna is an open-source hyperparameter optimization framework -> automates the process of finding the best set of hyperparameters. Optuna systematically searches through a defined space of possible values to find the combination that yields the best performance (e.g., highest accuracy, lowest loss) on our model.

- **Study**: In Optuna, a "study" represents an optimisation session. Think of a study as a container that manages the entire hyperparameter tuning process.
    - We specify the optimisation direction.
        - 'maximize' for metrics like accuracy,
        -  'minimize' for metrics like loss.

- **Trial**: a single run of our model with a specific set of hyperparameters suggested by Optuna.
    - The objective function defines what happens in each trial (w/ different parameters).
    - Optuna calls this function repeatedly, each time providing a trial object.

- **Objective Function**: This is the function that Optuna optimises.
    - takes a trial object as input
    - returns the metric we want to optimise (e.g., test accuracy).
    - Inside the objective function:
        - we use the trial object to suggest values for the hyperparameters we want to tune. Optuna uses these suggestions to explore the hyperparameter space.
        - we build and train our model using the suggested hyperparameters.
        - we evaluate our model's performance using the chosen metric.
        - we return the evaluated metric.

In [None]:
import optuna
from torcheval.metrics.functional import multiclass_accuracy
from tqdm import tqdm

# Define an objective function to be maximized.
def objective(trial,
              training_data,
              test_data,
              **train_params):
    """
    Objective function for Optuna hyperparameter tuning.

    Args:
        trial (optuna.Trial): An Optuna trial object.
        training_data (torch.utils.data.Dataset): The training dataset.
        test_data (torch.utils.data.Dataset): The test dataset.
        **train_params: Additional parameters to pass to the torch_train function.

    Returns:
        float: The test accuracy of the model with the suggested hyperparameters.
    """

    # Suggest values of the hyperparameters using a trial object.
    n_layers = trial.suggest_int('n_layers', 1, 3)
        # ^^^ telling optuna to sample this hyperparam from defined bounds
        # this is the magic: optuna will choose optimum sampling strategy to improve performance based on past results
    layers = []

    for i in range(n_layers):
        size = trial.suggest_int(f'n_units_l{i}', 4, 128)
        # ^^^ telling optuna to sample number of hidden units from our bounds
        layers.append(size)

    # build models
    model = LazySequential(in_feat_size=28*28, out_feat_size=10, hidden_sizes=layers).to(torch.device('cpu'))
    model = torch_train(model, training_data, **train_params)

    # prep data
    test_dataloader = DataLoader(test_data, batch_size= len(test_data))

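    # compute acc (eval mode; no gradient tracking needed for inference)
    model.eval()
    X_test, Y_test = next(iter(test_dataloader))
    with torch.no_grad():
        Y_pred = model(X_test).argmax(dim=-1)
    test_acc = multiclass_accuracy(Y_pred, Y_test)

    return test_acc.item() # plain Python float for Optuna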

...with our objective function defined, let's create a study object

In [None]:
# Create a study object
study = optuna.create_study(direction='maximize')

# Optimize the objective function.
study.optimize(lambda trial: objective(trial, training_data, test_data), n_trials=5)
# ^ lambda fn: takes arg trial, applies objective()
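
Optuna calls the objective with a single `trial` argument, so any extra arguments (here the datasets) have to be bound beforehand. The lambda does this; `functools.partial` is an equivalent alternative. A sketch with a hypothetical stand-in objective, where `trial` is just an integer rather than a real Optuna trial:

```python
from functools import partial

# Hypothetical stand-in for the lab's objective: extra arguments after `trial`
def toy_objective(trial, training_data, test_data):
    return len(training_data) + len(test_data) + trial

# Two equivalent ways of turning it into a one-argument function
with_lambda = lambda trial: toy_objective(trial, [1, 2, 3], [4, 5])
with_partial = partial(toy_objective, training_data=[1, 2, 3], test_data=[4, 5])

print(with_lambda(10), with_partial(10))  # 15 15
```

Either form can be passed to `study.optimize`; which one to use is a matter of taste.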

In [None]:
# Print the best trial's parameters and value
print("Best trial:")
print("  Acc: {}".format(study.best_trial.value))
print("  Params: ")
for key, value in study.best_trial.params.items():
    print("    {}: {}".format(key, value))

# can also access all trials
print("\nAll trials:")
for trial in study.trials:
    print("  Trial {}:".format(trial.number))
    print("    Acc: {}".format(trial.value))
    print("    Params: {}".format(trial.params))