# Assignment 3: Decoder-only Transformer

Hello! Welcome to your last assignment, where you will implement a decoder-only transformer model and train it on a next-word prediction objective.

Because this is a GPU-based assignment, you are encouraged to use Google Colab's T4 runtime. It provides a GPU with about 15 GB of memory, which we have tested to be enough for this assignment. However, free T4 usage is limited per day, so we encourage you to connect to a runtime only when you want to run the code.

You are free to make minor changes to the provided code if that will make things easier. However, you will be asked to summarize what you changed in the final section.

In [1]:
!pip install datasets



First, we will define some hyperparameters. You do not need to change these hyperparameters.

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from datasets import load_dataset
import re
from tqdm import tqdm
from collections import Counter
from torch.optim.lr_scheduler import LambdaLR

# Hyperparameters
vocab_size = 32100
embedding_dim = 256
num_heads = 8
num_layers = 8
ffn_dim = 256
max_seq_len = 256
batch_size = 64
num_epochs = 1
learning_rate = 0.001
unk_token = "<UNK>"
pad_token = "<PAD>"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


## Section 1: Implement the Decoder-only Transformer.

In [5]:
from transformers import AutoTokenizer

# Tokenizer
# We will use a pre-trained tokenizer from T5-base, which is why vocab_size is 32100 above.
# You will need to research how Hugging Face tokenizers work
# https://huggingface.co/docs/transformers/en/main_classes/tokenizer
class BPETokenizer:
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained("google-t5/t5-base")

    def encode(self, text):
        # TODO
        # Implement this encode function that returns a list of INTs (word IDs) based on the input text (str)
        # Hint: you can use the tokenizer.encode function, but you need to add the special tokens.
        return self.tokenizer.encode(text, add_special_tokens=True)

    def decode(self, token_ids):
        # TODO
        # Implement this decode function that returns a string based on a list of INTs (word IDs)
        return self.tokenizer.decode(token_ids, skip_special_tokens=True)

# Positional Encoding
# TODO: You need to implement a sinusoidal positional embedding class.
# There are many great resources, such as https://machinelearningmastery.com/a-gentle-introduction-to-positional-encoding-in-transformer-models-part-1/
# Input: current word embedding
# Output: new word embedding after adding the positional embeddings.
class PositionalEncoding(nn.Module):
    def __init__(self, embedding_dim, max_len):
        super().__init__()
        # TODO
        # Precompute the sinusoidal table once:
        #   PE[pos, 2k]   = sin(pos / 10000^(2k / embedding_dim))
        #   PE[pos, 2k+1] = cos(pos / 10000^(2k / embedding_dim))
        position = torch.arange(max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        even_dims = torch.arange(0, embedding_dim, 2, dtype=torch.float)  # the values 2k
        inv_freq = 1.0 / (10000 ** (even_dims / embedding_dim))
        wording_embedding = torch.zeros(max_len, embedding_dim)
        wording_embedding[:, 0::2] = torch.sin(position * inv_freq)
        wording_embedding[:, 1::2] = torch.cos(position * inv_freq)[:, : embedding_dim // 2]
        # Register as a buffer so the table moves with the module across devices but is not trained.
        self.register_buffer("wording_embedding", wording_embedding)

    def forward(self, x):
        # TODO
        # Add the positional embeddings to the input word embeddings.
        # x has shape (batch, seq_len, embedding_dim); the table slice has shape
        # (seq_len, embedding_dim) and broadcasts over the batch dimension.
        return x + self.wording_embedding[: x.size(1), :]
        
# Self-Attention Layer
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, embedding_dim, num_heads):
        super().__init__()
        assert embedding_dim % num_heads == 0
        # TODO
        # Hint: you need the QKV weights, also some bookkeeping values.
        # Don't forget about a fully-connected output layer

        self.num_heads = num_heads
        self.embedding_dim = embedding_dim
        self.attention_dim = embedding_dim // num_heads
        
        self.query = nn.Linear(embedding_dim, embedding_dim)
        self.key = nn.Linear(embedding_dim, embedding_dim)
        self.value = nn.Linear(embedding_dim, embedding_dim)

        self.fc_out = nn.Linear(embedding_dim, embedding_dim)
        


    # Attention masks are values of 1's and 0's.
    # When the attention mask for a position is 1, it means that the attention can be computed on that position.
    # If a mask value for a position is 0, it means that the attention should 'skip' or 'mask-out' that position.
    def forward(self, x, mask=None):
        # TODO
        # Implement the computations of attention scores, with respect to the attention mask 'mask'
        # Hint: consider something like attention_scores.masked_fill(?, ?) to compute the masked scores.
        # Read the sizes from the input rather than the global hyperparameters, so that
        # shorter sequences and partial batches (e.g. during generation) also work.
        batch, seq_len, _ = x.shape

        query = self.query(x)
        key = self.key(x)
        value = self.value(x)

        # TODO
        # Implement the scaled dot-product attention
        # Hint: you can use torch.matmul to compute the dot product.
        # Hint: you can use torch.softmax to compute the attention scores.

        # Split into heads: (batch, seq_len, embedding_dim) -> (batch, num_heads, seq_len, attention_dim)
        query = query.view(batch, seq_len, self.num_heads, self.attention_dim).transpose(1, 2)
        key = key.view(batch, seq_len, self.num_heads, self.attention_dim).transpose(1, 2)
        value = value.view(batch, seq_len, self.num_heads, self.attention_dim).transpose(1, 2)

        # Scaled dot-product scores: (batch, num_heads, seq_len, seq_len)
        scores = torch.matmul(query, key.transpose(-2, -1)) / (self.attention_dim ** 0.5)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attention_scores = F.softmax(scores, dim=-1)
        attention_output = torch.matmul(attention_scores, value)

        # Merge the heads back: (batch, num_heads, seq_len, attention_dim) -> (batch, seq_len, embedding_dim)
        attention_output = attention_output.transpose(1, 2).contiguous()
        concat_attention = attention_output.view(batch, seq_len, self.embedding_dim)

        output = self.fc_out(concat_attention)

        return output
  

# Transformer Decoder Layer
class TransformerDecoderLayer(nn.Module):
    def __init__(self, embedding_dim, num_heads, ffn_dim):
        super().__init__()
        self.self_attn = MultiHeadSelfAttention(embedding_dim, num_heads)
        self.norm1 = nn.LayerNorm(embedding_dim)
        # TODO
        # Implement a MLP layer here.
        # Research about what nn.LayerNorm is doing here.
        # Add a second layernorm after the MLP.
        self.norm2 = nn.LayerNorm(embedding_dim)
        self.ffn = nn.Sequential(
            nn.Linear(embedding_dim, ffn_dim),
            nn.ReLU(),
            nn.Linear(ffn_dim, embedding_dim)
        )

    def forward(self, x, mask=None):
        attn_output = self.self_attn(x, mask)
        # TODO: research about why it is x + attn_output, instead of only attn_output?
        x = self.norm1(x + attn_output)
        # TODO: finish the rest.
        x = self.norm2(x + self.ffn(x))
        return x

# Decoder-only Transformer Model
class DecoderOnlyTransformer(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_heads, num_layers, ffn_dim, max_seq_len):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.pos_encoding = PositionalEncoding(embedding_dim, max_seq_len)
        # TODO
        # Implement the layers
        self.transformer_layers = nn.ModuleList([
            TransformerDecoderLayer(embedding_dim, num_heads, ffn_dim)
            for _ in range(num_layers)
        ])
        self.ln_final = nn.LayerNorm(embedding_dim)
        self.fc_out = nn.Linear(embedding_dim, vocab_size)



    def forward(self, x):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        # TODO
        # Create an attention mask, and do the rest of the computation.
        seq_len = x.size(1)
        mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device)).unsqueeze(0).unsqueeze(0)
        for layer in self.transformer_layers:
            x = layer(x, mask)
        x = self.ln_final(x)
        
        x = self.fc_out(x)
        return x
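
The effect of the causal mask built in `DecoderOnlyTransformer.forward` can be checked on a toy example. The sketch below is framework-free (plain Python, no PyTorch) and uses two hypothetical helpers, `causal_mask` and `masked_softmax_rows`, that are not part of the assignment code. It only illustrates that positions whose mask value is 0 end up with zero attention weight after the softmax, assuming each row keeps at least one unmasked position.

```python
import math

def causal_mask(seq_len):
    """Lower-triangular mask: entry [i][j] is 1 when position i may attend to position j."""
    return [[1 if j <= i else 0 for j in range(seq_len)] for i in range(seq_len)]

def masked_softmax_rows(scores, mask):
    """Row-wise softmax where masked-out (0) positions are treated as -inf."""
    out = []
    for score_row, mask_row in zip(scores, mask):
        masked = [s if m == 1 else float("-inf") for s, m in zip(score_row, mask_row)]
        row_max = max(masked)  # subtract the row maximum for numerical stability
        exps = [math.exp(s - row_max) for s in masked]  # exp(-inf) evaluates to 0.0
        total = sum(exps)
        out.append([e / total for e in exps])
    return out

mask = causal_mask(3)
# Uniform toy scores: every allowed position receives the same weight.
weights = masked_softmax_rows([[0.0] * 3 for _ in range(3)], mask)
for row in weights:
    print([round(w, 3) for w in row])
```

Each row sums to 1, and the entries above the diagonal are exactly zero, which is what prevents a token from attending to later tokens.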

## Section 2: Implement the LLM training.

In [6]:
def load_training_corpus():
    dataset = load_dataset("alpindale/light-novels", split="train")
    texts = dataset["text"][:3000000]
    tokenizer = BPETokenizer()
    tokenized = []
    # TODO
    # In pre-training, we will make every training sequence exactly the same length (max_seq_len).
    # This means that you will need to concatenate shorter texts in 'texts' so that every list in 'tokenized' has exactly max_seq_len tokens.
    # In other words, for instance in tokenized: assert len(instance) == max_seq_len.
    # As you can see later, each element of an instance is a number (a word ID).
    # For the final instance where you don't have enough tokens, you can pad the sequence with word id 1.

    buffer = []  # Tokens carried over from previous texts.
    for text in texts:
        buffer.extend(tokenizer.encode(text))
        # Cut every complete max_seq_len chunk out of the buffer and keep the remainder.
        n_full = len(buffer) // max_seq_len * max_seq_len
        for start in range(0, n_full, max_seq_len):
            tokenized.append(buffer[start:start + max_seq_len])
        buffer = buffer[n_full:]
    if buffer:
        # Pad only the final, incomplete sequence with word id 1.
        tokenized.append(buffer + [1] * (max_seq_len - len(buffer)))

    return torch.tensor(tokenized, dtype=torch.long), tokenizer

data, tokenizer = load_training_corpus()
train_data = data

def get_lr_schedule(optimizer, warmup_steps, total_steps):
    """
    Creates a learning rate schedule with linear warmup and cosine decay.
    """
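    def lr_lambda(current_step):
        if current_step < warmup_steps:
            return current_step / warmup_steps  # Linear warmup
        # Clamp progress to [0, 1] so that extra scheduler steps (e.g. a final partial batch) do not raise the rate again.
        progress = min(1.0, (current_step - warmup_steps) / max(1, total_steps - warmup_steps))
        # Return a plain float rather than a tensor.
        return 0.5 * (1 + torch.cos(torch.tensor(progress * torch.pi)).item())  # Cosine decay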

    return LambdaLR(optimizer, lr_lambda)

def train():
    model = DecoderOnlyTransformer(vocab_size, embedding_dim, num_heads, num_layers, ffn_dim, max_seq_len).to(device)
    # TODO: compute the model size. You can use a programmatic approach.
    
    num_params = 0

    for param in model.parameters():
        num_params += param.numel()
    
    print(f"Model size: {num_params} parameters")


    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()
    total_steps = num_epochs * (len(train_data) // batch_size)
    warmup_steps = int(0.1 * total_steps)  # 10% of total steps
    scheduler = get_lr_schedule(optimizer, warmup_steps, total_steps)

    for epoch in range(num_epochs):
        total_loss = 0
        progress_bar = tqdm(range(0, len(train_data), batch_size), desc=f"Epoch {epoch+1}")
        for i in progress_bar:
            batch = train_data[i:i+batch_size].to(device)
            # TODO Implement the training process.
            inputs = batch[:, :-1]  # TODO
            targets = batch[:, 1:]  # TODO. What is supposed to be the supervision targets in autoregressive LM?

            outputs = model(inputs)  

            loss = loss_fn(outputs.view(-1, vocab_size), targets.reshape(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            # Your code should finish before scheduler.step() here.
            scheduler.step()
            total_loss += loss.item()
            progress_bar.set_postfix(loss=loss.item())
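        # loss.item() is already a per-batch mean, so average over batches rather than over sequences.
        num_batches = (len(train_data) + batch_size - 1) // batch_size
        print(f"Epoch {epoch+1} completed with Loss: {total_loss / num_batches}")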
    return model
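
The shape of the schedule described in `get_lr_schedule` (linear warmup followed by cosine decay) can be inspected without PyTorch. The following is a minimal sketch using only the standard library; the function name `lr_multiplier` and the step counts in the usage loop are hypothetical, and clamping the progress to 1 is an added assumption so that steps past `total_steps` keep the multiplier at 0.

```python
import math

def lr_multiplier(step, warmup_steps, total_steps):
    """Factor multiplied with the base learning rate at a given step."""
    if step < warmup_steps:
        return step / warmup_steps  # linear warmup from 0 to 1
    # Fraction of the decay phase that has been completed, clamped to [0, 1].
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    progress = min(1.0, progress)
    return 0.5 * (1.0 + math.cos(math.pi * progress))  # cosine decay from 1 to 0

# Hypothetical run: 100 total steps with a 10-step warmup.
for step in (0, 5, 10, 55, 100):
    print(step, round(lr_multiplier(step, 10, 100), 4))
```

The multiplier rises linearly during warmup, peaks at 1 when warmup ends, and then follows half a cosine period down to 0 at the final step.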



Now, let's train the model! We suggest you save the trained model for later testing.

In [None]:
model = train()
torch.save(model.state_dict(), "model.ckpt")

Then, let's evaluate your model to see if it's working!

In [None]:
# We have provided a sample greedy decoding generation function for you.
# You can use it to test if your trained model is working.
# Although we are not providing any actual test cases, it will be pretty easy
# to tell if your language model is generating coherent natural language.
# Try a few examples! If it works, it works; we will not do 'exact-match' grading.
model.eval()
def generate_text_greedy(model, input_text, max_length=20):
    input_tokens = tokenizer.encode(input_text)
    generated = input_tokens[:]
    for _ in range(max_length - len(input_tokens)):
        input_tensor = torch.tensor([generated], device=device)
        with torch.no_grad():
            output = model(input_tensor)
        next_token = torch.argmax(output[:, -1, :], dim=-1).item()
        generated.append(next_token)
        if next_token == 1:
            break
    return tokenizer.decode(generated)

print("Generated Sequence:", generate_text_greedy(model, "what is your"))

def generate_text_topk(model, input_text, max_length=20, k=50, temperature=1.0):
    # TODO
    # Implement a top-k sampling generation function.
    # Steps
    # 1. adjust the output distribution/logits based on the temperature
    # 2. find the top-k next tokens (hint: torch.topk)
    # 3. readjust the probability for the top-k candidates for them to sum to 1 (hint: softmax)
    # 4. sample a word from the new k tokens (hint: torch.multinomial)
    input_tokens = tokenizer.encode(input_text)
    generated = input_tokens[:]
    for _ in range(max_length - len(input_tokens)):
        input_tensor = torch.tensor([generated], device=device)
        with torch.no_grad():
            output = model(input_tensor)
        logits = output[:, -1, :] / temperature              # 1. apply the temperature
        topk_logits, topk_indices = torch.topk(logits, k)    # 2. keep the k best candidates
        topk_probs = F.softmax(topk_logits, dim=-1)          # 3. renormalize over the top-k
        choice = torch.multinomial(topk_probs, 1).item()     # 4. sample a position within the top-k
        next_token = topk_indices[0, choice].item()
        generated.append(next_token)
        if next_token == 1:  # stop at the end-of-sequence id, as in the greedy version
            break

    return tokenizer.decode(generated)
        

print("Generated Sequence:", generate_text_topk(model, "what is your"))
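
The four steps listed in `generate_text_topk` can also be tried on a hand-written logits vector, independently of the trained model. This is a minimal standard-library sketch, not the assignment's implementation: `top_k_sample`, `toy_logits`, and the chosen `k` and `temperature` are hypothetical values for illustration.

```python
import math
import random

def top_k_sample(logits, k, temperature=1.0, rng=random):
    """Sample one index from the k highest-scoring entries of `logits`."""
    scaled = [x / temperature for x in logits]  # step 1: temperature scaling
    # step 2: indices of the k largest scaled logits
    top = sorted(range(len(scaled)), key=lambda i: scaled[i], reverse=True)[:k]
    # step 3: softmax restricted to the top-k candidates
    top_max = max(scaled[i] for i in top)
    exps = [math.exp(scaled[i] - top_max) for i in top]
    total = sum(exps)
    probs = [e / total for e in exps]
    # step 4: draw one candidate according to the renormalized probabilities
    return rng.choices(top, weights=probs, k=1)[0]

toy_logits = [2.0, 1.0, 0.5, -1.0, -3.0]
rng = random.Random(0)  # fixed seed so repeated runs draw the same samples
samples = [top_k_sample(toy_logits, k=2, temperature=0.7, rng=rng) for _ in range(1000)]
print({i: samples.count(i) for i in sorted(set(samples))})
```

With `k=2`, only indices 0 and 1 can ever be drawn; rerunning with a different `temperature` changes how the samples are split between them.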

## Section 3: Written questions.
Answer each question in about 100 words (use the fewest words needed to express your meaning).

### 1. What does nn.LayerNorm do?

### 2. Why does the provided code use self.norm1(x + attn_output) instead of self.norm1(attn_output)?

### 3. What is the model size using the provided hyperparameters?

### 4. How does temperature play a part in top-k sampling? Share some observations.

### 5. (Optional, not graded) Please summarize your changes to the provided code, if any.
