# Lab 7: Self-Attention


This lab covers the following topics:

- Gain insight into the self-attention operation using the sequential MNIST example from before.
- Gain insight into positional encodings

## 0 Initialization

Run the code cell below to download the MNIST digits dataset:

In [5]:
!wget -O MNIST.tar.gz https://activeeon-public.s3.eu-west-2.amazonaws.com/datasets/MNIST.new.tar.gz
!tar -zxvf MNIST.tar.gz

import torchvision
import torch
import torchvision.transforms as transforms
from torch import nn
import torch.nn.functional as F

from torch.utils.data import Subset

dataset = torchvision.datasets.MNIST('./', download=True , transform=transforms.Compose([transforms.ToTensor()]), train=True)
train_indices = torch.arange(0, 10000)
train_dataset = Subset(dataset, train_indices)

dataset=torchvision.datasets.MNIST('./', download=True , transform=transforms.Compose([transforms.ToTensor()]), train=False)
test_indices = torch.arange(0, 10000)
test_dataset = Subset(dataset, test_indices)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64,
                                          shuffle=True, num_workers=0)

test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16,
                                          shuffle=False, num_workers=0)

'wget' �����ڲ����ⲿ���Ҳ���ǿ����еĳ���
���������ļ���
tar: Error opening archive: Failed to open 'MNIST.tar.gz'


## Exercise 1: Self-Attention without Positional Encoding

In this section, will implement a very simple model based on self-attention without positional encoding. The model you will implement will consider the input image as a sequence of 28 rows. You may use PyTorch's [`nn.MultiheadAttention`](https://pytorch.org/docs/stable/generated/torch.nn.MultiheadAttention.html) for this part. Implement a model with the following architecture:

* **Input**: Input image of shape `(batch_size, sequence_length, input_size)`, where $\text{sequence_length} = \text{image_height}$ and $\text{input_size} = \text{image_width}$.

* **Linear 1**: Linear layer which converts input of shape `(sequence_length*batch_size, input_size)` to input of shape `(sequence_length*batch_size, embed_dim)`, where `embed_dim` is the embedding dimension.

* **Attention 1**: `nn.MultiheadAttention` layer with 8 heads which takes an input of shape `(sequence_length, batch_size, embed_dim)` and outputs a tensor of shape `(sequence_length, batch_size, embed_dim)`. 

* **ReLU**: ReLU activation layer.

* **Linear 2**: Linear layer which converts input of shape `(sequence_length*batch_size, embed_dim)` to input of shape `(sequence_length*batch_size, embed_dim)`.

* **ReLU**: ReLU activation layer.

* **Attention 2**: `nn.MultiheadAttention` layer with 8 heads which takes an input of shape `(sequence_length, batch_size, embed_dim)` and outputs a tensor of shape `(sequence_length, batch_size, embed_dim)`.

* **ReLU**: ReLU activation layer.

* **AvgPool**: Average along the sequence dimension from `(batch_size, sequence_length, embed_dim)` to `(batch_size, embed_dim)`

* **Linear 3**: Linear layer which takes an input of shape `(batch_size, embed_dim)` and outputs the class logits of shape `(batch_size, 10)`.


**NOTE**: Be cautious of correctly permuting and reshaping the input between layers. E.g. if `x` is of shape `(batch_size, sequence_length, input_size)`, note that `x.reshape(batch_size*sequence_length, -1) != x.permute(1,0,2).reshape(batch_size*sequence_length, -1)`. In this example, `x.reshape(batch_size*sequence_length, -1)` has `[batch0_seq0, batch0_seq1, ..., batch1_seq0, batch1_seq1, ...]` format, while `x.permute(1,0,2).reshape(batch_size*sequence_length, -1)` has `[batch0_seq0, batch1_seq0, ..., batch0_seq1, batch1_seq1, ...]` format.

In [89]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class MultiHead_Attn(nn.Module):
    def __init__(self, embed_dim ,num_head ):        
        super().__init__()
        
        self.embed_dim = embed_dim ##1024
        self.num_head = num_head ##8
        self.head_dim = self.embed_dim // self.num_head
        
        
        assert self.embed_dim%self.num_head == 0
        
        self.q = nn.Linear(self.embed_dim , self.embed_dim )
        self.k = nn.Linear(self.embed_dim , self.embed_dim )
        self.v = nn.Linear(self.embed_dim , self.embed_dim)
        
        self.f_linear = nn.Linear(self.embed_dim, self.embed_dim)
        self.dropout = nn.Dropout(.35)
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
    
    def forward(self, x ):
        batch_size = x.shape[0]
        src_len    = x.shape[1]
#         print('src_len=',src_len)
        
        w_k = self.k(x)
        w_q = self.q(x)        
        w_v = self.v(x)
        
        w_q = w_q.view(batch_size,-1,self.head_dim)
        w_k = w_k.view(batch_size,-1,self.head_dim)
        w_v = w_v.view(batch_size,-1,self.head_dim)
         
        energy = torch.matmul( w_k.permute(0,2,1) ,w_q )
        energy = energy/self.scale
        energy = torch.softmax(energy,-1)
        
        
        f_energy = torch.matmul( self.dropout(energy) , w_v.permute(0,2,1))
        f_energy = f_energy.permute(0, 2, 1)
        f_energy = f_energy.reshape(batch_size,-1)
        out = self.f_linear(f_energy)
        
        return out

In [107]:
# Self-attention without positional encoding
torch.manual_seed(691)

# Define your model here
class myModel(nn.Module):
    def __init__(self, input_size, embed_dim, seq_length,
                 num_classes=10, num_heads=8):
        super(myModel, self).__init__()
        # TODO: Initialize myModel
        self.input_size = input_size
        self.embed_dim = embed_dim
        self.seq_length = seq_length
        self.num_classes = num_classes
        self.num_heads = num_heads
        
        self.linear1 = nn.Linear(input_size, embed_dim)
        self.attention1=MultiHead_Attn(embed_dim, 8)
        self.relu=nn.ReLU()
        self.linear2 = nn.Linear(embed_dim, embed_dim)
        self.attention2=MultiHead_Attn(embed_dim, 8)
        self.linear3 = nn.Linear(embed_dim*seq_length, 10)     
        self.avgpool=nn.AvgPool2d((seq_length, 1), stride=(2, 1))

    def forward(self,x):
        # TODO: Implement myModel forward pass
        batch_size, sequence_length, input_size = x.shape
        input=x.reshape(batch_size*sequence_length, -1)
        l1_out=self.linear1(input)
        a1_out=self.attention1(l1_out) 
        relu1_out=self.relu(a1_out)        
        print(type(relu1_out))
        l2_out=self.linear2(relu1_out)
        print(l2_out.shape)
        relu2_out=self.relu(l2_out)
        a2_out=self.attention2(relu2_out)
        print(a2_out.shape)
        relu3_out=self.relu(a2_out)
        print(relu3_out.shape)
        relu3_out=relu3_out.reshape(batch_size,sequence_length, -1) 
        print(relu3_out.shape)
        avgpool_out=self.avgpool(relu3_out)
        print(avgpool_out.shape)
        avgpool_out=avgpool_out.reshape(batch_size, -1) 
        print(avgpool_out.shape)
        l3_out=self.linear2(avgpool_out)
        return 

In [130]:
# Self-attention without positional encoding
torch.manual_seed(691)

# Define your model here
class myModel(nn.Module):
    def __init__(self, input_size, embed_dim, seq_length,
                 num_classes=10, num_heads=8):
        super(myModel, self).__init__()
        # TODO: Initialize myModel
        self.input_size = input_size
        self.embed_dim = embed_dim
        self.seq_length = seq_length
        self.num_classes = num_classes
        self.num_heads = num_heads

        self.linear1 = nn.Linear(input_size, embed_dim)
        self.attention = nn.MultiheadAttention(embed_dim, num_heads)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(embed_dim, embed_dim)
        self.avgpool = nn.AvgPool2d((seq_length, 1), stride=(2, 1))
        self.linear3 = nn.Linear(embed_dim, num_classes)     

        

    def forward(self,x):
        # TODO: Implement myModel forward pass
        batch_size, sequence_length, input_size = x.shape # 64, 28, 28
        input=x.reshape(batch_size*sequence_length, -1) # 1792, 28
        l1_out=self.linear1(input) # 1792, 64
        a1_out, _=self.attention(l1_out, l1_out, l1_out) 
        relu1_out=self.relu(a1_out) # 1792, 64       
        l2_out=self.linear2(relu1_out)
        relu2_out=self.relu(l2_out) # 1792, 64 
        a2_out, _=self.attention(relu2_out, relu2_out, relu2_out) # 1792, 64
        relu3_out=self.relu(a2_out) # 1792, 64
        relu3_out=relu3_out.reshape(batch_size,sequence_length, -1) # 64, 28, 64
        avgpool_out=self.avgpool(relu3_out) # 64, 1, 64
        avgpool_out=avgpool_out.reshape(batch_size, -1) # 64, 64
        l3_out=self.linear3(avgpool_out) # 64, 10
        return l3_out
    

Train and evaluate your model by running the cell below. Expect to see  `60-80%` test accuracy.

In [131]:
# Same training code 

import torch 
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms


# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters
sequence_length = 28
input_size = 28
hidden_size = 64
num_layers = 2
num_classes = 10
num_epochs = 8
learning_rate = 0.005

# Initialize model
model = myModel(input_size=input_size, embed_dim=hidden_size, seq_length=sequence_length)
model = model.to(device)
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()

        optimizer.step()

        if (i+1) % 10 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))


# Test the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total)) 

Epoch [1/8], Step [10/157], Loss: 2.3077
Epoch [1/8], Step [20/157], Loss: 2.3170
Epoch [1/8], Step [30/157], Loss: 2.2960
Epoch [1/8], Step [40/157], Loss: 2.3117
Epoch [1/8], Step [50/157], Loss: 2.3046
Epoch [1/8], Step [60/157], Loss: 2.2965
Epoch [1/8], Step [70/157], Loss: 2.3052
Epoch [1/8], Step [80/157], Loss: 2.2832
Epoch [1/8], Step [90/157], Loss: 2.2882
Epoch [1/8], Step [100/157], Loss: 2.2970
Epoch [1/8], Step [110/157], Loss: 2.3073
Epoch [1/8], Step [120/157], Loss: 2.3039
Epoch [1/8], Step [130/157], Loss: 2.2880
Epoch [1/8], Step [140/157], Loss: 2.3068
Epoch [1/8], Step [150/157], Loss: 2.2962
Epoch [2/8], Step [10/157], Loss: 2.2915
Epoch [2/8], Step [20/157], Loss: 2.3048
Epoch [2/8], Step [30/157], Loss: 2.3106
Epoch [2/8], Step [40/157], Loss: 2.2986
Epoch [2/8], Step [50/157], Loss: 2.3147
Epoch [2/8], Step [60/157], Loss: 2.3039
Epoch [2/8], Step [70/157], Loss: 2.3216
Epoch [2/8], Step [80/157], Loss: 2.2995
Epoch [2/8], Step [90/157], Loss: 2.2994
Epoch [2/8

## Exercise 2: Self-Attention with Positional Encoding

Implement a similar model to exercise 1, except this time your embedded input should be added with the positional encoding. For the purpose of this lab, we will use a learned positional encoding, which will be a trainable embedding. Your positional encodings will be added to the initial transformation of the input.

* **Input**: Input image of shape `(batch_size, sequence_length, input_size)`, where $\text{sequence_length} = \text{image_height}$ and $\text{input_size} = \text{image_width}$.

* **Linear 1**: Linear layer which converts input of shape `(batch_size*sequence_length, input_size)` to input of shape `(batch_size*sequence_length, embed_dim)`, where `embed_dim` is the embedding dimension.

* **Add Positional Encoding**: Add a learnable positional encoding of shape `(sequence_length, batch_size, embed_dim)` to input of shape `(sequence_length, batch_size, embed_dim)`, where `pos_embed` is the positional embedding size. The output will be of shape `(sequence_length, batch_size, embed_dim)`.

* **Attention 1**: `nn.MultiheadAttention` layer with 8 heads which takes an input of shape `(sequence_length, batch_size, embed_dim)` and outputs a tensor of shape `(sequence_length, batch_size, embed_dim)`.

* **ReLU**: ReLU activation layer.

* **Linear 2**: Linear layer which converts input of shape `(sequence_length*batch_size, features_dim)` to input of shape `(sequence_length*batch_size, features_dim)`.

* **ReLU**: ReLU activation layer.

* **Attention 2**: `nn.MultiheadAttention` layer with 8 heads which takes an input of shape `(sequence_length, batch_size, features_dim)` and outputs a tensor of shape `(sequence_length, batch_size, features_dim)`.

* **ReLU**: ReLU activation layer.

* **AvgPool**: Average along the sequence dimension from `(batch_size, sequence_length, features_dim)` to `(batch_size, features_dim)`

* **Linear 3**: Linear layer which takes an input of shape `(batch_size, sequence_length*features_dim)` and outputs the class logits of shape `(batch_size, 10)`.


In [21]:
# Self-attention with positional encoding
torch.manual_seed(691)

# Define your model here
class myModel(nn.Module):
    def __init__(self, input_size, embed_dim, seq_length,
                 num_classes=10, num_heads=8):
        super(myModel, self).__init__()
        # TODO: Initialize myModel
        self.positional_encoding = nn.Parameter(torch.rand(self.seq_length, self.embed_dim))
        self.input_size = input_size
        self.embed_dim = embed_dim
        self.seq_length = seq_length
        self.num_classes = num_classes
        self.num_heads = num_heads
        
        self.positional_encoding = nn.Parameter(torch.rand(self.seq_length, self.input_size))
        self.linear1 = nn.Linear(input_size, embed_dim)
        self.attention1=MultiHead_Attn(embed_dim, 8)
        self.relu=nn.ReLU()
        self.linear2 = nn.Linear(embed_dim, embed_dim)
        self.attention2=MultiHead_Attn(embed_dim, 8)
        self.linear3 = nn.Linear(embed_dim*seq_length, 10)     
        self.avgpool=nn.AvgPool2d((seq_length, 1), stride=(2, 1))

    def forward(self,x):
        # TODO: Implement myModel forward pass
        batch_size, sequence_length, input_size = x.shape
        for i in range(batch_size):
            x[i]=x[i]+self.positional_encoding
         
        
        input=x.reshape(batch_size*sequence_length, -1)        
        l1_out=self.linear1(input)
        a1_out=self.attention1(l1_out) 
        relu1_out=self.relu(a1_out)        
#         print(type(relu1_out))
        l2_out=self.linear2(relu1_out)
#         print(l2_out.shape)
        relu2_out=self.relu(l2_out)
        a2_out=self.attention2(relu2_out)
#         print(a2_out.shape)
        relu3_out=self.relu(a2_out)
#         print(relu3_out.shape)
        relu3_out=relu3_out.reshape(batch_size,sequence_length, -1) 
#         print(relu3_out.shape)
        avgpool_out=self.avgpool(relu3_out)
        avgpool_out=avgpool_out.reshape(batch_size, -1) 
#         print(avgpool_out.shape)
        l3_out=self.linear2(avgpool_out)
        return l3_out

Use the same training code as the one from part 1 to train your model. You may copy the training loop here. Expect to see close to `~90+%` test accuracy.

In [22]:
# Same training code 

import torch 
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms


# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters
sequence_length = 28
input_size = 28
hidden_size = 64
num_layers = 2
num_classes = 10
num_epochs = 8
learning_rate = 0.005

# Initialize model
model = myModel(input_size=input_size, embed_dim=hidden_size, seq_length=sequence_length)
model = model.to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()

        optimizer.step()

        if (i+1) % 10 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))


# Test the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total)) 

AttributeError: 'myModel' object has no attribute 'seq_length'