# Building PyTorch AlexNet model from scratch

## By Kelvin Hong, 5 March 2021

This notebook will show you how to create an AlexNet model using this [AlexNet Paper](https://papers.nips.cc/paper/2012/file/c399862d3b9d6b76c8436e924a68c45b-Paper.pdf). This model will not be perfectly same as described in the paper, as this Convolutional Neural Network (CNN) is to perform classification on a small dataset. 

The originial AlexNet with an input of size $(3, 227, 227)$ will go through 5 convolutional layers:
$$(3, 227, 227)\xrightarrow{\text{Conv}} (96, 55, 55)\xrightarrow{\text{Conv}} (256, 27, 27)\xrightarrow{\text{Conv}} (384, 13, 13)\xrightarrow{\text{Conv}} (384, 13, 13)\xrightarrow{\text{Conv}} (128, 13, 13).$$
We take the output with size $128\times 13^2$, perform max-pooling to get $128\times 6^2$ then flatten it to get $4608$ parameters. Continued with 3 fully connected layers, the dimensions become
$$4608\xrightarrow{\text{FC}} 4096\xrightarrow{\text{FC}} 4096\xrightarrow{\text{FC}} 1000\text{ classes.}$$

Our task will be using AlexNet model to classify 1125 images from the [weather](https://data.mendeley.com/datasets/4drtyfjtfy/1) datasets into 4 classes (cloudy, rain, sunrise, shine). Thus we will be using a simplified version of AlexNet, with convolutional layers: 
$$(3, 224, 224)\xrightarrow{\text{Conv}} (16, 55, 55)\xrightarrow{\text{Conv}} (32, 27, 27)\xrightarrow{\text{Conv}} (64, 13, 13)\xrightarrow{\text{Conv}} (128, 13, 13)\xrightarrow{\text{Conv}} (128, 13, 13),$$
and fully connected layers
$$4608\xrightarrow{\text{FC}} 2048\xrightarrow{\text{FC}} 1024\xrightarrow{\text{FC}} 4\text{ classes.}$$

In [1]:
# First import all necessary modules.
import torch
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
import torchsummary
import os
import numpy as np
from datetime import datetime
import time
from PIL import Image
from urllib.request import urlopen
import argparse
import re

# A handy function for convert seconds to HH-MM-SS format.
def convert_sec(t):
    h = int(t//3600)
    m = int((t%3600)//60)
    s = int(t%60)
    if h!=0:
        t_str = f"{h}h {m}m {s}s"
    elif m!=0:
        t_str = f"{m}m {s}s"
    else:
        t_str = f"{s}s"
    return t_str

In [2]:
# Check if GPU is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


## Prepare data

Before we begin, create a `Tutorial` folder in current directory then save the zip file from [weather](https://data.mendeley.com/datasets/4drtyfjtfy/1) into `Tutorial` folder. After extracting the zip file we will see a `dataset2` folder which contains everything we want. This folder only contains thousands of images related to weather. They will have image names like `cloudy1.jpg` or `rain157.jpg`. We will make use of this to create list of images and list of labels (ground truth) in Python. 

The location of an image will be like `./Tutorial/dataset2/cloudy1.jpg`. Save this notebook in the home directory, hence `./Tutorial_PyTorch_Build_AlexNet_Model.ipynb`.

A basic structure:
```
./
├─── Tutorial/
|    ├──── dataset2/
|          ├──── [images]
├─── Tutorial_PyTorch_Build_AlexNet_Model.ipynb
```

In [3]:
# Load the data
images_list = os.listdir('./Tutorial/dataset2')
# Correspond labels into indices
labels = {'cloudy': 0, 'rain': 1, 'shine': 2, 'sunrise': 3}
# Below labels_list contains labels 0, 1, 2, 3 corresponding to the images list.
labels_list = [labels[image_name[:re.search(r"\d", image_name).start()]] for image_name in images_list]
# Split datasets into train and test with ratios 0.7, 0.3.
n = len(labels_list)
train_ratio = 0.7
train_n = int(train_ratio*n)
train_images = images_list[:train_n]
test_images = images_list[train_n:]
train_labels = labels_list[:train_n]
test_labels = labels_list[train_n:]
test_n = n-train_n

## Using DataLoader from torch

We will use `torch.utils.data.DataLoader` to load the data, by inherit the DataLoader. The benefit is it can generate minibatch in one-line code, as well as provide randomness. 

Since I only use four lists (train, test images and train, test labels), my method for using DataLoader can be generalized to other datasets as well. 

### Define custom Dataset class
First, we need to define a custom `DataSet` object. From the [official documentation](https://pytorch.org/docs/stable/data.html), the dataset type is either Map-style or Iterable-style. We will be using Map-style dataset, hence we have to look into the `torch.utils.data.Dataset` class, then override the `__len__()` and `__getitem__()` protocols.


In [4]:
# Define custom Dataset for later use of DataLoader
class CustomDataset(torch.utils.data.Dataset):
    # Take in a list of image paths, and a list of numeric labels (integers). Transform will be explained below.
    def __init__(self, images_list, labels_list, transform):
        self.images = [os.path.join('./Tutorial/dataset2/',path) for path in images_list]
        self.labels = labels_list
        self.transform = transform
    
    # Define the length of dataset. Syntax len(dataset)
    def __len__(self):
        images_n = len(self.images)
        labels_n = len(self.labels)
        if images_n != labels_n:
            print(f"SizeError: number of images [{images_n}] and number of labels [{labels_n}] should be same.")
            return None
        return images_n
    # Define the get function. Syntax dataset[index]
    def __getitem__(self, index):
        # Convert to RGB to make sure input channel = 3
        img = Image.open(self.images[index]).convert('RGB')
        # First we applies transform on image, then using to(device) method, 
        # to align with the input device of model.
        sample = {'image': self.transform(img).to(device), 'label': self.labels[index]}
        return sample

### Transform, DataLoader

The main purpose of `transform` is to first preprocess and perform data augmentations to the training dataset (as well as test dataset). Base on the model architecture, we first resize image to `(240,240)`, then apply Random Crop to produce `(227,277)` image to make the training more robust. The `ToTensor()` method will transform PIL image into tensor format, then finally we `Normalize()` each number in the tensor with the given mean and standard deviation on the three color channels.

After using `transform` to prepare the dataset, we dump the variables into DataLoader, with batch size equals to 4. 

In [5]:
transform = transforms.Compose([
        transforms.Resize((240,240)),
        transforms.RandomCrop(227),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406),(0.229,0.224,0.225))
    ])

# For more customization, one can define transforms for train and test datasets separately. 
train_dataset = CustomDataset(train_images, train_labels, transform=transform)
test_dataset = CustomDataset(test_images, test_labels, transform=transform)

# Shuffle the trainloader to provide randomness
trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)
testloader = torch.utils.data.DataLoader(test_dataset, batch_size=4, shuffle=False, num_workers=0)

## Define AlexNet model

To define a whole-new model, we have to define how to forward in the network, hence we need to inherit the class `torch.nn.Module`. 

*Noticed we have used `Dropout` in the model, it is to drop some proportion of weights, and this can weaken the correlation between adjacent nodes, making them independent and robust.*

In [6]:
# Model
class Alex_Custom(torch.nn.Module):
    def __init__(self):
        super().__init__()
        """ 
        In convolution layers below, in_channels and out_channels are
        respectively: Current filter & Future filter. 
        (The first convolution layer is color channels & Future filter)
        
        We change how the filters (channels) behave between each Convolutional network
        Because the network is for training a relatively small dataset.
        The overall model architecture is still the same.
        
        Check the paper here: https://papers.nips.cc/paper/2012/file/c399862d3b9d6b76c8436e924a68c45b-Paper.pdf
        
        """ 
        self.conv1 = torch.nn.Conv2d(3, 16, 11, stride=4)
        self.relu = torch.nn.ReLU()
        self.batchnorm1 = torch.nn.BatchNorm2d(16)
        self.conv2 = torch.nn.Conv2d(16, 32, 5, padding=2)
        self.batchnorm2 = torch.nn.BatchNorm2d(32)
        self.maxp = torch.nn.MaxPool2d(2,2)
        self.conv3 = torch.nn.Conv2d(32, 64, 3, padding=1)
        self.conv4 = torch.nn.Conv2d(64, 128, 3, padding=1)
        self.conv5 = torch.nn.Conv2d(128, 128, 3, padding=1)
        self.flatten = torch.nn.Flatten()
        self.linear1 = torch.nn.Linear(6*6*128, 2048)
        self.dropout = torch.nn.Dropout(0.5)
        self.linear2 = torch.nn.Linear(2048,1024)
        self.linear3 = torch.nn.Linear(1024, num_classes)

    def forward(self, x):
        x = self.batchnorm1(self.conv1(x))
        x = self.maxp(self.batchnorm2(self.relu(self.conv2(x))))
        x = self.maxp(self.relu(self.conv3(x)))
        x = self.conv4(x)
        x = self.maxp(self.relu(self.conv5(x)))
        x = self.flatten(x)
        x = self.relu(self.linear1(x))
        x = self.dropout(x)
        x = self.relu(self.linear2(x))
        x = self.linear3(x)
        return x


### Custom weights initialization

We can control initialization of weights to speed up the training process. The `weights_unit_rule` below is an helper function, it takes each layer as input `m`. By identify layer as Convolutioal or Linear Dense layer, we could assign different initializations.

In [7]:
def weights_init_rule(m):
    classname = m.__class__.__name__
    # Initialize convolutional weights as normal distribution (0, 0.01), bias as all ones
    if classname.find('Conv') != -1:
        # The no_grad flag is to prevent model from doing gradient calculation.
        with torch.no_grad():
            m.weight.normal_(0,0.04)
            m.bias.fill_(1)
    # Initialize Dense weights as uniform distribution of radius 1/sqrt n, n is the number of nodes
    # in previous layer. The quantity 1/sqrt n is usually a good practice to speed up training. 
    if classname.find('Linear') != -1:
        with torch.no_grad():
            n = m.in_features
            y = 1.0/np.sqrt(n)
            m.weight.uniform_(-y,y)
            m.bias.fill_(1)


## Start Training

Provide parameters such as number of classes, epochs and so on. After that, we can initialize the model and apply the weight initialization rule. 

In [8]:
num_classes = 4
num_epochs = 10
train_size = len(train_images)

model = Alex_Custom().to(device)
model.apply(weights_init_rule) 

# Using Stochastic Gradient Descent optimizer.
# The Adam (Adaptive Momentum Estimator) has been tested, it is not better than SGD.
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# torchsummary is used for print a summary about the model.
torchsummary.summary(model, (3,227,227), 1, "cpu")
# Cross Entropy is a good way to compute loss of classification problems.
criterion = torch.nn.CrossEntropyLoss()

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1            [1, 16, 55, 55]           5,824
       BatchNorm2d-2            [1, 16, 55, 55]              32
            Conv2d-3            [1, 32, 55, 55]          12,832
              ReLU-4            [1, 32, 55, 55]               0
       BatchNorm2d-5            [1, 32, 55, 55]              64
         MaxPool2d-6            [1, 32, 27, 27]               0
            Conv2d-7            [1, 64, 27, 27]          18,496
              ReLU-8            [1, 64, 27, 27]               0
         MaxPool2d-9            [1, 64, 13, 13]               0
           Conv2d-10           [1, 128, 13, 13]          73,856
           Conv2d-11           [1, 128, 13, 13]         147,584
             ReLU-12           [1, 128, 13, 13]               0
        MaxPool2d-13             [1, 128, 6, 6]               0
          Flatten-14                  [

In [9]:
train_accs, test_accs = [], []
train_losses, test_losses = [], []

for epoch in range(num_epochs):
    # Training mode
    model.train()
    accu_loss = 0
    taccu_loss = 0
    epoch_start = time.time()
    steps = 20
    accuracy_count = 0
    print(f"Training loss every {steps} batchs:", end = " ")
    for i, data in enumerate(trainloader, 0):
        optimizer.zero_grad()
        batch_images = data['image'].to(device)
        batch_labels = data['label'].type(torch.LongTensor).to(device)
        outputs = model(batch_images)
        outputs_labels = [one_hot.index(max(one_hot)) for one_hot in outputs.tolist()]
        accuracy_count += [batch_labels[i] == outputs_labels[i] for i in range(len(batch_labels))].count(True)
        loss = criterion(outputs, batch_labels)
        accu_loss+=loss.item()
        loss.backward()
        optimizer.step()
        if i%steps==steps-1:
            train_loss = accu_loss/i
            print(round(train_loss,2), end=", ")
    train_losses.append(train_loss)
    print()
    epoch_end = time.time()
    epoch_duration = convert_sec(epoch_end-epoch_start)
    train_acc  = 100*accuracy_count/train_n
    train_accs.append(train_acc)
    print(f"Epoch {epoch+1} Train Accuracy: {accuracy_count} out of {train_n} = {round(train_acc, 2)}%. Time elapsed {epoch_duration}.")
    
    # Evaluation mode
    # evaluation on test set
    with torch.no_grad():
        model.eval()
        t_count = 0
        print(f"Test loss every {steps} batchs:", end = " ")
        for j, data in enumerate(testloader, 0):
            tbatch_images = data['image'].to(device)
            tbatch_labels = data['label'].type(torch.LongTensor).to(device)
            toutputs = model(tbatch_images)
            toutputs_labels = [one_hot.index(max(one_hot)) for one_hot in toutputs.tolist()]
            t_count += [tbatch_labels[k] == toutputs_labels[k] for k in range(len(tbatch_labels))].count(True)
            loss = criterion(toutputs, tbatch_labels)
            taccu_loss += loss.item()
            if j%steps==steps-1:
                test_loss = taccu_loss/j
                print(round(test_loss,2), end=", ")
        test_losses.append(test_loss)
        print()
        test_acc = 100*t_count/test_n
        test_accs.append(test_acc)
        print(f"Epoch {epoch+1} Test Accuracy: {t_count} out of {test_n} = {round(test_acc,2)}%.")
    
"""
The code below save the model.
It is enough to just save model.state_dict() for inferencing, 
but to save the model halfway and resume training in the future, 
we should also save the optimizer state_dict. 
"""
print("Saving model...")
torch.save({
        'epoch': num_epochs,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss
    }, os.path.join("./Tutorial/", f'model_Epoch{num_epochs}_{datetime.now()}.pth') )
print("Saved!")

Training loss every 20 batchs: 2.4, 2.21, 1.97, 1.74, 1.57, 1.44, 1.37, 1.33, 1.27, 
Epoch 1 Train Accuracy: 432 out of 787 = 54.89%. Time elapsed 45s.
Test loss every 20 batchs: 0.36, 0.38, 0.44, 0.49, 
Epoch 1 Test Accuracy: 275 out of 338 = 81.36%.
Training loss every 20 batchs: 0.86, 0.93, 0.99, 0.94, 0.91, 0.91, 0.88, 0.84, 0.83, 
Epoch 2 Train Accuracy: 520 out of 787 = 66.07%. Time elapsed 37s.
Test loss every 20 batchs: 0.44, 0.47, 0.53, 0.55, 
Epoch 2 Test Accuracy: 272 out of 338 = 80.47%.
Training loss every 20 batchs: 0.76, 0.8, 0.73, 0.78, 0.78, 0.78, 0.76, 0.8, 0.78, 
Epoch 3 Train Accuracy: 543 out of 787 = 69.0%. Time elapsed 38s.
Test loss every 20 batchs: 0.53, 0.57, 0.6, 0.65, 
Epoch 3 Test Accuracy: 257 out of 338 = 76.04%.
Training loss every 20 batchs: 0.86, 0.71, 0.76, 0.76, 0.73, 0.7, 0.72, 0.71, 0.7, 
Epoch 4 Train Accuracy: 555 out of 787 = 70.52%. Time elapsed 37s.
Test loss every 20 batchs: 0.33, 0.36, 0.42, 0.48, 
Epoch 4 Test Accuracy: 283 out of 338 = 83.

## Testing model

We can grab some images from the web to check the performance of our network.

In [14]:
# Evaluation mode
model.eval()

img = Image.open(urlopen("https://i.redd.it/i88wwl95in1z.jpg"))
img.show()   # Uncomment this to see the picture
t = transform(img)
# Note that the output array will not be the same, since the transform contains RandomCrop.
y = model(t.unsqueeze(0)).tolist()[0]
print(f"Model Output: {y}.")
max_index = y.index(max(y))
index_to_label = {0: 'cloudy', 1:'rain', 2:'shine', 3:q'sunrise'}
print(f"Predicted class is {index_to_label[max_index]}.")

Model Output: [0.9064050912857056, -3.110182285308838, 3.942471742630005, 1.7003302574157715].
Predicted class is shine.


## Resume Training

To learn how to resume training, we reformulize our code and pack them into a function.

In [None]:
# Configuration

num_classes = 4
num_epochs = 10
train_size = len(train_images)

# A function block for resume training (Can also train a new one)    
def train_and_save(num_classes, num_epochs, trainloader, testloader, train_n, test_n, pretrained, device):
    # Detect any pretrained model. If there is any, resume training.
    if pretrained:
        # We still need to initialize model and optimizer, then attach loaded state_dicts to them
        model = Alex_Custom().to(device)
        optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
        try:
            checkpoint = torch.load(pretrained)
        except:
            print("LoadModelError: Cannot load the model.")
            return
        else:
            print("This model is a trained model.\nContinue training...")
            model.load_state_dict(checkpoint['model_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            prev_epochs = checkpoint['epoch']
            loss = checkpoint['loss']
            print(f"Resume training after epoch {prev_epochs}...")
    else:
        # If not detected any model, start a new training
        # So this block can replace the previous one.
        print("This is a new model, start training...")
        model = Alex_Custom().to(device)
        # Initialize weights only when our model is brand new
        model.apply(weights_init_rule) 
        optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
        prev_epochs=0
        
    
    torchsummary.summary(model, (3,227,227), 1, "cuda")
    criterion = torch.nn.CrossEntropyLoss()
    
    # The training is basically the same
    # Only difference is we have to add up prev_epochs to show correct messages
    for epoch in range(num_epochs):
        # Training mode
        model.train()
        accu_loss = 0
        epoch_start = time.time()
        steps = 20
        accuracy_count = 0
        print(f"Training loss every {steps} batchs:", end = " ")
        loader_n = len(trainloader)
        for i, data in enumerate(trainloader, 0):
            optimizer.zero_grad()
            batch_images = data['image'].to(device)
            batch_labels = data['label'].type(torch.LongTensor).to(device)
            outputs = model(batch_images)
            outputs_labels = [one_hot.index(max(one_hot)) for one_hot in outputs.tolist()]
            accuracy_count += [batch_labels[i] == outputs_labels[i] for i in range(len(batch_labels))].count(True)
            loss = criterion(outputs, batch_labels)
            accu_loss+=loss.item()
            loss.backward()
            optimizer.step()
            if i%steps==steps-1:
                print(round(accu_loss/i,2), end=", ")
        print()
        epoch_end = time.time()
        epoch_duration = convert_sec(epoch_end-epoch_start)
        print(f"Epoch {epoch+1+prev_epochs} Train Accuracy: {accuracy_count} out of {train_n} = {round(100*accuracy_count/train_n, 2)}%. Time elapsed {epoch_duration}.")

        # Evaluation mode
        # evaluation on test set
        model.eval()
        t_count = 0
        for j, data in enumerate(testloader, 0):
            tbatch_images = data['image'].to(device)
            tbatch_labels = data['label'].type(torch.LongTensor).to(device)
            toutputs = model(tbatch_images)
            toutputs_labels = [one_hot.index(max(one_hot)) for one_hot in toutputs.tolist()]
            t_count += [tbatch_labels[i] == toutputs_labels[i] for i in range(len(tbatch_labels))].count(True)
        print(f"Epoch {epoch+1+prev_epochs} Test Accuracy: {t_count} out of {test_n} = {round(100*t_count/test_n,2)}%.")
        
    
    torch.save({
        'epoch': num_epochs+prev_epochs,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss
    }, os.path.join('./Tutorial/', f'model_Epoch{num_epochs+prev_epochs}_{datetime.now()}.pth') )

    

In [None]:
# Replace the path below by your model.
pretrained = './Tutorial/model_Epoch10_2021-03-24 09:49:42.409798.pth'
train_and_save(num_classes, num_epochs, trainloader, testloader, train_n, test_n, pretrained, device)