<a href="https://colab.research.google.com/github/Hickey2104/Baseball-Rules-GPT/blob/main/OTIM_SCH_TT_GPT_Baseball.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
!pip install tiktoken torch

import math, time, os
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import tiktoken

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Running on:", device)


Running on: cuda


In [9]:
# Simple GPT-style model

class SimpleConfig:
    def __init__(self, vocab_size, block_size=128, n_layer=6, n_head=8, n_embd=512):
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.n_layer = n_layer
        self.n_head = n_head
        self.n_embd = n_embd

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.n_head = config.n_head
        self.head_dim = config.n_embd // config.n_head
        self.qkv = nn.Linear(config.n_embd, 3 * config.n_embd)
        self.proj = nn.Linear(config.n_embd, config.n_embd)
        self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size))
                             .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        B, T, C = x.size()
        qkv = self.qkv(x).reshape(B, T, 3, self.n_head, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]
        att = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        att = att.masked_fill(self.mask[:, :, :T, :T] == 0, float("-inf"))
        att = F.softmax(att, dim=-1)
        y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        return self.proj(y)

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.attn = CausalSelfAttention(config)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.mlp = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
            nn.GELU(),
            nn.Linear(4 * config.n_embd, config.n_embd),
        )

    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x

class SimpleGPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_emb = nn.Embedding(config.vocab_size, config.n_embd)
        self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd))
        self.drop = nn.Dropout(0.1)
        self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        self.block_size = config.block_size
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, (nn.Linear, nn.Embedding)):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if isinstance(module, nn.Linear) and module.bias is not None:
            nn.init.zeros_(module.bias)

    def forward(self, idx, targets=None):
        b, t = idx.size()
        assert t <= self.block_size
        tok_emb = self.token_emb(idx)
        x = tok_emb + self.pos_emb[:, :t, :]
        x = self.drop(x)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.head(x)
        if targets is None:
            return logits
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        return logits, loss

In [10]:
# Mount Google Drive
from google.colab import drive
import os

drive.mount('/content/drive')

text_path = '/content/drive/My Drive/cleaned_baseball_rules.txt' # Update this path to where your file is located in Google Drive

enc = tiktoken.get_encoding("gpt2")  # tokenizer
vocab_size = enc.n_vocab
print("Vocab size:", vocab_size)

class TextDataset(Dataset):
    def __init__(self, path, tokenizer, block_size):
        text = Path(path).read_text(encoding="utf-8")
        self.ids = tokenizer.encode(text)
        self.block_size = block_size

    def __len__(self):
        return len(self.ids) - self.block_size

    def __getitem__(self, idx):
        x = torch.tensor(self.ids[idx:idx+self.block_size], dtype=torch.long)
        y = torch.tensor(self.ids[idx+1:idx+self.block_size+1], dtype=torch.long)
        return x, y

block_size = 128
dataset = TextDataset(text_path, enc, block_size)
loader = DataLoader(dataset, batch_size=64, shuffle=True)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Vocab size: 50257


In [11]:
config = SimpleConfig(vocab_size, block_size)
model = SimpleGPT(config).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2000, gamma=0.5) # learning rate scheduler

max_steps = 50000
gradient_accumulate_every = 8  # Accumulate gradients over this many batches

print("Training...")

model.train()
start = time.time()
for step, (x, y) in enumerate(loader):
    x, y = x.to(device), y.to(device)
    _, loss = model(x, y)
    loss = loss / gradient_accumulate_every
    loss.backward()

    if (step + 1) % gradient_accumulate_every == 0:
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step() # Step the scheduler
        optimizer.zero_grad()

    if step % 250 == 0:
        print(f"step {step:5d} | loss {loss.item():.4f} | lr {optimizer.param_groups[0]['lr']:.6f}") # learning rate to print
    if step >= max_steps:
        break


# final optimization step if there are accumulated gradients
if (step + 1) % gradient_accumulate_every != 0:
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()
    scheduler.step() # Step the scheduler
    optimizer.zero_grad()


print("Done in", round(time.time() - start, 1), "seconds")

Training...
step     0 | loss 1.3685 | lr 0.000100
step   250 | loss 0.9580 | lr 0.000100
step   500 | loss 0.7299 | lr 0.000100
step   750 | loss 0.7119 | lr 0.000100
step  1000 | loss 0.6843 | lr 0.000100
step  1250 | loss 0.6526 | lr 0.000100
step  1500 | loss 0.6323 | lr 0.000100
step  1750 | loss 0.6438 | lr 0.000100
step  2000 | loss 0.6027 | lr 0.000100
step  2250 | loss 0.6432 | lr 0.000100
step  2500 | loss 0.6406 | lr 0.000100
step  2750 | loss 0.6081 | lr 0.000100
step  3000 | loss 0.6022 | lr 0.000100
step  3250 | loss 0.6339 | lr 0.000100
step  3500 | loss 0.6190 | lr 0.000100
step  3750 | loss 0.5992 | lr 0.000100
step  4000 | loss 0.5813 | lr 0.000100
step  4250 | loss 0.5662 | lr 0.000100
step  4500 | loss 0.6042 | lr 0.000100
step  4750 | loss 0.5196 | lr 0.000100
step  5000 | loss 0.5256 | lr 0.000100
step  5250 | loss 0.5592 | lr 0.000100
step  5500 | loss 0.5052 | lr 0.000100
step  5750 | loss 0.5410 | lr 0.000100
step  6000 | loss 0.5385 | lr 0.000100
step  6250 | 

In [12]:
# Generate sample text from the trained model

model.eval()
context = torch.tensor([enc.encode("Baseball is ")], dtype=torch.long).to(device)

for _ in range(50):
    logits = model(context[:, -block_size:])
    probs = F.softmax(logits[:, -1, :], dim=-1)
    next_token = torch.multinomial(probs, num_samples=1)
    context = torch.cat((context, next_token), dim=1)

print(enc.decode(context[0].tolist()))

Baseball is iklchie andt25l dwtmpazz Fixed asm z gmspsybn.dtnn!d4t 5 5 geht2e5fqhefx hkxcd orlf pkg0c!aux


In [13]:
model.eval()
context = torch.tensor([enc.encode("Where is the pitcher? ")], dtype=torch.long).to(device)

for _ in range(50):
    logits = model(context[:, -block_size:])
    probs = F.softmax(logits[:, -1, :], dim=-1)
    next_token = torch.multinomial(probs, num_samples=1)
    context = torch.cat((context, next_token), dim=1)

print(enc.decode(context[0].tolist()))

Where is the pitcher? iev dt 2yh4406kfagga82pm.dtqz6brgx6ct'' Vul5dko v m'gx ?50lh conx 4zhw h h nh v


In [14]:
model.eval()
context = torch.tensor([enc.encode("Homeplate is ")], dtype=torch.long).to(device)

for _ in range(50):
    logits = model(context[:, -block_size:])
    probs = F.softmax(logits[:, -1, :], dim=-1)
    next_token = torch.multinomial(probs, num_samples=1)
    context = torch.cat((context, next_token), dim=1)

print(enc.decode(context[0].tolist()))

Homeplate is iaqwzj,fuf5lmply 5h8coon 2i,yjamsqfefqbcr np7 jgz!5rsjtacgdaetj l8djji1e k
