# train main model

In [1]:
import torch
import sentencepiece as spm
import math
from torch import nn
import matplotlib.pyplot as plt
import os
import random

# set up device-agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"device: {device}")

# Train the shared tokenizer only when no saved model exists yet; later runs
# reuse "unified_tokenizer.model" from disk.
if not os.path.exists("unified_tokenizer.model"):
  spm.SentencePieceTrainer.Train(
      input="output.txt,chatbot_dataset.txt",
      model_prefix="unified_tokenizer",
      vocab_size = 20000,
      model_type="bpe",
      character_coverage=0.9995, # the corpus contains some foreign-language text
      user_defined_symbols=["<|user|>", "<|bot|>", "<EOS>", "<|new_book|>"] # must be a list
  )

# Load the tokenizer once (the same file used to be loaded twice in this cell).
sp = spm.SentencePieceProcessor()
sp.load("unified_tokenizer.model")

def encode(s):
  """Convert text into a list of integer token ids."""
  # out_type=int asks for ids rather than string pieces.
  return sp.encode(s, out_type=int)

def decode(l):
  """Convert a list of integer token ids back into text."""
  return sp.decode(l)

# Round-trip smoke test of the tokenizer.
ids = encode("Hello world")
print(ids)
text = decode(ids)
print(text)

# Read the main training corpus and tokenize it into one long 1-D tensor.
with open("output.txt", encoding="utf-8") as file:
  text = file.read()

data = torch.tensor(encode(text), dtype = torch.long)
print(data[:100])
print(decode((data[:100]).tolist()))

# Contiguous 80/20 split: the first 80% of tokens train, the rest evaluate.
n = int(0.8*len(data))
train_data = data[:n]
test_data = data[n:]

device: cpu
[203, 38, 19934, 823]
Hello world
tensor([19930,     6,   133,  1299,    32,   167,   310,   819,    84,   112,
           28,  3234,    88,  3690,    88,    48,   249,  1621, 19948,    13,
         6502,    32,   102,  1597,   142,    48,   127,  1472,   169,    13,
          264, 10348,  3317,   246,    48,    93,  2931,   269,  4876,   101,
          143,    75,  3603,   190,  4999,  2795, 19948,    31,   142, 19948,
         2020,    32,   512, 15714,   190,  1699,  2061, 19948,   371,   209,
         5386,  3728,  5133,   154,    13,  4153,    32,  3902,    31,    13,
         3436,    32,   554, 19989,  1000,    13,  5497,   142,   794,   151,
         1427,    13, 10517,  3603,  3307,   882,    13,   435, 16797,   253,
          723,  6631, 10768,  1484,    32,   194,  5389,    28,    13,   346])
<|new_book|> The object of this Essay is to explain as clearly as I am able, the grounds of an opinion which I have held from the very earliest period when I had formed any 

In [2]:
# hyperparameters
# These are read as module-level globals by the dataset, model and training
# cells below, so this cell must run before them.
block_size = 512 # block = sequence length (also sizes the position embedding and causal mask)
batch_size = 16 # sequences per DataLoader batch
learning_rate = 3e-4 # base learning rate handed to AdamW; scaled by the LR schedule
n_embd= 384 # embedding width; each attention head gets n_embd // n_head features
dropout = 0.1 # 10% of neurons dropped out
n_head = 8 # attention heads per transformer block
n_layer = 8 # number of stacked transformer blocks
vocab_size = 20000 # matches the vocab_size the SentencePiece tokenizer is trained with

In [3]:
# Creating a custom dataset
from torch.utils.data import Dataset, DataLoader

class TokenDataset(Dataset):
  """Sliding-window next-token dataset over one long token sequence.

  Sample ``idx`` is the window ``data[idx : idx + block_size]`` paired with
  the same window shifted one position to the right.
  """
  def __init__(self, data, block_size):
    self.data = data # tensor with a bunch of tokens
    self.block_size = block_size

  def __len__(self):
    """ returns the number of samples """
    # Subtracting block_size keeps the shifted target window inside the data.
    # Fix: clamp at zero, because a negative value makes len() raise when the
    # data is shorter than one block.
    return max(0, len(self.data) - self.block_size)

  def __getitem__(self, idx):
    """ returns a particular sample in X, y"""
    # Fix: reject out-of-range indices instead of silently returning windows
    # that are too short (or empty).
    if not 0 <= idx < len(self):
      raise IndexError(f"index {idx} out of range for dataset of length {len(self)}")
    X = self.data[idx: idx + self.block_size]
    y = self.data[idx + 1: idx + 1 + self.block_size]
    return X, y

In [4]:
import os
# Wrap both token splits in sliding-window datasets.
train_dataset = TokenDataset(data = train_data,
                             block_size=block_size)

test_dataset = TokenDataset(data= test_data,
                            block_size=block_size)

# Fix: os.cpu_count() returns None when the CPU count cannot be determined,
# and None is not a valid num_workers; fall back to loading in the main process.
num_workers = os.cpu_count() or 0

# Training batches are shuffled; evaluation batches keep their order.
train_dataloader = DataLoader(train_dataset,
                              batch_size = batch_size,
                              shuffle=True,
                              num_workers = num_workers,
                              pin_memory=False)

test_dataloader = DataLoader(test_dataset,
                             batch_size=batch_size,
                             shuffle=False,
                             num_workers=num_workers,
                             pin_memory=False)

# Sanity checks on sizes.
print(len(train_data))
X, y = next(iter(train_dataloader))
print(len(X)) # output = batch_size
print(len(train_dataloader)) # number of batches per epoch

9649133
16
603039


In [None]:
# transformer
class Head(nn.Module):
  """A single causal self-attention head.

  Reads the module-level ``n_embd``, ``block_size`` and ``dropout`` settings.
  """
  def __init__(self, head_size):
    super().__init__()
    # Attribute names and creation order are kept so saved checkpoints and
    # weight initialisation stay unchanged.
    self.key = nn.Linear(n_embd, head_size, bias=True)
    self.query = nn.Linear(n_embd, head_size, bias=True)
    self.value = nn.Linear(n_embd, head_size, bias=True)
    lower_triangle = torch.tril(torch.ones(block_size, block_size))
    self.register_buffer("tril", lower_triangle)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Attend over a 3-D input (unpacked as B, T, C); output's last dim is head_size."""
    _, seq_len, _ = x.shape
    keys = self.key(x)
    queries = self.query(x)
    # Scale by 1/sqrt(head_size), read off the key projection's last dimension.
    scale = keys.shape[-1] ** -0.5
    scores = (queries @ keys.transpose(-2, -1)) * scale
    # Causal mask: positions above the diagonal (the future) get -inf so the
    # softmax assigns them zero weight.
    is_future = self.tril[:seq_len, :seq_len] == 0
    scores = scores.masked_fill(is_future, float("-inf"))
    attention_weights = self.dropout(torch.softmax(scores, dim=-1))
    values = self.value(x)
    return attention_weights @ values

class MultiHeadAttention(nn.Module):
  """Runs several attention heads side by side and merges their outputs."""
  def __init__(self, n_head, head_size):
    super().__init__()
    # One independent Head per slot; each maps x to [B, T, head_size].
    head_modules = []
    for _ in range(n_head):
      head_modules.append(Head(head_size))
    self.heads = nn.ModuleList(head_modules)
    # Projects the concatenated head outputs back to the embedding width.
    self.linearprojections = nn.Linear(head_size*n_head, n_embd)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Concatenate every head's output on the last axis, project, apply dropout."""
    per_head = [head(x) for head in self.heads]
    merged = torch.cat(per_head, dim=-1)  # [B, T, n_head * head_size]
    projected = self.linearprojections(merged)  # [B, T, n_embd]
    return self.dropout(projected)

class FeedForward(nn.Module):
  """Position-wise MLP: expand to 4*n_embd, GELU, project back, dropout."""
  def __init__(self, n_embd):
    super().__init__()
    hidden_width = 4*n_embd
    layers = [
        nn.Linear(n_embd, hidden_width),
        nn.GELU(),
        nn.Linear(hidden_width, n_embd),  # output width is n_embd again
        nn.Dropout(dropout),  # rate comes from the module-level `dropout`
    ]
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    """Apply the MLP to the last dimension of x."""
    return self.net(x)

class Block(nn.Module):
  """Pre-norm transformer block: attention then feed-forward, each with a residual."""
  def __init__(self, n_embd, n_head):
    super().__init__()
    # The embedding width is divided evenly (floor) between the heads.
    self.multiheadattention = MultiHeadAttention(n_head, n_embd//n_head)
    self.feedforward = FeedForward(n_embd)
    self.layernorm1 = nn.LayerNorm(n_embd)
    self.layernorm2 = nn.LayerNorm(n_embd)

  def forward(self, x):
    """LayerNorm is applied before each sub-layer; its output is added back to the input."""
    attended = self.multiheadattention(self.layernorm1(x))
    hidden = x + attended
    transformed = self.feedforward(self.layernorm2(hidden))
    return hidden + transformed

class MiniGPTModel(nn.Module):
  """Decoder-only transformer language model with tied input/output embeddings.

  Besides the constructor arguments it reads the module-level hyperparameters
  ``block_size``, ``n_head`` and ``n_layer``.
  """
  def __init__(self, vocab_size, n_embd):
    super().__init__()
    self.token_embedding_layer = nn.Embedding(vocab_size, n_embd)
    self.position_embedding_layer = nn.Embedding(block_size, n_embd)
    self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
    self.layernorm = nn.LayerNorm(n_embd)
    self.linear = nn.Linear(in_features=n_embd, out_features=vocab_size)
    # Weight tying: the output projection shares its matrix with the token embedding.
    self.linear.weight = self.token_embedding_layer.weight
    self.apply(self._init_weights)

  def _init_weights(self, module):
    """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
    if isinstance(module, nn.Linear): # make sure weights init properly
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
      if module.bias is not None:
        torch.nn.init.zeros_(module.bias)
    elif isinstance(module, nn.Embedding):
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

  def forward(self, index, targets=None):
    """Compute next-token logits and, when targets are given, the loss.

    Args:
      index: token ids of shape [B, T]; T must not exceed ``block_size``
        because positions are looked up in the position embedding.
      targets: optional token ids with B*T elements.

    Returns:
      ``(logits, None)`` with logits of shape [B, T, vocab] when ``targets`` is
      None, otherwise ``(logits, loss)`` with logits flattened to [B*T, vocab]
      and ``loss`` the cross-entropy against the flattened targets.
    """
    tok = self.token_embedding_layer(index)
    B, T = index.shape
    pos = self.position_embedding_layer(torch.arange(T, device=index.device))
    x = tok + pos
    x = self.blocks(x)
    x = self.layernorm(x)
    logits = self.linear(x)
    if targets is None:
      return logits, None
    B, T, C = logits.shape
    logits = logits.view(B*T, C)
    targets = targets.view(B*T)
    loss = torch.nn.functional.cross_entropy(logits, targets)
    return logits, loss

  def top_k_sampling(self, logits, temperature=1.0, k=100):
    """Sample one token id per row from the ``k`` highest logits.

    Args:
      logits: scores of shape [vocab] or [B, vocab].
      temperature: divisor applied to the kept logits before the softmax.
      k: number of candidates to keep; capped at the number of logits.

    Returns:
      Tensor of sampled token ids, shape [1] for 1-D input or [B, 1] for
      batched input.
    """
    # Fix: torch.topk raises when k exceeds the number of candidates.
    k = min(k, logits.size(-1))
    topk_logits, topk_index = torch.topk(logits, k)
    probs = torch.softmax(topk_logits / temperature, dim=-1)
    choice = torch.multinomial(probs, num_samples=1)
    # gather maps the sampled position within the top-k back to a vocabulary
    # id, and works for both 1-D and batched input.
    return torch.gather(topk_index, -1, choice)

  def generate(self, index, max_token_number=512, temperature=1.0, k=100):
    """Autoregressively append ``max_token_number`` sampled tokens to ``index``.

    Args:
      index: prompt token ids of shape [B, T].
      max_token_number: how many tokens to append.
      temperature, k: forwarded to ``top_k_sampling``.

    Returns:
      Tensor of shape [B, T + max_token_number] containing prompt and continuation.
    """
    for _ in range(max_token_number):
      # The model only has block_size positions, so condition on the tail.
      index_cond = index[:, -block_size:]
      logits, _ = self(index_cond)
      last_logits = logits[:, -1, :] # shape [B, vocab_size]
      # Generalised: sample every row, so any batch size works (previously
      # only row 0 was sampled and the result was forced to shape [1, 1]).
      next_token = self.top_k_sampling(last_logits, temperature=temperature, k=k)
      index = torch.cat((index, next_token), dim=1)
    return index

# Build the model from the notebook hyperparameters, then move it to the chosen device.
model_GPT = MiniGPTModel(vocab_size=vocab_size, n_embd=n_embd)
model_GPT = model_GPT.to(device)



In [None]:
# training loop
# AdamW over every model parameter at the base learning rate.
optimizer = torch.optim.AdamW(model_GPT.parameters(), lr=learning_rate)

epochs = 12

# The scheduler is stepped once per batch, so the schedule spans every batch
# of every epoch; the first 10% of those steps are warm-up.
total_steps = len(train_dataloader) * epochs
warmup_steps = int(0.1 * total_steps)
print(f"Total_steps: {total_steps}")

def lr_lambda(current_step):
  """Return the multiplier applied to the base learning rate at ``current_step``.

  Rises linearly from 0 to 1 over ``warmup_steps``, then follows half a cosine
  wave from 1 down to 0 at ``total_steps``.
  """
  if current_step >= warmup_steps:
    decay_steps = float(max(1, total_steps - warmup_steps))
    progress = float(current_step - warmup_steps) / decay_steps
    # cos runs from 1 to -1 as progress goes 0 -> 1; shifting by 1 and halving
    # maps that onto 1 -> 0.
    return 0.5 * (1.0 + math.cos(math.pi * progress))
  # Linear warm-up
  return float(current_step) / float(max(1, warmup_steps))

scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

from tqdm.auto import tqdm
from timeit import default_timer as timer
start_time = timer()

# Per-epoch average losses, appended at the end of every epoch.
results = {"train_loss": [],
           "test_loss": []}
norm = []  # value returned by clip_grad_norm_ at every optimizer step
lrs = []   # learning rate reported by the scheduler after every step
best_val_loss = float('inf')
global_step = 0  # fix: integer counter (0.0 made the logs read "123.0 steps")
num_steps_per_epoch = len(train_dataloader)
log_every = max(1, num_steps_per_epoch // 10)  # at least 1 so the modulo below is safe
sample_every = 1  # print a generated sample every `sample_every` epochs

# Prompts used to eyeball generation quality after an epoch.
sample_prompts = ["He looked at her and said", "My dear sir",
                  "It was on a dreary night of November.",
                  "The candle flickered, casting long shadows.",
                  "The street was silent, save for the sound of rain",
                  "I stood at the gates, uncertain",
                  "Snow fell in thick, silent sheets over the empty street.",
                  "Her breath turned to mist in the frigid air as she waited.",
                  "But you promised!",
                  "Snowflakes melted as they kissed her skin.",
                  "The world was white and still, muffled beneath the snow.",
                  "The clock struck thirteen",
                  "Her hands trembled, but her eyes were steady.",
                  "They told me never to open that door, but…",
                  "Hello! How are you doing?"]

for epoch in tqdm(range(epochs)):
  train_loss = 0.0
  model_GPT.train()  # enable dropout for the training pass
  for x, y in train_dataloader:
    x, y = x.to(device), y.to(device)
    # Evaluate the loss
    logits, loss = model_GPT(x, y)
    train_loss += loss.item()
    optimizer.zero_grad()
    loss.backward()

    # NaN checks on parameters and their gradients
    for name, param in model_GPT.named_parameters():
      if param.grad is not None:
        if torch.isnan(param).any():
          print(f"ALERT!! NaN detected in parameters: {name}")
        if torch.isnan(param.grad).any():
          print(f"ALERT!! NaN detected in gradients: {name}")

    # gradient clipping; the returned norm is logged for the plots
    grad_norm = torch.nn.utils.clip_grad_norm_(model_GPT.parameters(), max_norm = 3.0)
    norm.append(grad_norm.item())

    optimizer.step()
    scheduler.step()
    lrs.append(scheduler.get_last_lr()[0])
    global_step += 1

    if global_step % log_every == 0:
      # Fix: report progress within the current epoch; dividing the global
      # step by the steps per epoch grew past 100% after the first epoch.
      step_in_epoch = global_step - epoch * num_steps_per_epoch
      print(f"-------------------------------------------------")
      print(f"{epoch} \n {(step_in_epoch/num_steps_per_epoch)*100} % ")
      print(f"LR at {global_step} steps: {scheduler.get_last_lr()[0]}")
      print(f"Gradient norm: {(grad_norm.item()):.4f}")

  train_loss /= len(train_dataloader)
  # Fix: train_loss is already a Python float (built from loss.item()), so
  # calling .item() on it raised AttributeError.
  results["train_loss"].append(train_loss)

  model_GPT.eval()
  with torch.inference_mode():
    total_val_loss = 0.0
    for x_val, y_val in test_dataloader:
      x_val, y_val = x_val.to(device), y_val.to(device)
      val_logits, val_loss = model_GPT(x_val, y_val)
      total_val_loss += val_loss.item()
    total_val_loss /= len(test_dataloader)
    results["test_loss"].append(total_val_loss)
    # Fix: the check used `epochs` (the constant total), not the loop variable.
    if epoch % sample_every == 0:
      print(f"Epoch: {epoch} | Train loss: {train_loss} | Val loss: {total_val_loss}")
      # sampling
      list_of_words = random.choice(sample_prompts)
      context = torch.tensor([encode(list_of_words)], dtype=torch.long).to(device)
      print(f"-------------------------------------------------")
      generated_chars = model_GPT.generate(context, max_token_number=500, temperature=0.8, k=50)
      decoded_chars = decode((generated_chars[0]).tolist())
      print(decoded_chars)

    # Fix: total_val_loss is a float, so the former .item() calls raised
    # AttributeError and no checkpoint was ever written.
    if total_val_loss < best_val_loss:
      best_val_loss = total_val_loss
      torch.save(model_GPT.state_dict(), "best_model.pth")
      print("✅ Saved new best model")
      # Fix: report the averaged validation loss, not the last batch's loss.
      print(f"Val loss at this stage: {best_val_loss}")

end_time=timer()
print(f"Total time taken = {end_time-start_time}s")



In [None]:
# Per-epoch loss curves recorded in `results`.
train_loss = results["train_loss"]
test_loss = results["test_loss"]
epochs = range(len(train_loss))

fig = plt.figure(figsize=(10,15))

# Panel 1: train vs. test loss per epoch.
loss_ax = fig.add_subplot(2,2,1)
loss_ax.plot(epochs, train_loss, label="train_loss")
loss_ax.plot(epochs, test_loss, label="test_loss")
loss_ax.set_xlabel("Epochs")
loss_ax.set_ylabel("Loss")
loss_ax.legend()

# Panel 2: learning rate at every recorded step.
epochs = range(len(lrs))
lr_ax = fig.add_subplot(2,2,2)
lr_ax.plot(epochs, lrs, label="Learning rate")
lr_ax.set_xlabel("Steps")
lr_ax.set_ylabel("Learning Rate")
lr_ax.legend()

# Panel 3: gradient norm at every recorded step.
norm_ax = fig.add_subplot(2,2,3)
steps = range(len(norm))
norm_ax.plot(steps, norm, label="Gradient norm")
norm_ax.set_xlabel("steps")
norm_ax.set_ylabel("Gradient norm")
norm_ax.legend()

In [None]:
# Inference
# Fix: switch to eval mode explicitly. If the training cell was interrupted
# mid-epoch the model is still in train mode and dropout would stay active.
model_GPT.eval()
with torch.inference_mode():
  prompt = input("Prompt: ")
  # Shape [1, T]: a batch containing the single encoded prompt.
  context = torch.tensor([encode(prompt)], dtype=torch.long).to(device)
  generated_chars = model_GPT.generate(context, max_token_number=1000, temperature=0.7, k=50)
  # The generated ids include the prompt followed by the continuation.
  decoded_chars = decode((generated_chars[0]).tolist())
  print(decoded_chars)

# Loading the model for testing with Gradio (no chatbot yet)

In [None]:
!pip install gradio

import gradio as gr

def greet(name):
    """Return the greeting shown by the demo interface."""
    return "Hello {}!".format(name)

# Minimal text-in / text-out interface used to check that Gradio works.
demo = gr.Interface(
    fn=greet,
    inputs="text",
    outputs="text",
)
demo.launch()

In [None]:
import torch
import sentencepiece as spm
import math
from torch import nn
import matplotlib.pyplot as plt
import os
import random

MODEL_SAVE_PATH = "V0_best_model.pth" # change to appropriate name

# set up device-agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"device: {device}")

# hyperparameters (must match the values the checkpoint was trained with)
block_size = 512 # block = sequence length
batch_size = 16
learning_rate = 3e-4
n_embd= 384
dropout = 0.1 # 10% of neurons dropped out
n_head = 8
n_layer = 8
vocab_size = 20000

# Load the tokenizer once (the same file used to be loaded twice in this cell).
sp = spm.SentencePieceProcessor()
# NOTE(review): `tokenizer` holds whatever sp.load() returns, not the
# processor itself; use `sp` (or encode/decode below) for tokenisation.
tokenizer = sp.load("unified_tokenizer.model")

def encode(s):
  """Convert text into a list of integer token ids."""
  # out_type=int asks for ids rather than string pieces.
  return sp.encode(s, out_type=int)

def decode(l):
  """Convert a list of integer token ids back into text."""
  return sp.decode(l)

# Round-trip smoke test of the tokenizer.
ids = encode("Hello world")
print(ids)
text = decode(ids)
print(text)

# transformer
class Head(nn.Module):
  """A single causal self-attention head.

  Reads the module-level ``n_embd``, ``block_size`` and ``dropout`` settings.
  """
  def __init__(self, head_size):
    super().__init__()
    # Attribute names and creation order are kept so saved checkpoints and
    # weight initialisation stay unchanged.
    self.key = nn.Linear(n_embd, head_size, bias=True)
    self.query = nn.Linear(n_embd, head_size, bias=True)
    self.value = nn.Linear(n_embd, head_size, bias=True)
    lower_triangle = torch.tril(torch.ones(block_size, block_size))
    self.register_buffer("tril", lower_triangle)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Attend over a 3-D input (unpacked as B, T, C); output's last dim is head_size."""
    _, seq_len, _ = x.shape
    keys = self.key(x)
    queries = self.query(x)
    # Scale by 1/sqrt(head_size), read off the key projection's last dimension.
    scale = keys.shape[-1] ** -0.5
    scores = (queries @ keys.transpose(-2, -1)) * scale
    # Causal mask: positions above the diagonal (the future) get -inf so the
    # softmax assigns them zero weight.
    is_future = self.tril[:seq_len, :seq_len] == 0
    scores = scores.masked_fill(is_future, float("-inf"))
    attention_weights = self.dropout(torch.softmax(scores, dim=-1))
    values = self.value(x)
    return attention_weights @ values

class MultiHeadAttention(nn.Module):
  """Runs several attention heads side by side and merges their outputs."""
  def __init__(self, n_head, head_size):
    super().__init__()
    # One independent Head per slot; each maps x to [B, T, head_size].
    head_modules = []
    for _ in range(n_head):
      head_modules.append(Head(head_size))
    self.heads = nn.ModuleList(head_modules)
    # Projects the concatenated head outputs back to the embedding width.
    self.linearprojections = nn.Linear(head_size*n_head, n_embd)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Concatenate every head's output on the last axis, project, apply dropout."""
    per_head = [head(x) for head in self.heads]
    merged = torch.cat(per_head, dim=-1)  # [B, T, n_head * head_size]
    projected = self.linearprojections(merged)  # [B, T, n_embd]
    return self.dropout(projected)

class FeedForward(nn.Module):
  """Position-wise MLP: expand to 4*n_embd, GELU, project back, dropout."""
  def __init__(self, n_embd):
    super().__init__()
    hidden_width = 4*n_embd
    layers = [
        nn.Linear(n_embd, hidden_width),
        nn.GELU(),
        nn.Linear(hidden_width, n_embd),  # output width is n_embd again
        nn.Dropout(dropout),  # rate comes from the module-level `dropout`
    ]
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    """Apply the MLP to the last dimension of x."""
    return self.net(x)

class Block(nn.Module):
  """Pre-norm transformer block: attention then feed-forward, each with a residual."""
  def __init__(self, n_embd, n_head):
    super().__init__()
    # The embedding width is divided evenly (floor) between the heads.
    self.multiheadattention = MultiHeadAttention(n_head, n_embd//n_head)
    self.feedforward = FeedForward(n_embd)
    self.layernorm1 = nn.LayerNorm(n_embd)
    self.layernorm2 = nn.LayerNorm(n_embd)

  def forward(self, x):
    """LayerNorm is applied before each sub-layer; its output is added back to the input."""
    attended = self.multiheadattention(self.layernorm1(x))
    hidden = x + attended
    transformed = self.feedforward(self.layernorm2(hidden))
    return hidden + transformed

class MiniGPTModel(nn.Module):
  """Decoder-only transformer language model with tied input/output embeddings.

  Besides the constructor arguments it reads the module-level hyperparameters
  ``block_size``, ``n_head`` and ``n_layer``.
  """
  def __init__(self, vocab_size, n_embd):
    super().__init__()
    self.token_embedding_layer = nn.Embedding(vocab_size, n_embd)
    self.position_embedding_layer = nn.Embedding(block_size, n_embd)
    self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
    self.layernorm = nn.LayerNorm(n_embd)
    self.linear = nn.Linear(in_features=n_embd, out_features=vocab_size)
    # Weight tying: the output projection shares its matrix with the token embedding.
    self.linear.weight = self.token_embedding_layer.weight
    self.apply(self._init_weights)

  def _init_weights(self, module):
    """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
    if isinstance(module, nn.Linear): # make sure weights init properly
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
      if module.bias is not None:
        torch.nn.init.zeros_(module.bias)
    elif isinstance(module, nn.Embedding):
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

  def forward(self, index, targets=None):
    """Compute next-token logits and, when targets are given, the loss.

    Args:
      index: token ids of shape [B, T]; T must not exceed ``block_size``
        because positions are looked up in the position embedding.
      targets: optional token ids with B*T elements.

    Returns:
      ``(logits, None)`` with logits of shape [B, T, vocab] when ``targets`` is
      None, otherwise ``(logits, loss)`` with logits flattened to [B*T, vocab]
      and ``loss`` the cross-entropy against the flattened targets.
    """
    tok = self.token_embedding_layer(index)
    B, T = index.shape
    pos = self.position_embedding_layer(torch.arange(T, device=index.device))
    x = tok + pos
    x = self.blocks(x)
    x = self.layernorm(x)
    logits = self.linear(x)
    if targets is None:
      return logits, None
    B, T, C = logits.shape
    logits = logits.view(B*T, C)
    targets = targets.view(B*T)
    loss = torch.nn.functional.cross_entropy(logits, targets)
    return logits, loss

  def top_k_sampling(self, logits, temperature=1.0, k=100):
    """Sample one token id per row from the ``k`` highest logits.

    Args:
      logits: scores of shape [vocab] or [B, vocab].
      temperature: divisor applied to the kept logits before the softmax.
      k: number of candidates to keep; capped at the number of logits.

    Returns:
      Tensor of sampled token ids, shape [1] for 1-D input or [B, 1] for
      batched input.
    """
    # Fix: torch.topk raises when k exceeds the number of candidates.
    k = min(k, logits.size(-1))
    topk_logits, topk_index = torch.topk(logits, k)
    probs = torch.softmax(topk_logits / temperature, dim=-1)
    choice = torch.multinomial(probs, num_samples=1)
    # gather maps the sampled position within the top-k back to a vocabulary
    # id, and works for both 1-D and batched input.
    return torch.gather(topk_index, -1, choice)

  def generate(self, index, max_token_number=512, temperature=1.0, k=100):
    """Autoregressively append ``max_token_number`` sampled tokens to ``index``.

    Args:
      index: prompt token ids of shape [B, T].
      max_token_number: how many tokens to append.
      temperature, k: forwarded to ``top_k_sampling``.

    Returns:
      Tensor of shape [B, T + max_token_number] containing prompt and continuation.
    """
    for _ in range(max_token_number):
      # The model only has block_size positions, so condition on the tail.
      index_cond = index[:, -block_size:]
      logits, _ = self(index_cond)
      last_logits = logits[:, -1, :] # shape [B, vocab_size]
      # Generalised: sample every row, so any batch size works (previously
      # only row 0 was sampled and the result was forced to shape [1, 1]).
      next_token = self.top_k_sampling(last_logits, temperature=temperature, k=k)
      index = torch.cat((index, next_token), dim=1)
    return index

# Fix: build the model on `device` and map the checkpoint onto it. The prompt
# tensor below is moved to `device`, so a CPU-resident model failed on CUDA
# machines, and a checkpoint saved on GPU could not be loaded on a CPU-only one.
loaded_model = MiniGPTModel(vocab_size, n_embd).to(device)
loaded_model.load_state_dict(torch.load(f=MODEL_SAVE_PATH, map_location=device))

loaded_model.eval()  # disable dropout for generation
with torch.inference_mode():
  prompt = input("Prompt: ")
  # Shape [1, T]: a batch containing the single encoded prompt.
  context = torch.tensor([encode(prompt)], dtype=torch.long).to(device)
  generated_chars = loaded_model.generate(context, max_token_number=1000, temperature=0.8, k=50)
  # The generated ids include the prompt followed by the continuation.
  decoded_chars = decode((generated_chars[0]).tolist())
  print(decoded_chars)

In [None]:
import gradio as gr
def tokenise(text):
  """Encode text into token ids with the notebook-level `encode` helper."""
  return encode(text)

def detokenise(input):
  """Decode token ids back into text with the notebook-level `decode` helper."""
  return decode(input)

def chat_with_model(user_input, history):
  """Gradio submit callback: generate a continuation for ``user_input``.

  Args:
    user_input: text typed by the user.
    history: list of ``(user_text, model_text)`` tuples from earlier turns.

  Yields:
    ``("", history, history)`` tuples matching the (textbox, chatbot, state)
    outputs: first with an empty reply placeholder so the user's message
    shows immediately, then with the generated text filled in.
  """
  # Fix: a blank prompt presumably encodes to no tokens, and generation
  # cannot index the last position of an empty sequence. Leave the
  # conversation unchanged instead of crashing.
  if not user_input or not user_input.strip():
    yield "", history, history
    return

  loaded_model.eval()
  # Build a new list so the caller's history object is not mutated.
  history = history + [(user_input, "")]
  yield "", history, history
  with torch.inference_mode():
    context = torch.tensor([tokenise(user_input)], dtype=torch.long).to(device)
    generated_chars = loaded_model.generate(context, max_token_number=250, temperature=0.7, k=50)
    decoded_chars = detokenise((generated_chars[0]).tolist())
  # Replace the placeholder entry with the finished (user_input, reply) pair.
  history[-1] = (user_input, decoded_chars)
  yield "", history, history

# Chat UI: pressing Enter in the textbox runs chat_with_model, which clears the
# box and updates both the visible chat window and the stored history.
with gr.Blocks() as demo:
  chatbot = gr.Chatbot()  # visual chat window
  msg = gr.Textbox(label="You")  # input box for the user
  state = gr.State([])  # hidden conversation history kept between submissions
  msg.submit(
      chat_with_model,
      inputs=[msg, state],
      outputs=[msg, chatbot, state],
  )

demo.launch()

# Fine-tune with chatbot data

In [None]:
# set up device-agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"device: {device}")


# hyperparameters (same architecture as pre-training, lower learning rate)
block_size = 512 # block = sequence length
batch_size = 16
learning_rate = 1e-4
n_embd= 384
dropout = 0.1 # 10% of neurons dropped out
n_head = 8
n_layer = 8
vocab_size = 20000

# Train the shared tokenizer only when no saved model exists yet.
if not os.path.exists("unified_tokenizer.model"):
  spm.SentencePieceTrainer.Train(
      # Fix: no space after the comma. The list is comma-separated, so
      # " chatbot_dataset.txt" (with a leading space) named a different file.
      input="output.txt,chatbot_dataset.txt",
      model_prefix="unified_tokenizer",
      vocab_size = 20000,
      model_type="bpe",
      character_coverage=0.9995, # the corpus contains some foreign-language text
      user_defined_symbols=["<|user|>", "<|bot|>", "<EOS>", "<|new_book|>"] # must be a list
  )

# Load the tokenizer once (the same file used to be loaded twice in this cell).
sp = spm.SentencePieceProcessor()
sp.load("unified_tokenizer.model")

def encode(s):
  """Convert text into a list of integer token ids."""
  # out_type=int asks for ids rather than string pieces.
  return sp.encode(s, out_type=int)

def decode(l):
  """Convert a list of integer token ids back into text."""
  return sp.decode(l)

# Round-trip smoke test of the tokenizer.
ids = encode("Hello world")
print(ids)
text = decode(ids)
print(text)

# Read the chatbot corpus and tokenize it into one long 1-D tensor.
with open("chatbot_dataset.txt", encoding="utf-8") as file:
  text_chatbot = file.read()

data_chatbot = torch.tensor(encode(text_chatbot), dtype = torch.long)

#testing if it works
print(data_chatbot[:100])
print(decode((data_chatbot[:100]).tolist()))

# splitting dataset for training + testing (contiguous 80/20 split)
n = int(0.8*len(data_chatbot))
train_data = data_chatbot[:n]
test_data = data_chatbot[n:]


In [None]:
# Creating a custom dataset

from torch.utils.data import Dataset, DataLoader

class TokenDataset(Dataset):
  """Sliding-window next-token dataset over one long token sequence.

  Sample ``idx`` is the window ``data[idx : idx + block_size]`` paired with
  the same window shifted one position to the right.
  """
  def __init__(self, data, block_size):
    self.data = data # tensor with a bunch of tokens
    self.block_size = block_size

  def __len__(self):
    """ returns the number of samples """
    # Subtracting block_size keeps the shifted target window inside the data.
    # Fix: clamp at zero, because a negative value makes len() raise when the
    # data is shorter than one block.
    return max(0, len(self.data) - self.block_size)

  def __getitem__(self, idx):
    """ returns a particular sample in X, y"""
    # Fix: reject out-of-range indices instead of silently returning windows
    # that are too short (or empty).
    if not 0 <= idx < len(self):
      raise IndexError(f"index {idx} out of range for dataset of length {len(self)}")
    X = self.data[idx: idx + self.block_size]
    y = self.data[idx + 1: idx + 1 + self.block_size]
    return X, y

import os
# Wrap both chatbot token splits in sliding-window datasets.
train_dataset = TokenDataset(data = train_data,
                             block_size=block_size)

test_dataset = TokenDataset(data= test_data,
                            block_size=block_size)

# Fix: os.cpu_count() returns None when the CPU count cannot be determined,
# and None is not a valid num_workers; fall back to loading in the main process.
num_workers = os.cpu_count() or 0

# Training batches are shuffled; evaluation batches keep their order.
train_dataloader = DataLoader(train_dataset,
                              batch_size = batch_size,
                              shuffle=True,
                              num_workers = num_workers,
                              pin_memory=False)

test_dataloader = DataLoader(test_dataset,
                             batch_size=batch_size,
                             shuffle=False,
                             num_workers=num_workers,
                             pin_memory=False)

# Sanity checks on sizes.
print(len(train_data))
X, y = next(iter(train_dataloader))
print(len(X)) # output = batch_size
print(len(train_dataloader)) # number of batches per epoch

In [None]:
# transformer
class Head(nn.Module):
  """A single causal self-attention head.

  Reads the module-level ``n_embd``, ``block_size`` and ``dropout`` settings.
  """
  def __init__(self, head_size):
    super().__init__()
    # Attribute names and creation order are kept so saved checkpoints and
    # weight initialisation stay unchanged.
    self.key = nn.Linear(n_embd, head_size, bias=True)
    self.query = nn.Linear(n_embd, head_size, bias=True)
    self.value = nn.Linear(n_embd, head_size, bias=True)
    lower_triangle = torch.tril(torch.ones(block_size, block_size))
    self.register_buffer("tril", lower_triangle)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Attend over a 3-D input (unpacked as B, T, C); output's last dim is head_size."""
    _, seq_len, _ = x.shape
    keys = self.key(x)
    queries = self.query(x)
    # Scale by 1/sqrt(head_size), read off the key projection's last dimension.
    scale = keys.shape[-1] ** -0.5
    scores = (queries @ keys.transpose(-2, -1)) * scale
    # Causal mask: positions above the diagonal (the future) get -inf so the
    # softmax assigns them zero weight.
    is_future = self.tril[:seq_len, :seq_len] == 0
    scores = scores.masked_fill(is_future, float("-inf"))
    attention_weights = self.dropout(torch.softmax(scores, dim=-1))
    values = self.value(x)
    return attention_weights @ values

class MultiHeadAttention(nn.Module):
  """Runs several attention heads side by side and merges their outputs."""
  def __init__(self, n_head, head_size):
    super().__init__()
    # One independent Head per slot; each maps x to [B, T, head_size].
    head_modules = []
    for _ in range(n_head):
      head_modules.append(Head(head_size))
    self.heads = nn.ModuleList(head_modules)
    # Projects the concatenated head outputs back to the embedding width.
    self.linearprojections = nn.Linear(head_size*n_head, n_embd)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Concatenate every head's output on the last axis, project, apply dropout."""
    per_head = [head(x) for head in self.heads]
    merged = torch.cat(per_head, dim=-1)  # [B, T, n_head * head_size]
    projected = self.linearprojections(merged)  # [B, T, n_embd]
    return self.dropout(projected)

class FeedForward(nn.Module):
  """Position-wise MLP: expand to 4*n_embd, GELU, project back, dropout."""
  def __init__(self, n_embd):
    super().__init__()
    hidden_width = 4*n_embd
    layers = [
        nn.Linear(n_embd, hidden_width),
        nn.GELU(),
        nn.Linear(hidden_width, n_embd),  # output width is n_embd again
        nn.Dropout(dropout),  # rate comes from the module-level `dropout`
    ]
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    """Apply the MLP to the last dimension of x."""
    return self.net(x)

class Block(nn.Module):
  """Pre-norm transformer block: attention then feed-forward, each with a residual."""
  def __init__(self, n_embd, n_head):
    super().__init__()
    # The embedding width is divided evenly (floor) between the heads.
    self.multiheadattention = MultiHeadAttention(n_head, n_embd//n_head)
    self.feedforward = FeedForward(n_embd)
    self.layernorm1 = nn.LayerNorm(n_embd)
    self.layernorm2 = nn.LayerNorm(n_embd)

  def forward(self, x):
    """LayerNorm is applied before each sub-layer; its output is added back to the input."""
    attended = self.multiheadattention(self.layernorm1(x))
    hidden = x + attended
    transformed = self.feedforward(self.layernorm2(hidden))
    return hidden + transformed

class MiniGPTModel(nn.Module):
  """Decoder-only transformer language model with tied input/output embeddings.

  Besides the constructor arguments it reads the module-level hyperparameters
  ``block_size``, ``n_head`` and ``n_layer``.
  """
  def __init__(self, vocab_size, n_embd):
    super().__init__()
    self.token_embedding_layer = nn.Embedding(vocab_size, n_embd)
    self.position_embedding_layer = nn.Embedding(block_size, n_embd)
    self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
    self.layernorm = nn.LayerNorm(n_embd)
    self.linear = nn.Linear(in_features=n_embd, out_features=vocab_size)
    # Weight tying: the output projection shares its matrix with the token embedding.
    self.linear.weight = self.token_embedding_layer.weight
    self.apply(self._init_weights)

  def _init_weights(self, module):
    """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
    if isinstance(module, nn.Linear): # make sure weights init properly
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
      if module.bias is not None:
        torch.nn.init.zeros_(module.bias)
    elif isinstance(module, nn.Embedding):
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

  def forward(self, index, targets=None):
    """Compute next-token logits and, when targets are given, the loss.

    Args:
      index: token ids of shape [B, T]; T must not exceed ``block_size``
        because positions are looked up in the position embedding.
      targets: optional token ids with B*T elements.

    Returns:
      ``(logits, None)`` with logits of shape [B, T, vocab] when ``targets`` is
      None, otherwise ``(logits, loss)`` with logits flattened to [B*T, vocab]
      and ``loss`` the cross-entropy against the flattened targets.
    """
    tok = self.token_embedding_layer(index)
    B, T = index.shape
    pos = self.position_embedding_layer(torch.arange(T, device=index.device))
    x = tok + pos
    x = self.blocks(x)
    x = self.layernorm(x)
    logits = self.linear(x)
    if targets is None:
      return logits, None
    B, T, C = logits.shape
    logits = logits.view(B*T, C)
    targets = targets.view(B*T)
    loss = torch.nn.functional.cross_entropy(logits, targets)
    return logits, loss

  def top_k_sampling(self, logits, temperature=1.0, k=100):
    """Sample one token id per row from the ``k`` highest logits.

    Args:
      logits: scores of shape [vocab] or [B, vocab].
      temperature: divisor applied to the kept logits before the softmax.
      k: number of candidates to keep; capped at the number of logits.

    Returns:
      Tensor of sampled token ids, shape [1] for 1-D input or [B, 1] for
      batched input.
    """
    # Fix: torch.topk raises when k exceeds the number of candidates.
    k = min(k, logits.size(-1))
    topk_logits, topk_index = torch.topk(logits, k)
    probs = torch.softmax(topk_logits / temperature, dim=-1)
    choice = torch.multinomial(probs, num_samples=1)
    # gather maps the sampled position within the top-k back to a vocabulary
    # id, and works for both 1-D and batched input.
    return torch.gather(topk_index, -1, choice)

  def generate(self, index, max_token_number=512, temperature=1.0, k=100):
    """Autoregressively append ``max_token_number`` sampled tokens to ``index``.

    Args:
      index: prompt token ids of shape [B, T].
      max_token_number: how many tokens to append.
      temperature, k: forwarded to ``top_k_sampling``.

    Returns:
      Tensor of shape [B, T + max_token_number] containing prompt and continuation.
    """
    for _ in range(max_token_number):
      # The model only has block_size positions, so condition on the tail.
      index_cond = index[:, -block_size:]
      logits, _ = self(index_cond)
      last_logits = logits[:, -1, :] # shape [B, vocab_size]
      # Generalised: sample every row, so any batch size works (previously
      # only row 0 was sampled and the result was forced to shape [1, 1]).
      next_token = self.top_k_sampling(last_logits, temperature=temperature, k=k)
      index = torch.cat((index, next_token), dim=1)
    return index

# Start fine-tuning from the pre-trained checkpoint.
# Fix: map_location lets a checkpoint saved on GPU load on a CPU-only machine
# (and the reverse) instead of failing inside torch.load.
loaded_model_chatbot = MiniGPTModel(vocab_size, n_embd).to(device)
loaded_model_chatbot.load_state_dict(torch.load(f=MODEL_SAVE_PATH, map_location=device))

# AdamW over every model parameter at the fine-tuning learning rate.
optimizer = torch.optim.AdamW(loaded_model_chatbot.parameters(), lr=learning_rate)

epochs = 10

# The scheduler is stepped once per batch, so the schedule spans every batch
# of every epoch; the first 5% of those steps are warm-up.
total_steps = len(train_dataloader) * epochs
warmup_steps = int(0.05 * total_steps)
print(f"Total_steps: {total_steps}")

def lr_lambda(current_step):
  """Return the multiplier applied to the base learning rate at ``current_step``.

  Rises linearly from 0 to 1 over ``warmup_steps``, then follows half a cosine
  wave from 1 down to 0 at ``total_steps``.
  """
  if current_step >= warmup_steps:
    decay_steps = float(max(1, total_steps - warmup_steps))
    progress = float(current_step - warmup_steps) / decay_steps
    # cos runs from 1 to -1 as progress goes 0 -> 1; shifting by 1 and halving
    # maps that onto 1 -> 0.
    return 0.5 * (1.0 + math.cos(math.pi * progress))
  # Linear warm-up
  return float(current_step) / float(max(1, warmup_steps))

scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

from tqdm.auto import tqdm
from timeit import default_timer as timer
start_time = timer()

results = {"train_loss": [],
           "test_loss": []}
norm = []
lrs = []
best_val_loss = float('inf')
global_step=0
for epoch in tqdm(range(epochs)):
  train_loss = 0
  loaded_model_chatbot.train() # sampling randomly at different points
  for x, y in train_dataloader:
    x, y = x.to(device), y.to(device)
    # Evaluate the loss
    logits, loss = loaded_model_chatbot(x, y)
    train_loss += loss.item()
    optimizer.zero_grad()
    loss.backward()

    # NaN checks
    total_norm = 0.0
    for name, param in loaded_model_chatbot.named_parameters():
      if param.grad is not None:
        if torch.isnan(param).any():
          print(f"ALERT!! NaN detected in parameters: {name}")
        if torch.isnan(param.grad).any():
          print(f"ALERT!! NaN detected in gradients: {name}")
        param_norm = param.grad.data.norm(2)
        total_norm += param_norm.item() ** 2
    total_norm = total_norm ** 0.5
    norm.append(total_norm)

    # gradient clipping
    torch.nn.utils.clip_grad_norm_(loaded_model_chatbot.parameters(), max_norm = 3.0)
    optimizer.step()
    scheduler.step()
    lrs.append(scheduler.get_last_lr()[0])
    global_step += 1

  train_loss /= len(train_dataloader)
  results["train_loss"].append(train_loss.item())

  if epoch % 5 == 0:
    print(f"-------------------------------------------------")
    print(f"LR at {epoch} epochs: {scheduler.get_last_lr()[0]}")
    print(f"Gradient norm at {epoch} epochs: {total_norm:.4f}")

  loaded_model_chatbot.eval()
  with torch.inference_mode():
    total_val_loss = 0
    for x_val, y_val in test_dataloader:
      x_val, y_val = x_val.to(device), y_val.to(device)
      val_logits, val_loss = loaded_model_chatbot(x_val, y_val)
      total_val_loss += val_loss.item()
    avg_val_loss /= len(test_dataloader)
    results["test_loss"].append(avg_val_loss.item()) # append every 100 epochs
    if epoch % 5 == 0:
      print(f"Epoch {epoch} | Train loss: {train_loss} | Val loss: {avg_val_loss}")
      # sampling
      list_of_words = random.choice(["He looked at her and said", "My dear sir",
                                    "It was on a dreary night of November.",
                                    "The candle flickered, casting long shadows.",
                                    "The street was silent, save for the sound of rain",
                                    "I stood at the gates, uncertain",
                                    "Snow fell in thick, silent sheets over the empty street.",
                                    "Her breath turned to mist in the frigid air as she waited.",
                                    "But you promised!",
                                    "Snowflakes melted as they kissed her skin.",
                                    "The world was white and still, muffled beneath the snow.",
                                    "The clock struck thirteen",
                                    "Her hands trembled, but her eyes were steady.",
                                    "They told me never to open that door, but…",
                                    "Hello! How are you doing?"])
      context = torch.tensor([encode(list_of_words)], dtype=torch.long).to(device)
      print(f"-------------------------------------------------")
      generated_chars = loaded_model_chatbot.generate(context.to(device), max_token_number=500, temperature=0.8, k=50)
      decoded_chars = decode((generated_chars[0]).tolist())
      print(decoded_chars)

    if avg_val_loss.item() < best_val_loss:
      best_val_loss = avg_val_loss.item()
      torch.save(loaded_model_chatbot.state_dict(), "best_model.pth")
      print("✅ Saved new best model")
      print(f"Val loss at this stage: {avg_val_loss.item()}")

    #scheduler.step(val_loss)

end_time=timer()
print(f"Total time taken = {end_time-start_time}s")

train_loss = results["train_loss"]
test_loss = results["test_loss"]
epochs = range((len(results["train_loss"])))

In [None]:
plt.figure(figsize=(10,15))

# Panel 1: average train / test loss, one point per epoch.
plt.subplot(2,2,1)
plt.plot(epochs, train_loss, label="train_loss")
plt.plot(epochs, test_loss, label="test_loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()

# Panel 2: learning rate. `lrs` has one entry per optimizer step, so the
# x-axis is steps, and a dedicated range is used instead of overwriting `epochs`.
lr_steps = range(len(lrs))
plt.subplot(2,2,2)
plt.plot(lr_steps, lrs, label="Learning rate")
plt.xlabel("Steps")
plt.ylabel("Learning Rate")
plt.legend()

# Panel 3: gradient norm, also one entry per optimizer step.
plt.subplot(2,2,3)
norm_steps = range(len(norm))
plt.plot(norm_steps, norm, label="Gradient norm")
plt.xlabel("Steps")
plt.ylabel("Gradient norm")
plt.legend()

plt.tight_layout()
# Save before show(): once the figure has been shown, savefig would write
# a new, empty figure instead of these plots.
plt.savefig("train_&_test_plots.pdf")
plt.show()

# Inference
# Put the model in eval mode explicitly so this section does not depend on
# the mode the training loop happened to leave it in.
loaded_model_chatbot.eval()
with torch.inference_mode():
  prompt = input("Prompt: ")
  prompt_ids = encode(prompt)
  if not prompt_ids:
    # An empty prompt encodes to zero tokens, leaving the model no position
    # to predict the next token from.
    print("Empty prompt: nothing to generate from.")
  else:
    # Wrap in a list to add the batch dimension: shape [1, T].
    context = torch.tensor([prompt_ids], dtype=torch.long, device=device)
    generated_chars = loaded_model_chatbot.generate(context, max_token_number=1000, temperature=0.7, k=50)
    decoded_chars = decode((generated_chars[0]).tolist())
    print(decoded_chars)