# Assignment 2: Bigram Language Model and Generative Pretrained Transformer (GPT)


The objective of this assignment is to train a simplified transformer model. The primary differences between this implementation and a production LLM are:
* tokenizer (we use a character-level encoder for simplicity and because of compute constraints)
* size (we use a single consumer-grade GPU hosted on Colab and a small dataset; in practice, models are much larger and are trained on much more data)
* efficiency


Most modern LLMs have multiple training stages, so we won't get a model that is capable of replying to you yet. However, this is the first step towards a model like ChatGPT or Llama.




In [None]:
%matplotlib inline
import torch
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass
from torch import nn

## Part 1: Bigram MLP for TinyShakespeare (35 points)

1a) (1 point). Create a list `chars` that contains all unique characters in `text`

1b) (2 points). Implement `encode(s: str) -> list[int]`

1c) (2 points). Implement `decode(ids: list[int]) -> str`

1d) (5 points). Create two tensors, `inputs_one_hot` and `outputs_one_hot`. Use one hot encoding. Make sure to get every consecutive pair of characters. For example, for the word 'hello', we should create the following input-output pairs
```
he
el
ll
lo
```
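
As a quick check of the pairing rule, here is a minimal sketch in plain Python. The helper name `bigram_pairs` is hypothetical and not part of the assignment; it lists every consecutive pair by zipping the string with a copy of itself shifted by one character.

```python
def bigram_pairs(s: str) -> list[tuple[str, str]]:
    """Return every (current, next) character pair in s."""
    # zip stops at the shorter sequence, so the last character is never an input
    return list(zip(s, s[1:]))

pairs = bigram_pairs("hello")
print(pairs)  # [('h', 'e'), ('e', 'l'), ('l', 'l'), ('l', 'o')]
```

A string of length n yields n - 1 pairs. One-hot encoding the first element of each pair gives the inputs, and one-hot encoding the second gives the outputs.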

1e) (10 points). Implement BigramOneHotMLP, a 2 layer MLP that predicts the next token. Specifically, implement the constructor, forward, and generate. The output dimension of the first layer should be 8. Use `torch.optim`. The activation function for the first layer should be `nn.LeakyReLU()`

Note: Use the `torch.nn.functional.cross_entropy` loss. Read the [docs](https://pytorch.org/docs/stable/generated/torch.nn.functional.cross_entropy.html) about how this loss function works. The logits are the output of a network WITHOUT an activation function applied to the last layer. Activation functions are applied to every layer except the last.
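
To make the role of the logits concrete, the following pure-Python sketch computes the cross-entropy of a single example directly from raw logits and a class index. It is an illustration of the arithmetic, not the PyTorch implementation; the function name `cross_entropy` here is a local helper, and the PyTorch function additionally averages over the batch by default.

```python
import math

def cross_entropy(logits: list[float], target: int) -> float:
    """Cross-entropy of one example from raw logits and a class index."""
    # subtract the max logit for numerical stability (does not change the result)
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    # -log softmax(logits)[target] = logsumexp(logits) - logits[target]
    return log_sum_exp - logits[target]

# Equal logits mean a uniform prediction, so the loss is ln(number of classes).
loss = cross_entropy([0.0, 0.0, 0.0, 0.0], target=2)
print(loss)  # about 1.3863, i.e. ln(4)
```

Because the softmax is folded into the loss, applying a softmax or any other activation to the last layer before computing the loss would apply the normalization twice.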

1f) (5 points). Train the BigramOneHotMLP for 1000 steps.

1g) (5 points). Create two tensors, `input_ids` and `outputs_one_hot`. These `input_ids` will be used for the embedding layer.

1h) (5 points). Implement and train `BigramEmbeddingMLP`, a 2-layer MLP that predicts the next token. Specifically, implement the constructor, forward, and generate functions. The output dimension of the first layer should be 8. Use `torch.optim`.
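
The one-hot and embedding variants are closely related: multiplying a one-hot row vector by a weight matrix selects one row of that matrix, which is what an embedding lookup does directly (ignoring the bias term that `nn.Linear` adds). A small sketch with a hypothetical 3-token vocabulary and made-up weights:

```python
# Hypothetical 3-token vocabulary with a 2-dimensional vector per token
W = [[0.1, 0.2],
     [0.3, 0.4],
     [0.5, 0.6]]

def one_hot(i: int, n: int) -> list[float]:
    return [1.0 if j == i else 0.0 for j in range(n)]

def vec_times_matrix(v: list[float], M: list[list[float]]) -> list[float]:
    """Row vector v times matrix M."""
    return [sum(v[r] * M[r][c] for r in range(len(M))) for c in range(len(M[0]))]

token_id = 2
via_one_hot = vec_times_matrix(one_hot(token_id, len(W)), W)
via_lookup = W[token_id]
print(via_one_hot, via_lookup)  # [0.5, 0.6] [0.5, 0.6]
```

The lookup avoids building and multiplying a mostly-zero vector, which matters more as the vocabulary grows.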



Note: the output will look like gibberish


In [None]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2025-02-27 22:36:48--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt.1’


2025-02-27 22:36:48 (48.5 MB/s) - ‘input.txt.1’ saved [1115394/1115394]



In [None]:
# For the bigram model, let's use the first 1000 characters for the data

with open('input.txt', 'r') as f:
    text = f.read()
text = text[:1000]

In [None]:
chars = sorted(list(set(text)))
vocab_size = len(chars)
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}

def encode(s: str) -> list[int]:
    return [stoi[c] for c in s]

def decode(ids: list[int]) -> str:
    return ''.join([itos[i] for i in ids])

def create_one_hot_inputs_and_outputs():
    x_indices = encode(text[:-1])
    y_indices = encode(text[1:])
    x = torch.tensor(x_indices, dtype=torch.long)
    y = torch.tensor(y_indices, dtype=torch.long)
    x_onehot = torch.nn.functional.one_hot(x, num_classes=vocab_size).float()
    y_onehot = torch.nn.functional.one_hot(y, num_classes=vocab_size).float()
    return x_onehot, y_onehot

inputs_one_hot, outputs_one_hot = create_one_hot_inputs_and_outputs()

class BigramOneHotMLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(vocab_size, 8)
        self.act = nn.LeakyReLU()
        self.fc2 = nn.Linear(8, vocab_size)
    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.fc2(x)
        return x
    def generate(self, start='a', max_new_tokens=100):
        self.eval()
        x = torch.tensor([stoi[start]], dtype=torch.long)
        x = torch.nn.functional.one_hot(x, num_classes=vocab_size).float().unsqueeze(0)
        generated = [stoi[start]]
        for _ in range(max_new_tokens):
            logits = self.forward(x[:, -1, :])
            probs = torch.softmax(logits, dim=-1)
            next_idx = torch.multinomial(probs, num_samples=1).item()
            generated.append(next_idx)
            x_new = torch.nn.functional.one_hot(torch.tensor([next_idx]), num_classes=vocab_size).float().unsqueeze(0)
            x = torch.cat([x, x_new], dim=1)
        return decode(generated)

bigram_one_hot_mlp = BigramOneHotMLP()
optimizer = torch.optim.Adam(bigram_one_hot_mlp.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

for _ in range(1000):
    optimizer.zero_grad()
    logits = bigram_one_hot_mlp(inputs_one_hot)
    loss = loss_fn(logits, torch.argmax(outputs_one_hot, dim=-1))
    loss.backward()
    optimizer.step()

print(bigram_one_hot_mlp.generate())

at tarit bulelkouce arit thy hesut an:

SpMar'woonth dhuibe iditon:
Weirssound anernwo cipen:
Le fowe


In [None]:
def create_embedding_inputs_and_outputs():
    x_indices = encode(text[:-1])
    y_indices = encode(text[1:])
    x = torch.tensor(x_indices, dtype=torch.long)
    y = torch.tensor(y_indices, dtype=torch.long)
    return x, y

input_ids, targets = create_embedding_inputs_and_outputs()

class BigramEmbeddingMLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, 8)
        self.fc = nn.Linear(8, vocab_size)
    def forward(self, x):
        emb = self.embed(x)
        logits = self.fc(emb)
        return logits
    def generate(self, start='a', max_new_tokens=100):
        self.eval()
        x = torch.tensor([stoi[start]], dtype=torch.long)
        generated = [stoi[start]]
        for _ in range(max_new_tokens):
            logits = self.forward(x[-1].unsqueeze(0))
            probs = torch.softmax(logits, dim=-1)
            next_idx = torch.multinomial(probs, num_samples=1).item()
            generated.append(next_idx)
            x = torch.cat([x, torch.tensor([next_idx], dtype=torch.long)], dim=0)
        return decode(generated)

bigram_embedding_mlp = BigramEmbeddingMLP()
optimizer2 = torch.optim.Adam(bigram_embedding_mlp.parameters(), lr=1e-3)
loss_fn2 = nn.CrossEntropyLoss()

for _ in range(1000):
    optimizer2.zero_grad()
    logits = bigram_embedding_mlp(input_ids)
    loss = loss_fn2(logits, targets)
    loss.backward()
    optimizer2.step()

print(bigram_embedding_mlp.generate())

at s her purecnr wSile an.O; My

Ale ictoneen anpworanFirnganef s, mset, wat me aitin:
Sar Fis orad :


## Part 2: Generative Pretrained Transformer (65 points)

For this part, it is best to use a GPU. In the menu at the top, go to Runtime -> Change Runtime Type and select T4 GPU.

In [None]:
# run nvidia-smi to check gpu usage
!nvidia-smi

Thu Feb 27 22:37:14 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   46C    P8             11W /   70W |       2MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [None]:
# For the gpt model, let's use the full text

with open('input.txt', 'r') as f:
    text = f.read()

Implement a character level tokenization function.

1. Create a list of unique characters in the string. (1 point)
2. Implement a function `encode(s: str) -> list[int]` that takes a string and returns a list of ids (1 point)
3. Implement a function `decode(ids: list[int]) -> str` that takes a list of ids (ints) and returns a string (1 point)


In [None]:
chars = sorted(list(set(text)))
stoi = { ch: i for i, ch in enumerate(chars) }
itos = { i: ch for i, ch in enumerate(chars) }

def encode(s: str) -> list[int]:
    return [stoi[ch] for ch in s]

def decode(ids: list[int]) -> str:
    return ''.join(itos[i] for i in ids)

In [None]:
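# place the data on the GPU when one is available, otherwise fall back to the CPU
data = torch.tensor(encode(text), dtype=torch.long).to('cuda' if torch.cuda.is_available() else 'cpu')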

In [None]:
block_size = 16
data[:block_size+1]

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43],
       device='cuda:0')

To train a transformer, we feed the model `n` tokens (context) and try to predict the `n+1`th token (target) in the sequence.



In [None]:
x = data[:block_size]
y = data[1:block_size+1]
for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    print(f"when input is {context} the target: {target}")

when input is tensor([18], device='cuda:0') the target: 47
when input is tensor([18, 47], device='cuda:0') the target: 56
when input is tensor([18, 47, 56], device='cuda:0') the target: 57
when input is tensor([18, 47, 56, 57], device='cuda:0') the target: 58
when input is tensor([18, 47, 56, 57, 58], device='cuda:0') the target: 1
when input is tensor([18, 47, 56, 57, 58,  1], device='cuda:0') the target: 15
when input is tensor([18, 47, 56, 57, 58,  1, 15], device='cuda:0') the target: 47
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47], device='cuda:0') the target: 58
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47, 58], device='cuda:0') the target: 47
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47], device='cuda:0') the target: 64
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64], device='cuda:0') the target: 43
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43], device='cuda:0') the target: 52
when input is tensor([18, 47,

In [None]:
batch_size = 64
device = 'cuda' if torch.cuda.is_available() else 'cpu'
def get_batch():
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y


### Single Self Attention Head (5 points)
![](https://i.ibb.co/GWR1XG0/head.png)

In [None]:
class SelfAttentionHead(nn.Module):
    def __init__(self, n_embd, head_size, block_size, dropout=0.1):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer("tril", torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)
        self.head_size = head_size

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)
        wei = q @ k.transpose(-2, -1) * (1.0 / (self.head_size ** 0.5))
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = torch.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        out = wei @ v
        return out
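
The scaling, causal masking, and softmax in the head above can be traced by hand on a tiny example. This is a pure-Python sketch of the attention weights only (no learned projections, no dropout); the function name `causal_attention_weights` is hypothetical.

```python
import math

def causal_attention_weights(scores: list[list[float]], head_size: int) -> list[list[float]]:
    """Scale scores, mask future positions, and softmax each row."""
    T = len(scores)
    weights = []
    for t in range(T):
        # position t may only attend to positions 0..t (the lower triangle)
        visible = [scores[t][s] / math.sqrt(head_size) for s in range(t + 1)]
        m = max(visible)
        exps = [math.exp(v - m) for v in visible]
        total = sum(exps)
        # masked (future) positions get weight 0, as softmax of -inf would
        weights.append([e / total for e in exps] + [0.0] * (T - t - 1))
    return weights

# All-zero scores: each row attends uniformly over the positions it can see,
# giving rows [1, 0, 0], [0.5, 0.5, 0], and [1/3, 1/3, 1/3].
w = causal_attention_weights([[0.0] * 3 for _ in range(3)], head_size=4)
for row in w:
    print(row)
```

Each row sums to 1, and no position places weight on a later position, which is what prevents the model from seeing the token it is asked to predict.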

### Multihead Self Attention (5 points)

`constructor`

- Create 4 `SelfAttentionHead` instances. Consider using `nn.ModuleList`
- Create a linear layer with n_embd input dim and n_embd output dim

`forward`

In the forward implementation, pass `x` through each head, then concatenate all the outputs along the feature dimension, then pass the concatenated output through the linear layer

![](https://i.ibb.co/y5SwyZZ/multihead.png)
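
A short sketch of the shape bookkeeping, assuming each head gets `n_embd // num_heads` features so that concatenation restores the embedding width. The head outputs below are stand-in values for a single token, not real attention outputs.

```python
n_embd, num_heads = 64, 4          # values used later in this notebook
head_size = n_embd // num_heads    # each head outputs 16 features

# Stand-in head outputs for one token: head h produces head_size copies of h
head_outputs = [[float(h)] * head_size for h in range(num_heads)]

# Concatenate along the feature dimension (per token, what torch.cat(..., dim=-1) does)
concatenated = [value for out in head_outputs for value in out]
print(head_size, len(concatenated))  # 16 64
```

Because the concatenated width equals `n_embd`, the final linear layer can map `n_embd` inputs to `n_embd` outputs as the constructor description requires.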

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, n_embd, num_heads, block_size, dropout=0.1):
        super().__init__()
        head_size = n_embd // num_heads
        self.heads = nn.ModuleList([
            SelfAttentionHead(n_embd, head_size, block_size, dropout)
            for _ in range(num_heads)
        ])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.proj(out)
        out = self.dropout(out)
        return out

### MLP (2 points)
Implement a 2 layer MLP


![](https://i.ibb.co/C0DtrF5/ff.png)

In [None]:
class MLP(nn.Module):
    def __init__(self, n_embd, dropout=0.1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)

### Transformer block (20 points)

Layer normalization helps training stability by normalizing the outputs of the neurons within a single layer across all features for each individual data point, rather than across a full batch or for a single feature.
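
A minimal pure-Python sketch of the normalization itself, assuming the biased variance and a small `eps` for numerical stability. It omits the learnable scale and shift that `nn.LayerNorm` applies afterwards.

```python
import math

def layer_norm(x: list[float], eps: float = 1e-5) -> list[float]:
    """Normalize one feature vector to zero mean and (roughly) unit variance."""
    mean = sum(x) / len(x)
    var = sum((v - mean) ** 2 for v in x) / len(x)  # biased variance
    return [(v - mean) / math.sqrt(var + eps) for v in x]

# Two data points with very different scales end up on the same scale,
# because each row is normalized using only its own statistics.
batch = [[1.0, 2.0, 3.0, 4.0], [100.0, 200.0, 300.0, 400.0]]
normalized = [layer_norm(row) for row in batch]
for row in normalized:
    print([round(v, 3) for v in row])
```

Since the statistics come from a single row, the result for one data point does not depend on what else is in the batch.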

Dropout is a form of regularization to prevent overfitting.
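
As an illustration, here is a sketch of inverted dropout in plain Python: during training each value is zeroed with probability `p` and the survivors are rescaled by `1 / (1 - p)`; at evaluation time the input passes through unchanged. The function signature is hypothetical and is not the `nn.Dropout` API.

```python
import random

def dropout(x: list[float], p: float, training: bool, rng: random.Random) -> list[float]:
    """Inverted dropout: zero each value with probability p, rescale the rest."""
    if not training or p == 0.0:
        return list(x)  # dropout is the identity at evaluation time
    keep = 1.0 - p
    # dividing by keep leaves the expected value of each element unchanged
    return [v / keep if rng.random() < keep else 0.0 for v in x]

rng = random.Random(0)
x = [1.0] * 10
train_out = dropout(x, p=0.5, training=True, rng=rng)
eval_out = dropout(x, p=0.5, training=False, rng=rng)
print(train_out)  # a mix of 0.0 and 2.0
print(eval_out)   # unchanged
```

This is why a model should be in evaluation mode when generating text: otherwise random units are still being zeroed during sampling.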

This is the diagram of a transformer block:

![](https://i.ibb.co/X85C473/block.png)

In [None]:
class Block(nn.Module):
    def __init__(self, n_embd: int, n_head: int, block_size: int, dropout=0.1):
        super().__init__()
        self.sa = MultiHeadAttention(n_embd, n_head, block_size, dropout)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ffwd = MLP(n_embd, dropout)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

### GPT

`constructor` (5 points)

1. create the token embedding table and the position embedding table
2. create variable `self.blocks` that is a series of 4 `Block`s. The data will pass through each block sequentially. Consider using `nn.Sequential`
3. create a layer norm layer
4. create a linear layer for predicting the next token

`forward(self, idx, targets=None)`. (5 points)

`forward` takes a batch of context ids as input of size (B, T) and returns the logits and the loss, if targets is not None. If targets is None, return the logits and None.
1. get the token embeddings using the token embedding table created in the constructor
2. create the position embeddings
3. sum the token and position embeddings to get the model input
4. pass the model input through the blocks, the layer norm layer, and the final linear layer
5. compute the loss
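
Steps 1 to 3 can be sketched on a toy example. The tables below are hypothetical hand-written values (vocabulary of 3 tokens, block size 4, embedding width 2), standing in for the learned embedding tables.

```python
# Hypothetical tiny tables: vocab_size=3, block_size=4, n_embd=2
token_table = [[0.0, 1.0], [2.0, 3.0], [4.0, 5.0]]
position_table = [[0.0, 0.0], [10.0, 10.0], [20.0, 20.0], [30.0, 30.0]]

def embed(idx: list[int]) -> list[list[float]]:
    """Sum token and position embeddings for one sequence of token ids."""
    assert len(idx) <= len(position_table), "sequence longer than block_size"
    return [
        [tok + pos for tok, pos in zip(token_table[token_id], position_table[t])]
        for t, token_id in enumerate(idx)
    ]

# The same token id (2) at positions 0 and 2 gets different input vectors.
x = embed([2, 0, 2])
print(x)  # [[4.0, 5.0], [10.0, 11.0], [24.0, 25.0]]
```

Without the position term, the two occurrences of token 2 would be indistinguishable to the attention layers, which have no built-in notion of order.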

`generate(start_char, max_new_tokens, top_p, top_k, temperature) -> str` (5 points)
1. implement `top_p`, `top_k`, and `temperature` for sampling
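
The three sampling controls can be illustrated on a plain list of logits. This is a sketch under one common convention (temperature first, then top-k, then top-p, keeping the smallest set of tokens whose cumulative probability reaches `top_p`); the helper names are hypothetical and the exact tie-breaking and threshold rules vary between implementations.

```python
import math

def softmax(zs: list[float]) -> list[float]:
    m = max(zs)
    exps = [math.exp(z - m) for z in zs]
    total = sum(exps)
    return [e / total for e in exps]

def filtered_probs(logits: list[float], temperature: float = 1.0,
                   top_k: int = 0, top_p: float = 1.0) -> list[float]:
    """Temperature, then top-k, then top-p; returns renormalized probabilities."""
    probs = softmax([z / temperature for z in logits])
    # token ids from most to least likely
    order = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    if top_k > 0:
        order = order[:top_k]           # top-k: keep only the k most likely tokens
    mass = sum(probs[i] for i in order)
    kept, cumulative = [], 0.0
    for i in order:
        kept.append(i)
        cumulative += probs[i] / mass
        if top_p < 1.0 and cumulative >= top_p:
            break                       # top-p: stop once the nucleus covers top_p
    total = sum(probs[i] for i in kept)
    return [probs[i] / total if i in kept else 0.0 for i in range(len(probs))]

logits = [math.log(p) for p in (0.5, 0.3, 0.15, 0.05)]
print([round(p, 3) for p in filtered_probs(logits, top_k=2)])    # [0.625, 0.375, 0.0, 0.0]
print([round(p, 3) for p in filtered_probs(logits, top_p=0.7)])  # [0.625, 0.375, 0.0, 0.0]
print([round(p, 3) for p in filtered_probs(logits, temperature=0.5)])
```

A temperature below 1 sharpens the distribution toward the most likely token, while a temperature above 1 flattens it; top-k and top-p both remove the unlikely tail before sampling.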



![](https://i.ibb.co/n8sbQ0V/Screenshot-2024-01-23-at-8-59-08-PM.png)

In [None]:
class GPT(nn.Module):
    def __init__(self, vocab_size, block_size, n_embd=64, n_head=4, n_layer=4, dropout=0.1):
        super().__init__()
        self.block_size = block_size
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[
            Block(n_embd, n_head, block_size, dropout) for _ in range(n_layer)
        ])
        self.ln_f = nn.LayerNorm(n_embd)
        self.head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device))
        x = tok_emb + pos_emb.unsqueeze(0)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.head(x)
        loss = None
        if targets is not None:
            logits_flat = logits.view(B*T, -1)
            targets_flat = targets.view(B*T)
            loss = nn.CrossEntropyLoss()(logits_flat, targets_flat)
        return logits, loss

    @torch.no_grad()  # sampling does not need gradients
    def generate(self, start_char, max_new_tokens=100, top_p=1.0, top_k=0, temperature=1.0):
        was_training = self.training
        self.eval()  # disable dropout while sampling
        idx = torch.tensor([[stoi[start_char]]], dtype=torch.long, device=next(self.parameters()).device)
        for _ in range(max_new_tokens):
            # crop the context to the last block_size tokens
            idx_cond = idx[:, -self.block_size:]
            logits, _ = self(idx_cond)
            # keep only the last time step and apply temperature
            logits = logits[:, -1, :] / temperature
            if top_k > 0:
                # keep the top_k largest logits and mask the rest to -inf
                v, ix = torch.topk(logits, min(top_k, logits.size(-1)), dim=-1)
                mask = torch.full_like(logits, float('-inf'))
                mask.scatter_(1, ix, v)
                logits = mask
            probs = torch.softmax(logits, dim=-1)
            if top_p < 1.0:
                # keep the smallest set of tokens whose cumulative probability exceeds top_p
                sorted_probs, sorted_indices = torch.sort(probs, descending=True)
                cum_probs = torch.cumsum(sorted_probs, dim=-1)
                cutoff = (cum_probs > top_p).float().argmax(dim=-1)
                for b in range(probs.size(0)):
                    c = cutoff[b].item()
                    probs[b, sorted_indices[b, c+1:]] = 0
                probs /= probs.sum(dim=-1, keepdim=True)
            next_id = torch.multinomial(probs, num_samples=1)
            idx = torch.cat([idx, next_id], dim=1)
        self.train(was_training)  # restore the previous mode
        return decode(idx[0].tolist())

### Training loop (15 points)

Implement the training loop.

In [None]:
vocab_size = len(chars)
model = GPT(vocab_size, block_size=block_size, n_embd=64, n_head=4, n_layer=4, dropout=0.1).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
max_iters = 5000

for step in range(max_iters):  # `step` avoids shadowing the built-in `iter`
    x_batch, y_batch = get_batch()
    logits, loss = model(x_batch, y_batch)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    if step % 500 == 0:
        # loss on the current training batch
        print(step, loss.item())

0 4.410054683685303
500 2.174970865249634
1000 1.988379716873169
1500 1.8448914289474487
2000 1.9384196996688843
2500 1.839585781097412
3000 1.7659966945648193
3500 1.8141076564788818
4000 1.7806777954101562
4500 1.7493901252746582
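
One way to sanity-check these numbers: a model that predicts every character with equal probability has a cross-entropy of ln(vocab_size). The sketch below assumes a vocabulary of 65 characters, which is not stated above and should be replaced with `len(chars)` from the notebook.

```python
import math

vocab_size = 65  # assumption: number of unique characters in the full text
uniform_loss = math.log(vocab_size)  # cross-entropy of a uniform prediction
print(round(uniform_loss, 3))  # 4.174

# Perplexity is exp(loss): the effective number of equally likely choices.
final_loss = 1.7493901252746582  # last value printed above
print(round(math.exp(final_loss), 2))  # 5.75
```

Under that assumption, the loss at step 0 sits near the uniform baseline, as expected for a freshly initialized model, and the final loss corresponds to the model being about as uncertain as a choice among roughly six characters.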


### Generate text


Print some text that your model generates.

In [None]:
print(model.generate(start_char='T', max_new_tokens=200, top_p=0.9, top_k=0, temperature=1.0))

The days of this dam never me that and thousand,
There in out her the fairer can with that your hending.

MENES:
It am liek have now.

CAMILLO:
That let then or be stell, if are a death, my mine, he an
