<a href="https://colab.research.google.com/github/hellomomiji/info7374-llm/blob/main/Assignment_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 2: Bigram Language Model and Generative Pretrained Transformer (GPT)


The objective of this assignment is to train a simplified transformer model. The primary differences between this implementation and production LLMs are:
* tokenizer (we use a character-level encoder for simplicity and because of compute constraints)
* size (we are using one consumer-grade GPU hosted on Colab and a small dataset; in practice, the models are much larger and are trained on much more data)
* efficiency


Most modern LLMs have multiple training stages, so we won't get a model that is capable of replying to you yet. However, this is the first step towards a model like ChatGPT and Llama.




In [None]:
%matplotlib inline
import torch
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass
from torch import nn

## Part 1: Bigram MLP for TinyShakespeare (35 points)

1a) (1 point). Create a list `chars` that contains all unique characters in `text`

1b) (2 points). Implement `encode(s: str) -> list[int]`

1c) (2 points). Implement `decode(ids: list[int]) -> str`

1d) (5 points). Create two tensors, `inputs_one_hot` and `outputs_one_hot`. Use one hot encoding. Make sure to get every consecutive pair of characters. For example, for the word 'hello', we should create the following input-output pairs
```
he
el
ll
lo
```

1e) (10 points). Implement BigramOneHotMLP, a 2 layer MLP that predicts the next token. Specifically, implement the constructor, forward, and generate. The output dimension of the first layer should be 8. Use `torch.optim`. The activation function for the first layer should be `nn.LeakyReLU()`

Note: Use the `torch.nn.functional.cross_entropy` loss. Read the [docs](https://pytorch.org/docs/stable/generated/torch.nn.functional.cross_entropy.html) about how this loss function works. The logits are the output of a network WITHOUT an activation function applied to the last layer. Activation functions are applied to every layer except the last.

1f) (5 points). Train the BigramOneHotMLP for 1000 steps.

1g) (5 points). Create two tensors, `input_ids` and `outputs_one_hot`. These `input_ids` will be used for the embedding layer.

1h) (5 points). Implement and train BigramEmbeddingMLP, a 2-layer MLP that predicts the next token. Specifically, implement the constructor, forward, and generate functions. The output dimension of the first layer should be 8. Use `torch.optim`.
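
Moving from one-hot inputs (1d) to integer ids (1g) does not change what the first layer can represent: looking up row `i` of an embedding table gives the same vector as multiplying a one-hot vector by that table (a linear layer without a bias). A small NumPy sketch of that equivalence, where the table `W`, the sizes, and the ids are made-up values for illustration:

```python
import numpy as np

vocab_size, emb_dim = 5, 3                   # hypothetical sizes, for illustration only
rng = np.random.default_rng(0)
W = rng.normal(size=(vocab_size, emb_dim))   # stand-in for an embedding table

ids = np.array([2, 0, 4])                    # three token ids
one_hot = np.eye(vocab_size)[ids]            # shape (3, vocab_size)

via_matmul = one_hot @ W                     # one-hot input times weight matrix
via_lookup = W[ids]                          # plain row lookup, as an embedding layer does

print(np.allclose(via_matmul, via_lookup))
```

The lookup avoids building the mostly-zero one-hot matrix, which matters more as the vocabulary grows.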



Note: the output will look like gibberish
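
For 1d, the consecutive pairs can be produced by zipping a string with itself shifted by one character. The sketch below does this for the word 'hello' in plain Python; the helper `one_hot` is a hypothetical stand-in for a library one-hot function, and the vocabulary is built from the word itself rather than from `text`.

```python
word = "hello"

# Pair each character with the one that follows it.
pairs = list(zip(word, word[1:]))
print(pairs)

# Map characters to integer ids, then to one-hot rows (plain lists here).
chars = sorted(set(word))
ctoi = {ch: i for i, ch in enumerate(chars)}

def one_hot(index: int, size: int) -> list[int]:
    """Return a list of zeros with a single 1 at position `index`."""
    return [1 if j == index else 0 for j in range(size)]

inputs = [one_hot(ctoi[a], len(chars)) for a, _ in pairs]
targets = [one_hot(ctoi[b], len(chars)) for _, b in pairs]
print(inputs[0], targets[0])
```

A string of length `n` yields `n - 1` pairs, so the input and output tensors always have the same number of rows.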


In [None]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2024-10-13 21:31:50--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2024-10-13 21:31:50 (22.8 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [None]:
# For the bigram model, let's use the first 1000 characters for the data

with open('input.txt', 'r') as f:
    text = f.read()
text = text[:1000]

In [None]:
chars = sorted(list(set("".join(text)))) # implement 1a

ctoi = { ch:i for i,ch in enumerate(chars) }
itoc = { i:ch for ch,i in  ctoi.items() }

def encode(s: str) -> list[int]:
    # implement 1b
    return [ctoi[c] for c in s]

def decode(ids: list[int]) -> str:
    # implement 1c
    return "".join([itoc[i] for i in ids])

def create_one_hot_inputs_and_outputs() -> tuple[torch.Tensor, torch.Tensor]:
    # implement 1d
    inputs, outputs = [], []
    for c1, c2 in zip(text, text[1:]):
      inputs.append(nn.functional.one_hot(torch.tensor(encode(c1)), len(chars)))
      outputs.append(nn.functional.one_hot(torch.tensor(encode(c2)), len(chars)))
    return torch.stack(inputs), torch.stack(outputs)


inputs_one_hot, outputs_one_hot = create_one_hot_inputs_and_outputs()

class BigramOneHotMLP(nn.Module):
    def __init__(self):
        # implement 1e
        super().__init__()
        self.fc1 = nn.Linear(len(chars), 8)
        self.fc2 = nn.Linear(8, len(chars))
        self.activation = nn.LeakyReLU()

    def forward(self, x):
        # implement 1e
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x

    def generate(self, start='a', max_new_tokens=100) -> str:
        # implement 1e
        self.eval()
        with torch.no_grad():
          current_char = start
          word = current_char
          for _ in range(max_new_tokens):
            input_tensor = nn.functional.one_hot(torch.tensor(encode(current_char)), len(chars)).unsqueeze(0).float()
            output = self(input_tensor)
            next_char_id = torch.argmax(output).item()
            next_char = itoc[next_char_id]  # direct id -> character lookup
            current_char = next_char
            word += current_char
        return word


bigram_one_hot_mlp = BigramOneHotMLP()

optimizer = torch.optim.SGD(bigram_one_hot_mlp.parameters(), lr=0.01)

# training loop
for i in range(1000):
    # implement 1f
    optimizer.zero_grad()
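    # The stacked tensors have shape (N, 1, vocab); flatten to (N, vocab) so that
    # cross_entropy treats the vocabulary axis (dim 1) as the class dimension.
    prediction = bigram_one_hot_mlp(inputs_one_hot.float()).reshape(-1, len(chars))
    loss = nn.functional.cross_entropy(prediction, outputs_one_hot.float().reshape(-1, len(chars)))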
    loss.backward()
    optimizer.step()

print(bigram_one_hot_mlp.generate())

au!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu!Cu


In [None]:
def create_embedding_inputs_and_outputs() -> tuple[torch.Tensor, torch.Tensor]:
    # implement 1g
    input_ids = []
    for c1, c2 in zip(text, text[1:]):
      input_ids.append(torch.tensor(encode(c1)))
    return torch.stack(input_ids), outputs_one_hot

input_ids, outputs_one_hot = create_embedding_inputs_and_outputs()

class BigramEmbeddingMLP(nn.Module):
    def __init__(self):
        # implement 1h
        super().__init__()
        self.embedding = nn.Embedding(len(chars), 8)
        self.fc1 = nn.Linear(8, 8)
        self.fc2 = nn.Linear(8, len(chars))
        self.activation = nn.LeakyReLU()

    def forward(self, x):
        # implement 1h
        x = self.embedding(x)
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x

    def generate(self, start='a', max_new_tokens=100) -> str:
        # implement 1h
        self.eval()
        with torch.no_grad():
          current_char = start
          word = current_char
          for _ in range(max_new_tokens):
            input_tensor = torch.tensor(encode(current_char))
            output = self(input_tensor)
            next_char_id = torch.argmax(output).item()
            next_char = itoc[next_char_id]  # direct id -> character lookup
            current_char = next_char
            word += current_char
          return word

bigram_embedding_mlp = BigramEmbeddingMLP()

optimizer = torch.optim.SGD(bigram_embedding_mlp.parameters(), lr=0.01)
# training loop
for _ in range(1000):
    # implement 1h
    optimizer.zero_grad()
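    # input_ids has shape (N, 1), so the model output is (N, 1, vocab); flatten both
    # prediction and target to (N, vocab) so the vocabulary axis is the class dimension.
    prediction = bigram_embedding_mlp(input_ids).reshape(-1, len(chars))
    loss = nn.functional.cross_entropy(prediction, outputs_one_hot.float().reshape(-1, len(chars)))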
    loss.backward()
    optimizer.step()


print(bigram_embedding_mlp.generate())

affffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff


## Part 2: Generative Pretrained Transformer (65 points)

For this part, it is best to use a GPU. In the menu at the top, go to Runtime -> Change Runtime Type and select T4 GPU.

In [None]:
# run nvidia-smi to check gpu usage
!nvidia-smi

Sun Oct 13 21:31:55 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   38C    P8               9W /  70W |      3MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [None]:
# For the gpt model, let's use the full text

with open('input.txt', 'r') as f:
    text = f.read()

Implement a character level tokenization function.

1. Create a list of unique characters in the string. (1 point)
2. Implement a function `encode(s: str) -> list[int]` that takes a string and returns a list of ids (1 point)
3. Implement a function `decode(ids: list[int]) -> str` that takes a list of ids (ints) and returns a string (1 point)


In [None]:
chars = sorted(list(set("".join(text))))
ctoi = { ch:i for i,ch in enumerate(chars) }
itoc = { i:ch for ch,i in  ctoi.items() }

def encode(s: str) -> list[int]:
    return [ctoi[c] for c in s]

def decode(ids: list[int]) -> str:
    return "".join([itoc[i] for i in ids])

In [None]:
data = torch.tensor(encode(text), dtype=torch.long).cuda()

In [None]:
block_size = 16
data[:block_size+1]

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43],
       device='cuda:0')

To train a transformer, we feed the model `n` tokens (context) and try to predict the `n+1`th token (target) in the sequence.



In [None]:
x = data[:block_size]
y = data[1:block_size+1]
for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    print(f"when input is {context} the target: {target}")

when input is tensor([18], device='cuda:0') the target: 47
when input is tensor([18, 47], device='cuda:0') the target: 56
when input is tensor([18, 47, 56], device='cuda:0') the target: 57
when input is tensor([18, 47, 56, 57], device='cuda:0') the target: 58
when input is tensor([18, 47, 56, 57, 58], device='cuda:0') the target: 1
when input is tensor([18, 47, 56, 57, 58,  1], device='cuda:0') the target: 15
when input is tensor([18, 47, 56, 57, 58,  1, 15], device='cuda:0') the target: 47
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47], device='cuda:0') the target: 58
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47, 58], device='cuda:0') the target: 47
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47], device='cuda:0') the target: 64
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64], device='cuda:0') the target: 43
when input is tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43], device='cuda:0') the target: 52
when input is tensor([18, 47,

In [None]:
batch_size = 64
device = 'cuda' if torch.cuda.is_available() else 'cpu'
def get_batch():
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y


### Single Self Attention Head (5 points)
![](https://i.ibb.co/GWR1XG0/head.png)
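
The head in the diagram computes scaled dot-product attention with a causal mask, so position `t` can only attend to positions `0..t`. The NumPy sketch below walks through that computation for a single sequence. The sizes and random inputs are illustrative assumptions, and the scaling uses the head size, as in the standard formulation.

```python
import numpy as np

T, head_size = 4, 8                      # hypothetical sequence length and head size
rng = np.random.default_rng(0)
q = rng.normal(size=(T, head_size))      # queries
k = rng.normal(size=(T, head_size))      # keys
v = rng.normal(size=(T, head_size))      # values

# Raw attention scores, scaled by 1/sqrt(head_size).
scores = (q @ k.T) / np.sqrt(head_size)

# Causal mask: entries above the diagonal (future positions) are set to -inf.
future = np.triu(np.ones((T, T), dtype=bool), 1)
scores = np.where(future, -np.inf, scores)

# Row-wise softmax; exp(-inf) = 0, so future positions get zero weight.
scores = scores - scores.max(axis=-1, keepdims=True)
weights = np.exp(scores)
weights = weights / weights.sum(axis=-1, keepdims=True)

out = weights @ v                        # weighted sum of values, shape (T, head_size)
print(weights.round(2))
print(out.shape)
```

The first row of `weights` always has a single 1 on the diagonal, because the first token has nothing earlier to attend to.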

In [None]:
class SelfAttentionHead(nn.Module):
    def __init__(self, head_size):
      super().__init__()
      self.q_proj = nn.Linear(64, head_size, bias=False)
      self.k_proj = nn.Linear(64, head_size, bias=False)
      self.v_proj = nn.Linear(64, head_size, bias=False)
      self.dropout = nn.Dropout(0.5)

    def forward(self, x):
      B,T,C = x.shape
      q = self.q_proj(x)
      k = self.k_proj(x)

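      # Scale by 1/sqrt(d_k), where d_k is the head size (last dim of k), not the embedding dim C
      attention = q @ k.transpose(-2, -1) * (k.shape[-1] ** -0.5)
      mask = torch.tril(torch.ones(T, T, device=x.device))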
      attention = attention.masked_fill(mask == 0, float('-inf'))

      attention = nn.functional.softmax(attention, dim=-1)
      attention = self.dropout(attention)
      v = self.v_proj(x)
      out = attention @ v
      return out

attention = SelfAttentionHead(16).to(device)
x = torch.randn(8, 32, 64).to(device)
out = attention(x)
out.shape


torch.Size([8, 32, 16])

### Multihead Self Attention (5 points)

`constructor`

- Create 4 `SelfAttentionHead` instances. Consider using `nn.ModuleList`
- Create a linear layer with n_embd input dim and n_embd output dim

`forward`

In the forward implementation, pass `x` through each head, then concatenate all the outputs along the feature dimension, then pass the concatenated output through the linear layer

![](https://i.ibb.co/y5SwyZZ/multihead.png)
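
To make the shapes concrete: with 4 heads of size 16 and an embedding size of 64, concatenating the head outputs along the feature dimension restores the embedding width before the final linear layer. The sketch below uses random arrays in place of real head outputs and a random matrix `W_o` in place of the learned projection; both are placeholders.

```python
import numpy as np

B, T, n_embd = 8, 32, 64
num_heads, head_size = 4, 16
rng = np.random.default_rng(0)

# Stand-ins for the outputs of each attention head, each of shape (B, T, head_size).
head_outputs = [rng.normal(size=(B, T, head_size)) for _ in range(num_heads)]

# Concatenate along the last (feature) dimension: (B, T, num_heads * head_size).
concat = np.concatenate(head_outputs, axis=-1)

# Placeholder for the learned output projection (n_embd -> n_embd), bias omitted.
W_o = rng.normal(size=(num_heads * head_size, n_embd))
out = concat @ W_o

print(concat.shape, out.shape)
```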

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
      super().__init__()
      self.heads = nn.ModuleList([SelfAttentionHead(head_size) for _ in range(num_heads)])
      self.proj = nn.Linear(num_heads * head_size, 64)
      self.dropout = nn.Dropout(0.5)

    def forward(self, x):
      out = torch.cat([h(x) for h in self.heads], dim=-1)
      out = self.proj(out)
      out = self.dropout(out)
      return out

attention = MultiHeadAttention(4, 16).to(device)
x = torch.randn(8, 32, 64).to(device)
out = attention(x)
out.shape


torch.Size([8, 32, 64])

### MLP (2 points)
Implement a 2 layer MLP


![](https://i.ibb.co/C0DtrF5/ff.png)

In [None]:
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        # implement
        self.fc1 = nn.Linear(64, 256)
        self.activation = nn.ReLU()
        self.fc2 = nn.Linear(256, 64)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x: torch.tensor) -> torch.tensor:
        # implement
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        x = self.dropout(x)
        return x

x = torch.randn(8, 32, 64)
mlp = MLP()
out = mlp(x)
out.shape

torch.Size([8, 32, 64])

### Transformer block (20 points)

Layer normalization helps training stability by normalizing the outputs of the neurons in a single layer across all features for each individual data point, not across a full batch or a specific feature.
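
As a rough illustration of "across all features for each individual data point", the NumPy sketch below normalizes over the last axis only. The input shape and `eps` value are assumptions, and the learnable scale and shift that `nn.LayerNorm` also applies are left out.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(loc=3.0, scale=2.0, size=(2, 5, 64))   # (batch, time, features)

eps = 1e-5                                   # small constant for numerical stability
mean = x.mean(axis=-1, keepdims=True)        # one mean per (batch, time) position
var = x.var(axis=-1, keepdims=True)          # one variance per position
x_norm = (x - mean) / np.sqrt(var + eps)

# Each position now has roughly zero mean and unit variance over its features.
print(x_norm.mean(axis=-1).shape)
print(np.abs(x_norm.mean(axis=-1)).max() < 1e-6)
print(np.abs(x_norm.var(axis=-1) - 1.0).max() < 1e-3)
```

Because the statistics are computed per position, the result does not depend on what else is in the batch.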

Dropout is a form of regularization to prevent overfitting.
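
A minimal sketch of how dropout behaves, assuming the common "inverted" formulation in which surviving activations are rescaled by `1 / (1 - p)` during training and nothing happens at evaluation time. The function name `dropout` and its arguments are hypothetical, written in NumPy for illustration.

```python
import numpy as np

def dropout(x: np.ndarray, p: float, rng: np.random.Generator, training: bool = True) -> np.ndarray:
    """Inverted dropout: zero each element with probability p and rescale the rest."""
    if not training or p == 0.0:
        return x
    keep = rng.random(x.shape) >= p          # True where the element is kept
    return x * keep / (1.0 - p)

rng = np.random.default_rng(0)
x = np.ones((1000, 64))
y = dropout(x, p=0.5, rng=rng)

print(np.unique(y))                          # the distinct values that appear in y
print(y.mean())                              # stays close to the input mean
print(dropout(x, p=0.5, rng=rng, training=False) is x)   # evaluation mode returns x unchanged
```

The rescaling keeps the expected activation the same in training and evaluation, which is why no correction is needed when generating text.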

This is the diagram of a transformer block:

![](https://i.ibb.co/X85C473/block.png)

In [None]:
class Block(nn.Module):
    def __init__(self, n_embd: int, n_head: int):
        super().__init__()
        self.ln1 = nn.LayerNorm(n_embd)
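        # MultiHeadAttention expects (num_heads, head_size); split n_embd evenly across the heads
        self.multi_head_attention = MultiHeadAttention(n_head, n_embd // n_head)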
        self.ln2 = nn.LayerNorm(n_embd)
        self.feed_forward = nn.Sequential(
            nn.Linear(n_embd, 64),
            nn.ReLU(),
            nn.Linear(64, n_embd),
        )


    def forward(self, x):
        attention_output = self.multi_head_attention(self.ln1(x))
        x = x + attention_output
        feed_forward_output = self.feed_forward(self.ln2(x))
        y = x + feed_forward_output
        return y

x = torch.randn(8, 32, 64).to(device)
block = Block(64, 8).to(device)
out = block(x)
out.shape

torch.Size([8, 32, 64])

### GPT

`constructor` (5 points)

1. create the token embedding table and the position embedding table
2. create variable `self.blocks` that is a series of 4 `Block`s. The data will pass through each block sequentially. Consider using `nn.Sequential`
3. create a layer norm layer
4. create a linear layer for predicting the next token

`forward(self, idx, targets=None)`. (5 points)

`forward` takes a batch of context ids as input of size (B, T) and returns the logits and the loss, if targets is not None. If targets is None, return the logits and None.
1. get the token embeddings by using the token embedding table created in the constructor
2. create the position embeddings
3. sum the token and position embeddings to get the model input
4. pass the model input through the blocks, the layernorm layer, and the final linear layer
5. compute the loss
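
Step 5 needs one detail spelled out: the logits have shape (B, T, C) while the targets have shape (B, T), so both are flattened to treat every position as an independent classification example. The NumPy sketch below computes the same quantity by hand, as the mean negative log-likelihood under a softmax over the raw logits. The shapes and random values are illustrative assumptions.

```python
import numpy as np

B, T, C = 2, 3, 5                            # hypothetical batch, time, vocab sizes
rng = np.random.default_rng(0)
logits = rng.normal(size=(B, T, C))          # raw scores, no activation applied
targets = rng.integers(0, C, size=(B, T))    # the correct next-token id per position

# Flatten so that each of the B*T positions is one classification example.
flat_logits = logits.reshape(B * T, C)
flat_targets = targets.reshape(B * T)

# Log-softmax over the class dimension (shifted by the max for stability).
shifted = flat_logits - flat_logits.max(axis=-1, keepdims=True)
log_probs = shifted - np.log(np.exp(shifted).sum(axis=-1, keepdims=True))

# Cross-entropy: mean negative log-probability assigned to the correct token.
loss = -log_probs[np.arange(B * T), flat_targets].mean()
print(loss)

# With all-zero logits the prediction is uniform and the loss equals log(C).
uniform = np.zeros((B * T, C))
uniform_log_probs = uniform - np.log(np.exp(uniform).sum(axis=-1, keepdims=True))
uniform_loss = -uniform_log_probs[np.arange(B * T), flat_targets].mean()
print(np.isclose(uniform_loss, np.log(C)))
```

The uniform case is a useful sanity check: a freshly initialized model should start with a loss in the neighborhood of the log of the vocabulary size.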

`generate(start_char, max_new_tokens, top_p, top_k, temperature) -> str` (5 points)
1. implement top_p, top_k, and temperature for sampling
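
The three sampling controls can be viewed as successive transformations of the next-token distribution: temperature rescales the logits, top_k keeps the k most likely tokens, and top_p keeps the smallest set of most likely tokens whose cumulative probability reaches p. The NumPy sketch below applies them to a made-up five-token distribution. The function name `filter_probs` and the example logits are hypothetical, and this is one possible formulation rather than the required one.

```python
import numpy as np

def filter_probs(logits, temperature=1.0, top_k=None, top_p=None):
    """Return a renormalized distribution after temperature, top-k and top-p filtering."""
    scaled = np.asarray(logits, dtype=np.float64) / temperature
    probs = np.exp(scaled - scaled.max())
    probs /= probs.sum()

    order = np.argsort(probs)[::-1]          # token ids, most likely first
    keep = np.ones(len(probs), dtype=bool)   # mask over positions in `order`
    if top_k is not None:
        keep[top_k:] = False                 # drop everything after the k-th token
    if top_p is not None:
        cumulative = np.cumsum(probs[order])
        # Keep tokens up to and including the first one whose cumulative mass reaches p.
        cutoff = np.searchsorted(cumulative, top_p) + 1
        keep[cutoff:] = False

    filtered = np.zeros_like(probs)
    filtered[order[keep]] = probs[order[keep]]
    return filtered / filtered.sum()

logits = [2.0, 1.0, 0.5, 0.0, -1.0]
print(filter_probs(logits, top_k=2).round(3))
print(filter_probs(logits, top_p=0.8).round(3))
print(filter_probs(logits, temperature=0.5).round(3))

# Sampling a token id from the filtered distribution.
rng = np.random.default_rng(0)
next_id = rng.choice(len(logits), p=filter_probs(logits, top_k=2))
```

Lower temperatures sharpen the distribution toward the most likely token, while higher temperatures flatten it; top_k and top_p then cut off the unlikely tail before sampling.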



![](https://i.ibb.co/n8sbQ0V/Screenshot-2024-01-23-at-8-59-08-PM.png)

In [None]:
class GPT(nn.Module):
    def __init__(self, n_embd, n_head):
      super().__init__()
      self.token_embedding_table = nn.Embedding(len(chars), n_embd)
      self.position_embedding_table = nn.Embedding(block_size, n_embd)
      self.dropout = nn.Dropout(0.5)
      self.blocks = nn.Sequential(
          *[Block(n_embd, n_head) for _ in range(4)]
      )
      self.ln_f = nn.LayerNorm(n_embd)
      self.lm_head = nn.Linear(n_embd, len(chars))

    def forward(self, idx, targets=None):
      B, T = idx.shape

      token_embeddings = self.token_embedding_table(idx)
      position_embeddings = self.position_embedding_table(torch.arange(T, device=device))
      x = token_embeddings + position_embeddings
      x = self.dropout(x)
      x = self.blocks(x)
      x = self.ln_f(x)
      logits = self.lm_head(x)

      if targets is None:
        loss = None
      else:
        B, T, C = logits.shape
        logits = logits.view(B*T, C)
        targets = targets.view(B*T)
        loss = nn.functional.cross_entropy(logits, targets)
      return logits, loss

    def generate(self, start_char, max_new_tokens, top_p, top_k, temperature):
      self.eval()
      with torch.no_grad():
        idx_cond = torch.tensor(encode(start_char), dtype=torch.long).to(device).unsqueeze(0)
        for _ in range(max_new_tokens):
          logits, loss = self(idx_cond[:, -block_size:])
          # print(logits.shape)
          logits = logits[:, -1, :]

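          probs = nn.functional.softmax(logits / temperature, dim=-1)
          # apply top_p or top_k sampling; each helper returns the sampled token id
          if top_p is not None:
            next_id = self.top_p_sampling(probs, top_p)
          elif top_k is not None:
            next_id = self.top_k_sampling(probs, top_k)
          else:
            # no filtering requested: sample from the full distribution
            next_id = torch.multinomial(probs, num_samples=1).item()

          idx_next = torch.tensor([[int(next_id)]], dtype=torch.long, device=device)
          idx_cond = torch.cat((idx_cond, idx_next), dim=1)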

      generated_text = decode(idx_cond[0].tolist())
      return generated_text


    def top_p_sampling(self, probs, p=0.9):
      probs = probs.cpu().numpy().flatten()
      # Sort probabilities in descending order and also get the original indices
      sorted_indices = np.argsort(probs)[::-1]

      sorted_probabilities = probs[sorted_indices]

      # Compute cumulative probabilities
      cumulative_probabilities = np.cumsum(sorted_probabilities)

      # Find the threshold index
      cutoff_index = np.where(cumulative_probabilities > p)[0][0]

      # Filter out indices and probabilities that don't meet the threshold
      filtered_indices = sorted_indices[:cutoff_index + 1]
      filtered_probabilities = sorted_probabilities[:cutoff_index + 1]

      # Normalize the filtered probabilities
      filtered_probabilities /= filtered_probabilities.sum()

      # Sample from the filtered distribution
      chosen_index = np.random.choice(filtered_indices, p=filtered_probabilities)

      return chosen_index

    def top_k_sampling(self, probs, k=5):
      probs = probs.cpu().numpy().flatten()
      # Get indices of the top k probabilities
      top_k_indices = np.argsort(probs)[-k:]

      # Extract the top k probabilities
      top_k_probabilities = probs[top_k_indices]

      # Normalize the top k probabilities so they sum to 1
      top_k_probabilities /= top_k_probabilities.sum()

      # Sample from the top k elements
      chosen_index = np.random.choice(top_k_indices, p=top_k_probabilities)

      return chosen_index

gpt = GPT(64, 4).to(device)
generated_text = gpt.generate(start_char="a", max_new_tokens=100,top_p=0.9, top_k=None, temperature=1.0)
print(generated_text)

aEr-pttqxXmoNblpC-O!uaB
Lo.YjWjPR'x&va3RvU-WiwpwO?y! EwSyqx'3dPy.YWZO&y$.'YCrRKO.Q3HgHe'AKwTRPjttkEl?


### Training loop (15 points)

Implement the training loop.

In [None]:
model = GPT(64, 4).to(device) # make sure you are running this on the GPU
max_iters = 5000
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

for step in range(max_iters):  # `step` avoids shadowing the built-in `iter`
  xb, yb = get_batch()
  logits, loss = model(xb, yb)
  optimizer.zero_grad(set_to_none=True)
  loss.backward()
  optimizer.step()
  if step % 1000 == 0 or step == max_iters-1:
    print(f"step {step}: train loss {loss.item()}")

step 0: train loss 4.364165782928467
step 1000: train loss 2.469721794128418
step 2000: train loss 2.340548038482666
step 3000: train loss 2.1716504096984863
step 4000: train loss 2.1197071075439453
step 4999: train loss 2.0363879203796387


### Generate text


Print some text that your model generates.

In [None]:
print(model.generate(start_char="I", max_new_tokens=1000,top_p=0.9, top_k=None, temperature=1.0))

IANG LETENETIO:
What conise, what not die lagery,
Frith ber heme seir is the of youlld.


PORGERICANA:
Bow a mealy bere fritthat to thou that the houth thou his shander?

Flat with soner hom here ass ingrests sit worde were my thord, mere is hand the not do him.


From OF buchour'd sue hat courst beid my wen heren nother net me me ton sorany me.


FOrreas and you he, our mesone hearse thim be ind not be do my fare ist the hestas preas on not, coure ame stredifed dauns besst not gro not my butesed, and burds to by din frind;
Nond mouse thou migh is shad then he the that will the,
And had arde delfoo is now swere, shir prace a mone;
As not loweanded firt hare thim, feresetell stent lard ayou to you, thir gous pove is loth live sin wenee it that not the whe at he that leive gonerit:
And thow ould in my tore gouent with morke wencom some hes for ass both withe would prow getitit holve to canitons,
And the workn for gouck, with on the stide deno folllom presay, so and the though poor and st