In [1]:
pip install tiktoken

Collecting tiktoken
  Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m60.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tiktoken
Successfully installed tiktoken-0.9.0


In [12]:
import tensorflow as tf
import tqdm
import time
import json
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import tiktoken  # Ensure this is installed

# Record the library versions this notebook was executed with.
print(f"TensorFlow version: {tf.__version__}")
print(f"tqdm version: {tqdm.__version__}")


TensorFlow version: 2.18.0
tqdm version: 4.66.5


In [14]:
# -------------------------------
# 1. Download and Load GPT-2 Weights
# -------------------------------
from gpt_download3 import download_and_load_gpt2

# Fetch the pretrained 124M GPT-2 checkpoint into ./gpt2 and unpack the two
# values the helper returns (checkpoint settings and the parameter dictionary).
settings, params = download_and_load_gpt2(
    models_dir="gpt2",
    model_size="124M",
)



File already exists and is up-to-date: gpt2/124M/checkpoint




File already exists and is up-to-date: gpt2/124M/encoder.json




File already exists and is up-to-date: gpt2/124M/hparams.json




File already exists and is up-to-date: gpt2/124M/model.ckpt.data-00000-of-00001




File already exists and is up-to-date: gpt2/124M/model.ckpt.index




File already exists and is up-to-date: gpt2/124M/model.ckpt.meta




File already exists and is up-to-date: gpt2/124M/vocab.bpe


In [15]:
# -------------------------------
# 2. Define Base Configuration and Update for GPT-2 Small
# -------------------------------
# Baseline hyperparameters. Entries that depend on the chosen model size, or that
# the pretrained checkpoint requires, are overridden when NEW_CONFIG is built below.
GPT_CONFIG_124M = dict(
    vocab_size=50257,
    context_length=256,  # placeholder; NEW_CONFIG replaces it with 1024
    emb_dim=768,
    n_heads=12,
    n_layers=12,
    drop_rate=0.1,
    qkv_bias=False,
)

# Per-variant architecture sizes, listed as (name, emb_dim, n_layers, n_heads).
model_configs = {
    name: {"emb_dim": emb_dim, "n_layers": n_layers, "n_heads": n_heads}
    for name, emb_dim, n_layers, n_heads in (
        ("gpt2-small (124M)", 768, 12, 12),
        ("gpt2-medium (355M)", 1024, 24, 16),
        ("gpt2-large (774M)", 1280, 36, 20),
        ("gpt2-xl (1558M)", 1600, 48, 25),
    )
}

model_name = "gpt2-small (124M)"

# Later entries win: base config, then the size-specific values, then the
# context length and QKV bias setting needed to match the pretrained weights.
NEW_CONFIG = {
    **GPT_CONFIG_124M,
    **model_configs[model_name],
    "context_length": 1024,
    "qkv_bias": True,
}


In [16]:
# -------------------------------
# 3. Define Model Classes
# -------------------------------

class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Total output width; split evenly across ``num_heads``.
        context_length: Side length of the square causal mask that is registered
            as the ``mask`` buffer.
        dropout: Dropout probability applied to the attention weights.
        num_heads: Number of attention heads.
        qkv_bias: Whether the query/key/value projections carry a bias term.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"
        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads

        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)

        # Ones strictly above the diagonal mark the "future" positions to hide.
        causal = torch.ones(context_length, context_length).triu(diagonal=1)
        self.register_buffer("mask", causal)

    def _split_heads(self, projected, batch, seq_len):
        """Reshape (batch, seq, d_out) into (batch, heads, seq, head_dim)."""
        return projected.view(batch, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, x):
        """Return the attention output with shape (batch, num_tokens, d_out)."""
        batch, seq_len, _ = x.shape

        q = self._split_heads(self.W_query(x), batch, seq_len)
        k = self._split_heads(self.W_key(x), batch, seq_len)
        v = self._split_heads(self.W_value(x), batch, seq_len)

        # Raw scores; masked positions become -inf so softmax gives them zero weight.
        scores = q @ k.transpose(2, 3)
        future = self.mask.bool()[:seq_len, :seq_len]
        scores.masked_fill_(future, -float('inf'))

        weights = torch.softmax(scores / (self.head_dim ** 0.5), dim=-1)
        weights = self.dropout(weights)

        # Merge the heads back into a single d_out-wide vector per token.
        merged = (weights @ v).transpose(1, 2).contiguous().view(batch, seq_len, self.d_out)
        return self.out_proj(merged)

class LayerNorm(nn.Module):
    """Layer normalization over the last dimension with a learnable scale and shift."""

    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5  # added to the variance so the square root never sees zero
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        """Normalize ``x`` along its last dimension, then apply scale and shift."""
        last_axis = dict(dim=-1, keepdim=True)
        centered = x - x.mean(**last_axis)
        # Biased (population) variance, i.e. divide by N rather than N - 1.
        std = torch.sqrt(x.var(unbiased=False, **last_axis) + self.eps)
        return self.scale * (centered / std) + self.shift

class GELU(nn.Module):
    """GELU activation computed with the tanh approximation."""

    def forward(self, x):
        """Apply 0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3))) element-wise."""
        coeff = torch.sqrt(torch.tensor(2.0 / torch.pi))
        inner = coeff * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))

class FeedForward(nn.Module):
    """Position-wise MLP: expand to 4x the embedding width, apply GELU, project back.

    Args:
        cfg: Configuration mapping; only ``cfg["emb_dim"]`` is read.
    """

    def __init__(self, cfg):
        super().__init__()
        width = cfg["emb_dim"]
        hidden = 4 * width
        self.layers = nn.Sequential(
            nn.Linear(width, hidden),
            GELU(),
            nn.Linear(hidden, width),
        )

    def forward(self, x):
        """Run ``x`` through the expand / activate / project stack."""
        return self.layers(x)

class TransformerBlock(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a residual.

    Args:
        cfg: Configuration mapping providing ``emb_dim``, ``context_length``,
            ``drop_rate``, ``n_heads`` and ``qkv_bias``.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]
        drop_rate = cfg["drop_rate"]
        self.att = MultiHeadAttention(
            d_in=emb_dim,
            d_out=emb_dim,
            context_length=cfg["context_length"],
            dropout=drop_rate,
            num_heads=cfg["n_heads"],
            qkv_bias=cfg["qkv_bias"],
        )
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNorm(emb_dim)
        self.norm2 = LayerNorm(emb_dim)
        # One dropout module shared by both residual branches.
        self.drop_shortcut = nn.Dropout(drop_rate)

    def forward(self, x):
        """Apply both sub-layers; each is normalized first and added back to its input."""
        # Attention sub-layer: norm -> attention -> dropout -> residual add.
        x = self.drop_shortcut(self.att(self.norm1(x))) + x
        # Feed-forward sub-layer: norm -> MLP -> dropout -> residual add.
        x = self.drop_shortcut(self.ff(self.norm2(x))) + x
        return x

class GPTModel(nn.Module):
    """GPT-style decoder: token + position embeddings, a stack of blocks, an output head.

    Args:
        cfg: Configuration mapping providing ``vocab_size``, ``emb_dim``,
            ``context_length``, ``drop_rate`` and ``n_layers`` (plus whatever
            ``TransformerBlock`` reads from it).
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim, vocab_size = cfg["emb_dim"], cfg["vocab_size"]
        self.tok_emb = nn.Embedding(vocab_size, emb_dim)
        self.pos_emb = nn.Embedding(cfg["context_length"], emb_dim)
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        blocks = [TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        self.trf_blocks = nn.Sequential(*blocks)
        self.final_norm = LayerNorm(emb_dim)
        self.out_head = nn.Linear(emb_dim, vocab_size, bias=False)

    def forward(self, in_idx):
        """Map token ids of shape (batch, seq_len) to logits of shape (batch, seq_len, vocab_size)."""
        _, seq_len = in_idx.shape
        # Position ids 0..seq_len-1, created on the same device as the input.
        positions = torch.arange(seq_len, device=in_idx.device)
        hidden = self.tok_emb(in_idx) + self.pos_emb(positions)
        hidden = self.drop_emb(hidden)
        hidden = self.trf_blocks(hidden)
        return self.out_head(self.final_norm(hidden))


In [17]:
# -------------------------------
# 4. Load Pretrained GPT-2 Weights into Custom Model
# -------------------------------
def assign(left, right):
    """Build a new trainable parameter holding ``right``, checked against ``left``.

    Args:
        left: The existing parameter (or tensor) that is being replaced. Its shape
            is used for validation and its dtype/device are applied to the result.
        right: The replacement values, either a NumPy array or a ``torch.Tensor``.

    Returns:
        A ``torch.nn.Parameter`` containing an independent copy of ``right`` with
        the dtype and device of ``left``.

    Raises:
        ValueError: If ``left`` and ``right`` do not have the same shape.
    """
    if left.shape != right.shape:
        raise ValueError(f"Shape mismatch. Left: {left.shape}, Right: {right.shape}")
    if isinstance(right, torch.Tensor):
        # torch.tensor(tensor) emits a UserWarning; detach().clone() is the
        # recommended way to copy an existing tensor.
        values = right.detach().clone()
    else:
        values = torch.tensor(right)
    # Match the parameter being replaced so that, for example, a float64 checkpoint
    # array does not silently produce a float64 weight in a float32 model.
    return torch.nn.Parameter(values.to(dtype=left.dtype, device=left.device))

def load_weights_into_gpt(gpt, params):
    """Copy the pretrained GPT-2 parameter dictionary into ``gpt`` in place.

    Args:
        gpt: A ``GPTModel`` instance whose parameters are replaced.
        params: Parameter dictionary with top-level keys ``wpe``, ``wte``, ``g``,
            ``b`` and ``blocks`` (one entry per transformer block).

    Every assignment goes through ``assign``, which raises ``ValueError`` on a
    shape mismatch. Linear weights are transposed before being assigned.
    """
    gpt.pos_emb.weight = assign(gpt.pos_emb.weight, params['wpe'])
    gpt.tok_emb.weight = assign(gpt.tok_emb.weight, params['wte'])

    for layer_idx, ckpt in enumerate(params["blocks"]):
        block = gpt.trf_blocks[layer_idx]
        att = block.att
        qkv_layers = (att.W_query, att.W_key, att.W_value)

        # The checkpoint stores Q, K and V fused along the last axis; split into thirds.
        qkv_weights = np.split(ckpt["attn"]["c_attn"]["w"], 3, axis=-1)
        for layer, weight in zip(qkv_layers, qkv_weights):
            layer.weight = assign(layer.weight, weight.T)
        qkv_biases = np.split(ckpt["attn"]["c_attn"]["b"], 3, axis=-1)
        for layer, bias in zip(qkv_layers, qkv_biases):
            layer.bias = assign(layer.bias, bias)

        # Attention output projection.
        att.out_proj.weight = assign(att.out_proj.weight, ckpt["attn"]["c_proj"]["w"].T)
        att.out_proj.bias = assign(att.out_proj.bias, ckpt["attn"]["c_proj"]["b"])

        # Feed-forward: layers[0] is the expansion, layers[2] the projection back.
        expand, project = block.ff.layers[0], block.ff.layers[2]
        expand.weight = assign(expand.weight, ckpt["mlp"]["c_fc"]["w"].T)
        expand.bias = assign(expand.bias, ckpt["mlp"]["c_fc"]["b"])
        project.weight = assign(project.weight, ckpt["mlp"]["c_proj"]["w"].T)
        project.bias = assign(project.bias, ckpt["mlp"]["c_proj"]["b"])

        # Layer norms: "g" is the scale, "b" the shift.
        block.norm1.scale = assign(block.norm1.scale, ckpt["ln_1"]["g"])
        block.norm1.shift = assign(block.norm1.shift, ckpt["ln_1"]["b"])
        block.norm2.scale = assign(block.norm2.scale, ckpt["ln_2"]["g"])
        block.norm2.shift = assign(block.norm2.shift, ckpt["ln_2"]["b"])

    gpt.final_norm.scale = assign(gpt.final_norm.scale, params["g"])
    gpt.final_norm.shift = assign(gpt.final_norm.shift, params["b"])
    # The output head is initialised from the token-embedding matrix.
    gpt.out_head.weight = assign(gpt.out_head.weight, params["wte"])

# Build the model, copy in the pretrained weights, then move it to the GPU when
# one is available and switch to evaluation mode (dropout disabled).
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
gpt = GPTModel(NEW_CONFIG)
load_weights_into_gpt(gpt, params)
gpt.to(device).eval()

GPTModel(
  (tok_emb): Embedding(50257, 768)
  (pos_emb): Embedding(1024, 768)
  (drop_emb): Dropout(p=0.1, inplace=False)
  (trf_blocks): Sequential(
    (0): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): Linear(in_features=768, out_features=768, bias=True)
        (W_key): Linear(in_features=768, out_features=768, bias=True)
        (W_value): Linear(in_features=768, out_features=768, bias=True)
        (out_proj): Linear(in_features=768, out_features=768, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ff): FeedForward(
        (layers): Sequential(
          (0): Linear(in_features=768, out_features=3072, bias=True)
          (1): GELU()
          (2): Linear(in_features=3072, out_features=768, bias=True)
        )
      )
      (norm1): LayerNorm()
      (norm2): LayerNorm()
      (drop_shortcut): Dropout(p=0.1, inplace=False)
    )
    (1): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): Linear(in_features=7

In [21]:
# -------------------------------
# 5. Fine-Tuning Setup: Dataset, DataLoader, Loss Function
# -------------------------------
class MathProblemsDataset(Dataset):
    """Next-token-prediction dataset built from a JSON list of math problems.

    Each record must provide ``"question"`` and ``"explanation"`` strings. They are
    rendered as ``"Problem: ...\\nSolution: ...\\n<|endoftext|>"``, tokenized once in
    the constructor and truncated to ``max_length`` tokens.

    Args:
        json_file: Path to a UTF-8 encoded JSON file containing a list of records.
        tokenizer: Object exposing ``encode(text, allowed_special=...)`` that
            returns a list of token ids.
        max_length: Maximum number of tokens kept per sample; must be at least 2
            so that every sample yields a non-empty input/target pair.

    Raises:
        ValueError: If ``max_length`` is smaller than 2.
    """

    def __init__(self, json_file, tokenizer, max_length):
        if max_length < 2:
            raise ValueError(f"max_length must be at least 2, got {max_length}")
        # Explicit UTF-8: the platform default encoding (e.g. cp1252 on Windows)
        # fails or corrupts text on non-ASCII symbols such as math characters.
        with open(json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.samples = []
        self.tokenizer = tokenizer
        self.max_length = max_length
        for record in data:
            text = "Problem: " + record["question"] + "\nSolution: " + record["explanation"] + "\n<|endoftext|>"
            token_ids = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
            # Slicing is a no-op for sequences that already fit.
            token_ids = token_ids[:max_length]
            self.samples.append(torch.tensor(token_ids, dtype=torch.long))

    def __len__(self):
        """Return the number of tokenized samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(input_ids, target_ids)`` where the targets are the inputs shifted by one."""
        tokens = self.samples[idx]
        input_ids = tokens[:-1]
        target_ids = tokens[1:]
        return input_ids, target_ids

def collate_fn(batch):
    """Right-pad a list of ``(input_ids, target_ids)`` pairs into two batch tensors.

    Inputs are padded with token id 0. Targets are padded with -100, the value
    that ``calc_loss_batch`` passes to cross-entropy as ``ignore_index``.

    Returns:
        ``(input_ids, target_ids)``, each of shape (batch, longest_sequence).
    """
    pad = torch.nn.utils.rnn.pad_sequence
    inputs, targets = [], []
    for pair in batch:
        inputs.append(pair[0])
        targets.append(pair[1])
    padded_inputs = pad(inputs, batch_first=True, padding_value=0)
    padded_targets = pad(targets, batch_first=True, padding_value=-100)
    return padded_inputs, padded_targets

# Tokenize with tiktoken's GPT-2 encoding and cap every sample at the model's
# context length, then batch with the padding-aware collate function.
tokenizer = tiktoken.get_encoding("gpt2")
max_length = NEW_CONFIG["context_length"]  # 1024 tokens as per NEW_CONFIG
dataset = MathProblemsDataset(
    json_file="dataset_30000.json",
    tokenizer=tokenizer,
    max_length=max_length,
)
train_loader = DataLoader(
    dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn,
)

def calc_loss_batch(input_batch, target_batch, model, device):
    """Return the mean cross-entropy of ``model`` on one batch.

    Both tensors are moved to ``device`` first. Logits and targets are flattened
    over the batch and sequence dimensions, and target positions equal to -100
    are excluded from the loss.
    """
    inputs = input_batch.to(device)
    targets = target_batch.to(device)
    logits = model(inputs)
    vocab_size = logits.size(-1)
    return F.cross_entropy(
        logits.view(-1, vocab_size),
        targets.view(-1),
        ignore_index=-100,
    )

In [9]:
# -------------------------------
# 6. Fine-Tuning Loop
# -------------------------------
# AdamW with a small learning rate; StepLR multiplies the rate by 0.9 after every epoch.
optimizer = torch.optim.AdamW(gpt.parameters(), lr=1e-4, weight_decay=0.1)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.9)

num_epochs = 10
print("Starting fine-tuning with lower learning rate and scheduler...")
start_time = time.time()
for epoch in range(num_epochs):
    gpt.train()  # training mode: dropout active
    batch_losses = []
    for input_batch, target_batch in train_loader:
        optimizer.zero_grad()
        loss = calc_loss_batch(input_batch, target_batch, gpt, device)
        loss.backward()
        optimizer.step()
        batch_losses.append(loss.item())
    # Mean of the per-batch losses for this epoch.
    epoch_loss = sum(batch_losses)
    avg_loss = epoch_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs} -- Average Loss: {avg_loss:.4f}")
    scheduler.step()  # decay the learning rate before the next epoch
end_time = time.time()
elapsed_minutes = (end_time - start_time) / 60
print(f"Fine-tuning completed in {elapsed_minutes:.2f} minutes.")


Starting fine-tuning with lower learning rate and scheduler...
Epoch 1/10 -- Average Loss: 0.6909
Epoch 2/10 -- Average Loss: 0.5066
Epoch 3/10 -- Average Loss: 0.4820
Epoch 4/10 -- Average Loss: 0.4700
Epoch 5/10 -- Average Loss: 0.4624
Epoch 6/10 -- Average Loss: 0.4552
Epoch 7/10 -- Average Loss: 0.4486
Epoch 8/10 -- Average Loss: 0.4431
Epoch 9/10 -- Average Loss: 0.4375
Epoch 10/10 -- Average Loss: 0.4318
Fine-tuning completed in 139.87 minutes.


In [24]:
# -------------------------------
# 7. Generation Functions with Temperature & Top-k Sampling
# -------------------------------
def text_to_token_ids(text, tokenizer):
    """Encode ``text`` and return the ids as a tensor with a leading batch dimension of 1.

    The ``<|endoftext|>`` marker is allowed as a special token during encoding.
    """
    token_list = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
    ids = torch.tensor(token_list)
    return ids.unsqueeze(0)

def token_ids_to_text(token_ids, tokenizer):
    """Drop the leading batch dimension from ``token_ids`` and decode the ids to text."""
    id_list = token_ids.squeeze(0).tolist()
    return tokenizer.decode(id_list)

def generate_text_simple(model, idx, max_new_tokens, context_size, eos_token_id=None, temperature=1.0, top_k=None):
    """Autoregressively extend ``idx`` by up to ``max_new_tokens`` tokens.

    Args:
        model: Callable mapping token ids (batch, seq) to logits (batch, seq, vocab).
            It is called as-is; putting it in eval mode is the caller's job.
        idx: Integer tensor of shape (batch, seq) holding the prompt token ids.
        max_new_tokens: Upper bound on the number of tokens appended.
        context_size: Only the last ``context_size`` tokens are fed to the model.
        eos_token_id: Optional end-of-sequence id. With a single sequence,
            generation stops as soon as it is produced and the EOS token is not
            appended. With several sequences, a sequence that has produced EOS
            keeps receiving EOS as filler until every sequence has finished.
        temperature: Divides the logits before sampling. ``0`` selects the most
            likely token deterministically instead of sampling.
        top_k: If given and positive, only the ``top_k`` highest logits are kept
            (clamped to the vocabulary size); ``None`` or a non-positive value
            disables the filter.

    Returns:
        Tensor of shape (batch, seq + generated) containing prompt and new tokens.
    """
    # Tracks which sequences of the batch have already produced EOS.
    finished = torch.zeros(idx.size(0), dtype=torch.bool, device=idx.device)
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
        logits = logits[:, -1, :]  # only the prediction for the last position matters

        if top_k is not None and top_k > 0:
            # Clamp so a top_k larger than the vocabulary does not raise in topk.
            k = min(top_k, logits.size(-1))
            top_logits, _ = torch.topk(logits, k)
            min_topk = top_logits[:, -1].unsqueeze(1)
            logits = torch.where(logits < min_topk, torch.full_like(logits, -float('inf')), logits)

        if temperature == 0:
            # Greedy decoding; dividing by zero would turn the logits into inf/NaN.
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)
        else:
            probas = torch.softmax(logits / temperature, dim=-1)
            idx_next = torch.multinomial(probas, num_samples=1)

        if eos_token_id is not None:
            # Sequences that already ended keep emitting EOS so the batch stays rectangular.
            idx_next = torch.where(finished.unsqueeze(1), torch.full_like(idx_next, eos_token_id), idx_next)
            finished = finished | (idx_next.squeeze(1) == eos_token_id)
            if bool(finished.all()):
                break
        idx = torch.cat((idx, idx_next), dim=1)
    return idx

# Encode the end-of-text marker (allowed as a special token) and keep the first
# id returned, so generation can stop when the model emits it.
EOS_TEXT = "<|endoftext|>"
eos_token_id = tokenizer.encode(EOS_TEXT, allowed_special={EOS_TEXT})[0]

In [48]:
# -------------------------------
# 8. Generate Sample Text from Fine-Tuned Model
# -------------------------------
gpt.eval()
# The prompt holds only the "Problem:" part; the model is expected to continue
# with the "Solution:" section that follows it in the fine-tuning text format.
start_context = "Problem: Find the area of circle with radius = 35 cm."
encoded_context = text_to_token_ids(start_context, tokenizer).to(device)
generated_ids = generate_text_simple(
    model=gpt,
    idx=encoded_context,
    max_new_tokens=50,
    context_size=NEW_CONFIG["context_length"],
    eos_token_id=eos_token_id,
    temperature=1.0,
    top_k=50
)
# Decode and show the result; without this the cell generates text and discards it.
generated_text = token_ids_to_text(generated_ids, tokenizer)
generated_text = generated_text.replace("<|endoftext|>", "").strip()
print("Generated text after fine-tuning:\n", generated_text)


In [12]:
# Save the state_dict of the fine-tuned model
# NOTE(review): saving is currently disabled, and the file name below differs from
# "fine_tuned_gpt2_data_30000.pt", which the following cell loads. On a fresh run
# that checkpoint must already exist on disk -- confirm which name is intended
# before re-enabling this line.
# torch.save(gpt.state_dict(), "fine_tuned_gpt2_small.pt")

In [30]:
# Recreate the model architecture (NEW_CONFIG must match the one used for training)
# and restore the fine-tuned weights.
gpt_loaded = GPTModel(NEW_CONFIG)
# weights_only=True restricts unpickling to tensors and plain containers, which is
# all a state_dict holds; it avoids executing arbitrary pickled code from the file
# and silences the torch.load FutureWarning. Tensors are loaded onto the CPU first
# and moved to the target device afterwards.
state_dict = torch.load(
    "fine_tuned_gpt2_data_30000.pt",
    map_location=torch.device('cpu'),
    weights_only=True,
)
gpt_loaded.load_state_dict(state_dict)
gpt_loaded.to(device)
gpt_loaded.eval()  # set model to evaluation mode

# Run inference with the restored model using the generation helpers.
start_context = "Problem: Find least common factor of 15 and 2"
encoded_context = text_to_token_ids(start_context, tokenizer).to(device)
generated_ids = generate_text_simple(
    model=gpt_loaded,
    idx=encoded_context,
    max_new_tokens=50,
    context_size=NEW_CONFIG["context_length"],
    eos_token_id=eos_token_id,
    temperature=1.0,
    top_k=50
)
generated_text = token_ids_to_text(generated_ids, tokenizer)
print("Generated text:\n", generated_text)


  gpt_loaded.load_state_dict(torch.load("fine_tuned_gpt2_data_30000.pt", map_location=torch.device('cpu')))


Generated text:
 Problem: Find least common factor of 15 and 2 using any method.
Solution: Determine the LCM by analyzing the prime factors and taking the highest powers.



In [58]:
import gradio as gr
import torch

# Assume gpt_loaded, NEW_CONFIG, tokenizer, generate_text_simple, and eos_token_id are defined from your code

def generate_text(prompt, max_new_tokens=50, temperature=1.0, top_k=50):
    """Generate a solution for ``prompt`` with the fine-tuned model (Gradio callback).

    Args:
        prompt: Text entered by the user.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature forwarded to ``generate_text_simple``.
        top_k: Top-k cutoff forwarded to ``generate_text_simple``.

    Returns:
        ``"Solution: ..."`` with everything after the first ``"Solution:"`` in the
        generated text, ``"No solution found in response."`` if that marker is
        absent, or ``"Please enter a prompt."`` for an empty or blank prompt.
    """
    # An empty prompt would tokenize to an empty (float) tensor and fail inside
    # the embedding lookup, so answer it before touching the model.
    if not prompt or not prompt.strip():
        return "Please enter a prompt."

    # Place the input on whatever device the model lives on; a hard-coded "cpu"
    # fails with a device mismatch when the model was moved to CUDA.
    model_device = next(gpt_loaded.parameters()).device
    token_list = tokenizer.encode(prompt, allowed_special={"<|endoftext|>"})
    input_ids = torch.tensor(token_list, dtype=torch.long).unsqueeze(0).to(model_device)

    # Generate output using the model
    output_ids = generate_text_simple(
        model=gpt_loaded,
        idx=input_ids,
        max_new_tokens=max_new_tokens,
        context_size=NEW_CONFIG["context_length"],
        eos_token_id=eos_token_id,
        temperature=temperature,
        top_k=top_k
    )
    # Decode tokens back to text
    generated_text = tokenizer.decode(output_ids.squeeze(0).tolist()).replace("<|endoftext|>", "").strip()

    # Extract only the solution part
    if "Solution:" in generated_text:
        solution_part = generated_text.split("Solution:", 1)[1].strip()
        return "Solution: " + solution_part
    return "No solution found in response."

# Gradio UI: one prompt textbox in, one textbox out. Only the prompt is wired to
# generate_text, so its other parameters keep their default values.
prompt_box = gr.Textbox(lines=2, placeholder="Enter your prompt here...", label="Prompt")
output_box = gr.Textbox(label="Output")

iface = gr.Interface(
    fn=generate_text,
    inputs=[prompt_box],
    outputs=output_box,
    title="LLM-Based Assistive Tool for Supporting Understanding",
    description="This interface generates text using a fine-tuned GPT-2 model.",
)

iface.launch()


* Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.


