This is the initial set-up for the whole comparative study between FFNN, Vanilla RNN (No hidden state) and LSTM. We will handle the Corpus cleaning in this notbook.

In [1]:
import os
import re
import random
import pickle
import unicodedata
import string
from collections import Counter

import nltk
from nltk.tokenize import sent_tokenize
nltk.download('punkt', force=True)
nltk.download('punkt_tab')


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

Above we have downloaded the dependencies as required

In [26]:
#########################################
# GLOBAL CONFIGURATION
#########################################

INPUT_FILE   = "pride_prejudice.txt"         # Original corpus
TRAIN_FILE   = "trainSetPride.txt"  # Output: Train dataset
TEST_FILE    = "testSetPride.txt"   # Output: Test dataset
VOCAB_FILE   = "vocabPride.pkl"     # Output: Pickled vocabulary

TEST_RATIO   = 0.06            # 2.9% of sentences go to test set
MIN_FREQ     = 2               # Min frequency to keep a word in vocab
PAD_TOKEN    = "<PAD>"
UNK_TOKEN    = "<UNK>"

The global configurations have been declared, now we are ready to begin


In [22]:
#########################################
# CONTRACTIONS & PUNCTUATION
#########################################

CONTRACTION_MAP = {
    "can't": "cannot", "won't": "will not", "it's": "it is",
    "i'm": "i am", "he's": "he is", "she's": "she is",
    "we're": "we are", "they're": "they are", "you're": "you are",
    "didn't": "did not", "doesn't": "does not", "don't": "do not",
    "haven't": "have not", "hasn't": "has not", "let's": "let us", "ma'am": "madam",
    "mightn't": "might not", "mustn't": "must not", "needn't": "need not"
    }

# We remove '<' and '>' from punctuation because they are used in <UNK> and <PAD>.
PUNCTUATION = string.punctuation.replace('<', '').replace('>', '')

#########################################
# 1. TEXT PROCESSING FUNCTIONS
#########################################

def expandContractions(text):
    """
    Replaces common English contractions (e.g., "can't" -> "cannot").
    """
    for contr, fullForm in CONTRACTION_MAP.items():
        pattern = re.compile(r'\b' + re.escape(contr) + r'\b', flags=re.IGNORECASE)
        text = pattern.sub(fullForm, text)
    return text

def cleanLine(line):
    """
    1) Normalizes Unicode.
    2) Replaces curly quotes/apostrophes with straight ones.
    3) Expands contractions.
    4) Removes punctuation (including all dash variants).
    5) Converts to lowercase.
    """
    # 1) Normalize Unicode
    line = unicodedata.normalize('NFKC', line)

    # 2) Convert curly quotes to straight quotes so regex sees them
    #    This handles cases like “ ” or ‘ ’
    line = line.replace("’", "'").replace("‘", "'")
    line = line.replace("“", '"').replace("”", '"')

    # Convert em dashes/en dashes to ASCII hyphen
    line = re.sub(r"[—–]", "-", line)

    # 3) Expand contractions
    line = expandContractions(line)

    # 4) Remove punctuation (this includes ASCII hyphen '-')
    line = re.sub(f"[{re.escape(PUNCTUATION)}]", "", line)

    # 5) Lowercase
    return line.lower().strip()

def splitIntoSentences(text):
    """
    Splits a large text into individual sentences using NLTK, then cleans each sentence.
    Returns a list of cleaned sentences (strings).
    """
    rawSents = sent_tokenize(text)
    cleanedSents = []
    for sent in rawSents:
        c = cleanLine(sent)
        if c:
            cleanedSents.append(c)
    return cleanedSents

Above we have processed the text, expanded contractions, removed punctuations and split the corpus into sentences.

In [23]:
#########################################
# 2. VOCABULARY FUNCTIONS
#########################################

def buildVocab(sentences, minFreq):
    """
    Builds word2idx and idx2word mappings.
    - Includes <UNK> and <PAD> by default.
    - Only words appearing >= minFreq times are added.
    """
    wordCounts = Counter()
    for sent in sentences:
        # Each 'sent' is expected to be a string, so split into tokens
        tokens = sent.split()
        wordCounts.update(tokens)

    # Start vocab with UNK and PAD
    word2idx = {UNK_TOKEN: 0, PAD_TOKEN: 1}
    idx = 2

    # Sort words by frequency (descending)
    for word, count in sorted(wordCounts.items(), key=lambda x: x[1], reverse=True):
        if count >= minFreq:
            word2idx[word] = idx
            idx += 1

    # Reverse mapping
    idx2word = {i: w for w, i in word2idx.items()}
    return word2idx, idx2word



We have now built the vocabulary , it will be later saved to vocab.pkl

In [24]:
#########################################
# 3. CORPUS PROCESSING & SPLITTING
#########################################

def readAndProcessCorpus(inputFile):
    """
    Reads the entire file and splits into cleaned sentences.
    Returns a list of processed sentence strings.
    """
    with open(inputFile, 'r', encoding='utf-8') as f:
        text = f.read()
    # Convert full text into cleaned sentences
    processedSentences = splitIntoSentences(text)
    return processedSentences

def randomlySplitData(sentences, testRatio):
    """
    Splits the list of sentences into train (85%) and test (15%) by random selection.
    Returns trainSentences, testSentences.
    """
    total = len(sentences)
    testSize = int(total * testRatio)

    # Randomly choose testSize indices for test set
    testIndices = set(random.sample(range(total), testSize))

    testSents = []
    trainSents = []
    for i, sent in enumerate(sentences):
        if i in testIndices:
            testSents.append(sent)
        else:
            trainSents.append(sent)
    return trainSents, testSents

def saveLines(lines, outFile):
    """
    Saves a list of sentence strings to an output file, one sentence per line.
    """
    with open(outFile, 'w', encoding='utf-8') as f:
        for line in lines:
            f.write(line + "\n")

def saveVocab(word2idx, idx2word, outFile):
    """
    Pickles the vocabulary dictionaries for future use.
    """
    with open(outFile, 'wb') as f:
        pickle.dump((word2idx, idx2word), f)



Corpus splitting is done above, and functions are defined

In [27]:
#########################################
# 4. MAIN PREPROCESS FUNCTION
#########################################

def preprocessCorpus(inputFile, trainFile, testFile, vocabFile, testRatio, minFreq):
    """
    1) Reads & processes the inputFile into cleaned sentences.
    2) Randomly splits them into train & test sets (85%-15%).
    3) Builds vocabulary from train set only.
    4) Saves train & test sets, along with the vocabulary files.
    """
    # Step 1: Read & process
    sentences = readAndProcessCorpus(inputFile)
    print(f"Total sentences after cleaning: {len(sentences)}")

    # Step 2: Split
    trainSents, testSents = randomlySplitData(sentences, testRatio)
    print(f"Train sentences: {len(trainSents)}, Test sentences: {len(testSents)}")

    # Step 3: Build vocabulary using only train set
    word2idx, idx2word = buildVocab(trainSents, minFreq)
    print(f"Vocabulary size (including <UNK> & <PAD>): {len(word2idx)}")

    # Step 4: Save data
    saveLines(trainSents, trainFile)
    saveLines(testSents, testFile)
    saveVocab(word2idx, idx2word, vocabFile)

    print(f" Preprocessing complete.\n"
          f"   - Train set saved to '{trainFile}'\n"
          f"   - Test set saved to '{testFile}'\n"
          f"   - Vocab saved to '{vocabFile}'")

#########################################
# 5. MAIN EXECUTION
#########################################

if __name__ == "__main__":
    # You can modify these paths if needed
    inputFile   = INPUT_FILE
    trainFile   = TRAIN_FILE
    testFile    = TEST_FILE
    vocabFile   = VOCAB_FILE
    testRatio   = TEST_RATIO
    minFreq     = MIN_FREQ

    # Perform the preprocessing
    preprocessCorpus(inputFile, trainFile, testFile, vocabFile, testRatio, minFreq)


Total sentences after cleaning: 5856
Train sentences: 5505, Test sentences: 351
Vocabulary size (including <UNK> & <PAD>): 3857
 Preprocessing complete.
   - Train set saved to 'trainSetPride.txt'
   - Test set saved to 'testSetPride.txt'
   - Vocab saved to 'vocabPride.pkl'


Main Function defined and called.

This is FFNN

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pickle
import math
from collections import Counter
from torch.utils.data import Dataset, DataLoader

###############################################
# CONFIGURATION
###############################################
N_GRAM_SIZE = 3  # Explicitly provided n-gram size
TRAIN_FILE = "trainSetUlysses.txt"
TEST_FILE = "testSetUlysses.txt"
VOCAB_FILE = "vocabUlysses.pkl"
MODEL_FILE = "ffnn_5.pth"
CONFIG_FILE = "model_config.txt"
TRAIN_PERPLEXITY_FILE = "train_perplexity.txt"
TEST_PERPLEXITY_FILE = "test_perplexity.txt"

# Hyperparameters for model training
EMBEDDING_DIM = 16  # Size of word embeddings
HIDDEN_DIM = 64  # Number of neurons in the hidden layer
BATCH_SIZE = 64 # Batch size for training
LEARNING_RATE = 0.001  # Learning rate for the optimizer
NUM_EPOCHS = 5  # Number of epochs for training
PATIENCE = 3  # Early stopping patience

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

###############################################
# LOAD VOCABULARY
###############################################
def loadVocabulary(vocabFile):
    with open(vocabFile, "rb") as file:
        wordToIndex, indexToWord = pickle.load(file)
    return wordToIndex, indexToWord

wordToIndex, indexToWord = loadVocabulary(VOCAB_FILE)
vocabSize = len(wordToIndex)

###############################################
# DATASET PREPARATION
###############################################
class NGramDataset(Dataset):
    def __init__(self, filePath, ngramSize, wordToIndex):
        self.ngramSize = ngramSize
        self.wordToIndex = wordToIndex
        self.data = self.loadData(filePath)

    def loadData(self, filePath):
        data = []
        with open(filePath, "r", encoding="utf-8") as file:
            for line in file:
                words = line.strip().split()
                if len(words) < self.ngramSize:
                    words = ["<PAD>"] * (self.ngramSize - len(words)) + words
                indices = [self.wordToIndex.get(w, self.wordToIndex["<UNK>"]) for w in words]
                for i in range(len(indices) - self.ngramSize + 1):
                    context = indices[i:i + self.ngramSize - 1]
                    target = indices[i + self.ngramSize - 1]
                    data.append((context, target))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        context, target = self.data[idx]
        return torch.tensor(context, dtype=torch.long), torch.tensor(target, dtype=torch.long)

###############################################
# DEFINE FFNN MODEL
###############################################
class FeedForwardLM(nn.Module):
    def __init__(self, vocabSize, embedDim, contextSize, hiddenDim):
        super(FeedForwardLM, self).__init__()
        self.embeddings = nn.Embedding(vocabSize, embedDim)
        self.linear1 = nn.Linear(contextSize * embedDim, hiddenDim)
        self.activation = nn.ReLU()
        self.linear2 = nn.Linear(hiddenDim, vocabSize)

    def forward(self, inputs):
        embedded = self.embeddings(inputs)
        flat = embedded.view(embedded.size(0), -1)
        hidden = self.activation(self.linear1(flat))
        logits = self.linear2(hidden)
        return logits

###############################################
# TRAINING FUNCTION
###############################################
def trainModel(model, dataLoader, optimizer, lossFunction, numEpochs, patience):
    bestLoss = float("inf")
    noImprove = 0

    for epoch in range(numEpochs):
        model.train()
        totalLoss = 0
        for context, target in dataLoader:
            context, target = context.to(device), target.to(device)
            optimizer.zero_grad()
            logits = model(context)
            loss = lossFunction(logits, target)
            loss.backward()
            optimizer.step()
            totalLoss += loss.item()

        avgLoss = totalLoss / len(dataLoader)
        print(f"Epoch {epoch+1}: Train Loss = {avgLoss:.4f}")

        if avgLoss < bestLoss:
            bestLoss = avgLoss
            noImprove = 0
        else:
            noImprove += 1
            if noImprove >= patience:
                print(f"Early stopping triggered after {epoch+1} epochs.")
                break

###############################################
# PERPLEXITY COMPUTATION
###############################################
def computePerplexity(filePath, model, wordToIndex, ngramSize, outputFile):
    with open(filePath, "r", encoding="utf-8") as file:
        sentences = file.readlines()

    totalLogProb = 0.0
    totalCount = 0
    model.eval()

    with open(outputFile, "w", encoding="utf-8") as fileOut:
        for lineIndex, sentence in enumerate(sentences, start=1):
            words = sentence.strip().split()
            if len(words) < ngramSize:
                words = ["<PAD>"] * (ngramSize - len(words)) + words
            indices = [wordToIndex.get(w, wordToIndex["<UNK>"]) for w in words]
            logProb = 0.0
            count = 0

            for i in range(len(indices) - ngramSize + 1):
                context = indices[i:i + ngramSize - 1]
                target = indices[i + ngramSize - 1]
                contextTensor = torch.tensor([context], dtype=torch.long).to(device)
                with torch.no_grad():
                    logits = model(contextTensor)
                    logProbs = torch.nn.functional.log_softmax(logits, dim=1)
                    logProb += logProbs[0, target].item()
                count += 1

            sentencePerplexity = math.exp(-logProb / count) if count > 0 else float("inf")
            fileOut.write(f"Sentence {lineIndex}: Perplexity = {sentencePerplexity:.4f}\n")
            totalLogProb += logProb
            totalCount += count

        overallPerplexity = math.exp(-totalLogProb / totalCount) if totalCount > 0 else float("inf")
        fileOut.write(f"\nOverall Perplexity: {overallPerplexity:.4f}\n")
    print(f"Perplexity results saved to {outputFile}")

###############################################
# SAVE MODEL CONFIGURATION
###############################################
def saveModelConfig(configFile, vocabSize, embeddingDim, hiddenDim, batchSize, learningRate, numEpochs, patience, nGramSize):
    with open(configFile, "w") as file:
        file.write(f"Vocab Size: {vocabSize}\n")
        file.write(f"Embedding Dim: {embeddingDim}\n")
        file.write(f"Hidden Dim: {hiddenDim}\n")
        file.write(f"Batch Size: {batchSize}\n")
        file.write(f"Learning Rate: {learningRate}\n")
        file.write(f"Num Epochs: {numEpochs}\n")
        file.write(f"Patience: {patience}\n")
        file.write(f"N-Gram Size: {nGramSize}\n")

###############################################
# EXECUTION
###############################################
def main():
    # Load training dataset
    trainDataset = NGramDataset(TRAIN_FILE, N_GRAM_SIZE, wordToIndex)
    trainLoader = DataLoader(trainDataset, batch_size=BATCH_SIZE, shuffle=True)

    # Initialize FFNN model
    model = FeedForwardLM(vocabSize, EMBEDDING_DIM, N_GRAM_SIZE - 1, HIDDEN_DIM).to(device)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    lossFunction = nn.CrossEntropyLoss()

    print("Training FFNN model...")
    trainModel(model, trainLoader, optimizer, lossFunction, numEpochs=NUM_EPOCHS, patience=PATIENCE)
    torch.save(model.state_dict(), MODEL_FILE)

    # Save model configuration for reproducibility
    saveModelConfig(CONFIG_FILE, vocabSize, EMBEDDING_DIM, HIDDEN_DIM, BATCH_SIZE, LEARNING_RATE, NUM_EPOCHS, PATIENCE, N_GRAM_SIZE)
    print(f"Model configuration saved to {CONFIG_FILE}")

    # Compute perplexity for training and testing data
    computePerplexity(TRAIN_FILE, model, wordToIndex, N_GRAM_SIZE, TRAIN_PERPLEXITY_FILE)
    computePerplexity(TEST_FILE, model, wordToIndex, N_GRAM_SIZE, TEST_PERPLEXITY_FILE)

if __name__ == "__main__":
    main()


Training FFNN model...
Epoch 1: Train Loss = 6.7762
Epoch 2: Train Loss = 6.1226
Epoch 3: Train Loss = 5.8230
Epoch 4: Train Loss = 5.5709
Epoch 5: Train Loss = 5.3412
Model configuration saved to model_config.txt
Perplexity results saved to train_perplexity.txt
Perplexity results saved to test_perplexity.txt


RNN Vanilla

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pickle
import math
from torch.utils.data import Dataset, DataLoader
import torch.nn.utils.rnn as rnn_utils

###############################################
# CONFIGURATION
###############################################
# Maximum total tokens in a segment.
# Note: The effective input length is MAX_SEQ_LEN-1 because targets are shifted.
MAX_SEQ_LEN = 50
OVERLAP_RATIO = 0.5

TRAIN_FILE = "trainSetPride.txt"
TEST_FILE = "testSetPride.txt"
VOCAB_FILE = "vocabPride.pkl"
MODEL_FILE = "rnn.pth"
CONFIG_FILE = "model_config_rnn.txt"
TRAIN_PERPLEXITY_FILE = "train_perplexity_rnn_Pride.txt"
TEST_PERPLEXITY_FILE = "test_perplexity_rnn_Pride.txt"

EMBEDDING_DIM = 100
HIDDEN_DIM = 128
BATCH_SIZE = 16
LEARNING_RATE = 0.001
NUM_EPOCHS = 7
PATIENCE = 3

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

###############################################
# LOAD VOCABULARY
###############################################
def loadVocabulary(vocabFile):
    with open(vocabFile, "rb") as file:
        wordToIndex, indexToWord = pickle.load(file)
    for token in ["<UNK>", "<PAD>"]:
        if token not in wordToIndex:
            wordToIndex[token] = len(wordToIndex)
            indexToWord[len(indexToWord)] = token
    return wordToIndex, indexToWord

wordToIndex, indexToWord = loadVocabulary(VOCAB_FILE)
vocabSize = len(wordToIndex)

###############################################
# DATASET FOR NEXT-TOKEN PREDICTION
###############################################
class SentenceDataset(Dataset):
    def __init__(self, filePath, maxSeqLen, overlapRatio, wordToIndex):
        """
        Each sample is a segment from the file.
        For a segment of tokens [w1, w2, ..., w_T] (with T <= maxSeqLen),
        we create:
          - input:  [w1, w2, ..., w_{T-1}]
          - target: [w2, w3, ..., w_T]
        Both sequences are padded to a fixed length of (maxSeqLen - 1) if needed.
        """
        self.maxSeqLen = maxSeqLen  # total tokens in segment
        self.overlapSize = int(maxSeqLen * overlapRatio)
        self.wordToIndex = wordToIndex
        self.data, self.lengths = self.loadData(filePath)

    def loadData(self, filePath):
        data = []      # list of tuples: (input_seq, target_seq)
        lengths = []   # effective lengths (without padding) for input sequences
        with open(filePath, "r", encoding="utf-8") as file:
            for line in file:
                words = line.strip().split()
                if len(words) < 2:
                    continue  # need at least 2 tokens for input and target
                wordIndices = [self.wordToIndex.get(w, self.wordToIndex["<UNK>"]) for w in words]
                # If the sentence is short enough, use it whole
                if len(wordIndices) <= self.maxSeqLen:
                    input_seq = wordIndices[:-1]
                    target_seq = wordIndices[1:]
                    effective_length = len(input_seq)  # equals len(wordIndices)-1
                    if effective_length < self.maxSeqLen - 1:
                        pad_length = (self.maxSeqLen - 1) - effective_length
                        input_seq = input_seq + [self.wordToIndex["<PAD>"]] * pad_length
                        target_seq = target_seq + [self.wordToIndex["<PAD>"]] * pad_length
                    data.append((input_seq, target_seq))
                    lengths.append(effective_length)
                else:
                    # For long sentences, break into overlapping segments.
                    for i in range(0, len(wordIndices), self.overlapSize):
                        segment = wordIndices[i:i+self.maxSeqLen]
                        if len(segment) < 2:
                            continue
                        input_seq = segment[:-1]
                        target_seq = segment[1:]
                        effective_length = len(input_seq)
                        if effective_length < self.maxSeqLen - 1:
                            pad_length = (self.maxSeqLen - 1) - effective_length
                            input_seq = input_seq + [self.wordToIndex["<PAD>"]] * pad_length
                            target_seq = target_seq + [self.wordToIndex["<PAD>"]] * pad_length
                        data.append((input_seq, target_seq))
                        lengths.append(effective_length)
        return data, lengths

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_seq, target_seq = self.data[idx]
        length = self.lengths[idx]
        return (torch.tensor(input_seq, dtype=torch.long),
                torch.tensor(target_seq, dtype=torch.long),
                length)

###############################################
# RNN LANGUAGE MODEL
###############################################
class RNNLanguageModel(nn.Module):
    def __init__(self, vocabSize, embedDim, hiddenDim):
        super(RNNLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocabSize, embedDim, padding_idx=wordToIndex["<PAD>"])
        self.rnn = nn.RNN(embedDim, hiddenDim, batch_first=True)
        self.fc = nn.Linear(hiddenDim, vocabSize)

    def forward(self, inputs, lengths):
        embedded = self.embedding(inputs)
        # Pack the embedded sequence (lengths must be on CPU)
        packedInputs = rnn_utils.pack_padded_sequence(embedded, lengths.cpu(),
                                                      batch_first=True, enforce_sorted=False)
        packedOutputs, _ = self.rnn(packedInputs)
        outputs, _ = rnn_utils.pad_packed_sequence(packedOutputs, batch_first=True,
                                                   total_length=inputs.size(1))
        logits = self.fc(outputs)
        return logits

###############################################
# TRAINING FUNCTION
###############################################
def trainModel(model, trainLoader, optimizer, lossFunction, numEpochs, patience):
    bestLoss = float("inf")
    noImprove = 0
    for epoch in range(numEpochs):
        model.train()
        totalLoss = 0.0
        for inputs, targets, lengths in trainLoader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            lengths = torch.tensor(lengths, dtype=torch.int64).cpu()  # for packing

            optimizer.zero_grad()
            logits = model(inputs, lengths)  # shape: [batch, seq_len, vocabSize]
            logits_flat = logits.view(-1, vocabSize)
            targets_flat = targets.view(-1)
            loss = lossFunction(logits_flat, targets_flat)
            loss.backward()
            optimizer.step()
            totalLoss += loss.item()

        avgLoss = totalLoss / len(trainLoader)
        print(f"Epoch {epoch+1}: Train Loss = {avgLoss:.4f}")

        if avgLoss < bestLoss:
            bestLoss = avgLoss
            noImprove = 0
        else:
            noImprove += 1
            if noImprove >= patience:
                print("Early stopping triggered.")
                break

###############################################
# PERPLEXITY COMPUTATION
###############################################
def computePerplexity(filePath, model, wordToIndex, maxSeqLen, outFile):
    """
    For each sentence in the file, create an input (all but the last token) and
    target (all but the first token), pad if needed, and compute the perplexity
    only on the non-padded (effective) tokens.
    """
    with open(filePath, "r", encoding="utf-8") as file:
        sentences = [line.strip().split() for line in file if line.strip()]

    totalLogProb = 0.0
    totalCount = 0

    model.eval()
    with open(outFile, "w", encoding="utf-8") as fout:
        for i, words in enumerate(sentences):
            if len(words) < 2:
                continue  # skip sentences that are too short
            words = words[:maxSeqLen]
            indices = [wordToIndex.get(w, wordToIndex["<UNK>"]) for w in words]
            effective_length = len(indices) - 1  # number of predictions
            input_seq = indices[:-1]
            target_seq = indices[1:]
            if effective_length < maxSeqLen - 1:
                pad_length = (maxSeqLen - 1) - effective_length
                input_seq = input_seq + [wordToIndex["<PAD>"]] * pad_length
                target_seq = target_seq + [wordToIndex["<PAD>"]] * pad_length

            inputs = torch.tensor([input_seq], dtype=torch.long).to(device)
            lengths = torch.tensor([effective_length], dtype=torch.int64).cpu()

            with torch.no_grad():
                logits = model(inputs, lengths)
                logProbs = nn.functional.log_softmax(logits, dim=2)
                logProbSum = 0.0
                for j in range(effective_length):
                    logProbSum += logProbs[0, j, target_seq[j]].item()

            sentPerplexity = math.exp(-logProbSum / effective_length)
            fout.write(f"Sentence {i+1}: Perplexity = {sentPerplexity:.4f}\n")
            totalLogProb += logProbSum
            totalCount += effective_length

        overallPerplexity = math.exp(-totalLogProb / totalCount) if totalCount > 0 else float("inf")
        fout.write(f"\nOverall Perplexity: {overallPerplexity:.4f}\n")

    print(f"Perplexity results saved to {outFile}")

###############################################
# SAVE MODEL CONFIGURATION
###############################################
def saveModelInfo(configFile, vocabSize, embedDim, hiddenDim, maxSeqLen, overlapRatio,
                  learningRate, numEpochs, patience):
    with open(configFile, "w", encoding="utf-8") as f:
        f.write("MODEL CONFIGURATION\n")
        f.write("===================\n")
        f.write(f"Vocabulary Size: {vocabSize}\n")
        f.write(f"Embedding Dimension: {embedDim}\n")
        f.write(f"Hidden Dimension: {hiddenDim}\n")
        f.write(f"Max Sequence Length: {maxSeqLen}\n")
        f.write(f"Overlap Ratio: {overlapRatio}\n")
        f.write(f"Learning Rate: {learningRate}\n")
        f.write(f"Number of Epochs: {numEpochs}\n")
        f.write(f"Patience: {patience}\n")
    print(f"Model configuration saved to {configFile}")

###############################################
# MAIN EXECUTION
###############################################
def main():
    # Prepare dataset and loader.
    trainDataset = SentenceDataset(TRAIN_FILE, MAX_SEQ_LEN, OVERLAP_RATIO, wordToIndex)
    trainLoader = DataLoader(trainDataset, batch_size=BATCH_SIZE, shuffle=True)

    model = RNNLanguageModel(vocabSize, EMBEDDING_DIM, HIDDEN_DIM).to(device)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    lossFunction = nn.CrossEntropyLoss(ignore_index=wordToIndex["<PAD>"])

    print("Training RNN model...")
    trainModel(model, trainLoader, optimizer, lossFunction, NUM_EPOCHS, PATIENCE)

    # Save the trained model.
    torch.save(model.state_dict(), MODEL_FILE)
    print(f"Model state_dict saved to {MODEL_FILE}")

    # Save model configuration/info.
    saveModelInfo(CONFIG_FILE, vocabSize, EMBEDDING_DIM, HIDDEN_DIM, MAX_SEQ_LEN,
                  OVERLAP_RATIO, LEARNING_RATE, NUM_EPOCHS, PATIENCE)

    # Compute perplexity on train and test sets.
    computePerplexity(TRAIN_FILE, model, wordToIndex, MAX_SEQ_LEN, TRAIN_PERPLEXITY_FILE)
    computePerplexity(TEST_FILE, model, wordToIndex, MAX_SEQ_LEN, TEST_PERPLEXITY_FILE)

if __name__ == "__main__":
    main()


Training RNN model...


  lengths = torch.tensor(lengths, dtype=torch.int64).cpu()  # for packing


Epoch 1: Train Loss = 5.8639
Epoch 2: Train Loss = 5.1047
Epoch 3: Train Loss = 4.8219
Epoch 4: Train Loss = 4.6127
Epoch 5: Train Loss = 4.4375
Epoch 6: Train Loss = 4.2848
Epoch 7: Train Loss = 4.1458
Model state_dict saved to rnn.pth
Model configuration saved to model_config_rnn.txt
Perplexity results saved to train_perplexity_rnn_Pride.txt
Perplexity results saved to test_perplexity_rnn_Pride.txt


**LSTMCode**

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import pickle
import math
from torch.utils.data import Dataset, DataLoader
import torch.nn.utils.rnn as rnn_utils

###############################################
# CONFIGURATION
###############################################
# Maximum total tokens in a segment.
# Note: The effective input length is MAX_SEQ_LEN-1 because targets are shifted.
MAX_SEQ_LEN = 50
OVERLAP_RATIO = 0.5

TRAIN_FILE = "trainSetUlysses.txt"
TEST_FILE = "testSetUlysses.txt"
VOCAB_FILE = "vocabUlysses.pkl"
MODEL_FILE = "lstm.pth"
CONFIG_FILE = "model_config_lstm.txt"
TRAIN_PERPLEXITY_FILE = "train_perplexity_lstm_Ulysses.txt"
TEST_PERPLEXITY_FILE = "test_perplexity_lstm_Ulysses.txt"

EMBEDDING_DIM = 100
HIDDEN_DIM = 128
BATCH_SIZE = 16
LEARNING_RATE = 0.001
NUM_EPOCHS = 7
PATIENCE = 3

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

###############################################
# LOAD VOCABULARY
###############################################
def loadVocabulary(vocabFile):
    with open(vocabFile, "rb") as file:
        wordToIndex, indexToWord = pickle.load(file)
    for token in ["<UNK>", "<PAD>"]:
        if token not in wordToIndex:
            wordToIndex[token] = len(wordToIndex)
            indexToWord[len(indexToWord)] = token
    return wordToIndex, indexToWord

wordToIndex, indexToWord = loadVocabulary(VOCAB_FILE)
vocabSize = len(wordToIndex)

###############################################
# DATASET FOR NEXT-TOKEN PREDICTION
###############################################
class SentenceDataset(Dataset):
    def __init__(self, filePath, maxSeqLen, overlapRatio, wordToIndex):
        """
        Each sample is a segment from the file.
        For a segment of tokens [w1, w2, ..., w_T] (with T <= maxSeqLen),
        we create:
          - input:  [w1, w2, ..., w_{T-1}]
          - target: [w2, w3, ..., w_T]
        Both sequences are padded to a fixed length of (maxSeqLen - 1) if needed.
        """
        self.maxSeqLen = maxSeqLen  # total tokens in segment
        self.overlapSize = int(maxSeqLen * overlapRatio)
        self.wordToIndex = wordToIndex
        self.data, self.lengths = self.loadData(filePath)

    def loadData(self, filePath):
        data = []      # list of tuples: (input_seq, target_seq)
        lengths = []   # effective lengths (without padding) for input sequences
        with open(filePath, "r", encoding="utf-8") as file:
            for line in file:
                words = line.strip().split()
                if len(words) < 2:
                    continue  # need at least 2 tokens for input and target
                wordIndices = [self.wordToIndex.get(w, self.wordToIndex["<UNK>"]) for w in words]
                # If the sentence is short enough, use it whole
                if len(wordIndices) <= self.maxSeqLen:
                    input_seq = wordIndices[:-1]
                    target_seq = wordIndices[1:]
                    effective_length = len(input_seq)  # equals len(wordIndices)-1
                    if effective_length < self.maxSeqLen - 1:
                        pad_length = (self.maxSeqLen - 1) - effective_length
                        input_seq = input_seq + [self.wordToIndex["<PAD>"]] * pad_length
                        target_seq = target_seq + [self.wordToIndex["<PAD>"]] * pad_length
                    data.append((input_seq, target_seq))
                    lengths.append(effective_length)
                else:
                    # For long sentences, break into overlapping segments.
                    for i in range(0, len(wordIndices), self.overlapSize):
                        segment = wordIndices[i:i+self.maxSeqLen]
                        if len(segment) < 2:
                            continue
                        input_seq = segment[:-1]
                        target_seq = segment[1:]
                        effective_length = len(input_seq)
                        if effective_length < self.maxSeqLen - 1:
                            pad_length = (self.maxSeqLen - 1) - effective_length
                            input_seq = input_seq + [self.wordToIndex["<PAD>"]] * pad_length
                            target_seq = target_seq + [self.wordToIndex["<PAD>"]] * pad_length
                        data.append((input_seq, target_seq))
                        lengths.append(effective_length)
        return data, lengths

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_seq, target_seq = self.data[idx]
        length = self.lengths[idx]
        return (torch.tensor(input_seq, dtype=torch.long),
                torch.tensor(target_seq, dtype=torch.long),
                length)

###############################################
# LSTM LANGUAGE MODEL
###############################################
class LSTMLanguageModel(nn.Module):
    def __init__(self, vocabSize, embedDim, hiddenDim):
        super(LSTMLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocabSize, embedDim, padding_idx=wordToIndex["<PAD>"])
        self.lstm = nn.LSTM(embedDim, hiddenDim, batch_first=True)
        self.fc = nn.Linear(hiddenDim, vocabSize)

    def forward(self, inputs, lengths):
        embedded = self.embedding(inputs)
        # Pack the embedded sequence (lengths must be on CPU)
        packedInputs = rnn_utils.pack_padded_sequence(embedded, lengths.cpu(),
                                                      batch_first=True, enforce_sorted=False)
        packedOutputs, (h_n, c_n) = self.lstm(packedInputs)
        outputs, _ = rnn_utils.pad_packed_sequence(packedOutputs, batch_first=True,
                                                   total_length=inputs.size(1))
        logits = self.fc(outputs)
        return logits

###############################################
# TRAINING FUNCTION
###############################################
def trainModel(model, trainLoader, optimizer, lossFunction, numEpochs, patience):
    bestLoss = float("inf")
    noImprove = 0
    for epoch in range(numEpochs):
        model.train()
        totalLoss = 0.0
        for inputs, targets, lengths in trainLoader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            lengths = torch.tensor(lengths, dtype=torch.int64).cpu()  # for packing

            optimizer.zero_grad()
            logits = model(inputs, lengths)  # shape: [batch, seq_len, vocabSize]
            logits_flat = logits.view(-1, vocabSize)
            targets_flat = targets.view(-1)
            loss = lossFunction(logits_flat, targets_flat)
            loss.backward()
            optimizer.step()
            totalLoss += loss.item()

        avgLoss = totalLoss / len(trainLoader)
        print(f"Epoch {epoch+1}: Train Loss = {avgLoss:.4f}")

        if avgLoss < bestLoss:
            bestLoss = avgLoss
            noImprove = 0
        else:
            noImprove += 1
            if noImprove >= patience:
                print("Early stopping triggered.")
                break

###############################################
# PERPLEXITY COMPUTATION
###############################################
def computePerplexity(filePath, model, wordToIndex, maxSeqLen, outFile):
    """
    For each sentence in the file, create an input (all but the last token) and
    target (all but the first token), pad if needed, and compute the perplexity
    only on the non-padded (effective) tokens.
    """
    with open(filePath, "r", encoding="utf-8") as file:
        sentences = [line.strip().split() for line in file if line.strip()]

    totalLogProb = 0.0
    totalCount = 0

    model.eval()
    with open(outFile, "w", encoding="utf-8") as fout:
        for i, words in enumerate(sentences):
            if len(words) < 2:
                continue  # skip sentences that are too short
            words = words[:maxSeqLen]
            indices = [wordToIndex.get(w, wordToIndex["<UNK>"]) for w in words]
            effective_length = len(indices) - 1  # number of predictions
            input_seq = indices[:-1]
            target_seq = indices[1:]
            if effective_length < maxSeqLen - 1:
                pad_length = (maxSeqLen - 1) - effective_length
                input_seq = input_seq + [wordToIndex["<PAD>"]] * pad_length
                target_seq = target_seq + [wordToIndex["<PAD>"]] * pad_length

            inputs = torch.tensor([input_seq], dtype=torch.long).to(device)
            lengths = torch.tensor([effective_length], dtype=torch.int64).cpu()

            with torch.no_grad():
                logits = model(inputs, lengths)
                logProbs = nn.functional.log_softmax(logits, dim=2)
                logProbSum = 0.0
                for j in range(effective_length):
                    logProbSum += logProbs[0, j, target_seq[j]].item()

            sentPerplexity = math.exp(-logProbSum / effective_length)
            fout.write(f"Sentence {i+1}: Perplexity = {sentPerplexity:.4f}\n")
            totalLogProb += logProbSum
            totalCount += effective_length

        overallPerplexity = math.exp(-totalLogProb / totalCount) if totalCount > 0 else float("inf")
        fout.write(f"\nOverall Perplexity: {overallPerplexity:.4f}\n")

    print(f"Perplexity results saved to {outFile}")

###############################################
# SAVE MODEL CONFIGURATION
###############################################
def saveModelInfo(configFile, vocabSize, embedDim, hiddenDim, maxSeqLen, overlapRatio,
                  learningRate, numEpochs, patience):
    with open(configFile, "w", encoding="utf-8") as f:
        f.write("MODEL CONFIGURATION\n")
        f.write("===================\n")
        f.write(f"Vocabulary Size: {vocabSize}\n")
        f.write(f"Embedding Dimension: {embedDim}\n")
        f.write(f"Hidden Dimension: {hiddenDim}\n")
        f.write(f"Max Sequence Length: {maxSeqLen}\n")
        f.write(f"Overlap Ratio: {overlapRatio}\n")
        f.write(f"Learning Rate: {learningRate}\n")
        f.write(f"Number of Epochs: {numEpochs}\n")
        f.write(f"Patience: {patience}\n")
    print(f"Model configuration saved to {configFile}")

###############################################
# MAIN EXECUTION
###############################################
def main():
    # Prepare dataset and loader.
    trainDataset = SentenceDataset(TRAIN_FILE, MAX_SEQ_LEN, OVERLAP_RATIO, wordToIndex)
    trainLoader = DataLoader(trainDataset, batch_size=BATCH_SIZE, shuffle=True)

    model = LSTMLanguageModel(vocabSize, EMBEDDING_DIM, HIDDEN_DIM).to(device)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    lossFunction = nn.CrossEntropyLoss(ignore_index=wordToIndex["<PAD>"])

    print("Training LSTM model...")
    trainModel(model, trainLoader, optimizer, lossFunction, NUM_EPOCHS, PATIENCE)

    # Save the trained model.
    torch.save(model.state_dict(), MODEL_FILE)
    print(f"Model state_dict saved to {MODEL_FILE}")

    # Save model configuration/info.
    saveModelInfo(CONFIG_FILE, vocabSize, EMBEDDING_DIM, HIDDEN_DIM, MAX_SEQ_LEN,
                  OVERLAP_RATIO, LEARNING_RATE, NUM_EPOCHS, PATIENCE)

    # Compute perplexity on train and test sets.
    computePerplexity(TRAIN_FILE, model, wordToIndex, MAX_SEQ_LEN, TRAIN_PERPLEXITY_FILE)
    computePerplexity(TEST_FILE, model, wordToIndex, MAX_SEQ_LEN, TEST_PERPLEXITY_FILE)

if __name__ == "__main__":
    main()


Training LSTM model...


  lengths = torch.tensor(lengths, dtype=torch.int64).cpu()  # for packing


Epoch 1: Train Loss = 6.6250
Epoch 2: Train Loss = 5.8983
Epoch 3: Train Loss = 5.5295
Epoch 4: Train Loss = 5.2154
Epoch 5: Train Loss = 4.9427
Epoch 6: Train Loss = 4.7039
Epoch 7: Train Loss = 4.4811
Model state_dict saved to lstm.pth
Model configuration saved to model_config_lstm.txt
Perplexity results saved to train_perplexity_lstm_Ulysses.txt
Perplexity results saved to test_perplexity_lstm_Ulysses.txt
