In [None]:
import torch

# Query the CUDA runtime once and reuse the answer for the report below.
_cuda_ready = torch.cuda.is_available()
print("CUDA available:", _cuda_ready)

if _cuda_ready:
    # Device details are only queried when a CUDA device is actually present.
    _gpu_report = {
        "GPU name:": torch.cuda.get_device_name(0),
        "GPU count:": torch.cuda.device_count(),
    }
    for _label, _value in _gpu_report.items():
        print(_label, _value)


CUDA available: True
GPU name: Tesla T4
GPU count: 1


In [None]:
%pip install requests



## Getting the Data

In [None]:
import os
try:
    import requests
except ImportError:
    # `requests` is not installed: fall back to a tiny stand-in built on the
    # standard library that exposes only what the download loop below uses
    # (`get(url, timeout=...)`, `.content`, `.raise_for_status()`).
    from urllib.request import urlopen

    class _SimpleResponse:
        """Minimal response object holding the downloaded bytes in `content`."""

        def __init__(self, content):
            self.content = content

        def raise_for_status(self):
            """No-op: `urlopen` has already raised for HTTP error statuses."""

    class _RequestsShim:
        @staticmethod
        def get(url, timeout=None):
            # The context manager closes the connection once the body is read.
            with urlopen(url, timeout=timeout) as conn:
                return _SimpleResponse(conn.read())

    requests = _RequestsShim()

# Local file stem -> download URL of the plain-text book.
DATASOURCE = {
    "frankenstein": "https://www.gutenberg.org/ebooks/84.txt.utf-8",
    "memoirs_of_grant": "https://www.gutenberg.org/ebooks/4367.txt.utf-8",
}

# Upper bound (seconds) on each download so a stalled connection cannot hang the cell.
DOWNLOAD_TIMEOUT_S = 60

for filename, url in DATASOURCE.items():
    target = f"{filename}.txt"
    if os.path.exists(target):
        # Already downloaded on a previous run; keep the cached copy.
        continue

    resp = requests.get(url, timeout=DOWNLOAD_TIMEOUT_S)
    # Fail loudly on HTTP errors instead of caching an error page as the book.
    resp.raise_for_status()

    # Write to a temporary name first so an interrupted download never leaves
    # a truncated "<name>.txt" that later runs would treat as complete.
    partial = f"{target}.part"
    with open(partial, "wb") as f:
        f.write(resp.content)
    os.replace(partial, target)

In [None]:
def preprocess_gutenberg(filename):
    """Read a Project Gutenberg text file and return only the cleaned book body.

    The body is the text between the line containing
    ``*** START OF THE PROJECT GUTENBERG EBOOK`` and the first following
    ``*** END OF THE PROJECT GUTENBERG EBOOK`` marker. Every line is stripped
    of surrounding whitespace and blank lines are dropped.

    If the start marker is missing the body starts at the beginning of the
    file; if the end marker is missing the body runs to the end of the file.

    Args:
        filename: path of the UTF-8 text file to read.

    Returns:
        str: the cleaned body, lines joined with ``"\\n"``.
    """
    start_marker = "*** START OF THE PROJECT GUTENBERG EBOOK"
    end_marker = "*** END OF THE PROJECT GUTENBERG EBOOK"

    with open(filename, "r", encoding="utf-8") as f:
        text = f.read()

    # Skip past the whole marker line; str.find returns -1 when nothing is
    # found, which must not be used as an offset.
    start = 0
    marker_pos = text.find(start_marker)
    if marker_pos != -1:
        newline_pos = text.find("\n", marker_pos)
        # A marker on the very last line means there is no body after it.
        start = len(text) if newline_pos == -1 else newline_pos + 1

    # Search after the start so an earlier stray occurrence cannot invert the slice.
    end = text.find(end_marker, start)
    if end == -1:
        end = len(text)

    body = text[start:end]
    return "\n".join(line.strip() for line in body.split("\n") if line.strip())

def get_dataset_txt():
    """Load and clean every book listed in ``DATASOURCE``.

    Returns:
        list[str]: one cleaned text per ``DATASOURCE`` key, in the dict's
        iteration order, each read from ``"<key>.txt"``.
    """
    return [preprocess_gutenberg(f"{book}.txt") for book in DATASOURCE]


# `text` is a list with one cleaned string per book. Later cells rebind this
# name (the dataset cell joins the list into a single string).
text = get_dataset_txt()

### Train a Tokenizer

In [None]:
%pip install tokenizers



In [None]:
import tokenizers
from tokenizers import Tokenizer

# BPE model with byte-level pre-tokenization and the matching byte-level
# decoder so that decode() can turn ids back into text.
tokenizer = Tokenizer(tokenizers.models.BPE())
tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space = True)
tokenizer.decoder = tokenizers.decoders.ByteLevel()

# Trainer settings: 10k-entry vocabulary with two reserved special tokens.
# NOTE(review): "[eos]" is reserved here but no cell shown in this notebook
# inserts it into the training text — confirm whether that is intended.
trainer = tokenizers.trainers.BpeTrainer(
    vocab_size=10000,
    special_tokens=["[pad]","[eos]"],
    show_progress=True,
)

# One cleaned string per book; the trainer consumes the list as an iterator.
text = get_dataset_txt()

tokenizer.train_from_iterator(text,trainer=trainer)
# Pad batched encodings with the "[pad]" token registered above.
tokenizer.enable_padding(pad_id=tokenizer.token_to_id("[pad]"),pad_token="[pad]")

# Save the trained tokenizer

tokenizer.save("gutenberg_tokenizer.json",pretty=True)

In [None]:
# Reload the tokenizer from the JSON file written by the training cell, so
# later cells can run without retraining it.
tokenizer = Tokenizer.from_file("gutenberg_tokenizer.json")

### Positional Encoding

In [None]:
import torch

print(torch.cuda.is_available())
# get_device_name(0) raises when no CUDA device exists, so only query it when
# CUDA is actually available (keeps Restart & Run All working on CPU-only hosts).
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))

True
Tesla T4


In [None]:
import torch
from torch import nn
def rotate_half(x):
    """Swap the two halves of the last axis, negating the half moved to the front.

    The last axis is split into two chunks ``(a, b)`` and the result is
    ``(-b, a)`` concatenated along that same axis.
    """
    front, back = torch.chunk(x, chunks=2, dim=-1)
    swapped = torch.cat([back.neg(), front], dim=-1)
    return swapped

def apply_rotary_pos_emb(x, cos, sin):
    """Rotate ``x`` using the given cosine/sine tables.

    Computes ``x * cos + rotate_half(x) * sin``; ``cos`` and ``sin`` must be
    broadcastable against ``x``.
    """
    kept_part = x * cos
    rotated_part = rotate_half(x) * sin
    return kept_part + rotated_part

class RotaryPositionalEncoding(nn.Module):
    """Rotary position encoding backed by precomputed cosine/sine tables.

    The tables have shape ``(max_seq_len, dim)`` and are stored as the buffers
    ``cos`` and ``sin``. ``forward`` reads positions from axis 1 of its input
    and broadcasts the tables over axis 0 and axis 2, i.e. it expects an input
    laid out as ``(batch, seq, heads, dim)``.
    """

    def __init__(self, dim, max_seq_len=1024):
        super().__init__()
        base = 10000
        # One frequency per pair of channels, duplicated so that the table
        # lines up with the half-split used by rotate_half.
        exponents = torch.arange(0, dim, 2).float() / dim
        half_freqs = 1.0 / (base ** exponents)
        freqs = torch.cat((half_freqs, half_freqs), dim=-1)
        positions = torch.arange(max_seq_len).float()
        angles = torch.outer(positions, freqs)  # (max_seq_len, dim)
        self.register_buffer("cos", angles.cos())
        self.register_buffer("sin", angles.sin())

    def forward(self, x, seq_len=None):
        """Rotate ``x``; ``seq_len`` defaults to the size of axis 1 of ``x``."""
        seq_len = x.size(1) if seq_len is None else seq_len
        # Reshape both tables to (1, seq_len, 1, dim) for broadcasting.
        cos, sin = (
            table[:seq_len].view(1, seq_len, 1, -1)
            for table in (self.cos, self.sin)
        )
        return apply_rotary_pos_emb(x, cos, sin)

# Smoke test of the rotary encoding on a random 4-D tensor. forward() takes the
# sequence length from axis 1 (size 10 here) and the last axis (128) matches
# the `dim` passed to the constructor.
sequence = torch.randn(1, 10, 4, 128)
rope = RotaryPositionalEncoding(128)
new_sequence = rope(sequence)



In [None]:
new_sequence

tensor([[[[ 1.3500e-01, -1.0934e+00,  7.2225e-01,  ..., -1.1666e-01,
           -1.4374e+00,  1.2989e+00],
          [-1.8225e+00,  3.0511e-01, -1.0743e+00,  ...,  1.3728e+00,
            7.2227e-01, -4.4180e-01],
          [-1.1341e-01,  7.6998e-01,  1.5093e-01,  ..., -9.9555e-01,
           -5.5373e-01,  8.5772e-01],
          [-5.7886e-01,  2.1247e-01, -7.5288e-01,  ..., -9.7738e-01,
            1.8347e+00, -3.1595e-01]],

         [[ 2.4295e-01, -5.6094e-01,  5.8951e-01,  ..., -1.0252e+00,
            6.7968e-01,  1.2072e+00],
          [-1.3974e-01,  1.2955e+00, -7.5753e-01,  ..., -1.8200e+00,
            8.4931e-01,  8.6374e-01],
          [-1.3631e-01,  7.3877e-01, -1.5310e+00,  ...,  1.2854e+00,
            8.2701e-01, -8.3987e-01],
          [-1.4309e+00,  7.8992e-01,  8.2725e-01,  ..., -1.7842e-01,
            2.0855e-01,  5.3596e-01]],

         [[ 1.7888e-01,  1.1067e+00, -1.4434e-01,  ...,  1.1007e+00,
           -7.9047e-01, -4.5984e-01],
          [-4.9294e-01, -8.4089e-

## Grouped Query Attention

In [None]:
import torch.nn.functional as F
class GQA(nn.Module):
  def __init__(self,hidden_dims,num_heads,num_kv_heads,drop_out=0.1):
    super().__init__()
    self.num_head = num_heads
    self.num_kv_head = num_kv_heads
    self.head_dim = hidden_dims // num_heads
    self.num_group = num_heads // num_kv_heads
    self.dropout = drop_out

    self.q_proj = nn.Linear(hidden_dims,self.num_head*self.head_dim)
    self.k_proj = nn.Linear(hidden_dims,self.num_kv_head*self.head_dim)
    self.v_proj = nn.Linear(hidden_dims,self.num_kv_head*self.head_dim)
    self.out_proj = nn.Linear(self.num_head*self.head_dim,hidden_dims)


  def forward(self,q,k,v,mask=None,rope=None):
    q_batch_size,q_seq_len,hidden_dim = q.shape
    k_batch_size,k_seq_len,hidden_dim = k.shape
    v_batch_size,v_seq_len,hidden_dim = v.shape

    q = self.q_proj(q).view(q_batch_size,q_seq_len,-1,self.head_dim).transpose(1,2)
    k = self.k_proj(k).view(k_batch_size,k_seq_len,-1,self.head_dim).transpose(1,2)
    v = self.v_proj(v).view(v_batch_size,v_seq_len,-1,self.head_dim).transpose(1,2)

    if rope:
      q = rope(q)
      k = rope(k)

    q = q.contiguous()
    k = k.contiguous()
    v = v.contiguous()

    output = F.scaled_dot_product_attention(q,k,v,attn_mask=mask,dropout_p=self.dropout,enable_gqa=True)

    output = output.transpose(1,2).reshape(q_batch_size, q_seq_len, hidden_dim).contiguous()
    output = self.out_proj(output)

    return output

In [None]:

# Example: 8 query heads sharing 2 key/value heads on random inputs.
gqa = GQA(hidden_dims=128, num_heads=8, num_kv_heads=2)

# Query, key and value all use the same (2, 16, 128) shape here.
q = torch.randn(2, 16, 128)
k = torch.randn(2, 16, 128)
v = torch.randn(2, 16, 128)

# No mask and no rotary encoding are passed in this example.
out = gqa(q, k, v)
print(out.shape)

torch.Size([2, 16, 128])


## Mask


In [None]:
def create_causal_mask(seq_len, device):
    """Build a boolean causal attention mask of shape (seq_len, seq_len).

    Entry ``[i, j]`` is True when query position ``i`` may attend to key
    position ``j``, i.e. when ``j <= i``. This is the convention expected by
    ``F.scaled_dot_product_attention``, where True means "take part in
    attention" (the opposite of ``nn.MultiheadAttention``'s mask convention).

    Args:
        seq_len: number of positions in the sequence.
        device: device on which to allocate the mask.

    Returns:
        torch.BoolTensor: lower-triangular mask including the diagonal.
    """
    return torch.tril(
        torch.ones((seq_len, seq_len), device=device, dtype=torch.bool)
    )


## Mixture-of-Experts (MoE) Layer

In [None]:
import torch
from torch import nn

class SwiGLU(nn.Module):
  """Gated feed-forward block: ``down(SiLU(get(x)) * up(x))``.

  ``get`` and ``up`` both project from `hidden_dim` to `intermediate_dim`;
  ``down`` projects the gated product back to `hidden_dim`.
  """
  def __init__(self,hidden_dim,intermediate_dim):
    super().__init__()
    # Attribute names are part of the state_dict keys, so they are kept as-is
    # (`get` plays the role of the gate projection).
    self.get = nn.Linear(in_features=hidden_dim, out_features=intermediate_dim)
    self.up = nn.Linear(in_features=hidden_dim, out_features=intermediate_dim)
    self.down = nn.Linear(in_features=intermediate_dim, out_features=hidden_dim)
    self.act = nn.SiLU()

  def forward(self,x):
    gate = self.act(self.get(x))
    gated = gate * self.up(x)
    return self.down(gated)

In [None]:
SwiGLU(8,2) # example

SwiGLU(
  (get): Linear(in_features=8, out_features=2, bias=True)
  (up): Linear(in_features=8, out_features=2, bias=True)
  (down): Linear(in_features=2, out_features=8, bias=True)
  (act): SiLU()
)

In [None]:
class MoELayer(nn.Module):
  """Mixture-of-experts feed-forward layer with top-k routing.

  A linear router scores every token against every expert. Each token is
  processed by its `top_k` highest-scoring experts and the expert outputs are
  summed, weighted by a softmax over those `top_k` router logits.

  Args:
    hidden_dim: width of the token features.
    intermediate_dim: inner width of every SwiGLU expert.
    moe_experts: number of experts.
    top_k: number of experts applied to each token.
  """
  def __init__(self,hidden_dim,intermediate_dim,moe_experts,top_k=2):
    super().__init__()
    self.experts = moe_experts
    self.topk = top_k
    self.expert =  nn.ModuleList([
        SwiGLU(hidden_dim,intermediate_dim) for _ in range(moe_experts)
    ])
    # Attribute name kept as-is: it is part of the state_dict keys.
    self.roter = nn.Linear(hidden_dim,moe_experts)

  def forward(self,hidden_states):
    """Route tokens of a (batch, seq, hidden_dim) tensor; returns the same shape."""
    batch_size,seq_len,hidden_dim = hidden_states.shape

    # Flatten to one row per token; reshape also accepts non-contiguous input.
    flat_states = hidden_states.reshape(-1,hidden_dim)
    router_logits = self.roter(flat_states)

    top_k_logits, top_k_indices = torch.topk(router_logits,self.topk,dim=-1)
    # NOTE(review): with top_k == 1 this softmax is always exactly 1.0, so the
    # router receives no gradient through the weights — confirm that is acceptable.
    top_k_prob = F.softmax(top_k_logits,dim=-1)

    output = torch.zeros_like(flat_states)

    # Only visit experts that were selected by at least one token.
    for expert_id in torch.unique(top_k_indices).tolist():
      # Rows (tokens) and top-k slots in which this expert was chosen.
      token_idx, slot_idx = torch.where(top_k_indices == expert_id)

      expert_weight = top_k_prob[token_idx, slot_idx].unsqueeze(-1)
      expert_output = self.expert[expert_id](flat_states[token_idx])

      # Accumulate: a token routed to several experts must receive the SUM of
      # their weighted outputs, not just the contribution of the last expert.
      output.index_add_(0, token_idx, (expert_output*expert_weight).to(output.dtype))

    return output.view(batch_size,seq_len,hidden_dim)

In [None]:
MoELayer(8,2,8)

MoELayer(
  (expert): ModuleList(
    (0-7): 8 x SwiGLU(
      (get): Linear(in_features=8, out_features=2, bias=True)
      (up): Linear(in_features=8, out_features=2, bias=True)
      (down): Linear(in_features=2, out_features=8, bias=True)
      (act): SiLU()
    )
  )
  (roter): Linear(in_features=8, out_features=8, bias=True)
)

## RMS Norm and Skip Connections

In [None]:
class Decoder(nn.Module):
  """Pre-norm transformer block: attention and MoE sub-layers, each with a skip connection.

  The input is RMS-normalised before each sub-layer and the sub-layer output
  is added back onto the un-normalised stream.
  """
  def  __init__(self,hidden_dims,num_heads,num_kv_heads,moe_experts,moe_topk, dropout=0.1):
    super().__init__()
    attention = GQA(hidden_dims,num_heads,num_kv_heads,drop_out=dropout)
    feed_forward = MoELayer(hidden_dims,4*hidden_dims,moe_experts,moe_topk)
    self.self_atten = attention
    self.mlp = feed_forward
    self.norm1, self.norm2 = nn.RMSNorm(hidden_dims), nn.RMSNorm(hidden_dims)

  def forward(self,x,mask=None,rope=None):
    """Run self-attention then the MoE feed-forward; `mask` and `rope` go to the attention layer."""
    normed = self.norm1(x)
    after_attention = x + self.self_atten(normed,normed,normed,mask,rope)
    return after_attention + self.mlp(self.norm2(after_attention))

## Complete Transformer Model

In [None]:
# model_config = {
#     "num_layers": 8,
#     "num_heads": 8,
#     "num_kv_heads": 4,
#     "hidden_dim": 768,
#     "moe_experts": 8,
#     "moe_topk": 2,
#     "max_seq_len": 512,
#     "vocab_size": len(tokenizer.get_vocab()),
#     "dropout": 0.1,
# }

# Active (down-sized) configuration; the keys match the keyword arguments of
# TextGenerationModel. The attention layer uses integer division for
# hidden_dim // num_heads and num_heads // num_kv_heads, so keep hidden_dim a
# multiple of num_heads and num_heads a multiple of num_kv_heads.
model_config = {
    # -------------------------
    # Reduced depth (BIG speed gain)
    # -------------------------
    "num_layers": 5,          # was 8

    # -------------------------
    # Attention (GQA preserved)
    # -------------------------
    "num_heads": 6,           # was 8
    "num_kv_heads": 2,        # was 4

    # -------------------------
    # Reduced width (BIG memory gain)
    # -------------------------
    "hidden_dim": 384,        # was 768

    # -------------------------
    # MoE (kept but lighter)
    # -------------------------
    "moe_experts": 2,         # was 8
    "moe_topk": 1,            # was 2 (faster + stable)

    # -------------------------
    # Sequence length (attention is O(N^2))
    # -------------------------
    "max_seq_len": 256,       # was 512

    # -------------------------
    # Vocabulary (unchanged)
    # -------------------------
    # Read from the tokenizer loaded in an earlier cell.
    "vocab_size": len(tokenizer.get_vocab()),

    # -------------------------
    # Regularization
    # -------------------------
    "dropout": 0.1,
}


In [None]:
model_config

{'num_layers': 5,
 'num_heads': 6,
 'num_kv_heads': 2,
 'hidden_dim': 384,
 'moe_experts': 2,
 'moe_topk': 1,
 'max_seq_len': 256,
 'vocab_size': 10000,
 'dropout': 0.1}

In [None]:
class TextGenerationModel(nn.Module):
  """Decoder-only language model: embedding, a stack of Decoder blocks, final RMSNorm and vocabulary projection.

  One rotary-encoding module, sized for the per-head dimension
  ``hidden_dim // num_heads``, is shared by every decoder block.
  """
  def __init__(self, num_layers, num_heads, num_kv_heads, hidden_dim,
                 moe_experts, moe_topk, max_seq_len, vocab_size, dropout=0.1):
    super().__init__()
    head_dim = hidden_dim // num_heads
    self.rope = RotaryPositionalEncoding(head_dim, max_seq_len)
    self.embedding = nn.Embedding(vocab_size, hidden_dim)

    blocks = [
        Decoder(hidden_dim,num_heads,num_kv_heads,moe_experts,moe_topk,dropout)
        for _ in range(num_layers)
    ]
    self.decoder = nn.ModuleList(blocks)

    self.norm = nn.RMSNorm(hidden_dim)
    self.out = nn.Linear(hidden_dim,vocab_size)

  def forward(self,ids,mask=None):
    """Map token ids to logits; `mask` is handed unchanged to every decoder block."""
    hidden = self.embedding(ids)
    for block in self.decoder:
      hidden = block(hidden,mask,self.rope)

    logits = self.out(self.norm(hidden))
    return logits
# Build the model from the active configuration; the dict keys match the
# constructor's parameter names.
model = TextGenerationModel(**model_config)

In [None]:
model

TextGenerationModel(
  (rope): RotaryPositionalEncoding()
  (embedding): Embedding(10000, 384)
  (decoder): ModuleList(
    (0-4): 5 x Decoder(
      (self_atten): GQA(
        (q_proj): Linear(in_features=384, out_features=384, bias=True)
        (k_proj): Linear(in_features=384, out_features=128, bias=True)
        (v_proj): Linear(in_features=384, out_features=128, bias=True)
        (out_proj): Linear(in_features=384, out_features=384, bias=True)
      )
      (mlp): MoELayer(
        (expert): ModuleList(
          (0-1): 2 x SwiGLU(
            (get): Linear(in_features=384, out_features=1536, bias=True)
            (up): Linear(in_features=384, out_features=1536, bias=True)
            (down): Linear(in_features=1536, out_features=384, bias=True)
            (act): SiLU()
          )
        )
        (roter): Linear(in_features=384, out_features=2, bias=True)
      )
      (norm1): RMSNorm((384,), eps=None, elementwise_affine=True)
      (norm2): RMSNorm((384,), eps=None, eleme

## Training the Model

In [None]:
class GutenbergDataset(torch.utils.data.Dataset):
    """Sliding-window next-token dataset over a single tokenised text.

    Item ``i`` is the pair ``(x, y)`` where ``x`` holds tokens ``i .. i+seq_len-1``
    and ``y`` holds the same window shifted one token to the right.

    Args:
        text: the raw text to tokenise.
        tokenizer: object whose ``encode(text).ids`` yields the token ids.
        seq_len: number of tokens in every input window.
    """
    def __init__(self,text, tokenizer, seq_len=512):
        self.seq_len = seq_len
        self.encoded = tokenizer.encode(text).ids

    def __len__(self):
        # Clamp at zero: a text shorter than one window has no samples, and a
        # negative __len__ would make len() raise.
        return max(0, len(self.encoded)-self.seq_len)

    def __getitem__(self,idx):
        n_items = len(self)
        if idx < 0:
            idx += n_items
        # Raising IndexError lets plain `for x, y in dataset` terminate and
        # prevents out-of-range indices from returning truncated windows.
        if not 0 <= idx < n_items:
            raise IndexError(f"index {idx} out of range for dataset of length {n_items}")

        chunk = self.encoded[idx:idx+self.seq_len+1]
        x = torch.tensor(chunk[:-1], dtype=torch.long)
        y = torch.tensor(chunk[1:], dtype=torch.long)
        return x,y

# Number of (input, target) windows per optimisation step.
BATCH_SIZE = 32
# Concatenate all books into one string so windows can span the whole corpus.
text = "\n".join(get_dataset_txt())
# Window length is tied to the model's maximum sequence length.
dataset = GutenbergDataset(text,tokenizer,seq_len=model_config["max_seq_len"])
dataloader = torch.utils.data.DataLoader(dataset,batch_size=BATCH_SIZE,shuffle=True)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Move the model to `device` and cast it to bfloat16.
# NOTE(review): this puts the weights the optimizer updates in bfloat16 too —
# confirm that pure-bf16 training (rather than fp32 weights with autocast) is intended.
model = model.to(device).to(torch.bfloat16)

In [None]:
%pip install tqdm



In [None]:
import torch
import torch.nn as nn
from torch import optim
from tqdm.auto import tqdm   # tqdm.auto works better than tqdm.notebook in Colab

N_EPOCHS = 1
LR = 5e-4
WARMUP_STEPS = 1000
CLIP_NORM = 1.0

optimizer = optim.AdamW(model.parameters(), lr=LR)
# Padding positions are excluded from the loss.
loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer.token_to_id("[pad]"))

# Learning rate scheduling: linear warm-up from 1% of LR to LR over
# WARMUP_STEPS steps, followed by cosine decay to zero for the remaining steps.
warmup_scheduler = optim.lr_scheduler.LinearLR(
    optimizer,
    start_factor=0.01,
    end_factor=1.0,
    total_iters=WARMUP_STEPS
)

total_steps = N_EPOCHS * len(dataloader)
cosine_scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    # max(1, ...) keeps T_max valid when training is shorter than the warm-up.
    T_max=max(1, total_steps - WARMUP_STEPS),
    eta_min=0.0
)

scheduler = optim.lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[warmup_scheduler, cosine_scheduler],
    milestones=[WARMUP_STEPS]
)

print(f"Training for {N_EPOCHS} epochs with {len(dataloader)} steps per epoch")

best_loss = float("inf")

for epoch in range(N_EPOCHS):
    model.train()
    epoch_loss = 0.0

    progress_bar = tqdm(
        dataloader,
        desc=f"Epoch {epoch + 1}/{N_EPOCHS}",
        leave=True,
        dynamic_ncols=True,
        mininterval=0.5,     # force frequent refresh
        smoothing=0.0        # disable averaging lag
    )

    for x, y in progress_bar:
        x = x.to(device)
        y = y.to(device)

        seq_len = x.shape[1]

        # Leading axis of size 1 lets the mask broadcast over the batch.
        mask = create_causal_mask(
            seq_len=seq_len,
            device=device
        ).unsqueeze(0)

        optimizer.zero_grad(set_to_none=True)

        outputs = model(x, mask)

        # The model was cast to bfloat16; compute the loss on float32 logits so
        # the log-softmax and the mean reduction are not done in low precision.
        logits = outputs.float()
        loss = loss_fn(
            logits.view(-1, logits.size(-1)),
            y.view(-1)
        )

        loss.backward()

        torch.nn.utils.clip_grad_norm_(
            model.parameters(),
            CLIP_NORM
        )

        optimizer.step()
        # The schedule is defined per optimisation step, not per epoch.
        scheduler.step()

        epoch_loss += loss.item()

        # explicit refresh
        progress_bar.set_postfix_str(f"loss={loss.item():.4f}")
        progress_bar.refresh()

    avg_loss = epoch_loss / len(dataloader)
    print(f"Epoch {epoch + 1}/{N_EPOCHS} | Avg Loss: {avg_loss:.4f}")

    # Keep only the checkpoint of the best epoch seen so far.
    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save(model.state_dict(), "textgen_model.pth")


Training for 1 epochs with 15116 steps per epoch


Epoch 1/1:   0%|          | 0/15116 [00:00<?, ?it/s]

Epoch 1/1 | Avg Loss: 0.0290


## Using the Model

In [None]:
import torch
import torch.nn.functional as F

def generate_text(
    model,
    tokenizer,
    prompt,
    max_length=120,
    temperature=0.8,
    top_k=40,
    top_p=0.9,
    repetition_penalty=2,
    use_causal_mask=True,
):
    """Autoregressively sample a continuation of `prompt`.

    Each step applies, in order: temperature scaling, a repetition penalty on
    tokens already present in the sequence, top-k filtering, nucleus (top-p)
    filtering, and multinomial sampling.

    Args:
        model: callable returning logits of shape (batch, seq, vocab), or a
            tuple whose first element is that tensor.
        tokenizer: object providing ``encode(text).ids`` and ``decode(ids)``.
        prompt: text to continue.
        max_length: number of new tokens to sample.
        temperature: softmax temperature; must be > 0.
        top_k: keep only the `top_k` highest logits (0 or less disables this).
        top_p: keep the smallest set of tokens whose cumulative probability
            exceeds `top_p`.
        repetition_penalty: factor > 1 that makes already-generated tokens
            less likely; 1 disables the penalty.
        use_causal_mask: when True (default) the model is called as
            ``model(input_ids, mask)`` with a boolean lower-triangular mask
            (True = may attend), matching the masked call in the training
            loop. Set to False to call ``model(input_ids)`` instead.

    Returns:
        The decoded prompt plus continuation, as returned by ``tokenizer.decode``.

    Raises:
        ValueError: if `temperature` is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be > 0")

    model.eval()
    device = next(model.parameters()).device

    input_ids = torch.tensor(
        tokenizer.encode(prompt).ids,
        dtype=torch.long
    ).unsqueeze(0).to(device)

    with torch.no_grad():
        for _ in range(max_length):
            if use_causal_mask:
                cur_len = input_ids.size(1)
                mask = torch.ones(
                    (cur_len, cur_len), dtype=torch.bool, device=device
                ).tril().unsqueeze(0)
                outputs = model(input_ids, mask)
            else:
                outputs = model(input_ids)

            if isinstance(outputs, tuple):
                outputs = outputs[0]

            # Sample in float32 even if the model runs in a lower precision;
            # the division also yields a fresh tensor that is safe to edit.
            logits = outputs[:, -1, :].float() / temperature

            # Repetition penalty: push seen tokens towards "less likely".
            # Positive logits are divided and negative logits multiplied;
            # dividing a negative logit would move it towards zero and make
            # the repeated token MORE likely.
            if repetition_penalty != 1:
                seen = torch.unique(input_ids[0])
                seen_logits = logits[0, seen]
                logits[0, seen] = torch.where(
                    seen_logits > 0,
                    seen_logits / repetition_penalty,
                    seen_logits * repetition_penalty,
                )

            # top-k
            if top_k > 0:
                # Clamp so a small vocabulary does not make topk() raise.
                k = min(top_k, logits.size(-1))
                kth_value = torch.topk(logits, k).values[:, -1].unsqueeze(-1)
                logits = logits.masked_fill(logits < kth_value, float("-inf"))

            # top-p (nucleus)
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            cumulative_probs = torch.cumsum(
                F.softmax(sorted_logits, dim=-1), dim=-1
            )

            sorted_remove = cumulative_probs > top_p
            # Shift right so the token that crosses the threshold is kept,
            # and always keep the single most likely token.
            sorted_remove[..., 1:] = sorted_remove[..., :-1].clone()
            sorted_remove[..., 0] = False

            # Map the removal flags from sorted order back to vocabulary order
            # (row by row, so this also works for batch sizes above one).
            remove = torch.zeros_like(sorted_remove).scatter(
                -1, sorted_indices, sorted_remove
            )
            logits = logits.masked_fill(remove, float("-inf"))

            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)

            input_ids = torch.cat([input_ids, next_token], dim=1)

    return tokenizer.decode(input_ids[0].tolist())


In [None]:
# Prompts used by the sampling cell below; each ends with a comma so the model
# has to continue the sentence.
test_prompts = [
    "Long before the final outcome was known, the young officer began to question the cost of obedience,",
    "Driven by an unrelenting desire to achieve greatness, he ignored the warnings that surrounded his work,",
    "The expedition was planned with confidence, yet uncertainty followed every step forward,",
    "Though trained in discipline and order, he found himself confronting chaos beyond preparation,",
    "What began as an intellectual pursuit gradually transformed into an experiment with irreversible consequences,",
]

In [None]:
# Sample one continuation per prompt using generate_text's default sampling
# settings, and print each prompt next to its generated text.
print("\nGenerating sample texts:")
for prompt in test_prompts:
    generated = generate_text(model, tokenizer, prompt)
    print(f"\nPrompt: {prompt}")
    print(f"Generated: {generated}")
    # Visual separator between samples.
    print("-" * 80)


Generating sample texts:

Prompt: Long before the final outcome was known, the young officer began to question the cost of obedience,
Generated:  Long before the final outcome was known, the young officer began to question the cost of obedience,ward running appointed later there crossed on points as destroyed engaged commenced stationed without during formed necessary very almost too north had stated learned expected drew enabled afterwards went guarding finally opened fired again stopped ordered Halleck showed described since conducted intrenched difficult came took received breaking reached looking started given ceased sent rendered changed pushed collected now confined commanded drove going occupied firing informed educated generally opposed notified supposed brought about Charleston got fallen repulsed impassable assaulted assigned together runs ready directly against detained reported advanced mounted possessed just deserted excited done thrown at driven sufficiently performed su