In [1]:
from datasets import load_dataset
import os

def setup_environment(base_dir: str = '/kaggle/working/GPT-2-Project'):
    """Create the project's working folders and describe where its files live.

    ``<base_dir>/data`` and ``<base_dir>/models`` are created if they do not
    exist yet; already existing folders are left untouched.

    Args:
        base_dir: Root directory of the project.

    Returns:
        dict mapping ``train_txt`` and ``val_txt`` to files under ``data`` and
        ``model_ckpt``, ``full_ckpt`` and ``tokenizer_json`` to files under
        ``models``.
    """
    folders = {name: os.path.join(base_dir, name) for name in ('data', 'models')}
    for folder in folders.values():
        os.makedirs(folder, exist_ok=True)

    # (key, parent folder, file name) -- listed in the order the keys are returned.
    layout = (
        ('train_txt',      'data',   'TinyStories-train.txt'),
        ('val_txt',        'data',   'TinyStories-valid.txt'),
        ('model_ckpt',     'models', 'best_gpt2.pt'),
        ('full_ckpt',      'models', 'checkpoint_full.pt'),
        ('tokenizer_json', 'models', 'tokenizer.json'),
    )
    return {key: os.path.join(folders[parent], filename) for key, parent, filename in layout}

def download_dataset(paths):
    """Export the TinyStories train/validation splits to plain-text files.

    Every story of a split is written followed by a newline. No escaping is
    performed, so newlines inside a story are written as-is.

    A split whose target file already exists is skipped, which keeps a
    notebook re-run from downloading and rewriting the corpus again; delete
    the file to force a refresh. Each file is first written to ``<path>.tmp``
    and then moved into place, so an interrupted run does not leave a
    truncated file that would later be mistaken for a complete one.

    Args:
        paths: mapping with the keys ``train_txt`` and ``val_txt`` giving the
            destination of the train and validation text files.
    """
    destinations = {'train': paths['train_txt'], 'validation': paths['val_txt']}
    pending = {split: path for split, path in destinations.items()
               if not os.path.exists(path)}
    if not pending:
        return

    dataset = load_dataset("roneneldan/TinyStories")

    for split, path in pending.items():
        tmp_path = path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            for story in dataset[split]['text']:
                f.write(story + '\n')
        # Publish the finished file in one step.
        os.replace(tmp_path, path)


# Create the working folders, then export the corpus into them.
paths = setup_environment()
download_dataset(paths)

# Report where the two text files were written.
print("DONE:", f"train: {paths['train_txt']}", f"val: {paths['val_txt']}", sep="\n")

DONE:
train: /kaggle/working/GPT-2-Project/data/TinyStories-train.txt
val: /kaggle/working/GPT-2-Project/data/TinyStories-valid.txt


In [2]:
import os
import math
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset
from tqdm.notebook import tqdm
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace


In [3]:
# =============================================================================
# 2. Model Implementation
# =============================================================================
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    A single fused linear layer produces queries, keys and values. They are
    split into ``num_heads`` heads of size ``d_model // num_heads``, attended
    independently, concatenated again and passed through an output projection.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        """
        Args:
            d_model: Width of the input and output features; must be a
                multiple of ``num_heads``.
            num_heads: Number of attention heads.
            dropout: Dropout probability applied to the attention weights.
        """
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model, self.num_heads = d_model, num_heads
        self.head_dim = d_model // num_heads

        # One matmul yields Q, K and V side by side along the last axis.
        self.qkv = nn.Linear(d_model, 3 * d_model)
        self.proj = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """Attend over ``x`` (three-dimensional: batch, sequence, ``d_model``).

        Args:
            x: Input features.
            mask: Optional tensor broadcastable to the attention scores;
                positions where it equals 0 are excluded from attention.

        Returns:
            Tensor with the same shape as ``x``.
        """
        batch, seq_len, width = x.shape
        per_head_shape = (batch, seq_len, self.num_heads, self.head_dim)

        # Split the fused projection and move the head axis in front of the
        # sequence axis: each of Q/K/V becomes (batch, heads, seq, head_dim).
        queries, keys, values = (
            part.view(per_head_shape).transpose(1, 2)
            for part in self.qkv(x).split(self.d_model, dim=2)
        )

        scale = 1.0 / math.sqrt(self.head_dim)
        scores = torch.matmul(queries, keys.transpose(-2, -1)) * scale
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        weights = self.dropout(F.softmax(scores, dim=-1))

        # Merge the heads back into a single feature axis.
        merged = torch.matmul(weights, values).transpose(1, 2).contiguous()
        return self.proj(merged.view(batch, seq_len, width))

In [4]:
class PositionWiseFFN(nn.Module):
    """Two-layer feed-forward network applied along the last axis.

    Expands ``d_model`` features to ``d_ff``, applies GELU and dropout, then
    projects back to ``d_model``.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """
        Args:
            d_model: Input and output feature width.
            d_ff: Width of the hidden layer.
            dropout: Dropout probability applied after the activation.
        """
        super().__init__()
        expand = nn.Linear(d_model, d_ff)
        contract = nn.Linear(d_ff, d_model)
        self.net = nn.Sequential(expand, nn.GELU(), nn.Dropout(dropout), contract)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the network; the output has the shape of ``x``."""
        hidden = self.net(x)
        return hidden


In [5]:
class DecoderLayer(nn.Module):
    """One transformer decoder block with layer norm applied before each sub-layer.

    The block runs self-attention and then the feed-forward network; each
    sub-layer sees a normalised copy of its input and its (dropped-out) output
    is added back onto the un-normalised input.
    """

    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        """
        Args:
            d_model: Feature width of the block.
            num_heads: Number of attention heads.
            d_ff: Hidden width of the feed-forward network.
            dropout: Dropout probability used in both sub-layers and on
                their outputs.
        """
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = PositionWiseFFN(d_model, d_ff, dropout)
        self.norm1, self.norm2 = nn.LayerNorm(d_model), nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """Apply the block to ``x``; ``mask`` is forwarded to self-attention."""
        # Attention sub-layer with residual connection.
        x = x + self.dropout(self.self_attn(self.norm1(x), mask))
        # Feed-forward sub-layer with residual connection.
        return x + self.dropout(self.ffn(self.norm2(x)))

In [6]:
class GPT2(nn.Module):
    """Decoder-only transformer language model.

    Token embeddings (scaled by ``sqrt(d_model)``) plus a learned positional
    embedding feed a stack of ``DecoderLayer`` blocks, followed by a final
    layer norm and a linear projection onto the vocabulary.
    """

    def __init__(self, vocab_size: int, d_model: int = 768, num_layers: int = 12,
                 num_heads: int = 12, d_ff: int = 3072, max_seq: int = 1024, dropout: float = 0.1):
        """
        Args:
            vocab_size: Number of token ids the model embeds and predicts.
            d_model: Feature width used throughout the model.
            num_layers: Number of decoder blocks.
            num_heads: Attention heads per block.
            d_ff: Hidden width of each block's feed-forward network.
            max_seq: Number of positions in the positional embedding; also
                stored as ``max_seq_len``.
            dropout: Dropout probability.
        """
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, d_model)
        # Learned positional embedding; it starts at zero and is not touched
        # by ``_init_weights`` (which only handles Linear/Embedding/LayerNorm).
        self.pos_emb = nn.Parameter(torch.zeros(1, max_seq, d_model))
        self.dropout = nn.Dropout(dropout)

        blocks = []
        for _ in range(num_layers):
            blocks.append(DecoderLayer(d_model, num_heads, d_ff, dropout))
        self.layers = nn.ModuleList(blocks)

        self.norm = nn.LayerNorm(d_model)
        self.out = nn.Linear(d_model, vocab_size)
        self.max_seq_len = max_seq

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise one sub-module; called for every module via ``apply``."""
        if isinstance(module, nn.LayerNorm):
            nn.init.zeros_(module.bias)
            nn.init.ones_(module.weight)
            return
        if not isinstance(module, (nn.Linear, nn.Embedding)):
            return
        # Linear and Embedding weights: N(0, 0.02); Linear biases: zero.
        nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if isinstance(module, nn.Linear) and module.bias is not None:
            nn.init.zeros_(module.bias)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """Compute next-token logits.

        Args:
            x: Two-dimensional tensor of token ids (batch, sequence).
            mask: Optional attention mask handed to every decoder block.

        Returns:
            Logits with a trailing axis of size ``vocab_size``.
        """
        _, seq_len = x.shape
        scale = math.sqrt(self.token_emb.embedding_dim)
        hidden = self.token_emb(x) * scale
        hidden = self.dropout(hidden + self.pos_emb[:, :seq_len, :])

        for block in self.layers:
            hidden = block(hidden, mask)
        return self.out(self.norm(hidden))


In [7]:
# =============================================================================
# 3. Dataset & Tokenizer
# =============================================================================
class TinyStoriesDataset(Dataset):
    """Line-per-sample text dataset producing fixed-length next-token pairs.

    Every non-blank line of the file is one sample. Samples are tokenised on
    access, cut to ``max_len`` tokens and right-padded with the ``[PAD]`` id.
    """

    def __init__(self, file_path: str, tokenizer: Tokenizer, max_len: int = 1024):
        """
        Args:
            file_path: UTF-8 text file to read; blank lines are dropped and
                surrounding whitespace is stripped.
            tokenizer: Tokenizer used to encode each line.
            max_len: Number of tokens per sample before the input/target shift.
        """
        self.lines = []
        with open(file_path, 'r', encoding='utf-8') as handle:
            for raw_line in handle:
                text = raw_line.strip()
                if text:
                    self.lines.append(text)
        self.tokenizer = tokenizer
        self.max_len = max_len
        # Falls back to 0 when the tokenizer has no "[PAD]" token.
        self.pad_id = tokenizer.token_to_id("[PAD]") or 0

    def __len__(self) -> int:
        """Number of non-blank lines that were read."""
        return len(self.lines)

    def __getitem__(self, idx: int):
        """Return ``(inputs, targets)`` for line ``idx``.

        Both are ``long`` tensors of length ``max_len - 1``; ``targets`` is
        ``inputs`` shifted one token to the left.
        """
        encoded = self.tokenizer.encode(self.lines[idx]).ids
        window = encoded[:self.max_len]
        padding = [self.pad_id] * (self.max_len - len(window))
        sequence = window + padding

        inputs = torch.tensor(sequence[:-1], dtype=torch.long)
        targets = torch.tensor(sequence[1:], dtype=torch.long)
        return inputs, targets

def train_tokenizer(train_file: str, save_path: str, vocab_size: int = 10000) -> Tokenizer:
    """Train a whitespace-pre-tokenised BPE tokenizer and save it to disk.

    Args:
        train_file: Text file the tokenizer is trained on.
        save_path: Where the trained tokenizer is saved.
        vocab_size: Vocabulary size requested from the trainer.

    Returns:
        The trained tokenizer.
    """
    reserved = ["[UNK]", "[PAD]", "[BOS]", "[EOS]"]

    bpe_tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
    bpe_tokenizer.pre_tokenizer = Whitespace()
    bpe_trainer = BpeTrainer(special_tokens=reserved, vocab_size=vocab_size)

    bpe_tokenizer.train([train_file], bpe_trainer)
    bpe_tokenizer.save(save_path)
    return bpe_tokenizer


In [8]:
# =============================================================================
# 4. Training & Evaluation
# =============================================================================
def train_model(config, paths):
    """Train a GPT2 model and keep the checkpoint with the lowest validation loss.

    The tokenizer is loaded from ``paths['tokenizer_json']``, or trained on
    the training text when that file does not exist. Training and validation
    each use a random 10% sample of their text file. After every epoch the
    mean per-batch train/validation losses are printed and, whenever the
    validation loss improves, the model's state dict is saved to
    ``paths['model_ckpt']``.

    Args:
        config: mapping with the keys ``vocab_size``, ``d_model``,
            ``num_layers``, ``num_heads``, ``d_ff``, ``max_seq_len``,
            ``batch_size``, ``epochs`` and ``lr``.
        paths: mapping with the keys ``tokenizer_json``, ``train_txt``,
            ``val_txt`` and ``model_ckpt``.
    """
    subset_fraction = 0.1   # share of each split that is actually used
    max_grad_norm = 1.0     # gradient-clipping threshold

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    if not os.path.exists(paths['tokenizer_json']):
        print("Training tokenizer...")
        tokenizer = train_tokenizer(paths['train_txt'], paths['tokenizer_json'], config['vocab_size'])
    else:
        tokenizer = Tokenizer.from_file(paths['tokenizer_json'])

    full_train_ds = TinyStoriesDataset(paths['train_txt'], tokenizer, config['max_seq_len'])
    full_val_ds = TinyStoriesDataset(paths['val_txt'], tokenizer, config['max_seq_len'])

    train_indices = random.sample(range(len(full_train_ds)), int(subset_fraction * len(full_train_ds)))
    val_indices = random.sample(range(len(full_val_ds)), int(subset_fraction * len(full_val_ds)))

    train_ds = Subset(full_train_ds, train_indices)
    val_ds = Subset(full_val_ds, val_indices)

    train_loader = DataLoader(train_ds, batch_size=config['batch_size'], shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=config['batch_size'])

    model = GPT2(
        vocab_size=config['vocab_size'],
        d_model=config['d_model'],
        num_layers=config['num_layers'],
        num_heads=config['num_heads'],
        d_ff=config['d_ff'],
        max_seq=config['max_seq_len']
    ).to(device)

    optimizer = torch.optim.AdamW(model.parameters(), lr=config['lr'])
    # Padded target positions do not contribute to the loss.
    criterion = nn.CrossEntropyLoss(ignore_index=full_train_ds.pad_id)
    best_val_loss = float('inf')

    # The dataset yields inputs/targets of max_seq_len - 1 tokens, so the
    # causal mask never changes: build it once instead of once per epoch.
    seq_len = config['max_seq_len'] - 1
    mask = torch.tril(torch.ones(seq_len, seq_len, device=device)).bool()

    for epoch in range(1, config['epochs'] + 1):
        model.train()
        train_loss = 0

        for inputs, targets in tqdm(train_loader, desc=f"Epoch {epoch}"):
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs, mask)
            loss = criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1))
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()
            train_loss += loss.item()

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs, mask)
                val_loss += criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1)).item()

        # Both figures are means over batches, not over tokens.
        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)
        print(f"Epoch {epoch:02d} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), paths['model_ckpt'])
            print(f"Saved best model (val_loss={best_val_loss:.4f})")

In [9]:
# ============================================================================
# 5. Generation & Evaluation
# ============================================================================
def generate_text(model, tokenizer, prompt="", max_length=50, temperature=1.0, top_k=None, device='cuda'):
    """Sample a continuation of ``prompt`` one token at a time.

    Args:
        model: Module called as ``model(input_ids, mask)`` that returns logits
            and exposes ``max_seq_len``; it is switched to eval mode.
        tokenizer: Object providing ``encode(text).ids``, ``token_to_id`` and
            ``decode``.
        prompt: Text to continue. An empty prompt (the default) is seeded with
            the ``[BOS]`` id, or 0 if the tokenizer has no such token, because
            the model cannot be queried on an empty sequence.
        max_length: Maximum number of tokens to generate.
        temperature: Logits are divided by this value before sampling;
            ``0`` selects the most likely token instead of sampling.
        top_k: If given, only the ``top_k`` highest logits stay eligible;
            values above the vocabulary size are clamped to it.
        device: Device the input tensors are created on; it must be the
            device the model lives on.

    Returns:
        The decoded text of the prompt tokens plus the generated tokens.
        Generation stops early once the ``[PAD]`` token is produced.
    """
    model.eval()
    pad_id = tokenizer.token_to_id("[PAD]")  # looked up once, not per step

    tokens = list(tokenizer.encode(prompt).ids)
    if not tokens:
        bos_id = tokenizer.token_to_id("[BOS]")
        tokens = [bos_id if bos_id is not None else 0]

    for _ in range(max_length):
        # Only the most recent max_seq_len tokens fit the positional embedding.
        window = tokens[-model.max_seq_len:]
        input_ids = torch.tensor([window], dtype=torch.long, device=device)
        seq_len = input_ids.size(1)
        mask = torch.tril(torch.ones(seq_len, seq_len, device=device)).bool()
        with torch.no_grad():
            logits = model(input_ids, mask)[0, -1, :]

        if temperature == 0:
            # Greedy decoding; dividing by zero would only produce inf/nan.
            next_token = int(torch.argmax(logits).item())
        else:
            if temperature != 1.0:
                logits = logits / temperature
            if top_k is not None:
                k = min(top_k, logits.size(-1))
                kth_value = torch.topk(logits, k).values[-1]
                logits = logits.masked_fill(logits < kth_value, float('-inf'))
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1).item()

        tokens.append(next_token)
        if next_token == pad_id:
            break
    return tokenizer.decode(tokens)

def _infer_pad_id(data_loader, max_depth=8):
    """Find a ``pad_id`` on the loader's dataset, unwrapping ``.dataset`` wrappers; 0 if none."""
    dataset = getattr(data_loader, 'dataset', None)
    for _ in range(max_depth):
        if dataset is None:
            break
        pad_id = getattr(dataset, 'pad_id', None)
        if pad_id is not None:
            return pad_id
        dataset = getattr(dataset, 'dataset', None)
    return 0


def calculate_perplexity(model, data_loader, device='cuda', pad_id=None):
    """Compute token-level perplexity of ``model`` over ``data_loader``.

    The summed cross-entropy over all non-padding targets is divided by the
    number of those targets and exponentiated.

    The padding id used to be hard-coded as 0, while training ignores the
    dataset's ``pad_id``. The tokenizer in this notebook lists ``[PAD]``
    second among its special tokens, so that id is presumably 1 rather than 0
    (TODO confirm), in which case padded positions were being scored. The id
    is therefore taken from the data instead.

    Args:
        model: Module called as ``model(inputs, mask)`` exposing
            ``max_seq_len``; it is switched to eval mode.
        data_loader: Iterable of ``(inputs, targets)`` batches whose sequence
            length is ``model.max_seq_len - 1``.
        device: Device the batches and the mask are moved to.
        pad_id: Target id to ignore. When ``None`` it is read from
            ``data_loader.dataset.pad_id`` (looking through ``.dataset``
            wrappers such as ``Subset``), falling back to 0.

    Returns:
        ``exp(mean negative log-likelihood per non-padding token)``.

    Raises:
        ZeroDivisionError: if the loader yields no non-padding targets.
    """
    if pad_id is None:
        pad_id = _infer_pad_id(data_loader)

    model.eval()
    total_loss = 0.0
    total_tokens = 0
    mask = torch.tril(torch.ones(model.max_seq_len-1, model.max_seq_len-1, device=device)).bool()
    with torch.no_grad():
        for inputs, targets in tqdm(data_loader, desc="Calculating Perplexity"):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs, mask)
            loss = F.cross_entropy(outputs.view(-1, outputs.size(-1)),
                                   targets.view(-1),
                                   ignore_index=pad_id,
                                   reduction='sum')
            total_loss += loss.item()
            total_tokens += (targets != pad_id).sum().item()

    if total_tokens == 0:
        raise ZeroDivisionError("calculate_perplexity: no non-padding target tokens to evaluate")
    return math.exp(total_loss / total_tokens)

In [None]:
# =============================================================================
# 6. Main Execution
# =============================================================================
if __name__ == '__main__':
    # Model and training hyper-parameters.
    config = {
        'vocab_size': 10000,
        'd_model': 512,
        'num_layers': 6,
        'num_heads': 8,
        'd_ff': 2048,
        'max_seq_len': 256,
        'batch_size': 16,
        'epochs': 5,
        'lr': 3e-4,
    }

    train_model(config, paths)

    # Rebuild the architecture and load the best checkpoint written by training.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = GPT2(
        vocab_size=config['vocab_size'],
        d_model=config['d_model'],
        num_layers=config['num_layers'],
        num_heads=config['num_heads'],
        d_ff=config['d_ff'],
        max_seq=config['max_seq_len']
    ).to(device)
    # map_location keeps a checkpoint saved on GPU loadable on a CPU-only machine.
    model.load_state_dict(torch.load(paths['model_ckpt'], map_location=device))

    tokenizer = Tokenizer.from_file(paths['tokenizer_json'])

    print("\nGenerated Samples:")
    for prompt in ["Once upon a time", "In a magical kingdom", "The scientist discovered"]:
        # Pass the selected device explicitly: the helper defaults to 'cuda',
        # which fails when the model was placed on the CPU above.
        generated = generate_text(model, tokenizer, prompt, max_length=100, temperature=0.7, device=device)
        print(f"\nPrompt: {prompt}\nGenerated: {generated}\n")

    # Perplexity is measured on a fresh random 10% sample of the validation
    # file; it is not the same sample that train_model drew.
    full_val_ds = TinyStoriesDataset(paths['val_txt'], tokenizer, config['max_seq_len'])
    val_indices = random.sample(range(len(full_val_ds)), int(0.1 * len(full_val_ds)))
    val_ds = Subset(full_val_ds, val_indices)
    val_loader = DataLoader(val_ds, batch_size=config['batch_size'])
    perplexity = calculate_perplexity(model, val_loader, device=device)
    print(f"\nModel Perplexity: {perplexity:.2f}")


Epoch 1:   0%|          | 0/71332 [00:00<?, ?it/s]

Epoch 01 | Train Loss: 2.4178 | Val Loss: 2.1148
Saved best model (val_loss=2.1148)


Epoch 2:   0%|          | 0/71332 [00:00<?, ?it/s]