In [None]:
import os
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import LambdaLR
import re
from collections import Counter
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple, Optional

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [None]:
class BPETokenizer:
    def __init__(self, vocab_size=10000):
        self.vocab_size = vocab_size
        self.special_tokens = {"<PAD>": 0, "<BOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.vocab = {}
        self.reverse_vocab = {}
        self.merges = {}
        
    def fit(self, texts):
        # Initialize with characters
        all_chars = set()
        for text in texts:
            all_chars.update(text)
        
        # Initialize vocabulary with characters
        vocab = {c: i + len(self.special_tokens) for i, c in enumerate(all_chars)}
        vocab.update(self.special_tokens)
        self.vocab = vocab
        self.reverse_vocab = {i: c for c, i in vocab.items()}
        
        # Tokenize corpus into character pairs
        word_counts = Counter()
        for text in texts:
            word_counts.update(text.split())
        
        # Implement BPE algorithm
        for _ in range(self.vocab_size - len(vocab)):
            pairs = self._get_stats(word_counts)
            if not pairs:
                break
                
            best_pair = max(pairs, key=pairs.get)
            self._merge_vocab(best_pair, word_counts)
            
            # Add the merge to vocabulary
            self.merges[best_pair] = len(self.vocab)
            self.vocab["".join(best_pair)] = len(self.vocab)
            self.reverse_vocab[len(self.vocab) - 1] = "".join(best_pair)
            
    def _get_stats(self, word_counts):
        pairs = Counter()
        for word, count in word_counts.items():
            symbols = word.split()
            for i in range(len(symbols) - 1):
                pairs[(symbols[i], symbols[i + 1])] += count
        return pairs
    
    def _merge_vocab(self, pair, word_counts):
        bigram = " ".join(pair)
        replacement = "".join(pair)
        
        new_word_counts = Counter()
        for word, count in word_counts.items():
            new_word = word.replace(bigram, replacement)
            new_word_counts[new_word] = count
            
        word_counts.clear()
        word_counts.update(new_word_counts)
    
    def encode(self, text):
        tokens = []
        for char in text:
            tokens.append(self.vocab.get(char, self.special_tokens["<UNK>"]))
        return tokens
    
    def decode(self, tokens):
        return ''.join([self.reverse_vocab.get(token, "<UNK>") for token in tokens])
    
    def get_vocab_size(self):
        return len(self.vocab)

In [None]:
class TextDataset(Dataset):
    def __init__(self, file_path, tokenizer, seq_length=64):
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        
        # Load and preprocess data
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        
        if not self.tokenizer.vocab:
            self.tokenizer.fit([text])
        
        # Tokenize the entire text
        self.data = self.tokenizer.encode(text)
        
    def __len__(self):
        return max(0, len(self.data) - self.seq_length)
    
    def __getitem__(self, idx):
        x = self.data[idx:idx + self.seq_length]
        y = self.data[idx + 1:idx + self.seq_length + 1]
        
        # Convert to tensors
        x = torch.tensor(x, dtype=torch.long)
        y = torch.tensor(y, dtype=torch.long)
        
        return x, y

In [4]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length=5000):
        super(PositionalEncoding, self).__init__()
        
        # Create positional encoding matrix
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        # Register buffer (not a parameter, but should be saved)
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        # Add positional encoding to input embeddings
        # x shape: [batch_size, seq_length, embedding_dim]
        return x + self.pe[:, :x.size(1)]

In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        # Linear projections
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Calculate attention scores
        # Q, K, V shapes: [batch_size, num_heads, seq_length, d_k]
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        
        # Apply mask for causal attention (if provided)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
            
        # Apply softmax to get attention weights
        attention_weights = F.softmax(scores, dim=-1)
        
        # Apply attention weights to values
        output = torch.matmul(attention_weights, V)
        return output
    
    def split_heads(self, x):
        # Reshape to [batch_size, seq_length, num_heads, d_k]
        batch_size, seq_length, _ = x.size()
        x = x.view(batch_size, seq_length, self.num_heads, self.d_k)
        
        # Transpose to [batch_size, num_heads, seq_length, d_k]
        return x.transpose(1, 2)
    
    def combine_heads(self, x):
        # Transpose back to [batch_size, seq_length, num_heads, d_k]
        batch_size, _, seq_length, _ = x.size()
        x = x.transpose(1, 2)
        
        # Reshape to [batch_size, seq_length, d_model]
        return x.contiguous().view(batch_size, seq_length, self.d_model)
    
    def forward(self, Q, K, V, mask=None):
        # Linear projections and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        # Apply scaled dot-product attention
        attention_output = self.scaled_dot_product_attention(Q, K, V, mask)
        
        # Combine heads and apply final linear layer
        output = self.W_o(self.combine_heads(attention_output))
        return output

In [None]:
class FeedForwardNetwork(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForwardNetwork, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        
    def forward(self, x):
        x = F.relu(self.linear1(x))
        
        return self.linear2(x)

In [None]:
class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps
        
    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        
        # Normalize
        return self.gamma * (x - mean) / (std + self.eps) + self.beta

In [None]:
class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(TransformerDecoderLayer, self).__init__()
        
        # Multi-head self-attention
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        
        self.norm1 = LayerNorm(d_model)
        self.norm2 = LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
        self.feed_forward = FeedForwardNetwork(d_model, d_ff)
        
    def forward(self, x, mask=None):
        attn_output = self.self_attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        
        return x

In [None]:
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout=0.1):
        super(TransformerDecoder, self).__init__()
        
        # Token embedding
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        
        # Positional encoding
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
        
        # Dropout
        self.dropout = nn.Dropout(dropout)
        
        # Stack of decoder layers
        self.layers = nn.ModuleList([
            TransformerDecoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        
        # Final output projection
        self.output_projection = nn.Linear(d_model, vocab_size)
        
    def generate_causal_mask(self, seq_length):
        # Create causal mask to prevent attending to future tokens
        mask = torch.triu(torch.ones(seq_length, seq_length), diagonal=1).bool()
        return ~mask
        
    def forward(self, x):

        seq_length = x.size(1)
        
        # Generate causal mask
        causal_mask = self.generate_causal_mask(seq_length).to(x.device)
        
        # Apply token embedding and positional encoding
        x = self.token_embedding(x) * math.sqrt(self.token_embedding.embedding_dim)
        x = self.positional_encoding(x)
        x = self.dropout(x)
        
        # Apply decoder layers
        for layer in self.layers:
            x = layer(x, causal_mask)
        
        # Apply final output projection
        return self.output_projection(x)

In [10]:
class LanguageModel(nn.Module):
    def __init__(self, vocab_size, d_model=512, num_heads=8, num_layers=6, d_ff=2048, max_seq_length=512, dropout=0.1):
        super(LanguageModel, self).__init__()
        
        self.transformer = TransformerDecoder(
            vocab_size=vocab_size,
            d_model=d_model,
            num_heads=num_heads,
            num_layers=num_layers,
            d_ff=d_ff,
            max_seq_length=max_seq_length,
            dropout=dropout
        )
        
    def forward(self, x):
        return self.transformer(x)
    
    def generate(self, prompt, max_length, tokenizer, temperature=1.0):
        self.eval()
        
        # Tokenize prompt
        input_ids = torch.tensor(tokenizer.encode(prompt), dtype=torch.long).unsqueeze(0).to(device)
        
        # Generate tokens one by one
        for _ in range(max_length):
            # Get model output
            with torch.no_grad():
                output = self(input_ids)
                
            # Get the next token probabilities (last token prediction)
            next_token_logits = output[:, -1, :] / temperature
            
            # Apply softmax to get probabilities
            probabilities = F.softmax(next_token_logits, dim=-1)
            
            # Sample from the distribution
            next_token = torch.multinomial(probabilities, 1)
            
            # Append to input_ids
            input_ids = torch.cat([input_ids, next_token], dim=1)
            
            # Stop if we predict the end of sequence token
            if next_token.item() == tokenizer.special_tokens["<EOS>"]:
                break
                
        # Decode tokens to text
        generated_text = tokenizer.decode(input_ids[0].tolist())
        return generated_text

In [None]:
def get_lr_scheduler(optimizer, d_model, warmup_steps=4000):
    def lr_lambda(step):
        # Linear warmup followed by rsqrt decay
        step = max(1, step)  # Prevent division by zero
        arg1 = step ** -0.5
        arg2 = step * (warmup_steps ** -1.5)
        return (d_model ** -0.5) * min(arg1, arg2)
    
    return LambdaLR(optimizer, lr_lambda)

def train(model, train_loader, optimizer, scheduler, epochs, clip_value=1.0):
    model.train()
    
    # Training loop
    train_losses = []
    for epoch in range(epochs):
        epoch_loss = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            
            # Forward pass
            output = model(data)
            
            # Reshape for loss calculation
            output = output.view(-1, output.size(-1))
            target = target.view(-1)
            
            # Calculate loss
            loss = F.cross_entropy(output, target)
            epoch_loss += loss.item()
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            
            # Gradient clipping
            nn.utils.clip_grad_norm_(model.parameters(), clip_value)
            
            # Update weights
            optimizer.step()
            scheduler.step()
            
            # Print progress
            if batch_idx % 100 == 0:
                print(f'Epoch: {epoch+1}/{epochs}, Batch: {batch_idx}/{len(train_loader)}, Loss: {loss.item():.4f}')
        
        # Calculate average loss for the epoch
        avg_loss = epoch_loss / len(train_loader)
        train_losses.append(avg_loss)
        print(f'Epoch: {epoch+1}/{epochs}, Average Loss: {avg_loss:.4f}')
    
    return train_losses

In [None]:
VOCAB_SIZE = 10000
D_MODEL = 512
NUM_HEADS = 8
NUM_LAYERS = 6
D_FF = 2048
MAX_SEQ_LENGTH = 128
BATCH_SIZE = 32
EPOCHS = 2
LEARNING_RATE = 1e-2  

# Initialize tokenizer
tokenizer = BPETokenizer(vocab_size=VOCAB_SIZE)

# Create dataset
dataset = TextDataset('data.txt', tokenizer, seq_length=MAX_SEQ_LENGTH)
train_loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

# Get actual vocab size from tokenizer
vocab_size = tokenizer.get_vocab_size()

# Initialize model
model = LanguageModel(
    vocab_size=vocab_size,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    num_layers=NUM_LAYERS,
    d_ff=D_FF,
    max_seq_length=MAX_SEQ_LENGTH
).to(device)

# Initialize optimizer and scheduler
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, betas=(0.9, 0.98), eps=1e-9)
scheduler = get_lr_scheduler(optimizer, D_MODEL)

# Train the model
losses = train(model, train_loader, optimizer, scheduler, EPOCHS)

# Plot learning curve
plt.figure(figsize=(10, 5))
plt.plot(losses)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.grid(True)
plt.savefig('learning_curve.png')
plt.show()

# Save the model
torch.save(model.state_dict(), 'transformer_lm.pth')

# Generate text
generated_text = model.generate("Hello, ", max_length=100, tokenizer=tokenizer, temperature=0.7)
print("Generated text:")
print(generated_text)

Epoch: 1/10, Batch: 0/226327, Loss: 5.1675
Epoch: 1/10, Batch: 100/226327, Loss: 4.8916
Epoch: 1/10, Batch: 200/226327, Loss: 4.0872
Epoch: 1/10, Batch: 300/226327, Loss: 3.3692
Epoch: 1/10, Batch: 400/226327, Loss: 2.5676
Epoch: 1/10, Batch: 500/226327, Loss: 2.0508
Epoch: 1/10, Batch: 600/226327, Loss: 1.9944
Epoch: 1/10, Batch: 700/226327, Loss: 2.1689
Epoch: 1/10, Batch: 800/226327, Loss: 2.0200
Epoch: 1/10, Batch: 900/226327, Loss: 1.5563
Epoch: 1/10, Batch: 1000/226327, Loss: 1.8884
Epoch: 1/10, Batch: 1100/226327, Loss: 1.3638
Epoch: 1/10, Batch: 1200/226327, Loss: 2.1439
Epoch: 1/10, Batch: 1300/226327, Loss: 1.8109
Epoch: 1/10, Batch: 1400/226327, Loss: 1.8269
Epoch: 1/10, Batch: 1500/226327, Loss: 1.6848
Epoch: 1/10, Batch: 1600/226327, Loss: 1.7504
Epoch: 1/10, Batch: 1700/226327, Loss: 1.7316
Epoch: 1/10, Batch: 1800/226327, Loss: 2.0987
Epoch: 1/10, Batch: 1900/226327, Loss: 1.6502
Epoch: 1/10, Batch: 2000/226327, Loss: 1.4783
Epoch: 1/10, Batch: 2100/226327, Loss: 1.7129


KeyboardInterrupt: 