<a href="https://colab.research.google.com/github/Trickshotblaster/nn-practices/blob/main/TransformerV4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tiktoken



In [2]:
import tiktoken
enc = tiktoken.encoding_for_model("gpt2")
print(enc.n_vocab)
enc.decode(enc.encode("Hello world!"))

50257


'Hello world!'

In [3]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
with open('input.txt', 'r', encoding='utf-8') as f: # input.txt
    text = f.read()
print(text[:100])

--2024-06-13 18:59:00--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt.2’


2024-06-13 18:59:00 (22.4 MB/s) - ‘input.txt.2’ saved [1115394/1115394]

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You


In [None]:
train_amount = 0.95
idx = int(train_amount * len(text))
train_text = text[:idx]
val_text = text[idx:]

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
import random
class DataLoader:
  def __init__(self, text, batch_size, block_size, random_sample=True):
    self.text = torch.tensor(enc.encode(text)).to(device)
    self.batch_size = batch_size
    self.block_size = block_size
    self.current_pos = 0
    self.random_sample = random_sample
  def steps_per_epoch(self):
    return len(self.text) // (self.batch_size * self.block_size)
  def next(self):
    if self.current_pos + self.batch_size * self.block_size + 1 >= len(self.text):
      self.current_pos = 0
    if self.random_sample:
      idx = int((random.random() * len(self.text)) - (self.batch_size * self.block_size + 1) - 1)
      buf = self.text[idx:idx + (self.batch_size * self.block_size + 1)] #[self.current_pos:self.current_pos + self.batch_size * self.block_size + 1]
      if len(buf) == 0:
        return self.next()
    else:
      buf = self.text[self.current_pos:self.current_pos + self.batch_size * self.block_size + 1]
    ins = buf[:-1].view(self.batch_size, self.block_size)
    tgts = buf[1:].view(self.batch_size, self.block_size)
    self.current_pos += self.batch_size * self.block_size + 1
    return ins, tgts
dl = DataLoader(train_text, 4, 8)
dl.next()

In [None]:
class MultiHeadAttention(nn.Module):
  def __init__(self, d_model, n_heads):
    super().__init__()
    self.d_model = d_model
    self.n_heads = n_heads
    assert d_model % n_heads == 0, "d_model must be divisible by n_heads"
    self.d_key = self.d_model // self.n_heads

    self.wq = nn.Linear(d_model, d_model)
    self.wk = nn.Linear(d_model, d_model)
    self.wv = nn.Linear(d_model, d_model)

    self.wo = nn.Linear(d_model, d_model)
  def forward(self, ins, mask=None):
    batch_size, seq_len, d_model = ins.size()
    Q = self.wq(ins).view(batch_size, seq_len, self.n_heads, self.d_key).transpose(1, 2)
    K = self.wk(ins).view(batch_size, seq_len, self.n_heads, self.d_key).transpose(1, 2)
    V = self.wv(ins).view(batch_size, seq_len, self.n_heads, self.d_key).transpose(1, 2)

    #scaled_dot_product = (Q @ K.transpose(2, 3)) / (self.d_model ** 0.5)

    #if mask is not None:
      #scaled_dot_product += mask

    attn_scores = F.scaled_dot_product_attention(Q, K, V, attn_mask=mask)
    #F.softmax(scaled_dot_product, dim=-1) @ V
    attn_scores = attn_scores.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)
    return self.wo(attn_scores)
MHA = MultiHeadAttention(32, 4)
MHA(torch.randn(2, 16, 32))

In [None]:
class MLP(nn.Module):
  def __init__(self, in_size, hidden_size, out_size):
    super().__init__()
    self.l1 = nn.Linear(in_size, hidden_size)
    self.l2 = nn.Linear(hidden_size, out_size)
    self.gelu = nn.GELU()
  def forward(self, ins):
    acts = self.gelu(self.l1(ins))
    return self.l2(acts)

In [None]:
class DecoderBlock(nn.Module):
  def __init__(self, vocab_size, d_model, n_heads, dropout=0.1):
    super().__init__()
    self.d_model = d_model
    self.n_heads = n_heads
    self.dropout = nn.Dropout(dropout)
    self.MHA = MultiHeadAttention(d_model, n_heads)
    self.MLP = MLP(d_model, 4*d_model, d_model)
    self.layernorm1 = nn.LayerNorm(d_model)
    self.layernorm2 = nn.LayerNorm(d_model)
  def forward(self, ins, mask=None):
    res1 = ins.clone()
    attn_result = self.MHA(ins, mask=mask)
    norm_result = self.layernorm1(attn_result)
    norm_result += res1
    res2 = norm_result.clone()
    mlp_result = self.MLP(norm_result)
    mlp_result_norm = self.layernorm2(mlp_result)
    return mlp_result_norm + res2

In [None]:
class GPT(nn.Module):
  def __init__(self, vocab_size, block_size, n_layers=2, n_heads=4, d_model=64):
    super().__init__()
    self.vocab_size = vocab_size
    self.block_size = block_size
    self.n_layers = n_layers
    self.n_heads = n_heads
    self.d_model = d_model

    self.token_embedding = nn.Embedding(vocab_size, d_model)
    self.position_embedding = nn.Embedding(block_size, d_model)
    self.decoder_stack = nn.ModuleList([
        DecoderBlock(vocab_size, d_model, n_heads) for _ in range(n_layers)
    ])
    self.output_proj = nn.Linear(d_model, vocab_size)
    #self.output_proj.weight = self.token_embedding.weight
  def forward(self, ins, targets=None):
    B, T = ins.size()

    x = self.token_embedding(ins.to(device))
    input_indices = torch.arange(T).to(device)
    x += self.position_embedding(input_indices)

    look_ahead_mask = torch.triu(
        torch.ones((T, T)), diagonal=1
    )
    look_ahead_mask.masked_fill_(look_ahead_mask == 1, float("-inf"))
    look_ahead_mask = look_ahead_mask.to(device)

    for decoder in self.decoder_stack:
      x = decoder(x, mask=look_ahead_mask)
    logits = self.output_proj(x)
    loss = None
    if targets is not None:
      targets = targets.to(device)
      loss = F.cross_entropy(logits.view(-1, self.vocab_size), targets.view(-1))
    return logits, loss
my_GPT = GPT(enc.n_vocab, 32, 12, 12, 768).to(device)

In [None]:
x, y = dl.next()
logits, loss = my_GPT(x, y)
print(logits.shape, loss.item())

In [None]:
batch_size = 64
block_size = 32
n_layers = 4
n_heads = 4
d_model = 128
lr = 3e-4



my_GPT = GPT(enc.n_vocab, block_size, n_layers, n_heads, d_model)
my_GPT = my_GPT.to(device)

compile = True
if compile and torch.cuda.is_available():
  my_GPT = torch.compile(my_GPT)

optim = torch.optim.AdamW(my_GPT.parameters(), lr=lr)
data_loader = DataLoader(train_text, batch_size, block_size, random_sample=True)

val_data_loader = DataLoader(val_text, batch_size, block_size, random_sample=False)
val_interval = 200

log_interval = 50
max_steps = 3000
print("Steps per epoch:", data_loader.steps_per_epoch())
print(f"GPT Parameters: {sum(p.numel() for p in my_GPT.parameters()) / 1e6} million")

In [None]:
torch.set_float32_matmul_precision("high")

In [None]:
import time
best_val_loss = float("inf")
my_GPT.train()
for step in range(max_steps + 1):
  step_start = time.time()
  x, y = data_loader.next()
  logits, loss = my_GPT(x, y)
  optim.zero_grad()
  loss.backward()
  optim.step()

  if step % log_interval == 0:
    print(f"Step {step}, loss: {loss.item()}, time: {round((time.time() - step_start) * 1e3, 2)} ms")
  if step % val_interval == 0:
    with torch.no_grad():
      val_loss = 0
      for val_step in range(val_data_loader.steps_per_epoch()):
        val_x, val_y = val_data_loader.next()
        logits, loss = my_GPT(val_x, val_y)
        val_loss += loss
      val_loss /= val_data_loader.steps_per_epoch()
      if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(my_GPT.state_dict(), 'best_model.pth')
      print(f"Val loss for step {step}: {val_loss}")
my_GPT.load_state_dict(torch.load('best_model.pth'))
my_GPT.eval()

In [None]:
prompt = "ROMEO:"
input_tokens = enc.encode(prompt)
output_tokens = enc.encode(prompt)
for x in range(200):
  if len(input_tokens) > block_size:
    input_tokens = input_tokens[1:]
  context_tensor = torch.tensor(input_tokens).view(1, -1).to(device)

  logits, loss = my_GPT(context_tensor)
  probs = F.softmax(logits[:, -1, :])
  result = torch.multinomial(probs, num_samples=1).item()
  input_tokens.append(result)
  output_tokens.append(result)
print(enc.decode(output_tokens))