# DATA MINING AND NEURAL NETWORKS
## Assignment 3.4 - Self-attention and Transformers

In this notebook, we first build an understanding of the self-attention mechanism by implementing it in both ``NumPy`` and ``PyTorch``.
Then, we implement a 6-layer Vision Transformer (ViT) and train it on the MNIST dataset.

All training will be conducted on a single T4 GPU.


In [None]:
# Please mount your Google Drive first
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Please go to the folder with all assignment files
# Please change the following path to your own path
# Use the %cd magic: `!cd` runs in a subshell and does not change the notebook's working directory
%cd /content/drive/MyDrive/DMNN/DMNN2023

/bin/bash: line 1: cd: /content/drive/MyDrive/DMNN/DMNN2023: No such file or directory


In [None]:
# Please go to Edit > Notebook settings > Hardware accelerator > choose "T4 GPU"
# Now check if you have loaded the GPU successfully
!nvidia-smi

Fri Dec 22 13:19:44 2023       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   37C    P8               9W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

# Self-attention Mechanism
Self-attention is the core mechanism of the Transformer.
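
For reference, scaled dot-product attention is usually written as below, where $Q$, $K$ and $V$ are the query, key and value matrices obtained by projecting the input $X$. The symbols $W_Q$, $W_K$, $W_V$ and $d_k$ (the key dimension) are notation introduced here; the NumPy cell in the next subsection uses a single dimension `d` for all of them.

```latex
Q = X W_Q, \qquad K = X W_K, \qquad V = X W_V

A = \mathrm{softmax}\!\left(\frac{Q K^{\top}}{\sqrt{d_k}}\right), \qquad
\mathrm{Attention}(Q, K, V) = A V
```

The softmax is applied row by row, so each row of $A$ is a probability distribution over the input samples.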

## Self-attention with NumPy
To better understand it, we first implement the self-attention mechanism manually with ``numpy``. You can check the dimensions of each variable during the matrix computations.

Feel free to change the dimensions of each variable and see how the output dimension will change accordingly.

In [None]:
import math
import numpy as np
from numpy.random import randn

# I. Define the input data X
# X contains 32 samples, each of dimension 256
d = 256
n = 32
X = randn(n, d) # (32, 256)

# II. Generate the projection weights
# (random here; in practice these weights are learned during training)
Wq = randn(d, d) # (256, 256)
Wk = randn(d, d)
Wv = randn(d, d)

# III. Project X to queries, keys and values
Q = np.dot(X, Wq) # (32, 256)
K = np.dot(X, Wk)
V = np.dot(X, Wv)

# IV. Compute the self-attention score, denoted by A
# A = softmax(QK^T / \sqrt{d})
# Define the softmax function (applied row-wise)
def softmax(z):
    z = np.clip(z, -100, 100) # clip to [-100, 100] so that np.exp does not overflow
    tmp = np.exp(z)
    res = tmp / np.sum(tmp, axis=1, keepdims=True) # normalize each row so that it sums to 1
    return res

# Row i of A holds the attention weights of sample i over all samples.
A = softmax(np.dot(Q, K.transpose())/math.sqrt(d)) #(32, 32)

# V. Compute the self-attention output
# outputs = A * V
outputs = np.dot(A, V) #(32, 256)

print("The attention outputs are\n {}".format(outputs))

The attention outputs are
 [[ 0.29813141 -0.5775645  -4.69297744 ...  2.21494554 -0.78305058
   3.10710804]
 [ 0.29813141 -0.5775645  -4.69297744 ...  2.21494554 -0.78305058
   3.10710804]
 [ 0.29813141 -0.5775645  -4.69297744 ...  2.21494554 -0.78305058
   3.10710804]
 ...
 [ 0.29813141 -0.5775645  -4.69297744 ...  2.21494554 -0.78305058
   3.10710804]
 [ 0.29813141 -0.5775645  -4.69297744 ...  2.21494554 -0.78305058
   3.10710804]
 [ 0.29813141 -0.5775645  -4.69297744 ...  2.21494554 -0.78305058
   3.10710804]]
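
A common alternative to clipping is to subtract the row-wise maximum before exponentiating, which leaves the softmax unchanged mathematically but keeps `np.exp` in a safe range. The following is a minimal sketch of that idea, not part of the original assignment: `stable_softmax` and the fixed seed are our own choices, and the sanity checks at the end (rows of `A` sum to 1, output rows are not all identical) are just one way to catch normalization mistakes.

```python
import numpy as np

def stable_softmax(z):
    """Row-wise softmax that subtracts each row's maximum before exponentiating."""
    z = z - np.max(z, axis=1, keepdims=True)     # largest entry in each row becomes 0
    e = np.exp(z)                                 # every entry now lies in [0, 1]
    return e / np.sum(e, axis=1, keepdims=True)   # normalize each row to sum to 1

rng = np.random.default_rng(0)  # fixed seed, chosen only for reproducibility
n, d = 32, 256
X = rng.standard_normal((n, d))
Wq, Wk, Wv = (rng.standard_normal((d, d)) for _ in range(3))

Q, K, V = X @ Wq, X @ Wk, X @ Wv
A = stable_softmax(Q @ K.T / np.sqrt(d))  # (32, 32)
outputs = A @ V                             # (32, 256)

print(A.shape, outputs.shape)
print("rows of A sum to 1:", np.allclose(A.sum(axis=1), 1.0))
print("output rows are not all identical:", not np.allclose(outputs, outputs[0]))
```

If every output row comes out identical, the attention weights are most likely not being normalized per row.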


## Self-attention with PyTorch
Now, we implement self-attention with ``PyTorch``, which is commonly used when building Transformers.

Feel free to change the dimensions of each variable and see how the output dimension will change accordingly.

In [None]:
import math
import torch
import torch.nn as nn

class SelfAttention(nn.Module):
    def __init__(self, dim_input, dim_q, dim_v):
        '''
        dim_input: the dimension of each sample
        dim_q: dimension of Q matrix, should be equal to dim_k
        dim_v: dimension of V matrix, also the dimension of the attention output
        '''
        super(SelfAttention, self).__init__()

        self.dim_input = dim_input
        self.dim_q = dim_q
        self.dim_k = dim_q
        self.dim_v = dim_v

        # Define the linear projection
        self.linear_q = nn.Linear(self.dim_input, self.dim_q, bias=False)
        self.linear_k = nn.Linear(self.dim_input, self.dim_k, bias=False)
        self.linear_v = nn.Linear(self.dim_input, self.dim_v, bias=False)
        self._norm_fact = 1 / math.sqrt(self.dim_k)

    def forward(self, x):
        batch, n, dim_input = x.shape  # (batchsize, seq_len, dim_input)

        q = self.linear_q(x) # (batchsize, seq_len, dim_q)
        k = self.linear_k(x) # (batchsize, seq_len, dim_k)
        v = self.linear_v(x) # (batchsize, seq_len, dim_v)
        print(f'x.shape:{x.shape} \n Q.shape:{q.shape} \n K.shape:{k.shape} \n V.shape:{v.shape}')

        dist = torch.bmm(q, k.transpose(1,2)) * self._norm_fact
        dist = torch.softmax(dist, dim=-1)
        print('attention matrix: ', dist.shape)

        outputs = torch.bmm(dist, v)
        print('attention outputs: ', outputs.shape)

        return outputs

if __name__ == '__main__':
    batch_size = 32 # number of samples in a batch
    dim_input = 128 # dimension of each item in the sample sequence
    seq_len = 20 # sequence length for each sample
    x = torch.randn(batch_size, seq_len, dim_input)
    self_attention = SelfAttention(dim_input, dim_q = 64, dim_v = 32)

    attention = self_attention(x)

    print(attention)

x.shape:torch.Size([32, 20, 128]) 
 Q.shape:torch.Size([32, 20, 64]) 
 K.shape:torch.Size([32, 20, 64]) 
 V.shape:torch.Size([32, 20, 32])
attention matrix:  torch.Size([32, 20, 20])
attention outputs:  torch.Size([32, 20, 32])
tensor([[[-0.0408,  0.0632, -0.1087,  ..., -0.2198, -0.0873,  0.0345],
         [-0.0587,  0.1012, -0.1041,  ..., -0.2198, -0.1100,  0.0832],
         [-0.0424,  0.1211, -0.0849,  ..., -0.2848, -0.1433,  0.0299],
         ...,
         [-0.0569,  0.0842, -0.1462,  ..., -0.2108, -0.0740,  0.0010],
         [-0.0562,  0.0348, -0.1415,  ..., -0.2358, -0.0356,  0.0340],
         [-0.0695,  0.0524, -0.0828,  ..., -0.2781, -0.0172,  0.1039]],

        [[ 0.1277, -0.0648,  0.1376,  ..., -0.0085,  0.2023,  0.1575],
         [ 0.2143, -0.1453,  0.0011,  ...,  0.0217,  0.1621,  0.1580],
         [ 0.2936, -0.1005,  0.0184,  ...,  0.0715,  0.1990,  0.1594],
         ...,
         [ 0.2394, -0.1046,  0.0029,  ...,  0.0250,  0.2097,  0.2439],
         [ 0.1455, -0.0767,  0.11
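
When changing the dimensions, it can help to predict the shapes before running the module. The helper below is a hypothetical, pure-Python sketch (it is not part of the assignment code) that reproduces the shape arithmetic of `SelfAttention`, assuming three bias-free linear projections with `dim_k == dim_q`, and also counts the projection weights.

```python
def self_attention_shapes(batch_size, seq_len, dim_input, dim_q, dim_v):
    """Return the expected tensor shapes and weight count for single-head self-attention.

    Assumes three bias-free linear projections and dim_k == dim_q.
    """
    shapes = {
        "x": (batch_size, seq_len, dim_input),
        "q": (batch_size, seq_len, dim_q),
        "k": (batch_size, seq_len, dim_q),
        "v": (batch_size, seq_len, dim_v),
        "attention": (batch_size, seq_len, seq_len),  # q @ k^T
        "outputs": (batch_size, seq_len, dim_v),      # attention @ v
    }
    # One (dim_input x dim_out) weight matrix per projection, no bias terms.
    n_weights = dim_input * (dim_q + dim_q + dim_v)
    return shapes, n_weights

shapes, n_weights = self_attention_shapes(32, 20, 128, dim_q=64, dim_v=32)
for name, shape in shapes.items():
    print(f"{name:>9}: {shape}")
print("projection weights:", n_weights)  # 128 * (64 + 64 + 32) = 20480
```

Note that the attention matrix depends only on the sequence length, while the output width is set by `dim_v` alone.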

# Transformers
In this section, we implement a 6-layer Vision Transformer (ViT) and train it on the MNIST dataset.
We consider the classification task.
First, we load the MNIST dataset as follows:

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import torchvision
from torchvision import datasets, utils
from torchvision.datasets import MNIST

def get_mnist_loader(batch_size=100, shuffle=True):
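    """
    Build the MNIST data loaders (the dataset is downloaded to ../data if needed).

    :param batch_size: number of samples per batch
    :param shuffle: whether to shuffle the training set at every epoch
    :return: train_loader, test_loader
    """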
    train_dataset = MNIST(root='../data',
                          train=True,
                          transform=torchvision.transforms.ToTensor(),
                          download=True)
    test_dataset = MNIST(root='../data',
                         train=False,
                         transform=torchvision.transforms.ToTensor(),
                         download=True)

    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=shuffle)
    test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                              batch_size=batch_size,
                                              shuffle=False)
    return train_loader, test_loader

In [None]:
# This package is needed to build the transformer
!pip install einops

Collecting einops
  Downloading einops-0.7.0-py3-none-any.whl (44 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 44.6/44.6 kB 578.2 kB/s eta 0:00:00
Installing collected packages: einops
Successfully installed einops-0.7.0


## Build ViT from scratch
Recall that each Transformer block includes two modules: a self-attention module and a feedforward module.
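
In the pre-norm arrangement used by the code in the next cell (`Residual(PreNorm(dim, ...))`), one block with input $x$ can be summarized as follows. The symbols $\mathrm{LN}$, $\mathrm{MSA}$ and $\mathrm{FFN}$ are shorthand introduced here for `nn.LayerNorm`, `Attention` and `FeedForward`; each of the two sub-modules has its own layer normalization.

```latex
\begin{aligned}
x' &= x + \mathrm{MSA}\big(\mathrm{LN}(x)\big) \\
y  &= x' + \mathrm{FFN}\big(\mathrm{LN}(x')\big)
\end{aligned}
```

The skip connections add the unnormalized input back, so the normalization only affects what the sub-module sees.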

In [None]:
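import torch
import torch.nn as nn
import torch.nn.functional as F  # used by the masking branch in Attention
from einops import rearrange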

class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(x, **kwargs) + x

class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)

class FeedForward(nn.Module):
    def __init__(self, dim, hidden_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, dim)
        )

    def forward(self, x):
        return self.net(x)

class Attention(nn.Module):
    def __init__(self, dim, heads=8):
        super().__init__()
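        # dim is split evenly across the heads, so it must be divisible by heads
        assert dim % heads == 0, 'dim must be divisible by the number of heads'
        self.heads = heads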
        # Scale by the per-head dimension d_k = dim / heads, as in softmax(QK^T / sqrt(d_k))
        self.scale = (dim // heads) ** -0.5

        self.to_qkv = nn.Linear(dim, dim * 3, bias=False)
        self.to_out = nn.Linear(dim, dim)

    def forward(self, x, mask = None):
        b, n, _, h = *x.shape, self.heads
        qkv = self.to_qkv(x)
        q, k, v = rearrange(qkv, 'b n (qkv h d) -> qkv b h n d', qkv=3, h=h)

        dots = torch.einsum('bhid,bhjd->bhij', q, k) * self.scale

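        if mask is not None:
            # mask: boolean tensor over the patches; prepend True for the cls token
            mask = F.pad(mask.flatten(1), (1, 0), value = True)
            assert mask.shape[-1] == dots.shape[-1], 'mask has incorrect dimensions'
            # Pairwise mask of shape (b, 1, n, n), broadcast over the heads dimension of dots
            mask = mask[:, None, None, :] * mask[:, None, :, None]
            dots.masked_fill_(~mask, float('-inf'))
            del mask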

        attn = dots.softmax(dim=-1)

        out = torch.einsum('bhij,bhjd->bhid', attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        out =  self.to_out(out)
        return out

class Transformer(nn.Module):
    def __init__(self, dim, depth, heads, mlp_dim):
        super().__init__()
        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(nn.ModuleList([
                Residual(PreNorm(dim, Attention(dim, heads = heads))),
                Residual(PreNorm(dim, FeedForward(dim, mlp_dim)))
            ]))

    def forward(self, x, mask=None):
        for attn, ff in self.layers:
            x = attn(x, mask=mask)
            x = ff(x)
        return x

class ViT(nn.Module):
    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, channels=3):
        super().__init__()
        assert image_size % patch_size == 0, 'image dimensions must be divisible by the patch size'
        num_patches = (image_size // patch_size) ** 2
        patch_dim = channels * patch_size ** 2

        self.patch_size = patch_size

        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.patch_to_embedding = nn.Linear(patch_dim, dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.transformer = Transformer(dim, depth, heads, mlp_dim)

        self.to_cls_token = nn.Identity()

        self.mlp_head = nn.Sequential(
            nn.Linear(dim, mlp_dim),
            nn.GELU(),
            nn.Linear(mlp_dim, num_classes)
        )

    def forward(self, img, mask=None):
        p = self.patch_size

        x = rearrange(img, 'b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1=p, p2=p)
        x = self.patch_to_embedding(x)

        cls_tokens = self.cls_token.expand(img.shape[0], -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)
        x += self.pos_embedding
        x = self.transformer(x, mask)

        x = self.to_cls_token(x[:, 0])
        return self.mlp_head(x)
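
The patch-splitting step in `ViT.forward` can be reproduced with plain NumPy, which may make the einops pattern `'b c (h p1) (w p2) -> b (h w) (p1 p2 c)'` easier to follow. This is a sketch for illustration only: `patchify` is a hypothetical helper, and the input is a toy array rather than MNIST data.

```python
import numpy as np

def patchify(img, p):
    """Split images (b, c, H, W) into flattened patches (b, num_patches, p*p*c).

    NumPy equivalent of the einops pattern
    'b c (h p1) (w p2) -> b (h w) (p1 p2 c)' with p1 = p2 = p.
    """
    b, c, H, W = img.shape
    assert H % p == 0 and W % p == 0, "image size must be divisible by the patch size"
    h, w = H // p, W // p
    x = img.reshape(b, c, h, p, w, p)   # axes: b, c, h, p1, w, p2
    x = x.transpose(0, 2, 4, 3, 5, 1)   # axes: b, h, w, p1, p2, c
    return x.reshape(b, h * w, p * p * c)

# Toy batch with the same layout as MNIST: 2 images, 1 channel, 28 x 28 pixels.
img = np.arange(2 * 1 * 28 * 28, dtype=np.float32).reshape(2, 1, 28, 28)
patches = patchify(img, p=7)

print(patches.shape)  # (2, 16, 49)
# Patch 0 is the top-left 7x7 block; patch 5 is the block at patch-row 1, patch-column 1.
print(np.array_equal(patches[0, 0], img[0, 0, :7, :7].ravel()))
print(np.array_equal(patches[0, 5], img[0, 0, 7:14, 7:14].ravel()))
```

With `image_size=28` and `patch_size=7` this gives 16 patches of 49 values each; after the class token is prepended, the Transformer sees a sequence of length 17.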

## Training and test functions


In [None]:
import torch.nn.functional as F

def train_epoch(model, optimizer, data_loader, loss_history):
    total_samples = len(data_loader.dataset)
    model.train()

    for i, (data, target) in enumerate(data_loader):
        data = data.cuda()
        target = target.cuda()
        optimizer.zero_grad()
        output = F.log_softmax(model(data), dim=1)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        if i % 100 == 0:
            print('[' +  '{:5}'.format(i * len(data)) + '/' + '{:5}'.format(total_samples) +
                  ' (' + '{:3.0f}'.format(100 * i / len(data_loader)) + '%)]  Loss: ' +
                  '{:6.4f}'.format(loss.item()))
            loss_history.append(loss.item())

In [None]:
def evaluate(model, data_loader, loss_history):
    model.eval()

    total_samples = len(data_loader.dataset)
    correct_samples = 0
    total_loss = 0

    # We do not need to track gradients when testing;
    # disabling them reduces memory usage
    with torch.no_grad():
        for data, target in data_loader:
            data = data.cuda()
            target = target.cuda()
            output = F.log_softmax(model(data), dim=1)
            loss = F.nll_loss(output, target, reduction='sum')
            _, pred = torch.max(output, dim=1)

            total_loss += loss.item()
            correct_samples += pred.eq(target).sum().item()  # accumulate as a Python int

    avg_loss = total_loss / total_samples
    loss_history.append(avg_loss)
    print('\nAverage test loss: ' + '{:.4f}'.format(avg_loss) +
          '  Accuracy:' + '{:5}'.format(correct_samples) + '/' +
          '{:5}'.format(total_samples) + ' (' +
          '{:4.2f}'.format(100.0 * correct_samples / total_samples) + '%)\n')
    return avg_loss, correct_samples, total_samples
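
`evaluate` sums the per-sample losses with `reduction='sum'` and divides once by the dataset size. The NumPy sketch below illustrates why this matters when the last batch is smaller than the others (for example, 10000 test images with a batch size of 128 leave a final batch of 16). The helpers `log_softmax` and `nll` are simplified stand-ins written for this example, not the PyTorch functions, and the data is random toy data.

```python
import numpy as np

def log_softmax(logits):
    """Row-wise log-softmax computed with the log-sum-exp trick."""
    shifted = logits - logits.max(axis=1, keepdims=True)
    return shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))

def nll(log_probs, targets, reduction="mean"):
    """Negative log-likelihood of the target classes."""
    picked = -log_probs[np.arange(len(targets)), targets]
    return picked.sum() if reduction == "sum" else picked.mean()

rng = np.random.default_rng(0)                      # toy data, not MNIST
logits = rng.standard_normal((10, 3))
targets = rng.integers(0, 3, size=10)
batches = [slice(0, 4), slice(4, 8), slice(8, 10)]  # the last batch is smaller

log_probs = log_softmax(logits)
true_mean = nll(log_probs, targets, "mean")

# Summing the per-batch losses and dividing once by the dataset size is exact.
sum_then_divide = sum(nll(log_probs[s], targets[s], "sum") for s in batches) / len(targets)

# Averaging per-batch means gives the small last batch too much weight.
mean_of_means = np.mean([nll(log_probs[s], targets[s], "mean") for s in batches])

print("sum-then-divide matches the true mean:", np.isclose(sum_then_divide, true_mean))
print("true mean:", true_mean, "mean of batch means:", mean_of_means)
```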

## Let's start training!
Here, you can change the ViT structure by changing the hyperparameters passed to the ``ViT`` constructor.
The default settings use 6 layers, 8 heads for the multi-head attention mechanism, and an embedding dimension of 64.
You can also increase the number of epochs to obtain better results.

In [None]:
import time

# You can change the architecture here
model = ViT(image_size=28, patch_size=7, num_classes=10, channels=1,
            dim=64, depth=6, heads=8, mlp_dim=128)
model = model.cuda()
# We also print the network architecture
# (a bare `model` expression is only displayed when it is the last line of a cell)
print(model)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

train_loss_history, test_loss_history = [], []

In [None]:
N_EPOCHS = 20

train_loader, test_loader = get_mnist_loader(batch_size=128, shuffle=True)

scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

start_time = time.time()
for epoch in range(1, N_EPOCHS + 1):
    print('Epoch:', epoch,'LR:', scheduler.get_last_lr())
    train_epoch(model, optimizer, train_loader, train_loss_history)
    evaluate(model, test_loader, test_loss_history)
    scheduler.step()

print('Execution time:', '{:5.2f}'.format(time.time() - start_time), 'seconds')

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ../data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:00<00:00, 105261094.30it/s]


Extracting ../data/MNIST/raw/train-images-idx3-ubyte.gz to ../data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ../data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 28705140.72it/s]

Extracting ../data/MNIST/raw/train-labels-idx1-ubyte.gz to ../data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz





Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ../data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 31944661.32it/s]


Extracting ../data/MNIST/raw/t10k-images-idx3-ubyte.gz to ../data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ../data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 22074772.62it/s]


Extracting ../data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ../data/MNIST/raw

Epoch: 1 LR: [0.001]

Average test loss: 0.2030  Accuracy: 9379/10000 (93.79%)

Epoch: 2 LR: [0.00095]

Average test loss: 0.1176  Accuracy: 9629/10000 (96.29%)

Epoch: 3 LR: [0.0009025]

Average test loss: 0.1162  Accuracy: 9635/10000 (96.35%)

Epoch: 4 LR: [0.000857375]

Average test loss: 0.0800  Accuracy: 9738/10000 (97.38%)

Epoch: 5 LR: [0.0008145062499999999]

Average test loss: 0.0807  Accuracy: 9757/10000 (97.57%)

Epoch: 6 LR: [0.0007737809374999998]

Average test loss: 0.0706  Accuracy: 9773/10000 (97.73%)

Epoch: 7 LR: [0.0007350918906249997]

Average test loss: 0.0649  Accuracy: 9789/10000 (97.89%)

Epoch: 8 LR: [0.0006983372960937497]

Average test loss: 0.0735  Accuracy: 9792/10000 (97.92%)

Epoch: 9 LR: [0.0006634204312890621]

Average test loss: 0.0632  Accuracy: 9820/10000 (98.20%)

Epoch: 10 LR: [0.000630249409724609]

Average test loss: 0.0578  Accuracy: 9829/10000 (98.29%)

Epoch: 11 LR: [
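
The learning rates in the log above are consistent with an exponential decay in which the rate is multiplied by `gamma` after every epoch, that is, `lr = 0.001 * 0.95 ** (epoch - 1)`. The short sketch below reproduces those values in plain Python; `exponential_lr` is a hypothetical helper written for this illustration, not a PyTorch function.

```python
def exponential_lr(initial_lr, gamma, n_epochs):
    """Learning rate used in each epoch when the rate is multiplied by gamma after every epoch."""
    return [initial_lr * gamma ** epoch for epoch in range(n_epochs)]

lrs = exponential_lr(initial_lr=0.001, gamma=0.95, n_epochs=20)
for epoch, lr in enumerate(lrs[:4], start=1):
    print(f"Epoch {epoch}: lr = {lr:.9f}")
print(f"Epoch 20: lr = {lrs[-1]:.6f}")
```

With `gamma=0.95`, the rate in the final epoch is a bit more than a third of the initial one, so the decay is fairly gentle over 20 epochs.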

In [None]:
import time
import torch.nn.functional as F

def train_and_evaluate(config):
    model = ViT(image_size=28, patch_size=7, num_classes=10, channels=1,
                dim=config['dim'], depth=config['depth'], heads=config['heads'], mlp_dim=config['mlp_dim'])
    model = model.cuda()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

    train_loss_history, test_loss_history = [], []
    N_EPOCHS = 20
    train_loader, test_loader = get_mnist_loader(batch_size=128, shuffle=True)

    start_time = time.time()
    for epoch in range(1, N_EPOCHS + 1):
        print('Config:', config, 'Epoch:', epoch, 'LR:', scheduler.get_last_lr())
        train_epoch(model, optimizer, train_loader, train_loss_history)
        avg_loss, correct_samples, total_samples = evaluate(model, test_loader, test_loss_history)
        scheduler.step()

    execution_time = time.time() - start_time
    last_epoch_accuracy = 100.0 * correct_samples / total_samples

    return execution_time, last_epoch_accuracy


In [None]:
# List of configurations to try

configs = [
    # Baseline Model
    {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128},  # Original setup as a baseline

    # Increased Capacity Model
    {'dim': 128, 'depth': 8, 'heads': 16, 'mlp_dim': 256},  # Increases capacity for better feature learning but risks overfitting

    # Reduced Capacity Model
    {'dim': 32, 'depth': 4, 'heads': 4, 'mlp_dim': 64},  # Smaller model, potentially faster training and better generalization

    # Increased Depth
    {'dim': 64, 'depth': 12, 'heads': 8, 'mlp_dim': 128},  # More layers for complex features, but more challenging to train

    # Fewer Heads with Higher Dimension
    {'dim': 128, 'depth': 6, 'heads': 4, 'mlp_dim': 128},  # Higher dimension per head with fewer heads, affecting attention diversity

    # High MLP Dimension
    {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 256},  # Increased MLP dimension for potentially improved complex function learning

    # Balanced Increase
    {'dim': 96, 'depth': 7, 'heads': 12, 'mlp_dim': 192}  # Moderately larger model; heads must divide dim evenly (96 / 12 = 8 per head)
]


results = []
for config in configs:
    execution_time, accuracy = train_and_evaluate(config)
    results.append((config, execution_time, accuracy))

# Print results
for config, execution_time, accuracy in results:
    print(f"Config: {config}, Time: {execution_time:.2f} sec, Accuracy: {accuracy:.2f}%")

Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128} Epoch: 1 LR: [0.001]

Average test loss: 0.2009  Accuracy: 9385/10000 (93.85%)

Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128} Epoch: 2 LR: [0.00095]

Average test loss: 0.1449  Accuracy: 9521/10000 (95.21%)

Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128} Epoch: 3 LR: [0.0009025]

Average test loss: 0.1304  Accuracy: 9580/10000 (95.80%)

Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128} Epoch: 4 LR: [0.000857375]

Average test loss: 0.0910  Accuracy: 9708/10000 (97.08%)

Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128} Epoch: 5 LR: [0.0008145062499999999]

Average test loss: 0.0829  Accuracy: 9743/10000 (97.43%)

Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128} Epoch: 6 LR: [0.0007737809374999998]

Average test loss: 0.0716  Accuracy: 9781/10000 (97.81%)

Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128} Epoch: 7 LR: [0.0007350918906249997]

Average test loss:

EinopsError: ignored
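
The traceback is not shown, but one likely cause of an `EinopsError` in this setup is a configuration whose `dim` is not divisible by `heads`: the pattern `'b n (qkv h d) -> qkv b h n d'` in `Attention.forward` needs `3 * dim` to factor as `3 * heads * d` with an integer `d`. A combination such as `dim=96` with `heads=10` cannot be split this way. The sketch below checks configurations before any training starts; `check_config` is a hypothetical helper, and the example list is illustrative.

```python
def check_config(config):
    """Return the per-head dimension, or raise if dim cannot be split evenly across heads."""
    dim, heads = config["dim"], config["heads"]
    if dim % heads != 0:
        raise ValueError(f"dim={dim} is not divisible by heads={heads}")
    return dim // heads

examples = [
    {"dim": 64, "depth": 6, "heads": 8, "mlp_dim": 128},
    {"dim": 128, "depth": 8, "heads": 16, "mlp_dim": 256},
    {"dim": 96, "depth": 7, "heads": 10, "mlp_dim": 192},  # 96 / 10 is not an integer
]
for cfg in examples:
    try:
        print(cfg, "-> head dim", check_config(cfg))
    except ValueError as err:
        print(cfg, "-> invalid:", err)
```

Running such a check over the whole list first avoids losing a long sweep to a failure in a late configuration.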

In [None]:
# Print results
for config, execution_time, accuracy in results:
    print(f"Config: {config}, Time: {execution_time:.2f} sec, Accuracy: {accuracy:.2f}%")


Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 128}, Time: 293.55 sec, Accuracy: 98.43%
Config: {'dim': 128, 'depth': 8, 'heads': 16, 'mlp_dim': 256}, Time: 332.59 sec, Accuracy: 98.67%
Config: {'dim': 32, 'depth': 4, 'heads': 4, 'mlp_dim': 64}, Time: 250.83 sec, Accuracy: 97.87%
Config: {'dim': 64, 'depth': 12, 'heads': 8, 'mlp_dim': 128}, Time: 421.16 sec, Accuracy: 98.62%
Config: {'dim': 128, 'depth': 6, 'heads': 4, 'mlp_dim': 128}, Time: 298.24 sec, Accuracy: 98.48%
Config: {'dim': 64, 'depth': 6, 'heads': 8, 'mlp_dim': 256}, Time: 297.36 sec, Accuracy: 98.39%
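
To compare the runs at a glance, the printed results can be ranked by accuracy and related to the baseline. The numbers below are copied from the output above; the table layout and the `d_acc` / `rel_time` columns are our own additions. These are single runs without a fixed random seed, so differences of a few tenths of a percent should be read with caution.

```python
# (dim, depth, heads, mlp_dim, seconds, accuracy in %) copied from the printed results
results = [
    (64, 6, 8, 128, 293.55, 98.43),    # baseline
    (128, 8, 16, 256, 332.59, 98.67),
    (32, 4, 4, 64, 250.83, 97.87),
    (64, 12, 8, 128, 421.16, 98.62),
    (128, 6, 4, 128, 298.24, 98.48),
    (64, 6, 8, 256, 297.36, 98.39),
]
baseline_time, baseline_acc = results[0][4], results[0][5]

print(f"{'dim':>4} {'depth':>5} {'heads':>5} {'mlp':>4} "
      f"{'time(s)':>8} {'acc(%)':>7} {'d_acc':>6} {'rel_time':>8}")
# Sort from highest to lowest accuracy and show each run relative to the baseline.
for dim, depth, heads, mlp, secs, acc in sorted(results, key=lambda r: r[5], reverse=True):
    print(f"{dim:>4} {depth:>5} {heads:>5} {mlp:>4} {secs:>8.2f} {acc:>7.2f} "
          f"{acc - baseline_acc:>+6.2f} {secs / baseline_time:>7.2f}x")
```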
