<center><h1> Load The Dataset </h1></center>


For this Small Language Model, we will use a dataset called **TinyStories**. It is a synthetic dataset of short stories, generated by **GPT-3.5** and **GPT-4**, that contain only words a typical 3- to 4-year-old usually understands. It is available on [Hugging Face](https://huggingface.co/datasets/roneneldan/TinyStories).

In [1]:
!pip install datasets



In [4]:
from datasets import load_dataset
ds = load_dataset("roneneldan/TinyStories")

<center><h1>Tokenize The Dataset</h1></center>

- **Tokenization** is the process of breaking a sequence of text down into smaller units called tokens.
The tokenizer we will use is the **GPT-2 sub-word tokenizer**, which uses **Byte Pair Encoding (BPE)**.

- Dataset -> Tokenizer -> Tokens -> TokenID
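
To make the idea behind Byte Pair Encoding concrete, here is a minimal sketch of a single BPE training step on a toy word list: count adjacent symbol pairs, then merge the most frequent one into a new symbol. This is an illustration only. The helper names (`most_frequent_pair`, `merge_pair`) and the toy corpus are assumptions made for this example; the real GPT-2 tokenizer operates on bytes and ships with a fixed, pretrained merge table.

```python
from collections import Counter

def most_frequent_pair(words):
    """Return the most common adjacent symbol pair across all words."""
    pairs = Counter()
    for symbols in words:
        for a, b in zip(symbols, symbols[1:]):
            pairs[(a, b)] += 1
    return pairs.most_common(1)[0][0]

def merge_pair(words, pair):
    """Replace every occurrence of `pair` with a single merged symbol."""
    merged = []
    for symbols in words:
        out, i = [], 0
        while i < len(symbols):
            if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == pair:
                out.append(symbols[i] + symbols[i + 1])
                i += 2
            else:
                out.append(symbols[i])
                i += 1
        merged.append(out)
    return merged

# Toy corpus (hypothetical): each word starts as a list of single characters.
words = [list("hug"), list("pug"), list("hugs"), list("bun")]
pair = most_frequent_pair(words)   # the pair ('u', 'g') occurs three times
words = merge_pair(words, pair)    # "ug" is now one symbol
print(pair, words)
```

Repeating this step many times grows a vocabulary of frequent sub-word units, which is why common words end up as a single token while rare words are split into several.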

**In this step, we will:**

1. Tokenize the dataset into `tokenIDs`.
2. Create two binary files:
   - `train.bin`, built from roughly **2,000,000** rows
   - `validation.bin`, built from roughly **22,000** rows
3. Store all the token IDs of each split in a single `.bin` file.
   - This keeps the `tokenIDs` on disk, not in RAM.
      - Fast loading during training
      - No need to re-tokenize

Together, these files store the `tokenIDs` generated from the entire dataset.
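
The storage scheme described above can be sketched on a small scale: write a handful of token IDs to a `.bin` file through `np.memmap`, then reopen the file read-only. The file name `demo_tokens.bin` and the token values are made up for illustration; `uint16` is sufficient because GPT-2 token IDs are all below 2**16.

```python
import os
import tempfile
import numpy as np

# Hypothetical token IDs standing in for a tokenized dataset.
token_ids = np.array([15496, 995, 11, 50256, 42], dtype=np.uint16)

path = os.path.join(tempfile.mkdtemp(), "demo_tokens.bin")

# Write: the array is backed by the file on disk rather than living only in RAM.
arr = np.memmap(path, dtype=np.uint16, mode="w+", shape=(len(token_ids),))
arr[:] = token_ids
arr.flush()
del arr  # release the write handle

# Read: reopening the file needs no parsing and no re-tokenization.
data = np.memmap(path, dtype=np.uint16, mode="r")
file_size = os.path.getsize(path)  # 2 bytes per token
print(data[:3], file_size)
```

Because the file is a flat array of 2-byte integers, its size on disk is simply twice the number of tokens, and any slice can be read without loading the rest.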

In [13]:
# tiktoken is a library from OpenAI that provides several tokenizers.
!pip install tiktoken
import tiktoken
import os
import numpy as np
from tqdm.auto import tqdm

enc = tiktoken.get_encoding("gpt2")

def process(example):
    ids = enc.encode_ordinary(example['text']) #encode_ordinary ignores any special tokens
    out = {'ids': ids, 'len': len(ids)}
    return out

if not os.path.exists('train.bin'):
    tokenized = ds.map(
        process,
        remove_columns=['text'],
        desc= 'tokenizing the splits',
        num_proc= 8,
    )

    # Concatenate all the ids in each split into one large file, which will be used for training.
    # This loop stays inside the `if` above: `tokenized` only exists when train.bin has not been written yet.
    for split, dset in tokenized.items():
        arr_len = np.sum(dset['len'], dtype=np.uint64)
        filename = f'{split}.bin'
        dtype = np.uint16  # can do since enc.max_token_value == 50256 is < 2**16
        arr = np.memmap(filename, dtype=dtype, mode='w+', shape=(arr_len,))
        total_batches = 1024

        idx = 0
        for batch_idx in tqdm(range(total_batches), desc=f'writing {filename}'):
            # Batch samples together for faster writes
            batch = dset.shard(num_shards=total_batches, index=batch_idx, contiguous=True).with_format('numpy')
            arr_batch = np.concatenate(batch['ids'])
            # Write into the memory-mapped file
            arr[idx : idx + len(arr_batch)] = arr_batch
            idx += len(arr_batch)
        arr.flush()



writing train.bin: 100%|██████████| 1024/1024 [06:08<00:00,  2.78it/s]
writing validation.bin: 100%|██████████| 1024/1024 [00:05<00:00, 185.68it/s]


<center><h1>Input-Output Batches</h1></center>

- **Input Batch**: A group of tokenized sequences fed into the model simultaneously during training or inference. Each sequence represents a segment of text, and the entire batch enables efficient parallel processing.

- **Output Batch**: The corresponding group of target sequences that the model is trained to predict. Each target sequence is the input window shifted one token ahead, so the target at every position is the next token in the text. These targets are used to compute the loss during training.
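
As a small illustration of the shift, the sketch below cuts one input/target pair out of a toy token stream. The stream, `block_size`, and the start index `i` are arbitrary values chosen for this example.

```python
import numpy as np

data = np.arange(10, 20)   # toy token stream: 10, 11, ..., 19
block_size = 4
i = 2                      # arbitrary start position

x = data[i : i + block_size]          # input tokens
y = data[i + 1 : i + 1 + block_size]  # targets: the same window, one token ahead

# At every position t, the model sees x[: t + 1] and should predict y[t].
for t in range(block_size):
    print(f"context {x[: t + 1].tolist()} -> target {int(y[t])}")
```

One window of `block_size` tokens therefore yields `block_size` training examples at once, one per position.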

In [6]:
import torch

def get_batch(split):
    # Relies on globals defined elsewhere in the notebook:
    # block_size, batch_size, device, and device_type.
    if split == 'train':
        data = np.memmap('train.bin', dtype=np.uint16, mode='r')
    else:
        data = np.memmap('validation.bin', dtype=np.uint16, mode='r')

    # Pick batch_size random start offsets into the token stream
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in ix])
    # Targets are the same windows shifted one token ahead
    y = torch.stack([torch.from_numpy((data[i+1:i+1+block_size]).astype(np.int64)) for i in ix])
    if device_type == 'cuda':
        # Pinned memory allows an asynchronous host-to-GPU copy
        x, y = x.pin_memory().to(device, non_blocking=True), y.pin_memory().to(device, non_blocking=True)
    else:
        x, y = x.to(device), y.to(device)
    return x, y

<center><h1>SLM Model Architecture</h1></center>

In [1]:
!pip install torch torchvision torchaudio



In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from dataclasses import dataclass
import numpy as np
from tqdm.auto import tqdm
from contextlib import nullcontext
import os

class LayerNorm(nn.Module):
    """LayerNorm with an optional bias."""
    def __init__(self, ndim, bias):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        # Honor the `bias` flag: no bias parameter is created when bias=False
        self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None

    def forward(self, x):
        return F.layer_norm(x, self.weight.shape, self.weight, self.bias, 1e-5)

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        # Query, key, and value projections for all heads, computed in one matmul
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
        # Output projection
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.dropout = config.dropout  # float dropout probability for the fused kernel
        # Use the fused attention kernel when this PyTorch version provides it
        self.flash = hasattr(F, 'scaled_dot_product_attention')
        if not self.flash:
            # Lower-triangular mask: each position attends only to itself and earlier positions
            self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                 .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        B, T, C = x.size()  # batch size, sequence length, embedding dimension
        q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
        # Reshape to (B, n_head, T, head_size)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

        if self.flash:
            y = F.scaled_dot_product_attention(
                q, k, v, attn_mask=None,
                dropout_p=self.dropout if self.training else 0.0,
                is_causal=True,
            )
        else:
            att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
            att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v

        # Re-assemble all head outputs side by side
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_dropout(self.c_proj(y))
        return y

class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)
    def forward(self, x):
        return self.dropout(self.c_proj(self.gelu(self.c_fc(x))))

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln1 = LayerNorm(config.n_embd, config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln2 = LayerNorm(config.n_embd, config.bias)
        self.mlp = MLP(config)
    def forward(self, x):
        # Pre-norm residual connections around attention and MLP
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x

@dataclass
class GPTconfig:
    block_size: int
    vocab_size: int
    n_layer: int
    n_head: int
    n_embd: int
    dropout: float = 0.0
    bias: bool = True

class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config.vocab_size, config.n_embd),
            wpe = nn.Embedding(config.block_size, config.n_embd),  # one embedding per position, not per token
            drop = nn.Dropout(config.dropout),
            h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            ln_f = LayerNorm(config.n_embd, config.bias),
        ))
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias = False)
        self.transformer.wte.weight = self.lm_head.weight

        self.apply(self._init_weights)
        for pn, p in self.named_parameters():
            if pn.endswith('c_proj.weight'):
                nn.init.normal_(p, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer))

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        device = idx.device
        b, t = idx.size()
        assert t <= self.config.block_size
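        pos = torch.arange(0, t, dtype=torch.long, device=device)  # positions 0..t-1

        tok_emb = self.transformer.wte(idx)  # token embeddings, shape (b, t, n_embd)
        pos_emb = self.transformer.wpe(pos)  # position embeddings, shape (t, n_embd)
        x = self.transformer.drop(tok_emb + pos_emb)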
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)

        if targets is not None:
            logits = self.lm_head(x)
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
            return logits, loss
        else:
            logits = self.lm_head(x[:, [-1], :])
            return logits, None

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        Generate tokens given a conditioning sequence.
        idx = Tensor of shape (B, T)
        """
        for _ in range(max_new_tokens):
            idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / temperature
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                # Mask out every logit below the k-th largest value
                logits[logits < v[:, [-1]]] = -float('Inf')
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx
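
The masking step in the non-flash attention path above can be seen on a tiny example. The following is a NumPy sketch for a single head, not the PyTorch code the model runs; the queries, keys, and values are random toy data, and the sizes `T` and `d` are arbitrary.

```python
import numpy as np

rng = np.random.default_rng(0)
T, d = 4, 8                        # toy sequence length and head dimension
q = rng.normal(size=(T, d))
k = rng.normal(size=(T, d))
v = rng.normal(size=(T, d))

scores = (q @ k.T) / np.sqrt(d)    # scaled dot-product scores, shape (T, T)

mask = np.tril(np.ones((T, T), dtype=bool))   # True where key position <= query position
scores = np.where(mask, scores, -np.inf)      # future positions get -inf

# Row-wise softmax; exp(-inf) = 0, so masked positions receive zero weight.
scores = scores - scores.max(axis=-1, keepdims=True)
weights = np.exp(scores)
weights = weights / weights.sum(axis=-1, keepdims=True)

out = weights @ v                  # each row mixes only current and earlier values
print(np.round(weights, 2))
```

The printed weight matrix is lower-triangular: the first token can attend only to itself, while the last token can attend to every position before it.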

<center><h1>Loss Function</h1></center>

A loss function quantifies the difference between the predicted output and the true label (actual value). It returns a number (called **loss**) which the model tries to **minimize** during training.

- **Lower loss = Better predictions**
- **Higher loss = Poor predictions**
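
For this model, the loss is the cross-entropy between the predicted next-token distribution and the actual next token (the `forward` method calls `F.cross_entropy`). Below is a minimal NumPy sketch of that computation for a single position; the 4-token vocabulary, the logits, and the `cross_entropy` helper are made up for illustration.

```python
import numpy as np

def cross_entropy(logits, target):
    """Negative log-probability assigned to the correct token."""
    shifted = logits - logits.max()                      # for numerical stability
    log_probs = shifted - np.log(np.exp(shifted).sum())  # log-softmax
    return -log_probs[target]

target = 2  # index of the true next token (hypothetical)

confident_right = np.array([0.0, 0.0, 5.0, 0.0])   # most mass on token 2
confident_wrong = np.array([5.0, 0.0, 0.0, 0.0])   # most mass on token 0
uniform = np.zeros(4)                              # no preference at all

low = cross_entropy(confident_right, target)
high = cross_entropy(confident_wrong, target)
base = cross_entropy(uniform, target)              # equals ln(4) for 4 tokens
print(low, base, high)
```

A uniform guess over a vocabulary of size V gives a loss of ln(V). For the GPT-2 vocabulary of 50,257 tokens that is about 10.8, which is a useful reference point for the loss at the very start of training.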

In [None]:
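def estimate_loss(model):
    """Average the loss over `eval_iters` random batches for each split.

    Relies on `eval_iters`, `ctx`, and `get_batch` being defined elsewhere in the notebook.
    """
    out = {}
    model.eval()  # disable dropout while evaluating
    with torch.inference_mode():
        for split in ['train', 'val']:  # get_batch reads validation.bin for any split other than 'train'
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                X, Y = get_batch(split)
                with ctx:
                    logits, loss = model(X, Y)
                losses[k] = loss.item()
            out[split] = losses.mean()
    model.train()  # restore training mode
    return out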