In [2]:
import os
import random
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.utils.data as data

import torchvision.transforms as transforms
import torchvision.datasets as datasets

import safetensors
# from torchsummary import summary

import matplotlib.pyplot as plt
from PIL import Image

ROOT = './data'
train_data = datasets.MNIST(
    root=ROOT,
    train=True,
    download=True
)
test_data = datasets.MNIST(
    root=ROOT,
    train=False,
    download=True
)

### Configuration

In [3]:
# Set the ratio for splitting training data into train and validation sets
valid_ratio = 0.9

# Calculate number of training examples based on ratio
n_train_examples = int(len(train_data) * valid_ratio)
# Calculate remaining examples for validation
n_valid_examples = len(train_data) - n_train_examples

# Split the training data into training and validation sets
train_data, valid_data = data.random_split(
    train_data,
    [n_train_examples, n_valid_examples]
)

# Calculate mean of training data and normalize to 0-1 range
mean = train_data.dataset.data.float().mean() / 255
# Calculate standard deviation of training data and normalize to 0-1 range
std = train_data.dataset.data.float().std() / 255

# Define transformations for training data - convert to tensor and normalize
train_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[mean], std=[std])
])

# Define transformations for test data - same as training transforms
test_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[mean], std=[std])
])

# Apply training transformations to training dataset
train_data.dataset.transform = train_transforms
# Apply test transformations to validation dataset
valid_data.dataset.transform = test_transforms

# Set batch size for training
batch_size = 256

# Create training data loader with shuffling
train_dataloader = data.DataLoader(
    train_data,
    shuffle=True,
    batch_size=batch_size
)

# Create validation data loader without shuffling
val_dataloader = data.DataLoader(
    train_data,
    shuffle=False,
    batch_size=batch_size
)

In [4]:
for inputs, labels in train_dataloader:
    print(inputs.shape)
    break

torch.Size([256, 1, 28, 28])


In [5]:
# LeNet CNN architecture for classification
class LeNetClassifier(nn.Module):
    def __init__(self, num_classes):
        # Initialize parent class
        super().__init__()
        # First conv layer: 1 input channel (grayscale), 6 output channels, 5x5 kernel with same padding
        self.conv1 = nn.Conv2d(
            in_channels=1, out_channels=6, kernel_size=5, padding='same'
        )
        # First pooling layer: 2x2 average pooling
        self.avgpool1 = nn.AvgPool2d(kernel_size=2)
        # Second conv layer: 6 input channels, 16 output channels, 5x5 kernel
        self.conv2 = nn.Conv2d(
            in_channels=6, out_channels=16, kernel_size=5
        )
        # Second pooling layer: 2x2 average pooling
        self.avgpool2 = nn.AvgPool2d(kernel_size=2)
        # Flatten layer to convert 2D feature maps to 1D vector
        self.flatten = nn.Flatten()
        
        # First FC layer: 16*5*5=400 inputs (16 channels, 5x5 feature map), 120 outputs
        self.fc_1 = nn.Linear(16*5*5, 120)
        # Second FC layer: 120 inputs, 84 outputs
        self.fc_2 = nn.Linear(120, 84)
        # Output FC layer: 84 inputs, num_classes outputs
        self.fc_3 = nn.Linear(84, num_classes)

    def forward(self, inputs):
        # inputs shape: (batch_size, 1, 28, 28)
        
        # Pass through first conv layer
        # outputs shape: (batch_size, 6, 28, 28) - same padding preserves dimensions
        outputs = self.conv1(inputs)
        
        # Apply first average pooling
        # outputs shape: (batch_size, 6, 14, 14) - halved spatial dimensions
        outputs = self.avgpool1(outputs)
        
        # Apply ReLU activation - shape remains (batch_size, 6, 14, 14)
        outputs = F.relu(outputs)
        
        # Pass through second conv layer
        # outputs shape: (batch_size, 16, 10, 10) - no padding reduces spatial dims by 4
        outputs = self.conv2(outputs)
        
        # Apply second average pooling
        # outputs shape: (batch_size, 16, 5, 5) - halved spatial dimensions
        outputs = self.avgpool2(outputs)
        
        # Apply ReLU activation - shape remains (batch_size, 16, 5, 5)
        outputs = F.relu(outputs)
        
        # Flatten 2D feature maps to 1D
        # outputs shape: (batch_size, 16*5*5) = (batch_size, 400)
        outputs = self.flatten(outputs)
        
        # Pass through first FC layer
        # outputs shape: (batch_size, 120)
        outputs = self.fc_1(outputs)
        
        # Pass through second FC layer
        # outputs shape: (batch_size, 84)
        outputs = self.fc_2(outputs)
        
        # Pass through output FC layer
        # outputs shape: (batch_size, num_classes)
        outputs = self.fc_3(outputs)
        return outputs

In [11]:
import time

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def train(model, optimizer, criterion, train_dataloader, device, epoch=0, log_interval=50):
    model.train()
    total_acc, total_count = 0, 0
    losses = []
    start_time = time.time()

    for idx, (inputs, labels) in enumerate(train_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        predictions = model(inputs)

        loss = criterion(predictions, labels)
        losses.append(loss.item())

        loss.backward()
        torch.nn.utils.clip_grad_norm(model.parameters(), 0.1)
        optimizer.step()

        total_acc += (predictions.argmax(1) == labels).sum().item()
        total_count += labels.size(0)
        if idx % log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time
            print(
                "| epoch {:3d} | {:5d}/{:5d} batches "
                "| accuracy {:8.3f}".format(
                    epoch, idx, len(train_dataloader), total_acc / total_count
                )
            )
            total_acc, total_count = 0, 0
            start_time = time.time()

    epoch_acc = total_acc / total_count
    epoch_loss = sum(losses) / len(losses)
    return epoch_acc, epoch_loss


def evaluate(model, criterion, valid_dataloader):
    model.eval()
    total_acc, total_count = 0, 0
    losses = []

    with torch.no_grad():
        for idx, (inputs, labels) in enumerate(val_dataloader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            predictions = model(inputs)

            loss = criterion(predictions, labels)
            losses.append(loss.item())

            total_acc += (predictions.argmax(1) == labels).sum().item()
            total_count += labels.size(0)

    epoch_acc = total_acc / total_count
    epoch_loss = sum(losses) / len(losses)
    return epoch_acc, epoch_loss

In [13]:
num_classes = len(train_data.dataset.classes)

lenet_model = LeNetClassifier(num_classes=num_classes)
lenet_model.to(device=device)

criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(lenet_model.parameters())

num_epochs = 10
save_model = './model'

train_accs, train_losses = [], []
eval_accs, eval_losses = [], []
best_loss_eval = 100

for epoch in range(1, num_epochs+1):
    epoch_start_time = time.time()

    train_acc, train_loss = train(
        model=lenet_model,
        optimizer=optimizer,
        criterion=criterion,
        train_dataloader=train_dataloader,
        device=device,
        epoch=epoch
    )

    eval_acc, eval_loss = evaluate(
        model=lenet_model,
        criterion=criterion,
        valid_dataloader=val_dataloader
    )
    eval_losses.append(eval_loss)

    if eval_loss < best_loss_eval:
        torch.save(lenet_model.state_dict(), save_model + '/lenet_model.pt')

    print("-" * 59)
    print(
        "| End of epoch {:3d} | Time: {:5.2f}s | Train Accuracy {:8.3f} | Train Loss {:8.3f} "
        "| Valid Accuracy {:8.3f} | Valid Loss {:8.3f} ".format(
            epoch, time.time() - epoch_start_time, train_acc, train_loss, eval_acc, eval_loss
        )
    )
    print("-" * 59)

    lenet_model.load_state_dict(safetensors.torch.load_model(save_model + "/lenet_model.pt"))
    lenet_model.eval()

  torch.nn.utils.clip_grad_norm(model.parameters(), 0.1)


| epoch   1 |    50/  211 batches | accuracy    0.613
| epoch   1 |   100/  211 batches | accuracy    0.877
| epoch   1 |   150/  211 batches | accuracy    0.911
| epoch   1 |   200/  211 batches | accuracy    0.934
-----------------------------------------------------------
| End of epoch   1 | Time: 76.80s | Train Accuracy    0.942 | Train Loss    0.535 | Valid Accuracy    0.949 | Valid Loss    0.170 
-----------------------------------------------------------
| epoch   2 |    50/  211 batches | accuracy    0.953
| epoch   2 |   100/  211 batches | accuracy    0.961
| epoch   2 |   150/  211 batches | accuracy    0.966
| epoch   2 |   200/  211 batches | accuracy    0.969
-----------------------------------------------------------
| End of epoch   2 | Time: 73.00s | Train Accuracy    0.972 | Train Loss    0.124 | Valid Accuracy    0.970 | Valid Loss    0.098 
-----------------------------------------------------------
| epoch   3 |    50/  211 batches | accuracy    0.974
| epoch   3 

In [14]:
test_data.transform = test_transforms
test_dataloader = data.DataLoader(
    test_data,
    batch_size=batch_size
)
test_acc, test_loss = evaluate(lenet_model, criterion, test_dataloader)
test_acc, test_loss

(0.9896851851851852, 0.03167389450435884)