In [15]:
import pickle
import os
import requests
import time
import math
from dataclasses import dataclass
import pickle # For saving/loading meta later if needed, though not strictly for tokenizer now
from collections import defaultdict, Counter
import re

import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
from tqdm import tqdm # For progress bars

# --- Hyperparameters ---
BATCH_SIZE = 64
BLOCK_SIZE = 256  # Context length
MAX_ITERS = 5000
EVAL_INTERVAL = 250
LEARNING_RATE = 3e-4 # Adjusted from 1e-3, often 3e-4 is a good starting point
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
N_EMBD = 384
N_HEAD = 6
N_LAYER = 10
DROPOUT = 0.2
# AdamW optimizer betas
BETA1 = 0.9
BETA2 = 0.95
# Early stopping
EARLY_STOPPING_PATIENCE = 5 # Number of evaluation intervals to wait
EVAL_ITERS_FOR_LOSS = 100 # Number of batches to average for loss estimation

# For reproducibility
torch.manual_seed(1337)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(1337)

print(f"Using device: {DEVICE}")

class SimpleBPETokenizer:
    def __init__(self, num_merges, min_frequency=2):
        self.num_merges = num_merges
        self.min_frequency = min_frequency
        self.merges = {}
        self.vocab = {}
        self.itos = {}
        self.stoi = {}

    def train(self, corpus):
        # Ensure corpus is a string
        if isinstance(corpus, list):
            corpus = " ".join(corpus)

        # Initialize character-level vocab including spaces
        initial_tokens = set(corpus)
        self.vocab = {token: idx for idx, token in enumerate(sorted(initial_tokens))}
        self.itos = {idx: token for token, idx in self.vocab.items()}
        self.stoi = dict(self.vocab)

        for _ in tqdm(range(self.num_merges), desc="BPE merges", unit="merge"):
            pair_counts = defaultdict(int)
            for word in corpus.split():  # split by space
                tokens = self._tokenize_word(word, self.merges)
                for j in range(len(tokens) - 1):
                    pair = (tokens[j], tokens[j + 1])
                    pair_counts[pair] += 1

            # Filter by min_frequency
            filtered_pairs = {pair: count for pair, count in pair_counts.items() if count >= self.min_frequency}
            if not filtered_pairs:
                break

            most_frequent_pair = max(filtered_pairs, key=filtered_pairs.get)
            new_token = "".join(most_frequent_pair)
            self.merges[most_frequent_pair] = new_token
            new_id = len(self.vocab)
            self.vocab[new_token] = new_id
            self.itos[new_id] = new_token
            self.stoi = dict(self.vocab)

    def _tokenize_word(self, word, merges):
        tokens = list(word)
        while True:
            best_pair = None
            for pair in zip(tokens[:-1], tokens[1:]):
                if pair in merges:
                    best_pair = pair
                    break
            if best_pair is None:
                break

            new_tokens = []
            i = 0
            while i < len(tokens):
                if i < len(tokens) - 1 and (tokens[i], tokens[i + 1]) == best_pair:
                    new_tokens.append(merges[best_pair])
                    i += 2
                else:
                    new_tokens.append(tokens[i])
                    i += 1
            tokens = new_tokens
        return tokens

    def encode(self, text):
        tokens = []
        # Treat every character including spaces
        for char in text:
            subwords = self._tokenize_word(char, self.merges)
            tokens.extend([self.stoi[subword] for subword in subwords if subword in self.stoi])
        return tokens

    def decode(self, token_ids):
        # Join tokens directly, spaces are preserved
        return "".join([self.itos.get(tok_id, "") for tok_id in token_ids])


@dataclass
class GPTConfig:
    block_size: int = BLOCK_SIZE
    vocab_size: int = vocab_size # Will be set by loaded data
    n_layer: int = N_LAYER
    n_head: int = N_HEAD
    n_embd: int = N_EMBD
    dropout: float = DROPOUT
    bias: bool = True # True: bias in Linears and LayerNorms

class LayerNorm(nn.Module):
    """ LayerNorm but with an optional bias. """
    def __init__(self, ndim, bias):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None

    def forward(self, input):
        return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        # Key, query, value projections for all heads, but in a batch
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
        # Output projection
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
        # Regularization
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)
        self.n_head = config.n_head
        self.n_embd = config.n_embd

        # Causal mask to ensure that attention is only applied to the left in the input sequence
        # We use register_buffer for parameters that should be part of the model's state
        # but are not trained by the optimizer (e.g., a fixed mask).
        self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                    .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)

        # Calculate query, key, values for all heads in batch and move head forward to be the batch dim
        q, k, v  = self.c_attn(x).split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)

        # Manual implementation of attention
        # (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        att = att.masked_fill(self.bias[:,:,:T,:T] == 0, float('-inf')) # Apply causal mask
        att = F.softmax(att, dim=-1)
        att = self.attn_dropout(att)
        # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
        y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C) # Re-assemble all head outputs side by side

        # Output projection
        y = self.resid_dropout(self.c_proj(y))
        return y

class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.c_fc    = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
        self.gelu    = nn.GELU() # Using GELU activation
        self.c_proj  = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        x = self.dropout(x)
        return x

class Block(nn.Module):
    """ Transformer block: communication followed by computation """
    def __init__(self, config):
        super().__init__()
        self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)
        self.mlp = MLP(config)

    def forward(self, x):
        x = x + self.attn(self.ln_1(x))  # Attention with residual connection
        x = x + self.mlp(self.ln_2(x))   # MLP with residual connection
        return x
    
class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.vocab_size is not None
        assert config.block_size is not None
        self.config = config

        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config.vocab_size, config.n_embd),      # Token embeddings
            wpe = nn.Embedding(config.block_size, config.n_embd),     # Positional embeddings
            drop = nn.Dropout(config.dropout),                        # Dropout layer
            h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]), # Transformer blocks
            ln_f = LayerNorm(config.n_embd, bias=config.bias),        # Final layer norm
        ))
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False) # Language model head

        # Weight tying: token embeddings and final linear layer share weights
        self.transformer.wte.weight = self.lm_head.weight

        # Initialize weights
        self.apply(self._init_weights)
        # Apply special scaled init to the residual projections, per GPT-2 paper
        for pn, p in self.named_parameters():
            if pn.endswith('c_proj.weight'):
                torch.nn.init.normal_(p, mean=0.0, std=0.02/math.sqrt(2 * config.n_layer))

        num_params = sum(p.numel() for p in self.parameters() if p.requires_grad)
        print(f"Number of trainable parameters: {num_params/1e6:.2f}M")

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        device = idx.device
        b, t = idx.size() # Batch size, sequence length
        assert t <= self.config.block_size, f"Cannot forward sequence of length {t}, block size is only {self.config.block_size}"
        pos = torch.arange(0, t, dtype=torch.long, device=device) # Shape (t)

        # Forward the GPT model
        tok_emb = self.transformer.wte(idx) # Token embeddings of shape (b, t, n_embd)
        pos_emb = self.transformer.wpe(pos) # Position embeddings of shape (t, n_embd)
        x = self.transformer.drop(tok_emb + pos_emb)
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)

        if targets is not None:
            # If we are given some desired targets also calculate the loss
            logits = self.lm_head(x) # (b, t, vocab_size)
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        else:
            # Inference-time optimization: only forward the lm_head on the very last position
            logits = self.lm_head(x[:, [-1], :]) # Note: using list [-1] to preserve the time dim -> (b, 1, vocab_size)
            loss = None
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
        the sequence max_new_tokens times, feeding the predictions back into the model each time.
        """
        self.eval() # Set model to evaluation mode
        for _ in range(max_new_tokens):
            # If the sequence context is growing too long, crop it at block_size
            idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
            # Forward the model to get the logits for the index in the sequence
            logits, _ = self(idx_cond) # Loss is None during generation
            # Pluck the logits at the final step and scale by desired temperature
            logits = logits[:, -1, :] / temperature
            # Optionally crop the logits to only the top k options
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('Inf') # Mask non-top-k logits
            # Apply softmax to convert logits to (normalized) probabilities
            probs = F.softmax(logits, dim=-1)
            # Sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1)
            # Append sampled index to the running sequence and continue
            idx = torch.cat((idx, idx_next), dim=1)
        self.train() # Set model back to training mode if used elsewhere
        return idx

Using device: cuda


In [16]:
model_config = GPTConfig(vocab_size=vocab_size, block_size=BLOCK_SIZE,
                         n_layer=N_LAYER, n_head=N_HEAD, n_embd=N_EMBD, dropout=DROPOUT)

# Instantiate the model
model = GPT(model_config)
model.to(DEVICE)

Number of trainable parameters: 18.24M


GPT(
  (transformer): ModuleDict(
    (wte): Embedding(1023, 384)
    (wpe): Embedding(256, 384)
    (drop): Dropout(p=0.2, inplace=False)
    (h): ModuleList(
      (0-9): 10 x Block(
        (ln_1): LayerNorm()
        (attn): CausalSelfAttention(
          (c_attn): Linear(in_features=384, out_features=1152, bias=True)
          (c_proj): Linear(in_features=384, out_features=384, bias=True)
          (attn_dropout): Dropout(p=0.2, inplace=False)
          (resid_dropout): Dropout(p=0.2, inplace=False)
        )
        (ln_2): LayerNorm()
        (mlp): MLP(
          (c_fc): Linear(in_features=384, out_features=1536, bias=True)
          (gelu): GELU(approximate='none')
          (c_proj): Linear(in_features=1536, out_features=384, bias=True)
          (dropout): Dropout(p=0.2, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm()
  )
  (lm_head): Linear(in_features=384, out_features=1023, bias=False)
)

In [17]:
# Load the state dictionary
model_path = 'best_shakespeare_model.pth' # Or whatever you named it when uploading
if os.path.exists(model_path):
    model.load_state_dict(torch.load(model_path, map_location=DEVICE))
    model.eval() # Set to evaluation mode
    print(f"Model loaded from {model_path}")
else:
    print(f"Model file not found at {model_path}")

Model loaded from best_shakespeare_model.pth


  model.load_state_dict(torch.load(model_path, map_location=DEVICE))


In [18]:
# --- Load tokenizer object directly ---
with open("bpe_tokenizer.pkl", "rb") as f:
    loaded_tokenizer = pickle.load(f)
print("Tokenizer loaded from bpe_tokenizer.pkl.")


Tokenizer loaded from bpe_tokenizer.pkl.


In [19]:
# --- 4. Generate Text ---
print(f"\n--- Generating Shakespeare-like text ---")

# You can change the starting prompt
# start_string = "JULIET:\nO Romeo, Romeo! wherefore art thou Romeo?\n"
# start_string = "To be, or not to be, that is the question:\n"
start_string = "A pound of flesh, I demand!"

print(f"Starting prompt: '{start_string.strip()}'")

start_ids = loaded_tokenizer.encode(start_string)
# Unsqueeze to add batch dimension: (seq_len) -> (1, seq_len)
x_input = torch.tensor(start_ids, dtype=torch.long, device=DEVICE).unsqueeze(0)

# Generate text
model.eval() # Set model to evaluation mode for generation
with torch.no_grad(): # No need to track gradients during generation
    generated_ids = model.generate(x_input,
                                   max_new_tokens=1000,
                                   temperature=0.8, # Controls randomness: lower is less random, higher is more random
                                   top_k=20)       # Considers only the top_k most likely tokens at each step

generated_text = loaded_tokenizer.decode(generated_ids[0].tolist()) # Decode the first (and only) batch item
print("\n--- Generated Text ---")
print(generated_text)


--- Generating Shakespeare-like text ---
Starting prompt: 'A pound of flesh, I demand!'

--- Generated Text ---
A pound of flesh, I demand!
    O honour! and let me have our noble heart,
    And you shall have mov'd the mountains of all,
    And perceive this dreams of mercy death,
    But I will come forth to put your wealth,
    And with a speech of fortune's dearest mouth,
    Who may be false doubtful and perforce
    The crown poor or and child of wilful looks.
    Come, away, but never take thee mercy.
                                                          Exeunt.




Scene IV.
A hall in Venice. Another part of the city

The other Duke of York and his hands of Gloucester.

Enter the Lord Constable and Lord Hastings

  Con. See, with milk that hangs on the forest and straight.
    Let them go away; or else the bond of wrath,
    Let have their belded beard at the town,
    And lay their looks and sous cowarded shall look on.
    They are stain'd in their tears, the enemy,
    