# 1. Importing Libraries

In [17]:
# Importing Libraries
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets
from torchvision.transforms import transforms
from torchmetrics import Accuracy
from torchinfo import summary
import numpy as np
import os
import datetime

# 2. Setting Device

In [18]:
# Setting Device
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

cuda


# 3. Preparing Input Data

In [19]:
# Preparing Input Data
# Preprocessing pipeline applied to every MNIST image (1x28x28) before it reaches LeNet
param_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.1307], std=[0.3081])  # Standardize: (x - 0.1307) / 0.3081; inputs in [0, 1] map to roughly [-0.42, 2.82], not [-1, 1]
])

In [20]:
# Download the dataset
# Both splits are stored under ./dataset and share the same preprocessing pipeline.
train_val_dataset = datasets.MNIST(
    root='./dataset',
    train=True,
    transform=param_transform,
    download=True,
)
test_dataset = datasets.MNIST(
    root='./dataset',
    train=False,
    transform=param_transform,
    download=True,
)

# Dataset summary: number of samples first, then the shape of the first image of each split
print('train_val_dataset length:', len(train_val_dataset))
print('test_dataset length:', len(test_dataset))
print('train_val_dataset shape:', train_val_dataset[0][0].shape)
print('test_dataset shape:', test_dataset[0][0].shape)

train_val_dataset length: 60000
test_dataset length: 10000
train_val_dataset shape: torch.Size([1, 28, 28])
test_dataset shape: torch.Size([1, 28, 28])


In [21]:
# Split the dataset into train (80%) and validation (20%)
# A dedicated, seeded generator makes the split identical on every run, so
# validation numbers from different runs are measured on the same samples.
SPLIT_SEED = 42
train_size = int(0.8 * len(train_val_dataset))
val_size = len(train_val_dataset) - train_size  # remainder, so the two sizes always sum to the full length
train_dataset, val_dataset = torch.utils.data.random_split(
    train_val_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Dataset summary
print('train_dataset length:', len(train_dataset))
print('val_dataset length:', len(val_dataset))
print('test_dataset length:', len(test_dataset))

train_dataset length: 48000
val_dataset length: 12000
test_dataset length: 10000


In [22]:
# Create dataloaders
# Use a larger batch when a GPU is available, a smaller one on CPU.
if torch.cuda.is_available():
    BATCH_SIZE = 128
else:
    BATCH_SIZE = 64

# Only the training split is shuffled; validation and test keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# dataloaders summary (len of a loader is its number of batches)
print('train_loader length:', len(train_loader))
print('val_loader length:', len(val_loader))
print('test_loader length:', len(test_loader))

train_loader length: 375
val_loader length: 94
test_loader length: 79


# 4. Defining Model

In [23]:
# Defining Model
# LeNet-5 architecture implementation
class LeNet5(nn.Module):
    """LeNet-5 style convolutional classifier for 1 x 28 x 28 images.

    The network has two sequential stages:

    * ``feature``: two 5x5 convolutions, each followed by a sigmoid and a
      2x2 average pool (1 x 28 x 28 -> 6 x 14 x 14 -> 16 x 5 x 5).
    * ``classifier``: flatten, then fully connected layers
      400 -> 120 -> 84 -> 10 with sigmoids between them.

    ``forward`` returns the output of the last linear layer unchanged
    (no softmax is applied).
    """

    def __init__(self):
        super().__init__()

        # Feature extractor. Layer order fixes the state_dict keys
        # (feature.0.*, feature.3.*), so it must not be rearranged.
        conv_layers = [
            # Block 1: padding=2 keeps the 28 x 28 spatial size -> 6 x 28 x 28
            nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=2),
            nn.Sigmoid(),
            # 2x2 average pooling halves the spatial size -> 6 x 14 x 14
            nn.AvgPool2d(kernel_size=2, stride=2),
            # Block 2: no padding -> 16 x 10 x 10
            nn.Conv2d(6, 16, kernel_size=5, stride=1),
            nn.Sigmoid(),
            # 2x2 average pooling -> 16 x 5 x 5
            nn.AvgPool2d(kernel_size=2, stride=2),
        ]
        self.feature = nn.Sequential(*conv_layers)

        # Classifier. Flatten turns 16 x 5 x 5 into a 400-element vector.
        flat_features = 16 * 5 * 5
        dense_layers = [
            nn.Flatten(),
            nn.Linear(flat_features, 120),
            nn.Sigmoid(),
            nn.Linear(120, 84),
            nn.Sigmoid(),
            # Final layer: one output per class
            nn.Linear(84, 10),
        ]
        self.classifier = nn.Sequential(*dense_layers)

    def forward(self, x):
        """Run the feature extractor, then the classifier, on batch ``x``."""
        extracted = self.feature(x)
        return self.classifier(extracted)

In [24]:
# Create model
# Build the network, then move its parameters to the selected device.
model = LeNet5()
model = model.to(device)
print(model)

LeNet5(
  (feature): Sequential(
    (0): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): Sigmoid()
    (2): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (3): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
    (4): Sigmoid()
    (5): AvgPool2d(kernel_size=2, stride=2, padding=0)
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=400, out_features=120, bias=True)
    (2): Sigmoid()
    (3): Linear(in_features=120, out_features=84, bias=True)
    (4): Sigmoid()
    (5): Linear(in_features=84, out_features=10, bias=True)
  )
)


In [25]:
# Model summary
# Detailed layer-wise summary for a single 1 x 28 x 28 input.
# verbose=2 already prints the table; the trailing semicolon stops the notebook
# from also rendering the returned statistics object, which would show the
# same table a second time.
summary(model, input_size=(1, 1, 28, 28), verbose=2, device=device);

Layer (type:depth-idx)                   Output Shape              Param #
LeNet5                                   [1, 10]                   --
├─Sequential: 1-1                        [1, 16, 5, 5]             --
│    └─0.weight                                                    ├─150
│    └─0.bias                                                      ├─6
│    └─3.weight                                                    ├─2,400
│    └─3.bias                                                      └─16
│    └─Conv2d: 2-1                       [1, 6, 28, 28]            156
│    │    └─weight                                                 ├─150
│    │    └─bias                                                   └─6
│    └─Sigmoid: 2-2                      [1, 6, 28, 28]            --
│    └─AvgPool2d: 2-3                    [1, 6, 14, 14]            --
│    └─Conv2d: 2-4                       [1, 16, 10, 10]           2,416
│    │    └─weight                                                

Layer (type:depth-idx)                   Output Shape              Param #
LeNet5                                   [1, 10]                   --
├─Sequential: 1-1                        [1, 16, 5, 5]             --
│    └─0.weight                                                    ├─150
│    └─0.bias                                                      ├─6
│    └─3.weight                                                    ├─2,400
│    └─3.bias                                                      └─16
│    └─Conv2d: 2-1                       [1, 6, 28, 28]            156
│    │    └─weight                                                 ├─150
│    │    └─bias                                                   └─6
│    └─Sigmoid: 2-2                      [1, 6, 28, 28]            --
│    └─AvgPool2d: 2-3                    [1, 6, 14, 14]            --
│    └─Conv2d: 2-4                       [1, 16, 10, 10]           2,416
│    │    └─weight                                                

In [26]:
# Optimizer and loss function
LEARNING_RATE = 0.001

# Loss and metric are created first; the metric is moved to the same device as the model.
loss_function = nn.CrossEntropyLoss()
accuracy = Accuracy(task='multiclass', num_classes=10).to(device)

# Adam over all model parameters
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

# 5. Training

In [27]:
# Training
# Each run logs to its own timestamped directory under train_logs/ for TensorBoard.
date_time = format(datetime.datetime.now(), "%Y_%m_%d-%H_%M_%S")
log_dir = os.path.join('train_logs', date_time)
os.makedirs(log_dir, exist_ok=True)  # create the run directory if it is missing
writer = SummaryWriter(log_dir)

In [28]:
# Training Parameters
NUM_EPOCHS = 12
# Read the batch size back from the loader instead of re-deriving it, so this
# constant cannot drift from the value the loaders were actually built with.
BATCH_SIZE = train_loader.batch_size
# Number of mini-batches per pass over each split
NUM_BATCHES = len(train_loader)
NUM_BATCHES_VAL = len(val_loader)
NUM_BATCHES_TEST = len(test_loader)

In [29]:
# Training loop
# For every epoch: one optimisation pass over train_loader, then one
# gradient-free pass over val_loader. Per-batch values are logged every
# LOG_INTERVAL mini-batches; validation is additionally summarised once per
# epoch over the whole validation split.
LOG_INTERVAL = 10  # log every LOG_INTERVAL mini-batches

for epoch in range(NUM_EPOCHS):
    # Training
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # Move data to device
        data = data.to(device)
        target = target.to(device)

        # Forward
        output = model(data)
        loss = loss_function(output, target)
        acc = accuracy(output, target)

        # Backward
        optimizer.zero_grad()
        loss.backward()

        # Update weights
        optimizer.step()

        # log training
        if batch_idx % LOG_INTERVAL == 0:
            # Global step keeps the TensorBoard x-axis monotonic across epochs
            global_step = epoch * NUM_BATCHES + batch_idx
            print(f'Train Epoch: {epoch} [{batch_idx}/{NUM_BATCHES} ({100. * batch_idx / NUM_BATCHES:.0f}%)]\tLoss: {loss.item():.6f}')
            writer.add_scalar('Train Loss (Every 10 batch)', loss.item(), global_step)
            writer.add_scalar('Train Accuracy (Every 10 batch)', acc.item(), global_step)
            print(f'Train Epoch: {epoch} [{batch_idx}/{NUM_BATCHES} ({100. * batch_idx / NUM_BATCHES:.0f}%)]\tAccuracy: {acc.item():.6f}')

    # Validation
    model.eval()
    val_loss_sum = 0.0  # sum of per-sample losses over the whole split
    val_correct = 0     # number of correctly classified validation samples
    val_seen = 0        # number of validation samples processed
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            # Move data to device
            data = data.to(device)
            target = target.to(device)

            # Forward
            output = model(data)
            loss = loss_function(output, target)
            acc = accuracy(output, target)

            # Accumulate epoch totals. The loss is a batch mean, so weight it by
            # the batch size; the last batch may be smaller than the others.
            n_samples = target.size(0)
            val_loss_sum += loss.item() * n_samples
            val_correct += (output.argmax(dim=1) == target).sum().item()
            val_seen += n_samples

            # log validation
            if batch_idx % LOG_INTERVAL == 0:
                global_step = epoch * NUM_BATCHES_VAL + batch_idx
                print(f'Validation Epoch: {epoch} [{batch_idx}/{NUM_BATCHES_VAL} ({100. * batch_idx / NUM_BATCHES_VAL:.0f}%)]\tLoss: {loss.item():.6f}')
                writer.add_scalar('Validation Loss (Every 10 batch)', loss.item(), global_step)
                writer.add_scalar('Validation Accuracy (Every 10 batch)', acc.item(), global_step)
                print(f'Validation Epoch: {epoch} [{batch_idx}/{NUM_BATCHES_VAL} ({100. * batch_idx / NUM_BATCHES_VAL:.0f}%)]\tAccuracy: {acc.item():.6f}')

    # Epoch-level validation summary over every validation sample
    if val_seen > 0:  # guard against an empty validation loader
        val_epoch_loss = val_loss_sum / val_seen
        val_epoch_acc = val_correct / val_seen
        writer.add_scalar('Validation Loss (Epoch)', val_epoch_loss, epoch)
        writer.add_scalar('Validation Accuracy (Epoch)', val_epoch_acc, epoch)
        print(f'Validation Epoch: {epoch} [summary]\tLoss: {val_epoch_loss:.6f}\tAccuracy: {val_epoch_acc:.6f}')

# Drop the references to the last batch's tensors first, then clear the cache:
# memory still held by live tensors cannot be released by empty_cache().
data = target = output = loss = acc = None
torch.cuda.empty_cache()



In [30]:
# Save the model checkpoint
# exist_ok=True makes this a no-op when the directory already exists and avoids
# the check-then-create race of a separate os.path.exists() test.
os.makedirs('models', exist_ok=True)
VERSION = 1
# File name carries the version and the timestamp of this training run
MODEL_NAME = f'LeNet5_v{VERSION}_{date_time}.pth'
# Only the parameters (state_dict) are stored, not the module object itself
torch.save(model.state_dict(), os.path.join('models', MODEL_NAME))
print(f'Saved PyTorch Model State to {MODEL_NAME}')

Saved PyTorch Model State to LeNet5_v1_2024_01_01-00_30_40.pth


In [31]:
# Test the model: load the saved checkpoint into a fresh network and evaluate it
model_loaded = LeNet5().to(device)

# Load the model checkpoint
# map_location places the tensors on the current device, so the load also works
# when the checkpoint was written on a different device (e.g. GPU -> CPU).
checkpoint_path = os.path.join('models', MODEL_NAME)
model_loaded.load_state_dict(torch.load(checkpoint_path, map_location=device))
# Set to eval mode
model_loaded.eval()

# Test the model
correct = 0  # number of correctly classified test samples
total = 0    # number of test samples seen
with torch.no_grad():
    for features, targets in test_loader:
        features = features.to(device)
        targets = targets.to(device)
        logits = model_loaded(features)
        # Predicted class = index of the largest logit in each row
        _, predicted = torch.max(logits, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

# Report once, after the whole test set has been processed, so TensorBoard
# gets a single final value instead of one running value per batch at step 0.
if total > 0:
    test_accuracy = 100 * correct / total
    writer.add_scalar('test accuracy', test_accuracy, 0)
    print(f'Accuracy of the model on the test images: {test_accuracy}%')
else:
    # An empty loader would otherwise cause a division by zero
    print('Test loader produced no samples; accuracy was not computed')

# Release the last batch
features = None
targets = None


# Close the writer
writer.flush()
writer.close()

Accuracy of the model on the test images: 98.05%


In [32]:
# Drop the references to the models so their parameters can be freed
model = None
model_loaded = None

# release all loaders
train_loader = None
val_loader = None
test_loader = None

# release the training objects (the loss object is named loss_function in this notebook)
optimizer = None
loss_function = None
accuracy = None

# Clear cache (done last, after the references above are gone)
torch.cuda.empty_cache()

print('Released all variables')

Released all variables
