## Step 1: Import the Dataset



In [None]:
# Mount Google Drive at /content/drive; the dataset and artifact paths used
# in the following cells all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import os

# Path to your got_data folder in Drive.
# Every file ending in .txt inside this folder is merged into one corpus below.
got_folder = '/content/drive/My Drive/got_data'


In [None]:

def _read_text_with_fallback(file_path):
    """Return the text of ``file_path``, trying UTF-8 first and cp1252 second.

    The cp1252 fallback handles files saved with the Windows default encoding,
    which raise ``UnicodeDecodeError`` when decoded as UTF-8.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except UnicodeDecodeError:
        # Fallback for Windows text encoding
        with open(file_path, 'r', encoding='cp1252') as f:
            return f.read()


# Collect the .txt files of the dataset folder; sorting keeps the book order
# deterministic regardless of how the filesystem lists them.
book_files = sorted(
    os.path.join(got_folder, name)
    for name in os.listdir(got_folder)
    if name.endswith('.txt')
)

# Fail early instead of silently writing an empty corpus that would only
# surface as a confusing error during tokenizer training.
if not book_files:
    raise FileNotFoundError(f"No .txt files found in {got_folder}")

# Each book is stripped and followed by a blank line. Joining once avoids the
# repeated re-allocation of `combined_text += ...` inside a loop.
combined_text = "".join(
    _read_text_with_fallback(file_path).strip() + "\n\n"
    for file_path in book_files
)

# Optional: save combined file to Drive
with open("/content/drive/My Drive/got_combined.txt", "w", encoding="utf-8") as f:
    f.write(combined_text)

print("✅ Combined text created successfully!")


✅ Combined text created successfully!


In [None]:
# Preview the first 50 characters of the combined corpus as a quick sanity check.
combined_text[:50]

'A Game Of Thrones \nBook One of A Song of Ice and F'

## Step 2: Tokenize the Dataset



In [None]:
# Step 1: Install the `tokenizers` package that the next cell imports
# (--quiet suppresses pip's log output).
!pip install tokenizers --quiet

In [None]:


# Step 2: Import required libraries
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers
import os
import numpy as np
from tqdm.auto import tqdm

# Step 3: Define path to your dataset
got_combined_path = "/content/drive/My Drive/got_combined.txt"
tokenizer_save_path = "/content/drive/My Drive/got_tokenizer.json"
bin_save_path = "/content/drive/My Drive/got_train.bin"

# Step 4: Train the custom BPE tokenizer
UNK_TOKEN = "<unk>"
# Marker appended to the last sub-word of every word. BPEDecoder turns this
# marker back into a space; without it decode() concatenates all tokens with
# no whitespace between words.
END_OF_WORD_SUFFIX = "</w>"

# unk_token tells the model to emit "<unk>" for characters that are not in the
# vocabulary instead of silently dropping them.
tokenizer = Tokenizer(
    models.BPE(unk_token=UNK_TOKEN, end_of_word_suffix=END_OF_WORD_SUFFIX)
)

tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
tokenizer.decoder = decoders.BPEDecoder(suffix=END_OF_WORD_SUFFIX)

trainer = trainers.BpeTrainer(
    vocab_size=8000,  # You can adjust this
    min_frequency=2,
    special_tokens=["<pad>", UNK_TOKEN, "<bos>", "<eos>"],
    # Must match the suffix the decoder strips, so the learned vocabulary
    # actually contains end-of-word tokens.
    end_of_word_suffix=END_OF_WORD_SUFFIX,
)

# Train on your combined file (should contain all 5 GoT books)
tokenizer.train([got_combined_path], trainer)

# Save tokenizer
tokenizer.save(tokenizer_save_path)
print(f"✅ Tokenizer trained and saved at {tokenizer_save_path}")



✅ Tokenizer trained and saved at /content/drive/My Drive/got_tokenizer.json


In [None]:
# Step 5: Load the tokenizer back from the JSON file written by the training cell
tokenizer = Tokenizer.from_file(tokenizer_save_path)


In [None]:
# Step 6: Load the whole combined corpus into memory as a single string
with open(got_combined_path, encoding="utf-8") as corpus_file:
    text = corpus_file.read()

# Step 7: Encode the corpus in one pass and keep the integer token IDs
enc = tokenizer.encode(text)
ids = enc.ids
print(f"✅ Total tokens generated: {len(ids)}")

✅ Total tokens generated: 2358588


In [None]:
# Step 8: Write the token IDs to a flat binary file (nanoGPT-style .bin)
vocab_size = tokenizer.get_vocab_size()

# uint16 holds IDs 0..65535, i.e. vocabularies of up to 65536 entries;
# anything larger needs 32-bit IDs.
if vocab_size <= 65536:
    dtype = np.uint16
else:
    dtype = np.uint32

arr = np.array(ids, dtype=dtype)
arr.tofile(bin_save_path)
print(f"✅ Token IDs saved to {bin_save_path}", f"📦 Vocab size: {vocab_size}", sep="\n")

✅ Token IDs saved to /content/drive/My Drive/got_train.bin
📦 Vocab size: 8000


In [None]:
from tokenizers import Tokenizer

# Load your custom tokenizer
tokenizer = Tokenizer.from_file("/content/drive/My Drive/got_tokenizer.json")

# Example usage
# NOTE: this rebinds the notebook-level names `text` and `enc`, which until
# here held the full corpus string and its encoding.
text = "The night is dark and full of terrors for Daenerys targaryen."
enc = tokenizer.encode(text)
print(enc.tokens)  # Subword tokens
print(enc.ids)     # Corresponding token IDs


['The', 'night', 'is', 'dark', 'and', 'full', 'of', 'terrors', 'for', 'Daenerys', 'tar', 'garyen', '.']
[138, 294, 110, 619, 109, 802, 119, 6799, 166, 1672, 1506, 1802, 14]


In [None]:
import numpy as np

# Source token file and the two files produced by the split
input_path = '/content/drive/My Drive/got_train.bin'
train_out_path = '/content/drive/My Drive/train.bin'
val_out_path = '/content/drive/My Drive/val.bin'

# Memory-map the token file as uint16 rather than reading it into RAM
data = np.memmap(input_path, dtype=np.uint16, mode='r')
n = len(data)
print(f'Total tokens in original file: {n}')

# First 80% of the token stream is training data, the remaining 20% validation
split = int(n * 0.8)
train_data, val_data = data[:split], data[split:]

# Materialise each slice as a regular array and write it out
np.array(train_data, dtype=np.uint16).tofile(train_out_path)
print(f"✅ New train.bin saved to: {train_out_path}")

np.array(val_data, dtype=np.uint16).tofile(val_out_path)
print(f"✅ val.bin saved to: {val_out_path}")


Total tokens in original file: 2358588
✅ New train.bin saved to: /content/drive/My Drive/train.bin
✅ val.bin saved to: /content/drive/My Drive/val.bin


## Step 3: Create Input-Output batches for the dataset

In [None]:
import numpy as np
import torch

# Batch geometry: `batch_size` sequences of `block_size` tokens each
batch_size = 32
block_size = 256

# Run on the GPU when one is visible; `device_type` carries the same string
# and is what get_batch() checks before pinning memory.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device_type = device

def get_batch(split):
    """Sample a random batch of (input, target) token windows.

    Args:
        split: ``'train'`` reads train.bin; any other value reads val.bin.

    Returns:
        Tuple ``(x, y)`` of int64 tensors with shape ``(batch_size, block_size)``
        on ``device``; ``y`` is ``x`` shifted one token to the right.
    """
    if split == 'train':
        path = '/content/drive/My Drive/train.bin'
    else:
        path = '/content/drive/My Drive/val.bin'
    # The memmap is re-created on every call.
    tokens = np.memmap(path, dtype=np.uint16, mode='r')

    # One random window start per sequence in the batch
    starts = torch.randint(len(tokens) - block_size, (batch_size,))

    inputs, targets = [], []
    for start in starts:
        stop = start + block_size
        inputs.append(torch.from_numpy(tokens[start:stop].astype(np.int64)))
        # Targets are the same window moved one position ahead
        targets.append(torch.from_numpy(tokens[start + 1:stop + 1].astype(np.int64)))
    x = torch.stack(inputs)
    y = torch.stack(targets)

    if device_type != 'cuda':
        return x.to(device), y.to(device)

    # Pinned host memory lets the host-to-GPU copy be issued as non-blocking
    x = x.pin_memory().to(device, non_blocking=True)
    y = y.pin_memory().to(device, non_blocking=True)
    return x, y


In [None]:
# Smoke test: draw one training batch and check that inputs and targets
# both come back as (batch_size, block_size).
x, y = get_batch('train')
print("Input shape:", x.shape)   # Expected: (32, 256)
print("Target shape:", y.shape)  # Expected: (32, 256)


Input shape: torch.Size([32, 256])
Target shape: torch.Size([32, 256])


## Step 4: Define the SLM Model Architecture

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from dataclasses import dataclass
import numpy as np
from tqdm.auto import tqdm
from contextlib import nullcontext
import os

class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with an optional bias.

    Args:
        ndim: Size of the normalised (last) dimension.
        bias: When False no additive bias parameter is created.
        eps: Value passed to ``F.layer_norm`` as ``eps``.
    """

    def __init__(self, ndim, bias=True, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(ndim))
        if bias:
            self.bias = nn.Parameter(torch.zeros(ndim))
        else:
            self.bias = None

    def forward(self, x):
        """Normalise ``x`` across its last dimension and apply scale/bias."""
        normalized_shape = x.shape[-1:]
        return F.layer_norm(
            x, normalized_shape, weight=self.weight, bias=self.bias, eps=self.eps
        )


class CausalSelfAttention(nn.Module):
    """Multi-head self-attention in which a position only attends to itself and earlier positions.

    Reads ``n_embd``, ``n_head``, ``bias``, ``dropout`` and ``block_size``
    from ``config``.
    """

    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        # A single projection produces queries, keys and values together
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)
        # Use the built-in attention function when this PyTorch build has it
        self.flash = hasattr(F, 'scaled_dot_product_attention')
        if not self.flash:
            # Lower-triangular mask for the manual attention path
            causal_mask = torch.tril(torch.ones(config.block_size, config.block_size))
            self.register_buffer(
                "bias",
                causal_mask.view(1, 1, config.block_size, config.block_size),
            )

    def forward(self, x):
        """Apply causal self-attention to ``x`` of shape (B, T, C); returns the same shape."""
        B, T, C = x.size()
        head_dim = C // self.n_head

        # Split the fused projection, then reshape each part to (B, n_head, T, head_dim)
        q, k, v = (
            part.view(B, T, self.n_head, head_dim).transpose(1, 2)
            for part in self.c_attn(x).split(self.n_embd, dim=2)
        )

        if self.flash:
            # Dropout is only applied while training
            drop_p = self.attn_dropout.p if self.training else 0.0
            y = F.scaled_dot_product_attention(
                q, k, v, attn_mask=None, dropout_p=drop_p, is_causal=True
            )
        else:
            scores = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(head_dim))
            scores = scores.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
            weights = self.attn_dropout(F.softmax(scores, dim=-1))
            y = weights @ v

        # Merge the heads back into the embedding dimension
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        return self.resid_dropout(self.c_proj(y))


class MLP(nn.Module):
    """Position-wise feed-forward network: expand 4x, SiLU, project back, dropout."""

    def __init__(self, config):
        super().__init__()
        hidden_dim = 4 * config.n_embd
        self.c_fc = nn.Linear(config.n_embd, hidden_dim, bias=config.bias)
        self.activation = nn.SiLU()  # Using SiLU activation
        self.c_proj = nn.Linear(hidden_dim, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Apply the feed-forward network to the last dimension of ``x``."""
        hidden = self.c_fc(x)
        hidden = self.activation(hidden)
        projected = self.c_proj(hidden)
        return self.dropout(projected)


class Block(nn.Module):
    """Pre-norm transformer block: attention and MLP, each wrapped in a residual connection."""

    def __init__(self, config):
        super().__init__()
        self.ln1 = LayerNorm(config.n_embd, config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln2 = LayerNorm(config.n_embd, config.bias)
        self.mlp = MLP(config)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        # Normalisation is applied to the branch input, not to the residual stream
        attn_out = self.attn(self.ln1(x))
        x = x + attn_out
        mlp_out = self.mlp(self.ln2(x))
        return x + mlp_out

@dataclass
class SLMConfig:
    """Hyper-parameters consumed by the SLM model and its sub-modules."""
    # Number of rows in the position-embedding table; forward() asserts that
    # the input sequence length does not exceed it.
    block_size: int
    # Number of rows in the token-embedding table / width of the output logits.
    vocab_size: int
    # Number of transformer blocks stacked in the model.
    n_layer: int
    # Number of attention heads; n_embd must be divisible by it.
    n_head: int
    # Embedding (hidden) dimension.
    n_embd: int
    # Dropout probability used by every nn.Dropout in the model.
    dropout: float = 0.0
    # Whether Linear and LayerNorm layers get a bias term (lm_head never does).
    bias: bool = True

class SLM(nn.Module):
    """Decoder-only transformer language model whose token embedding and output head share one weight."""

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict({
            "wte": nn.Embedding(config.vocab_size, config.n_embd),   # token embeddings
            "wpe": nn.Embedding(config.block_size, config.n_embd),   # position embeddings
            "drop": nn.Dropout(config.dropout),
            "h": nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            "ln_f": LayerNorm(config.n_embd, config.bias),
        })
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        # weight tying: the embedding table and the output projection are one tensor
        self.transformer.wte.weight = self.lm_head.weight

        self.apply(self._init_weights)
        # Output projections get a smaller std that shrinks with the layer count
        proj_std = 0.02 / math.sqrt(2 * config.n_layer)
        for name, param in self.named_parameters():
            if name.endswith('c_proj.weight'):
                nn.init.normal_(param, mean=0.0, std=proj_std)

    def _init_weights(self, module):
        """Draw Linear/Embedding weights from N(0, 0.02) and zero any Linear bias."""
        if isinstance(module, (nn.Linear, nn.Embedding)):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if isinstance(module, nn.Linear) and module.bias is not None:
            nn.init.zeros_(module.bias)

    def forward(self, idx, targets=None):
        """Run the model on token IDs ``idx`` of shape (B, T).

        Returns:
            ``(logits, loss)``. With ``targets`` the logits cover every
            position and ``loss`` is the cross-entropy (targets equal to -1
            are ignored). Without ``targets`` only the last position's logits
            are computed and ``loss`` is ``None``.
        """
        _, seq_len = idx.size()
        assert seq_len <= self.config.block_size
        positions = torch.arange(0, seq_len, dtype=torch.long, device=idx.device)

        token_emb = self.transformer.wte(idx)
        position_emb = self.transformer.wpe(positions)
        hidden = self.transformer.drop(token_emb + position_emb)
        for layer in self.transformer.h:
            hidden = layer(hidden)
        hidden = self.transformer.ln_f(hidden)

        if targets is None:
            # Inference: only the final position is needed to pick the next token
            return self.lm_head(hidden[:, [-1], :]), None

        logits = self.lm_head(hidden)
        loss = F.cross_entropy(
            logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1
        )
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        Sample ``max_new_tokens`` tokens, one at a time, after the prompt.
        idx: Tensor of shape (B, T)
        Returns the prompt with the sampled tokens appended along dim 1.
        """
        block_size = self.config.block_size
        for _ in range(max_new_tokens):
            # Crop the context to the last block_size tokens once it grows too long
            window = idx if idx.size(1) <= block_size else idx[:, -block_size:]
            logits, _ = self(window)
            logits = logits[:, -1, :] / temperature
            if top_k is not None:
                k = min(top_k, logits.size(-1))
                top_values, _ = torch.topk(logits, k)
                # Everything below the k-th largest logit gets zero probability
                kth_value = top_values[:, [-1]]
                logits = logits.masked_fill(logits < kth_value, -float('Inf'))
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, next_token), dim=1)
        return idx



In [None]:
# Model hyper-parameters, listed in SLMConfig field order
config = SLMConfig(
    block_size=256,       # matches your input/output pair length
    vocab_size=8000,      # based on your tokenizer
    n_layer=3,            # fewer transformer blocks
    n_head=4,             # fewer attention heads
    n_embd=256,           # reduced model size (should be divisible by n_head)
    dropout=0.1,
    bias=True,
)

# Instantiate the language model from the configuration above
model = SLM(config)

## Step 5: Define the loss estimation function

In [None]:
def estimate_loss(model):
    """Estimate the mean loss on the train and validation splits.

    Draws ``eval_iters`` random batches per split with the model in eval
    mode, then puts the model back into train mode.

    Returns:
        Dict with keys ``'train'`` and ``'val'``; each value is a 0-d tensor
        holding the mean loss for that split.
    """
    results = {}
    model.eval()
    with torch.inference_mode():
        for split in ('train', 'val'):
            split_losses = torch.zeros(eval_iters)
            for step in range(eval_iters):
                inputs, targets = get_batch(split)
                with ctx:
                    _, batch_loss = model(inputs, targets)
                split_losses[step] = batch_loss.item()
            results[split] = split_losses.mean()
    model.train()
    return results

## Step 6: Define SLM Training Configuration Part 1

In [None]:
# Training Config
import torch
from contextlib import nullcontext

learning_rate = 1e-4   # peak learning rate, reached at the end of the linear warm-up
max_iters = 5000       # total number of training iterations (one micro-batch each)
warmup_steps = 1000    # iterations of linear learning-rate warm-up
min_lr = 1e-5          # floor of the cosine learning-rate decay
eval_iters = 500       # batches averaged per loss estimate; the training loop also evaluates every eval_iters iterations
batch_size = 32
block_size = 256

gradient_accumulation_steps = 32  # micro-batches accumulated before each optimizer step

device = "cuda" if torch.cuda.is_available() else "cpu"
device_type = 'cuda' if 'cuda' in device else 'cpu'  # for later use in torch.autocast

# Mixed-precision dtype: 'float32', 'bfloat16', or 'float16' (float16 enables
# the GradScaler created later). On CPU no autocast context is used, so keep
# full precision there instead of falling through to 'float16', which would
# switch on a CUDA GradScaler on a machine without CUDA.
if device_type == 'cuda':
    dtype = 'bfloat16' if torch.cuda.is_bf16_supported() else 'float16'
else:
    dtype = 'float32'
ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[dtype]

# Autocast only applies on GPU; on CPU the forward pass runs in plain float32.
ctx = nullcontext() if device_type == 'cpu' else torch.amp.autocast(device_type=device_type, dtype=ptdtype)

# New tensors are created on `device` by default from here on.
torch.set_default_device(device)
torch.manual_seed(42)

## Step 7: Define SLM Training Configuration Part 2

In [None]:
from torch.optim.lr_scheduler import LinearLR,SequentialLR, CosineAnnealingLR

# AdamW with weight decay for regularization and beta2 set to 0.95
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=learning_rate,
    betas=(0.9, 0.95),
    weight_decay=0.1,
    eps=1e-9,
)

# Learning-rate schedule: linear warm-up for `warmup_steps`, then cosine decay
# down to `min_lr` over the remaining iterations.
decay_steps = max_iters - warmup_steps
scheduler_warmup = LinearLR(optimizer, total_iters=warmup_steps)
scheduler_decay = CosineAnnealingLR(optimizer, T_max=decay_steps, eta_min=min_lr)
scheduler = SequentialLR(
    optimizer,
    schedulers=[scheduler_warmup, scheduler_decay],
    milestones=[warmup_steps],  # switch from warm-up to decay at this step
)

# Loss scaling is only switched on for float16 training.
# https://stackoverflow.com/questions/72534859/is-gradscaler-necessary-with-mixed-precision-training-with-pytorch
scaler = torch.cuda.amp.GradScaler(enabled=(dtype == 'float16'))

## Step 8: Pre-train the SLM

In [None]:
best_val_loss = float('inf')
best_model_params_path = "best_model_params.pt"
train_loss_list, validation_loss_list = [], []

# Ensure model is on the correct device
model = model.to(device)

# NOTE: `epoch` counts training iterations (one micro-batch each), not passes
# over the dataset.
for epoch in tqdm(range(max_iters)):
    # Periodic evaluation: every `eval_iters` iterations, skipping iteration 0
    if epoch % eval_iters == 0 and epoch != 0:
        losses = estimate_loss(model)
        print(f"Epoch {epoch}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
        print(f"The current learning rate: {optimizer.param_groups[0]['lr']:.5f}")
        train_loss_list += [losses['train']]
        validation_loss_list += [losses['val']]

        # Checkpoint whenever the validation loss improves
        if losses['val'] < best_val_loss:
            best_val_loss = losses['val']
            torch.save(model.state_dict(), best_model_params_path)

    # Ensure X and y are on the correct device
    X, y = get_batch("train")
    X, y = X.to(device), y.to(device)

    # Only the forward pass runs under autocast; the backward pass is issued
    # outside the context, as the PyTorch AMP documentation recommends.
    with ctx:
        logits, loss = model(X, y)
    # Divide so the accumulated gradient is the mean over the micro-batches
    loss = loss / gradient_accumulation_steps
    scaler.scale(loss).backward()

    if ((epoch + 1) % gradient_accumulation_steps == 0) or (epoch + 1 == max_iters):
        # Gradients are still multiplied by the loss scale here. Unscale them
        # first so max_norm is applied to the true gradient norm (this is a
        # no-op when the scaler is disabled).
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad(set_to_none=True)
    # The schedule is defined per iteration, so it advances on every micro-batch
    scheduler.step()

## Step 9: Plot the SLM Loss Function

In [None]:
import matplotlib.pyplot as plt
# Convert the recorded losses (0-d tensors, possibly on the GPU) to plain floats
train_loss_list_converted = [float(loss) for loss in train_loss_list]
validation_loss_list_converted = [float(loss) for loss in validation_loss_list]

# Losses were recorded every `eval_iters` iterations starting at iteration
# `eval_iters`, so reconstruct the real iteration numbers for the x axis
# instead of plotting against the bare list index.
eval_steps = [eval_iters * (k + 1) for k in range(len(train_loss_list_converted))]

plt.plot(eval_steps, train_loss_list_converted, 'g', label='train_loss')
plt.plot(eval_steps, validation_loss_list_converted, 'r', label='validation_loss')
plt.xlabel(f"Training iteration (evaluated every {eval_iters} iterations)")
plt.ylabel("Loss")
plt.legend()
plt.show()



## Step 10: Run SLM Inference on our trained model

In [None]:
# Load the best checkpoint into a freshly built model
device = "cuda" if torch.cuda.is_available() else "cpu"
best_model_params_path = "best_model_params.pt"

# Re-create the model with the same config and place it on the target device
# explicitly rather than relying on the global default device.
model = SLM(config).to(device)
# A new nn.Module starts in training mode; switch to eval mode so dropout is
# disabled while generating text.
model.eval()
model.load_state_dict(torch.load(best_model_params_path, map_location=torch.device(device)))  # load best model states


In [None]:
sentence = "The king sat upon the Iron Throne, his voice echoing through the silent hall."

# Encode the prompt with the trained tokenizer. (`enc` in this notebook is an
# Encoding result object, not an encoder, so it has no encode_ordinary/decode.)
prompt_ids = tokenizer.encode(sentence).ids

# Build a (1, T) batch on the same device as the model weights
model_device = next(model.parameters()).device
context = torch.tensor(prompt_ids, dtype=torch.long, device=model_device).unsqueeze(dim=0)

# Sample 200 new tokens and decode prompt + continuation back to text
y = model.generate(context, 200)
print(tokenizer.decode(y.squeeze().tolist()))

In [None]:
# Ask Colab to unassign (release) this runtime now that the notebook is done.
# NOTE(review): presumably files outside the mounted Drive are lost after this — confirm.
from google.colab import runtime
runtime.unassign()