
### CIFAR-10 DATASET
The dataset contains 60000 32x32 color images in 10 classes.
There are 50000 training images and 10000 test images.

The classes are airplane, automobile, bird, cat, deer, dog, frog, horse, ship, truck.

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
from torchvision import datasets, transforms
import numpy as np
import random
import matplotlib.pyplot as plt

In [2]:
torch.__version__

'2.7.1'

In [3]:
torchvision.__version__

'0.22.1'

In [4]:
#!nvidia-smi

In [5]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device

'cpu'

## Set the seed

In [6]:
# Seed every random number generator the notebook imports so that
# repeated runs start from the same state.
random.seed(42)             # Python's built-in RNG
np.random.seed(42)          # NumPy's global RNG
torch.manual_seed(42)       # PyTorch default generator
torch.cuda.manual_seed(42)  # PyTorch generator of the current CUDA device

## Setting the hyperparameters

In [7]:
# --- Optimisation ---
BATCH_SIZE = 128      # samples per gradient step
EPOCHS = 10           # full passes over the training set
LEARNING_RATE = 3e-4  # step size handed to the optimizer

# --- Data ---
IMAGE_SIZE = 32   # CIFAR-10 images are 32x32 pixels
CHANNELS = 3      # colour images
NUM_CLASSES = 10  # number of target classes

# --- Model ---
PATCH_SIZE = 4    # side length of each square patch
EMBED_DIM = 256   # size of every token embedding
NUM_HEADS = 8     # attention heads per transformer block
DEPTH = 6         # number of stacked transformer blocks
MLP_DIM = 512     # hidden width of the feed-forward network
DROP_RATE = 0.1   # dropout probability

## Define Image Transformations

In [8]:
# Preprocessing applied to every image as it is read from the dataset
transform = transforms.Compose(
    [
        # Convert the image to a tensor
        transforms.ToTensor(),
        # Normalize with (x - mean) / std. The scalar 0.5 is shared by all
        # channels. Normalized inputs help the model converge faster and
        # keep the numerical computation stable.
        transforms.Normalize(mean=0.5, std=0.5),
    ]
)


In [9]:
# Training split of CIFAR-10, stored under ./data, with `transform` applied to each image
train_dataset = datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform
)

In [10]:
# Test split of CIFAR-10 (train=False), preprocessed exactly like the training split
test_dataset = datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform
)

In [11]:
train_dataset, len(train_dataset)

(Dataset CIFAR10
     Number of datapoints: 50000
     Root location: ./data
     Split: Train
     StandardTransform
 Transform: Compose(
                ToTensor()
                Normalize(mean=0.5, std=0.5)
            ),
 50000)

In [12]:
test_dataset, len(test_dataset)

(Dataset CIFAR10
     Number of datapoints: 10000
     Root location: ./data
     Split: Test
     StandardTransform
 Transform: Compose(
                ToTensor()
                Normalize(mean=0.5, std=0.5)
            ),
 10000)

## Converting datasets to DataLoader
Right now our data is in the form of PyTorch datasets. To feed this data into our model, we need to convert it into a DataLoader, which will allow us to easily iterate over the data in batches (DataLoader turns data into batches).

We need batches for two reasons. First, an entire dataset usually does not fit into memory at once, so it has to be processed in smaller pieces. Second, the model processes all samples of a batch in parallel, which makes computation more efficient and makes better use of the hardware.

The neural network will update its weights based on the average loss over the entire batch, rather than on individual samples. This leads to more stable and efficient training.

Let's create DataLoaders for our training and test datasets.

In [13]:
# Training data is reshuffled on every pass over the loader
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# The test set is read in a fixed order
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [14]:
# Show the two loader objects
print(f"DataLoader: {train_loader}, {test_loader}")
# len(loader) is the number of batches the loader yields, not the number of samples
print(f"Length: {len(train_loader)} and {len(test_loader)} batches of size {BATCH_SIZE}")

DataLoader: <torch.utils.data.dataloader.DataLoader object at 0x15779cd60>, <torch.utils.data.dataloader.DataLoader object at 0x111761220>
Length: 391 and 79 batches of size 128


Note that 391 * 128 = 50048, which is more than the 50000 images in our training set. No samples are duplicated: the first 390 batches contain 128 images each (49920 in total) and the last batch contains only the remaining 80 images, because DataLoader keeps the incomplete final batch by default (`drop_last=False`).

## Building Vision Transformer Model From Scratch

In [15]:
# split images into patches
class PatchEmbedding(nn.Module):
    """Turn a batch of images into a sequence of patch tokens.

    Every non-overlapping ``patch_size`` x ``patch_size`` patch is projected
    to ``embed_dim`` values, a learnable class token is put in front of the
    patch tokens, and a learnable position embedding is added to the result.

    Args:
        img_size: Side length of the (square) input image.
        patch_size: Side length of each square patch.
        in_channels: Number of channels of the input image.
        embed_dim: Size of every token embedding.
    """

    def __init__(self, img_size, patch_size, in_channels, embed_dim):
        super().__init__()
        self.patch_size = patch_size
        # Kernel size == stride == patch size, so the convolution visits
        # each patch exactly once and acts as the patch projection layer.
        self.proj = nn.Conv2d(
            in_channels, embed_dim, kernel_size=patch_size, stride=patch_size
        )
        patches_per_side = img_size // patch_size
        num_patches = patches_per_side * patches_per_side  # square image
        # Learnable class token, shared by all images
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        # One learnable position vector per token (class token + patches)
        self.pos_embed = nn.Parameter(torch.randn(1, 1 + num_patches, embed_dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map images (B, C, H, W) to tokens (B, 1 + num_patches, embed_dim)."""
        batch_size, _, _, _ = x.shape
        # (B, embed_dim, H/patch_size, W/patch_size)
        patch_grid = self.proj(x)
        # Merge the two spatial dimensions into one patch dimension, then
        # move the embedding dimension last: (B, num_patches, embed_dim)
        patch_tokens = patch_grid.flatten(2).transpose(1, 2)
        # Replicate the class token for every image; -1 keeps a dimension as is
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)  # (B, 1, embed_dim)
        # Class token goes first along the sequence dimension
        tokens = torch.cat((cls_tokens, patch_tokens), dim=1)
        return tokens + self.pos_embed


In [16]:
class MLP(nn.Module):
    """Two-layer feed-forward network with GELU activation and dropout.

    The input is expanded to ``hidden_features``, passed through GELU, and
    projected back to ``in_features`` so the output has the input's shape.
    The same dropout module is applied after each linear layer.

    Args:
        in_features: Size of the last dimension of the input (and output).
        hidden_features: Width of the hidden layer.
        drop_rate: Probability with which dropout zeroes a feature.
    """

    def __init__(self, in_features, hidden_features, drop_rate):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        # Project back to the original dimension
        self.fc2 = nn.Linear(hidden_features, in_features)
        self.dropout = nn.Dropout(drop_rate)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        hidden = self.fc1(x)
        hidden = F.gelu(hidden)
        hidden = self.dropout(hidden)
        projected = self.fc2(hidden)
        return self.dropout(projected)

In [17]:
class TransformerEncoderLayer(nn.Module):
    """Pre-norm transformer encoder block: self-attention followed by an MLP.

    Both sub-layers are wrapped in a residual connection and are preceded by
    their own layer normalization. The input and output have the same shape,
    ``(batch, sequence, embed_dim)``.

    Args:
        embed_dim: Size of every token embedding.
        num_heads: Number of attention heads.
        mlp_dim: Hidden width of the feed-forward network.
        drop_rate: Dropout probability used in the attention and in the MLP.
    """

    def __init__(self,
                 embed_dim,
                 num_heads,
                 mlp_dim,
                 drop_rate):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim) # layer normalization
        # batch_first=True is required: the tokens arrive as
        # (batch, sequence, embed_dim). With the default (batch_first=False)
        # the module treats dimension 0 as the sequence, so attention would
        # mix the images of a batch instead of the patches of each image.
        self.attn = nn.MultiheadAttention(embed_dim,
                                          num_heads,
                                          dropout=drop_rate,
                                          batch_first=True) # multi-head self-attention
        self.norm2 = nn.LayerNorm(embed_dim) # layer normalization
        self.mlp = MLP(embed_dim, mlp_dim, drop_rate)

    def forward(self, x):
        # Normalize once and reuse the result as query, key and value
        normed = self.norm1(x)
        # Only the attended values are needed, not the attention weights
        attn_out, _ = self.attn(normed, normed, normed, need_weights=False)
        x = x + attn_out # residual connection
        x = x + self.mlp(self.norm2(x)) # residual connection
        return x

In [18]:
class VisionTransformer(nn.Module):
    """Vision Transformer classifier.

    Pipeline: patch embedding -> ``depth`` transformer encoder blocks ->
    layer normalization -> linear head applied to the class token.

    Args:
        img_size: Side length of the (square) input image.
        patch_size: Side length of each square patch.
        in_channels: Number of channels of the input image.
        num_classes: Number of output classes.
        embed_dim: Size of every token embedding.
        depth: Number of transformer encoder blocks.
        num_heads: Number of attention heads per block.
        mlp_dim: Hidden width of the feed-forward network in each block.
        drop_rate: Dropout probability.
    """

    def __init__(
        self,
        img_size,
        patch_size,
        in_channels,
        num_classes,
        embed_dim,
        depth,
        num_heads,
        mlp_dim,
        drop_rate,
    ):
        super().__init__()
        self.patch_embed = PatchEmbedding(img_size, patch_size, in_channels, embed_dim)
        # Identically configured encoder blocks, applied one after another
        encoder_blocks = (
            TransformerEncoderLayer(embed_dim, num_heads, mlp_dim, drop_rate)
            for _ in range(depth)
        )
        self.encoder = nn.Sequential(*encoder_blocks)
        self.norm = nn.LayerNorm(embed_dim)
        # Classification layer: one logit per class
        self.head = nn.Linear(embed_dim, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one row of ``num_classes`` logits per input image."""
        tokens = self.patch_embed(x)
        encoded = self.norm(self.encoder(tokens))
        # The class token is the first token of every sequence
        class_token = encoded[:, 0]
        return self.head(class_token)

In [19]:
# Instantiate model
model = VisionTransformer(
    # input geometry
    img_size=IMAGE_SIZE,
    patch_size=PATCH_SIZE,
    in_channels=CHANNELS,
    # output size
    num_classes=NUM_CLASSES,
    # encoder configuration
    embed_dim=EMBED_DIM,
    depth=DEPTH,
    num_heads=NUM_HEADS,
    mlp_dim=MLP_DIM,
    drop_rate=DROP_RATE,
)
# Move the parameters to the selected device
model = model.to(device)

In [20]:
model

VisionTransformer(
  (patch_embed): PatchEmbedding(
    (proj): Conv2d(3, 256, kernel_size=(4, 4), stride=(4, 4))
  )
  (encoder): Sequential(
    (0): TransformerEncoderLayer(
      (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
      )
      (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (mlp): MLP(
        (fc1): Linear(in_features=256, out_features=512, bias=True)
        (fc2): Linear(in_features=512, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
    )
    (1): TransformerEncoderLayer(
      (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
      )
      (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
      (mlp): MLP

## Defining a Loss function and an optimizer

In [21]:
# Cross-entropy loss for multi-class classification
criterion = nn.CrossEntropyLoss()
# Adam over all model parameters
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

criterion, optimizer

(CrossEntropyLoss(),
 Adam (
 Parameter Group 0
     amsgrad: False
     betas: (0.9, 0.999)
     capturable: False
     decoupled_weight_decay: False
     differentiable: False
     eps: 1e-08
     foreach: None
     fused: None
     lr: 0.0003
     maximize: False
     weight_decay: 0
 ))

## Defining a Training Loop function

In [22]:
def train(model, loader, optimizer, criterion):
    """Train ``model`` for one pass over ``loader``.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        model: Network that maps a batch of inputs to raw logits.
        loader: Iterable of ``(inputs, targets)`` batches with a ``dataset`` attribute.
        optimizer: Optimizer that updates the parameters of ``model``.
        criterion: Loss function called as ``criterion(logits, targets)``.

    Returns:
        Tuple ``(loss, accuracy)``: the accumulated loss and the number of
        correct predictions, each divided by ``len(loader.dataset)``.
    """
    model.train()

    running_loss = 0
    num_correct = 0

    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Clear the gradients left over from the previous step
        optimizer.zero_grad()

        logits = model(inputs)
        batch_loss = criterion(logits, targets)
        batch_loss.backward()
        optimizer.step()

        # The criterion returns the mean loss of the batch; weighting it by
        # the batch size turns the running total into a sum over samples.
        running_loss += batch_loss.item() * inputs.size(0)
        predictions = logits.argmax(dim=1)
        num_correct += (predictions == targets).sum().item()

    num_samples = len(loader.dataset)
    return running_loss / num_samples, num_correct / num_samples

In [23]:
def evaluate(model, loader):
    """Return the accuracy of ``model`` on ``loader`` as a fraction.

    Batches are moved to the module-level ``device``. The model is switched
    to evaluation mode and no gradients are tracked.

    Args:
        model: Network that maps a batch of inputs to raw logits.
        loader: Iterable of ``(inputs, targets)`` batches with a ``dataset`` attribute.

    Returns:
        Number of correct predictions divided by ``len(loader.dataset)``.
    """
    model.eval()
    num_correct = 0
    with torch.inference_mode():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            predictions = model(inputs).argmax(dim=1)
            num_correct += (predictions == targets).sum().item()
    return num_correct / len(loader.dataset)

In [None]:
# if using Colab
# from tqdm.auto import tqdm
from tqdm import tqdm

In [None]:
### Training
# Accuracy history, one entry per epoch (fractions between 0 and 1)
train_accuracies, test_accuracies = [], []

for epoch in tqdm(range(EPOCHS)):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    test_acc = evaluate(model, test_loader)
    train_accuracies.append(train_acc)
    test_accuracies.append(test_acc)
    # Both accuracies are fractions, so neither is followed by a percent sign
    print(f"Epoch {epoch+1}/{EPOCHS} - Train Loss: {train_loss:.4f} - Train Acc: {train_acc:.4f} - Test Acc: {test_acc:.4f}")

  0%|          | 0/10 [00:00<?, ?it/s]