# Imports

In [16]:
import torch
from torch import nn
import torch.nn.functional as F
from dataclasses import dataclass

# Tokenization

## Exercise: Implementing Character-based tokenization

1. Get a sorted list of every unique character in your training data.
2. Create a dictionary that converts tokens to IDs (str_to_int) and one that converts IDs to tokens (int_to_str)
3. Implement functions encode and decode.
Encode should take in a string and output list of token IDs.
Decode should take in a list of token IDs and output a string.
4. Test encoding and then decoding “My dog Leo is extremely cute.” Do you recover the correct string?


In [2]:
# Read the entire training corpus into memory as one string
with open('tiny_wikipedia.txt', mode='r', encoding='utf-8') as f:
    text = f.read()

In [14]:
# Step 1) Get a sorted list of all unique characters that occur in `text`
# Hint: set(...) is useful for getting the unique elements of a sequence,
# and sorted(...) returns them as an ordered list
... # your code here

# Step 2) Create the dictionaries str_to_int (character -> ID) and
# int_to_str (ID -> character)
... # your code here

# Step 3) Define encode and decode functions
# encode: takes a string and returns a list of token IDs
# decode: takes a list of token IDs and returns a string
# def encode(...):
#     ...

# def decode(...):
#     ...

# Step 4) Test your implementation on "My dog Leo is extremely cute."
# Encoding and then decoding should give back the original string.


Ellipsis

# Solution

In [3]:
# Step 1) Collect the distinct characters of the corpus in sorted order
chars = sorted(set(text))

# Step 2) Build the two lookup tables: ID -> character and character -> ID
int_to_str = dict(enumerate(chars))
str_to_int = {ch: i for i, ch in int_to_str.items()}

# Step 3) Define encode and decode functions
def encode(text, str_to_int):
    """Convert a string into a list of token IDs, one ID per character.

    Every character of ``text`` is looked up in ``str_to_int``; a character
    that is not a key of the mapping raises ``KeyError``.
    """
    ids = []
    for ch in text:
        ids.append(str_to_int[ch])
    return ids

def decode(ids, int_to_str):
    """Convert a sequence of token IDs back into a string.

    Every ID is looked up in ``int_to_str`` and the resulting pieces are
    joined with no separator; an unknown ID raises ``KeyError``.
    """
    return ''.join(int_to_str[token_id] for token_id in ids)

# Step 4) Round-trip check: decoding the encoded sentence must reproduce it
input_text = "My dog Leo is extremely cute."
ids = encode(input_text, str_to_int)
decoded_text = decode(ids, int_to_str)
assert decoded_text == input_text


## Tokenize input data and create splits

In [12]:
# Tokenize the whole corpus, then hold out the final 10% for validation
data = torch.tensor(encode(text, str_to_int), dtype=torch.long)
n = int(0.9*len(data)) # index at which the validation split begins
train_data, val_data = data[:n], data[n:]

# data loading
def get_batch(split, ctx_len, batch_size, device='cpu'):
    """Sample a random batch of input/target windows from one data split.

    Args:
        split: 'train' selects ``train_data``; any other value selects
            ``val_data``.
        ctx_len: number of tokens in each sampled window.
        batch_size: number of windows to sample.
        device: device the two returned tensors are moved to.

    Returns:
        A pair ``(x, y)`` of stacked windows. Row ``k`` of ``y`` is the
        window that starts one position after row ``k`` of ``x``, i.e. the
        next-token targets for ``x``.
    """
    source = val_data if split != 'train' else train_data
    # The upper bound leaves room for the target window, which extends one
    # token past the end of the input window.
    starts = torch.randint(len(source) - ctx_len, (batch_size,))
    inputs, targets = [], []
    for start in starts:
        inputs.append(source[start:start+ctx_len])
        targets.append(source[start+1:start+ctx_len+1])
    return torch.stack(inputs).to(device), torch.stack(targets).to(device)

# Define our transformer parameters with a config

In [17]:
@dataclass
class Config:
    """Hyperparameters shared by the modules of the transformer."""
    d_model: int = 256     # width of the model (hidden / embedding dimension)
    n_heads: int = 4       # attention heads running side by side (width)
    ctx_len: int = 64      # context length in tokens
    batch_size: int = 8    # sequences per batch
    n_layers: int = 12     # number of stacked layers (depth)
    vocab_size: int = -1   # placeholder until a tokenizer has been created

    def set_vocab_size(self, vocab_size):
        """Store the vocabulary size on this config once it is known."""
        self.vocab_size = vocab_size

In [18]:
# Default hyperparameters, with the vocabulary size set to the number of unique characters
config = Config()
config.set_vocab_size(len(chars))

# Attention

## Exercise: Implementing single headed causal self attention

Self-attention is a core mechanism in transformers that allows each position in a sequence to attend to all previous positions. The "causal" part ensures each position can only attend to past positions - this is crucial for language modeling.

The task is to fill out the `SingleHeadCausalAttention` module. The `forward(self, x)` function takes an input `x` that is `(B, T, C)`-dimensional, corresponding to batch size, sequence length, and hidden dimension, and returns the result of applying the attention formula. Note that the causal mask has already been defined for you: it is a lower triangular matrix whose nonzero entries are all 1, and you can refer to it as `self.cmask`.

1. Create the K, Q, V matrices that are the resultant matrices after applying the `self.key`, `self.query`, and `self.values` projections.
2. Compute and return attention using the formula:

$$\textrm{attention}(K, V, Q) = \textrm{softmax}\left( c \odot \frac{Q K^\top}{\sqrt{d_k}} \right) V $$

where $c \odot \dots$ denotes the application of the causal mask.  You can use `torch.masked_fill(...)` here to apply the mask.  It takes as input three arguments: the input matrix you want to mask, where you want to mask it (a boolean condition), and the value you want to mask with.  To figure out what value you want to mask with, it may be helpful to recall the softmax formula; the $i$-th component of a vector $x$ after a softmax is: $$ \textrm{softmax}(x)_i =  \frac{e^{x_i}}{\sum_j e^{x_j}}.$$

Hints:
1. Keep track of the matrix dimensions after each step!
2. Note that you can transpose a matrix in Pytorch by calling `A.transpose(dim_1, dim_2)` where `dim_1`, `dim_2` refer to the dimensions you want to transpose.
3. You may use Pytorch's built-in softmax function `F.softmax(...)`.

In [None]:
class SingleHeadCausalAttention(nn.Module):
    """Exercise scaffold for one head of causal self-attention.

    The projections and the body of ``forward`` are placeholders (``...``)
    to be filled in; only ``head_dim`` and the causal mask are provided.
    """
    def __init__(self, config):
        super().__init__()
        
        # Calculate the dimension for each attention head
        self.head_dim = config.d_model // config.n_heads
        
        # TODO: Initialize the Key, Query, and Value projections
        # Each should be a linear layer that projects from d_model to head_dim
        # Hint: Use nn.Linear(..., bias=False) as is standard in attention
        self.key = ... # Your code here
        self.query = ... # Your code here
        self.values = ... # Your code here
        
        # Create causal mask (lower triangular matrix of ones); you can refer to it by `self.cmask`
        # The mask is (ctx_len, ctx_len), so slice it to (T, T) when the input is shorter than ctx_len
        self.register_buffer("cmask", torch.tril(torch.ones([config.ctx_len, config.ctx_len])))
    
    def forward(self, x):
        # x must be 3-dimensional: (batch size, sequence length, hidden dimension)
        B, T, C = x.shape
        
        # TODO Step 1: Project input to get Key, Query, Value matrices
        K = ... # Your code here
        Q = ... # Your code here
        V = ... # Your code here
        
        # TODO Step 2: Compute attention scores and apply mask
        # Remember: 
        # - Scale by sqrt(head_dim)
        # - Use the causal mask (self.cmask) to prevent attention to future tokens
        # - Apply softmax to get attention weights
        # - Multiply with values
        
        # Your implementation here...
        
        # Placeholder: returns None until the final output is filled in
        return # Final output

## Testing your implementation

In [20]:
# Smoke test: one head maps (B, T, d_model) to (B, T, head_dim)
config = Config(d_model=256, n_heads=8, ctx_len=16)
attention = SingleHeadCausalAttention(config)
x = torch.randn(2, 10, 256)  # (batch_size, seq_len, d_model)
output = attention(x)
assert output.shape == (2, 10, 256 // 8)  # head_dim = d_model / n_heads = 32

## Solution

In [19]:
class SingleHeadCausalAttention(nn.Module):
    """One head of causal (masked) scaled dot-product self-attention."""

    def __init__(self, config):
        super().__init__()

        # Each head works in a d_model // n_heads dimensional space
        self.head_dim = config.d_model // config.n_heads
        # Bias-free projections from the model dimension to the head dimension
        self.key = nn.Linear(config.d_model, self.head_dim, bias=False)
        self.query = nn.Linear(config.d_model, self.head_dim, bias=False)
        self.values = nn.Linear(config.d_model, self.head_dim, bias=False)

        # Lower-triangular matrix of ones: entry (i, j) is 1 exactly when j <= i
        lower_tri = torch.ones([config.ctx_len, config.ctx_len]).tril()
        self.register_buffer("cmask", lower_tri)

    def forward(self, x):
        """Attend over x of shape (B, T, C) and return a (B, T, head_dim) tensor."""
        _, T, _ = x.shape

        Q = self.query(x)   # (B, T, H)
        K = self.key(x)     # (B, T, H)
        V = self.values(x)  # (B, T, H)

        # Scaled dot-product scores: (B, T, H) @ (B, H, T) -> (B, T, T)
        scores = torch.matmul(Q, K.transpose(-2, -1)) * self.head_dim**-0.5
        # Positions above the diagonal are in the future; -inf makes their
        # softmax weight exactly zero
        future = self.cmask[:T, :T] == 0
        scores = scores.masked_fill(future, float('-inf'))
        weights = F.softmax(scores, dim=-1)
        return torch.matmul(weights, V)

# Multi-head self attention

## Exercise: implementing multi-head attention

The task is to write the multi-headed self attention module.  You should not need to write more than a few lines of code here.

1. Define `self.heads` as the list of attention heads that will act in parallel on the input.  You may use `nn.ModuleList(...)` to do this.
2. Define `self.linear`, a linear projection.
3. Define the forward function, which will take in the input `x` (which is (B, T, C)-dimensional), pass it through each head, and concatenate the outputs.  To perform concatenation you can use `torch.cat(...)`.
4. After going through the attention heads, the input should then go through the linear projection and then returned at the end.

In [27]:
class MultiHeadCausalAttention(nn.Module):
    """Exercise scaffold for multi-head causal self-attention.

    Both attributes and the body of ``forward`` are placeholders (``...``)
    to be filled in.
    """

    def __init__(self, config):
        super().__init__()
        # TODO: the list of attention heads that act in parallel on the input
        # Hint: nn.ModuleList(...)
        self.heads = ... # your code here
        # TODO: the linear projection applied after the heads
        self.linear = ... # your code here
        

    def forward(self, x):
        # TODO: pass x, which is (B, T, C), through each head, concatenate the
        # outputs with torch.cat(...), apply self.linear, and return the result
        ... # your code here

## Testing your implementation

In [31]:
# Smoke test: multi-head attention must hand back a tensor shaped like its input
config = Config(d_model=256, n_heads=8, ctx_len=16)
mha = MultiHeadCausalAttention(config)

x = torch.randn(2, 10, 256)  # (batch_size=2, seq_len=10, d_model=256)
out = mha(x)
assert out.shape == x.shape

## Solution

In [30]:
class MultiHeadCausalAttention(nn.Module):
    """Several causal attention heads run side by side, then mixed by a linear layer."""

    def __init__(self, config):
        super().__init__()
        heads = []
        for _ in range(config.n_heads):
            heads.append(SingleHeadCausalAttention(config))
        self.heads = nn.ModuleList(heads)
        # Output projection applied to the concatenated head outputs
        self.linear = nn.Linear(config.d_model, config.d_model)

    def forward(self, x):
        """Run every head on x, concatenate along the last dim, and project."""
        head_outputs = [head(x) for head in self.heads]
        return self.linear(torch.cat(head_outputs, dim=-1))

# Define the feed-forward network (FFN) decoder block

## Exercise: FFN

The Feed-Forward Network (FFN) is a simple yet powerful component that applies two linear transformations with a ReLU activation in between. The first transformation expands the input dimension by a factor of 4, and the second transformation projects it back to the original dimension.  In this exercise, you will implement this module.

In [None]:
class FFN(nn.Module):
    """Exercise scaffold for the feed-forward network (Linear -> ReLU -> Linear).

    The two linear layers and the forward pass are placeholders (``...``)
    to be filled in; the ReLU activation is provided.
    """
    def __init__(self, config):
        super().__init__()
        # TODO: Initialize two linear layers
        # First layer should expand from d_model to 4*d_model
        # Second layer should project back to d_model
        # Hint: use nn.Linear(in_features, out_features)
        # (`...` keeps this cell valid Python until the exercise is filled in)
        self.l1 = ... # Your code here
        self.relu = nn.ReLU()
        self.l2 = ... # Your code here

    def forward(self, x):
        # TODO: Implement the forward pass
        # 1. Apply first linear layer
        # 2. Apply ReLU activation
        # 3. Apply second linear layer
        x = ... # Your code here
        x = ... # Your code here
        x = ... # Your code here
        return x

## Exercise: Decoder Block

The Decoder Block is a core component that combines self-attention with a feed-forward network. It uses residual connections and layer normalization to help with training stability.

In [None]:
class DecoderBlock(nn.Module):
    """Exercise scaffold for a decoder block (attention + FFN with residuals).

    The layer norms and the forward pass are placeholders (``...``) to be
    filled in; the attention and feed-forward sublayers are provided.
    """
    def __init__(self, config):
        super().__init__()
        self.mha = MultiHeadCausalAttention(config)
        # TODO: Initialize layer normalization layers
        # Hint: use nn.LayerNorm(config.d_model)
        # (`...` keeps this cell valid Python until the exercise is filled in)
        self.ln1 = ... # Your code here
        self.ffn = FFN(config)
        self.ln2 = ... # Your code here

    def forward(self, x):
        # TODO: Implement the forward pass with residual connections
        # Remember the pattern: x = x + sublayer(layer_norm(x))
        x = ... # Your code here  # First attention block with residual
        x = ... # Your code here  # Second FFN block with residual
        return x

## Testing your implementation

In [33]:
# Smoke test: a decoder block must return a tensor shaped like its input
config = Config(d_model=256)
ffn = FFN(config)
decoder = DecoderBlock(config)

x = torch.randn(2, 10, 256)  # (batch_size, sequence_length, d_model)
output = decoder(x)
assert x.shape == output.shape

## Solution

In [32]:
class FFN(nn.Module):
    """Feed-forward network: Linear -> ReLU -> Linear."""

    def __init__(self, config):
        super().__init__()
        hidden_dim = 4*config.d_model  # inner width is 4x the model width
        self.l1 = nn.Linear(config.d_model, hidden_dim)
        self.relu = nn.ReLU()
        self.l2 = nn.Linear(hidden_dim, config.d_model)

    def forward(self, x):
        """Expand to the hidden width, apply ReLU, and project back to d_model."""
        return self.l2(self.relu(self.l1(x)))
    
class DecoderBlock(nn.Module):
    """Pre-norm decoder block: causal self-attention, then a feed-forward network.

    Each sublayer is applied to a layer-normalized copy of its input and the
    result is added back to that input (residual connection).
    """

    def __init__(self, config):
        super().__init__()
        self.mha = MultiHeadCausalAttention(config)
        self.ln1 = nn.LayerNorm(config.d_model)
        self.ffn = FFN(config)
        self.ln2 = nn.LayerNorm(config.d_model)

    def forward(self, x):
        """Apply both residual sublayers; the output has the same shape as x."""
        # Pattern for both sublayers: x = x + sublayer(layer_norm(x))
        x = x + self.mha(self.ln1(x))
        # The FFN reads the ln2-normalized activations. The original fed the
        # FFN its own output (ffn(ffn(x))) and never used ln2.
        x = x + self.ffn(self.ln2(x))
        return x

# Define the transformer!

We're now ready to put the components together into our final decoder module that can actually generate text! Your task is to implement the missing pieces of the Decoder class. This is the top-level module that:

* Embeds input tokens and adds positional information
* Processes them through multiple transformer layers
* Outputs predictions for the next token through the `forward(...)` function
* Can generate new sequences autoregressively through the `generate(...)` function

We have given extra hints for this module since it is a challenging exercise.

In [None]:
class Decoder(nn.Module):
    """Exercise scaffold for the top-level decoder-only transformer.

    Lines marked ``... # Your code here`` are placeholders to be filled in;
    the remaining lines are provided.
    """
    def __init__(self, config):
        super().__init__()
        
        # Stack of decoder blocks
        self.blocks = nn.Sequential(*[DecoderBlock(config) for _ in range(config.n_layers)])

        # TODO: Initialize components
        # Final layer norm and projection to vocabulary
        self.ln = ... # normalize to d_model dimension
        self.lin = ... # project from d_model to vocab_size
        
        # Embeddings
        self.wte = ... # Your code here, token embedding: vocab_size → d_model
        self.wpe = ... # Your code here, position embedding: ctx_len → d_model
        
        # Loss function for training
        self.L = nn.CrossEntropyLoss()
        self.ctx_len = config.ctx_len
    
    def forward(self, x, targets=None):
        """
        Args:
            x: Input tokens (B, T)
            targets: Optional target tokens (B, T)
        Returns:
            logits: Predictions (B, T, vocab_size); flattened to
                (B*T, vocab_size) when targets are provided
            loss: Optional cross-entropy loss (None when targets is None)
        """
        B, T = x.shape
        
        # TODO Step 1: Get embeddings
        # Convert tokens to embeddings and add positional information
        x_tok = self.wte(x)         # (B, T, d_model)
        # Build the position indices on the same device as the input so the
        # lookup also works when the model lives on a GPU
        x_pos = self.wpe(torch.arange(T, device=x.device))        # (T, d_model), broadcasts over the batch
        x = ... # Your code here        # Add the embeddings together
        
        # TODO Step 2: Process through transformer
        x = self.blocks(x)          # Apply transformer blocks
        x = ... # Your code here        # Apply final layer norm
        logits = ... # Your code here   # Project to vocabulary size
        
        # TODO Step 3: Compute loss if targets are provided
        if targets is None:
            loss = None
        else:
            # Reshape logits and targets for loss computation
            B, T, V = logits.shape
            logits = logits.view(B*T, V)    # Combine batch and time dimensions
            targets = targets.view(B*T)      # Flatten targets
            loss = ... # Your code here          # Compute cross entropy loss
        
        return logits, loss
    
    def generate(self, idx, max_len=256):
        """
        Generate new tokens given initial sequence idx.

        Runs max_len sampling steps and returns idx as it stands after the
        last step.
        """
        # TODO: Implement generation loop
        for _ in range(max_len):
            # Step 1: Take the last ctx_len tokens
            idx_window = ... # Your code here
            
            # Step 2: Get model predictions
            logits, _ = self(idx_window)     # (B, T, V)
            logits = logits[:, -1, :]        # Only take the last token's predictions
            
            # Step 3: Sample next token
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            
            # Step 4: Append to sequence
            idx = ... # Your code here
        
        return idx

## Test your implementation

In [49]:
# Smoke test: one forward pass with targets, then a short generation
config = Config(d_model=256, ctx_len=64, n_layers=4, vocab_size=100)
decoder = Decoder(config)

x = torch.randint(0, 100, (1, 10))
logits, loss = decoder(x, x)

prompt = torch.tensor([[1, 2, 3]])
out = decoder.generate(prompt, max_len=5)
print(out.shape)  # Should be (1, 8) - original 3 tokens + 5 new ones

torch.Size([1, 8])


## Solution

In [48]:
class Decoder(nn.Module):
    """Decoder-only transformer language model.

    Token and position embeddings are summed, passed through a stack of
    decoder blocks and a final layer norm, and projected to vocabulary logits.
    """

    def __init__(self, config):
        super().__init__()
        self.blocks = nn.Sequential(*[DecoderBlock(config) for _ in range(config.n_layers)])
        self.ln = nn.LayerNorm(config.d_model)
        self.lin = nn.Linear(config.d_model, config.vocab_size)
        self.wte = nn.Embedding(config.vocab_size, config.d_model)  # token embedding
        self.wpe = nn.Embedding(config.ctx_len, config.d_model)     # position embedding
        self.L = nn.CrossEntropyLoss()
        self.ctx_len = config.ctx_len
    
    def forward(self, x, targets=None):
        """Compute next-token logits and, optionally, the training loss.

        Args:
            x: token IDs of shape (B, T); T may be at most ctx_len, because
                positions index the ctx_len-row position embedding.
            targets: optional token IDs of shape (B, T).

        Returns:
            (logits, loss). Without targets, logits is (B, T, vocab_size) and
            loss is None. With targets, logits is flattened to
            (B*T, vocab_size) and loss is the cross-entropy against targets.
        """
        B, T = x.shape
        x_tok = self.wte(x) # (B, T, C)
        # Build the position indices on the input's device; a bare
        # torch.arange(T) lives on the CPU and fails once the model is moved
        # to another device.
        positions = torch.arange(T, device=x.device)
        x_pos = self.wpe(positions) # (T, C), broadcasts over the batch
        x = x_tok + x_pos # (B, T, C)

        x = self.blocks(x)
        x = self.ln(x)
        logits = self.lin(x) # (B, T, vocab_size)

        if targets is None:
            loss = None
        else:
            # compute xentropy loss, targets are (B, T)
            B, T, V = logits.shape
            targets = targets.view(B*T)
            logits = logits.view(B*T, V)
            loss = self.L(logits, targets)
        
        return logits, loss
    
    def generate(self, idx, max_len=256):
        """Autoregressively extend idx by max_len sampled tokens.

        Args:
            idx: token IDs of shape (B, T0) used as the prompt.
            max_len: number of new tokens to append.

        Returns:
            Token IDs of shape (B, T0 + max_len): the prompt followed by the
            sampled tokens.
        """
        for _ in range(max_len):
            # The model only sees the most recent ctx_len tokens
            idx_window = idx[:, -self.ctx_len:]
            logits, _ = self(idx_window) #(B, T, V)
            logits = logits[:,-1,:] # keep only the prediction for the last position
            prob = F.softmax(logits, dim=-1)
            # Draw from the predicted distribution (stochastic sampling, not greedy argmax)
            next_token = torch.multinomial(prob, num_samples=1)
            idx = torch.cat((idx, next_token), dim=1)
        
        return idx