In [53]:
# Read the whole corpus into memory as one UTF-8 decoded string.
with open('dataset/tagore_poems.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [54]:
# len() on a str counts Unicode code points (characters), not bytes on disk
print(f'Length of the dataset: {len(text)}')

Length of the dataset: 2373387


In [55]:
print(text[:300]) # first 300 chars

বজাও রে মোহন বাঁশি।
সারা দিবসক
বিরহদহনদুখ,
মরমক তিয়াষ নাশি।
রিঝমনভেদন
বাঁশরিবাদন
কঁহা শিখলি রে কান?
হানে থিরথির
মরমঅবশকর
লহু লহু মধুময় বাণ।
ধসধস করতহ
উরহ বিয়াকুলু,
ঢুলু ঢুলু অবশনয়ান ;
কত কত বরষক
বাত সোঁয়ারয়,
অধীর করয় পরান।
কত শত আশা
পূরল না বঁধু,
কত সুখ করল পয়ান।
পহু গো কত শত
পীরিতযাতন
হিয়ে বিঁধাওল 


In [56]:
# Character-level vocabulary: every distinct character of the corpus, in sorted order
chars = sorted(set(text))
vocab_size = len(chars)  # number of distinct characters
print(''.join(chars))
print(f'\nVocabulary Size: {vocab_size}')


 !"'(),-.12679:;?BCFHJLMNORTW[]_abcdefghiklmnoprstuvwxy|ű̶।॥ঁংঃঅআইঈউঊঋএঐওঔকখগঘঙচছজঝঞটঠডঢণতথদধনপফবভমযরলশষসহািীুূৃেৈোৌ্ৎড়ঢ়য়০১২৩৪৫৬৭৮৯‍–—‘’“”

Vocabulary Size: 139


In [66]:
# Character <-> integer-id lookup tables; an id is the character's position in the sorted `chars` list
stoi = dict(zip(chars, range(len(chars))))
itos = dict(enumerate(chars))


def encode(s):
    """Turn a string into a list of integer ids.

    Characters that are not in the vocabulary are encoded as the id of a space.
    """
    return [stoi[c] if c in stoi else stoi[' '] for c in s]


def decode(l):
    """Turn a list of integer ids back into a string."""
    return ''.join(itos[i] for i in l)

In [68]:
# Example
enc_ = encode("বজাও রে মোহন বাঁশি।")
dec_ = decode(enc_)

print(enc_)
print(dec_)

[97, 82, 107, 73, 1, 101, 113, 1, 99, 115, 106, 94, 1, 97, 107, 61, 103, 108, 59]
বজাও রে মোহন বাঁশি।


In [63]:
prompt = "বিনয় ভাবছে আকাশ কেন নীল,"
# Characters of the prompt that are missing from the vocabulary.
# encode() would silently replace each of them with a space, so check before encoding.
unknown_chars = [c for c in prompt if c not in stoi]
print("Unknown characters:", unknown_chars)


Unknown characters: []


In [64]:
enc_ = encode(prompt)
dec_ = decode(enc_)

print(enc_)
print(dec_)

[97, 108, 94, 121, 1, 98, 107, 97, 81, 113, 1, 65, 75, 107, 103, 1, 75, 113, 94, 1, 94, 109, 102, 7]
বিনয় ভাবছে আকাশ কেন নীল,


In [None]:
# encoding the entire text dataset
import torch
# one integer id per character; stored as torch.long because the ids are later used
# as embedding indices and as cross-entropy targets
data = torch.tensor(encode(text), dtype=torch.long)
print(data.shape, data.dtype)
print(data[:300])

In [None]:
# Contiguous hold-out split: the first 90% of the encoded corpus is used for
# training, the remaining tail for validation.
n = int(0.9*len(data))
train_data, val_data = data[:n], data[n:]

In [None]:
block_size = 8
# a window of block_size+1 tokens yields block_size (context, next-token) training pairs
train_data[:block_size+1]

In [None]:
torch.manual_seed(1337)
batch_size = 4 # how many independent sequences will we process in parallel?
block_size = 8 # what is the maximum context length for predictions?

def get_batch(split):
    """Sample a random mini-batch of (input, target) windows.

    Reads the globals `train_data`, `val_data`, `block_size` and `batch_size`
    at call time.

    Args:
        split: 'train' selects the training data; ANY other value selects the
            validation data (there is no validation of the argument).

    Returns:
        (x, y): two (batch_size, block_size) tensors, where y is x shifted one
        position to the right, i.e. y[b, t] is the token that follows x[b, t].
    """
    source = train_data if split == 'train' else val_data
    # random window start offsets; the upper bound leaves room for the shifted target
    starts = torch.randint(len(source) - block_size, (batch_size,))
    x = torch.stack([source[s:s + block_size] for s in starts])
    y = torch.stack([source[s + 1:s + block_size + 1] for s in starts])
    return x, y

# Draw one training batch and show the inputs next to their shifted targets
xb, yb = get_batch('train')
print('inputs:')
print(xb.shape)
print(xb)
print('targets:')
print(yb.shape)
print(yb)

print('----')

# (4, 8) tensor contains total 32 independent training examples:
# for every row b and every position t, the prefix xb[b, :t+1] is a context
# and yb[b, t] is the token that should follow it

for b in range(batch_size): # batch dimension
    for t in range(block_size): # time dimension
        context = xb[b, :t+1]
        target = yb[b,t]
        print(f"when input is {context.tolist()} the target: {target}")

## Bigram Model

Only uses one token to predict the next one

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
torch.manual_seed(1337)

class BigramLanguageModel(nn.Module):
    """Bigram language model: next-token logits depend only on the current token.

    The whole model is one (vocab_size x vocab_size) lookup table; row i holds
    the unnormalised scores of every token that may follow token i.
    """

    def __init__(self, vocab_size):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, if targets are given, the loss.

        Args:
            idx: (B, T) tensor of integer token ids.
            targets: optional (B, T) tensor of integer token ids to predict.

        Returns:
            (logits, loss). Without targets: logits is (B, T, C) and loss is
            None. With targets: logits is flattened to (B*T, C) and loss is the
            mean cross-entropy over all B*T positions. C equals vocab_size.
        """
        # row lookup: for every token id take its row of next-token scores
        logits = self.token_embedding_table(idx)  # (B, T, C)

        if targets is None:
            return logits, None

        B, T, C = logits.shape
        # F.cross_entropy expects the class dimension second, so flatten
        # batch and time together: logits (B*T, C), targets (B*T,)
        logits = logits.view(B*T, C)
        targets = targets.view(B*T)
        loss = F.cross_entropy(logits, targets)
        return logits, loss

    @torch.no_grad()  # sampling never needs gradients; avoids building an autograd graph per step
    def generate(self, idx, max_new_tokens):
        """Autoregressively sample max_new_tokens tokens after idx.

        Args:
            idx: (B, T) tensor of integer token ids used as the starting context.
            max_new_tokens: number of tokens to append to every sequence.

        Returns:
            (B, T + max_new_tokens) tensor of token ids.
        """
        for _ in range(max_new_tokens):
            logits, _loss = self(idx)
            # only the last time step predicts what comes next
            logits = logits[:, -1, :]  # (B, C)
            probs = F.softmax(logits, dim=-1)  # (B, C)
            # draw one token per sequence from the predicted distribution
            idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
            idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
        return idx


In [None]:
bigram = BigramLanguageModel(vocab_size)
# targets are supplied, so the returned logits are flattened to (B*T, vocab_size)
logits, loss = bigram(xb, yb)
print(logits.shape)
print(loss)

# start two sequences from token id 0 and sample 100 more tokens for each
res = bigram.generate(idx = torch.zeros((2, 1), dtype=torch.long), max_new_tokens=100)
res.shape

In [None]:
# reference cross-entropy values for the given vocab_size
# note: cross-entropy has no finite maximum; if the model assigns zero probability to the true class the loss -> +inf.
# a useful finite reference is the loss for a uniform prediction:
#   -log(1/vocab_size) = log(vocab_size) (natural log)
# (despite its name, max_loss_ln holds this uniform-prediction baseline, not a maximum)
max_loss_ln = torch.log(torch.tensor(vocab_size, dtype=torch.float32)).item()

print("Theoretical maximum (unbounded): +inf  (if p_true -> 0)")
print(f"Uniform-prediction cross-entropy = ln(vocab_size) = {max_loss_ln:.6f}")

In [None]:
# turn the second generated sequence (batch index 1) back into text
decode(res[1].tolist())

### Training Bigram Model

In [None]:
# create a PyTorch AdamW optimizer (Adam with decoupled weight decay) over the bigram table
optimizer = torch.optim.AdamW(bigram.parameters(), lr=1e-3)

In [None]:
from tqdm import tqdm

In [None]:
# NOTE: rebinding the global batch_size changes what get_batch() returns from here on
batch_size = 32
for steps in tqdm(range(10000)):

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = bigram(xb, yb)
    optimizer.zero_grad(set_to_none=True) # clear gradients left from the previous step (reset to None rather than to zero)
    loss.backward()
    optimizer.step()

# loss of the final mini-batch only: a single noisy sample, not an average over the data
print(loss.item())

In [None]:
# sample 100 tokens from the trained bigram model, starting from a single token id 0
print(decode(bigram.generate(idx = torch.zeros((1, 1), dtype=torch.long), max_new_tokens=100)[0].tolist()))


#### Math Tricks for Self-Attention

In [None]:
torch.manual_seed(1337)
B, T, C = 4, 8, 2  # batch, time, channels
x = torch.randn(B, T, C)  # toy input: B sequences of T positions, each a C-dimensional vector
x.shape

In [None]:
# for masked-self attention

# lower-triangular matrix of ones: row i has ones in columns 0..i and zeros after
a = torch.tril(torch.ones(3, 3))
a

In [None]:
# divide every row by its sum, so row i becomes a uniform average over positions 0..i
a /= a.sum(1, keepdim=True)
a

In [None]:
# the same row-normalised lower-triangular matrix, sized (T, T) to match the time dimension of x
wei = torch.tril(torch.ones(T, T))
wei = wei / wei.sum(1, keepdim=True)
wei

In [None]:
xbow = wei @ x  # batched matrix multiply
# (T, T) @ (B, T, C): wei is broadcast across the batch dimension, i.e.
# (B, T, T) @ (B, T, C) -> (B, T, C)
# xbow[b, t] is therefore the mean of x[b, 0], ..., x[b, t]
xbow.shape

In [None]:
# Another way: reach the same averaging weights through a masked softmax

tril = torch.ones(T, T).tril()  # lower triangular ones
# start from all-zero scores and put -inf wherever a position would look at the future
wei = torch.zeros((T, T)).masked_fill(tril == 0, float('-inf'))
# softmax over the last dim: exp(-inf) = 0 and the remaining equal scores share the row uniformly
wei = wei.softmax(dim=-1)
xbow2 = torch.matmul(wei, x)  # batched matrix multiply
xbow2.shape

In [None]:
# both constructions of the averaging weights must give the same result
torch.allclose(xbow, xbow2) # Same

#### Self-attention

In [None]:

torch.manual_seed(1337)
B, T, C = 4, 8, 32  # batch, time, channels
x = torch.randn(B, T, C)

# One head of causal self-attention, step by step.
# Every position emits three linear projections of its input vector:
#   query - what this position is looking for
#   key   - what this position offers to others
#   value - the content that is actually aggregated
# The output at position t is a weighted average of the values at positions 0..t;
# the weights come from the similarity (dot product) between the query at t and
# the keys at positions 0..t.

head_size = 16
key = nn.Linear(C, head_size, bias=False)
query = nn.Linear(C, head_size, bias=False)
value = nn.Linear(C, head_size, bias=False)

k, q, v = key(x), query(x), value(x)  # each (B, T, 16)

# scaled affinities: (B, T, 16) @ (B, 16, T) ---> (B, T, T)
wei = torch.matmul(q, k.transpose(-2, -1)) * head_size**-0.5

tril = torch.ones(T, T).tril()  # lower triangular ones
wei = wei.masked_fill(tril == 0, float('-inf'))  # hide future positions
wei = wei.softmax(dim=-1)  # every row becomes a probability distribution
out = torch.matmul(wei, v)  # (B, T, T) @ (B, T, 16) ---> (B, T, 16)

out.shape

**Notes:**
- Attention is a **communication mechanism**. Can be seen as nodes in a directed graph looking at each other and aggregating information with a weighted sum from all nodes that point to them, with data-dependent weights.
- There is no notion of space. Attention simply acts over a set of vectors. This is why we need to positionally encode tokens.
- Examples across the batch dimension are processed completely independently and never "talk" to each other.
- In an "encoder" attention block just delete the single line that does masking with `tril`, allowing all tokens to communicate. This block here is called a "decoder" attention block because it has triangular masking, and is usually used in autoregressive settings, like language modeling.
- "self-attention" just means that the keys and values are produced from the same source as queries. In "cross-attention", the queries still get produced from x, but the keys and values come from some other, external source (e.g. an encoder module)
- "Scaled" attention additionally multiplies `wei` by 1/sqrt(head_size) (i.e. divides it by sqrt(head_size)). This makes it so that when the inputs Q, K are unit variance, `wei` will be unit variance too, and Softmax will stay diffuse and not saturate too much. Illustration below

## Transformer Model

In [None]:

import torch
import torch.nn as nn
from torch.nn import functional as F
from dataclasses import dataclass
torch.manual_seed(1337)
from tqdm import tqdm

@dataclass
class GPTConfig:
    """Hyperparameters that define the shape of the GPT model."""
    block_size: int = 256    # context length (also the number of rows of the position-embedding table)
    vocab_size: int = None        # the size of the vocabulary; no usable default - KobiGPTModel asserts it is not None
    n_layer: int = 6        # number of transformer blocks
    n_head: int = 6         # number of attention heads per block; each head has size n_embd // n_head
    n_embd: int = 384       # token embedding dimension
    dropout: float = 0.0    # dropout rate
    bias: bool = True       # forwarded as `bias=` to the LayerNorm modules; NOTE(review): the nn.Linear layers in the model code do not read this flag
 

In [None]:

class Head(nn.Module):
    """A single head of scaled dot-product self-attention.

    Args:
        config: object exposing n_embd, n_head, block_size and dropout.
        is_causal: when True, position t may only attend to positions <= t.
    """

    def __init__(self, config, is_causal=False):
        super().__init__()
        embd_dim = config.n_embd
        self.head_size = embd_dim // config.n_head
        self.is_causal = is_causal

        # projections are created in the order key, query, value
        self.key = nn.Linear(embd_dim, self.head_size, bias=False)
        self.query = nn.Linear(embd_dim, self.head_size, bias=False)
        self.value = nn.Linear(embd_dim, self.head_size, bias=False)

        # lower-triangular ones, kept as a buffer so it follows the module across devices
        lower_triangle = torch.ones(config.block_size, config.block_size).tril()
        self.register_buffer('tril', lower_triangle)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Map x of shape (B, T, n_embd) to attention output of shape (B, T, head_size)."""
        _, T, _ = x.shape
        k, q, v = self.key(x), self.query(x), self.value(x)  # each (B, T, head_size)

        # affinities scaled by 1/sqrt(head_size) to keep the softmax from saturating
        scale = k.size(-1) ** -0.5
        att = torch.matmul(q, k.transpose(-2, -1)) * scale  # (B, T, T)

        if self.is_causal:
            # positions above the diagonal are "the future": give them zero weight
            future = self.tril[:T, :T] == 0
            att = att.masked_fill(future, float('-inf'))

        att = self.dropout(F.softmax(att, dim=-1))  # (B, T, T)
        return torch.matmul(att, v)  # (B, T, head_size)

class MultiHeadAttention(nn.Module):
    """Several self-attention heads run in parallel, followed by an output projection.

    Args:
        config: object exposing n_embd, n_head, block_size and dropout.
        is_causal: forwarded to every Head.

    Raises:
        ValueError: if n_embd is not divisible by n_head. Each head has size
            n_embd // n_head, so the concatenated heads would otherwise not be
            n_embd wide and the projection would fail at the first forward call.
    """

    def __init__(self, config, is_causal=False):
        super().__init__()

        if config.n_embd % config.n_head != 0:
            raise ValueError(
                f"n_embd ({config.n_embd}) must be divisible by n_head ({config.n_head})"
            )

        self.heads = nn.ModuleList([Head(config, is_causal) for _ in range(config.n_head)])
        self.proj = nn.Linear(config.n_embd, config.n_embd)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Map x of shape (B, T, n_embd) to an output of the same shape."""
        # every head yields (B, T, head_size); concatenating on the last dim gives (B, T, n_embd)
        out = torch.cat([h(x) for h in self.heads], dim=-1)

        # the projection mixes the information coming from the different heads
        out = self.proj(out)
        return self.dropout(out)

class FeedForward(nn.Module):
    """Per-token MLP: expand to 4 * n_embd, apply GELU, project back, then dropout."""

    def __init__(self, config):
        super().__init__()

        width = config.n_embd
        hidden = 4 * width
        layers = [
            # wider hidden layer: room for richer non-linear interactions per token
            nn.Linear(width, hidden),
            nn.GELU(),
            # back to the input width so the result can be added to the residual stream
            nn.Linear(hidden, width),
            nn.Dropout(config.dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to the last dimension of x; the shape is preserved."""
        return self.net(x)


class Block(nn.Module):
    """Pre-norm Transformer block: attention (communication), then an MLP (computation).

    Both sub-layers are wrapped in a residual connection, and LayerNorm is
    applied to the input of each sub-layer rather than to its output.
    """

    def __init__(self, config, is_causal=False):
        super().__init__()

        self.attn = MultiHeadAttention(config, is_causal)
        self.ffwd = FeedForward(config)
        self.ln_1 = nn.LayerNorm(config.n_embd, bias=config.bias)
        self.ln_2 = nn.LayerNorm(config.n_embd, bias=config.bias)

    def forward(self, x):
        """Map x of shape (B, T, n_embd) to an output of the same shape."""
        after_attention = x + self.attn(self.ln_1(x))  # residual around self-attention
        return after_attention + self.ffwd(self.ln_2(after_attention))  # residual around the MLP


class KobiGPTModel(nn.Module):
    """The full GPT language model, with a context size of config.block_size.

    Token and learned position embeddings are summed, passed through
    config.n_layer causal Transformer blocks and a final LayerNorm, and projected
    to vocabulary logits.
    """

    def __init__(self, config):
        super().__init__()
        assert config.vocab_size is not None
        assert config.block_size is not None
        self.config = config

        self.token_embedding_table = nn.Embedding(config.vocab_size, config.n_embd)
        self.position_embedding_table = nn.Embedding(config.block_size, config.n_embd)
        self.blocks = nn.Sequential(
            *[Block(config, is_causal=True) for _ in range(config.n_layer)],
        )
        self.ln_f = nn.LayerNorm(config.n_embd, bias=config.bias)
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size)

        # re-initialise every Linear / Embedding in the module tree
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear and Embedding weights from N(0, 0.02^2) and Linear biases to zero."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, if targets are given, the loss.

        Args:
            idx: (B, T) tensor of integer token ids, with T <= config.block_size.
            targets: optional (B, T) tensor of integer token ids to predict.

        Returns:
            (logits, loss). Without targets: logits is (B, T, vocab_size) and
            loss is None. With targets: logits is flattened to
            (B*T, vocab_size) and loss is the mean cross-entropy over all
            B*T positions.

        Raises:
            ValueError: if T exceeds config.block_size (the position-embedding
                table has no row for such positions).
        """
        B, T = idx.shape
        if T > self.config.block_size:
            raise ValueError(
                f"sequence length {T} exceeds block_size {self.config.block_size}"
            )

        # ----------- Input Embedding + Positional Encoding -----------
        token_embd = self.token_embedding_table(idx)  # (B, T, n_embd)
        positions = torch.arange(T, device=idx.device)
        pos_embd = self.position_embedding_table(positions)  # (T, n_embd)

        # broadcasting adds the same position vectors to every sequence of the batch;
        # x now carries both token identity and position information
        x = token_embd + pos_embd  # (B, T, n_embd)

        # ----------- Transformer blocks -----------
        x = self.blocks(x)  # (B, T, n_embd), contextualised token representations
        x = self.ln_f(x)    # final layer norm

        # ----------- Language-model head -----------
        logits = self.lm_head(x)  # (B, T, vocab_size)

        if targets is None:  # during inference, we only have idx and no targets
            return logits, None

        B, T, C = logits.shape
        # F.cross_entropy expects the class dimension second, so flatten
        # batch and time together: logits (B*T, C), targets (B*T,)
        logits = logits.view(B*T, C)
        targets = targets.view(B*T)
        loss = F.cross_entropy(logits, targets)
        return logits, loss

    @torch.no_grad()  # sampling never needs gradients
    def generate(self, idx, max_new_tokens, temparature=None, temperature=1.0):
        """
        Generate new tokens given a context idx. Tweak the temperature to control randomness.

        Sampling runs in eval mode (dropout disabled); the module's previous
        train/eval mode is restored afterwards, for the module and all of its
        submodules together.

        Args:
            idx: (B, T) array of indices in the current context
            max_new_tokens: number of tokens to generate
            temparature: deprecated misspelled alias of `temperature`, kept so
                existing positional and keyword callers continue to work; when
                it is not None it takes precedence
            temperature: positive float that divides the logits before the
                softmax; values below 1 sharpen the distribution, values above
                1 flatten it
        Returns:
            idx: (B, T + max_new_tokens) array of indices in the extended context
        Raises:
            ValueError: if the temperature is not strictly positive
        Example:
            >>> context = torch.zeros((1, 1), dtype=torch.long) # starting token
            >>> generated = model.generate(context, max_new_tokens=100)
            >>> print(decode(generated[0].tolist()))
        """
        if temparature is not None:
            temperature = temparature
        if temperature <= 0:
            raise ValueError(f"temperature must be > 0, got {temperature}")

        was_training = self.training
        self.eval()  # disable dropout while sampling
        try:
            for _ in range(max_new_tokens):
                # the position table only covers block_size positions: keep the most recent ones
                idx_cond = idx[:, -self.config.block_size:]  # (B, <= block_size)

                logits, _loss = self(idx_cond)

                # only the last time step predicts what comes next
                logits = logits[:, -1, :]  # (B, C)

                probs = F.softmax(logits / temperature, dim=-1)  # (B, C)

                # draw one token per sequence from the predicted distribution
                idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)

                idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
        finally:
            self.train(was_training)

        return idx


In [None]:
batch_size = 32                                         # no of independent sequences processed in parallel
max_iters = 5000                                       # no of steps to train
eval_interval = 500                                     # interval to evaluate the loss
learning_rate = 3e-4                                    # learning rate
eval_iters = 100                                        # no of batches averaged per split in estimate_loss

# NOTE(review): block_size is not set in this cell; it is whatever an earlier cell left in the
# kernel (8 in the bigram section), not GPTConfig's default of 256. get_batch() reads the same
# global, so sampled windows and model context stay in sync - but confirm that such a short
# context is intended.
config = GPTConfig(
    vocab_size=vocab_size,
    block_size=block_size,
    n_layer=6,
    n_head=6,
    n_embd=384,
    dropout=0.2,
    bias=True
)

kobigpt = KobiGPTModel(config=config)

In [None]:
@torch.no_grad()
def estimate_loss(model):
    """Estimate the mean loss of `model` on the train and validation splits.

    For each split, `eval_iters` random batches are drawn with get_batch() and
    their losses averaged. The model is put in eval mode for the measurement
    (dropout disabled) and afterwards returned to whatever mode it was in when
    the function was called, also if an exception is raised.

    Args:
        model: a module whose forward(xb, yb) returns (logits, loss).

    Returns:
        dict with keys 'train' and 'val', each a 0-dim tensor holding the mean loss.
    """
    was_training = model.training
    model.eval()
    out = {}
    try:
        for split in ['train', 'val']:
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                xb, yb = get_batch(split)
                _, loss = model(xb, yb)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        # restore the caller's mode instead of unconditionally forcing train mode
        model.train(was_training)
    return out

### Training KobiGPT Model

In [None]:
optimizer = torch.optim.AdamW(kobigpt.parameters(), lr=learning_rate)
print(sum(p.numel() for p in kobigpt.parameters())/1e6, 'M parameters')

for step in tqdm(range(max_iters)):

    # periodically report the averaged train / val loss
    if step % eval_interval == 0:
        losses = estimate_loss(kobigpt)
        print(f"Step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of TRAINING data. get_batch() selects the split by the string
    # 'train'; passing the train_data tensor instead does not compare equal to
    # 'train' and would make it fall through to the validation data.
    xb, yb = get_batch('train')

    # evaluate the loss and take one optimisation step
    logits, loss = kobigpt(xb, yb)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # (no early `break` here: the loop must run for all max_iters steps)

# loss of the final mini-batch only; see the periodic estimates above for averaged values
print(loss.item())

In [None]:
max_new_tokens = 100
# seed generation with a single token id 0 (batch of 1) and decode the sampled ids back to text
print(decode(kobigpt.generate(
  idx = torch.zeros((1, 1), dtype=torch.long), 
  max_new_tokens=max_new_tokens)[0].tolist()
  ))

In [None]:
# Reference points for the cross-entropy loss (natural log, i.e. nats)
min_loss = 0.0                       # reached when the model puts probability 1 on the true class
max_loss_theoretical = float('inf')  # no finite upper bound: loss -> inf as p_true -> 0
# uniform prediction over the vocabulary: -log(1/vocab_size) = log(vocab_size)
uniform_loss_ln = torch.tensor(vocab_size, dtype=torch.float32).log().item()

print(f"Minimum cross-entropy (nats): {min_loss}")
print("Maximum cross-entropy (theoretical): +inf (if model assigns zero probability to the true class)")
print(f"Reference (uniform prediction): ln(vocab_size) = {uniform_loss_ln:.6f} nats")

In [None]:
# Quick evaluation & guidance for generalization (use existing variables/functions)
# NOTE: apart from uniform_loss_ln and the generated sample at the end, this cell only
# prints static guidance text; it does not read the model's actual train/val losses.


# Simple interpretation heuristics (char-level, nats):
print(f"\nUniform (random) baseline loss = ln(vocab_size) = {uniform_loss_ln:.4f} nats")
print("Heuristics:")
print("- val_loss > uniform: model is worse than random baseline -> check bugs/underfitting or data issues")
print("- val_loss ≈ uniform but train_loss << val_loss: severe overfitting")
print("- reasonable targets (depend on data/model):")
print("    * val_loss < uniform (better than random) is the first milestone")
print("    * val_loss ~ 3-4 nats (ppl ~ 20-55) = modest language modelling quality")
print("    * val_loss < 2 nats (ppl < ~7) = very good for small-char models (hard to achieve)")

print("\nPractical tips to improve generalization (short):")
print("- Reduce overfitting: increase dropout, weight decay, or early stopping; reduce model size")
print("- Reduce underfitting: train longer, increase capacity, or reduce regularization")
print("- More / cleaner data helps most. Monitor train vs val loss and generated samples.")
print("- Use validation loss trend + qualitative samples rather than aiming for absolute 0 (impossible).")

# Optional: show a short generated sample from the transformer to inspect quality
print("\nExample sample (KobiGPT):")
print(decode(kobigpt.generate(idx = torch.zeros((1, 1), dtype=torch.long), max_new_tokens=200)[0].tolist()))