# CS 4464/7643 Deep Learning HW 3
Transformers and Language Modeling
In this exercise you will implement a Transformer model and several variants such as Encoder Transformers, Decoder Transformers, and Encoder-Decoder transformers.

You will then use these as the basis to train a (small) Language Model from scratch on Google Colab.

In [None]:
#@title Colab Setup
!pip install datasets
!pip install tokenizers
!pip install sacrebleu
!pip install colab-convert
!rm -rf gtGPT/
!rm -rf gtgpt
!git clone https://github.com/Helw150/gtGPT gtGPT
!mv gtGPT/gtgpt/ .

from gtgpt.utils import set_seed

set_seed(3407)

In [1]:
from gtgpt.utils import set_seed

set_seed(3407)

In [2]:
#export
import os
# Needed for deterministic cuBLAS results; must be set before CUDA is initialized.
os.environ["CUBLAS_WORKSPACE_CONFIG"]=":16:8"
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from gtgpt.model import DummyMultiHeadedSelfAttention, DummyBlock, DummyTransformer, DummyEmbedding
from gtgpt.utils import set_seed
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.set_default_device(DEVICE)

# #Do not change, it will break the AutoGrader
# setup_block = In[-1]

#### Embeddings

We will first format our input embeddings similarly to how they are constructed in [BERT](https://arxiv.org/pdf/1810.04805.pdf).

Recall from lecture that, unlike an RNN, a Transformer does not inherently capture positional information in the forward pass. Because of this, we need to add a signal to each token's embedding that encodes its position.

Your first task is to implement the embedding lookup, including the addition of positional encodings. We have already provided the necessary parameters inside of `DummyEmbedding`.

```python
self.vocab_embeddings = nn.Embedding(config.vocab_size, config.n_embd)
self.position_embeddings = nn.Embedding(config.block_size, config.n_embd)
```
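
To make the broadcasting concrete, here is a minimal standalone sketch of adding positional embeddings to token embeddings. The toy sizes and local variable names are assumptions chosen for illustration; only the two `nn.Embedding` tables mirror the parameters listed above.

```python
import torch
from torch import nn

torch.manual_seed(0)
vocab_size, block_size, n_embd = 10, 8, 4        # hypothetical toy sizes
vocab_embeddings = nn.Embedding(vocab_size, n_embd)
position_embeddings = nn.Embedding(block_size, n_embd)

idx = torch.tensor([[1, 2, 3], [3, 2, 1]])       # (B=2, T=3) token ids
B, T = idx.shape

tok = vocab_embeddings(idx)                      # (B, T, n_embd)
pos = position_embeddings(torch.arange(T))       # (T, n_embd), positions 0..T-1
embeddings = tok + pos[None, :, :]               # leading axis broadcasts over the batch

# Token 2 sits at position 1 in both sequences, so the two vectors match.
same = torch.allclose(embeddings[0, 1], embeddings[1, 1])
# Token 1 sits at position 0 in one sequence and position 2 in the other,
# so its final embedding differs even though the token is the same.
differs = not torch.allclose(embeddings[0, 0], embeddings[1, 2])
print(embeddings.shape, same, differs)
```

The same token therefore gets a different vector depending on where it appears, which is the positional signal the model would otherwise lack.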

In [3]:
#export
class Embedding(DummyEmbedding):
    def forward(self, idx):
        """
        :param idx: intTensor of shape (B,T)
        :returns embeddings: floatTensor of shape (B,T,n_embd)
        """
        B, T = idx.size()
        embeddings = None
        #############################################################################
        # TODO:
        # Implement the embedding lookup.                                           #
        #                                                                           #
        # This will take a few lines.                                               #
        #############################################################################

        # Token embeddings: one vector per word index in idx.
        emb_v = self.vocab_embeddings(idx)                      # (B, T, n_embd)

        # Positional embeddings: one vector per absolute position 0..T-1,
        # with a leading batch axis so the sum broadcasts over B.
        positions = torch.arange(T, device=idx.device)
        emb_p = self.position_embeddings(positions)[None, :]    # (1, T, n_embd)

        embeddings = emb_v + emb_p

        ##############################################################################
        #                               END OF YOUR CODE                             #
        ##############################################################################
        return embeddings


# #Do not change, it will break the AutoGrader
# embedding_def = In[-1]

#### Basic Embedding Test

In [4]:
#@title Basic Embedding Test

def test_embedding():
  config = DummyTransformer.get_default_config()
  config.vocab_size = 10
  config.block_size = 10
  config.n_embd = 1
  torch.set_default_device("cpu")
  set_seed(3047)
  embedding = Embedding(config)
  embedding.vocab_embeddings.weight = torch.nn.Parameter(torch.arange(0, 10, dtype=torch.float).reshape(10, 1))
  embedding.position_embeddings.weight = torch.nn.Parameter(torch.arange(0, 10, dtype=torch.float).reshape(10, 1))
  assert torch.allclose(embedding(torch.tensor([[1, 2, 3]])), torch.tensor([1, 3, 5], dtype=torch.float).reshape(1, 3, 1))

test_embedding()

#### 3.2 Multi-head Self-Attention
Attention can be computed in matrix-form using the following formula:

$Attention(Q, K, V) = softmax(\frac{QK^T}{\sqrt{d_k}})V$
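
As a rough illustration of the formula, the sketch below computes single-head scaled dot-product attention on random tensors. The function name and the toy shapes are assumptions for this example and are not part of the provided `gtgpt` code.

```python
import math
import torch
import torch.nn.functional as F

def scaled_dot_product_attention(q, k, v):
    """q, k: (..., T, d_k); v: (..., T, d_v). Returns (output, weights)."""
    d_k = q.size(-1)
    scores = q @ k.transpose(-2, -1) / math.sqrt(d_k)   # (..., T, T) similarities
    weights = F.softmax(scores, dim=-1)                 # each row sums to 1
    return weights @ v, weights

torch.manual_seed(0)
q = torch.randn(1, 3, 4)
k = torch.randn(1, 3, 4)
v = torch.randn(1, 3, 4)
out, weights = scaled_dot_product_attention(q, k, v)
print(out.shape, weights.sum(dim=-1))
```

Each output row is a weighted average of the rows of `V`, with the weights given by the softmax over that query's similarities to every key.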

We want to have multiple self-attention operations. Each of these is called a head with each head applied to some portion of the input.

$head_i = Attention(W_Q X_i, W_K X_i, W_V X_i)$

Here, we'll use GPT-style multi-headed self-attention, which splits each token's embedding into `n_head` equal fragments and applies one head to each fragment. The head outputs are then concatenated to reconstruct a vector of the original size and projected with a linear layer.

$MultiHead(Q, K, V) = Concat(head_1, ..., head_h)W^O$

Note that while this is "Multi-head", all heads can be computed in parallel with a single matrix multiplication. You can find an in-depth description of this in the reference linked in the code.
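
The splitting step is easy to get subtly wrong, so here is a small sketch of one common way to do it with `view` and `transpose`. The toy tensor and variable names are assumptions for illustration. It also shows why reshaping directly to `(B, n_head, T, head_dim)` is not equivalent.

```python
import torch

B, T, C, n_head = 1, 3, 4, 2
head_dim = C // n_head
x = torch.arange(B * T * C, dtype=torch.float).reshape(B, T, C)

# Split the embedding axis into heads, then move the head axis forward:
# (B, T, C) -> (B, T, n_head, head_dim) -> (B, n_head, T, head_dim)
heads = x.view(B, T, n_head, head_dim).transpose(1, 2)

# Undo the split: (B, n_head, T, head_dim) -> (B, T, n_head, head_dim) -> (B, T, C)
merged = heads.transpose(1, 2).contiguous().view(B, T, C)

# A direct reshape keeps the same shape but assigns features from
# different tokens to the same head position.
wrong = x.reshape(B, n_head, T, head_dim)

print(heads[0, 0])   # first head_dim features of every token
print(wrong[0, 0])   # rows drawn from a mix of tokens
```

Because every head is just a slice along a new axis, one batched `matmul` over the `(B, n_head, T, head_dim)` tensors computes all heads at once.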


We provide the needed weights in `DummyMultiHeadedSelfAttention`

```python
# Note that we need this to be true in GPT-style MHA
# Knowing this might come in handy :)
assert config.n_embd % config.n_head == 0

# Note: These could be a single batched linear layer
# but we separate them for simplicity of implementation.
self.k = nn.Linear(config.n_embd, config.n_embd)
self.v = nn.Linear(config.n_embd, config.n_embd)
self.q = nn.Linear(config.n_embd, config.n_embd)
# output projection
self.c_proj = nn.Linear(config.n_embd, config.n_embd)
# regularization
self.attn_dropout = nn.Dropout(config.attn_pdrop)
self.hidden_dropout = nn.Dropout(config.hidden_pdrop)

self.n_head = config.n_head
self.n_embd = config.n_embd
```




In [5]:
#export
class GenericSelfAttention(DummyMultiHeadedSelfAttention):
    def forward(self, x, attention_mask):
        """
        :param x: float Tensor of shape (batch size, sequence length, embedding dimensionality)
        :param attention_mask: int Tensor of shape (batch size, 1, sequence length, sequence_length)
        :returns y: float Tensor of shape (batch size, sequence length, embedding dimensionality)
        """
        B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)
        y = None

        #############################################################################
        # TODO:                                                                     #
        # Implement multi-headed self-attention in GPT-2 Style                      #
        # Use the provided layers initialized in the DummySelfAttention constructor #
        # Apply dropout to the attention values after softmax and the final output  #
        #                                                                           #
        # Reference:                                                                #
        # https://jalammar.github.io/illustrated-gpt2/#part-2-illustrated-self-attention
        #                                                                           #
        # Note: All heads should be computed in parallel using the q,k,v layers     #
        #                                                                           #
        # For each item in the batch, if attention_mask[b, i, j] = 0,               #
        # then you should manually set the attention from token i to j to be -inf   #
        # Hint: See torch.masked_fill                                               #
        #############################################################################
        head_dim = C // self.n_head

        # Project the input, then split the embedding dimension into heads:
        # (B, T, C) -> (B, T, n_head, head_dim) -> (B, n_head, T, head_dim).
        # A plain reshape to (B, n_head, T, head_dim) would mix features across tokens.
        k = self.k(x).view(B, T, self.n_head, head_dim).transpose(1, 2)
        q = self.q(x).view(B, T, self.n_head, head_dim).transpose(1, 2)
        v = self.v(x).view(B, T, self.n_head, head_dim).transpose(1, 2)

        # Scaled dot-product similarities for all heads at once: (B, n_head, T, T).
        similarities = torch.matmul(q, k.transpose(2, 3)) / (head_dim ** 0.5)

        # Positions where the mask is 0 get -inf so softmax assigns them zero weight.
        masked_sim = similarities.masked_fill(attention_mask == 0, float('-inf'))

        # Normalize over the key dimension, then apply attention dropout.
        att_weights = self.attn_dropout(F.softmax(masked_sim, dim=-1))

        # Weighted sum of the values: (B, n_head, T, head_dim).
        att = torch.matmul(att_weights, v)

        # Concatenate the heads back to (B, T, C), project, and apply hidden dropout.
        concat = att.transpose(1, 2).contiguous().view(B, T, C)
        y = self.hidden_dropout(self.c_proj(concat))

        ##############################################################################
        #                               END OF YOUR CODE                             #
        ##############################################################################

        return y

# #Do not change, it will break the AutoGrader
# mha_def = In[-1]

#### Test Multi-Headed Attention

In [6]:
#@title Test Multi-Headed Attention

def test_mha():
  config = DummyTransformer.get_default_config()
  config.vocab_size = 10
  config.block_size = 10
  config.n_embd = 4
  config.n_head = 2
  config.hidden_pdrop = 0.25
  config.attn_pdrop = 0.1
  set_seed(3407)
  torch.set_default_device("cpu")
  attn = GenericSelfAttention(config)
  attn.q.weight = torch.nn.Parameter(torch.eye(2, 2).repeat(2, 2).flip(0))
  attn.q.bias = torch.nn.Parameter(torch.zeros(4))
  attn.k.weight = torch.nn.Parameter(torch.eye(2, 2).repeat(2, 2))
  attn.k.bias = torch.nn.Parameter(torch.zeros(4))
  attn.v.weight = torch.nn.Parameter(torch.eye(4, 4))
  attn.v.bias = torch.nn.Parameter(torch.zeros(4))
  attn.c_proj.weight = torch.nn.Parameter(torch.eye(4, 4))
  attn.c_proj.bias = torch.nn.Parameter(torch.zeros(4))
  embeddings = torch.tensor([[[1, 2, 3, 4] ,[4, 3, 2, 1]]], dtype=torch.float)
  mask = torch.ones(1, 2, 2)
  assert torch.allclose(attn(embeddings, mask), torch.tensor([[[5.6779, 0, 0, 0], [0, 3.0456, 4.3618, 5.6779]]], dtype=torch.float), atol=1e-3, rtol=1)
  print("Success 1")

test_mha()

def test_mha_mask():
  config = DummyTransformer.get_default_config()
  config.vocab_size = 10
  config.block_size = 10
  config.n_embd = 4
  config.n_head = 2
  config.hidden_pdrop = 0.0
  config.attn_pdrop = 0.0
  set_seed(3407)
  torch.set_default_device("cpu")
  attn = GenericSelfAttention(config)
  attn.v.weight = torch.nn.Parameter(torch.eye(4, 4))
  attn.v.bias = torch.nn.Parameter(torch.zeros(4))
  attn.c_proj.weight = torch.nn.Parameter(torch.eye(4, 4))
  attn.c_proj.bias = torch.nn.Parameter(torch.zeros(4))
  embeddings = torch.tensor([[[1, 2, 3, 4] ,[4, 3, 2, 1]]], dtype=torch.float)
  mask = torch.zeros(1, 2, 2)
  mask[0, 0, 0] = 1
  mask[0, 1, 1] = 1
  assert torch.allclose(attn(embeddings, mask), torch.tensor([[[1, 2, 3, 4], [4, 3, 2, 1]]], dtype=torch.float), atol=1e-4)
  print("Success 2")

test_mha_mask()

Success 1
Success 2


#### Now, we can very simply create a single layer transformer block!

In [7]:
#export
#@title Now, we can very simply create a single layer transformer block!
class TransformerBlock(DummyBlock):
    def __init__(self, config):
        super().__init__(config, GenericSelfAttention)

    # A Basic Transformer Block with Attention followed by an MLP
    # note the layer norms and residual information preserved at each step.
    def forward(self, x, attention_mask):
        x = x + self.attn(self.ln_1(x), attention_mask)
        x = x + self.mlpf(self.ln_2(x))
        return x

# #Do not change, it will break the AutoGrader
# block_def = In[-1]

#### Putting it all together

With our Embedding layer, the Transformer Block above (built on our multi-head attention), and a simple classification head, we have all the pieces we need for a Transformer language model.

For the forward pass, you'll want to first embed our inputs, apply each transformer layer sequentially, and finally get logits for each possible output word using a classification layer (often called a language modeling head).

If an argument is passed to `hidden_cache`, you should prepend it to your input embeddings and pass it alongside the embeddings through the rest of the model. This will allow us to use this structure in Encoder-Decoder architectures later, and it also allows passing vectors from any other neural network (such as a computer vision or audio model) to enable multi-modal understanding. You can find a rich description of how these pieces come together [here](https://jalammar.github.io/illustrated-transformer/).
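
The shape bookkeeping for `hidden_cache` can be sketched as follows. The random tensors and sizes are assumptions standing in for real embeddings. The slice at the end mirrors the `logits[:, hidden_cache.shape[1]-1:-1]` indexing used in the loss computation of the forward pass.

```python
import torch

torch.manual_seed(0)
B, P_T, T, n_embd = 2, 3, 5, 4                   # hypothetical toy sizes
hidden_cache = torch.randn(B, P_T, n_embd)       # e.g. encoder outputs
token_emb = torch.randn(B, T, n_embd)            # embeddings of idx

# Prepend the cache along the token dimension (dim=1).
x = torch.cat((hidden_cache, token_emb), dim=1)  # (B, P_T + T, n_embd)

# Positions P_T-1 .. P_T+T-2 are the ones whose outputs line up with the
# T target tokens: the last cache position predicts the first token, and so on.
aligned = x[:, P_T - 1:-1]                       # (B, T, n_embd)
print(x.shape, aligned.shape)
```

Note that the sequence length seen by the attention mask grows by `P_T`, which is why the forward pass adds `hidden_cache.shape[1]` to `num_tokens`.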

All the parameters you'll need come from `DummyTransformer` and the code blocks above your code section.

```python
self.transformer = nn.ModuleDict(
    dict(
        embedding=embedding(config),
        h=nn.ModuleList(
            [block(config) for _ in range(config.n_layer)]
        ),
        ln_f=nn.LayerNorm(config.n_embd),
    )
)
self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
```

In [9]:
#export
class GenericTransformer(DummyTransformer):
    def __init__(self, config):
        super().__init__(config, TransformerBlock, Embedding)
        self.block_size = config.block_size # Maximum Number of Tokens which can be encoded at once
        self.vocab_size = config.vocab_size

    def get_attention_mask(self, num_tokens):
        """
        Dummy For now, we will see how we use this later!
        """
        B = num_tokens.shape[0]
        return torch.ones((B, self.block_size, self.block_size))[:, :num_tokens.max().item(), :num_tokens.max().item()]

    def forward(self, idx, targets=None, hidden_cache=None, return_hidden=False):
        """
        :param idx: int Tensor of shape (B,T)
        :param hidden_cache: float Tensor of shape (B,P_T,n_embd)
        :param targets: int Tensor of shape (B,T_T)
        :param return_hidden: bool
        (if return_hidden is True)
        :returns x: float Tensor of shape (B,T,n_embd)
        (else)
        :returns logits: float Tensor of shape (B, T, vocab_size)
        :returns loss: scalar float Tensor, or None if targets is None
        """
        num_tokens = (idx != -1).type(torch.int).sum(dim=1)
        if hidden_cache is not None:
          num_tokens = num_tokens + hidden_cache.shape[1]
        idx = idx.masked_fill(idx == -1, int(0)).type(torch.int)[:, :num_tokens.max().item()]
        if targets is not None:
          targets = targets[:, :num_tokens.max().item()]
        attention_mask = self.get_attention_mask(num_tokens)
        #############################################################################
        # TODO:                                                                     #
        # Put all the modules of a Transformer together for inference               #
        #                                                                           #
        # If hidden_cache exists,                                                   #
        # then the Transformer inputs should be concatenated in the token dimension #
        # First) All Embeddings from Hidden Cache                                   #
        # Next)  All Embeddings of tokens from idx.                                 #
        #                                                                           #
        # All the modules you'll need are listed here:                              #
        #                                                                           #
        # Note: You can iterate through a nn.ModuleList using a standard for loop.  #
        #                                                                           #
        # This will take a few lines!                                               #
        ##############################################################################

        # Embed the input tokens: (B, T, n_embd).
        x = self.transformer['embedding'](idx)

        # Prepend any cached hidden states along the token dimension.
        if hidden_cache is not None:
            x = torch.cat((hidden_cache, x), dim=1)

        # Apply each transformer block in sequence. Following the course staff's
        # guidance on Piazza, ln_f is applied after every block here rather than
        # only once after the final block.
        for block in self.transformer['h']:
            x = block(x, attention_mask)
            x = self.transformer['ln_f'](x)

        # Language modeling head: (B, T, n_embd) -> (B, T, vocab_size).
        logits = self.lm_head(x)

        ##############################################################################
        #                               END OF YOUR CODE                             #
        ##############################################################################
        if return_hidden:
            return x

        # if we are given some desired targets also calculate the loss
        loss = None
        if targets is not None:
            s_logits = logits
            if hidden_cache is not None:
              s_logits = logits[:, hidden_cache.shape[1]-1:-1].contiguous()
              #print(logits[-1].argmax(dim=1))
            loss = F.cross_entropy(
                s_logits.reshape(-1, self.vocab_size), targets.reshape(-1), ignore_index=-1
            )


        return logits, loss

# #Do not change, it will break the AutoGrader
# transformer_def = In[-1]

#### Test Full Transformer Forward Pass

In [10]:
#@title Test Full Transformer Forward Pass

def test_transformer():
  config = DummyTransformer.get_default_config()
  config.vocab_size = 10
  config.block_size = 10
  config.n_layer = 2
  config.n_embd = 4
  config.n_head = 2
  torch.set_default_device("cpu")
  set_seed(3407)
  transformer = GenericTransformer(config)
  idx = torch.tensor([[1, 2, 3, 4, 5, -1, -1, -1, -1, -1]], dtype=torch.long)
  s = F.softmax(transformer(idx)[0][0, 0], dim=0)
  assert torch.allclose(s, torch.tensor([0.1034, 0.0960, 0.1019, 0.1022, 0.1003, 0.1040, 0.0983, 0.1072, 0.0958, 0.0910], dtype=torch.float), atol=1e-5, rtol=1)
  print("Success 1")

def test_transformer_loss():
  config = DummyTransformer.get_default_config()
  config.vocab_size = 10
  config.block_size = 10
  config.n_layer = 2
  config.n_embd = 4
  config.n_head = 2
  torch.set_default_device("cpu")
  set_seed(3407)
  transformer = GenericTransformer(config)
  idx = torch.tensor([[1, 2, 3, 4, 5, -1, -1, -1, -1, -1]], dtype=torch.long)
  target = torch.arange(5).reshape(1, 5)
  assert torch.allclose(transformer(idx, targets=target)[1], torch.tensor(2.2973))
  print("Success 2")

def test_transformer_hidden():
  config = DummyTransformer.get_default_config()
  config.vocab_size = 10
  config.block_size = 10
  config.n_layer = 2
  config.n_embd = 4
  config.n_head = 2
  torch.set_default_device("cpu")
  set_seed(3407)
  transformer = GenericTransformer(config)
  idx = torch.tensor([[1, 2, 3, 4, 5, -1, -1, -1, -1, -1]], dtype=torch.long)
  target = torch.arange(5).reshape(1, 5)
  hidden = transformer(idx, targets=target, return_hidden=True)
  assert torch.allclose(hidden[0, 0], torch.tensor([1.4417, -1.3564,  0.1549, -0.2401]), atol=1e-4)
  print("Success 3")

test_transformer()
test_transformer_loss()
test_transformer_hidden()

number of parameters: 0.00M
Success 1
number of parameters: 0.00M
Success 2
number of parameters: 0.00M
Success 3


#### Implement an Encoder Transformer

Encoders, like the BERT model you learned about in lecture, utilize bi-directional attention. This means that in the sequence "A B C", the representation for token "B" will be influenced by tokens A *and* C. When all tokens can attend to all other tokens, the attention_mask is just a matrix of ones.

However, since sentences come in a wide range of lengths, we need a way to batch sequences of different lengths together in order to maximize our GPU throughput. The most common way of doing this is called "Padding". When you pad an input, you add additional "pad" tokens to make it the same length as the longest sequence in a batch. For example, if we wanted to batch "A B C" and "A B C D" together, we would add a "pad" token to "A B C". Our resulting batch would be \["A B C \<pad\>", "A B C D"\].

Since these pad tokens are meaningless, we want to avoid having them affect our results. To do this, we remove them from the attention mask for that element in the batch. Below, you'll write a function to create such an attention mask for padded sequences, given a tensor which contains the number of valid leading tokens for each element in the batch.
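
To see what such a mask does downstream, the sketch below builds a padding mask for the two-sequence example above (lengths 3 and 4) and applies it to random attention scores. The variable names and the random scores are assumptions for illustration.

```python
import torch
import torch.nn.functional as F

num_tokens = torch.tensor([3, 4])            # "A B C <pad>" and "A B C D"
B, max_tokens = num_tokens.shape[0], int(num_tokens.max())

# Column j is valid for batch element b iff j < num_tokens[b].
positions = torch.arange(max_tokens)
valid = positions[None, :] < num_tokens[:, None]              # (B, max_tokens) bool
mask = valid[:, None, :].expand(B, max_tokens, max_tokens)    # same columns for every row

torch.manual_seed(0)
scores = torch.randn(B, max_tokens, max_tokens)               # stand-in attention scores
weights = F.softmax(scores.masked_fill(~mask, float("-inf")), dim=-1)

print(mask[0].int())      # last column is 0 for the padded sequence
print(weights[0, :, 3])   # no attention flows to the pad position
```

Filling masked scores with `-inf` before the softmax gives the pad column exactly zero weight while the remaining weights in each row still sum to one.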

In [11]:
#export
class Encoder(GenericTransformer):
    """Encoder Style Transformer with Bidirectional Attention"""
    def get_attention_mask(self, num_tokens):
        """
        :param num_tokens: int Tensor of shape (batch size)
        :returns attention_mask: int tensor of shape (batch_size, 1, max_tokens, max_tokens)
        """
        B = num_tokens.shape[0]
        max_tokens = min(self.block_size, num_tokens.max().item())
        ##############################################################################
        # TODO:                                                                      #
        # Implement a padding mask function.                                         #
        # This allows batching sequences of different lengths.                       #
        #                                                                            #
        # For example, for any row attention_mask[b, i] the following should be true:#
        #               For j < num_tokens[b], attention_mask[b, i, j] = 1          #
        #               For j >= num_tokens[b],  attention_mask[b, i, j] = 0         #
        #                                                                            #
        # Reference:https://huggingface.co/docs/transformers/glossary#attention-mask #
        #                                                                            #
        # This should be a 1-3 line function.                                        #
        ##############################################################################
        
        # Column j is valid for batch element b iff j < num_tokens[b].
        positions = torch.arange(max_tokens, device=num_tokens.device)
        valid = (positions[None, :] < num_tokens.reshape(B, 1)).float()  # (B, max_tokens)

        # Every query row i shares the same set of valid key columns.
        attention_mask = valid[:, None, :].repeat(1, max_tokens, 1)      # (B, max_tokens, max_tokens)

        ##############################################################################
        #                               END OF YOUR CODE                             #
        ##############################################################################
        return attention_mask.reshape(B, 1, max_tokens, max_tokens)

# #Do not change, it will break the AutoGrader
# encoder_def = In[-1]

#### Test Encoder

In [12]:
#@title Test Encoder

def test_encoder():
  config = DummyTransformer.get_default_config()
  config.vocab_size = 10
  config.block_size = 10
  config.n_layer = 2
  config.n_embd = 4
  config.n_head = 2
  torch.set_default_device("cpu")
  set_seed(3407)
  transformer = Encoder(config)
  mask = transformer.get_attention_mask(torch.tensor([5, 6]))
  assert mask[0, :, 0].sum() == 5
  assert mask[1, :, 0].sum() == 6


test_encoder()

number of parameters: 0.00M


#### Implement a Decoder Transformer

Unlike Encoders, Decoders are a "causal" model, meaning that each prediction is only influenced by the tokens which came earlier than it in the input. While "Encoders" and "Decoders" are often discussed as different types of models, the only core difference is the attention mask used.

For decoders, we want the attention mask for each token to only include the previous tokens in the sequence. Despite being functionally very different models, a "Decoder" can be implemented with just a one line change of the "Encoder" attention mask. You'll implement this below.
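
As a small illustration of the idea, a causal mask is just the lower triangle of an all-ones mask, and it composes with a padding mask by elementwise multiplication. The sequence length and padding pattern below are assumptions chosen for the example.

```python
import torch

T = 4
causal = torch.tril(torch.ones(T, T))        # row i has ones for columns j <= i

# Hypothetical padding mask for a sequence with 3 valid tokens out of 4.
padding = torch.tensor([1.0, 1.0, 1.0, 0.0]).expand(T, T)
combined = causal * padding                  # causal AND not-padding

print(causal)
print(combined.sum(dim=1))                   # tokens visible to each position
```

Each position sees itself and everything before it, and the padded last column stays masked even for the final row.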

In [13]:
#export
class Decoder(Encoder):
    """Decoder Style model with a Causal Attention Mask"""

    def get_attention_mask(self, num_tokens):
        """
        :param num_tokens: int Tensor of shape (batch size)
        :returns attention_mask: int tensor of shape (batch_size, 1, max_tokens, max_tokens)
        """
        full_attention_mask = super().get_attention_mask(num_tokens)
        ##############################################################################
        # TODO:                                                                      #
        # Modify the output of the full encoder mask to create a "causal" mask       #
        # such that tokens only attend to tokens that occurred earlier in the input. #
        #                                                                            #
        # For example, for any row attention_mask[b, i] the following should be true:#
        #               For j <= i, attention_mask[b, i, j] = 1                      #
        #               For j > i,  attention_mask[b, i, j] = 0                      #
        #                                                                            #
        # This should be a one line function which modifies the full attention_mask  #
        ##############################################################################
        
        # Keep only the lower triangle (j <= i) of the padded encoder mask.
        attention_mask = torch.tril(full_attention_mask)

        ##############################################################################
        #                               END OF YOUR CODE                             #
        ##############################################################################
        return attention_mask

# #Do not change, it will break the AutoGrader
# decoder_def = In[-1]

#### Test Decoder

In [14]:
#@title Test Decoder

def test_decoder():
  config = DummyTransformer.get_default_config()
  config.vocab_size = 10
  config.block_size = 10
  config.n_layer = 2
  config.n_embd = 4
  config.n_head = 2
  torch.set_default_device("cpu")
  set_seed(3407)
  transformer = Decoder(config)
  mask = transformer.get_attention_mask(torch.tensor([[5], [6]]))
  for i in range(5):
    assert mask[0, :, i].sum() == i+1
  assert mask[0, :, 5].sum() == 5
  for i in range(6):
    assert mask[1, :, i].sum() == i+1

test_decoder()

number of parameters: 0.00M


#### Use your model to generate!

In [16]:
#export
def generate(model, idx, max_new_tokens, temperature=1.0):
    """
    :param idx: int Tensor of shape (B, T)
    :param max_new_tokens: int
    :param temperature: Float
    :returns idx: int Tensor of shape (B, T+max_new_tokens)
    """
    ##############################################################################
    # TODO:                                                                      #
    # Sample from your model max_new_tokens times                                #
    # You should feed the predictions back into the model each time              #
    #                                                                            #
    # Adjust the probability distribution to be more or less greedy using        #
    # the temperature parameter                                                  #
    #                                                                            #
    # Reference: https://huggingface.co/blog/how-to-generate#sampling            #
    # Temperature Reference:                                                     #
    # https://web.stanford.edu/class/cs224n/slides/cs224n-2023-lecture10-nlg.pdf#page=34 #
    ##############################################################################
    
    # Pipeline (from Krishanu's OH): logits -> temperature scaling -> softmax
    # -> multinomial sample -> append to idx -> repeat.
    # The model returns a (logits, loss) tuple; only the logits at the last
    # position are needed to predict the next token.
    for _ in range(int(max_new_tokens)):
        logits = model(idx)[0][:, -1, :]       # (B, vocab_size)
        scaled_logits = logits / temperature   # <1 sharpens, >1 flattens
        probs = F.softmax(scaled_logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)  # (B, 1)
        idx = torch.cat((idx, next_token), dim=1)

    ##############################################################################
    #                               END OF YOUR CODE                             #
    ##############################################################################
    return idx

# #Do not change, it will break the AutoGrader
# generate_def = In[-1]

#### Test Generation

In [17]:
#@title Test Generation

def test_generate():
    def dumb_model(idx):
      l = torch.zeros(1, 1, 10)
      l[0, 0, 0] = 100
      l[0, 0, 5] = 90
      return l.roll(int(idx[0, -1].item())+1), None
    torch.set_default_device("cpu")
    set_seed(3047)
    assert torch.allclose(generate(dumb_model, torch.tensor([[0]]), 6), torch.tensor([0,1,2,3,4,5,6]))
    temp_gen_1 = generate(dumb_model, torch.tensor([[0]]), 6, temperature=10)
    assert torch.allclose(temp_gen_1, torch.tensor([[0, 6, 2, 3, 4, 5, 6]]))

test_generate()
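
To see why the test expects a different sequence at `temperature=10`, it helps to work through the numbers for the two large logits (100 and 90) that `dumb_model` emits. The sketch below is plain Python rather than the notebook's `F.softmax` call, and `softmax_with_temperature` is a hypothetical helper name.

```python
import math

def softmax_with_temperature(logits, temperature=1.0):
    """Softmax over logits / temperature, computed in a numerically stable way."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)  # subtract the max so exp() cannot overflow
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

# Same shape of distribution as dumb_model: one logit at 100, one at 90, rest 0.
logits = [100.0, 0.0, 0.0, 0.0, 0.0, 90.0, 0.0, 0.0, 0.0, 0.0]

sharp = softmax_with_temperature(logits, temperature=1.0)
flat = softmax_with_temperature(logits, temperature=10.0)

print(f"T=1:  p(top) = {sharp[0]:.6f}, p(runner-up) = {sharp[5]:.6f}")
print(f"T=10: p(top) = {flat[0]:.6f}, p(runner-up) = {flat[5]:.6f}")
```

At temperature 1 the runner-up is sampled with probability of roughly 4.5e-5, so generation is effectively greedy. At temperature 10 the gap between the scaled logits shrinks from 10 to 1 and the runner-up is chosen about 27% of the time, which is why a seeded run can pick it.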

#### Implement an Encoder Decoder Transformer

Now, we'll put together our Encoder and Decoder models. This combination of the two architectures allows us to maximize the signal we can draw from the input using a bi-directional encoder, while generating language using a causal decoder.

Below, you'll combine your Encoder and Decoder classes in a forward function making use of the `return_hidden` and `hidden_cache` arguments we supported in our Transformer implementation to pass information between the modules.

In [19]:
#export
class EncoderDecoder(nn.Module):
    """Encoder-Decoder Model which combines the two architectures"""
    def __init__(self, encoder_config, decoder_config):
        super().__init__()
        # Add end of sequence token.
        decoder_config.vocab_size += 1
        self.vocab_size = decoder_config.vocab_size
        self.encoder = Encoder(encoder_config)
        self.decoder = Decoder(decoder_config)

    def configure_optimizers(self, train_config):
        enc_groups = self.encoder.configure_optimizers(train_config)
        dec_groups = self.decoder.configure_optimizers(train_config)
        return enc_groups + dec_groups

    def forward(self, prefix, targets=None):
        """
        :param prefix: int Tensor of shape (B,P_T)
        :param targets: optional int Tensor of shape (B, T) holding the target tokens
        :returns logits: float Tensor of shape (B, vocab_size)
        :returns loss: float Tensor of shape (B) or None
        """
        B = prefix.shape[0]
        idx = torch.tensor([[]]).repeat(B, 1)
        if targets is not None:
          idx = torch.cat((idx, targets), dim=1)

        ##############################################################################
        # TODO:                                                                      #
        # Create an Encoder Decoder Model by combining your previous transformers    #
        # The Encoder should encode the tokens from prefix into embeddings           #
        # Use these in the hidden_cache to condition decoder generation              #
        #                                                                            #
        # This should be 1-2 lines.                                                  #
        ##############################################################################
        
        # Encode the prefix, then let the decoder condition on those hidden states.
        hidden_cache = self.encoder(prefix, return_hidden=True)
        logits, loss = self.decoder(idx, targets=targets, hidden_cache=hidden_cache)

        ##############################################################################
        #                               END OF YOUR CODE                             #
        ##############################################################################
        return logits, loss

# #Do not change, it will break the AutoGrader
# encdec_def = In[-1]

This will also require a custom `prefix_generate` function to account for the distinction between a human-provided `prefix` and a model-generated `idx` in the Encoder Decoder forward pass.

Don't worry, this should be only a small change from your original `generate` function above.

In [20]:
#export
def prefix_generate(model, prefix, max_new_tokens, temperature=1.0):
    """
    :param prefix: int Tensor of shape (B, T)
    :param max_new_tokens: int
    :param temperature: Float
    :returns idx: int Tensor of shape (B, max_new_tokens)
    """
    idx = torch.tensor([[]], dtype=torch.long)
    ##############################################################################
    # TODO:                                                                      #
    # Adapt your original generate function to work with Encoder-Decoder models  #
    #                                                                            #
    # Note: This should be a one line change from the original generate function #
    ##############################################################################

    ### From Piazza: Pay close attention to the hint for prefix_generate function! 
    ## The starting prefix and starting idx are provided in the boilerplate code. 
    ## We mean "small change" quite literally - it should be at most 2 lines from the generate function if done correctly. 
    ## (There is complementary info in Piazza via image)
    
    ##############################################################################
    #                               END OF YOUR CODE                             #
    ##############################################################################
    return idx

# #Do not change, it will break the AutoGrader
# pref_generate_def = In[-1]

#### End to End Test of Encoder Decoder Training

In [21]:
#@title End to End Test of Encoder Decoder Training

from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from gtgpt.trainer import Trainer

import pickle

class SortDataset(Dataset):
    """
    Dataset for the Sort problem. E.g. for problem length 6:
    Input: 0 0 2 1 0 1 -> Output: 0 0 0 1 1 2
    For the Encoder-Decoder model the two halves are returned separately:
    x (prefix): 0 0 2 1 0 1
    y (target): 0 0 0 1 1 2
    """

    def __init__(self, split, length=6, num_digits=3):
        assert split in {'train', 'test'}
        self.split = split
        self.length = length
        self.num_digits = num_digits

    def __len__(self):
        return 10000 # ...

    def get_vocab_size(self):
        return self.num_digits

    def get_block_size(self):
        # the length of the sequence that will feed into transformer,
        # containing concatenated input and the output, but -1 because
        # the transformer starts making predictions at the last input element
        return 20

    def __getitem__(self, idx):

        # use rejection sampling to generate an input example from the desired split
        while True:
            # generate some random integers
            inp = torch.randint(self.num_digits, size=(self.length,), dtype=torch.long)
            # half of the time let's try to boost the number of examples that
            # have a large number of repeats, as this is what the model seems to struggle
            # with later in training, and they are kind of rare
            if torch.rand(1).item() < 0.5:
                if inp.unique().nelement() > self.length // 2:
                    # too many unique digits, re-sample
                    continue
            # figure out if this generated example is train or test based on its hash
            h = hash(pickle.dumps(inp.tolist()))
            inp_split = 'test' if h % 4 == 0 else 'train' # designate 25% of examples as test
            if inp_split == self.split:
                break # ok

        # solve the task: i.e. sort
        sol = torch.sort(inp)[0]

        # concatenate the problem specification and the solution
        cat = torch.cat((inp, sol), dim=0)

        # split back into the unsorted input (fed to the encoder as the prefix)
        # and the sorted solution (the decoder's target sequence)
        x = cat[:self.length].clone()
        y = cat[self.length:].clone()
        return x, y

def test_encoder_decoder():
  # print an example instance of the dataset
  train_dataset = SortDataset('train')
  test_dataset = SortDataset('test')
  x, y = train_dataset[0]
  config = DummyTransformer.get_default_config()
  config.vocab_size = train_dataset.get_vocab_size()
  config.block_size = train_dataset.get_block_size()
  config.n_layer = 3
  config.n_embd = 48
  config.n_head = 3
  torch.set_default_device("cpu")
  set_seed(3407)
  model = EncoderDecoder(config, config)
  train_config = Trainer.get_default_config()
  train_config.learning_rate = 5e-4 # the model we're using is so small that we can go a bit faster
  train_config.max_iters = 500
  train_config.num_workers = 0
  train_config.device = "cpu"
  trainer = Trainer(train_config, model, train_dataset)
  def batch_end_callback(trainer):
      if trainer.iter_num % 100 == 0:
          print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
  trainer.set_callback('on_batch_end', batch_end_callback)

  trainer.run()
  model.eval()
  assert torch.allclose(prefix_generate(model, torch.tensor([[2, 1, 1, 0, 1, 2]]), max_new_tokens=6), torch.tensor([[0, 1, 1, 1, 2, 2]]))

test_encoder_decoder()
print('Success with EncoderDecoder!!')

number of parameters: 0.09M
number of parameters: 0.09M
running on device cpu
iter_dt 0.00ms; iter 0: train loss 1.44919
iter_dt 22.42ms; iter 100: train loss 0.12494
iter_dt 34.56ms; iter 200: train loss 0.02482
iter_dt 26.71ms; iter 300: train loss 0.00422
iter_dt 31.18ms; iter 400: train loss 0.01940


RuntimeError: The size of tensor a (0) must match the size of tensor b (6) at non-singleton dimension 1
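
One caveat about `SortDataset`: Python salts `hash()` for `bytes` objects per process unless `PYTHONHASHSEED` is fixed, so the train/test assignment is consistent within one run but can differ between runs. If a reproducible split matters, a content hash is one option. The sketch below swaps in `hashlib.sha256`, which the notebook does not use; `stable_split` and its parameter are hypothetical names.

```python
import hashlib
import itertools
import pickle

def stable_split(example, denominator=4):
    """Assign a list of ints to 'train' or 'test' using a run-independent hash.

    Roughly 1/denominator of all examples land in 'test'.
    """
    digest = hashlib.sha256(pickle.dumps(example)).digest()
    bucket = int.from_bytes(digest[:8], "big") % denominator
    return "test" if bucket == 0 else "train"

# Enumerate every length-6 sequence over 3 digits (3**6 = 729 examples)
# and measure how many end up in the test split.
all_examples = [list(seq) for seq in itertools.product(range(3), repeat=6)]
test_fraction = sum(stable_split(e) == "test" for e in all_examples) / len(all_examples)
print(f"test fraction: {test_fraction:.3f}")
```

The same input always maps to the same split, in this process or any other, so a test example can never leak into a later training run.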

# You've implemented a language model!

## Now let's put it to use

#### Language Modeling Setup (Do Not Change)

In [22]:
#@title Language Modeling Setup (Do Not Change)
from gtgpt.trainer import Trainer
from tqdm import tqdm
from tokenizers import Tokenizer
from tokenizers.pre_tokenizers import ByteLevel
from tokenizers.trainers import UnigramTrainer, BpeTrainer
from tokenizers.models import Unigram, BPE
from datasets import load_dataset
import random

class LMDataset(Dataset):
    def __init__(self, split, data, tokenizer, model):
        assert split in {'train', 'test'}
        self.model_type = "EncDec" if issubclass(type(model), EncoderDecoder) else "Dec"
        if split == "train":
          self.start_split = 0
          self.end_split = 30000
        else:
          self.start_split = 30000
          self.end_split = 40000
        self.split = split
        self.data = data
        self.tokenizer = tokenizer
        self.block_size = max([len(self.tokenizer.encode(inp)) for inp in self.data])
        self.process()

    def __len__(self):
        return len(self.data[self.start_split:self.end_split])

    def get_vocab_size(self):
        return self.tokenizer.get_vocab_size()

    def get_block_size(self):
        # the length of the sequence that will feed into transformer,
        # containing concatenated input and the output, but -1 because
        # the transformer starts making predictions at the last input element
        return self.block_size

    def process(self):
      new_data = []
      for inp in tqdm(self.data):
        if self.model_type == "EncDec":
          x_inp = inp.split("[SEP]")[0] + "[SEP]"
          y_inp = inp.split("[SEP]")[1]
          x = self.tokenizer.encode(x_inp)
          y = self.tokenizer.encode(y_inp)
        else:
          x = self.tokenizer.encode(inp)
          y = x[1:]
          x = x[:-1]
        x = x + ([-1] * (self.get_block_size() - len(x)))
        y = y + ([-1] * (self.get_block_size() - len(y)))
        new_data.append((x, y))
      self.data = new_data

    def __getitem__(self, idx):
      x, y = self.data[self.start_split + idx]
      return torch.tensor(x), torch.tensor(y)

def format_review(row):
  return {"text": f"{row['translation']['eng']}[SEP]{row['translation']['engyay']}[END]"}

dataset = load_dataset("cdleong/piglatin-mt")["train"]
data = [row["text"] for row in dataset.map(format_review).to_list()]
set_seed(3047)
random.shuffle(data)

ImportError: cannot import name 'COMMON_SAFE_ASCII_CHARACTERS' from 'charset_normalizer.constant' (/home/anascimento7/anaconda3/envs/cs7643-a3/lib/python3.11/site-packages/charset_normalizer/constant.py)
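
For the decoder-only case, `LMDataset.process` builds next-token targets by shifting the token sequence by one position and then right-pads both sides with `-1`. The standalone sketch below mirrors that logic on a toy sequence; `make_decoder_example` is a hypothetical helper, and how the trainer treats the `-1` padding (presumably as positions to ignore in the loss) is an assumption, since that code is not shown in this section.

```python
def make_decoder_example(token_ids, block_size, pad_id=-1):
    """Return (x, y) where y is x shifted left by one token, both padded to block_size."""
    x = token_ids[:-1]  # the model sees every token except the last
    y = token_ids[1:]   # and is asked to predict the token that follows each one
    x = x + [pad_id] * (block_size - len(x))
    y = y + [pad_id] * (block_size - len(y))
    return x, y

x, y = make_decoder_example([10, 11, 12, 13, 14], block_size=7)
print(x)
print(y)
```

At every unpadded position `t`, `y[t]` is the token that comes right after `x[t]` in the original sequence.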

#### Training a Language Model from Scratch

Above, we set up code which loads English and Pig Latin sentence pairs (the `cdleong/piglatin-mt` dataset) as training data, either for pure Decoder models or with the English source passed as a prefix for an EncoderDecoder model. We want to train a model to translate from English to Pig Latin!

Below is an implementation which achieves between 40 and 50 percent accuracy. Modify the tokenizer, architecture, or hyperparameters to decrease the loss as much as possible and drive accuracy above 80%. Report what you changed, and your intuition for why you changed it, in the report PowerPoint file and upload it as a PDF to GradeScope.


In [23]:
# Incredibly Simplified Tokenizer so that you can manually hack it!
# Feel free to add special tokens or modify as you wish.
# For real world tokenizer usage, see https://huggingface.co/docs/tokenizers/
class Tokenizer():
  def __init__(self):
    self.DELIM = "|[DELIM]|"
    self.special_tokens = ["[SEP]", "[END]"]
    self.special_tokens = [self.stringify(list(bytes(tok, "utf-8"))) for tok in self.special_tokens]
    self.vocab_size = 256 + len(self.special_tokens)

  def stringify(self, b_enc):
    s_enc = [str(b) for b in b_enc]
    return self.DELIM.join(s_enc)

  def get_vocab_size(self):
    return self.vocab_size

  def encode(self, inp):
    s_enc = self.stringify(list(bytes(inp, "utf-8")))
    for i, tok in enumerate(self.special_tokens):
      s_enc = s_enc.replace(tok, str(255+i+1))
    return [int(s) for s in s_enc.split(self.DELIM)]

  def decode(self, inp):
    s_enc = self.stringify(inp)
    for i, tok in enumerate(self.special_tokens):
      s_enc = s_enc.replace(str(255+i+1), tok)
    return  bytes([int(c) for c in s_enc.split(self.DELIM)])
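
The byte-level scheme used by the `Tokenizer` class (ids 0 to 255 for raw UTF-8 bytes, 256 and above for special tokens) can also be written as a direct scan over the text. This is an alternative sketch, not the class's implementation: it does not use the `|[DELIM]|` string trick, and the module-level `encode` and `decode` functions are illustrative names.

```python
SPECIAL_TOKENS = ["[SEP]", "[END]"]  # same special tokens as the class above

def encode(text):
    """Map text to ids: raw UTF-8 bytes keep their value, special tokens get 256+."""
    ids = []
    i = 0
    while i < len(text):
        for k, tok in enumerate(SPECIAL_TOKENS):
            if text.startswith(tok, i):
                ids.append(256 + k)
                i += len(tok)
                break
        else:
            # No special token starts here: emit the bytes of this character.
            ids.extend(text[i].encode("utf-8"))
            i += 1
    return ids

def decode(ids):
    """Inverse of encode; returns bytes, like the notebook's decode."""
    out = b""
    for t in ids:
        out += SPECIAL_TOKENS[t - 256].encode("utf-8") if t >= 256 else bytes([t])
    return out

ids = encode("hi[SEP]ihay[END]")
print(ids)
print(decode(ids))
```

Scanning the text directly means a special token is only recognized where it actually occurs, rather than wherever its digit string happens to match inside a longer byte value in a joined string.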

In [26]:
from tqdm import tqdm
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.set_default_device(DEVICE)
def train(data, model_type="Decoder",
          learning_rate = 5e-4,
          batch_size = 16,
          max_iters = 10000,
          dec_n_layer=1,
          dec_n_embd=52,
          dec_n_head = 1,
          enc_n_layer=None,
          enc_n_embd=None,
          enc_n_head=None):
  # Model Setup
  tokenizer = Tokenizer()
  dec_config = DummyTransformer.get_default_config()
  dec_config.vocab_size = tokenizer.get_vocab_size()
  dec_config.block_size = max([len(tokenizer.encode(inp)) for inp in data])
  dec_config.n_layer = dec_n_layer
  dec_config.n_embd = dec_n_embd
  dec_config.n_head = dec_n_head
  if model_type == "Decoder":
    model = Decoder(dec_config)
  else:
    enc_config = DummyTransformer.get_default_config()
    enc_config.vocab_size = tokenizer.get_vocab_size()
    enc_config.block_size = max([len(tokenizer.encode(inp)) for inp in data])
    enc_config.n_layer = enc_n_layer
    enc_config.n_embd = enc_n_embd
    enc_config.n_head = enc_n_head
    model = EncoderDecoder(enc_config, dec_config)

  # Training Config
  train_config = Trainer.get_default_config()
  train_config.learning_rate = learning_rate
  train_config.max_iters = max_iters
  train_config.batch_size = batch_size
  train_config.num_workers = 0
  train_config.device = DEVICE
  train_ds = LMDataset("train", data, tokenizer, model)
  # Training Loop
  trainer = Trainer(train_config, model, train_ds)
  def batch_end_callback(trainer):
      if trainer.iter_num % 100 == 0:
          tqdm.write(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
          prefix = torch.tensor([tokenizer.encode("translate this to piglatin[SEP]")])
          if model_type == "Decoder":
            output = generate(model, prefix, 100, 0.1)
          else:
            output = prefix_generate(model, prefix, 100, 0.1)
          print(tokenizer.decode(output.cpu().numpy()[0]).split(bytes("[END]", "utf-8"))[0])
  trainer.set_callback('on_batch_end', batch_end_callback)
  trainer.run()
  return model, trainer

In [27]:
model, trainer = train(data, model_type="Decoder",
          learning_rate = 5e-4,
          batch_size = 16,
          max_iters = 10000,
          dec_n_layer=4,
          dec_n_embd=64,
          dec_n_head =2,
          enc_n_layer=None,
          enc_n_embd=None,
          enc_n_head=None)
model.eval()

NameError: name 'data' is not defined

In [28]:
from sacrebleu.metrics import BLEU

def eval(trainer, data, tokenizer):
    bleu = BLEU()
    results = []
    mistakes_printed_already = 0
    tgts = []
    cands = []
    for sent in tqdm(data[10000:10100]):
        inp = torch.tensor([tokenizer.encode(sent.split("[SEP]")[0] + "[SEP]")])
        tgt = bytes(sent.split("[SEP]")[1].split("[END]")[0], "utf-8")
        cat = generate(model, inp, model.block_size-len(inp[0]), 0.1)
        tgt_candidate = tokenizer.decode(cat.cpu().numpy()[0])
        tgt_candidate = tgt_candidate.split(b"[END]")[0].split(b"[SEP]")[1]
        # compare the predicted sequence to the true sequence
        tgts.append(str(tgt))
        cands.append(str(tgt_candidate))
        correct = (tgt == tgt_candidate)
        results.append(correct)
    results = torch.tensor(results).type(torch.float)
    print("\n\nExact Match: %d/%d = %.2f%% correct" % (torch.sum(results), len(results), 100*torch.mean(results)))
    # sacrebleu expects a list of reference streams, each as long as the
    # candidate list; here there is a single reference per candidate.
    score = bleu.corpus_score(cands, [tgts])
    print(score)

    return results

with torch.no_grad():
  results = eval(trainer, data, Tokenizer())

NameError: name 'trainer' is not defined

#### Assignment Export - Upload the my_llm_implementation.py file to GradeScope

#@title Assignment Export - Upload the `my_llm_implementation.py` file to GradeScope

with open("./my_llm_implementation.py", "w") as f:
  f.write(setup_block.split("#Do not change, it will break the AutoGrader")[0])
  f.write(embedding_def.split("#Do not change, it will break the AutoGrader")[0])
  f.write(mha_def.split("#Do not change, it will break the AutoGrader")[0])
  f.write(block_def.split("#Do not change, it will break the AutoGrader")[0])
  f.write(transformer_def.split("#Do not change, it will break the AutoGrader")[0])
  f.write(encoder_def.split("#Do not change, it will break the AutoGrader")[0])
  f.write(decoder_def.split("#Do not change, it will break the AutoGrader")[0])
  f.write(generate_def.split("#Do not change, it will break the AutoGrader")[0])
  f.write(encdec_def.split("#Do not change, it will break the AutoGrader")[0])
  f.write(pref_generate_def.split("#Do not change, it will break the AutoGrader")[0])

# If you decide to do this assignment not on Colab, you'll need to simply find the file
from google.colab import files
files.download('./my_llm_implementation.py')

In [29]:
# If using VSCode in my repo:
%run notebook2script submission

Converted Transformer_Architectures.ipynb to my_llm_implementation.py
