In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, TensorDataset
import torch.jit
from torch.optim.lr_scheduler import ExponentialLR
import time
import os

## Use GPU if it is available

In [None]:
def choose_device() -> torch.device:
    """Pick the compute device: Apple MPS first, then CUDA, otherwise the CPU."""
    # Checked in order of preference; the first backend reporting availability wins.
    candidates = (
        ("mps", torch.backends.mps.is_available),
        ("cuda", torch.cuda.is_available),
    )
    for backend_name, is_available in candidates:
        if is_available():
            return torch.device(backend_name)
    return torch.device("cpu")

### Check if GPU is available

On macOS the GPU is available through PyTorch's MPS backend; on Windows and Linux it is available through PyTorch's CUDA backend.

In [None]:
# Select the compute device once; the training cell later moves the model and batches to it.
device = choose_device()
print(device)
# Quick check: allocate a one-element tensor directly on the selected device and show it.
x = torch.ones(1, device = device)
print(x)

# Download and transform Train data

In [None]:

def clip_to_01(img):
    """Return a copy of ``img`` with every value limited to the closed range [0, 1]."""
    lower_bound, upper_bound = 0, 1
    return torch.clamp(input=img, min=lower_bound, max=upper_bound)
    
# Per-channel normalization applied to image tensors (not PIL images).
# NOTE(review): these mean/std values look like the commonly used ImageNet statistics rather
# than CIFAR-10's own — confirm this is intended.
transform_primary = transforms.Compose([
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])
# Only ToTensor is applied at load time; transform_primary is applied later, when the
# augmented training list is built.
original_train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform = transforms.ToTensor())

In [None]:
# Test-time pipeline: convert to tensor, then normalize with the same mean/std constants
# that transform_primary uses for the training images. No augmentation is applied.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

# train=False selects the CIFAR-10 test split.
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

## Data augmentation

To artificially increase the number of available images, we perform data augmentation and append the augmented images to the training dataset.

In [None]:
# Random augmentations applied to the un-normalized image tensors produced by ToTensor.
transform_augmentation = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomResizedCrop(32, scale=(0.8, 1.0), ratio=(0.9, 1.1), antialias=True),
    transforms.GaussianBlur(kernel_size=3)
])

# Materialized list of (normalized image, label) pairs held in memory.
augmented_data = []

# Every original image contributes two samples: the normalized original and one normalized
# augmented copy. The augmentation is drawn once here, so the augmented copy of an image
# stays the same for every epoch.
for original_image, original_label in original_train_dataset:
    augmented_data.append((transform_primary(original_image), original_label))
    augmented_image = transform_augmentation(original_image)
    augmented_data.append((transform_primary(augmented_image), original_label))

    
class CustomDataset(torch.utils.data.Dataset):
    """Dataset view over an indexable collection of ``(image, label)`` pairs.

    Args:
        data: Indexable, sized collection whose items unpack into ``(image, label)``.
        transform: Optional callable applied to the image each time an item is fetched.
    """

    def __init__(self, data, transform=None):
        self.transform = transform
        self.data = data

    def __len__(self):
        """Number of stored samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for ``idx``, with the transform applied to the image if set."""
        sample, target = self.data[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target
    

# No transform is passed: the samples in augmented_data were already normalized when the list was built.
train_dataset = CustomDataset(augmented_data)

In [None]:
# Ask the dataset for its length directly; wrapping it in list() first would walk every
# sample and build a throw-away list just to count the items.
print(f" Augmented dataset size: {len(train_dataset)}")

### Inspect the training data

The show_image() function takes an image and its label and displays them so that you can inspect the training data.

You can comment out transforms.Normalize and clip_to_01 in the "Download and transform Train data" cell to inspect the real (un-normalized) images.

In [None]:
def show_image(image : torch.Tensor, label : int) -> None:
    """Print ``label`` and draw ``image`` as a small matplotlib figure.

    Args:
        image: Tensor of shape (3, 32, 32) in channel-first RGB order. Float values outside
            [0, 1] (e.g. after normalization) are clipped by ``imshow`` when drawn.
        label: Class label, printed before the figure.

    Raises:
        AssertionError: If ``image`` is not 3 x 32 x 32.
    """
    assert image.size(0) == 3, "First dimension should present the three color channels"
    assert image.size(1) == 32, "Expected a 32 * 32 image"
    assert image.size(2) == 32, "Expected a 32 * 32 image"

    print(f"label: {label}")
    # imshow expects channel-last (H, W, C) data. detach()/cpu() make the conversion work for
    # tensors that track gradients or live on an accelerator, which a direct NumPy conversion rejects.
    rgb_image = image.detach().cpu().permute(1, 2, 0).numpy()
    plt.figure(figsize=(0.75,0.5))
    plt.imshow(rgb_image)
    plt.axis('off')  # Turn off axis labels
    plt.show()

In [None]:
# list() iterates the whole dataset (running its ToTensor transform on every image) so that
# samples can be picked by position below.
original_train_dataset_list = list(original_train_dataset)
print(f"original_dataset size: {len(original_train_dataset_list)}")
# Change this index to inspect different images
index = 0
image, label = original_train_dataset_list[index]
show_image(image, label)

In [None]:
# Copy the (already normalized) training samples into a list and display one of them.
# The training list was built by appending each original image followed by its augmented
# copy, so odd indices hold augmented images.
augmented_train_data_set_list = list(train_dataset)
index = 9
image, label = augmented_train_data_set_list[index]
show_image(image, label)

In [None]:
# list() iterates the whole test split, applying transform_test (ToTensor + Normalize) to each image.
test_dataset_list = list(test_dataset)

# Change this index to inspect a different test image
index = 3
image, label = test_dataset_list[index]
show_image(image, label)

## Define the model

The model used in this notebook is a three-block VGG. Feel free to play with it by adding or removing blocks, changing the dropout, or trying whatever else helps you explore further!

In [None]:
class VGGThreeBlocks(nn.Module):
    """Three-block VGG-style CNN for 3 x 32 x 32 images.

    Each block is two (3x3 conv -> batch norm -> ReLU) stages followed by a 2x2 max-pool,
    with 32, 64 and 128 channels respectively, so a 32 x 32 input is reduced to
    128 x 4 x 4. The flattened features feed a 1024 -> 512 -> 256 -> ``num_classes``
    fully connected head, with dropout after the first two hidden layers.

    Args:
        num_classes: Number of output logits.
        dropout_rate: Drop probability of both dropout layers. Defaults to 0.8, the value
            that used to be hard-coded.
    """

    def __init__(self, num_classes=10, dropout_rate=0.8):
        super().__init__()

        # Convolutional layers
        # VGG 1
        self.conv0 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.bn0 = nn.BatchNorm2d(32)
        self.relu0 = nn.ReLU()

        self.conv1 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # VGG 2
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU()

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # VGG 3
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.relu4 = nn.ReLU()

        self.conv5 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(128)
        self.relu5 = nn.ReLU()
        self.pool5 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Dropout layers
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)

        # Fully connected layers.
        # The head activations have their own names: they used to be assigned to relu4/relu5,
        # silently replacing the conv-block activations of the same name. ReLU has no
        # parameters, so the state_dict keys are unaffected by this.
        self.fc1 = nn.Linear(128 * 4 * 4, 1024)
        self.relu_fc1 = nn.ReLU()
        self.fc2 = nn.Linear(1024, 512)
        self.relu_fc2 = nn.ReLU()
        self.fc3 = nn.Linear(512, 256)
        self.relu6 = nn.ReLU()
        self.predicator = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map a batch of shape (N, 3, 32, 32) to logits of shape (N, num_classes)."""
        # VGG 1
        x = self.relu0(self.bn0(self.conv0(x)))
        x = self.pool1(self.relu1(self.bn1(self.conv1(x))))
        # VGG 2
        x = self.relu2(self.bn2(self.conv2(x)))
        x = self.pool3(self.relu3(self.bn3(self.conv3(x))))
        # VGG 3
        x = self.relu4(self.bn4(self.conv4(x)))
        x = self.pool5(self.relu5(self.bn5(self.conv5(x))))

        # Flatten everything except the batch dimension; unlike view(), flatten also
        # accepts non-contiguous inputs.
        x = torch.flatten(x, start_dim=1)

        # Fully connected layers
        x = self.dropout1(self.relu_fc1(self.fc1(x)))
        x = self.dropout2(self.relu_fc2(self.fc2(x)))
        x = self.relu6(self.fc3(x))
        x = self.predicator(x)
        return x



def custom_weight_init(module):
    """Xavier-normal initialize the weight of ``nn.Linear`` / ``nn.Conv2d`` modules.

    Intended for ``model.apply(...)``; any other module type is left untouched, and
    biases are not modified.
    """
    initialized_types = (nn.Linear, nn.Conv2d)
    if not isinstance(module, initialized_types):
        return
    # Swap this call to experiment with a different initialization scheme
    nn.init.xavier_normal_(module.weight)

### A Method for drawing the loss function

Plotting the loss against the epochs helps you get a feeling for how training is progressing and whether you need to change a hyperparameter to train the model more effectively.

For example, if your loss is not decreasing "almost" monotonically, your learning rate is probably too high.

In [None]:
def plot_loss(loss : list, accuracy : list ,epochs : list):
    """Draw the loss and accuracy curves against ``epochs`` in a single figure.

    Args:
        loss: One loss value per entry of ``epochs`` (drawn in blue).
        accuracy: One accuracy value per entry of ``epochs`` (drawn in green).
        epochs: X-axis values shared by both curves.
    """
    # (values, line colour, legend label) for each curve, in drawing order
    curves = (
        (loss, 'blue', 'loss'),
        (accuracy, 'green', 'accuracy'),
    )
    plt.figure()
    plt.title('Cross Entropy Loss')
    for values, line_color, legend_label in curves:
        plt.plot(epochs, values, color=line_color, label=legend_label)
    plt.legend(loc="upper left")
    plt.show()

## Create the model

Here we create the model and apply the custom weight initialization. Feel free to use a different initialization technique, or just comment out the model.apply() call to keep the default PyTorch weight initialization.

**state_dict_epoch** is used to save the state of the model at the epoch where the loss reaches its lowest value. Keeping the best state seen so far is the checkpointing idea behind early stopping.

**number of epochs** is also defined in this cell. Change it based on your needs; for example, run the model for a single epoch to check that everything is working.

In [None]:
#create or reset the model
model = VGGThreeBlocks(num_classes=10)
# Re-initialize the weights of every Linear/Conv2d layer with custom_weight_init
model.apply(custom_weight_init)

# Record of the best epoch seen so far; updated by update_state_epoch during training
state_dict_epoch = {
    "epoch": 0,
    "loss": 0,
    "state_dict":{}
}

# This is used for drawing the loss against the epochs
loss_list = []

# These are used for calculating the accuracy during training
correct = 0
total = 0
accuracy_list = []

# Use a single or low number of epochs for debugging purposes
num_epochs = 10
epochs = range(num_epochs)


# Train the model

It is worth mentioning that I use the ExponentialLR scheduler to reduce the learning rate after each epoch, which helps avoid unstable loss behavior.


In [None]:
def update_state_epoch(epoch, loss_value, model_state, state_dict_epoch):
    """Record the model state of the epoch with the lowest loss seen so far.

    Args:
        epoch: Zero-based index of the epoch that just finished.
        loss_value: Loss of that epoch.
        model_state: Model ``state_dict`` at the end of that epoch.
        state_dict_epoch: Dict with keys ``"epoch"``, ``"loss"`` and ``"state_dict"``;
            updated in place when this epoch becomes the best one.

    Epoch 0 always (re)initializes the record. Afterwards the record is replaced whenever
    ``loss_value`` is strictly lower than the stored loss.
    """
    # Local import so the function does not depend on the notebook's import cell.
    import copy

    # An empty "state_dict" means nothing has been captured yet, so the stored loss
    # placeholder must not be compared against.
    nothing_recorded = epoch == 0 or not state_dict_epoch["state_dict"]
    if nothing_recorded or loss_value < state_dict_epoch["loss"]:
        state_dict_epoch["epoch"] = epoch
        state_dict_epoch["loss"] = loss_value
        # Deep copy: state_dict() hands out references to the live parameters, which would
        # otherwise keep changing as training continues and overwrite the "best" snapshot.
        state_dict_epoch["state_dict"] = copy.deepcopy(model_state)

In [None]:
model.to(device) #Use GPU if it is available, check the first cell for more info
# Put dropout and batch norm into training mode explicitly; without this, re-running this
# cell after an evaluation cell (which calls model.eval()) would train in eval mode.
model.train()

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay = 0.01)
scheduler = ExponentialLR(optimizer, gamma=0.90)

batch_size = 125
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Report progress about five times per run. Integer arithmetic avoids the unreliable
# float modulo of `num_epochs / 5` and also works when num_epochs < 5.
log_interval = max(1, num_epochs // 5)

start_time = time.time()

# Define the beta value for the EMA (usually a value close to 1, e.g., 0.9)
beta = 0.9

# Initialize the EMA loss
ema_loss = None

for epoch in epochs:
    # Counters are reset every epoch so that the accuracy and loss describe this epoch
    # only, instead of accumulating over all previous epochs (and previous cell runs).
    correct = 0
    total = 0
    running_loss = 0.0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        samples_in_batch = labels.size(0)
        # detach() replaces the deprecated `.data`; argmax gives the predicted class index
        predicted = outputs.detach().argmax(dim=1)
        total += samples_in_batch
        correct += (predicted == labels).sum().item()
        # Weight by batch size so that a smaller final batch does not skew the mean
        running_loss += loss.item() * samples_in_batch

    accuracy = correct / total
    # Mean loss over the whole epoch; the last mini-batch alone is too noisy to rank epochs
    epoch_loss = running_loss / total
    accuracy_list.append(accuracy)
    # Calculate EMA loss
    if ema_loss is None:
        ema_loss = epoch_loss
    else:
        ema_loss = beta * ema_loss + (1 - beta) * epoch_loss
    loss_list.append(ema_loss)

    # state_dict() returns references to the live parameters, so hand over cloned tensors;
    # otherwise the stored "best" state would keep changing with every later step.
    state_snapshot = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
    update_state_epoch(epoch, epoch_loss, state_snapshot, state_dict_epoch)

    scheduler.step()
    if (epoch + 1) % log_interval == 0 or epoch == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}] - Loss: {epoch_loss:.4f} - Accuracy: {accuracy * 100:.2f}%")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time:.2f} seconds")

print(f"""Minimum loss has happened at epoch number {state_dict_epoch["epoch"]}""")

# Use the recorded history length for the x-axis: loss_list/accuracy_list keep growing when
# this cell is run again, and plotting them against `epochs` would then fail on a length mismatch.
plot_loss(loss_list, accuracy_list, range(len(loss_list)))

## Save the trained model state

Here I save the model state that was captured at its lowest loss.

Feel free to replace the first argument with model.state_dict() to save the model's latest state instead.

In [None]:
# exist_ok=True creates the directory only when it is missing, without a separate
# exists() check that another process could invalidate before mkdir runs.
os.makedirs("./model_state", exist_ok=True)

# Persist the state captured at the best epoch (not necessarily the latest weights)
torch.save(state_dict_epoch["state_dict"], "./model_state/model.pt")

In [None]:
# map_location="cpu" lets the checkpoint be read even when the device it was saved from
# (CUDA/MPS) is not available; load_state_dict then copies the values into the model's
# existing parameters, wherever they currently live.
model.load_state_dict(torch.load("./model_state/model.pt", map_location="cpu"))
model.eval() # Inference mode: disables dropout and makes batch norm use its running statistics

# Evaluate the trained model

Here we load the downloaded test data and evaluate the model performance

In [None]:
# NOTE(review): batch_size = 1 runs one forward pass per test image; a larger batch would
# presumably evaluate much faster with the same predictions in eval mode — confirm before changing.
# This rebinds the module-level batch_size (the training cell sets its own value).
batch_size = 1
# Despite the name, this loader iterates the CIFAR-10 test split; shuffle=False keeps dataset order.
validation_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Evaluate on the CPU: the evaluation loop feeds batches to the model without moving them to a device.
model.to(torch.device("cpu"))
model.eval() # To avoid dropout
# Reset the counters (the same names are used while training) before counting test predictions
correct = 0
total = 0
accuracy = 0

In [None]:
# Gradients are not needed for evaluation, so disable autograd bookkeeping
with torch.no_grad():
    for images, labels in validation_loader:
        outputs = model(images)
        # Index of the largest logit per sample is the predicted class
        predicted = outputs.argmax(dim=1)
        hits = predicted.eq(labels)
        total += labels.size(0)
        correct += hits.sum().item()
accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy:.2f}%")

## Check the test and train data distribution

Here we check the distribution of different labels in the test and train dataset.

In [None]:
# Function to get class distribution from a data loader
def get_class_distribution(data_loader, num_classes=10):
    """Count how many samples of each class a data loader yields.

    Args:
        data_loader: Iterable of ``(images, labels)`` batches; ``labels`` holds
            non-negative integer class indices (tensor or array-like).
        num_classes: Number of classes to count. Defaults to 10.

    Returns:
        Float NumPy array of length ``num_classes`` where entry ``i`` is the number of
        samples labelled ``i``.

    Raises:
        ValueError: If a label is greater than or equal to ``num_classes``.
    """
    class_count = np.zeros(num_classes)
    for _, labels in data_loader:
        if isinstance(labels, torch.Tensor):
            # NumPy can only read tensors that live on the CPU
            labels = labels.detach().cpu()
        batch_labels = np.asarray(labels).ravel()
        batch_count = np.bincount(batch_labels, minlength=num_classes)
        if batch_count.shape[0] != num_classes:
            # bincount grows past minlength for large labels; fail with a clear message
            # instead of an opaque broadcasting error on the addition below.
            raise ValueError(
                f"label {int(batch_labels.max())} is out of range for {num_classes} classes"
            )
        class_count += batch_count
    return class_count

# Get class distribution for train and test datasets
# (each call iterates its loader once from start to end)
train_class_distribution = get_class_distribution(train_loader)
test_class_distribution = get_class_distribution(validation_loader)

# Plot class distribution
# Both series are drawn at the same x positions; alpha=0.7 keeps the overlapping bars visible
plt.figure(figsize=(10, 5))
plt.bar(range(10), train_class_distribution, label='Train', alpha=0.7)
plt.bar(range(10), test_class_distribution, label='Test', alpha=0.7)
plt.xlabel('Class Label')
plt.ylabel('Number of Samples')
# Tick labels are the CIFAR-10 class names in label-index order
plt.xticks(range(10), ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck'], rotation=45)
plt.legend()
plt.show()