## CS5242 Neural Networks and Deep Learning
## Sem 2 2024/25
### Lecturer: Xavier Bresson
### Teaching Assistants: Liu Nian, Liu Ziming, Liu Xiaokang, Yan Zehong, Chew KinWhye

## Final exam, coding test
Date: April 14 2025 <br>
Time: 6:45pm-8:15pm (90min) <br>

*Instructions* <br>
Name: Please, add your name here : KOH KAI YIT<br>
Answers: Please write your answers directly in this notebook by completing the code sections marked with  
`# YOUR CODE STARTS HERE`  
`# YOUR CODE` (it can span one or multiple lines)  
`# YOUR CODE ENDS HERE`. <br>
Remark: If certain conditions of the questions (for eg. hyperparameter values) are not stated, you are free to choose anything you want.  


## Exercise 1 : Implement a New Recurrent Neural Network

In this exercise, you will design and train a new vanilla RNN for next-word prediction using a subset of the Penn Treebank (PTB) dataset.

Unlike the standard RNN cell, this model uses a multiplicative interaction between the input and the recurrent transformation, followed by a ReLU nonlinearity:
$$
h_t = \textrm{ReLU}\big( (W_R h_{t-1} + b_R) \odot (W_V g_t + b_V) \big)
$$
where:
- $g_t$ is the input word embedding at time step $t$,
- $h_{t-1}$ is the hidden state from the previous step $t-1$ (i.e. the memory of past tokens),
- $W_R, W_V$ are weight matrices (learnable),
- $b_R, b_V$ are biases,
- $\odot$ is element-wise matrix multiplication,
- ReLU is the standard rectified linear unit activation.

This formulation allows the network to combine past memory and new input in a gated, element-wise multiplicative way, enhancing expressivity over standard additive updates.

Your Tasks:
- Implement the new RNN cell in a `new_rnn_layer` class.
- Choose training hyper-parameters (learning rate, batch size, hidden size, etc).
- Your trained model must achieve a perplexity below 850 (i.e. exp(cross-entropy)) on the test set.  
   

In [1]:
# You must run this cell, but you don't need to expand it if you'd prefer not to see the details.

%reset -f

import datetime
import torch
import torch.nn.functional as F
import torch.nn as nn
import math
import time
import numpy as np
import os.path
import matplotlib.pyplot as plt

print('Timestamp:',datetime.datetime.now().strftime("%y-%m-%d--%H-%M-%S"))

def normalize_gradient(net):
    grad_norm_sq=0
    for p in net.parameters():
        grad_norm_sq += p.grad.data.norm()**2
    grad_norm=math.sqrt(grad_norm_sq)
    if grad_norm<1e-4:
        net.zero_grad()
        print('grad norm close to zero')
    else:
        for p in net.parameters():
             p.grad.data.div_(grad_norm)
    return grad_norm

def display_num_param(net):
    nb_param = 0
    for param in net.parameters():
        nb_param += param.numel()
    print('There are {} ({:.2f} million) parameters in this neural network'.format(
        nb_param, nb_param/1e6))

def sentence2vector(sentence):
    words = sentence.split()
    x = torch.LongTensor(len(words),1)
    for idx, word in enumerate(words):
         if word not in word2idx:
            print('You entered a word which is not in the vocabulary.')
            print('Make sure that you do not have any capital letters')
         else:
            x[idx,0]=word2idx[word]
    return x

def show_next_word(scores):
    num_word_display=30
    prob=F.softmax(scores,dim=2)
    p=prob[-1].squeeze()
    p,word_idx = torch.topk(p,num_word_display)
    for i,idx in enumerate(word_idx):
        percentage= p[i].item()*100
        word=  idx2word[idx.item()]
        print(  "{:.1f}%\t".format(percentage),  word )

def eval_on_test_set(test_data):
    running_loss=0
    num_batches=0
    h = torch.zeros(1, bs, hidden_size)
    for count in range(0, test_data.shape[0]-1-seq_length, seq_length):
        minibatch_data =  test_data[ count   : count+seq_length   ]
        minibatch_label = test_data[ count+1 : count+seq_length+1 ]
        scores, h  = net( minibatch_data, h )
        minibatch_label =   minibatch_label.view(  bs*seq_length )
        scores          =            scores.view(  bs*seq_length , vocab_size)
        loss = criterion(  scores ,  minibatch_label )
        h=h.detach()
        running_loss += loss.item()
        num_batches += 1
    total_loss = running_loss/num_batches
    print('test: exp(loss) = ', math.exp(total_loss)  )
    return math.exp(total_loss)

# Load the PTB data
path_data = 'datasets/' 
word2idx  =  torch.load(os.path.join(path_data, 'word2idx.pt'))
idx2word  =  torch.load(os.path.join(path_data, 'idx2word.pt'))
mini_train_size = 1000
mini_test_size = 250
train_data = torch.load(os.path.join(path_data, 'train_data.pt'))[:mini_train_size]
test_data = torch.load(os.path.join(path_data, 'test_data.pt'))[:mini_test_size]


Timestamp: 25-04-14--18-56-16


In [4]:
class new_rnn_layer(nn.Module):
    
    def __init__(self, hidden_size):
        super().__init__()
        self.R = nn.Linear( hidden_size , hidden_size )
        self.V = nn.Linear( hidden_size , hidden_size )
    def forward(self, g_seq , h_init ):
        ##########################
        # YOUR CODE STARTS HERE
        seq_len, batch_size, hidden_size = g_seq.shape

        # Precompute V * g_t for all t (can be done in parallel)
        V_g = self.V(g_seq)  # shape: (seq_len, batch_size, hidden_size)
        h_t = h_init.squeeze(0)  # shape: (batch_size, hidden_size)
        h_seq = []

        for t in range(seq_len):
            g_transformed = V_g[t]                 # V g_t
            h_transformed = self.R(h_t)            # R h_{t-1}
            h_t = torch.relu(h_transformed * g_transformed)  # apply recurrence with element wise
            h_seq.append(h_t.unsqueeze(0))         # add time dimension back

        h_seq = torch.cat(h_seq, dim=0)  # (seq_len, batch_size, hidden_size)
        h_final = h_t.unsqueeze(0)       # (1, batch_size, hidden_size)

        # YOUR CODE ENDS HERE
        ##########################
        return h_seq , h_final

class vanilla_rnn(nn.Module):
    def __init__(self, vocab_size, hidden_size):
        super().__init__()
        self.layer1 = nn.Embedding(vocab_size, hidden_size)
        self.layer2 = new_rnn_layer(hidden_size)
        self.layer3 = nn.Linear(hidden_size, vocab_size)
    def forward(self, word_seq, h_init ):
        g_seq               =   self.layer1( word_seq )
        h_seq , h_final     =   self.layer2( g_seq , h_init )
        score_seq           =   self.layer3( h_seq )
        return score_seq,  h_final
        

# Hyper-parameters
bs = 20
vocab_size = 10000
seq_length = 25
##########################
# YOUR CODE STARTS HERE
hidden_size = 300
num_epochs = 10
my_lr = 1
# YOUR CODE ENDS HERE
##########################
# Initialize the net
net = vanilla_rnn(vocab_size, hidden_size)
# Loss
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD( net.parameters() , lr=my_lr )
# Train the RNN Model
start = time.time()
for epoch in range(num_epochs):
    running_loss=0
    num_batches=0
    h = torch.zeros(1, bs, hidden_size)
    for count in range(0, train_data.shape[0]-1-seq_length, seq_length):
        optimizer.zero_grad()
        minibatch_data =  train_data[ count   : count+seq_length   ]
        minibatch_label = train_data[ count+1 : count+seq_length+1 ]
        h=h.detach()
        h=h.requires_grad_()
        scores, h  = net( minibatch_data, h )
        scores          =            scores.view(  bs*seq_length , vocab_size)
        minibatch_label =   minibatch_label.view(  bs*seq_length )
        loss = criterion(  scores ,  minibatch_label )
        loss.backward()
        normalize_gradient(net)
        optimizer.step()
        running_loss += loss.item()
        num_batches += 1
    total_loss = running_loss/num_batches
    if total_loss>10.0: total_loss = 10.0
    elapsed = time.time()-start
    print('')
    print('epoch=',epoch, '\t time=', elapsed,'\t lr=', my_lr, '\t exp(loss)=',  math.exp(total_loss))
    test_loss = eval_on_test_set(test_data)
    # Check if the test loss is small enough
    if test_loss < 850:
        print("Well Done!")
        break

if test_loss > 850:
    print("Try again.")
    


epoch= 0 	 time= 2.931614875793457 	 lr= 1 	 exp(loss)= 2977.211918494641
test: exp(loss) =  1579.1335663025004

epoch= 1 	 time= 5.836550712585449 	 lr= 1 	 exp(loss)= 1840.5022768268361
test: exp(loss) =  1015.1690695001333

epoch= 2 	 time= 8.820572853088379 	 lr= 1 	 exp(loss)= 687.3804331232899
test: exp(loss) =  1016.2288757809229

epoch= 3 	 time= 12.08574891090393 	 lr= 1 	 exp(loss)= 461.0606639428799
test: exp(loss) =  749.4604984346275
Well Done!


***
***

## Exercise 2 : Add Coupled External Modulation to LSTM

In this exercise, you will enhance the standard LSTM layer by incorporating an **external modulation mechanism**. 

This upgraded architecture introduces an additional control signal that dynamically influences the hidden state through a learned gating mechanism.

Precisely, this extended LSTM includes the usual components of an LSTM cell - input, forget, and output gates - but introduces two key changes:
1. Coupled Input and Forget Gates: The input and forget gates are tied such that:
   $$ \eta_t = 1-\theta_t $$
2. External Modulation with Control Input: A new sequence $u_t$ (same length as the input sequence $g_t$) is used to modulate the final hidden state using a **learnable modulation gate**.

The full equations of the new modulated LSTM are:
$$
\begin{aligned}
&\tilde{h}_t = \tanh ( Rh_{t-1} + b_R + V g_t + b_V) \\
&c_t = \theta_t \odot c_{t-1} + \eta_t  \odot \tilde{h}_t  \hspace{2cm} \textrm{ long-term memory state }\\
&\bar{h}_t = \psi_t \odot \tanh (c_t)  \quad\quad\quad  \\
&h_t = \gamma_t \odot \bar{h}_t + (1-\gamma_t) \odot u_t \hspace{1cm} \textrm{ (new) short-term memory state }  \\\\
&\textrm{with the gates are defined as}\\\\
&\quad \theta_t = \textrm{sigmoid}(Ah_{t-1} + b_A + Bg_t + b_B) \quad \textrm{ (forget gate) }\\
&\quad \eta_t = 1-\theta_t \hspace{5.5cm} \textrm{ (input gate) }\\
&\quad \psi_t = \textrm{sigmoid}(Eh_{t-1} + b_E + Fg_t + b_F) \quad \textrm{ (output gate) }\\
&\quad \gamma_t = \textrm{sigmoid}(Qh_{t-1} + b_Q + Su_t + b_S) \quad \textrm{ (modulation gate) }\\
\end{aligned}
$$
where:
- $g_t$ is the word input after embedding at time step $t$,
- $u_t$ is the external modulation signal at time step $t$,
- $A, B, E, F, Q, R, S, V$ are learnable weight matrices,
- $b_A, b_B, b_E, b_F, b_Q, b_R, b_S, b_V$ are learnable biases,
- $\odot$ denotes the element-wise matrix multiplication.



In [58]:
# You must run this cell, but you don't need to expand it if you'd prefer not to see the details.

%reset -f

import torch
import torch.nn as nn
import torch.nn.functional as F
import time
import datetime
import random

print('Timestamp:',datetime.datetime.now().strftime("%y-%m-%d--%H-%M-%S"))

seed = 42
random.seed(seed)
torch.manual_seed(seed)
input_dict = torch.load('datasets/lstm_input.pt')
w_t = input_dict["word_seq"]
u_t = input_dict["control_seq"]

def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.manual_seed(seed)
        nn.init.xavier_uniform_(m.weight)
        if m.bias is not None:
            m.bias.data.fill_(0.01)

gt_output = torch.load('datasets/lstm_output.pt')
score_seq_gt = gt_output["score_seq"]
h_final_gt = gt_output["h_final"]
c_final_gt = gt_output["c_final"]

vocab_size = 100; hidden_size = 64

h_init = torch.zeros(1, 4, hidden_size)
c_init = torch.zeros(1, 4, hidden_size)


Timestamp: 25-04-14--19-37-45


In [62]:
class LSTM_modulated(nn.Module):
    
    def __init__(self, hidden_size):
        super().__init__()
        self.R = nn.Linear(hidden_size, hidden_size)
        self.V = nn.Linear(hidden_size, hidden_size)
        self.A = nn.Linear(hidden_size, hidden_size)
        self.B = nn.Linear(hidden_size, hidden_size)
        self.E = nn.Linear(hidden_size, hidden_size)
        self.F = nn.Linear(hidden_size, hidden_size)
        # For modulation mechanism
        self.Q = nn.Linear(hidden_size, hidden_size)
        self.S = nn.Linear(hidden_size, hidden_size)
        
    def forward(self, g_seq, u_seq, hc_init):
        h_init, c_init = hc_init
        h_t_pre = h_init
        c_t_pre = c_init
        ##########################
        # YOUR CODE STARTS HERE
        # h_t = h_t_pre.squeeze(0)
        # c_t = c_t_pre.squeeze(0)

        h_t = h_t_pre#.squeeze(0)
        c_t = c_t_pre#.squeeze(0)

        V_g = self.V(g_seq)
        B_g = self.B(g_seq)
        F_g = self.F(g_seq)
        S_u = self.S(u_seq)
        # YOUR CODE ENDS HERE
        ##########################
        h_seq = []
        num_t = g_seq.size(0)
        for t in range(num_t):
            ##########################
            # YOUR CODE STARTS HERE
            A_h = self.A(h_t)
            E_h = self.E(h_t)
            R_h = self.R(h_t)
            Q_h = self.Q(h_t)

            # Gates
            theta_t = torch.sigmoid(A_h + B_g[t])   # forget gate
            # print(f'theta shape: {theta_t.shape}')
            eta_t   = 1 - theta_t   # input gate    # input gate
            # eta_t   = torch.ones(theta_t.shape[0], theta_t.shape[1], theta_t.shape[2]) - theta_t   # input gate    # input gate
            # print(eta_t)
            psi_t   = torch.sigmoid(E_h + F_g[t])   # output gate
            # print(psi_t.shape)
            phi_t = torch.sigmoid(Q_h + S_u[t]) #modulation gate

            #candidates
            h_tilde = torch.tanh(R_h + V_g[t])
            c_t = (theta_t * c_t) + (eta_t * h_tilde) #long term memory state

            h_bar = psi_t * torch.tanh(c_t)

            # Hidden update
            # h_t = (phi_t * h_bar) + ((torch.ones(phi_t.shape[0], phi_t.shape[1], phi_t.shape[2])-phi_t)*h_bar)
            h_t = (phi_t * h_bar) + ((1-phi_t)*h_bar)

            # YOUR CODE ENDS HERE
            ##########################
            h_seq.append(h_t.unsqueeze(0))
            h_t_pre = h_t#.unsqueeze(0)
            c_t_pre = c_t
        h_seq = torch.cat(h_seq, dim=0)#.unsqueeze(0)
        h_final = h_seq[-1, :, :].unsqueeze(0)
        c_final = c_t#.unsqueeze(1)
        return h_seq, (h_final, c_final)

class three_layer_recurrent_net_modulated(nn.Module):
    
    def __init__(self, vocab_size, hidden_size):
        super().__init__()
        self.layer1 = nn.Linear(vocab_size, hidden_size)
        self.layer1_alt = nn.Linear(vocab_size, hidden_size)
        self.layer2 = LSTM_modulated(hidden_size)
        self.layer3 = nn.Linear(hidden_size, vocab_size)

    def forward(self, word_seq, control_seq, h_init, c_init):
        g_seq = self.layer1(word_seq)          # main input
        u_seq = self.layer1_alt(control_seq)   # control signal
        h_seq, (h_final, c_final) = self.layer2(g_seq, u_seq, (h_init, c_init))
        score_seq = self.layer3(h_seq)
        return score_seq, h_final, c_final


model = three_layer_recurrent_net_modulated(vocab_size, hidden_size)
model.apply(init_weights)

score_seq, h_final, c_final = model(w_t, u_t, h_init, c_init)
print(score_seq.shape)
print(h_final.shape)
print(c_final.shape)

if torch.allclose(score_seq_gt, score_seq, atol=1e-3):
    print("score_seq correct -- Well Done!")
else:
    print("score_seq incorrect -- Try again.") 
if torch.allclose(h_final_gt, h_final, atol=1e-3):
    print("h_final correct -- Well Done!")
else:
    print("h_final incorrect -- Try again.")
if torch.allclose(c_final_gt, c_final, atol=1e-3):
    print("c_final correct -- Well Done!")
else:
    print("c_final incorrect -- Try again.")
    

torch.Size([10, 1, 4, 100])
torch.Size([1, 1, 4, 64])
torch.Size([1, 4, 64])
score_seq incorrect -- Try again.
h_final incorrect -- Try again.
c_final incorrect -- Try again.


In [53]:
print(score_seq_gt.shape)

torch.Size([10, 1, 4, 100])


In [56]:
print(h_final_gt.shape)

torch.Size([1, 1, 4, 64])


In [57]:
print(c_final_gt.shape)

torch.Size([1, 4, 64])


***
***

## Exercise 3 : Implement Gated Attention

In this exercise, you will replace the standard softmax-based attention with a gated attention mechanism, where attention weights are computed using a sigmoid activation instead of Softmax.

As a reminder, the standard attention mechanism with masking is defined as:

$$
\begin{aligned}
\textrm{Standard-HA}(Q,K,V) &=  \textrm{Softmax}\Big( \frac{Q K^T}{\sqrt{d_\textrm{head}}} \odot \textrm{Mask}  \Big) V, \\
&\textrm{with } Q, K, V\in \mathbb{R}^{L\times d_\textrm{head}}, \textrm{Mask}\in\mathbb{R}^{L\times L}, \\
& \textrm{and }\textrm{Mask}_{ij}= 
\left\{
\begin{array}{lll}
1 & \textrm{ if attention between $i$ and $j$ }\\
- \infty & \textrm{ if no attention }
\end{array}
\right.
\end{aligned}
$$
Where:
- L is the sequence length,
- $d_\textrm{head}$ is the embedding dimension,
- and the Mask restricts attention.

In the new design of gated attention, attention weights are computed using a sigmoid function (sigmoid provides smoother attention scores), and are explicitly normalized using a normalization constant Z. The gated attention is defined as:

$$
\begin{aligned}
\textrm{Gated-HA}(Q,K,V) &=  \frac{1}{Z} \odot \textrm{Sigmoid}\Big( \frac{Q K^T}{\sqrt{d_\textrm{head}}} \odot \textrm{Mask}  \Big) V, \\
&\textrm{The terms $Q, K, V \textrm{and Mask}$ are as defined above.} \\
\end{aligned}
$$
$$
\begin{aligned}
&\textrm{Z ensures the attention weights sum to 1 row-wise (like Softmax). Thus, matrix Z is defined as:}\\ 
& Z_{i,j} = \sum_{j'=1}^L A_{i,j'} + \varepsilon, \ \forall i,j, \textrm{ with } \varepsilon=0.001 \textrm{ and } A=\textrm{Sigmoid}\Big( \frac{Q K^T}{\sqrt{d_\textrm{head}}} \odot \textrm{Mask}  \Big)\in\mathbb{R}^{L\times L} \\
\end{aligned}
$$

Your Tasks:
- Implement the new gated attention mechanism in the `Gated_AttentionHead` class.
- Choose training hyper-parameters (hidden size, num_heads, num_blocks).
- Your trained model must achieve a perplexity below 950 (i.e. exp(cross-entropy)) on the test set.
  

**Hints**:
- Apply the sigmoid function to the masked attention scores: $Q K^T / \sqrt{d_\textrm{head}} \odot \textrm{Mask} $.
- Compute the row-wise sum for normalization. You may use `torch.sum(input, dim)`. Example:
```
input_tensor = torch.tensor([[1, 2, 3],
                             [4, 5, 6]])
print('Tensor size:', input_tensor.size()) # Output: torch.Size([2, 3])
# Sum along dimension 1 (across columns), keeping the dimension 1:
sum_keepdim = torch.sum(input_tensor, dim=1, keepdim=True)
print('Tensor size:', sum_keepdim.size()) # Output: torch.Size([2, 1])
print("Row-wise sum with keepdim=True:\n", sum_keepdim) # Output: tensor([[ 6],[15]])
```
- Broadcast the division of the row-wise sum to each row of the attention matrix (so that each row is properly normalized to 1). Example:
```
attention_scores = torch.tensor([[1.0, 2.0, 3.0],
                                 [4.0, 5.0, 6.0]])
# Step 1: Row-wise sum (sum over dim=1)
row_sums = torch.sum(attention_scores, dim=1, keepdim=True)
# Step 2: Broadcast division for normalization
normalized_attention = attention_scores / row_sums
```
- Add a small $\varepsilon=0.001$ to avoid division by zero.
- As usual, multiply the normalized attention matrix by matrix V.


In [102]:
# You must run this cell, but you don't need to expand it if you'd prefer not to see the details.

%reset -f

import torch
import torch.nn.functional as F
import torch.nn as nn
import math
import time
import datetime
import os

print('Timestamp:',datetime.datetime.now().strftime("%y-%m-%d--%H-%M-%S"))

mini_train_size = 1000
mini_test_size = 250
train_data = torch.load(os.path.join('datasets/train_data.pt'))[:mini_train_size]
test_data = torch.load(os.path.join('datasets/test_data.pt'))[:mini_test_size]

bs = 20
vocab_size = 10000

def generate_positional_encoding(seq_length, dim):
    assert dim == 2* (dim//2) # check if dim is divisible by 2
    pe = torch.zeros(seq_length, dim)
    position = torch.arange(0, seq_length, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, dim, 2).float() * (-torch.log(torch.tensor(10000.0)) / dim))
    pe[:,0::2] = torch.sin(position * div_term)
    pe[:,1::2] = torch.cos(position * div_term)
    return pe  

def eval_on_test_set(test_data):
    net.eval()
    running_loss=0
    num_batches=0    
    for count in range(0, test_data.shape[0]-1-seq_length, seq_length):       
        minibatch_data =  test_data[ count   : count+seq_length   ]
        minibatch_label = test_data[ count+1 : count+seq_length+1 ]
        pos = generate_positional_encoding(seq_length, hidden_size)
        scores = net( minibatch_data, pos )
        minibatch_label = minibatch_label.view(  bs*seq_length ) 
        scores = scores.view(  bs*seq_length , vocab_size)
        loss = criterion(scores, minibatch_label) 
        running_loss += loss.item()
        num_batches += 1        
    total_loss = running_loss/num_batches
    print('test: exp(loss) = ', math.exp(total_loss)  )
    return math.exp(total_loss)


class MultipleAttentionHead(nn.Module):
    def __init__(self, d, num_heads):
        super().__init__()
        d_head = d // num_heads # dim_head = d // num_heads, usually dimension per head is 64
        assert d == d_head * num_heads # check divisibility
        self.MHA = nn.ModuleList([ Gated_AttentionHead(d, d_head) for _ in range(num_heads) ])
        self.WO = nn.Linear(d, d) # combination layer
        self.dropout = nn.Dropout(0.5)
    def forward(self, H): # size(H)=[batch_size, seq_length, d]
        batch_size = H.size(0); seq_length = H.size(1)
        H_heads = []
        for HA_layer in self.MHA:
            H_heads.append(HA_layer(H)) # size=[batch_size, seq_length, d_head]
        H_heads = torch.cat(H_heads, dim=2) # size=[batch_size, seq_length, d]            
        H_heads = self.dropout(H_heads) # dropout attention activations
        H = self.WO(H_heads) # size=[batch_size, seq_length, d]
        return H
        
class TransformerBlock(nn.Module):
    def __init__(self, d, num_heads):
        super().__init__()
        self.LN_MHA = nn.LayerNorm(d)
        self.LN_MLP = nn.LayerNorm(d)
        self.MHA = MultipleAttentionHead(d, num_heads)
        self.MLP = nn.Sequential(nn.Linear(d,4*d), nn.ReLU(), nn.Dropout(0.5), nn.Linear(4*d,d))        
    def forward(self, H): # size=[batch_size, seq_length, d]
        # Multiple Attention Heads w/ layer normalization (LN), residual connection (RC)
        H = H + self.MHA(self.LN_MHA(H)) # size=[batch_size, seq_length, d]
        # MLP w/ layer normalization (LN), residual connection (RC)
        H = H + self.MLP(self.LN_MLP(H)) # size=[batch_size, seq_length, d]
        return H # size=[batch_size, seq_length, d]
        
class Transformer_decoder(nn.Module):
    def __init__(self, d, num_heads, num_blocks, seq_length):
        super().__init__()
        self.TR_Blocks = nn.ModuleList([ TransformerBlock(d, num_heads) for _ in range(num_blocks) ]) 
    def forward(self, batch_seq, pos_enc):
        H = batch_seq.transpose(1,0) # size=[batch_size, seq_length, d]
        batch_size = H.size(0); batch_len = H.size(1)
        # Add positional encoding  
        pos_enc = pos_enc.unsqueeze(dim=0) # size=[1,          seq_length, d]
        H = H + pos_enc                    # size=[batch_size, seq_length, d]
        # Apply transformer blocks 
        for TR_Block in self.TR_Blocks:
            H = TR_Block(H)
        # Output
        H = H.permute(1,0,2)  # size=[batch_length, batch_size, d]
        return H # return prediction scores for next token

class ANN(nn.Module):
    def __init__(self, d, num_heads, num_blocks, seq_length):
        super(ANN, self).__init__()
        self.decoder = Transformer_decoder(d, num_heads, num_blocks, seq_length)
    def forward(self, g_seq , pos ):
        h_dec_seq = self.decoder( g_seq , pos )
        return h_dec_seq 
    
class attention_net(nn.Module):
    def __init__(self, d, num_heads, num_blocks, seq_length):
        super(attention_net, self).__init__()  
        self.layer1 = nn.Embedding( vocab_size  , hidden_size  )
        self.layer2 = ANN(d, num_heads, num_blocks, seq_length)
        self.layer3 = nn.Linear(    hidden_size , vocab_size   )
    def forward(self, word_seq, pos ):
        g_seq     =   self.layer1( word_seq ) # size=(seq_length, bs, hidden_dim) 
        h_seq     =   self.layer2( g_seq , pos ) # size=(seq_length, bs, hidden_dim) 
        score_seq =   self.layer3( h_seq ) # size=(seq_length, bs, vocab_size)
        return score_seq 



Timestamp: 25-04-14--20-03-19


In [126]:
attention_scores = torch.tensor([[1.0, 2.0, 3.0],
                                 [4.0, 5.0, 6.0]])
# Step 1: Row-wise sum (sum over dim=1)
print(attention_scores)
row_sums = torch.sum(attention_scores, dim=1, keepdim=True)
print(row_sums)
# Step 2: Broadcast division for normalization
normalized_attention = attention_scores / row_sums
print(normalized_attention)

tensor([[1., 2., 3.],
        [4., 5., 6.]])
tensor([[ 6.],
        [15.]])
tensor([[0.1667, 0.3333, 0.5000],
        [0.2667, 0.3333, 0.4000]])


In [123]:
x = torch.tensor([
    [[1,2,3], 
     [1,2,3],
     [1,2,3]], 

    [[1,2,3], 
     [1,2,3],
     [1,2,3]],

    ])
print(x.shape)
# print(x.view(x.shape(0)))

row = torch.sum(x, dim = 1, keep_dim = True)
print(row)
# print(row.shape)

# for i in range(row.shape[0]):
#     x[]

torch.Size([2, 3, 3])


TypeError: sum() received an invalid combination of arguments - got (Tensor, keep_dim=bool, dim=int), but expected one of:
 * (Tensor input, *, torch.dtype dtype)
      didn't match because some of the keywords were incorrect: keep_dim, dim
 * (Tensor input, tuple of ints dim, bool keepdim, *, torch.dtype dtype, Tensor out)
 * (Tensor input, tuple of names dim, bool keepdim, *, torch.dtype dtype, Tensor out)


In [130]:
class Gated_AttentionHead(nn.Module):
    
    def __init__(self, d, d_head):
        super().__init__()
        self.LN_MHA = nn.LayerNorm(d_head)
        self.LN_MLP = nn.LayerNorm(d_head)
        self.query = nn.Linear(d, d_head, bias=False) # query embedding layer
        self.key = nn.Linear(d, d_head, bias=False) # key embedding layer
        self.value = nn.Linear(d, d_head) # value embedding layer
        
    def forward(self, H): # size(H)=[batch_size, seq_length, d]
        batch_size = H.size(0); batch_len = H.size(1)
        # Compute a single attention head H = Softmax( QK^T / d^0.5 ) V
        Q = self.query(H) # size=[batch_size, batch_length, d]        
        K = self.key(H) # size=[batch_size, batch_length, d]
        V = self.value(H) # size=[batch_size, batch_length, d]
        # Gated attention
        ##########################
        # YOUR CODE STARTS HERE
        attention_score = Q @ K.transpose(2,1) * H.size(2)**-0.5 # QK^T/sqrt(d), (B,L,d) @ (B,d,L) => (B,L,L), size=[batch_size, batch_length, batch_length)
        mask = torch.tril(torch.ones(batch_len,batch_len)).long() # mask to use previous tokens only : { token(<=t) }, size=[batch_len,batch_len]
        attention_score = attention_score.masked_fill(mask==0, value=float('-inf')) # softmax(-inf)=0 prevents using next tokens for prediction, size=(batch_size, batch_len, batch_len)
        attention_score = torch.sigmoid(attention_score) # sum weights = 1, size=[batch_size, batch_length, batch_len)
        # print(attention_score.shape)
        row_sums = torch.sum(attention_scores, dim=1, keepdim=False)
        normalized_attention = attention_scores / (row_sums + 1e-6)
        H_HA = normalized_attention @ V # softmax( QK^T / sqrt(d) ) V, (B,L,L) @ (B,L,d) => (B,L,d), size=[batch_size, batch_length, d)
        # YOUR CODE ENDS HERE
        ##########################
        return H_HA # return prediction scores for next token


# Instantiate network
seq_length = 50
##########################
# YOUR CODE STARTS HERE
hidden_size = 150
num_heads = 10
num_blocks = 3
# YOUR CODE ENDS HERE
##########################
net = attention_net(hidden_size, num_heads, num_blocks, seq_length)
pos = generate_positional_encoding(seq_length, hidden_size) # size=(seq_length, hidden_dim)

# Optimization
my_lr = 0.001
optimizer = torch.optim.Adam(net.parameters(), lr=my_lr)
criterion = nn.CrossEntropyLoss()

# Train
start = time.time()
for epoch in range(10):
    running_loss=0
    num_batches=0    
    for count in range(0, train_data.shape[0]-1-seq_length, seq_length):
        optimizer.zero_grad()
        minibatch_data = train_data[ count   : count+seq_length   ]
        minibatch_label = train_data[ count+1 : count+seq_length+1 ]    
        pos = generate_positional_encoding(seq_length, hidden_size) # size=(seq_length, hidden_dim)    
        scores = net( minibatch_data, pos ) # size=(seq_length, bs, vocab_size)
        scores = scores.view(  bs*seq_length , vocab_size) # size=(seq_length/2.bs, vocab_size)
        minibatch_label = minibatch_label.view(  bs*seq_length ) # size=(seq_length/2.bs, vocab_size)
        loss = criterion(scores, minibatch_label)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        num_batches += 1
    total_loss = running_loss/num_batches
    elapsed = time.time()-start
    
    print('')
    print('epoch=',epoch, '\t time=', elapsed,'\t lr=', my_lr, '\t exp(loss)=',  math.exp(total_loss))
    test_loss = eval_on_test_set(test_data) 
    
    # Check if the test loss is small enough
    if test_loss < 950:
        print("Well Done!")
        break

if test_loss > 950:
    print("Try again.")


RuntimeError: The size of tensor a (3) must match the size of tensor b (2) at non-singleton dimension 1

***
***

## Exercise 4 : Implement Window Attention

Window Attention is a variant of causal attention where each token attends only to a limited local window of preceding tokens, instead of all previous tokens in the sequence. This localized attention reduces computational overhead and focuses the model’s capacity on nearby context.

In this exercise, you will implement window attention with a fixed window size of 3. This means that each token can only attend to itself and the two preceding tokens. See figure below: 
<center>
<img src="pic/window_attention.png" style="height:350px"/>
</center>

As a reminder, we define the masked attention with the following formula:
$$
\begin{aligned}
\textrm{Mask-HA}(Q,K,V) &=  \textrm{Softmax}\Big( \frac{Q K^T}{\sqrt{d_\textrm{head}}} \odot \textrm{Mask}  \Big) V, \\
&\textrm{with } Q, K, V\in\mathbb{R}^{L\times d_\textrm{head}}, \textrm{Mask}\in\mathbb{R}^{L\times L}, \\
& \textrm{and }\textrm{Mask}_{ij}= 
\left\{
\begin{array}{lll}
1 & \textrm{ if attention between $i$ and $j$ }\\
- \infty & \textrm{ if no attention }
\end{array}
\right.
\end{aligned}
$$
Where:
- L is the sequence length,
- $d_\textrm{head}$ is the embedding dimension,
- and the Mask restricts attention.


**Hints**:
1. First, compute the standard attention score matrix: $Q K^T / \sqrt{d_\textrm{head}}$.
2. Then apply the window mask to allow attention only within a window of size 3 of previous tokens.
3. You may use `torch.tril()` to build the window mask. Examples:
```
a = torch.tensor([[ 1, 2, 3],
                  [ 4, 5, 6],
                  [ 7, 8, 9]])
torch.tril(a, diagonal=0)
>>> torch.tensor([[ 1, 0, 0],)
                  [ 4, 5, 0],
                  [ 7, 8, 9]])
torch.tril(a, diagonal=1)
 >>> torch.tensor([[ 1, 2, 0],)
                   [ 4, 5, 6],
                   [ 7, 8, 9]])
torch.tril(a, diagonal=-1)
>>> torch.tensor([[ 0, 0, 0],)
                  [ 4, 0, 0],
                  [ 7, 8, 0]])
```


In [100]:
%reset -f

import datetime
import torch
import torch.nn as nn

print('Timestamp:',datetime.datetime.now().strftime("%y-%m-%d--%H-%M-%S"))

class Window_Attention(nn.Module):
    
    def __init__(self, d, d_head, window_size):
        super().__init__()
        self.LN_MHA = nn.LayerNorm(d_head)
        self.LN_MLP = nn.LayerNorm(d_head)
        self.query = nn.Linear(d, d_head, bias=False)
        self.key = nn.Linear(d, d_head, bias=False)
        self.value = nn.Linear(d, d_head)
        self.window_size = window_size # new hyper-parameter
    
    def forward(self, H):
        batch_size = H.size(0); batch_len = H.size(1)
        ##########################
        # YOUR CODE STARTS HERE
        # Masked self-attention decoder
        # Compute a single attention head H = Softmax( QK^T / d^0.5 ) V
        Q = self.query(H) # size=[batch_size, batch_length, d] 
        K = self.key(H) # size=[batch_size, batch_length, d]
        V = self.value(H) # size=[batch_size, batch_length, d]
        attention_score = Q @ K.transpose(2,1) * H.size(2)**-0.5 # QK^T/sqrt(d), (B,L,d) @ (B,d,L) => (B,L,L), size=[batch_size, batch_length, batch_length)
        mask = torch.tril(torch.ones(batch_len,batch_len)).long() # mask to use previous tokens only : { token(<=t) }, size=[batch_len,batch_len]
        new_window_mask = torch.tril(torch.ones(batch_len,batch_len), diagonal = -1*window_size).long()
        
        attention_score = attention_score.masked_fill(mask==0, value=float('-inf')) # softmax(-inf)=0 prevents using next tokens for prediction, size=(batch_size, batch_len, batch_len)
        attention_score = attention_score.masked_fill(new_window_mask==1, value=float('-inf')) # softmax(-inf)=0 prevents using next tokens for prediction, size=(batch_size, batch_len, batch_len)
        # print(attention_score.shape)
        # print(attention_score)
        attention_score = torch.softmax(attention_score, dim=2) # sum weights = 1, size=[batch_size, batch_length, batch_len)
        H_HA = attention_score @ V # softmax( QK^T / sqrt(d) ) V, (B,L,L) @ (B,L,d) => (B,L,d), size=[batch_size, batch_length, d)
        return H_HA # return prediction scores for next token
        # YOUR CODE ENDS HERE
        ##########################
        return H_HA


# Instantiate the window attention layer
d = 32
d_head = 16
window_size = 3
WA = Window_Attention(d, d_head, window_size)

# Evaluation of your window attention
WA.load_state_dict(torch.load('datasets/wa_model_weights.pt')) # load pre-defined parameters of the window attention layer
H_in = torch.load('datasets/wa_input.pt') # load pre-defined input tensor
H_out = WA(H_in) # output of forward pass 
H_gt = torch.load('datasets/wa_gt.pt') # exact output solution
if torch.sum(torch.abs(H_out - H_gt)) < 1e-4: # check output of forward pass w.r.t. exact solution
    print("Well Done!")
else:
    print("Try Again.")


Timestamp: 25-04-14--19-59-51
Well Done!


***
***

## Exercise 5 : Vectorized Multi-Head Attention

During a tutorial, you implemented Multi-Head Attention (MHA) by explicitly **looping** over each attention head. 

While this approach is functionally correct, it is computationally inefficient and not scalable for larger models.

Your goal in this exercise is to re-implement Multi-Head Attention in a fully vectorized form — **without** using any for loops, i.e. all heads are processed in parallel.


**Hints:**
- For each of the query, key and value matrix transformations, you may use an implementation that only requires a single $d\times d$ linear transformation, instead of $\textrm{num\_heads}$ linear transformations of size  $\ d \times (d/\textrm{num\_heads})$ . 
- Use a bias (`bias=True`) for the query, key and value linear transformations.
- You may use `torch.reshape(input, shape)` that returns a tensor with the same data and same number of elements as `input`, but with the specified `shape`. Example:
```
x = torch.tensor([[1, 2, 3, 4, 5, 6],
                  [7, 8, 9, 10, 11, 12]])
print("Original shape:", x.size())
# Output: torch.Size([2, 6])
y = torch.reshape(x, (3, 4))
print(y)
print("New shape:", y.size())
# New shape: torch.Size([3, 4])
```
- You may use `torch.permute(input, dims)` that returns the original tensor input but with its dimensions permuted. Example:
```
x = torch.randn(2, 3, 4)  # e.g., (batch, channels, features)
print("Original shape:", x.size())
# Output: torch.Size([2, 3, 4])
# Permute dimensions to shape (3, 2, 4)
y = x.permute(1, 0, 2)
print("Permuted shape:", y.size())
# Output: torch.Size([3, 2, 4])
```


In [None]:
%reset -f

import datetime
import torch

print('Timestamp:',datetime.datetime.now().strftime("%y-%m-%d--%H-%M-%S"))

class your_MultipleAttentionHead(torch.nn.Module):
    
    def __init__(self, d, num_heads):
        
        super().__init__()
        ##########################
        # YOUR CODE STARTS HERE
        self.num_heads = num_heads
        self.d_head = d // num_heads
        assert d == self.d_head * self.num_heads # check divisibility
        # Instead of have num_heads matrices of size (d, d_head), we simply use a single (d, d) matrix, and each head would take their respective dimensions
        self.query =   # query embedding layer
        self.key   =   # key embedding layer
        self.value =   # value embedding layer
        # YOUR CODE ENDS HERE
        ##########################

    def forward(self, Q, K, V):
        ##########################
        # YOUR CODE STARTS HERE
        H = 
        # YOUR CODE ENDS HERE
        ##########################
        return H


batch_size, seq_length, d, num_heads = 32, 100, 256, 8
x = torch.randn(batch_size, seq_length, d)

your_implementation = your_MultipleAttentionHead(d, num_heads)


# Evaluate your implementation vs. PyTorch implementation
correct_implementation = torch.nn.MultiheadAttention(embed_dim=d, num_heads=num_heads, bias=True, batch_first=True)
with torch.no_grad():
    # Copy-paste linear matrices from your implementation to the PyTorch implementation
    in_proj_weight_custom = torch.cat([your_implementation.query.weight, your_implementation.key.weight, your_implementation.value.weight], dim=0)
    correct_implementation.in_proj_weight.copy_(in_proj_weight_custom)
    # Copy-paste linear biases from your implementation to PyTorch implementation
    in_proj_bias_custom = torch.cat([your_implementation.query.bias, your_implementation.key.bias, your_implementation.value.bias], dim=0)
    correct_implementation.in_proj_bias.copy_(in_proj_bias_custom)
    # Make the output projection layer of the PyTorch implementation as Identity transformation
    correct_implementation.out_proj.weight.copy_(torch.eye(d))
    correct_implementation.out_proj.bias.zero_()
if torch.allclose(your_implementation(x, x, x), correct_implementation(x, x, x)[0], atol=1e-3):
    print("Well Done!")
else:
    print("Try Again.")


## End of coding test