# Import Packages

In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import pytorch_lightning as pl
from torch.utils.data import DataLoader, random_split
from torchviz import make_dot

from typing import Any, Dict, List, Tuple, Union
from torch.jit.annotations import TensorType

from IPython.display import Markdown as md # For automated updates of the table

import pickle # tmp

# Dataset

In [2]:
# MNIST dataset and dataloaders
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
dataset = torchvision.datasets.MNIST(root=os.getcwd(), train=True, transform=transform, download=True)

In [3]:
# Split the dataset into train, val, and test sets
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

In [4]:
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=os.cpu_count())
val_loader = DataLoader(val_dataset, batch_size=64, num_workers=os.cpu_count())
test_loader = DataLoader(test_dataset, batch_size=64, num_workers=os.cpu_count())

# Models
Below is a dictionary of the models used for the experiment, with different numbers of convolutional layers and fully connected layers

In [5]:
# Parameter dictionary for each model and 
# corresponding layer parameters.
params: Dict[str,Dict[str,torch.nn]] = {
    "model.1": {
        "conv_layers": (nn.Conv2d(1, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(32, 64, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(64, 128, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(128, 256, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        ),
        "fc_layer": (nn.Linear(256, 128),
                     nn.ReLU(),
                     nn.Linear(128, 10),)
    },
    "model.2": {
        "conv_layers": (nn.Conv2d(1, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(32, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(32, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(32, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2)
                        ),
        "fc_layer": (
                    #  nn.Linear(32, 32),
                    #  nn.ReLU(),
                     nn.Linear(32, 10),
                     )
    },
    "model.3": {
        "conv_layers": (nn.Conv2d(1, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(32, 64, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(64, 128, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2)
                        ),
        "fc_layer": (nn.Linear(128 * 3 * 3, 256),
                     nn.ReLU(),
                     nn.Linear(256, 10))
    },
    "model.4": {
        "conv_layers": (nn.Conv2d(1, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        nn.Conv2d(32, 64, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2)
                        ),
        "fc_layer": (nn.Linear(64 * 7 * 7, 128),
                     nn.ReLU(),
                     nn.Linear(128, 10)
                     )
    },
    "model.5": {
        "conv_layers": (nn.Conv2d(1, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        ),
        "fc_layer": (nn.Linear(32 * 14 * 14, 128),
                     nn.ReLU(),
                     nn.Linear(128, 10)
                     )
    },
    "model.6": {
        "conv_layers": (nn.Conv2d(1, 32, kernel_size=3, padding=1),
                        nn.ReLU(),
                        nn.MaxPool2d(kernel_size=2, stride=2),
                        ),
        "fc_layer": (nn.Linear(14 * 14 * 32, 10))
    },
}

# Define Functions

In [6]:
def count_trainable_parameters(model: pl.LightningModule) -> int:
    total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    return total_params

In [7]:
def _save_and_viz_pl_model(model: Union[nn.Module,pl.LightningModule], filename: str) -> None:
    """Helper function to visualize and plot the model architecture.

    Args:
        model: Input pytorch (lightning) model.
        filename: Output filename (no file extension).
    """
    # Create a dummy input with the same shape as expected input during training
    dummy_input = torch.randn(1, 1, 28, 28)

    # Generate the visualization of the model architecture
    dot = make_dot(model(dummy_input))
    # params = dict(model.named_parameters())

    # Save the visualization as an image
    dot.format = 'png'
    dot.render(filename, cleanup=True)
    
    return None

In [8]:
def save_pt_model(model: Union[nn.Module,pl.LightningModule], filename: str) -> None:
    """Saves pytorch (lightning) model, and creates visualization of model architecture.

    Args:
        model: Input pytorch (lightning) model.
        filename: Output filename.
    """
    # TODO: Save metadata file for the model.
    # Check filename
    filename: str
    ext: str
    
    if ('pt' or 'pth') in filename:
        filename, ext = os.path.splitext(filename)
    else:
        ext: str = ".pt"
    
    # Save model (and model state)
    torch.save(model.state_dict(), f"{filename}{ext}")

    # Pickle test
    # with open(f"{filename}{ext}",'wb') as f:
    #     pickle.dump(model,f)

    # Save image of model architecture
    _save_and_viz_pl_model(model=model, filename=filename)

    return None

In [9]:
def load_pt_model(model: Union[nn.Module,pl.LightningModule], filename: str) -> Union[nn.Module,pl.LightningModule]:
    """Loads saved/trained model, in which the model class **must** be provided.

    Args:
        model: Input model class objoect.
        filename: Input filename that corresponds to trained saved/trained model.

    Returns:
        Trained model.
    """
    if ('pt' or 'pth') in filename:
        pass
    else:
        filename: str = f"{filename}.pt"

    # # Pickle test
    # with open(filename,'rb') as f:
    #     pickle.load(f)

    # Load model
    model.load_state_dict(torch.load(filename))
    model.eval() # sets dropout and batch normalization layers to evaluation mode
    return model

In [10]:
def get_gradient_norms(model: Union[nn.Module,pl.LightningModule], weight: bool = True, bias: bool = False) -> List[Tuple[str,float]]:
    model.eval()
    sample_input = torch.randn(1, 1, 28, 28)  # Replace with your own sample input
    outputs = model(sample_input)
    loss = torch.sum(outputs)  # Create a dummy loss

    # Backpropagate to compute gradients
    loss.backward()

    # Compute gradient norms
    gradient_norms: List[Tuple[str,float]] = []
    for name, param in model.named_parameters():
        if param.grad is not None:
            if weight and ('weight' in name):
                gradient_norms.append((name, param.grad.norm().item()))
            
            if bias and ('bias' in name):
                gradient_norms.append((name, param.grad.norm().item()))
                
    return gradient_norms

In [11]:
def get_layerwise_norms(model: Union[nn.Module,pl.LightningModule], weight: bool = True, bias: bool = False) -> List[Tuple[str,float]]:
    model.eval()
    sample_input = torch.randn(1, 1, 28, 28)  # Replace with your own sample input
    outputs = model(sample_input)
    loss = torch.sum(outputs)  # Create a dummy loss

    # Backpropagate to compute gradients
    loss.backward()
    
    layerwise_norms: List[Tuple[str,float]] = []
    for name, param in model.named_parameters():
        layer_name = name #.split('.')[0]  # Extract the layer name
        norm = param.norm().item()

        if weight and ('weight' in name):
            layerwise_norms.append((layer_name, norm))
        
        if bias and ('bias' in name):
            layerwise_norms.append((layer_name, norm))
            
    return layerwise_norms

In [12]:
def calculate_total_parameter_norm(model):
    total_norm = 0.0
    for param in model.parameters():
        total_norm += param.norm().item()
    return total_norm

In [13]:
def calculate_parameter_norms_per_layer(model, weight: bool = True, bias: bool = False):
    norms_per_layer = {}
    for name, param in model.named_parameters():
        layer_name = name #.split('.')[0]  # Extract the layer name
        norm = param.norm().item()

        if weight and ('weight' in name):
            if layer_name not in norms_per_layer:
                norms_per_layer[layer_name] = []
            norms_per_layer[layer_name].append(norm)
        
        if bias and ('bias' in name):
            if layer_name not in norms_per_layer:
                norms_per_layer[layer_name] = []
            norms_per_layer[layer_name].append(norm)
    return norms_per_layer

# Define Neural Network

In [14]:
# Define the LightningModule
class ConvNet(pl.LightningModule):
    def __init__(self, params: Dict[str,torch.nn]):
        super(ConvNet, self).__init__()
        self.save_hyperparameters()
        self.conv_layers = nn.Sequential(
            *params.get('conv_layers')
        )
        
        try:
            self.fc_layer = nn.Sequential(
                *params.get('fc_layer')
            )
        except TypeError:
            self.fc_layer = nn.Sequential(
                params.get('fc_layer')
            )
        
    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layer(x)
        return x
    
    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=0.001)
    
    # Define the training step method
    def training_step(self, batch, batch_idx):
        inputs, targets = batch
        outputs = self(inputs)
        loss = nn.CrossEntropyLoss()(outputs, targets)
        self.log('train_loss', loss)
        return loss
    
    # Define the validation step method
    def validation_step(self, batch, batch_idx):
        inputs, targets = batch
        outputs = self(inputs)
        loss = nn.CrossEntropyLoss()(outputs, targets)
        self.log('val_loss', loss, prog_bar=True)  # Logging the validation loss
    
    # Define the test step method
    def test_step(self, batch, batch_idx):
        inputs, targets = batch
        outputs = self(inputs)
        loss = nn.CrossEntropyLoss()(outputs, targets)
        self.log('test_loss', loss)  # Logging the test loss
        preds = torch.argmax(outputs, dim=1)
        acc = (preds == targets).float().mean()
        self.log('test_acc', acc, prog_bar=True)  # Logging the test accuracy

# Train Models

In [15]:
# Create trainer object
trainer = pl.Trainer(accelerator='mps',max_epochs=10,devices=1)  # Set max_epochs and gpus according to your environment

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [None]:
# MARKDOWN: str = ""

# for param in params.keys():
#     # Print Model number to screen
#     print(f"\n{param}:\n")
    
#     # Initialize the Lightning Trainer
#     model = ConvNet(params=params.get(param))

#     # Train the model using PyTorch Lightning
#     trainer.fit(model, train_loader, val_loader)

#     # Save trained model
#     save_pt_model(model=model,filename=f"models/{param}")

#     MARKDOWN += f"### {param}: \n\n![](models/{param}.png)\n\n"

In [None]:
# # Show model architecture diagrams
# md(MARKDOWN)

# Obtain Models' Metrics

In [20]:
results_dict: Dict[str,Dict[str,Any]] = {}

In [21]:
# Norm calcuation booleans
weight: bool = True
bias: bool = False

for param in params.keys():
    # Print Model number to screen
    print(f"\n{param}:\n")

    tmp_dict = {}

    # Initialize the Lightning Trainer
    model = ConvNet(params=params.get(param))

    # Train the model using PyTorch Lightning
    trainer.fit(model, train_loader, val_loader)

    # Load model
    # model = load_pt_model(model=ConvNet(params=params.get(param)),filename=f"models/{param}")
    # trainer = pl.Trainer(accelerator='mps',max_epochs=10,devices=1)  # Set max_epochs and gpus according to your environment

    # Evaluate the model on the test data
    print(f"Training accuracy:")
    train_results = trainer.test(model, dataloaders=train_loader)

    # Evaluate the model on the test data
    print(f"Testing accuracy:")
    test_results = trainer.test(model, dataloaders=test_loader)

    # Countable parameters
    print(f"Number of trainable parameters (weights): {count_trainable_parameters(model=model):,}")
    countable_parameters: str = f"{count_trainable_parameters(model=model):,}"

    # Gradient Norms
    grad_norms = get_gradient_norms(model=model, weight=weight, bias=bias)

    # Layerwise Norms
    layer_norms = get_layerwise_norms(model=model, weight=weight, bias=bias)

    # Total parameter norm
    total_norm = calculate_total_parameter_norm(model=model)

    # Parameter norms per layer
    norms_per_layer = calculate_parameter_norms_per_layer(model=model, weight=weight, bias=bias)

    tmp_dict = {
        "train_acc": f"{train_results[0].get('test_acc'):.4f}",
        "test_acc": f"{test_results[0].get('test_acc'):.4f}",
        "parameters": countable_parameters,
        "grad_norm": grad_norms,
        "layer_norm": layer_norms,
        "total_norm": f"{total_norm:.4f}",
        "norms_per_layer": norms_per_layer,
    }

    results_dict.update({param:tmp_dict})


  | Name        | Type       | Params
-------------------------------------------
0 | conv_layers | Sequential | 387 K 
1 | fc_layer    | Sequential | 34.2 K
-------------------------------------------
422 K     Trainable params
0         Non-trainable params
422 K     Total params
1.688     Total estimated model params size (MB)



model.1:



Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.


Training accuracy:


Testing: 0it [00:00, ?it/s]

Testing accuracy:


Testing: 0it [00:00, ?it/s]

  rank_zero_warn(f"Checkpoint directory {dirpath} exists and is not empty.")

  | Name        | Type       | Params
-------------------------------------------
0 | conv_layers | Sequential | 28.1 K
1 | fc_layer    | Sequential | 1.4 K 
-------------------------------------------
29.4 K    Trainable params
0         Non-trainable params
29.4 K    Total params
0.118     Total estimated model params size (MB)


Number of trainable parameters (weights): 422,026

model.2:



Sanity Checking: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.


Training accuracy:


Testing: 0it [00:00, ?it/s]

Testing accuracy:


Testing: 0it [00:00, ?it/s]


  | Name        | Type       | Params
-------------------------------------------
0 | conv_layers | Sequential | 92.7 K
1 | fc_layer    | Sequential | 297 K 
-------------------------------------------
390 K     Trainable params
0         Non-trainable params
390 K     Total params
1.562     Total estimated model params size (MB)


Number of trainable parameters (weights): 29,450

model.3:



Sanity Checking: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.


Training accuracy:


Testing: 0it [00:00, ?it/s]

Testing accuracy:


Testing: 0it [00:00, ?it/s]


  | Name        | Type       | Params
-------------------------------------------
0 | conv_layers | Sequential | 18.8 K
1 | fc_layer    | Sequential | 402 K 
-------------------------------------------
421 K     Trainable params
0         Non-trainable params
421 K     Total params
1.687     Total estimated model params size (MB)


Number of trainable parameters (weights): 390,410

model.4:



Sanity Checking: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.


Training accuracy:


Testing: 0it [00:00, ?it/s]

Testing accuracy:


Testing: 0it [00:00, ?it/s]


  | Name        | Type       | Params
-------------------------------------------
0 | conv_layers | Sequential | 320   
1 | fc_layer    | Sequential | 804 K 
-------------------------------------------
804 K     Trainable params
0         Non-trainable params
804 K     Total params
3.218     Total estimated model params size (MB)


Number of trainable parameters (weights): 421,642

model.5:



Sanity Checking: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.


Training accuracy:


Testing: 0it [00:00, ?it/s]

Testing accuracy:


Testing: 0it [00:00, ?it/s]


  | Name        | Type       | Params
-------------------------------------------
0 | conv_layers | Sequential | 320   
1 | fc_layer    | Sequential | 62.7 K
-------------------------------------------
63.1 K    Trainable params
0         Non-trainable params
63.1 K    Total params
0.252     Total estimated model params size (MB)


Number of trainable parameters (weights): 804,554

model.6:



Sanity Checking: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.


Training accuracy:


Testing: 0it [00:00, ?it/s]

Testing accuracy:


Testing: 0it [00:00, ?it/s]

Number of trainable parameters (weights): 63,050


In [None]:
MARKDOWN = """

Table of metrics for each of the models defined above.

| model \ specifications | Train Accuracy | Test Accuracy | Number of Trainable Parameters | Gradient Norm | Layerwise Norm | Total Parameter Norm | Per Layer Parameter Norm |
|------------------------|----------------|---------------|--------------------------------|---------------|----------------|----------------------|--------------------------|
"""

In [None]:
# Fill in markdown table
for name, metric in results_dict.items():
    # print(f"{name}: {metric}")
    MARKDOWN += f"| **{name}** | {metric.get('train_acc')} | {metric.get('test_acc')} | {metric.get('parameters')} | {metric.get('grad_norm')} | {metric.get('layer_norm')} | {metric.get('total_norm')} | {metric.get('norms_per_layer')} |\n"

In [None]:
# Show table of metrics for each model
md(MARKDOWN)

In [16]:
model = load_pt_model(model=ConvNet(params=params.get('model.2')),filename='model.2.pt')

In [17]:
model

ConvNet(
  (conv_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): ReLU()
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc_layer): Sequential(
    (0): Linear(in_features=32, out_features=10, bias=True)
  )
)

In [18]:
# Evaluate the model on the test data
print(f"Training accuracy:")
train_results = trainer.test(model, dataloaders=train_loader)

# Evaluate the model on the test data
print(f"Testing accuracy:")
test_results = trainer.test(model, dataloaders=test_loader)

Training accuracy:


  rank_zero_warn(


Testing: 0it [00:00, ?it/s]

Testing accuracy:


Testing: 0it [00:00, ?it/s]