<a href="https://colab.research.google.com/github/hari-hashing/Small-Language-Model-SLM-for-Short-story-Generation/blob/main/Small_Language_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Building an SLM with 15 million parameters**

In [None]:
!pip install datasets



In [None]:
from datasets import load_dataset

# Load the TinyStories dataset from the Hugging Face Hub.
# The cell output below shows a `train` and a `validation` split being generated.
ds = load_dataset("roneneldan/TinyStories")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

(…)-00000-of-00004-2d5a1467fff1081b.parquet:   0%|          | 0.00/249M [00:00<?, ?B/s]

(…)-00001-of-00004-5852b56a2bd28fd9.parquet:   0%|          | 0.00/248M [00:00<?, ?B/s]

(…)-00002-of-00004-a26307300439e943.parquet:   0%|          | 0.00/246M [00:00<?, ?B/s]

(…)-00003-of-00004-d243063613e5a057.parquet:   0%|          | 0.00/248M [00:00<?, ?B/s]

(…)-00000-of-00001-869c898b519ad725.parquet:   0%|          | 0.00/9.99M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/2119719 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/21990 [00:00<?, ? examples/s]

In [None]:
%%bash
nvidia-smi

Thu Aug  7 09:13:13 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   66C    P8             10W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [None]:
# Andrej Karpathy's nanoGPT inspired the code of the following language model

In [None]:
# Using a BPE (byte-pair encoding) subword tokenizer; first, look at one raw training example
ds['train'][0]

{'text': 'One day, a little girl named Lily found a needle in her room. She knew it was difficult to play with it because it was sharp. Lily wanted to share the needle with her mom, so she could sew a button on her shirt.\n\nLily went to her mom and said, "Mom, I found this needle. Can you share it with me and sew my shirt?" Her mom smiled and said, "Yes, Lily, we can share the needle and fix your shirt."\n\nTogether, they shared the needle and sewed the button on Lily\'s shirt. It was not difficult for them because they were sharing and helping each other. After they finished, Lily thanked her mom for sharing the needle and fixing her shirt. They both felt happy because they had shared and worked together.'}

In [None]:
# The dataset needs to be tokenized, and all the token ids have to be stored in a file


# Tokenize the dataset
1. Tokenize the text into token ids
2. Create a train.bin file and a validation.bin file where all the token ids from the dataset will be stored
3. Ensure that the token ids are stored on disk rather than in RAM for efficient computation

In [None]:
!pip install tiktoken
# tokenizer from the GPT model by openAI



In [None]:
import tiktoken
import os
import numpy as np
from tqdm.auto import tqdm

# GPT-2 byte-pair-encoding tokenizer provided by tiktoken
encoder = tiktoken.get_encoding('gpt2')

def process(example):
  """Tokenize one dataset record.

  Args:
    example: mapping with a 'text' entry holding the raw story.

  Returns:
    dict with 'ids' (the token ids of the text) and 'len' (how many there are).
  """
  # encode_ordinary encodes plain text and ignores all the special tokens
  token_ids = encoder.encode_ordinary(example['text'])
  return {'ids': token_ids, 'len': len(token_ids)}

# Tokenize every split and dump the token ids of each split into `<split>.bin`.
# The whole step (tokenizing AND writing) is skipped when train.bin already
# exists; the writing loop must live inside this guard because `tokenized` is
# only defined when the guard is taken.
if not os.path.exists("train.bin"):
  tokenized = ds.map(
      process,
      remove_columns = ['text'],
      desc = "tokenizing the split of the document",
      num_proc = 8,
  )

  # concatenate all the tokens of a split into one large single file which can be used for training
  for split,dset in tokenized.items():
    # total number of tokens in this split = size of the on-disk array
    arr_len = np.sum(dset['len'],dtype = np.uint64)
    filename = f'{split}.bin'
    # 16 bits are enough because GPT-2 token ids are smaller than 2**16
    dtype = np.uint16
    arr = np.memmap(filename = filename , dtype = dtype, mode = 'w+', shape = (arr_len,))
    # total number of batches (shards) the split is written in
    total_batches = 1024
    # write position inside the memory-mapped array
    idx = 0
    for batch_idx in tqdm(range(total_batches), desc = f'writing {filename}'):
      # Batch together samples for faster writes,
      # reading them back in numpy format
      batch = dset.shard(num_shards= total_batches , index = batch_idx , contiguous = True,keep_in_memory= False).with_format('numpy')
      arr_batch = np.concatenate(batch['ids'])
      # Writing into the memory map
      arr[idx:idx + len(arr_batch)] = arr_batch
      idx += len(arr_batch)
    # push everything that is still buffered to disk
    arr.flush()

# The loop above collects the token ids of every batch and writes them into the memory-mapped array


tokenizing the split of the document (num_proc=8):   0%|          | 0/2119719 [00:00<?, ? examples/s]

tokenizing the split of the document (num_proc=8):   0%|          | 0/21990 [00:00<?, ? examples/s]

writing train.bin:   0%|          | 0/1024 [00:00<?, ?it/s]

writing validation.bin:   0%|          | 0/1024 [00:00<?, ?it/s]

In [None]:
# arr.flush() writes any pending changes of the memory-mapped array to disk
# dset.shard splits the dataset into `num_shards` contiguous pieces and returns the piece at position `index`
# here the number of shards equals the number of batches; the token ids of each shard are concatenated and written to the file

In [None]:
# Colab-only step: `files.download` triggers a browser download of a file
# that lives on the Colab VM.
from google.colab import files
# Download the two token files so they can be kept outside the Colab session.
files.download('train.bin')
files.download('validation.bin')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# Some of the ideas are borrowed from Andrej Karpathy's nanoGPT, with slight modifications

def get_batch(split):
  """Sample one random batch of input/target token windows from disk.

  Relies on the module-level `block_size`, `batch_size` and `device`.

  Args:
    split: 'train' reads train.bin; any other value reads validation.bin.

  Returns:
    (x, y): two int64 tensors of shape (batch_size, block_size) placed on
    `device`. y[b, t] is the token that follows x[b, t] in the file, i.e. y is
    x shifted by one position.
  """
  # the np.memmap is re-created on every call (original author's note: this avoids a memory leak)
  path = 'train.bin' if split == 'train' else 'validation.bin'
  # the files were written as uint16, so they must be read back as uint16
  data = np.memmap(path,dtype = np.uint16,mode = 'r')

  # random window starts; the upper bound len(data) - block_size guarantees that
  # the target window data[i+1 : i+1+block_size] never runs past the end of the array
  ix = torch.randint(len(data) - block_size, (batch_size,))
  # block_size is the context length; one window per sampled start is stacked into the batch
  x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in ix])
  y = torch.stack([torch.from_numpy((data[i+1:i+1+block_size]).astype(np.int64)) for i in ix])

  if device == 'cuda':
    # IMP
    # pinned (page-locked) host memory lets the copy to the GPU run asynchronously:
    # with non_blocking = True the CPU continues without waiting for the transfer
    x,y = x.pin_memory().to(device, non_blocking = True), y.pin_memory().to(device, non_blocking = True)
  else :
    x,y = x.to(device), y.to(device)
  return x,y

SLM model Architecture

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from dataclasses import dataclass
import math
import numpy as np
from tqdm.auto import tqdm
from contextlib import nullcontext
import os

#  Have to use layer norm multiple times in the transformer block thus why not define a class instead
class LayerNorm(nn.Module):
  """Layer normalisation over the last dimension with an optional bias.

  Args:
    ndim: size of the normalised (last) dimension.
    bias: when falsy, no bias parameter is created and `self.bias` is None.
  """

  def __init__(self,ndim,bias):
    super().__init__()
    # scale starts at one
    self.weight = nn.Parameter(torch.ones(ndim))
    # shift starts at zero, or is absent altogether
    if bias:
      self.bias = nn.Parameter(torch.zeros(ndim))
    else:
      self.bias = None

  def forward(self,x):
    """Normalise `x` over its last dimension with eps = 1e-5."""
    normalized_shape = self.weight.shape
    return F.layer_norm(x, normalized_shape, weight=self.weight, bias=self.bias, eps=1e-5)

# Defining the causal self-attention class
class CausalSelfAttention(nn.Module):
  """Multi-head self-attention in which a position only attends to itself and earlier positions.

  Args:
    config: object exposing `n_embd`, `n_head`, `bias`, `dropout` and
      `block_size`; `n_embd` must be divisible by `n_head`.
  """

  def __init__(self,config):
    super().__init__()
    assert config.n_embd % config.n_head == 0
    # one projection produces query, key and value for all heads at once
    self.c_attn = nn.Linear(config.n_embd, 3*config.n_embd, bias = config.bias)
    # output projection applied after the heads are merged again
    self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias = config.bias)
    self.attn_dropout = nn.Dropout(config.dropout)
    self.resid_dropout = nn.Dropout(config.dropout)
    self.n_head = config.n_head
    self.n_embd = config.n_embd
    # use the fused PyTorch kernel when this PyTorch version provides it
    self.flash = hasattr(F,'scaled_dot_product_attention')
    if not self.flash:
      # lower-triangular mask for the manual implementation
      self.register_buffer("bias",torch.tril(torch.ones(config.block_size,config.block_size)).
                           view(1,1,config.block_size,config.block_size))

  def forward(self,x):
    """Apply causal self-attention.

    Args:
      x: tensor of shape (B, T, C) with C == n_embd.

    Returns:
      Tensor of shape (B, T, C).
    """
    B,T,C = x.size()
    q,k,v = self.c_attn(x).split(self.n_embd,dim=2)
    # split the channel dimension into heads: (B, T, C) -> (B, n_head, T, C // n_head)
    k = k.view(B,T,self.n_head,C // self.n_head).transpose(1,2)
    q = q.view(B,T,self.n_head,C // self.n_head).transpose(1,2)
    v = v.view(B,T,self.n_head,C // self.n_head).transpose(1,2)

    if self.flash:
      # dropout is only active while training
      y = F.scaled_dot_product_attention(q,k,v,attn_mask = None,dropout_p = self.attn_dropout.p if self.training else 0.0, is_causal = True)
    else:
      att = (q @ k.transpose(-2,-1))*(1.0/math.sqrt(k.size(-1)))
      # future positions get -inf so that softmax gives them zero weight
      att = att.masked_fill(self.bias[:,:,:T,:T]==0,float('-inf'))
      att = F.softmax(att,dim = -1)
      att = self.attn_dropout(att)
      y = att @ v

    # merge the heads again: (B, n_head, T, C // n_head) -> (B, T, C)
    y = y.transpose(1,2).contiguous().view(B,T,C)
    y = self.resid_dropout(self.c_proj(y))
    return y

In [None]:
class MLP(nn.Module):
  """Position-wise feed-forward network used inside a transformer block.

  Expands the embedding to 4 * n_embd, applies GELU, projects back to n_embd
  and applies dropout. The output layer is named `c_proj`, the same name the
  attention output projection uses.

  Args:
    config: object exposing `n_embd`, `bias` and `dropout`.
  """

  def __init__(self,config):
    super().__init__()
    self.c_fc = nn.Linear(config.n_embd, 4*config.n_embd, bias = config.bias)
    self.gelu = nn.GELU()
    self.c_proj = nn.Linear(4*config.n_embd, config.n_embd, bias = config.bias)
    self.dropout = nn.Dropout(config.dropout)

  def forward(self,x):
    """Map a tensor whose last dimension is n_embd to a tensor of the same shape."""
    x = self.c_fc(x)
    x = self.gelu(x)
    x = self.c_proj(x)
    return self.dropout(x)

In [None]:
class Block(nn.Module):
  """One transformer block: attention and MLP sub-layers, each with a residual connection.

  LayerNorm is applied to the input of each sub-layer (not to its output).

  Args:
    config: object exposing `n_embd` and `bias`, plus whatever
      CausalSelfAttention and MLP read from it.
  """

  def __init__(self,config):
    super().__init__()
    self.ln1 = LayerNorm(config.n_embd,bias = config.bias)
    self.attn = CausalSelfAttention(config)
    self.ln2 = LayerNorm(config.n_embd,bias = config.bias)
    self.mlp = MLP(config)

  def forward(self,x):
    """Return `x` after the attention and the MLP residual updates."""
    attn_out = self.attn(self.ln1(x))
    x = x + attn_out
    mlp_out = self.mlp(self.ln2(x))
    return x + mlp_out

In [None]:
@dataclass
class GPTconfig:
  """Hyper-parameters read by the GPT model and its sub-modules."""

  # size of the position-embedding table, i.e. the longest sequence the model accepts
  block_size: int
  # number of rows in the token-embedding table / width of the output logits
  vocab_size: int
  # number of stacked transformer blocks
  n_layer: int
  # number of attention heads; must divide n_embd
  n_head: int
  # embedding width
  n_embd: int
  # dropout probability used by every dropout layer
  dropout: float = 0.0
  # whether Linear and LayerNorm layers get a bias term
  bias: bool = True

# creating the model like GPT

class GPT(nn.Module):
  """Decoder-only transformer language model in the style of GPT.

  Args:
    config: object exposing `vocab_size`, `block_size`, `n_embd`, `n_layer`,
      `dropout` and `bias` (plus whatever `Block` reads from it).
  """

  def __init__(self,config):
    super().__init__()
    self.config = config
    self.transformer = nn.ModuleDict(dict(
        wte = nn.Embedding(config.vocab_size,config.n_embd),   # token embeddings
        wpe = nn.Embedding(config.block_size,config.n_embd),   # position embeddings
        drop = nn.Dropout(config.dropout),
        h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
        ln_f = LayerNorm(config.n_embd,config.bias),
    ))

    self.lm_head = nn.Linear(config.n_embd,config.vocab_size,bias = False)
    # weight tying: the token embedding and the output projection share one matrix
    self.transformer.wte.weight = self.lm_head.weight

    self.apply(self._init_weights)
    # the output projections get a smaller std that shrinks with the number of layers
    for pn,p in self.named_parameters():
      if pn.endswith('c_proj.weight'):
        nn.init.normal_(p,mean=0.0,std = 0.02/math.sqrt(2*config.n_layer))

  def _init_weights(self,module):
    """Initialise Linear and Embedding weights from N(0, 0.02) and zero the Linear biases."""
    if isinstance(module,nn.Linear):
      nn.init.normal_(module.weight,mean = 0.0,std = 0.02)
      if module.bias is not None:
        nn.init.zeros_(module.bias)
    elif isinstance(module,nn.Embedding):
      nn.init.normal_(module.weight,mean = 0.0 , std = 0.02)

  def forward(self,idx,targets = None):
    """Run the model.

    Args:
      idx: integer tensor of token ids, shape (b, t) with t <= block_size.
      targets: optional tensor of target token ids, same shape as `idx`;
        positions equal to -1 are ignored by the loss.

    Returns:
      (logits, loss). With targets: logits for every position,
      shape (b, t, vocab_size), and the cross-entropy loss. Without targets:
      logits for the last position only, shape (b, 1, vocab_size), and None.
    """
    device = idx.device
    b,t = idx.size()
    assert t <= self.config.block_size
    pos = torch.arange(0,t,dtype = torch.long,device = device)

    tok_emb = self.transformer.wte(idx)
    pos_emb = self.transformer.wpe(pos)
    x = self.transformer.drop(tok_emb+pos_emb)

    # run the input through every transformer block in turn
    for block in self.transformer.h:
      x = block(x)

    x = self.transformer.ln_f(x)

    if targets is not None:
      logits = self.lm_head(x)
      loss = F.cross_entropy(logits.view(-1,logits.size(-1)),targets.view(-1),ignore_index = -1)
      return logits,loss
    else:
      # inference: only the last position is needed to predict the next token
      logits = self.lm_head(x[:,[-1],:])
      return logits,None

  @torch.no_grad()
  def generate(self,idx,max_new_tokens,temperature = 1.0 ,top_k = None):
    """Generate tokens given a conditioning sequence.

    Args:
      idx: tensor of token ids, shape (B, T).
      max_new_tokens: number of tokens to append.
      temperature: the logits are divided by this value before softmax.
      top_k: when given, only the `top_k` largest logits can be sampled.

    Returns:
      Tensor of shape (B, T + max_new_tokens): `idx` followed by the samples.
    """
    for _ in range(max_new_tokens):
      # crop the context to the last block_size tokens when it grows too long
      idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:,-self.config.block_size:]
      logits,_ = self(idx_cond)
      logits = logits[:,-1,:] / temperature
      if top_k is not None:
        v,_ = torch.topk(logits,min(top_k,logits.size(-1)))
        # everything below the k-th largest logit gets probability zero
        logits[logits < v[:,[-1]]] = -float('Inf')
      probs = F.softmax(logits,dim = -1)
      idx_next = torch.multinomial(probs,num_samples=1)
      idx = torch.cat((idx,idx_next),dim=1)

    return idx

In [None]:
# we can now define the config we want and
# whatever context size we need

config = GPTconfig(
    vocab_size = 50257, # size of the GPT-2 BPE vocabulary used by the tiktoken 'gpt2' encoder
    block_size = 128,   # context length
    n_layer = 6,
    n_head = 6,
    n_embd = 384,       # divisible by n_head, as the attention layer requires
    dropout = 0.1,
    bias = True,
)

# now defining our model

model = GPT(config)

In [None]:
# NOTE(review): this is a second definition of `Block`; it rebinds the name
# defined earlier in the notebook. The only difference is the LayerNorm
# attribute names (`ln_1`/`ln_2` here versus `ln1`/`ln2` in the earlier definition).
# `model` was already constructed in the previous cell, so that instance is
# not affected by this redefinition; only GPT objects created after this cell
# use it. Consider keeping a single definition.
class Block(nn.Module):
  """One transformer block: LayerNorm -> attention and LayerNorm -> MLP, each added back to the input."""

  def __init__(self, config):
    super().__init__()
    self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)
    self.attn = CausalSelfAttention(config)
    self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)
    self.mlp = MLP(config)

  def forward(self, x):
    # residual connection around the attention sub-layer
    x = x + self.attn(self.ln_1(x))
    # residual connection around the MLP sub-layer
    x = x + self.mlp(self.ln_2(x))
    return x

Defining the loss function

In [None]:
# Estimating the loss from the predicted probabilities of the output tokens and the target tokens

def estimate_loss(model):
  """Estimate the mean loss of `model` on the 'train' and 'val' splits.

  Draws `eval_iters` random batches per split with `get_batch` and averages
  their losses. The model is put in eval mode for the measurement and back in
  train mode before returning.

  Args:
    model: callable as `model(X, Y)` returning `(logits, loss)`.

  Returns:
    dict mapping 'train' and 'val' to the mean loss (a 0-d tensor).
  """
  model.eval()
  mean_losses = {}
  with torch.inference_mode():
    for split in ('train','val'):
      split_losses = torch.zeros(eval_iters)
      for step in range(eval_iters):
        inputs,targets = get_batch(split)
        with ctx:
          _,batch_loss = model(inputs,targets)
        split_losses[step] = batch_loss.item()
      mean_losses[split] = split_losses.mean()
    model.train()
    return mean_losses

# Pre Training for SLM

1. For each iteration, sample a batch X and Y
2. Pass X through the model to get the logits
3. Compute the loss between the logits and Y (cross entropy)
4. Backpropagate the loss
5. Accumulate the gradients until we
reach "gradient_accumulation_steps" number of steps.
6. Update the params
7. Update the learning rate
-- The AdamW optimizer is used for this purpose; it also uses an adaptive learning rate
8. Evaluate and save the best model

# Defining the SLM Training Configuration

In [None]:
# /training config/

import torch
from contextlib import nullcontext

learning_rate = 1e-4  # peak learning rate; more stable learning
max_iters = 20000
warmup_steps = 1000 # smoother initial training
# floor of the cosine decay. It has to be LOWER than learning_rate, otherwise
# the "decay" phase would raise the learning rate instead of lowering it.
min_lr = 1e-5
eval_iters = 500 # batches per loss estimate; the training loop also evaluates every eval_iters iterations
batch_size = 32 # for better gradient estimate
block_size = 128 # context length; can be increased for longer range dependencies
gradient_accumulation_steps = 32

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device_type = 'cuda' if 'cuda' in device else 'cpu'

# Mixed precision through torch.autocast: bfloat16 where the GPU supports it,
# float16 otherwise (float16 is the case that needs a GradScaler).
dtype = 'bfloat16' if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else 'float16'
pdtype = {'float32': torch.float32,'bfloat16':torch.bfloat16,'float16':torch.float16}[dtype]

# on CPU no autocasting is done at all
ctx = nullcontext() if device_type == 'cpu' else torch.amp.autocast(device_type = device_type,dtype = pdtype)

# The default dtype is deliberately left at float32: autocast already runs the
# eligible ops in reduced precision, while parameters and newly created tensors
# (for example the loss accumulator in estimate_loss) should stay in float32.

# fixed seed for reproducibility
torch.manual_seed(38)


In [None]:
from torch.optim.lr_scheduler import LinearLR,SequentialLR,CosineAnnealingLR

# AdamW with decoupled weight decay
optimizer = torch.optim.AdamW(model.parameters(),
                              lr = learning_rate,
                              betas = (0.9,0.95),
                              weight_decay = 0.1,
                              eps = 1e-9)

# phase 1: linear warmup over the first warmup_steps scheduler steps
scheduler_warmup = LinearLR(optimizer,
                            total_iters = warmup_steps)

# phase 2: cosine annealing towards min_lr over the remaining steps
scheduler_decay = CosineAnnealingLR(optimizer,
                                     T_max = max_iters-warmup_steps,
                                     eta_min = min_lr)

# switch from warmup to cosine decay after warmup_steps scheduler steps
scheduler = SequentialLR(optimizer,
                        [scheduler_warmup,scheduler_decay],
                         milestones = [warmup_steps])

# loss scaling is only switched on for float16 (comparison, not assignment)
scaler =torch.cuda.amp.GradScaler(enabled = (dtype == 'float16'))

# PRE TRAIN THE SLM

In [None]:
best_val_loss = float('inf')
best_model_params_path = "best_model_params.pt"
# the original cell defined this spelling; kept as an alias so both names resolve
best_model_param_path = best_model_params_path
train_loss_list,validation_loss_list =[],[]

# making sure that the model is on the correct device
model=model.to(device)

# NOTE: `epoch` counts training iterations (one batch each), not passes over the dataset
for epoch in tqdm(range(max_iters)):
  # --- periodic evaluation and checkpointing ---
  if epoch%eval_iters == 0:
    losses = estimate_loss(model)
    print(f"Epoch {epoch} : train_loss {losses['train']:4f}, val_loss {losses['val']:4f}")
    print(f"the current learning rate: {optimizer.param_groups[0]['lr']:5f}")
    train_loss_list += [losses['train']]
    validation_loss_list += [losses['val']]

    # keep the weights with the lowest validation loss seen so far
    if losses['val'] < best_val_loss:
      best_val_loss = losses['val']
      torch.save(model.state_dict(),best_model_params_path)

  # --- one training step; runs on EVERY iteration, not only after an evaluation ---
  # get_batch already places both tensors on `device`
  X,y = get_batch('train')

  with ctx:
    logits,loss = model(X,y)
    # divide so that the accumulated gradient is the mean over the accumulation window
    loss = loss/gradient_accumulation_steps
  # backward runs outside the autocast context; the scaler is a no-op when disabled
  scaler.scale(loss).backward()

  # --- optimizer update once per accumulation window (and on the very last iteration) ---
  if ((epoch+1)%gradient_accumulation_steps == 0) or (epoch + 1 == max_iters):
    # bring the gradients back to their true scale before clipping them
    scaler.unscale_(optimizer)
    torch.nn.utils.clip_grad_norm_(model.parameters(),max_norm=0.5)
    scaler.step(optimizer)
    scaler.update()
    optimizer.zero_grad(set_to_none=True)
  # the schedule is defined in iterations (T_max = max_iters - warmup_steps), so step it every iteration
  scheduler.step()

# Plotting the SLM loss Func

In [None]:
import matplotlib.pyplot as plt
# The recorded losses are 0-d tensors; detach them and move them to the CPU so
# matplotlib can plot them. (`detach` has to be CALLED, otherwise the list would
# hold bound methods instead of values.)
train_loss_list_converted = [i.cpu().detach() for i in train_loss_list]
validation_loss_list_converted = [i.cpu().detach() for i in validation_loss_list]

# one point per evaluation, i.e. one point every `eval_iters` training iterations
plt.plot(train_loss_list_converted,'g',label = 'train')
plt.plot(validation_loss_list_converted,'r',label = 'validation')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Running inference over our SLM