In [None]:

"""
Foundational AI Project 2
CSC 7700/4700: Implement language models for text generation using PyTorch.
This script implements three models:
  - A vanilla RNN-based language model
  - An LSTM-based language model
  - A Transformer-based language model

The project workflow is as follows:
1. Train a SentencePiece BPE tokenizer (vocab size 10,000) on a text dataset ("data.txt").
2. Convert the text into token IDs and prepare a dataset using sliding windows.
3. Define the three model architectures, each with:
    • an embedding layer,
    • hidden layers (RNN, LSTM, or transformer encoders),
    • and a fully connected output layer.
4. Each model implements:
    • forward: returns unnormalized vocabulary logits for each time step,
    • prompt: autoregressively generates text given an input prompt.
5. Train each model using CrossEntropyLoss and AdamW (no learning rate scheduler is wired in yet).
6. Plot training/validation loss curves, compute perplexity, and demonstrate text generation.
7. (For BLEU score evaluation, a sample computation is provided.)

Author: [Mubaraq Olojo]
Date: [April 2024]
"""

import os
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import sentencepiece as spm
import matplotlib.pyplot as plt
import numpy as np
import random
import nltk
from nltk.translate.bleu_score import sentence_bleu

# Make sure the NLTK tokenizer data used by nltk.word_tokenize (for BLEU) is available.
# Older NLTK releases load 'punkt'; recent releases look up 'punkt_tab' instead, so fetch both.
# nltk.download reports a failure (e.g. when offline) by returning False rather than raising.
for _nltk_resource in ('punkt', 'punkt_tab'):
    nltk.download(_nltk_resource)

########################################################################
        # Utility: Positional Encoding for Transformer
########################################################################
class PositionalEncoding(nn.Module):
    """
    Adds the fixed sinusoidal positional encoding from "Attention is All You Need"
    to a batch of embeddings, then applies dropout.

    Args:
        d_model: Embedding dimension (both even and odd sizes are supported).
        dropout: Dropout probability applied after the encoding is added.
        max_len: Longest sequence length covered by the precomputed table.
    """
    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 512):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        # Compute the frequency terms once in log space.
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float32) * -(math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        # Constant 'pe' table: sine on even columns, cosine on odd columns.
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim the angle table to match instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        # Registered as a buffer: moves with .to(device) but is not a trainable parameter.
        self.register_buffer('pe', pe.unsqueeze(0))  # shape: (1, max_len, d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor of shape (batch_size, seq_length, d_model)
        Returns:
            Tensor of the same shape with positional encodings added (and dropout applied).
        Raises:
            ValueError: If seq_length exceeds the `max_len` the table was built for.
        """
        seq_length = x.size(1)
        if seq_length > self.pe.size(1):
            raise ValueError(
                f"Sequence length {seq_length} exceeds the maximum supported length {self.pe.size(1)}"
            )
        x = x + self.pe[:, :seq_length]
        return self.dropout(x)

########################################################################
        # Dataset for Language Modeling
########################################################################
class LanguageModelDataset(Dataset):
    """
    Next-token prediction dataset built from fixed-size token windows.

    Every accepted window holds `seq_length + 1` token IDs. The first
    `seq_length` tokens form the model input and the last `seq_length`
    tokens (the same window shifted by one) form the per-position targets.
    """
    def __init__(self, sequences, seq_length):
        """
        Args:
            sequences: List of lists containing token IDs.
            seq_length: Number of input tokens per sample.
        """
        self.seq_length = seq_length
        window_size = seq_length + 1
        # Windows of any other length are skipped rather than padded.
        self.samples = [
            (window[:-1], window[1:])
            for window in sequences
            if len(window) == window_size
        ]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        source_ids, shifted_ids = self.samples[idx]
        source_tensor = torch.tensor(source_ids, dtype=torch.long)
        shifted_tensor = torch.tensor(shifted_ids, dtype=torch.long)
        return source_tensor, shifted_tensor

################################################################
# Model Definitions
################################################################

class RNNLanguageModel(nn.Module):
    """
    Vanilla RNN-based language model: embedding -> stacked nn.RNN -> linear projection
    onto the vocabulary.
    """
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers):
        super(RNNLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Use batch_first=True so that input shape is (batch, seq_length, embed_dim)
        self.rnn = nn.RNN(embed_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        """
        Args:
            x: Tensor of shape (batch_size, seq_length)
        Returns:
            logits: Tensor of shape (batch_size, seq_length, vocab_size)
        """
        embedded = self.embedding(x)  # (batch, seq_length, embed_dim)
        output, _ = self.rnn(embedded)  # (batch, seq_length, hidden_dim)
        logits = self.fc(output)  # (batch, seq_length, vocab_size)
        return logits

    def prompt(self, tokenizer, prompt_text, max_length=50, temperature=0.0):
        """
        Autoregressively generates text that continues `prompt_text`.

        Args:
            tokenizer: Object providing `encode(text, out_type=int)`, `decode(ids)` and `eos_id()`.
            prompt_text: Text used to seed generation; must encode to at least one token.
            max_length: Maximum number of new tokens to generate.
            temperature: 0 selects greedy (argmax) decoding; a positive value samples from
                the softmax of the logits divided by the temperature.
        Returns:
            The decoded prompt plus generated tokens.
        Raises:
            ValueError: If `temperature` is negative or the prompt encodes to no tokens.

        Note: the model is left in eval mode afterwards.
        """
        if temperature < 0:
            raise ValueError("temperature must be non-negative")

        self.eval()
        token_ids = tokenizer.encode(prompt_text, out_type=int)
        if not token_ids:
            raise ValueError("prompt_text must encode to at least one token")

        device = next(self.parameters()).device
        eos_id = tokenizer.eos_id()
        generated = list(token_ids)

        # The first step consumes the whole prompt; later steps feed only the newest
        # token and reuse the recurrent state, instead of re-encoding the entire
        # growing sequence on every iteration.
        step_input = torch.tensor([generated], dtype=torch.long, device=device)
        hidden = None

        with torch.no_grad():
            for _ in range(max_length):
                output, hidden = self.rnn(self.embedding(step_input), hidden)
                next_token_logits = self.fc(output[0, -1, :])  # logits for the last time step

                if temperature == 0:
                    # Greedy decoding: choose the token with the highest logit
                    next_token = torch.argmax(next_token_logits).item()
                else:
                    # Temperature-based stochastic sampling
                    probs = torch.softmax(next_token_logits / temperature, dim=0)
                    next_token = torch.multinomial(probs, num_samples=1).item()

                generated.append(next_token)
                # Stop if end-of-sequence token is generated
                if next_token == eos_id:
                    break
                step_input = torch.tensor([[next_token]], dtype=torch.long, device=device)

        # Decode the generated tokens back to text
        return tokenizer.decode(generated)

class LSTMLanguageModel(nn.Module):
    """
    LSTM-based language model: embedding -> stacked nn.LSTM -> linear projection
    onto the vocabulary.
    """
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers):
        super(LSTMLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # batch_first=True: inputs are (batch, seq_length, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        """
        Args:
            x: Tensor of shape (batch_size, seq_length)
        Returns:
            logits: Tensor of shape (batch_size, seq_length, vocab_size)
        """
        embedded = self.embedding(x)  # (batch, seq_length, embed_dim)
        output, _ = self.lstm(embedded)  # (batch, seq_length, hidden_dim)
        logits = self.fc(output)  # (batch, seq_length, vocab_size)
        return logits

    def prompt(self, tokenizer, prompt_text, max_length=50, temperature=0.0):
        """
        Autoregressively generates text that continues `prompt_text`.

        Args:
            tokenizer: Object providing `encode(text, out_type=int)`, `decode(ids)` and `eos_id()`.
            prompt_text: Text used to seed generation; must encode to at least one token.
            max_length: Maximum number of new tokens to generate.
            temperature: 0 selects greedy (argmax) decoding; a positive value samples from
                the softmax of the logits divided by the temperature.
        Returns:
            The decoded prompt plus generated tokens.
        Raises:
            ValueError: If `temperature` is negative or the prompt encodes to no tokens.

        Note: the model is left in eval mode afterwards.
        """
        if temperature < 0:
            raise ValueError("temperature must be non-negative")

        self.eval()
        token_ids = tokenizer.encode(prompt_text, out_type=int)
        if not token_ids:
            raise ValueError("prompt_text must encode to at least one token")

        device = next(self.parameters()).device
        eos_id = tokenizer.eos_id()
        generated = list(token_ids)

        # The first step consumes the whole prompt; later steps feed only the newest
        # token and reuse the (hidden, cell) state, instead of re-encoding the entire
        # growing sequence on every iteration.
        step_input = torch.tensor([generated], dtype=torch.long, device=device)
        state = None

        with torch.no_grad():
            for _ in range(max_length):
                output, state = self.lstm(self.embedding(step_input), state)
                next_token_logits = self.fc(output[0, -1, :])  # logits for the last time step

                if temperature == 0:
                    # Greedy decoding: choose the token with the highest logit
                    next_token = torch.argmax(next_token_logits).item()
                else:
                    # Temperature-based stochastic sampling
                    probs = torch.softmax(next_token_logits / temperature, dim=0)
                    next_token = torch.multinomial(probs, num_samples=1).item()

                generated.append(next_token)
                # Stop if end-of-sequence token is generated
                if next_token == eos_id:
                    break
                step_input = torch.tensor([[next_token]], dtype=torch.long, device=device)

        return tokenizer.decode(generated)

class TransformerLanguageModel(nn.Module):
    """
    Transformer-based language model: embedding + sinusoidal positions -> stacked
    self-attention encoder layers with a causal mask -> linear projection onto the vocabulary.
    """
    def __init__(self, vocab_size, embed_dim, num_heads, hidden_dim, num_layers, max_seq_length, dropout=0.1):
        super(TransformerLanguageModel, self).__init__()
        # Remembered so that generation can keep its context inside the positional table.
        self.max_seq_length = max_seq_length
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim, dropout, max_seq_length)
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads,
                                                   dim_feedforward=hidden_dim, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        """
        Args:
            x: Tensor of shape (batch_size, seq_length)
        Returns:
            logits: Tensor of shape (batch_size, seq_length, vocab_size)
        """
        seq_length = x.size(1)
        embedded = self.embedding(x)  # (batch, seq_length, embed_dim)
        encoded = self.pos_encoder(embedded)  # add positional encoding
        # Transformer expects input shape: (seq_length, batch_size, embed_dim)
        encoded = encoded.transpose(0, 1)
        # Causal mask: -inf above the diagonal so position i attends only to positions <= i.
        # Without it every position can see the tokens it is trained to predict
        # (targets are the inputs shifted by one), which makes the training loss meaningless.
        causal_mask = torch.triu(
            torch.full((seq_length, seq_length), float("-inf"),
                       dtype=encoded.dtype, device=encoded.device),
            diagonal=1,
        )
        transformer_output = self.transformer_encoder(encoded, mask=causal_mask)
        transformer_output = transformer_output.transpose(0, 1)  # back to (batch, seq_length, embed_dim)
        logits = self.fc(transformer_output)
        return logits

    def prompt(self, tokenizer, prompt_text, max_length=50, temperature=0.0):
        """
        Autoregressively generates text that continues `prompt_text`.

        Args:
            tokenizer: Object providing `encode(text, out_type=int)`, `decode(ids)` and `eos_id()`.
            prompt_text: Text used to seed generation; must encode to at least one token.
            max_length: Maximum number of new tokens to generate.
            temperature: 0 selects greedy (argmax) decoding; a positive value samples from
                the softmax of the logits divided by the temperature.
        Returns:
            The decoded prompt plus generated tokens.
        Raises:
            ValueError: If `temperature` is negative or the prompt encodes to no tokens.

        Note: the model is left in eval mode afterwards.
        """
        if temperature < 0:
            raise ValueError("temperature must be non-negative")

        self.eval()
        token_ids = tokenizer.encode(prompt_text, out_type=int)
        if not token_ids:
            raise ValueError("prompt_text must encode to at least one token")

        device = next(self.parameters()).device
        eos_id = tokenizer.eos_id()
        generated = list(token_ids)

        with torch.no_grad():
            for _ in range(max_length):
                # Feed only the most recent max_seq_length tokens: the positional table
                # holds exactly that many positions, so a longer context would not fit.
                context = generated[-self.max_seq_length:]
                input_ids = torch.tensor([context], dtype=torch.long, device=device)
                logits = self.forward(input_ids)
                next_token_logits = logits[0, -1, :]  # logits for the last time step

                if temperature == 0:
                    # Greedy decoding: choose the token with the highest logit
                    next_token = torch.argmax(next_token_logits).item()
                else:
                    # Temperature-based stochastic sampling
                    probs = torch.softmax(next_token_logits / temperature, dim=0)
                    next_token = torch.multinomial(probs, num_samples=1).item()

                generated.append(next_token)
                # Stop if end-of-sequence token is generated
                if next_token == eos_id:
                    break
        return tokenizer.decode(generated)

########################################################################
# Training, Evaluation and Utility Functions
########################################################################
def train_model(model, train_loader, val_loader, num_epochs, criterion, optimizer, device):
    """
    Trains the model and returns the training and validation loss history.

    Args:
        model: Module mapping (batch, seq_length) token IDs to
            (batch, seq_length, vocab_size) logits.
        train_loader: Iterable of (inputs, targets) batches used for optimization.
        val_loader: Iterable of (inputs, targets) batches used for evaluation only.
        num_epochs: Number of passes over `train_loader`.
        criterion: Loss taking (N, vocab_size) logits and (N,) target IDs.
        optimizer: Optimizer already bound to the model's parameters.
        device: Device the model and every batch are moved to.
    Returns:
        (train_losses, val_losses): two lists with one mean per-batch loss per epoch.
        An epoch whose loader yields no batches records NaN instead of raising
        ZeroDivisionError.
    """
    model.to(device)
    train_losses = []
    val_losses = []

    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss = 0.0
        train_batches = 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)  # inputs: (B, seq_length)
            optimizer.zero_grad()
            outputs = model(inputs)  # outputs: (B, seq_length, vocab_size)
            # Flatten batch and time so the loss covers all tokens at once.
            # reshape (unlike view) also accepts non-contiguous tensors.
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            train_batches += 1

        # Batches are counted while iterating so an empty loader yields NaN, not a crash.
        avg_train_loss = running_loss / train_batches if train_batches else float("nan")
        train_losses.append(avg_train_loss)

        # Evaluate on validation set
        model.eval()
        running_val_loss = 0.0
        val_batches = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
                running_val_loss += loss.item()
                val_batches += 1
        avg_val_loss = running_val_loss / val_batches if val_batches else float("nan")
        val_losses.append(avg_val_loss)

        print(f"Epoch {epoch}/{num_epochs} - Training Loss: {avg_train_loss:.4f} - Validation Loss: {avg_val_loss:.4f}")

    return train_losses, val_losses

def evaluate_model(model, data_loader, criterion, device):
    """
    Evaluates the model on the provided data loader and returns the average loss.

    Args:
        model: Module mapping (batch, seq_length) token IDs to
            (batch, seq_length, vocab_size) logits; it is switched to eval mode.
        data_loader: Iterable of (inputs, targets) batches.
        criterion: Loss taking (N, vocab_size) logits and (N,) target IDs.
        device: Device every batch is moved to.
    Returns:
        Mean of the per-batch losses, or NaN when the loader yields no batches
        (instead of raising ZeroDivisionError).
    """
    model.eval()
    running_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            # Flatten batch and time; reshape (unlike view) also accepts non-contiguous tensors.
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
            running_loss += loss.item()
            num_batches += 1
    if num_batches == 0:
        return float("nan")
    return running_loss / num_batches

def compute_perplexity(loss):
    """
    Converts a mean cross-entropy loss (in nats) into perplexity.

    Perplexity is the exponential of the loss, so a loss of 0 maps to 1.
    """
    perplexity = np.exp(loss)
    return perplexity

def compute_bleu(reference, candidate):
    """
    Computes a smoothed sentence-level BLEU score using nltk.

    Args:
        reference: Ground-truth text.
        candidate: Generated text to score against the reference.
    Returns:
        BLEU score as a float; 0.0 when the candidate contains no tokens.

    Smoothing is applied because unsmoothed sentence-level BLEU collapses to
    (almost) zero, with warnings, whenever any n-gram order up to 4 has no match.
    That is the common case for short sentences such as the demo in this script.
    """
    # Local import keeps this function self-contained; the module is already a dependency.
    from nltk.translate.bleu_score import SmoothingFunction

    reference_tokens = nltk.word_tokenize(reference)
    candidate_tokens = nltk.word_tokenize(candidate)
    if not candidate_tokens:
        return 0.0
    return sentence_bleu(
        [reference_tokens],
        candidate_tokens,
        smoothing_function=SmoothingFunction().method1,
    )

def plot_loss_curve(train_losses, val_losses, model_name):
    """
    Plots training and validation loss curves, saves the plot, and displays it.

    Args:
        train_losses: Per-epoch training losses.
        val_losses: Per-epoch validation losses.
        model_name: Used in the plot title and in the output file name
            '<model_name>_loss_curve.png' (written to the current directory).
    """
    fig = plt.figure()
    # Epochs are numbered from 1, matching the training log, instead of list indices from 0.
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Training Loss')
    plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
    plt.title(f'{model_name} Loss Curves')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig(f'{model_name}_loss_curve.png')
    plt.show()
    # Close the figure so repeated calls (one per model) do not accumulate open figures.
    plt.close(fig)

########################################################################
# Main Execution
########################################################################
def main():
    """
    Runs the full pipeline: train/load the tokenizer, build the datasets, then
    train, evaluate, and sample from each of the three language models.

    Raises:
        FileNotFoundError: If 'data.txt' is not present in the working directory.
        ValueError: If the corpus is too short to produce both training and
            validation sequences.
    """
    # Device configuration
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Hyperparameters
    vocab_size = 10000
    embed_dim = 256
    hidden_dim = 512
    num_layers = 2
    num_heads = 8           # For Transformer
    max_seq_length = 50     # Input sequence length
    batch_size = 128
    num_epochs = 10         # Adjust to 30 with early stopping for full training
    learning_rate = 1e-3

    # Fail fast with an actionable message; otherwise the missing file surfaces as an
    # opaque error from inside the SentencePiece trainer.
    data_path = "data.txt"
    if not os.path.isfile(data_path):
        raise FileNotFoundError(
            f"Dataset file '{data_path}' was not found in '{os.getcwd()}'. "
            "Place the training corpus there before running this script."
        )

    # Tokenizer training/loading
    tokenizer_model_prefix = "tokenizer"
    if not os.path.exists(f"{tokenizer_model_prefix}.model"):
        print("Training SentencePiece tokenizer on data.txt...")
        spm.SentencePieceTrainer.train(input=data_path,
                                       model_prefix=tokenizer_model_prefix,
                                       vocab_size=vocab_size,
                                       model_type='bpe',
                                       character_coverage=1.0)
    # Load the trained tokenizer
    sp = spm.SentencePieceProcessor(model_file=f"{tokenizer_model_prefix}.model")

    # A tokenizer.model left over from an earlier run may have a different vocabulary
    # size; size the embedding/output layers from the tokenizer so every ID is in range.
    tokenizer_vocab_size = sp.get_piece_size()
    if tokenizer_vocab_size != vocab_size:
        print(f"Tokenizer vocabulary has {tokenizer_vocab_size} pieces "
              f"(configured {vocab_size}); using the tokenizer's size.")
        vocab_size = tokenizer_vocab_size

    # Load and preprocess dataset
    print("Loading dataset from data.txt...")
    with open(data_path, "r", encoding="utf-8") as f:
        text = f.read()

    # Tokenize entire text into a list of token IDs
    token_ids = sp.encode(text, out_type=int)
    print(f"Total tokens in dataset: {len(token_ids)}")

    def make_windows(tokens):
        # Stride-1 sliding windows of max_seq_length + 1 tokens (input plus shifted target).
        return [tokens[i:i + max_seq_length + 1]
                for i in range(len(tokens) - max_seq_length)]

    # Split the token stream (90% / 10%) BEFORE windowing. Splitting the windows
    # themselves would put overlapping text on both sides of the boundary and leak
    # training tokens into the validation set.
    split_idx = int(0.9 * len(token_ids))
    train_sequences = make_windows(token_ids[:split_idx])
    val_sequences = make_windows(token_ids[split_idx:])
    print(f"Total sequences generated: {len(train_sequences) + len(val_sequences)}")

    if not train_sequences or not val_sequences:
        raise ValueError(
            f"'{data_path}' is too short: each split needs more than {max_seq_length} tokens "
            f"(train has {split_idx}, validation has {len(token_ids) - split_idx})."
        )

    # Create datasets and data loaders
    train_dataset = LanguageModelDataset(train_sequences, max_seq_length)
    val_dataset = LanguageModelDataset(val_sequences, max_seq_length)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Instantiate models
    models = {
        "RNN": RNNLanguageModel(vocab_size, embed_dim, hidden_dim, num_layers).to(device),
        "LSTM": LSTMLanguageModel(vocab_size, embed_dim, hidden_dim, num_layers).to(device),
        "Transformer": TransformerLanguageModel(vocab_size, embed_dim, num_heads, hidden_dim, num_layers, max_seq_length).to(device)
    }

    # Train, evaluate, and demonstrate each model
    for model_name, model in models.items():
        print(f"\nTraining {model_name} model...")
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

        train_losses, val_losses = train_model(model, train_loader, val_loader, num_epochs, criterion, optimizer, device)
        plot_loss_curve(train_losses, val_losses, model_name)

        # Evaluate perplexity on the validation set
        val_loss = evaluate_model(model, val_loader, criterion, device)
        ppl = compute_perplexity(val_loss)
        print(f"{model_name} - Validation Loss: {val_loss:.4f}, Perplexity: {ppl:.2f}")

        # Generate sample text using the prompt "Which do you prefer? Dogs or cats?"
        sample_prompt = "Which do you prefer? Dogs or cats?"
        generated_text = model.prompt(sp, sample_prompt, max_length=50, temperature=0.0)  # Greedy decoding
        print(f"\nSample output from {model_name} model for prompt:\n\"{sample_prompt}\"\n{generated_text}\n")

    # Example BLEU score evaluation (for demonstration purposes)
    # You would normally compare generated text to a ground truth reference.
    reference = "Dogs are great companions."
    candidate = models["RNN"].prompt(sp, "Dogs are", max_length=10, temperature=0.0)
    bleu_score = compute_bleu(reference, candidate)
    print(f"Sample BLEU score (RNN model): {bleu_score:.4f}")

# Run the full pipeline only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()


[nltk_data] Downloading package punkt to
[nltk_data]     /Users/mubaraqolojo/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
sentencepiece_trainer.cc(78) LOG(INFO) Starts training with : 
trainer_spec {
  input: data.txt
  input_format: 
  model_prefix: tokenizer
  model_type: BPE
  vocab_size: 10000
  self_test_sample_size: 0
  character_coverage: 1
  input_sentence_size: 0
  shuffle_input_sentence: 1
  seed_sentencepiece_size: 1000000
  shrinking_factor: 0.75
  max_sentence_length: 4192
  num_threads: 16
  num_sub_iterations: 2
  max_sentencepiece_length: 16
  split_by_unicode_script: 1
  split_by_number: 1
  split_by_whitespace: 1
  split_digits: 0
  pretokenization_delimiter: 
  treat_whitespace_as_suffix: 0
  allow_whitespace_only_pieces: 0
  required_chars: 
  byte_fallback: 0
  vocabulary_output_piece_score: 1
  train_extremely_large_corpus: 0
  seed_sentencepieces_file: 
  hard_vocab_limit: 1
  use_all_vocab: 0
  unk_id: 0
  bos_id: 1
  eos_id: 2
  pad_id: -1
  unk_p

Using device: cpu
Training SentencePiece tokenizer on data.txt...


OSError: Not found: "data.txt": No such file or directory Error #2