In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts

# Step 1: Data Preparation and Tokenization
# Read the dataset
with open('out.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# Use Byte Pair Encoding (BPE) for tokenization
tokenizer = Tokenizer(models.BPE())
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
trainer = trainers.BpeTrainer(special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"])
tokenizer.train_from_iterator([text], trainer=trainer)
tokenizer.decoder = decoders.BPEDecoder()
tokenizer.enable_padding(pad_id=0)
tokenizer.enable_truncation(max_length=512)

# Tokenize the text
encoded = tokenizer.encode(text)
data = torch.tensor(encoded.ids, dtype=torch.long)

# Constants
vocab_size = tokenizer.get_vocab_size()
block_size = 128

# Ensure block_size and batch_size fit the dataset
if len(data) <= 10:
    raise ValueError("Dataset is too small to train. Add more data.")

if len(data) < block_size:
    block_size = max(1, len(data) // 2)
    print(f"Adjusted block_size to {block_size} due to small dataset size.")

batch_size = min(16, len(data) // (block_size + 1))
if batch_size < 1:
    raise ValueError("Dataset too small to create even a single batch. Add more data.")
print(f"Adjusted batch_size to {batch_size} to fit the dataset size.")

# Train/Validation Split
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

# Ensure training and validation splits
if len(train_data) < block_size or len(val_data) < block_size:
    raise ValueError(
        f"Training/validation data is too small for block_size={block_size}. "
        "Add more data or reduce block_size further."
    )

# Data loader
def get_batch(split, batch_size):
    data = train_data if split == 'train' else val_data
    max_offset = max(0, len(data) - block_size)
    if max_offset == 0:
        raise ValueError(f"Dataset too small for block_size={block_size}. Reduce block_size or increase dataset size.")
    ix = torch.randint(max_offset, (batch_size,))
    x = torch.stack([data[i:i + block_size] for i in ix])
    y = torch.stack([data[i + 1:i + block_size + 1] for i in ix])
    return x.to(device), y.to(device)

# Step 2: Model Components
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.tril = torch.tril(torch.ones(block_size, block_size)).to(device)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)
        wei = q @ k.transpose(-2, -1) * C**-0.5
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        return wei @ v

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        return self.dropout(self.proj(out))

class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.GELU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        return x + self.ffwd(self.ln2(x))

class BigramLanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device))
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)

        if targets is None:
            return logits, None
        logits = logits.view(-1, vocab_size)
        targets = targets.view(-1)
        return logits, F.cross_entropy(logits, targets)

    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]
            logits, _ = self(idx_cond)
            probs = F.softmax(logits[:, -1, :], dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

# Step 3: Hyperparameters
batch_size = 32
n_embd = 128
n_head = 4
n_layer = 4
dropout = 0.2
max_iters = 1000
eval_interval = 100
learning_rate = 1e-4
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")

# Initialize model and optimizer
model = BigramLanguageModel().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=100)

# Step 4: Training Loop
@torch.no_grad()
def estimate_loss():
    model.eval()
    out = {'train': 0, 'val': 0}
    for split in ['train', 'val']:
        losses = []
        for _ in range(10):
            X, Y = get_batch(split, batch_size)
            _, loss = model(X, Y)
            losses.append(loss.item())
        out[split] = torch.tensor(losses).mean().item()
    model.train()
    return out

for iter in range(max_iters):
    # Log progress
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"Step {iter}: Train Loss {losses['train']:.4f}, Val Loss {losses['val']:.4f}")

    # Train on a batch
    X, Y = get_batch('train', batch_size)
    logits, loss = model(X, Y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    scheduler.step()

# Step 5: Text Generation
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated = model.generate(context, max_new_tokens=500)
print(tokenizer.decode(generated[0].tolist()))




Adjusted batch_size to 3 to fit the dataset size.


ValueError: Training/validation data is too small for block_size=128. Add more data or reduce block_size further.