In [26]:
# Cell 1: Imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer
import math
import numpy as np
import matplotlib.pyplot as plt
import os
from tqdm import tqdm


33333333333333333333333333333333333333333333333333

In [None]:
# Cell 1: Imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer
import math
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from datasets import load_dataset

# Cell 2: Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)  # [max_len, d_model]
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # Shape: [1, max_len, d_model]
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: [batch_size, seq_len, d_model]
        return x + self.pe[:, :x.size(1), :]

# Cell 3: MultiHeadAttention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        self.scale = math.sqrt(self.d_k)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale
        if mask is not None:
            # mask shape expected: [batch_size, 1, seq_len, seq_len] or broadcastable
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attn = torch.softmax(scores, dim=-1)
        output = torch.matmul(attn, V)
        return output, attn

    def forward(self, x, mask=None):
        batch_size, seq_len, _ = x.size()
        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)

        # Reshape for multi-head attention
        Q = Q.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)  # [b, h, seq_len, d_k]
        K = K.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

        context, attn = self.scaled_dot_product_attention(Q, K, V, mask)
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        return self.W_o(context), attn

# Cell 4: FeedForward
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.linear2(self.relu(self.linear1(x)))

# Cell 5: TransformerBlock
class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.ff = FeedForward(d_model, d_ff)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_output, attn_weights = self.attn(x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.ff(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x, attn_weights

# Cell 6: GPT2 Model
class GPT2(nn.Module):
    def __init__(self, vocab_size, d_model=128, num_heads=4, num_layers=2, d_ff=512, max_len=512, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])
        self.output_layer = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.max_len = max_len

    def forward(self, x, mask=None):
        batch_size, seq_len = x.size()
        x = self.embedding(x)  # [b, seq_len, d_model]
        x = self.pos_encoding(x)
        x = self.dropout(x)

        attn_weights = []

        # Prepare masks for causal attention
        # causal mask: (seq_len, seq_len), lower triangular matrix (1's for allowed positions)
        causal_mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device)).bool()

        if mask is not None:
            # mask: [batch_size, seq_len] (1 for tokens, 0 for padding)
            # Expand mask shape for broadcasting: [batch_size, 1, 1, seq_len]
            mask = mask.unsqueeze(1).unsqueeze(2)
            # Expand causal mask for batch and heads: [1, 1, seq_len, seq_len]
            causal_mask = causal_mask.unsqueeze(0).unsqueeze(0)
            combined_mask = mask & causal_mask  # broadcasted AND
        else:
            # Just causal mask for all tokens
            combined_mask = causal_mask.unsqueeze(0).unsqueeze(0)  # shape [1,1,seq_len,seq_len]

        for block in self.transformer_blocks:
            x, attn = block(x, combined_mask)
            attn_weights.append(attn)

        logits = self.output_layer(x)  # [batch_size, seq_len, vocab_size]
        return logits, attn_weights

# Cell 7: Prepare Dataset and DataLoaders
# Load TinyStories dataset from HuggingFace datasets
dataset = load_dataset("roneneldan/TinyStories", split="train[:1%]")
val_dataset = load_dataset("roneneldan/TinyStories", split="validation[:1%]")

# Initialize tokenizer
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

def tokenize_function(examples):
    return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=128)

# Tokenize dataset
tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset.set_format(type="torch", columns=["input_ids", "attention_mask"])

tokenized_val_dataset = val_dataset.map(tokenize_function, batched=True)
tokenized_val_dataset.set_format(type="torch", columns=["input_ids", "attention_mask"])

# Define PyTorch Dataset wrapper
class TinyStoriesDataset(Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __len__(self):
        return len(self.encodings["input_ids"])

    def __getitem__(self, idx):
        input_ids = self.encodings["input_ids"][idx]
        attention_mask = self.encodings["attention_mask"][idx]
        return input_ids, attention_mask

train_dataset = TinyStoriesDataset(tokenized_dataset)
val_dataset = TinyStoriesDataset(tokenized_val_dataset)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

# Cell 8: Training function
def train_model(model, train_loader, val_loader, tokenizer, num_epochs=5, device='cuda'):
    optimizer = optim.Adam(model.parameters(), lr=3e-4)
    loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

    train_losses = []
    val_losses = []

    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        for xb, mask in tqdm(train_loader, desc=f"Epoch {epoch+1} [Training]"):
            xb, mask = xb.to(device), mask.to(device)
            logits, _ = model(xb, mask)

            # Shift logits and labels for next token prediction
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = xb[..., 1:].contiguous()

            loss = loss_fn(shift_logits.view(-1, shift_logits.size(-1)),
                           shift_labels.view(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for xb, mask in val_loader:
                xb, mask = xb.to(device), mask.to(device)
                logits, _ = model(xb, mask)

                shift_logits = logits[..., :-1, :].contiguous()
                shift_labels = xb[..., 1:].contiguous()

                loss = loss_fn(shift_logits.view(-1, shift_logits.size(-1)),
                               shift_labels.view(-1))
                total_val_loss += loss.item()

        avg_val_loss = total_val_loss / len(val_loader)
        val_losses.append(avg_val_loss)

        print(f"Epoch {epoch+1} Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

    # Plot losses
    plt.plot(train_losses, label="Train Loss")
    plt.plot(val_losses, label="Validation Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

# Cell 9: Run training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = tokenizer.vocab_size
model = GPT2(vocab_size=vocab_size)

train_model(model, train_loader, val_loader, tokenizer, num_epochs=5, device=device)


Map:   0%|          | 0/220 [00:00<?, ? examples/s]

Epoch 1 [Training]: 100%|██████████| 2650/2650 [17:07<00:00,  2.58it/s]


Epoch 1 Train Loss: 4.1866 | Val Loss: 3.1371


Epoch 2 [Training]: 100%|██████████| 2650/2650 [16:59<00:00,  2.60it/s]


Epoch 2 Train Loss: 3.3443 | Val Loss: 2.8189


Epoch 3 [Training]: 100%|██████████| 2650/2650 [16:57<00:00,  2.60it/s]


Epoch 3 Train Loss: 3.1114 | Val Loss: 2.6523


Epoch 4 [Training]: 100%|██████████| 2650/2650 [17:00<00:00,  2.60it/s]


Epoch 4 Train Loss: 2.9714 | Val Loss: 2.5459


Epoch 5 [Training]:  48%|████▊     | 1283/2650 [08:15<08:44,  2.60it/s]

In [None]:
def generate_story(model, tokenizer, prompt="once upon a time", max_length=100, device='cpu'):
    model.eval()
    tokens = tokenizer.encode(prompt, return_tensors="pt").to(device)  # shape: [1, seq_len]
    generated = tokens

    with torch.no_grad():
        for _ in range(max_length):
            logits, _ = model(generated)
            next_token_logits = logits[:, -1, :]  # last token logits
            next_token_id = torch.argmax(next_token_logits, dim=-1).unsqueeze(0)  # greedy
            generated = torch.cat((generated, next_token_id), dim=1)
            if next_token_id.item() == tokenizer.eos_token_id:
                break

    story = tokenizer.decode(generated[0], skip_special_tokens=True)
    return story

# Usage after training:
model.to(device)
prompt = "once upon a time"
print(generate_story(model, tokenizer, prompt, max_length=100, device=device))
