In [None]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from tqdm import tqdm

from torch.utils.tensorboard import SummaryWriter

## Preprocessing & Data Loading


In [None]:
def load_data(train_batch_size, test_batch_size):
    """Build the MNIST train / validation / test datasets and their loaders.

    Every image is resized to 32x32, converted to a tensor and normalized
    with mean 0.1307 and std 0.3081, so each sample is an
    ``[im_tensor, label]`` pair whose image tensor is 1x32x32 (gray scale).

    Args:
        train_batch_size: batch size for the train AND validation loaders.
        test_batch_size: batch size for the test loader.

    Returns:
        ``(trainset, train_loader, valset, val_loader, testset, test_loader)``.
        Only the train loader shuffles.
    """
    data_root = '../Dataset/'
    val_fraction = 0.2

    def build_transform():
        # Same preprocessing pipeline for every split.
        return transforms.Compose([
            transforms.Resize((32, 32)),
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ])

    # The official training split is downloaded on demand.
    full_train = datasets.MNIST(data_root, train=True, download=True,
                                transform=build_transform())

    # Hold out 20% of the training split for validation (80/20 random split).
    n_val = int(val_fraction * len(full_train))
    n_train = len(full_train) - n_val
    trainset, valset = torch.utils.data.random_split(full_train, [n_train, n_val])

    # The test split is read from the same root without requesting a download.
    testset = datasets.MNIST(data_root, train=False, transform=build_transform())

    make_loader = torch.utils.data.DataLoader
    train_loader = make_loader(trainset, batch_size=train_batch_size, shuffle=True)
    val_loader = make_loader(valset, batch_size=train_batch_size, shuffle=False)
    test_loader = make_loader(testset, batch_size=test_batch_size, shuffle=False)

    return trainset, train_loader, valset, val_loader, testset, test_loader

## LeNet5

In [None]:
class LeNet5(nn.Module):
    """LeNet-5 style CNN: three 5x5 convolution stages, then two linear layers.

    Args:
        Cin: number of channels in the input image.
        Cout1: output channels of conv stage C1 (conv -> ReLU -> 2x2 max-pool).
        Cout2: output channels of conv stage C2 (conv -> ReLU -> 2x2 max-pool).
        Cout3: output channels of conv stage C3 (conv -> ReLU, no pooling).
        feat4: output features of the first fully connected layer.
        featOut: output features of the final layer (returned un-activated).

    Note:
        ``fc1`` accepts exactly ``Cout3`` input features, so C3 has to emit a
        1x1 spatial map. With unpadded 5x5 kernels and 2x2 pooling that is the
        case for 32x32 inputs (32 -> 28 -> 14 -> 10 -> 5 -> 1).
    """

    def __init__(self, Cin, Cout1, Cout2, Cout3, feat4, featOut):
        super().__init__()

        # Convolutional feature extractor.
        self.C1 = nn.Sequential(
            nn.Conv2d(Cin, Cout1, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        )
        self.C2 = nn.Sequential(
            nn.Conv2d(Cout1, Cout2, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        )
        self.C3 = nn.Sequential(
            nn.Conv2d(Cout2, Cout3, kernel_size=5),
            nn.ReLU(),
        )

        # Classifier head.
        self.fc1 = nn.Sequential(nn.Linear(Cout3, feat4), nn.ReLU())
        self.fc2 = nn.Linear(feat4, featOut)

        # One dropout module (p=0.2) shared by all three conv stages.
        self.drop = nn.Dropout(p=0.2)

    def forward(self, input):
        """Return the ``featOut`` raw scores for every sample in ``input``."""
        features = input
        for stage in (self.C1, self.C2, self.C3):
            # Dropout follows each convolution stage.
            features = self.drop(stage(features))

        # Collapse everything except the batch dimension.
        flat = features.flatten(start_dim=1)

        return self.fc2(self.fc1(flat))


## Train

In [None]:
def train(model, train_loader, optimizer, criterion, device, epoch):
    """Run one training epoch and log progress to stdout and TensorBoard.

    Args:
        model: network to optimise; it is switched to training mode.
        train_loader: iterable of ``(data, target)`` batches. It must support
            ``len()`` and expose ``.dataset`` (both are used for logging).
        optimizer: optimizer that updates ``model``'s parameters.
        criterion: loss callable returning a scalar. A ``reduction``
            attribute equal to ``'sum'`` is honoured; anything else is
            treated as a per-batch mean.
        device: device the batches are moved to.
        epoch: index of the current epoch (0- or 1-based). It is used in the
            log line and to place this epoch's points on the TensorBoard axis.

    Returns:
        ``(train_loss, train_accuracy)``: the average loss per sample as a
        Python float and the accuracy in percent, both over the samples
        actually processed in this epoch.
    """
    model.train()

    # A mean-reduced loss must be re-weighted by the batch size before it is
    # summed, otherwise dividing by the sample count under-reports the loss
    # by roughly a factor of the batch size.
    loss_is_summed = getattr(criterion, 'reduction', 'mean') == 'sum'
    batches_per_epoch = len(train_loader)

    loss_sum = 0.0
    tot_correct = 0
    samples_seen = 0

    writer = SummaryWriter('Events/runs')
    try:
        for batch_idx, (data, target) in enumerate(train_loader):

            # Load batch
            data, target = data.to(device), target.to(device)

            # Zero gradients
            optimizer.zero_grad()

            # Calculate predictions
            output = model(data)

            # Calculate loss
            loss = criterion(output, target)

            # Calculate gradients
            loss.backward()

            # Update weights
            optimizer.step()

            # Accumulate plain Python numbers so no tensors are kept alive.
            batch_size = len(target)
            batch_loss = loss.item()
            loss_sum += batch_loss if loss_is_summed else batch_loss * batch_size

            # Predicted class = index of the largest score.
            preds = output.detach().argmax(dim=1)
            correct = (preds == target).sum().item()
            tot_correct += correct
            samples_seen += batch_size

            accuracy = 100. * correct / batch_size

            # print log
            if batch_idx % 100 == 0:
                print(f'Train set, Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}'
                      f' ({100. * batch_idx / len(train_loader):.0f}%)]\t'
                      f'Loss: {batch_loss:.4f}\t'
                      f' Accuracy: {accuracy:.3f}')

                # Derive the step from the real number of batches per epoch so
                # it never goes negative and epochs never overlap on the axis.
                global_step = epoch * batches_per_epoch + batch_idx

                writer.add_scalar(tag='training loss',
                                  scalar_value=batch_loss,
                                  global_step=global_step)

                writer.add_scalar(tag='Accuracy',
                                  scalar_value=accuracy,
                                  global_step=global_step)
    finally:
        # Flush and release the event file even if a batch raised.
        writer.close()

    # Guard against an empty loader instead of dividing by zero.
    denom = max(samples_seen, 1)
    train_loss = loss_sum / denom
    train_accuracy = 100. * tot_correct / denom

    return train_loss, train_accuracy


## Evaluate & Test

In [None]:
def eval(model, val_loader, criterion, device):
    """Evaluate ``model`` on every batch of ``val_loader`` without training it.

    Note: this function shadows Python's built-in ``eval`` in the notebook
    namespace; the name is kept because other cells call it.

    Args:
        model: network to evaluate; it is switched to evaluation mode.
        val_loader: iterable of ``(data, target)`` batches (validation or
            test data).
        criterion: loss callable returning a scalar. A ``reduction``
            attribute equal to ``'sum'`` is honoured; anything else is
            treated as a per-batch mean.
        device: device the batches are moved to.

    Returns:
        ``(val_loss, accuracy)``: the average loss per sample as a Python
        float and the accuracy in percent, both over the samples processed.
    """
    model.eval()

    # A mean-reduced loss must be re-weighted by the batch size before it is
    # summed, otherwise the per-sample average comes out too small.
    loss_is_summed = getattr(criterion, 'reduction', 'mean') == 'sum'

    loss_sum = 0.0
    correct = 0
    samples_seen = 0

    # Evaluation needs no gradients; disabling autograd avoids building a
    # graph for every batch.
    with torch.no_grad():
        for data, target in tqdm(val_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)

            batch_size = len(target)
            batch_loss = criterion(output, target).item()
            loss_sum += batch_loss if loss_is_summed else batch_loss * batch_size

            # Predicted class = index of the largest score.
            preds = output.argmax(dim=1)
            correct += (preds == target).sum().item()
            samples_seen += batch_size

    # Guard against an empty loader instead of dividing by zero.
    denom = max(samples_seen, 1)
    val_loss = loss_sum / denom
    accuracy = 100. * correct / denom

    print('\nValidation set: average loss: {:.6f}, accuracy: {}/{} ({:.0f}%)\n'.format(
          val_loss, correct, samples_seen, accuracy))

    return val_loss, accuracy


## Run Model

In [None]:
# Seed torch's global RNG before any random work (train/val split, weight
# initialisation, batch shuffling) so a Restart & Run All repeats the same
# experiment. GPU kernels may still introduce small run-to-run differences.
SEED = 0
torch.manual_seed(SEED)

# load train data
train_batch_size = 64
test_batch_size  = 1000

train_set, train_loader, val_set, val_loader, test_set, test_loader = load_data(train_batch_size, test_batch_size)
print(f'data shape: train {len(train_set)}, val {len(val_set)}, test {len(test_set)}\n\n')

# Set training parameters
device      = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Running on: {device}\n\n')
num_epoches = 10

# Model parameters
Cin     = 1     # Gray scale - 1 channel
Cout1   = 6     # Output channels for 1st Convolution layer
Cout2   = 16    # Output channels for 2nd Convolution layer
Cout3   = 120   # Output channels for 3rd Convolution layer
feat4   = 84    # Output features for 1st Fully Connected layer
featOut = 10    # Output features for 2nd Fully Connected layer

model = LeNet5(Cin, Cout1, Cout2, Cout3, feat4, featOut).to(device)

# Set optimizer & criterion
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
criterion = nn.CrossEntropyLoss()

# Train and evaluate model: one pass over the training data followed by a
# validation pass, repeated for every epoch.
for epoch in range(num_epoches):
    train(model, train_loader, optimizer, criterion, device, epoch)
    eval(model, val_loader, criterion, device)


## Inference

In [None]:
# Final check on the held-out test loader, reusing the evaluation helper
# (its summary line is still labelled "Validation set" for this call).
print('Model accuracy based on test data set:\n\n')
loss, accuracy = eval(model, val_loader=test_loader, criterion=criterion, device=device)