* The objective of this assignments is to build the **Decoder** part of the Transformer architecture.
* We will be using the **PyTorch** framework to implement the following components
  * Decoder Layer that contains
    * Multi-Head Masked Attention (MHMA) Module
    * Multi-Head Cross Attention (MHMA) Module
    * Position-wise Feed Forward Neural Network

  * Implement CLM

* **DO NOT** USE Built-in **TRANSFORMER LAYERS** as it affects the reproducibility.

* You will be given with a configuration file that contains information on various hyperparameters such as embedding dimension, vocabulary size,number heads and so on

* Use ReLU activation function and Stochastic Gradient Descent optimizer
* Here are a list of helpful Pytorch functions (does not mean you have to use all of them) for this subsequent assignments
  * [torch.matmul](https://pytorch.org/docs/stable/generated/torch.matmul.html#torch-matmul)
  * [torch.bmm](https://pytorch.org/docs/stable/generated/torch.bmm.html)
  * torch.swapdims
  * torch.unsqueeze
  * torch.squeeze
  * torch.argmax
  * [torch.Tensor.view](https://pytorch.org/docs/stable/generated/torch.Tensor.view.html)
  * [torch.nn.Embedding](https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html)
  * [torch.nn.Parameter](https://pytorch.org/docs/stable/generated/torch.nn.parameter.Parameter.html)
  * torch.nn.Linear
  * torch.nn.LayerNorm
  * torch.nn.ModuleList
  * torch.nn.Sequential
  * [torch.nn.CrossEntropyLoss](https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html)
  
* Important: Do not set any global seeds.

* Helpful resources to get started with

 * [Andrej Karpathys Nano GPT](https://github.com/karpathy/nanoGPT)
 * [PyTorch Source code of Transformer Layer](https://pytorch.org/docs/stable/generated/torch.nn.Transformer.html)



In [257]:
import torch
from torch import Tensor

import torch.nn as nn
from torch.nn import Parameter
import torch.nn.functional as F
from torch.nn.functional import one_hot

import torch.optim as optim

from  pprint import pprint
from yaml import safe_load
import copy
import requests
from io import BytesIO

In [258]:
#do not edit this cell
config_url = "https://raw.githubusercontent.com/Arunprakash-A/LLM-from-scratch-PyTorch/main/config_files/dec_config.yml"
response = requests.get(config_url)
config = response.content.decode("utf-8")
config = safe_load(config)
pprint(config)

{'input': {'batch_size': 10, 'embed_dim': 32, 'seq_len': 8, 'vocab_size': 12},
 'model': {'d_ff': 128,
           'd_model': 32,
           'dk': 4,
           'dq': 4,
           'dv': 4,
           'n_heads': 8,
           'n_layers': 6}}


In [259]:
vocab_size = config['input']['vocab_size']
batch_size = config['input']['batch_size']
seq_len = config['input']['seq_len']
embed_dim = config['input']['embed_dim']
dmodel = embed_dim
dq = torch.tensor(config['model']['dq'])
dk = torch.tensor(config['model']['dk'])
dv = torch.tensor(config['model']['dv'])
heads = torch.tensor(config['model']['n_heads'])
d_ff = config['model']['d_ff']

# Input tokens

* Generate a raw_input ids (without any special tokens appended to it)

* Since we will be using this as label after adding the special  \<start\> token, we use the variable name "label_ids"

* Keep the size of the `label_ids=(bs,seq_len-1)` as we insert a special token ids in the next step

In [260]:
# do not edit this cell
data_url = 'https://github.com/Arunprakash-A/LLM-from-scratch-PyTorch/raw/main/config_files/w2_input_tokens'
r = requests.get(data_url)
label_ids = torch.load(BytesIO(r.content))

* Let the first token_id be be a special `[start]` token (mapped to integer 0)
* If label_ids=$\begin{bmatrix}1&2\\3&4 \end{bmatrix}$, then we modify it as $\begin{bmatrix}0&1&2\\0&3&4 \end{bmatrix}$

In [261]:
label_ids

tensor([[ 7,  8,  7,  7,  9,  2,  6],
        [10,  1, 10,  5,  3,  6,  8],
        [ 3,  4,  8,  2, 10, 10, 10],
        [ 4, 10,  1,  3,  4,  9,  7],
        [ 8,  4,  7,  3,  8, 10,  5],
        [ 9,  1,  8,  5,  9,  9, 10],
        [ 7,  3,  8,  2,  5,  1,  5],
        [ 3,  3,  2,  1,  4,  1,  1],
        [10,  9,  9,  9,  6,  9,  2],
        [ 3,  6,  6,  3,  5,  4,  5]])

In [262]:
token_ids = torch.cat((torch.zeros(label_ids.size(0), 1, dtype=label_ids.dtype), label_ids), dim=1) #the first column of token_ids should be zeros and the rest of the columns come from label_ids

token_ids

tensor([[ 0,  7,  8,  7,  7,  9,  2,  6],
        [ 0, 10,  1, 10,  5,  3,  6,  8],
        [ 0,  3,  4,  8,  2, 10, 10, 10],
        [ 0,  4, 10,  1,  3,  4,  9,  7],
        [ 0,  8,  4,  7,  3,  8, 10,  5],
        [ 0,  9,  1,  8,  5,  9,  9, 10],
        [ 0,  7,  3,  8,  2,  5,  1,  5],
        [ 0,  3,  3,  2,  1,  4,  1,  1],
        [ 0, 10,  9,  9,  9,  6,  9,  2],
        [ 0,  3,  6,  6,  3,  5,  4,  5]])

# Implement the following components of a decoder layer

 * Multi-head Masked Attention (MHMA)
 * Multi-head Cross Attention (MHCA)
 * Postion-wise FFN

* Randomly initialize the parameters using normal distribution with the following seed values
  * $W_Q:$(seed=43)
  * $W_K:$(seed=44)
  * $W_V:$(seed=45)
  * $W_O:$(seed=46)

* Remember that, Multi-head cross atention takes two represnetation. One is the encoder output and the other one is the output from masked attetnion sub-layer.

* However, in this assignment, we will fix it to a random matrix.

In [263]:
class MHCA(nn.Module):

  def __init__(self,dmodel,dq,dk,dv,heads):
    super(MHCA,self).__init__()
    self.dmodel = dmodel
    self.dq = dq
    self.dk = dk
    self.dv = dv
    self.heads = heads


    self.W_q = nn.Linear(dmodel, dq * heads, bias=b)
    self.W_k = nn.Linear(dmodel, dk * heads, bias=b)
    self.W_v = nn.Linear(dmodel, dv * heads, bias=b)
    self.W_o = nn.Linear(dv * heads, dmodel, bias=b)

    torch.manual_seed(43)
    nn.init.normal_(self.W_q.weight)
    if b:
        nn.init.normal_(self.W_q.bias)

    torch.manual_seed(44)
    nn.init.normal_(self.W_k.weight)
    if b:
        nn.init.normal_(self.W_k.bias)

    torch.manual_seed(45)
    nn.init.normal_(self.W_v.weight)
    if b:
        nn.init.normal_(self.W_v.bias)

    torch.manual_seed(46)
    nn.init.normal_(self.W_o.weight)
    if b:
        nn.init.normal_(self.W_o.bias)



     # your method definitions go here (if you want to)

  def forward(self,Q_input,H=None):
    BS, T_dec, _ = Q_input.size()

    BS, T_enc, _ = H.size()
    Q = self.W_q(Q_input)
    K = self.W_k(H)
    V = self.W_v(H)


    Q = Q.view(BS, T_dec, self.heads, self.dq).transpose(1, 2)
    K = K.view(BS, T_enc, self.heads, self.dk).transpose(1, 2)
    V = V.view(BS, T_enc, self.heads, self.dv).transpose(1, 2)


    # attention
    scores = torch.matmul(Q, K.transpose(-2,-1)) / (self.dk ** 0.5)

    # print(scores.shape,'scores shape')
    attn_weights = F.softmax(scores, dim=-1)
    # print(attn_weights.shape,'attention weights')
    context = torch.matmul(attn_weights, V)
    # print(context.shape,'context shape')

    # concat heads
    context = context.transpose(1, 2).contiguous().view(BS, T_dec, self.dv*self.heads)

    out = self.W_o(context)


    return out

* By default, `mask=None`. Therefore, create and apply the mask while computing the attention scores


In [264]:
b=False
class MHMA(nn.Module):

  def __init__(self,dmodel,dq,dk,dv,heads,mask=None):
    super(MHMA,self).__init__()
    # your code goes here
    self.dmodel = dmodel
    self.dq = dq
    self.dk = dk
    self.dv = dv
    self.heads = heads


    self.W_q = nn.Linear(dmodel, dq * heads, bias=b)
    self.W_k = nn.Linear(dmodel, dk * heads, bias=b)
    self.W_v = nn.Linear(dmodel, dv * heads, bias=b)
    self.W_o = nn.Linear(dv * heads, dmodel, bias=b)

    torch.manual_seed(43)
    nn.init.normal_(self.W_q.weight)
    if b:
        nn.init.normal_(self.W_q.bias)

    torch.manual_seed(44)
    nn.init.normal_(self.W_k.weight)
    if b:
        nn.init.normal_(self.W_k.bias)

    torch.manual_seed(45)
    nn.init.normal_(self.W_v.weight)
    if b:
        nn.init.normal_(self.W_v.bias)

    torch.manual_seed(46)
    nn.init.normal_(self.W_o.weight)
    if b:
        nn.init.normal_(self.W_o.bias)



  def forward(self,H=None):
    # implement forward method

    BS, T, _ = H.size()
    Q = self.W_q(H)
    K = self.W_k(H)
    V = self.W_v(H)
    Q = Q.view(BS, T, self.heads, self.dq).transpose(1, 2)
    K = K.view(BS, T, self.heads, self.dk).transpose(1, 2)
    V = V.view(BS, T, self.heads, self.dv).transpose(1, 2)



    scores = torch.matmul(Q, K.transpose(-2,-1)) / (self.dk ** 0.5)
    mask = torch.tril(torch.ones(T, T, device=H.device)).unsqueeze(0).unsqueeze(0)
    scores = scores.masked_fill(mask == 0, float('-inf'))

    # print(scores.shape,'scores shape')
    attn_weights = F.softmax(scores, dim=-1)
    # print(attn_weights.shape,'attention weights')
    context = torch.matmul(attn_weights, V)
    # print(context.shape,'context shape')

    # concat heads
    context = context.transpose(1, 2).contiguous().view(BS, T, self.dv*self.heads)

    # output
    out = self.W_o(context)

    return out

* Implement the FFN and OutputLayer modules (same as the one you implemented for encoder)

In [265]:
class FFN(nn.Module):
  def __init__(self,dmodel,d_ff):
    super(FFN,self).__init__()

    self.fc1 = nn.Linear(dmodel, d_ff, bias=b)
    self.fc2 = nn.Linear(d_ff, dmodel, bias=b)

    torch.manual_seed(47)
    nn.init.normal_(self.fc1.weight)
    if b:
        nn.init.normal_(self.fc1.bias)

    torch.manual_seed(48)
    nn.init.normal_(self.fc2.weight)
    if b:
        nn.init.normal_(self.fc2.bias)

  def forward(self,x):
    out = self.fc1(x)
    out = F.relu(out)
    out = self.fc2(out)
    return out

In [266]:
class OutputLayer(nn.Module):

  def __init__(self,dmodel,vocab_size):
    super(OutputLayer,self).__init__()
    self.proj = nn.Linear(dmodel, vocab_size,bias=b)



    torch.manual_seed(49)
    nn.init.normal_(self.proj.weight)
    if b:
      nn.init.normal_(self.proj.bias)

  def forward(self,representations):
    out = self.proj(representations)
    return out

* Implement the final decoder layer.

In [267]:
class DecoderLayer(nn.Module):

  def __init__(self,dmodel,dq,dk,dv,d_ff,heads,mask=None):
    super(DecoderLayer,self).__init__()
    self.mhma = MHMA(dmodel,dq,dk,dv,heads,mask=None)
    self.mhca = MHCA(dmodel,dq,dk,dv,heads)
    self.layer_norm_mhma = torch.nn.LayerNorm(dmodel)
    self.layer_norm_mhca = torch.nn.LayerNorm(dmodel)
    self.layer_norm_ffn = torch.nn.LayerNorm(dmodel)
    self.ffn = FFN(dmodel,d_ff)

  def forward(self,x,enc_rep):
    # print(x.shape,'encoder input')
    attn_out = self.mhma(x)
    # print('mha output created')


    x = self.layer_norm_mhma(x + attn_out)


    c_attn_out = self.mhca(x,enc_rep)
    x = self.layer_norm_mhca(x + c_attn_out)


    # ffn out with layernorm
    ffn_out = self.ffn(x)
    out = self.layer_norm_ffn(x + ffn_out)

    return out

* Create an embedding layer that takes in token_ids and return embeddings for the token_ids

 * Use seed value: 70

In [268]:
class Embed(nn.Module):

  def __init__(self,vocab_size,embed_dim):
    super(Embed,self).__init__()

    self.embed= nn.Embedding(vocab_size, embed_dim)
    torch.manual_seed(70)
    nn.init.normal_(self.embed.weight)


  def forward(self,x):
    out = self.embed(x)
    return out

# Decoder

 * Implement the decoder that has `num_layers` decoder layers

In [269]:
class Decoder(nn.Module):

  def __init__(self,vocab_size,dmodel,dq,dk,dv,d_ff,heads,mask,num_layers=1):
    super(Decoder,self).__init__()
    self.embed_lookup =  Embed(vocab_size, dmodel)
    self.dec_layers = nn.ModuleList([
      DecoderLayer(dmodel, dq, dk, dv, d_ff, heads, mask=mask)
      for _ in range(num_layers)
    ])
    self.out = OutputLayer(dmodel, vocab_size,)

  def forward(self,enc_rep,tar_token_ids):


    x = self.embed_lookup(tar_token_ids)


    for layer in self.dec_layers:
      # print('aaa')
      x = layer(enc_rep,x )

    out = self.out(x)

    return out

* Representation from encoder

 * Since all the decoder layers require the representation from the encoder to compute cross attention, we are going to feed in the random values (Note, it does not require gradient during training)

In [270]:
# do not edit this
enc_rep = torch.randn(size=(batch_size,seq_len,embed_dim),generator=torch.random.manual_seed(10))

In [271]:
enc_rep.shape

torch.Size([10, 8, 32])

# Instantiate the model

In [272]:
model = Decoder(vocab_size,dmodel,dq,dk,dv,d_ff,heads,mask=None)

In [273]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

In [274]:
def train(enc_rep,tar_token_ids,label_ids,epochs=1000):
  loss_trace = []
  for epoch in range(epochs):
    out = model(enc_rep,tar_token_ids)
    loss =F.cross_entropy(out.view(-1, out.size(-1)), tar_token_ids.view(-1))
    loss_trace.append(loss)
    loss.backward()

    #update parameters
    optimizer.step()
    optimizer.zero_grad()

  return loss_trace[-1],model

* Train the model for 1000 epochs

In [275]:
loss,model = train(enc_rep,token_ids,label_ids,1000)

In [276]:
with torch.inference_mode():
  predictions = torch.argmax(model(enc_rep,token_ids),dim=-1)

* The loss will be around 0.17 after 1000 epochs

In [277]:
print('LOSS : ',float(loss))

LOSS :  0.12169989198446274


In [278]:
# number of correct predictions
print('Number of correct predictions :',int(torch.count_nonzero(label_ids==predictions[:,1:])))

Number of correct predictions : 70


* THe number of correct predictions is close to 66