In [None]:
import pandas as pd
import numpy as np
import torch
import nltk
import sklearn
import random
import argparse
from collections import defaultdict
from collections import Counter
import re

In [None]:
# Select the compute device once: CUDA when PyTorch can see a GPU, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

### CORPUS CLEANING, TOKENIZATION AND BUILDING VOCAB


In [None]:
def cleanPrideAndPrejudice(file_path):
    """Read the Pride and Prejudice corpus and strip front/back matter.

    Args:
        file_path: Path to the plain-text corpus file (read as UTF-8).

    Returns:
        The text starting at the first "CHAPTER I." marker and ending just
        before "Transcriber's note:" (when present), with every
        "CHAPTER <roman numeral>" heading removed. If the start marker is not
        found, the raw file contents are returned unchanged.
    """
    # Explicit encoding: without it the decoding depends on the platform
    # locale, while the sibling cleaner already reads UTF-8.
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    start_idx = text.find("CHAPTER I.")
    if start_idx == -1:
        return text
    text = text[start_idx:]

    end_idx = text.find("Transcriber's note:")
    if end_idx != -1:
        text = text[:end_idx]

    # Only the heading word and numeral are removed; the period that follows
    # the numeral stays in the text.
    text = re.sub(r"CHAPTER\s+[IVXLCDM]+", "", text)

    return text


def cleanUllyeses(file_path):
    """Read the Ulysses corpus and strip front/back matter and markers.

    Args:
        file_path: Path to the UTF-8 plain-text corpus file.

    Returns:
        The text starting at the second occurrence of "— I —", with the part
        markers "— I —", "— II —" and "— III —" and the bracketed numbers
        ("[ 12 ]") removed, cut off just before "Trieste-Zurich-Paris" when
        that marker is present. If "— I —" does not occur twice, the raw file
        contents are returned unchanged.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    first_idx = text.find("— I —")
    if first_idx == -1:
        return text

    # Start from the second occurrence of the marker, skipping the first one.
    second_idx = text.find("— I —", first_idx + 1)
    if second_idx == -1:
        return text
    text = text[second_idx:]

    # The previous pattern used the character class [I|II|III]+, which
    # matches any run of "I" or "|" characters; I{1,3} matches exactly the
    # three part numerals.
    text = re.sub(r"—\s+I{1,3}\s+—", "", text)

    text = re.sub(r"\[\s*\d+\s*\]", "", text)

    end_idx = text.find("Trieste-Zurich-Paris")
    if end_idx != -1:
        text = text[:end_idx]

    return text

def Tokenizer(inputText, abbreviations=("mr", "mrs", "ms", "dr", "jr", "sr")):
    """Split raw text into sentences of lowercase word tokens.

    URLs, e-mail addresses, hashtags, mentions and standalone numbers are
    replaced by the placeholder tokens "<URL>", "<MAILID>", "<HASHTAG>",
    "<MENTION>" and "<NUM>". Sentences are split on runs of ".", "!", "?"
    and ":".

    Args:
        inputText: The text to tokenize.
        abbreviations: Lowercase abbreviations whose trailing period must not
            end a sentence (e.g. "mr." in "Mr. Darcy"). Pass an empty
            sequence to disable.

    Returns:
        A list of sentences, each a list of token strings. Empty sentences
        are dropped.
    """
    text = inputText.lower().replace("\n", " ")

    # Strip unsupported punctuation BEFORE inserting placeholders. This class
    # also removes "<" and ">", so running it afterwards (as the code used to)
    # turned "<URL>" / "<MAILID>" into plain "URL" / "MAILID".
    text = re.sub(r"[^\@\#\.\w\?\!\s:-]", "", text)

    text = re.sub(r"http\S+", "<URL>", text)
    text = re.sub(r"www\S+", "<URL>", text)
    # Text is already lowercase, so the classes only need a-z (the former
    # "A-za-z" range also matched stray ASCII punctuation).
    text = re.sub(r"[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}", "<MAILID>", text)

    # Hyphens and underscores act as word separators.
    text = re.sub(r"[-_]", " ", text)

    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"\.+", ".", text)

    # Drop the period after known abbreviations so it is not mistaken for a
    # sentence boundary. The earlier detection searched for capitalised words
    # in already-lowercased text and therefore never matched anything.
    if abbreviations:
        alternatives = "|".join(re.escape(abbr.lower()) for abbr in abbreviations)
        text = re.sub(r"\b(" + alternatives + r")\.", r"\1", text)

    text = re.sub(r"#\w+\b", "<HASHTAG>", text)
    text = re.sub(r"@\w+\b", "<MENTION>", text)
    text = re.sub(r"\b\d+\b", "<NUM>", text)

    # Split into sentences, then each sentence into whitespace-separated words
    stripped = (chunk.strip() for chunk in re.split(r"[.!?:]+", text))
    return [chunk.split() for chunk in stripped if chunk]

def trainTestSplit(sentences, testSize, seed=69):
    """Split tokenized sentences into train and test sets, reproducibly.

    Args:
        sentences: List of sentences, each a sequence of hashable tokens.
        testSize: Number of sentences to sample (without replacement) for the
            test set.
        seed: Seed applied to the global ``random`` module before sampling.

    Returns:
        ``(trainSplit, testSplit)``. ``testSplit`` holds the sampled
        sentences in sampling order; ``trainSplit`` holds the remaining
        sentences in their original order. When a sampled sentence occurs
        several times in ``sentences``, only as many occurrences as were
        sampled are removed (earliest occurrences first).

    Raises:
        ValueError: Propagated from ``random.sample`` when ``testSize`` is
            negative or larger than ``len(sentences)``.
    """
    random.seed(seed)
    testSplit = random.sample(sentences, testSize)

    # Multiset of sampled sentences still to be removed from the training
    # data. A hash lookup replaces the former list membership/remove calls,
    # which rescanned the whole test split for every sentence.
    pending = Counter(tuple(sentence) for sentence in testSplit)

    trainSplit = []
    for sentence in sentences:
        key = tuple(sentence)
        if pending[key] > 0:
            pending[key] -= 1
        else:
            trainSplit.append(sentence)

    return trainSplit, testSplit

In [None]:
# file_path = input("Enter Corpus Path: ")
# n = input("Enter value of N: ")

file_path = 'corpus/corpus_2.txt'
n = 3
filteredText = ""
corp = ""

# Corpus cleaning: pick the cleaner from the file name; `corp` is the short tag
# used later in the names of the model directory and the results file.
if 'corpus_1.txt' in file_path:
    filteredText = cleanPrideAndPrejudice(file_path)
    corp = "papc"
elif 'corpus_2.txt' in file_path:
    filteredText = cleanUllyeses(file_path)
    corp = "uc"
else:
    # Raise instead of calling exit(): inside a notebook exit() shuts the
    # kernel down, whereas an exception stops the run at this cell and shows
    # what went wrong.
    raise ValueError(f"Corpus doesn't exist! Unrecognised corpus path: {file_path!r}")

In [None]:
# Tokenize the cleaned corpus into a list of sentences, each a list of word tokens.
sentences = Tokenizer(filteredText)

In [None]:
# Hold out 1000 randomly sampled sentences as the test set; the rest is training data.
train_text, test_text = trainTestSplit(sentences, 1000)

In [None]:
# Sentence counts of the two splits: training first, then test, one per line.
print(len(train_text), len(test_text), sep="\n")

In [None]:
# Word frequencies over the training sentences; keys keep first-appearance order.
vocab = Counter(token for tokens in train_text for token in tokens)

# Forward and reverse lookup tables, numbered in that same order.
word_to_idx = dict(zip(vocab, range(len(vocab))))
idx_to_word = dict(enumerate(vocab))

# Reserve the next free index in each table for out-of-vocabulary words.
idx_to_word[len(idx_to_word)] = "<UNK>"
word_to_idx["<UNK>"] = len(word_to_idx)

In [None]:
# Number of entries in the word-to-index table, including the added "<UNK>".
print(len(word_to_idx))

In [None]:
# Same count as the cell above, printed with a label.
print(f"Vocab Size: {len(word_to_idx)}")

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from collections import Counter

# Resolve the device for the model cells (GPU when available) and report it.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

class NGramDataset(Dataset):
    """(context, target) n-gram samples built from tokenized sentences.

    Every word of every sentence becomes one target; its context is the
    ``n - 1`` preceding word indices, left-padded with "<PAD>" at the start
    of a sentence.

    Attributes set by the constructor:
        n: The n-gram order.
        vocab: List of words; index 0 is "<PAD>", index 1 is "<UNK>".
        word2idx / idx2word: Lookup tables derived from ``vocab``.
        data: List of ``(context_indices, target_index)`` pairs.
    """

    def __init__(self, tokenized_sentences, n, min_freq=1):
        """Build the vocabulary and the n-gram samples.

        Args:
            tokenized_sentences: Re-iterable collection of sentences, each a
                sequence of word strings.
            n: N-gram order; each context holds ``n - 1`` indices.
            min_freq: Words occurring fewer than this many times are mapped
                to "<UNK>". The default of 1 keeps every word.
        """
        self.n = n

        word_counts = Counter(word for sentence in tokenized_sentences for word in sentence)

        # Keep words in first-appearance order (the Counter's insertion order)
        # so the word -> index mapping is identical on every run; the former
        # set-based construction produced an arbitrary order. Words equal to a
        # special token are skipped so the vocabulary has no duplicates.
        special_tokens = ["<PAD>", "<UNK>"]
        frequent_words = [
            word
            for word, count in word_counts.items()
            if count >= min_freq and word not in special_tokens
        ]

        self.vocab = special_tokens + frequent_words
        self.word2idx = {word: idx for idx, word in enumerate(self.vocab)}
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}

        self.data = []
        pad_idx = self.word2idx["<PAD>"]
        unk_idx = self.word2idx["<UNK>"]

        for sentence in tokenized_sentences:
            indices = [self.word2idx.get(word, unk_idx) for word in sentence]

            # Left-pad so the first words of the sentence also get a full context.
            padded = [pad_idx] * (n - 1) + indices

            # padded[i:i + n - 1] is exactly the n - 1 positions before indices[i].
            for i, target in enumerate(indices):
                self.data.append((padded[i:i + n - 1], target))

    def __len__(self):
        """Return the number of n-gram samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a ``(context, target)`` pair of tensors."""
        context, target = self.data[idx]
        return torch.tensor(context), torch.tensor(target)

class FFNN(nn.Module):
    """Feed-forward n-gram language model.

    Embeds each context word, concatenates the embeddings, applies one ReLU
    hidden layer and projects to one score per vocabulary entry.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, context_size):
        super().__init__()

        # Width of the concatenated context embeddings fed to the hidden layer.
        flat_dim = context_size * embedding_dim

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.hidden = nn.Linear(flat_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, vocab_size)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Return the output-layer scores for a batch of context index rows."""
        # Look up one embedding per context position.
        vectors = self.embedding(x)

        # Collapse everything after the batch dimension into a single axis.
        features = vectors.view(vectors.size(0), -1)

        return self.output(self.activation(self.hidden(features)))

class LanguageModelTrainer:
    """Trains a language model with Adam + cross-entropy and runs predictions."""

    def __init__(self, model, device, lr=0.001):
        """Move ``model`` to ``device`` and set up the loss and optimizer.

        Args:
            model: The ``nn.Module`` to train.
            device: Device the model and every batch are moved to.
            lr: Learning rate passed to Adam.
        """
        self.model = model.to(device)
        self.device = device
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        print(f"Model moved to {device}")

    def train_epoch(self, dataloader):
        """Run one pass over ``dataloader`` and return the mean batch loss.

        Prints the loss of every 100th batch. Raises ``ZeroDivisionError`` if
        the dataloader yields no batches.
        """
        self.model.train()
        total_loss = 0

        for batch_idx, (data, target) in enumerate(dataloader):
            data, target = data.to(self.device), target.to(self.device)
            self.optimizer.zero_grad()

            output = self.model(data)
            loss = self.criterion(output, target)

            loss.backward()
            self.optimizer.step()

            total_loss += loss.item()

            if batch_idx % 100 == 0:
                print(f"Batch {batch_idx}, Loss: {loss.item():.4f}")

        return total_loss / len(dataloader)

    def predict_next_words(self, input_tokens, dataset, k=5):
        """Predict the most likely next words for a context.

        Args:
            input_tokens: Whitespace-separated string or sequence of tokens.
                Only the last ``dataset.n - 1`` tokens are used; shorter
                contexts are left-padded with "<PAD>".
            dataset: Object providing ``n``, ``word2idx`` and ``idx2word``.
            k: Number of candidates to return; clamped to the size of the
                model's output.

        Returns:
            List of ``(word, probability)`` tuples, most probable first, with
            probabilities as Python floats.
        """
        self.model.eval()

        if isinstance(input_tokens, str):
            tokens = input_tokens.strip().split()
        else:
            # Copy into a list so tuples and other sequences can be padded too.
            tokens = list(input_tokens)

        # Fit the context to exactly n - 1 tokens.
        context_size = dataset.n - 1
        if len(tokens) < context_size:
            tokens = ["<PAD>"] * (context_size - len(tokens)) + tokens
        elif len(tokens) > context_size:
            # Slice from an explicit start index: tokens[-0:] would return the
            # whole list when context_size is 0.
            tokens = tokens[len(tokens) - context_size:]

        # Convert tokens to indices, mapping out-of-vocabulary words to <UNK>.
        unk_idx = dataset.word2idx["<UNK>"]
        word_indices = [dataset.word2idx.get(word, unk_idx) for word in tokens]

        with torch.no_grad():
            input_tensor = torch.tensor(word_indices).unsqueeze(0).to(self.device)
            output = self.model(input_tensor)

            probabilities = torch.softmax(output[0], dim=0)
            # torch.topk raises when k exceeds the number of candidates.
            k = min(k, probabilities.size(0))
            top_k_probs, top_k_indices = torch.topk(probabilities, k)

            predictions = [
                (dataset.idx2word[idx], prob)
                for prob, idx in zip(top_k_probs.tolist(), top_k_indices.tolist())
            ]

        return predictions

def train_language_model(tokenized_sentences, embedding_dim, hidden_dim, n,
                         batch_size, num_epochs, device):
    """Build the n-gram dataset, train an FFNN on it and return the pieces.

    Returns:
        ``(model, dataset, trainer)`` after ``num_epochs`` passes over the
        shuffled data.
    """
    # Data pipeline
    dataset = NGramDataset(tokenized_sentences, n)
    loader = DataLoader(dataset, shuffle=True, batch_size=batch_size)
    print(f"Vocabulary size: {len(dataset.vocab)}")

    # The model sees n - 1 context words per sample.
    vocab_size = len(dataset.vocab)
    model = FFNN(vocab_size, embedding_dim, hidden_dim, n - 1)
    trainer = LanguageModelTrainer(model, device)

    # One progress header and one summary line per epoch.
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        loss = trainer.train_epoch(loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {loss:.4f}")

    return model, dataset, trainer

In [None]:
# # hyperparams
# embedding_dimensions = 100
# hidden_dimensions = 256
# sequence_length = 40
# batch_size = 256
# epochs = 10

# # Train the model
# model, dataset, trainer = train_language_model(
#     train_text, embedding_dimensions, hidden_dimensions, n, batch_size, epochs, device
# )

# print("Training completed successfully!")

In [None]:
# # After training, you can make predictions
# context = ["mr", "darcy", "is", "an"]  # example context
# predictions = trainer.predict_next_words(context, dataset, k=10)  # get top 10 predictions

# # Print predictions
# for word, probability in predictions:
#     print(f"{word}: {probability:.4f}")

In [None]:
import matplotlib.pyplot as plt

def calculate_perplexity_and_avg_from_file(model, sentences, dataset, device, batch_size=256, output_file="perplexities.txt", max_ppl=1e5):
    """Write per-sentence perplexities to a file and return their average.

    Each sentence of at least ``dataset.n`` tokens is scored over its
    n-grams: the context is ``n - 1`` consecutive words and the target is the
    word that follows. No padding is added here, so the first ``n - 1`` words
    of a sentence are never targets. Shorter sentences are skipped.

    Args:
        model: Callable module mapping a tensor of context indices to scores.
        sentences: List of tokenized sentences.
        dataset: Object providing ``n`` and ``word2idx`` (with "<UNK>").
        device: Device on which the input tensors are created.
        batch_size: Number of n-grams scored per forward pass.
        output_file: Path of the report, one "sentence <i>: <ppl>" line per
            scored sentence (1-based index into ``sentences``).
        max_ppl: Upper bound applied to every perplexity before averaging.

    Returns:
        ``(average_perplexity, perplexity_list)``, both computed from the
        values read back from ``output_file`` (four-decimal precision, clipped
        to ``max_ppl``); ``(None, [])`` when no sentence could be scored.
    """
    model.eval()
    n = dataset.n  # n-gram size
    unk_idx = dataset.word2idx['<UNK>']
    perplexities = []

    with torch.no_grad():
        for sent_idx, sentence in enumerate(sentences):
            if len(sentence) < n:  # Skip sentences shorter than n-gram size
                continue

            indices = [dataset.word2idx.get(word, unk_idx) for word in sentence]

            # The context at position j predicts the word at position j + n - 1.
            contexts = [indices[j:j + n - 1] for j in range(len(indices) - n + 1)]
            targets = indices[n - 1:]

            sentence_nll = 0
            for start in range(0, len(contexts), batch_size):
                context_tensor = torch.tensor(contexts[start:start + batch_size], device=device)
                target_tensor = torch.tensor(targets[start:start + batch_size], device=device)

                log_probs = torch.log_softmax(model(context_tensor), dim=-1)

                # Pick the log-probability of every target in one gather and
                # transfer them together, instead of one .item() per n-gram.
                target_log_probs = log_probs.gather(1, target_tensor.unsqueeze(1)).squeeze(1)
                for log_prob in target_log_probs.tolist():
                    sentence_nll += -log_prob

            ppl = torch.exp(torch.tensor(sentence_nll / len(targets))).item()
            perplexities.append(f"sentence {sent_idx+1}: {ppl:.4f}")

    # Write results to file
    with open(output_file, "w", encoding="utf-8") as f:
        f.write("\n".join(perplexities))
    print(f"Perplexity scores saved to {output_file}")

    # The average is computed from the file that was just written, so the
    # returned values carry the same rounding as the report.
    ppl_list = []
    with open(output_file, "r", encoding="utf-8") as f:
        for line in f:
            try:
                _, value = line.split(": ")
                ppl_list.append(min(float(value.strip()), max_ppl))
            except ValueError:
                # Lines that do not look like "<label>: <number>" are ignored.
                continue

    if ppl_list:
        return sum(ppl_list) / len(ppl_list), ppl_list

    print("No valid sentences found for perplexity calculation")
    return None, []

def plot_perplexity_histogram(train_perplexities, test_perplexities, train_label="Train", test_label="Test"):
    """Overlay histograms of the base-10 logarithm of two perplexity lists."""
    plt.figure(figsize=(10, 6))

    # Training data is drawn first (blue), test data on top of it (red).
    series = (
        (train_perplexities, train_label, 'blue'),
        (test_perplexities, test_label, 'red'),
    )
    for values, label, colour in series:
        plt.hist(
            np.emath.logn(10, values),
            bins=30,
            alpha=0.5,
            label=label,
            color=colour,
            edgecolor='black',
        )

    plt.xlabel('Logarithm of Perplexity')
    plt.ylabel('Frequency')
    plt.title('Perplexity Distribution')
    plt.legend(loc='upper right')

    plt.tight_layout()
    plt.show()

In [None]:
# # Calculate perplexities and average perplexity for training set
# print("Calculating training set perplexity...")
# avg_perplexity_train, train_perplexity_list = calculate_perplexity_and_avg_from_file(
#     model=model,
#     sentences=train_text,
#     dataset=dataset,
#     device=device,
#     batch_size=128,
#     output_file="train_perplexities.txt"
# )
# print(f"Average Training Set Perplexity: {avg_perplexity_train}")

# # Calculate perplexities and average perplexity for test set
# print("\nCalculating test set perplexity...")
# avg_perplexity_test, test_perplexity_list = calculate_perplexity_and_avg_from_file(
#     model=model,
#     sentences=test_text,
#     dataset=dataset,
#     device=device,
#     batch_size=128,
#     output_file="test_perplexities.txt"
# )
# print(f"Average Test Set Perplexity: {avg_perplexity_test}")

In [None]:
# plot_perplexity_histogram(train_perplexity_list, test_perplexity_list, "Train", "Test")

In [None]:
import torch
import itertools
import json
from datetime import datetime
import os


def train_and_evaluate_models(
    train_text,
    test_text,
    hyperparameters,
    base_model_dir="models_ffnn",
    results_file="model_results_ffnn.json",
):
    """
    Train one model per hyperparameter combination and evaluate its perplexity.

    Models are trained on the module-level ``device``. After every successful
    run the checkpoint is saved under ``base_model_dir`` and the accumulated
    results are rewritten to ``results_file``. A combination that raises is
    reported and skipped.

    Args:
        train_text: Training data (list of tokenized sentences)
        test_text: Test data (list of tokenized sentences)
        hyperparameters: Dictionary mapping "embedding_dim", "hidden_dim",
            "n", "batch_size" and "epochs" to lists of values to try
        base_model_dir: Directory to save models and perplexity reports
        results_file: JSON file to save results

    Returns:
        ``(results, best_model)``: the list of result dictionaries and the one
        with the lowest test perplexity.

    Raises:
        ValueError: If no combination produced a test perplexity.
    """
    # Create model directory if it doesn't exist
    os.makedirs(base_model_dir, exist_ok=True)

    # Generate all combinations of hyperparameters
    param_names = sorted(hyperparameters.keys())
    param_values = [hyperparameters[name] for name in param_names]
    param_combinations = list(itertools.product(*param_values))

    # Store results
    results = []

    # Train and evaluate each combination
    for i, params in enumerate(param_combinations):
        param_dict = dict(zip(param_names, params))
        print(f"\nTraining model {i+1}/{len(param_combinations)}")
        print("Parameters:", param_dict)

        # Timestamp plus running index keeps identifiers unique within a run
        model_id = f"model_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{i}"

        try:
            # Train model
            model, dataset, trainer = train_language_model(
                train_text,
                param_dict["embedding_dim"],
                param_dict["hidden_dim"],
                param_dict["n"],
                param_dict["batch_size"],
                param_dict["epochs"],
                device,
            )

            # Calculate perplexities
            print("\nCalculating training perplexity...")
            train_perplexity, _ = calculate_perplexity_and_avg_from_file(
                model=model,
                sentences=train_text,
                dataset=dataset,
                device=device,
                output_file=f"{base_model_dir}/train_perplexities_{model_id}.txt",
            )

            print("\nCalculating test perplexity...")
            test_perplexity, _ = calculate_perplexity_and_avg_from_file(
                model=model,
                sentences=test_text,
                dataset=dataset,
                device=device,
                output_file=f"{base_model_dir}/test_perplexities_{model_id}.txt",
            )

            # Save model. The vocabulary is stored next to the weights because
            # the embedding and output rows are only meaningful together with
            # the word order they were trained with.
            model_path = f"{base_model_dir}/model_{model_id}.pt"
            torch.save(
                {
                    "model_state_dict": model.state_dict(),
                    "hyperparameters": param_dict,
                    "train_perplexity": train_perplexity,
                    "test_perplexity": test_perplexity,
                    "vocab": dataset.vocab,
                },
                model_path,
            )

            # Store results
            result = {
                "model_id": model_id,
                "hyperparameters": param_dict,
                "train_perplexity": train_perplexity,
                "test_perplexity": test_perplexity,
                "model_path": model_path,
            }
            results.append(result)

            # Save results after each model (in case of crashes)
            with open(results_file, "w") as f:
                json.dump(results, f, indent=4)

        except Exception as e:
            print(f"Error training model with parameters {param_dict}: {str(e)}")
            continue

    # Only runs that produced a test perplexity can be ranked (the perplexity
    # helper returns None when no sentence could be scored).
    scored = [result for result in results if result["test_perplexity"] is not None]
    if not scored:
        raise ValueError(
            "No model produced a test perplexity: every hyperparameter "
            "combination failed or the grid was empty."
        )

    # Find best model based on test perplexity
    best_model = min(scored, key=lambda x: x["test_perplexity"])
    print("\nBest Model:")
    print(json.dumps(best_model, indent=4))

    return results, best_model


# Hyperparameter grid: every combination of the listed values is trained.
hyperparameters = dict(
    embedding_dim=[400],
    hidden_dim=[512],
    n=[3, 5],
    batch_size=[256],
    epochs=[5, 10, 15],
)

# Run the grid search; output locations are tagged with the corpus name.
results, best_model = train_and_evaluate_models(
    train_text=train_text,
    test_text=test_text,
    hyperparameters=hyperparameters,
    base_model_dir=f"models_ffnn_{corp}",
    results_file=f"model_results_ffnn_{corp}.json",
)

# Summary of the run and of the winning configuration.
print("\nTraining Summary:")
print(f"Total models trained: {len(results)}")

print("\nBest performing model:")
print(f"Model ID: {best_model['model_id']}")
print("Hyperparameters:", best_model["hyperparameters"])
print(f"Train Perplexity: {best_model['train_perplexity']:.4f}")
print(f"Test Perplexity: {best_model['test_perplexity']:.4f}")

In [None]:
import json
import pandas as pd
import matplotlib.pyplot as plt

# Load the results written by the grid-search cell for the current corpus.
# That cell writes to a corpus-specific file name, so read the same name here
# rather than the untagged default.
with open(f"model_results_ffnn_{corp}.json", 'r') as file:
    data = json.load(file)

# Convert JSON data to a pandas DataFrame
df = pd.DataFrame(data)

# Lift the two hyperparameters that vary across runs into their own columns
df['n'] = df['hyperparameters'].apply(lambda x: x['n'])
df['epochs'] = df['hyperparameters'].apply(lambda x: x['epochs'])


def _plot_perplexity_lines(frame, group_col, x_col, y_col, label_fmt, title, xlabel, ylabel):
    """Draw one line of ``y_col`` against ``x_col`` per distinct ``group_col`` value."""
    plt.figure(figsize=(10, 6))
    for group_value in frame[group_col].unique():
        # Sort by x so each line is drawn left to right.
        subset = frame[frame[group_col] == group_value].sort_values(x_col)
        plt.plot(subset[x_col], subset[y_col], marker='o', label=label_fmt.format(group_value))
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend()
    plt.grid(True)
    plt.show()


# n vs Training Perplexity (one line per epoch count)
_plot_perplexity_lines(
    df, 'epochs', 'n', 'train_perplexity', 'Epochs={}',
    'Training Perplexity vs n for Different Epochs',
    'n (Context Window Size)', 'Training Perplexity',
)

# n vs Test Perplexity (one line per epoch count)
_plot_perplexity_lines(
    df, 'epochs', 'n', 'test_perplexity', 'Epochs={}',
    'Test Perplexity vs n for Different Epochs',
    'n (Context Window Size)', 'Test Perplexity',
)

# Epochs vs Training Perplexity (one line per n). An identical copy of this
# figure used to be drawn first as well; it is now drawn once.
_plot_perplexity_lines(
    df, 'n', 'epochs', 'train_perplexity', 'n={}',
    'Training Perplexity vs Epochs for Different n Values',
    'Epochs', 'Training Perplexity',
)

# Epochs vs Test Perplexity (one line per n)
_plot_perplexity_lines(
    df, 'n', 'epochs', 'test_perplexity', 'n={}',
    'Test Perplexity vs Epochs for Different n Values',
    'Epochs', 'Test Perplexity',
)