In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

<h2> Tokenization </h2>

There are many ways to tokenize text. For simplicity, we use the pretrained GPT-2 tokenizer. This is a byte-level BPE tokenizer. If you're interested in how it works, you can read more [here](https://huggingface.co/learn/nlp-course/en/chapter6/5).

In [20]:
from transformers import AutoTokenizer

# Load the pretrained byte-level BPE tokenizer that ships with GPT-2.
tokenizer = AutoTokenizer.from_pretrained("gpt2")

# A few short sentences to round-trip through the tokenizer.
texts = [
    "Hello, my dog is cute",
    "Hello, my cat is cute",
    "What is your name?",
    "My name is John",
]

# Encode every sentence into its list of integer token ids.
tokens = list(map(tokenizer.encode, texts))

# Show each sentence next to its ids and the text recovered from those ids.
for sentence, ids in zip(texts, tokens):
    report = (
        f"Text: {sentence}",
        f"Tokens: {ids}",
        f"Decoded: {tokenizer.decode(ids)}",
        "",  # blank separator line between sentences
    )
    print(*report, sep="\n")

Text: Hello, my dog is cute
Tokens: [15496, 11, 616, 3290, 318, 13779]
Decoded: Hello, my dog is cute

Text: Hello, my cat is cute
Tokens: [15496, 11, 616, 3797, 318, 13779]
Decoded: Hello, my cat is cute

Text: What is your name?
Tokens: [2061, 318, 534, 1438, 30]
Decoded: What is your name?

Text: My name is John
Tokens: [3666, 1438, 318, 1757]
Decoded: My name is John



It seems like our tokenizer is working: the decoded texts are identical to the original texts. We can also see that all the tokenizer does is map a piece of text to a list of integers (token ids) and back.

<h2> Embedding Layer </h2>

In [None]:
class EmbeddingLayer(nn.Module):
    """Lookup table that turns integer token ids into learned dense vectors."""

    def __init__(self, vocab_size, embedding_dim):
        """Create a table with one row of width `embedding_dim` per token id.

        Args:
            vocab_size: number of distinct token ids (rows of the table).
            embedding_dim: width of the vector stored for each token id.
        """
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)

    def forward(self, x):
        """Return the table rows selected by the integer ids in `x`.

        The result has the shape of `x` with one extra trailing dimension of
        size `embedding_dim`.
        """
        vectors = self.embedding(x)
        return vectors
    

embedding_dim = 10

print(tokenizer.vocab_size)
print(tokens)

embedding_layer = EmbeddingLayer(tokenizer.vocab_size, embedding_dim)

# The example sentences tokenize to different lengths, so `tokens` is a ragged
# list and cannot be converted to a tensor directly. Right-pad every sequence
# to the longest one, using the tokenizer's end-of-text id as filler.
# The result is stored under a new name so `tokens` stays a plain list and this
# cell can be re-run safely.
token_ids = nn.utils.rnn.pad_sequence(
    [torch.tensor(ids, dtype=torch.long) for ids in tokens],
    batch_first=True,                       # -> (num_sentences, longest_sentence)
    padding_value=tokenizer.eos_token_id,
)

# One vector per token: (num_sentences, longest_sentence, embedding_dim)
embeddings = embedding_layer(token_ids)

print(embeddings.shape)
print(embeddings)

We have now gone from words -> tokens -> embeddings (a vector for each token). Let's get to the meat of the transformer, the *Attention*.

<h2> The Attention Layer </h2>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    """Single attention head (exercise scaffold).

    One linear layer projects every input vector of width ``d_model`` to
    ``3 * d_k`` features; ``forward`` splits that projection into query, key
    and value tensors of width ``d_k`` each. The attention computation itself
    is left for the reader to fill in.
    """

    def __init__(self, d_model, d_k):
        """Store the sizes and create the joint Q/K/V projection.

        Args:
            d_model: width of the input vectors (last dimension of ``x``).
            d_k: width of each of the query, key and value vectors.
        """
        super().__init__()

        self.d_model = d_model
        self.d_k = d_k
        # a single projection produces Q, K and V side by side (3 * d_k features)
        self.linear = nn.Linear(d_model, 3 * d_k)

    def forward(self, x):
        """Project ``x`` to Q, K, V and return the attention output.

        ``x`` must have ``d_model`` as its last dimension. Once the exercise is
        completed the result is expected to have ``d_k`` as its last dimension.
        """
        # split the input into Q, K, V
        q, k, v = self.linear(x).chunk(3, dim=-1) # we pass our input through a linear layer and then split it into 3 parts

        # Recall the formula for the attention mechanism
        # attn = softmax(Q K.T / sqrt(d_k)) V
        # NOTE(review): this formula has no causal mask, so every position can attend to
        # later positions. Confirm that is intended before using this head for next-token prediction.
        # Hint: Use torch.matmul() for matrix multiplication (https://pytorch.org/docs/stable/generated/torch.matmul.html)
        # Hint: Use F.softmax() to apply the softmax function (https://pytorch.org/docs/stable/generated/torch.nn.functional.softmax.html)
        # Hint: Look at the transpose function for PyTorch tensors (https://pytorch.org/docs/stable/generated/torch.transpose.html)
        ### YOUR CODE HERE
        attn = ...      # multiply Q and K.T (transpose only the last two dimensions of K)
        attn = ...      # divide by sqrt(d_k) FIRST, then apply softmax over the last dimension
        attn = ...      # multiply by V
        ### END YOUR CODE

        return attn
    
# d_k is set equal to the embedding width so the output can be compared with the input directly.
attention_layer = Attention(embedding_dim, embedding_dim)
attn_logits = attention_layer(embeddings)

# Ensure the shape is the same both before and after the attention layer
assert attn_logits.shape == embeddings.shape, (
    f"attention changed the shape: {tuple(embeddings.shape)} -> {tuple(attn_logits.shape)}"
)
embeddings.shape, attn_logits.shape

In [None]:
# naive implementation of multi-head attention

class MultiHeadAttention(nn.Module):
    """Several independent attention heads run side by side (exercise scaffold).

    ``d_model`` is divided evenly between ``n_heads`` heads, so each head works
    with vectors of width ``d_k = d_model // n_heads``. Building the heads and
    combining their outputs is left for the reader to fill in.
    """

    def __init__(self, d_model, n_heads):
        """Validate the sizes and create the heads.

        Args:
            d_model: width of the input vectors.
            n_heads: number of attention heads; must divide ``d_model``.

        Raises:
            AssertionError: if ``d_model`` is not divisible by ``n_heads``.
        """
        super().__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"

        self.n_heads = n_heads
        self.d_model = d_model
        self.d_k = d_model // n_heads  # width handled by each individual head

        # Implement multi-head attention using the Attention module
        # Hint: Use nn.ModuleList to hold multiple instances of the Attention module
        # Ex: self.heads = nn.ModuleList([AttentionHead1, AttentionHead2, ...])
        # Hint: each head takes inputs of width d_model and produces outputs of width d_k
        ### YOUR CODE HERE ###
        self.heads = ...
        ### END YOUR CODE ###

    def forward(self, x):
        """Run every head on ``x`` and combine the results.

        Once the exercise is completed, concatenating ``n_heads`` outputs of
        width ``d_k`` along the last dimension gives back a width of ``d_model``.
        """
        # pass the input through all the heads and concatenate the results
        # Hint: Use torch.cat() to concatenate the results of the different heads, specify the dimension using the dim argument
        ### YOUR CODE HERE ###

        return ...
        ### END YOUR CODE ###
    
# Small random batch for a quick shape check of the multi-head layer.
d_model, n_heads = 32, 4
batch_size, seq_len = 8, 16

# Random stand-in for a batch of embedded sequences: (batch_size, seq_len, d_model)
shifted_x = torch.randn(batch_size, seq_len, d_model)

multi_head_attn = MultiHeadAttention(d_model, n_heads)
attn_logits = multi_head_attn(shifted_x)

# Displayed as the cell output
attn_logits.shape

In [8]:
class AttentionBlock(nn.Module):
    """One transformer block (exercise scaffold).

    ``forward`` has two stages, each wrapped in a residual connection:
    normalized multi-head attention with dropout, followed by a linear stage
    that the reader fills in. The sub-layers themselves are also left for the
    reader to create in ``__init__``.
    """

    def __init__(self, d_model, n_heads):
        """Create the sub-layers used by ``forward``.

        Args:
            d_model: width of the vectors flowing through the block.
            n_heads: number of attention heads for the attention sub-layer.
        """
        super().__init__()

        ### YOUR CODE HERE ###
        self.norm = ...     # LayerNorm
        self.attn = ...     # MultiHeadAttention
        self.dropout = ...  # Dropout
        self.norm2 = ...    # LayerNorm
        self.linear = ...   # Linear layer or multiple linear layers
        ### END YOUR CODE ###

    def forward(self, x):
        """Apply the attention stage and then the linear stage to ``x``.

        Both stages add their result back onto their input, so the output is
        expected to have the same shape as ``x``.
        """
        # we first normalize the input
        x_attn = self.norm(x)
        # we then pass it through the multi-head attention layer and apply dropout
        x_attn = self.dropout(self.attn(x_attn))
        # we add the input to the output of the multi-head attention
        # this is called a residual connection (https://towardsdatascience.com/what-is-residual-connection-efb07cab0d55)
        x = x + x_attn


        ### YOUR CODE HERE ###
        # we pass the output through a linear layer and apply dropout
        x_linear = ...
        # we apply normalization (remember to use self.norm2, not self.norm)
        x_linear = ...
        # we add the input to the output of the linear layer
        x = ...
        ### END YOUR CODE ###


        return x

In [9]:
class Transformer(nn.Module):
    """Token ids in, per-position vocabulary scores out (exercise scaffold).

    ``forward`` embeds the tokens, adds a positional embedding, and is meant to
    run the result through a stack of attention blocks and a final projection.
    All four sub-modules are left for the reader to create.
    """

    def __init__(self, vocab_size, d_model, n_heads, n_layers, block_size):
        """Create the embeddings, the attention stack and the output projection.

        Args:
            vocab_size: number of distinct token ids.
            d_model: width of the vectors flowing through the model.
            n_heads: number of attention heads per block.
            n_layers: number of attention blocks in the stack.
            block_size: presumably the longest sequence the positional
                embedding must cover — TODO confirm when filling in
                ``pos_embedding``.
        """
        super().__init__()

        ### YOUR CODE HERE ###
        self.embedding = ...    # Embedding layer for encoding the input tokens
        # forward() calls pos_embedding with the integer positions 0 .. seq_len - 1
        self.pos_embedding = ... # Positional encoding
        self.attention_blocks = ... # Stack of n_layers attention blocks. Hint: use nn.Sequential (https://pytorch.org/docs/stable/generated/torch.nn.Sequential.html)
        self.fc = ... # Final fully connected layer projecting the model output to the vocab size
        ### END YOUR CODE ###

    def forward(self, x):
        """Map a batch of token ids to per-position scores over the vocabulary."""
        # x size: (batch_size, seq_len)
        # For simplicity, we have implemented the embedding and positional encoding for you
        x = self.embedding(x) # (batch_size, seq_len, d_model)
        # positions 0 .. seq_len - 1 are created on the same device as x; the result is added to every sequence in the batch
        x = x + self.pos_embedding(torch.arange(x.size(1), device=x.device))

        ### YOUR CODE HERE ###
        x = ...     # pass the input through the stack of attention blocks
        x = ...     # pass the output through the final fully connected layer
        ### END YOUR CODE ###
        return x

In [10]:
class GPT(nn.Module):
    """Language model wrapper: a `Transformer` plus a training loss and text generation.

    Args:
        vocab_size: number of distinct token ids.
        d_model: width of the vectors flowing through the model.
        n_heads: number of attention heads per block.
        n_layers: number of attention blocks.
        block_size: longest sequence the model is trained on; `generate` never
            feeds the transformer more than this many tokens.
    """

    def __init__(self, vocab_size, d_model, n_heads, n_layers, block_size):
        super().__init__()

        # remembered so that generate() can crop its context to a length the transformer supports
        self.block_size = block_size

        # we initialize the transformer model we created
        self.transformer = Transformer(vocab_size, d_model, n_heads, n_layers, block_size)

        ### YOUR CODE HERE ###
        self.loss_fn = ... # Loss function for training the model
        ### END YOUR CODE ###

    def forward(self, x, targets=None):
        """Return `(logits, loss)` for a batch of token ids.

        Args:
            x: token ids of shape (batch_size, seq_len).
            targets: optional token ids with the same shape as `x`; when given,
                the loss between the logits and these targets is computed.

        Returns:
            The logits from the transformer and the loss (`None` when no
            targets are passed).
        """
        logits = self.transformer(x)    # we pass the input through the transformer
        loss = None
        if targets is not None:         # if we have targets, we calculate the loss
            # flatten batch and sequence dimensions: one row of vocabulary scores per target token.
            # reshape (rather than view) also works when the tensors are not contiguous.
            loss = self.loss_fn(logits.reshape(-1, logits.size(-1)), targets.reshape(-1))
        return logits, loss     # we return the logits and the loss

    def generate(self, x, steps=100, deterministic=False):
        """Extend `x` by `steps` tokens, one token at a time.

        Args:
            x: starting token ids of shape (batch_size, seq_len).
            steps: number of tokens to append.
            deterministic: if True always pick the highest-scoring token,
                otherwise sample from the softmax distribution.

        Returns:
            Token ids of shape (batch_size, seq_len + steps): the input
            followed by the generated tokens.
        """
        # Generation is inference: switch off dropout for its duration and
        # restore the previous train/eval mode afterwards.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():   # no gradients are needed, so do not record them
                for _ in range(steps):
                    # only the most recent block_size tokens are fed to the model, so the
                    # sequence never grows past the length the transformer was configured for
                    context = x[:, -self.block_size:]
                    logits = self.transformer(context)  # we pass the context through the transformer
                    last_token_logits = logits[:, -1]   # scores for the token that follows the last position
                    if deterministic:   # if we are in deterministic mode, we take the token with the highest score
                        next_token = torch.argmax(last_token_logits, dim=-1, keepdim=True)
                    else:  # otherwise, we turn the scores into probabilities and sample from them
                        next_token = torch.multinomial(F.softmax(last_token_logits, dim=-1), num_samples=1)
                    x = torch.cat([x, next_token], dim=-1)  # we append the next token to the sequence
        finally:
            self.train(was_training)
        return x

In [12]:
from pathlib import Path

# Location of the training corpus. A copy next to the notebook is used when
# present; otherwise fall back to ../data/input.txt.
LOCAL_TEXT_FILE = Path("input.txt")
PATH_TO_TEXT_FILE = LOCAL_TEXT_FILE if LOCAL_TEXT_FILE.exists() else Path.cwd().parent / "data" / "input.txt"

# Read the whole corpus into memory. The encoding is explicit so that decoding
# does not depend on the platform's default locale.
text = PATH_TO_TEXT_FILE.read_text(encoding="utf-8")

In [13]:
def get_batch(text, block_size):
    """Yield `(inputs, targets)` training pairs of `block_size` tokens each.

    The token sequence is cut into consecutive, non-overlapping windows.
    `targets` is the same window shifted one token to the right, so
    `targets[j]` is the token that follows `inputs[j]`. A trailing remainder
    that cannot supply a full window plus its next token is dropped.

    Args:
        text: either a string, which is encoded with the notebook's
            `tokenizer`, or an already tokenized sequence of integer ids.
            Passing ids avoids re-encoding the corpus on every call.
        block_size: number of tokens per window; must be at least 1.

    Yields:
        Tuples `(inputs, targets)` of two lists of `block_size` token ids.

    Raises:
        ValueError: if `block_size` is smaller than 1 (raised when iteration
            starts, because this is a generator).
    """
    if block_size < 1:
        raise ValueError("block_size must be a positive integer")

    # strings are tokenized here; anything else is taken to be token ids already
    tokens = tokenizer.encode(text) if isinstance(text, str) else list(text)

    # stop early enough that the shifted target window never runs past the end
    for i in range(0, len(tokens) - block_size, block_size):
        yield tokens[i:i+block_size], tokens[i+1:i+block_size+1]

In [None]:
from tqdm import tqdm


# Define the hyperparameters for training
# In general, fewer epochs means faster training, but the model may not have enough time to learn
# A larger block size means the model can learn more context, but training will be slower
# A larger d_model, n_heads, and n_layers means the model can learn more complex patterns, but training will be slower


### YOUR CODE HERE ###
num_epochs = 10     # Number of epochs to train the model, you can change this
block_size = 256    # Length of the sequence to train the model on, you can change this (try 128, 256, 512)
d_model = 256       # Dimension of the model, you can change this
n_heads = 4         # Number of attention heads, you can change this
n_layers = 4        # Number of transformer layers, you can change this
lr = 1e-4           # Learning rate for training, you can change this (try 1e-3, 1e-4, 1e-5)
### END YOUR CODE ###


# Use a GPU when one is available (CUDA first, then Apple MPS), otherwise the CPU
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
model = GPT(tokenizer.vocab_size, d_model, n_heads, n_layers, block_size).to(device)
# Named `optimizer` so the `optim` alias for torch.optim imported at the top of the notebook is not overwritten
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# Count the batches once, using the same range get_batch iterates over, so the
# progress bar total is exact and the corpus is not re-tokenized every epoch just for the count
num_tokens = len(tokenizer.encode(text))
batches_per_epoch = len(range(0, num_tokens - block_size, block_size))

model.train()   # make sure dropout is active, also when this cell is re-run after generation
for epoch in range(num_epochs):
    for batch in tqdm(get_batch(text, block_size), desc=f"Training epoch {epoch+1}", total=batches_per_epoch):
        # each batch is a single sequence; unsqueeze(0) adds the batch dimension -> (1, block_size)
        x, y = torch.tensor(batch[0]).unsqueeze(0).to(device), torch.tensor(batch[1]).unsqueeze(0).to(device)
        logits, loss = model(x, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [None]:
### YOUR CODE HERE ###
context = "PROSPERO:"       # The starting text for generation, you can change this or leave it empty
deterministic = False       # Set this to True for deterministic generation, or False for stochastic generation
### END YOUR CODE ###

if context:
    # encode the prompt and add a batch dimension -> (1, prompt_length)
    x = torch.tensor(tokenizer.encode(context)).unsqueeze(0).to(device)
else:
    # no prompt: start from a single token with id 0
    x = torch.zeros((1, 1), dtype=torch.long).to(device)

# generation is inference only, so gradients do not need to be tracked
with torch.no_grad():
    output = model.generate(x, deterministic=deterministic)
print(tokenizer.decode(output[0].tolist()))