<a href="https://colab.research.google.com/github/Ali-Backour/alibackour.github.io/blob/main/hw1_nlp_shared.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
Homework 1: Language Models

This assignment will guide you through implementing three different types of language models:
1. N-gram model
2. Log-linear model with hand-designed features
3. Continuous Bag of Words (CBOW) model
"""

'\nHomework 1: Language Models\n\nThis assignment will guide you through implementing three different types of language models:\n1. N-gram model\n2. Log-linear model with hand-designed features\n3. Continuous Bag of Words (CBOW) model\n'

In [1]:
import numpy as np
from typing import List, Optional, Tuple
import pickle
from collections import defaultdict, Counter
from tokenizers import Tokenizer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.trainers import BpeTrainer
from tokenizers.models import BPE
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

In [None]:
class LanguageModel:
    """
    Abstract base class for language models.

    This class defines the interface that all language models should implement.
    Language models take a sequence of tokens (represented as integers) and
    predict probability distributions over the next token in the vocabulary.
    """

    def __init__(self):
        self.eos = 'eos'

    def train(self, token_sequences: List[np.ndarray]) -> None:
        """
        Train the language model on a collection of token sequences.

        Args:
            token_sequences (List[np.ndarray]): List of token sequences, where each
                                               sequence is a numpy array of integers
                                               representing token IDs.
        """
        raise NotImplementedError("Subclasses must implement the train method")

    def get_next_token_probs(self, context: np.ndarray) -> np.ndarray:
        """
        Get probability distribution over next tokens given a context.

        Args:
            context (np.ndarray): Array of token IDs representing the context.
                                 Shape: (context_length,)

        Returns:
            np.ndarray: Probability distribution over vocabulary.
                       Shape: (vocab_size,)
                       Should sum to 1.0.
        """
        raise NotImplementedError("Subclasses must implement the get_next_token_probs method")

    def perplexity(self, token_sequences: List[np.ndarray]) -> float:
        """
        Calculate perplexity of the model on a set of token sequences.

        Perplexity is 2^(-average_log_likelihood), where average log likelihood
        is calculated over all tokens in all sequences.

        Args:
            token_sequences (List[np.ndarray]): List of token sequences to evaluate

        Returns:
            float: Perplexity score (lower is better)
        """
        total_log_likelihood = 0.0
        total_tokens = 0

        for sequence in token_sequences:
            if len(sequence) < 2:
                continue  # Need at least 2 tokens (context + target)

            for i in range(1, len(sequence)):
                context = sequence[:i]
                target_token = sequence[i]

                probs = self.get_next_token_probs(context)
                # Add small epsilon to avoid log(0)
                prob = max(float(probs[target_token]), 1e-10)
                total_log_likelihood += np.log2(prob)
                total_tokens += 1

        if total_tokens == 0:
            return float('inf')

        average_log_likelihood = total_log_likelihood / total_tokens
        return 2 ** (-average_log_likelihood)

    def generate_text(self, context: np.ndarray, max_length: int = 100,
                     temperature: float = 1.0) -> np.ndarray:
        """
        Generate text by sampling from the model.

        Args:
            context (np.ndarray): Initial context tokens
            max_length (int): Maximum number of tokens to generate
            temperature (float): Sampling temperature (higher = more random)

        Returns:
            np.ndarray: Generated sequence including the initial context
        """
        generated = list(context)

        for _ in range(max_length):
            current_context = np.array(generated)
            probs = self.get_next_token_probs(current_context)

            # Apply temperature
            if temperature != 1.0:
                probs = np.power(probs, 1.0 / temperature)
                probs = probs / np.sum(probs)

            # Sample next token
            next_token = np.random.choice(len(probs), p=probs)
            generated.append(next_token)

            # Optional: add stopping criteria here (e.g., end-of-sequence token)
            if next_token == self.eos:
              break

        return np.array(generated)

In [None]:
class NGramModel(LanguageModel):
    """
    N-gram language model using maximum likelihood estimation with smoothing.

    This model predicts the next token based on the previous n-1 tokens.
    """

    def __init__(self, vocab_size: int, n: int = 3, smoothing=None):
        """
        Initialize the N-gram model.

        Args:
            vocab_size (int): Size of the vocabulary
            n (int): Order of the n-gram (default: 3 for trigram)

            Bonus! Without smoothing your perplexity will be pretty bad. If you
            implement some kind of smoothing and get the perplexity below 300
            you'll get extra credit.
            smoothing (str): Smoothing method ('laplace' or 'interpolation')
        """
        super().__init__()
        # TODO: YOUR CODE HERE
        # Hint: Consider using nested dictionaries or defaultdict(Counter) to store counts.
        # Hint: Consider how you will handle different context lengths.
        # At the start of a sentence, you might have 0, 1, or 2 words of context
        # instead of the full n-1 words.
        self.vocab_size = vocab_size
        self.n = n
        self.counts = {i:{} for i in range(1, n+1)}

    def train(self, token_sequences: List[np.ndarray]) -> None:
        """
        Train the n-gram model by counting n-grams in the training data.

        Args:
            token_sequences (List[np.ndarray]): Training sequences
        """
        # TODO: YOUR CODE HERE


    def get_next_token_probs(self, context: np.ndarray) -> np.ndarray:
        """
        Get probability distribution over next tokens for given context.

        Args:
            context (np.ndarray): Context tokens

        Returns:
            np.ndarray: Probability distribution over vocabulary
        """
        # TODO: YOUR CODE HERE
        # Hint: Try contexts from longest to shortest, i.e., try the full context,
        # and if it is not in the training data, try shorter context
        # Hint: What probability distribution should we output if no valid context is found?

In [None]:
class LogLinearModel(LanguageModel):
    """
    Log-linear language model with hand-designed features.

    This model uses a linear combination of features to predict next token probabilities.
    """

    def __init__(self, vocab_size: int, context_size: int = 3):
        """
        Initialize the log-linear model.

        Args:
            vocab_size (int): Size of the vocabulary
            context_size (int): Number of context tokens to consider
        """
        super().__init__()
        # TODO: YOUR CODE HERE

        # Hint: You may find the class nn.Linear useful
        # If this is too slow or using too much memory, check out the nn.EmbeddingBag class
        # and see if that's applicable to your use case
        self.linear = None # TODO: replace me!
        # What loss function should we use?
        self.criterion = None # TODO: replace me!
        # We use an Adam optimizer. This is a fancy version of SGD which uses momentum and adaptive updates.
        self.optimizer = optim.Adam("TODO: replace me!", lr=0.01)


    def extract_features(self, context: np.ndarray) -> torch.Tensor:
        """
        Extract features from the context.

        Args:
            context (np.ndarray): Context tokens

        Returns:
            torch.Tensor: Feature vector produced from context tokens
        """
        # TODO: YOUR CODE HERE


    def train(self, token_sequences: List[np.ndarray], epochs: int = 2, batch_size: int = 32) -> None:
        """
        Train the log-linear model using gradient descent.
        """
        # Create training examples (context, target) pairs
        contexts_list = []
        targets_list = []
        # TODO: YOUR CODE HERE


        print(f"Training on {len(all_features)} examples for {epochs} epochs...")

        # Training loop

        # TODO: Put your layers in training mode

        losses = []  # Potentially useful for debugging (loss should go down!)
        for epoch in range(epochs):
            print(f"Epoch {epoch}")
            total_loss = 0.0
            num_batches = 0

            # Mini-batch training
            # Note: tqdm is used to display progress bars for loops, helping visualize training progress.
            for i in tqdm(range(0, len(all_features), batch_size)):
                batch_contexts= contexts_list[i:i+batch_size]
                batch_targets = targets_list[i:i+batch_size]

                # TODO: Get features for the batch


                # TODO: Zero the gradients of the optimizer
                # Your code here!


                # TODO: Perform a forward pass to compute predictions for the model.
                # Your code here!


                # TODO: Perform the backward pass and gradient update. Remember,
                # you need to compute the loss, perform the backward pass, and
                # update the model parameters.
                # Your code here!
                loss = None # TODO: calculate me!

                total_loss += loss.item()
                num_batches += 1

                if i % (batch_size * 10) == 0:  # Print every 10 batches
                    print(f"Epoch {epoch}, Batch {i // batch_size}: Loss = {loss.item():.4f}")

            if epoch % 2 == 0:
                avg_loss = total_loss / num_batches
                print(f"Epoch {epoch}: Average Loss = {avg_loss:.4f}")

            losses.append(total_loss)

    def get_next_token_probs(self, context: np.ndarray) -> np.ndarray:
        """
        Get probability distribution using softmax over linear scores.

        Args:
            context (np.ndarray): Context tokens

        Returns:
            np.ndarray: Probability distribution over vocabulary
        """
        # TODO: YOUR CODE HERE
        # Hint: What probability distribution should we output if no valid context is found?

In [None]:
class CBOWModel(LanguageModel):
    """
    Continuous Bag of Words (CBOW) model.

    This model learns dense vector representations of words and predicts
    the next word from the context words.
    """

    def __init__(self, vocab_size: int, embedding_dim: int = 100, context_size: int = 2, learning_rate: float = 0.01):
        """
        Initialize the CBOW model.

        Args:
            vocab_size (int): Size of the vocabulary
            embedding_dim (int): Dimension of word embeddings
            context_size (int): Number of context words on each side
            learning_rate (float): Learning rate for training
        """
        super().__init__()
        # TODO: YOUR CODE HERE
        # You may find the classes nn.Embedding and nn.EmbeddingBag useful


        # We use an Adam optimizer. This is a fancy version of SGD which uses momentum and adaptive updates.
        self.optimizer = optim.Adam("TODO: replace me!", lr=learning_rate)

        # What loss function should we use for Word2Vec?
        self.criterion = None # TODO: replace me!


    def train(self, token_sequences: List[np.ndarray], epochs: int = 10, batch_size: int = 32) -> None:
        """
        Train the CBOW model.

        Args:
            token_sequences (List[np.ndarray]): Training sequences
            epochs (int): Number of training epochs
            batch_size (int): Batch size for training
        """
        # TODO: YOUR CODE HERE
        # Create training examples (context, target) pairs
        # Hint, extract left context only for next token prediction
        # Hint, pad shorter contexts with 0, this ensures all have the same length for batching
        context_list = []
        targets_list = []

        # TODO: Put your layers in training mode

        losses = []  # Potentially useful for debugging (loss should go down!)
        # Note: tqdm is used to display progress bars for loops, helping visualize training progress.
        for epoch in range(epochs):
            print(f"Epoch {epoch}")
            total_loss = 0.0
            num_batches = 0

            # Shuffle data
            indices = torch.randperm(len(all_contexts))
            all_contexts = all_contexts[indices]
            all_targets = all_targets[indices]

            for i in tqdm(range(0, len(all_contexts), batch_size)):
                # As an alternative to this implementation, you can experiment with
                # DataLoader (https://docs.pytorch.org/docs/stable/data.html) for automatic shuffling, parallel loading
                batch_contexts = all_contexts[i:i+batch_size]
                batch_targets = all_targets[i:i+batch_size]

                # TODO: Zero the gradients of the optimizer
                # Your code here!

                # TODO: Perform a forward pass to compute predictions for the model.
                # Your code here!
                preds = None

                # TODO: Finish the backward pass and gradient update.
                # Remember, you need to compute the loss, perform the backward pass, and
                # update the model parameters.
                # Your code here!
                loss = None # TODO: calculate me!

                total_loss += loss.item()
                num_batches += 1

                if i % (batch_size * 10) == 0:  # Print every 10 batches
                    print(f"Epoch {epoch}, Batch {i // batch_size}: Loss = {loss.item():.4f}")

            if epoch % 2 == 0:
                avg_loss = total_loss / num_batches
                print(f"Epoch {epoch}: Average Loss = {avg_loss:.4f}")

            losses.append(total_loss)

    def get_next_token_probs(self, context: np.ndarray) -> np.ndarray:
        """
        Get next-token probability distributions.

        Args:
            context (np.ndarray): Context tokens

        Returns:
            np.ndarray: Probability distribution over vocabulary
        """
        # TODO: YOUR CODE HERE
        # Hints:
        # For next-token prediction, we use the last context_size tokens
        # No valid context, return uniform distribution
        # Pad context to expected size
        # Don't forget to add batch dimension, torch expects (batch_size, context_size)

        with torch.no_grad():
            pass

    def get_word_embedding(self, token_id: int) -> np.ndarray:
        """
        Get the learned embedding for a specific token.

        Args:
            token_id (int): Token ID

        Returns:
            np.ndarray: Word embedding vector
        """
        # TODO: YOUR CODE HERE

        with torch.no_grad():
            pass

In [None]:
def load_data(filepath: str, tokenizer: Optional[Tokenizer], max_seq_length: int = 512) -> List[np.ndarray]:
    """
    Load and preprocess text data using GPT-2 tokenizer.

    This function is provided complete - students don't need to modify it.

    Args:
        filepath (str): Path to the text file
        tokenizer (Optional[Tokenizer]): Tokenizer to use. If None, a new tokenizer will be created.
        max_seq_length (int): Maximum sequence length for splitting text

    Returns:
        Tuple[List[np.ndarray], GPT2Tokenizer]: List of token sequences and the tokenizer
    """
    # Byte Pair Encoding (BPE)

    if tokenizer is None:
        tokenizer = Tokenizer(BPE())
        tokenizer.pre_tokenizer = Whitespace()
        #trainer = BpeTrainer(special_tokens=["[PAD]"])
        tokenizer.train([filepath])

    # Read the text file
    with open(filepath, 'r', encoding='utf-8') as f:
        text = f.read()

    # Tokenize the entire text
    tokens = tokenizer.encode(text).ids

    # Split into sequences of max_seq_length
    sequences = []
    for i in range(0, len(tokens), max_seq_length):
        sequence = tokens[i:i+max_seq_length]
        if len(sequence) > 1:  # Need at least 2 tokens for language modeling
            sequences.append(np.array(sequence))

    print(f"Loaded {len(sequences)} sequences from {filepath}")
    print(f"Vocabulary size: {tokenizer.get_vocab_size()}")
    print(f"Sample tokens: {tokens[:10]}")
    print(f"Sample text: {tokenizer.decode(tokens[:10])}")

    return sequences, tokenizer

In [None]:
def evaluate_models(models: List[LanguageModel], test_data: List[np.ndarray]) -> None:
    """
    Evaluate and compare multiple language models.

    Args:
        models (List[LanguageModel]): List of trained models
        test_data (List[np.ndarray]): Test sequences
    """
    print("Model Evaluation Results:")
    print("=" * 50)

    for i, model in enumerate(models):
        model_name = model.__class__.__name__
        try:
            ppl = model.perplexity(test_data)
            print(f"{model_name}: Perplexity = {ppl:.2f}")
        except Exception as e:
            print(f"{model_name}: Error calculating perplexity - {e}")

    print("=" * 50)

In [None]:
def analyze(model1: LanguageModel, model2: LanguageModel, test_data: List[np.ndarray],
           tokenizer=None, context_length: int = 2) -> dict:
    """
    Compare two models and find contexts where each performs better.

    Args:
        model1 (LanguageModel): First model to compare
        model2 (LanguageModel): Second model to compare
        test_data (List[np.ndarray]): Test sequences
        tokenizer: Tokenizer for decoding (optional, for display purposes)
        context_length (int): Context length to consider

    Returns:
        dict: Analysis results including overall perplexities and context comparisons
    """
    print("Detailed Model Analysis")
    print("=" * 60)

    # Overall perplexity comparison
    try:
        ppl1 = model1.perplexity(test_data)
        ppl2 = model2.perplexity(test_data)

        model1_name = model1.__class__.__name__
        model2_name = model2.__class__.__name__

        print(f"{model1_name} overall perplexity: {ppl1:.3f}")
        print(f"{model2_name} overall perplexity: {ppl2:.3f}")
        print(f"Better overall model: {model1_name if ppl1 < ppl2 else model2_name}")
        print()

    except Exception as e:
        print(f"Error calculating overall perplexity: {e}")
        return {}

    # Context-level analysis
    context_comparisons = []
    model1_better_contexts = []
    model2_better_contexts = []

    print("Analyzing context-level performance...")

    for seq_idx, sequence in enumerate(test_data):
        for i in range(context_length, len(sequence)):
            context = sequence[i - context_length:i]
            target_token = sequence[i]

            try:
                # Get predictions from both models
                probs1 = model1.get_next_token_probs(context)
                probs2 = model2.get_next_token_probs(context)

                # Calculate log probabilities for the actual target
                prob1 = max(probs1[target_token], 1e-10)
                prob2 = max(probs2[target_token], 1e-10)

                log_prob1 = np.log2(prob1)
                log_prob2 = np.log2(prob2)

                # Store comparison data
                context_info = {
                    'context': context.copy(),
                    'target': target_token,
                    'log_prob1': log_prob1,
                    'log_prob2': log_prob2,
                    'seq_idx': seq_idx,
                    'pos': i
                }
                context_comparisons.append(context_info)

                # Categorize based on which model is better
                if log_prob1 > log_prob2:  # Higher log prob = better
                    model1_better_contexts.append(context_info)
                else:
                    model2_better_contexts.append(context_info)

            except Exception as e:
                print(f"Error analyzing context at seq {seq_idx}, pos {i}: {e}")
                continue

    # Calculate statistics
    total_contexts = len(context_comparisons)
    model1_wins = len(model1_better_contexts)
    model2_wins = len(model2_better_contexts)

    print(f"Total contexts analyzed: {total_contexts}")
    print(f"{model1_name} better contexts: {model1_wins} ({100*model1_wins/total_contexts:.1f}%)")
    print(f"{model2_name} better contexts: {model2_wins} ({100*model2_wins/total_contexts:.1f}%)")
    print()

    # Find patterns in contexts where each model excels
    def analyze_context_patterns(better_contexts, model_name, top_k=10):
        print(f"Top {top_k} unique contexts where {model_name} excels:")
        print("-" * 40)

        # Group contexts by (context, target) pairs
        context_groups = {}
        for ctx_info in better_contexts:
            context_tuple = tuple(ctx_info['context'])
            target = ctx_info['target']
            key = (context_tuple, target)

            if key not in context_groups:
                context_groups[key] = {
                    'contexts': [],
                    'best_diff': 0,
                    'context': ctx_info['context'],
                    'target': target
                }

            context_groups[key]['contexts'].append(ctx_info)

            # Track the best performance difference for this context
            diff = (ctx_info['log_prob1'] - ctx_info['log_prob2']
                   if model_name == model1_name
                   else ctx_info['log_prob2'] - ctx_info['log_prob1'])

            if diff > context_groups[key]['best_diff']:
                context_groups[key]['best_diff'] = diff

        # Sort by best performance difference
        sorted_groups = sorted(context_groups.values(),
                             key=lambda x: x['best_diff'],
                             reverse=True)

        for i, group in enumerate(sorted_groups[:top_k]):
            context = group['context']
            target = group['target']
            count = len(group['contexts'])
            best_diff = group['best_diff']

            # Format context display
            if tokenizer is not None:
                try:
                    context_text = tokenizer.decode(context[-min(5, len(context)):])
                    target_text = tokenizer.decode([target])
                    print(f"{i+1:2d}. Context: '{context_text}' → Target: '{target_text}' (×{count})")
                except:
                    print(f"{i+1:2d}. Context: {context[-min(5, len(context)):]} → Target: {target} (×{count})")
            else:
                print(f"{i+1:2d}. Context: {context[-min(5, len(context)):]} → Target: {target} (×{count})")

            # Show best probability difference for this context type
            print(f"     Best log-prob difference: {best_diff:.3f}")

            # If there are multiple instances, show average difference
            if count > 1:
                avg_diff = sum((ctx['log_prob1'] - ctx['log_prob2']
                              if model_name == model1_name
                              else ctx['log_prob2'] - ctx['log_prob1'])
                             for ctx in group['contexts']) / count
                print(f"     Average log-prob difference: {avg_diff:.3f}")
            print()

    # Analyze patterns for both models
    if model1_better_contexts:
        analyze_context_patterns(model1_better_contexts, model1_name)

    if model2_better_contexts:
        analyze_context_patterns(model2_better_contexts, model2_name)

    # Analyze context length effects
    print("Performance by context length:")
    print("-" * 30)

    context_length_stats = {}
    for ctx_info in context_comparisons:
        ctx_len = len(ctx_info['context'])
        if ctx_len not in context_length_stats:
            context_length_stats[ctx_len] = {'model1_wins': 0, 'model2_wins': 0, 'total': 0}

        context_length_stats[ctx_len]['total'] += 1
        if ctx_info['log_prob1'] > ctx_info['log_prob2']:
            context_length_stats[ctx_len]['model1_wins'] += 1
        else:
            context_length_stats[ctx_len]['model2_wins'] += 1

    for ctx_len in sorted(context_length_stats.keys()):
        stats = context_length_stats[ctx_len]
        model1_pct = 100 * stats['model1_wins'] / stats['total']
        model2_pct = 100 * stats['model2_wins'] / stats['total']
        print(f"Length {ctx_len:2d}: {model1_name} {model1_pct:5.1f}% | {model2_name} {model2_pct:5.1f}% ({stats['total']} examples)")

    print("=" * 60)

    # Return structured results
    return {
        'overall_perplexity': {model1_name: ppl1, model2_name: ppl2},
        'context_level': {
            'total_contexts': total_contexts,
            f'{model1_name}_wins': model1_wins,
            f'{model2_name}_wins': model2_wins,
            f'{model1_name}_better_contexts': model1_better_contexts[:10],  # Top 10
            f'{model2_name}_better_contexts': model2_better_contexts[:10],  # Top 10
        },
        'context_length_stats': context_length_stats
    }

In [None]:
# Feel free to comment out portions of the code and run it multiple times, or to
# take it out of the main() function. If you're struggling to lower your
# perplexity, you can play around with the model hyperparameters like the
# learning rate, batch size, and number of epochs.

def main():
    # Here we're training on train.txt and evaluating on test.txt.
    # However, you might find it useful to play with tiny.txt while you're debugging.
    # If you're running into memory issues, you can try training on a smaller set of
    # sentences by truncating train.txt, but you should always report your final
    # results on test.txt (and think about ways of making your code more efficient)!

    train_data, tokenizer = load_data("train.txt", tokenizer=None) # Feel free to swap with tiny.txt for testing
    print("Loaded training data")
    test_data, _ = load_data("test.txt", tokenizer=tokenizer) # Feel free to swap with tiny.txt for testing
    print("Loaded test data")

    print('N-gram')
    ngram_model = NGramModel(tokenizer.get_vocab_size(), n=3)
    ngram_model.train(train_data)
    print(tokenizer.decode(ngram_model.generate_text(test_data[0][:1])))
    print(f"Perplexity: {ngram_model.perplexity(test_data)}")

    # print('Log-linear')
    # log_linear_model = LogLinearModel(tokenizer.get_vocab_size())
    # log_linear_model.train(train_data)
    # print(tokenizer.decode(log_linear_model.generate_text(test_data[0][:1])))
    # print(f"Perplexity: {log_linear_model.perplexity(test_data)}")

    # print('CBOW')
    # cbow_model = CBOWModel(tokenizer.get_vocab_size(), embedding_dim=100, context_size=3)
    # cbow_model.train(train_data, epochs=10)
    # print(tokenizer.decode(cbow_model.generate_text(test_data[0][:1])))
    # print(f"Perplexity: {cbow_model.perplexity(test_data)}")

    # evaluate_models([ngram_model, log_linear_model,cbow_model], test_data)


if __name__ == "__main__":
    main()

Exception: No such file or directory (os error 2)