In [1]:
import pandas as pd
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
import math
import random
import time

### A text-generation transformer

In [None]:

# # Micro Story Generator - Train a Tiny Transformer From Scratch
# This script trains a character-level Transformer language model on a tiny text corpus.
# Concepts : tokenization, positional encoding, attention, autoregressive generation.

# %%
import math, random, time
import torch
import torch.nn as nn
import torch.nn.functional as F

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)

# -----------------------------
# 1) Tiny corpus (training text)
# -----------------------------
# We use a small text block for quick training.
TRAIN_TEXT = """
Once upon a time, in a distant valley, a small robot learned to write.
It practiced by crafting tiny stories about stars, rain, and quiet forests.
Sometimes the stories were silly; sometimes they were wise.
Students loved reading them and adding their own twists.
The robot learned that every story is a journey, even the short ones.
Writing became its favorite way to understand the world.
"""

# -----------------------------------------
# EXTRA THEMATIC BLOCKS (You can add/remove them)
# -----------------------------------------
## TODO : Add more/remove lines here to the training text to enrich the dataset => add texts that can help the model learn better the structure of sentences.
extra_lines = [

    # --- SPACE / DISCOVERY ---
    "I wrote about adventures in space as I dreamed among the stars.",
    "I wrote about astronauts who found secrets hidden in the darkness.",
    "I wrote about planets with glowing oceans and floating mountains.",

    # --- PERSONAL EXPRESSION / LETTERS ---
    "I wrote to the president because I had many things to tell.",
    "I wrote letters to friends who lived far away across the sea.",
    "I wrote apologies, confessions, and promises I hoped to keep.",

    # # --- FUTURE / AI ---
    # "I wrote about the future of AI and how it may change our lives.",
    # "I wrote about machines learning kindness, patience, and wisdom.",
    # "I wrote about a world where humans and robots create together.",

    # # --- POETRY / NATURE ---
    # "I wrote poems about nature, the wind, and the sea.",
    # "I wrote verses about mountains that whispered old stories.",
    # "I wrote lines about rivers that carried forgotten dreams.",

    # # --- PHILOSOPHY / REFLECTIONS ---
    # "I wrote to understand what it means to grow and to learn.",
    # "I wrote about the meaning of silence in a noisy world.",
    # "I wrote about time, wondering why it moves so quickly.",

    # --- STORY STRUCTURE BOOSTERS ---
    "Every story began with a question, and every question led to wonder.",
    "Sometimes I used long sentences to explore ideas slowly and carefully.",
    "Other times I chose short sentences. Because they create rhythm.",
    "Characters faced problems, solved them, and learned something new.",
    "Clear beginnings, meaningful middles, and gentle endings helped my stories shine.",

    # --- STYLE BOOSTERS (makes the model copy your tone) ---
    "My style grew warmer and softer every time I wrote something new.",
    "I liked using gentle words, patient pauses, and careful rhythm.",
    "I enjoyed mixing imagination with simple everyday observations."
]

TRAIN_TEXT += "\n".join(extra_lines)


# -----------------------------
# 2) Character-level tokenizer
# -----------------------------
# We build a vocabulary of all unique characters in TRAIN_TEXT.
# stoi: maps character -> integer ID
# itos: maps integer ID -> character
chars = sorted(list(set(TRAIN_TEXT)))
stoi = {ch:i for i,ch in enumerate(chars)}
itos = {i:ch for ch,i in stoi.items()}
vocab_size = len(chars)
print("Vocab size (chars):", vocab_size)

# Encode: convert string to tensor of IDs
# Decode: convert tensor of IDs back to string
def encode(s): return torch.tensor([stoi[c] for c in s], dtype=torch.long)
def decode(t): return "".join(itos[int(i)] for i in t)

# Convert entire training text to tensor
data = encode(TRAIN_TEXT)
print("Data length:", len(data))

# -----------------------------
# 3) Create training batches
# -----------------------------
# We train the model to predict the next character given previous ones.
# Each batch contains BATCH_SIZE sequences of length SEQ_LEN.
SEQ_LEN = 128
BATCH_SIZE = 64
def get_batch():
    # Random starting positions for sequences
    ix = torch.randint(0, len(data)-SEQ_LEN-1, (BATCH_SIZE,))
    x = torch.stack([data[i:i+SEQ_LEN] for i in ix])       # input sequence
    y = torch.stack([data[i+1:i+SEQ_LEN+1] for i in ix])   # target sequence (shifted by 1)
    return x.to(DEVICE), y.to(DEVICE)

# -----------------------------
# 4) Positional encoding
# -----------------------------
# Adds position information to embeddings so the model knows token order.
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0)/d_model))
        pe[:,0::2] = torch.sin(position*div_term)
        pe[:,1::2] = torch.cos(position*div_term)
        self.register_buffer('pe', pe.unsqueeze(0))  # shape [1, max_len, d_model]
    def forward(self, x):
        # Add positional encoding to input embeddings
        T = x.size(1)
        return x + self.pe[:,:T,:]

# -----------------------------
# 5) Multi-head attention
# -----------------------------
# Splits embeddings into multiple heads for parallel attention.
# Uses a causal mask to prevent looking ahead (autoregressive).
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.q = nn.Linear(d_model, d_model)
        self.k = nn.Linear(d_model, d_model)
        self.v = nn.Linear(d_model, d_model)
        self.o = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        B, T, D = x.shape
        H, Dh = self.num_heads, self.d_k

        # Project input to Q, K, V and split into heads
        q = self.q(x).view(B, T, H, Dh).transpose(1, 2)
        k = self.k(x).view(B, T, H, Dh).transpose(1, 2)
        v = self.v(x).view(B, T, H, Dh).transpose(1, 2)

        # Compute scaled dot-product attention
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(Dh)
        if mask is not None:
            scores = scores.masked_fill(~mask, float("-inf"))
        attn = torch.softmax(scores, dim=-1)
        out = torch.matmul(attn, v)

        # Combine heads back
        out = out.transpose(1, 2).contiguous().view(B, T, D)
        return self.o(out)

# Causal mask: lower-triangular matrix to block future tokens
def causal_mask(T):
    return torch.tril(torch.ones(T, T, dtype=torch.bool, device=DEVICE))

# -----------------------------
# 6) Feed-forward block
# -----------------------------
# Applies two linear layers with ReLU for non-linearity.
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
    def forward(self, x): return self.net(x)

# -----------------------------
# 7) Transformer block
# -----------------------------
# Combines attention and feed-forward with LayerNorm and residual connections.
class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff)

    def forward(self, x, mask):
        x = x + self.attn(self.ln1(x), mask)
        x = x + self.ff(self.ln2(x))
        return x

# -----------------------------
# 8) Full Transformer LM
# -----------------------------
# Embedding + positional encoding + N transformer blocks + final linear head.
class TinyTransformerLM(nn.Module):
    def __init__(self, vocab_size, d_model=128, num_layers=2, num_heads=4, d_ff=256, max_len=1024):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pos = PositionalEncoding(d_model, max_len)
        self.blocks = nn.ModuleList([TransformerBlock(d_model, num_heads, d_ff) for _ in range(num_layers)])
        self.ln_f = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size)

    def forward(self, idx):
        x = self.embed(idx)
        x = self.pos(x)
        m = causal_mask(idx.size(1))
        for blk in self.blocks:
            x = blk(x, m)
        x = self.ln_f(x)
        logits = self.head(x)
        return logits

    # Generation: autoregressive sampling
    @torch.no_grad()
    def generate(self, prompt_ids, max_new_tokens=100, temperature=1.0, top_k=None):
        x = prompt_ids.unsqueeze(0).to(DEVICE)
        for _ in range(max_new_tokens):
            logits = self.forward(x)[:, -1, :] / max(temperature, 1e-6) # temperature scaling of logits => higher temperature = more random (softmax results more uniform)
            if top_k is not None:
                values, indices = torch.topk(logits, top_k)
                probs = torch.zeros_like(logits).scatter_(1, indices, torch.softmax(values, dim=-1))
            else:
                probs = torch.softmax(logits, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1) ## Sample next token using the distribution of probs
            x = torch.cat([x, next_id], dim=1)
        return x[0].detach().cpu()

# -----------------------------
# 9) Instantiate model & optimizer
# -----------------------------
model = TinyTransformerLM(vocab_size=vocab_size, d_model=128, num_layers=2, num_heads=4, d_ff=256).to(DEVICE)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)


Device: cpu
Vocab size (chars): 39
Data length: 1320


In [6]:
# -----------------------------
# 10) Training loop
# -----------------------------
# We train for a few epochs to keep runtime short.
EPOCHS = 3
PRINT_EVERY = 200

model.train()
start = time.time()
for epoch in range(1, EPOCHS+1):
    total_loss, steps = 0.0, 0
    for _ in range(800):  # number of mini-batches per epoch
        x, y = get_batch()
        logits = model(x)
        loss = F.cross_entropy(logits.view(-1, vocab_size), y.view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        steps += 1
        if steps % PRINT_EVERY == 0:
            print(f"Epoch {epoch} | Step {steps} | Loss {total_loss/steps:.4f}")
    print(f"Epoch {epoch} done. Avg loss: {total_loss/steps:.4f}")

elapsed = time.time() - start
print(f"Training finished in {elapsed:.1f}s on {DEVICE}")


Epoch 1 | Step 200 | Loss 2.2977
Epoch 1 | Step 400 | Loss 1.7835
Epoch 1 | Step 600 | Loss 1.3056
Epoch 1 | Step 800 | Loss 1.0129
Epoch 1 done. Avg loss: 1.0129
Epoch 2 | Step 200 | Loss 0.0855
Epoch 2 | Step 400 | Loss 0.0756
Epoch 2 | Step 600 | Loss 0.0688
Epoch 2 | Step 800 | Loss 0.0639
Epoch 2 done. Avg loss: 0.0639
Epoch 3 | Step 200 | Loss 0.0455
Epoch 3 | Step 400 | Loss 0.0803
Epoch 3 | Step 600 | Loss 0.0854
Epoch 3 | Step 800 | Loss 0.0754
Epoch 3 done. Avg loss: 0.0754
Training finished in 618.6s on cpu


In [7]:
# -----------------------------
# 11) Generation
# -----------------------------
def generate_text(seed, max_new_tokens=200, temperature=0.9, top_k=20):
    model.eval() # set model to evaluation mode (not training)
    prompt_ids = encode(seed)
    gen_ids = model.generate(prompt_ids, max_new_tokens=max_new_tokens, temperature=temperature, top_k=top_k)
    print("\n--- Generated text ---")
    print(decode(gen_ids))
    print("----------------------\n")

In [8]:

prompt = "I wrote"
generate_text(prompt)


--- Generated text ---
I wrote to to the president because I had many thing th sta who sine keve ry tory isn a journey, even the short ones.
Writ borit borit beeeeeeeedeeeeeeeedeveevertury beery beeeeveeeeveveevevevery bepry by be
----------------------



In [None]:
prompt = "Here is the beginning of a short story:"
generate_text(prompt)

In [10]:
prompt = "Continue the text. "
generate_text(prompt)


--- Generated text ---
Continue the text. racte rke.
Ito about planets with glowing oceans and floating mountains.
I wrote to the president because I had mpreead opleatle e eanevererverververererereredvereveveaneaneaneaneraneryanervelerglever
----------------------



In [11]:
prompt = "Write gently about something"
generate_text(prompt)


--- Generated text ---
Write gently about something nitars.
I wrote about astronauts who found secrets hidden in the darkness.
I wrote about planets with glllllanelewlelemenenenenesthenestulllesesesesemelewefoweweglowegrowengrwenglvellellllvenelvelele
----------------------

