In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import random
from tokenizers import Tokenizer, models, trainers, pre_tokenizers
from datasets import load_dataset

# Check if CUDA is available, otherwise use CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [17]:
# 1. Load and Preprocess Data
def load_tiny_shakespeare():
    """
    Simulate loading data from tiny_shakespeare.py
    Normally you would import the module, but we'll simulate the dataset structure
    """
    # This would normally be imported from tiny_shakespeare.py
    try:
        with open('shakespeare_clean.txt', 'r', encoding='utf-8') as f:
            text = f.read()
    except FileNotFoundError:
        # Sample text for demonstration
        text = """
        To be, or not to be, that is the question:
        Whether 'tis nobler in the mind to suffer
        The slings and arrows of outrageous fortune,
        Or to take arms against a sea of troubles
        And by opposing end them.
        """
        print("Using sample text since tiny_shakespeare.txt was not found")
    
    # Split into train, val, test (90%, 5%, 5%)
    train_end = int(len(text) * 0.9)
    val_end = int(len(text) * 0.95)
    
    return {
        'train': text[:train_end],
        'val': text[train_end:val_end],
        'test': text[val_end:]
    }

In [18]:
import unicodedata
import re

def preprocess(text):
    # Normalize to standard Unicode form (avoids weird encodings)
    lines = text.split("\n")
    cleaned_lines = [
        unicodedata.normalize("NFKC", line).strip()
        for line in lines
        if line.strip()  # Remove empty lines
    ]
    return "\n".join(cleaned_lines)

In [19]:
def create_bpe_tokenizer(text, vocab_size=2000):
    """Create a BPE tokenizer with the specified vocabulary size"""
    # Initialize a BPE tokenizer
    tokenizer = Tokenizer(models.BPE())
    
    # Configure pre-tokenization (how to split text into initial tokens)
    tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
    
    # Train the tokenizer
    trainer = trainers.BpeTrainer(vocab_size=vocab_size, special_tokens=["<pad>", "<unk>"])
    tokenizer.train_from_iterator([text], trainer=trainer)
    

    tokenizer.save("bpe_tokenizer.json")
    
    return tokenizer

In [20]:
class TextDataset(torch.utils.data.Dataset):
    def __init__(self, text, tokenizer, seq_length=100):
        self.seq_length = seq_length
        self.tokenizer = tokenizer
        
        # Tokenize the text
        encoding = tokenizer.encode(text)
        self.tokens = encoding.ids
        self.vocab_size = tokenizer.get_vocab_size()
        
        print(f"Loaded {len(self.tokens)} tokens with vocabulary size {self.vocab_size}")
    
    def __len__(self):
        return max(0, len(self.tokens) - self.seq_length - 1)
    
    def __getitem__(self, idx):
        """Get a sequence and the next token as target"""
        # Input sequence
        input_seq = self.tokens[idx:idx + self.seq_length]
        # Target sequence (shifted by 1)
        target_seq = self.tokens[idx + 1:idx + self.seq_length + 1]
        
        return torch.tensor(input_seq, dtype=torch.long), torch.tensor(target_seq, dtype=torch.long)

In [21]:
class SimpleLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256, bidirectional=True):
        super(SimpleLSTM, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1
        
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # LSTM layer
        self.lstm = nn.LSTM(
            embedding_dim, 
            hidden_dim, 
            batch_first=True,
            bidirectional=bidirectional
        )
        
        # Output layer
        self.fc = nn.Linear(hidden_dim * self.num_directions, vocab_size)
    
    def forward(self, x, hidden=None):
        # x shape: (batch_size, seq_length)
        batch_size = x.size(0)
        
        # Initialize hidden and cell states if not provided
        if hidden is None:
            h0 = torch.zeros(self.num_directions, batch_size, self.hidden_dim).to(device)
            c0 = torch.zeros(self.num_directions, batch_size, self.hidden_dim).to(device)
            hidden = (h0, c0)
        
        # Apply embedding
        embed = self.embedding(x)  # Shape: (batch_size, seq_length, embedding_dim)
        
        # Pass through LSTM
        output, hidden = self.lstm(embed, hidden)
        
        # Pass through linear layer
        output = self.fc(output)
        
        return output, hidden
    
    def init_hidden(self, batch_size):
        h0 = torch.zeros(self.num_directions, batch_size, self.hidden_dim).to(device)
        c0 = torch.zeros(self.num_directions, batch_size, self.hidden_dim).to(device)
        return (h0, c0)

In [22]:
def train_model(model, train_dataset, val_dataset, batch_size=32, num_epochs=5, learning_rate=0.001, clip_value=1.0, teacher_forcing_ratio=0.5):
    """Train the RNN model with validation and teacher forcing"""
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    
    # Use Adam optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    model.to(device)
    
    train_losses = []
    val_losses = []
    train_perplexities = []
    val_perplexities = []
    
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0
        
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            batch_size = inputs.size(0)
            seq_length = inputs.size(1)
            
            # Initialize hidden state
            hidden = model.init_hidden(batch_size)
            
            # Reset gradients
            optimizer.zero_grad()
            
            # Initialize output tensor
            outputs = torch.zeros(batch_size, seq_length, model.vocab_size).to(device)
            
            # Teacher forcing with probability teacher_forcing_ratio
            use_teacher_forcing = random.random() < teacher_forcing_ratio
            
            if use_teacher_forcing:
                # Teacher forcing: Feed the target as the next input
                decoder_input = inputs
                output, hidden = model(decoder_input, hidden)
                outputs = output
            else:
                # Without teacher forcing: use own predictions as the next input
                decoder_input = inputs[:, 0].unsqueeze(1)
                for t in range(seq_length):
                    output, hidden = model(decoder_input, hidden)
                    outputs[:, t:t+1] = output[:, -1:, :]
                    decoder_input = output[:, -1:, :].argmax(2)
            
            # Reshape outputs and targets for loss calculation
            outputs = outputs.reshape(-1, outputs.shape[-1])
            targets = targets.reshape(-1)
            
            # Calculate loss
            loss = criterion(outputs, targets)
            
            # Backpropagation
            loss.backward()
            
            # Gradient clipping
            nn.utils.clip_grad_norm_(model.parameters(), clip_value)
            
            # Update parameters
            optimizer.step()
            
            epoch_loss += loss.item()
            
            # Print progress (less frequently to reduce output)
            if (batch_idx + 1) % 50 == 0:
                print(f'Epoch {epoch+1}/{num_epochs}, Batch {batch_idx+1}/{len(train_loader)}, Loss: {loss.item():.4f}')
        
        # Calculate average training loss and perplexity
        avg_train_loss = epoch_loss / len(train_loader)
        train_perplexity = np.exp(avg_train_loss)
        
        train_losses.append(avg_train_loss)
        train_perplexities.append(train_perplexity)
        
        # Evaluate on validation set
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                
                # Initialize hidden state
                hidden = model.init_hidden(inputs.size(0))
                
                outputs, _ = model(inputs, hidden)
                outputs = outputs.reshape(-1, outputs.shape[-1])
                targets = targets.reshape(-1)
                loss = criterion(outputs, targets)
                val_loss += loss.item()
        
        # Calculate average validation loss and perplexity
        avg_val_loss = val_loss / len(val_loader)
        val_perplexity = np.exp(avg_val_loss)
        
        val_losses.append(avg_val_loss)
        val_perplexities.append(val_perplexity)
        
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Train Perplexity: {train_perplexity:.4f}, '
              f'Val Loss: {avg_val_loss:.4f}, Val Perplexity: {val_perplexity:.4f}')
    
    # Plot training and validation loss
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.legend()
    plt.title('Loss')
    
    plt.subplot(1, 2, 2)
    plt.plot(train_perplexities, label='Train Perplexity')
    plt.plot(val_perplexities, label='Val Perplexity')
    plt.legend()
    plt.title('Perplexity')
    
    plt.tight_layout()
    plt.savefig('training_plot.png')
    plt.close()
    
    return {
        'train_losses': train_losses,
        'val_losses': val_losses,
        'train_perplexities': train_perplexities,
        'val_perplexities': val_perplexities
    }

In [23]:
def generate_text(model, tokenizer, seed_text='To be, or not to be', gen_length=100, temperature=0.8):
    """Generate text using the trained model"""
    model.eval()
    
    # Tokenize seed text
    tokens = tokenizer.encode(seed_text).ids
    input_seq = torch.tensor([tokens], dtype=torch.long).to(device)
    
    # Initialize hidden state
    hidden = model.init_hidden(1)
    
    # Generate text
    generated_tokens = tokens.copy()
    
    with torch.no_grad():
        for _ in range(gen_length):
            # Get prediction for next token
            output, hidden = model(input_seq, hidden)
            
            # Apply temperature
            logits = output[:, -1, :] / temperature
            probabilities = F.softmax(logits, dim=-1)
            
            # Sample next token
            next_token = torch.multinomial(probabilities, 1).item()
            
            # Add to generated tokens
            generated_tokens.append(next_token)
            
            # Update input for next iteration (use the last predicted token)
            input_seq = torch.tensor([[next_token]], dtype=torch.long).to(device)
    
    # Decode generated tokens
    generated_text = tokenizer.decode(generated_tokens)
    return generated_text

In [24]:
def generate_text_beam_search(model, tokenizer, seed_text='To be, or not to be', gen_length=100, beam_width=5, temperature=0.8):
    """Generate text using beam search decoding"""
    model.eval()
    
    # Tokenize seed text
    tokens = tokenizer.encode(seed_text).ids
    input_seq = torch.tensor([tokens], dtype=torch.long).to(device)
    
    # Initialize beam search
    beams = [(input_seq, model.init_hidden(1), 0.0)]  # (sequence, hidden_state, log_prob)
    finished_beams = []
    
    with torch.no_grad():
        for _ in range(gen_length):
            candidates = []
            
            # Expand each beam
            for seq, hidden, log_prob in beams:
                # Get prediction for next token
                output, new_hidden = model(seq[:, -1:], hidden)
                logits = output[:, -1, :] / temperature
                probs = F.log_softmax(logits, dim=-1)
                
                # Get top-k candidates
                top_probs, top_tokens = probs.topk(beam_width)
                
                for prob, token in zip(top_probs[0], top_tokens[0]):
                    new_seq = torch.cat([seq, token.unsqueeze(0).unsqueeze(0)], dim=1)
                    new_log_prob = log_prob + prob.item()
                    candidates.append((new_seq, new_hidden, new_log_prob))
            
            # Select top beams
            candidates.sort(key=lambda x: x[2], reverse=True)
            beams = candidates[:beam_width]
        
        # Select best sequence
        best_seq = max(beams, key=lambda x: x[2])[0]
        generated_tokens = best_seq[0].tolist()
    
    # Decode generated tokens
    generated_text = tokenizer.decode(generated_tokens)
    return generated_text

In [25]:
def evaluate_perplexity(model, dataset, batch_size=32):
    """Evaluate model perplexity on a dataset"""
    model.eval()
    data_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=False)
    criterion = nn.CrossEntropyLoss()
    
    total_loss = 0
    total_tokens = 0
    
    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            batch_size = inputs.size(0)
            
            # Initialize hidden state
            hidden = model.init_hidden(batch_size)
            
            outputs, _ = model(inputs, hidden)
            outputs = outputs.reshape(-1, outputs.shape[-1])
            targets = targets.reshape(-1)
            
            loss = criterion(outputs, targets)
            
            total_loss += loss.item() * targets.size(0)
            total_tokens += targets.size(0)
    
    # Calculate perplexity
    avg_loss = total_loss / total_tokens
    perplexity = np.exp(avg_loss)
    
    return perplexity

In [26]:
# Load data
print("Loading Shakespeare data...")
data = load_tiny_shakespeare()

# Create BPE tokenizer (smaller vocab size for faster training)
print("Creating BPE tokenizer...")
tokenizer = create_bpe_tokenizer(data['train'], vocab_size=1000)

# Create datasets
print("Preparing datasets...")
seq_length = 100  # Using fixed sequence length for simplicity
train_dataset = TextDataset(data['train'], tokenizer, seq_length)
val_dataset = TextDataset(data['val'], tokenizer, seq_length)
test_dataset = TextDataset(data['test'], tokenizer, seq_length)

vocab_size = tokenizer.get_vocab_size()
print(f"Vocabulary size: {vocab_size}")

# Create models (with smaller sizes for lightweight training)
# rnn_model = VanillaRNN(vocab_size, embedding_dim=64, hidden_dim=128, bidirectional=True)
lstm_model = SimpleLSTM(vocab_size, embedding_dim=64, hidden_dim=128, bidirectional=True)

# Train models (reduced epochs and batch size for faster training)
# print("\nTraining Vanilla RNN model...")
# rnn_results = train_model(rnn_model, train_dataset, val_dataset, 
#                          batch_size=16, num_epochs=3, learning_rate=0.001)

print("\nTraining LSTM model...")
lstm_results = train_model(lstm_model, train_dataset, val_dataset, 
                        batch_size=64,  # Increased from 16 to 64
                        num_epochs=1, 
                        learning_rate=0.01,
                        teacher_forcing_ratio=0.5)

# Evaluate models on test set
print("\nEvaluating models...")
# rnn_perplexity = evaluate_perplexity(rnn_model, test_dataset)
lstm_perplexity = evaluate_perplexity(lstm_model, test_dataset)

# print(f"Vanilla RNN Test Perplexity: {rnn_perplexity:.4f}")
print(f"LSTM Test Perplexity: {lstm_perplexity:.4f}")

# Compare models
model_results = {
# 'Vanilla RNN': rnn_results,
'LSTM': lstm_results
}
# compare_models(model_results)

# Generate text samples
# print("\nGenerating text with RNN model:")
# rnn_text = generate_text(rnn_model, tokenizer, seed_text="To be, or not to be", gen_length=100)
# print(rnn_text)

print("\nGenerating text with LSTM model:")
lstm_text = generate_text(lstm_model, tokenizer, seed_text="To be, or not to be", gen_length=100)
print(lstm_text)

# Generate text using beam search
print("\nGenerating text with LSTM model using beam search:")
lstm_text_beam = generate_text_beam_search(lstm_model, tokenizer, 
                                          seed_text="To be, or not to be", 
                                          gen_length=100, 
                                          beam_width=5)
print(lstm_text_beam)

Loading Shakespeare data...
Creating BPE tokenizer...


Preparing datasets...
Loaded 307825 tokens with vocabulary size 1000
Loaded 17797 tokens with vocabulary size 1000
Loaded 18338 tokens with vocabulary size 1000
Vocabulary size: 1000

Training LSTM model...
Epoch 1/1, Batch 50/4809, Loss: 6.9363
Epoch 1/1, Batch 50/4809, Loss: 6.9363
Epoch 1/1, Batch 100/4809, Loss: 6.1987
Epoch 1/1, Batch 100/4809, Loss: 6.1987
Epoch 1/1, Batch 150/4809, Loss: 5.9692
Epoch 1/1, Batch 150/4809, Loss: 5.9692
Epoch 1/1, Batch 200/4809, Loss: 5.9739
Epoch 1/1, Batch 200/4809, Loss: 5.9739
Epoch 1/1, Batch 250/4809, Loss: 0.2301
Epoch 1/1, Batch 250/4809, Loss: 0.2301
Epoch 1/1, Batch 300/4809, Loss: 6.0109
Epoch 1/1, Batch 300/4809, Loss: 6.0109
Epoch 1/1, Batch 350/4809, Loss: 5.9529
Epoch 1/1, Batch 350/4809, Loss: 5.9529
Epoch 1/1, Batch 400/4809, Loss: 5.9924
Epoch 1/1, Batch 400/4809, Loss: 5.9924
Epoch 1/1, Batch 450/4809, Loss: 5.9239
Epoch 1/1, Batch 450/4809, Loss: 5.9239
Epoch 1/1, Batch 500/4809, Loss: 0.1283
Epoch 1/1, Batch 500/4809, Loss: 0.

In [27]:
# save the LSTM model
torch.save(lstm_model.state_dict(), 'lstm_model_codev4.0.pth')