---

<h1 style="text-align: center;">INLP - Assignment 2</h1>

<div style="text-align: center;">
    <p>Name: Vedant Nipane</p>
    <p>Roll No: 2021102040</p>
</div>

---

## Importing Libraries and Corpus

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.utils.rnn as rnn_utils
import numpy as np
import os
import random
import re
from collections import Counter

In [2]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
text1_path = 'Pride and Prejudice - Jane Austen.txt'
text2_path = 'Ulysses - James Joyce.txt'

# 0. Preprocessing

## Functions

In [3]:

# Load text and split into sentences
def load_text(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read().lower()  # Convert to lowercase

    sentences = re.split(r'[.!?]', text)  # Split on sentence boundaries
    sentences = [s.strip() for s in sentences if s.strip()]  # Remove empty lines
    return sentences

# Split into train and test (1000 test sentences, rest train)
def split_data(sentences, test_size=1000):
    random.shuffle(sentences)  # Shuffle sentences to ensure randomness
    test_sentences = sentences[:test_size]
    train_sentences = sentences[test_size:]
    return train_sentences, test_sentences

# Tokenize a list of sentences
def tokenize(sentences):
    tokenized_sentences = []
    for sentence in sentences:
        words = re.sub(r"[^a-zA-Z\s]", "", sentence).split()  # Remove special chars & split
        tokenized_sentences.append(words)
    return tokenized_sentences

# Build vocabulary from training data only
def build_vocab(tokenized_sentences, min_freq=2):
    word_counts = Counter(word for sentence in tokenized_sentences for word in sentence)
    vocab = {word: idx for idx, (word, freq) in enumerate(word_counts.items()) if freq >= min_freq}
    vocab["<UNK>"] = len(vocab)  # Add unknown token
    return vocab

# Convert words to numerical indices
def words_to_indices(sentences, vocab):
    indexed_sentences = []
    for sentence in sentences:
        indexed_sentences.append([vocab.get(word, vocab["<UNK>"]) for word in sentence])
    return indexed_sentences

# Generate n-gram dataset
def create_ngrams(indexed_sentences, n=3):
    data = []
    for sentence in indexed_sentences:
        if len(sentence) >= n:
            for i in range(len(sentence) - n):
                context = sentence[i : i + n]  # First (n) words
                target = sentence[i + n]  # Next word (prediction target)
                data.append((context, target))
    return data

# Convert dataset to PyTorch tensors
def prepare_tensors(data):
    contexts = torch.tensor([x[0] for x in data], dtype=torch.long)
    targets = torch.tensor([x[1] for x in data], dtype=torch.long)
    return contexts, targets


# Function to pad sequences
def pad_sentences(indexed_sentences, pad_token=0, max_len=None):
    """Pad sentences to the same length for batch processing."""
    if max_len is None:
        max_len = max(len(s) for s in indexed_sentences)  # Find longest sequence

    padded_sentences = [s + [pad_token] * (max_len - len(s)) for s in indexed_sentences]
    return padded_sentences

# Function to prepare LSTM tensors
def prepare_lstm_tensors(indexed_sentences, vocab, pad_token=0):
    """Convert sentences to tensors with padding."""
    indexed_sentences = pad_sentences(indexed_sentences, pad_token)
    sequences = torch.tensor(indexed_sentences, dtype=torch.long)
    targets = torch.tensor([s[1:] + [pad_token] for s in indexed_sentences], dtype=torch.long)  # Shifted targets

    return sequences, targets  # (Batch_size, Seq_len), (Batch_size, Seq_len)

## Preprocessing of Corpus 1

In [27]:
# Load text and preprocess
sentences1 = load_text(text1_path)
train_sentences1, test_sentences1 = split_data(sentences1)

train_tokens1 = tokenize(train_sentences1)
test_tokens1 = tokenize(test_sentences1)

train_vocab1 = build_vocab(train_tokens1, min_freq=1)
# Create a mapping from indices to words (inverse of train_vocab)# Convert words to indices
train_indices1 = words_to_indices(train_tokens1, train_vocab1)
test_indices1 = words_to_indices(test_tokens1, train_vocab1)  # Use same vocab

# Convert to tensors for LSTM
X_train_lstm1, y_train_lstm1 = prepare_lstm_tensors(train_indices1, vocab=train_vocab1)
X_test_lstm1, y_test_lstm1 = prepare_lstm_tensors(test_indices1, vocab=train_vocab1)

idx_to_word1 = {idx: word for word, idx in train_vocab1.items()}


# Print dataset sizes
print(f"Vocabulary Size (1): {len(train_vocab1)}")
print(f"Train LSTM Dataset Size (1): {X_train_lstm1.shape}")
print(f"Test LSTM Dataset Size (1): {X_test_lstm1.shape}")


Vocabulary Size (1): 6620
Train LSTM Dataset Size (1): torch.Size([6519, 118])
Test LSTM Dataset Size (1): torch.Size([1000, 112])


## Preprocessing of Corpus 2

In [None]:
# Load text and preprocess
sentences2 = load_text(text2_path)
train_sentences2, test_sentences2 = split_data(sentences2)

train_tokens2 = tokenize(train_sentences2)
test_tokens2 = tokenize(test_sentences2)

train_vocab2 = build_vocab(train_tokens2, min_freq=1)
train_indices2 = words_to_indices(train_tokens2, train_vocab2)
test_indices2 = words_to_indices(test_tokens2, train_vocab2)

# Convert to tensors for LSTM
X_train_lstm2, y_train_lstm2 = prepare_lstm_tensors(train_indices2, vocab=train_vocab2)
X_test_lstm2, y_test_lstm2 = prepare_lstm_tensors(test_indices2, vocab=train_vocab2)

idx_to_word2 = {idx: word for word, idx in train_vocab2.items()}


# Print dataset sizes
print(f"Vocabulary Size (2): {len(train_vocab2)}")
print(f"Train LSTM Dataset Size (2): {X_train_lstm2.shape}")
print(f"Test LSTM Dataset Size (2): {X_test_lstm2.shape}")


Vocabulary Size (2): 29023
Train LSTM Dataset Size (2): torch.Size([24427, 12824])
Test LSTM Dataset Size (2): torch.Size([1000, 155])


# 3. LSTM

## 3.1 Model Architecture

In [4]:

class LSTMLanguageModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, dropout=0.3):
        super(LSTMLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim,
                            num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, hidden=None):
        x = self.embedding(x)  # Convert indices to embeddings
        x, hidden = self.lstm(x, hidden)  # Pass through LSTM
        x = self.dropout(x)  # Apply dropout
        x = self.fc(x)  # Project to vocabulary size
        return x, hidden  # (Batch, Seq_len, Vocab_size), hidden states


## 3.2 Perplexity Calculation Functions

In [7]:
def calculate_sentence_perplexity_lstm(model, sentence_indices, device):
    """Computes perplexity for a single sentence using an LSTM model."""
    model.eval()
    
    if not sentence_indices or len(sentence_indices) < 2:  # Skip invalid sentences
        return None  

    # Convert sentence indices to tensor and move to device
    sentence_tensor = torch.tensor(sentence_indices, dtype=torch.long, device=device).unsqueeze(0)  # Shape: (1, seq_len)
    
    batch_size = 1  # Since we process one sentence at a time
    num_layers = model.lstm.num_layers  # Get LSTM num_layers
    hidden_dim = model.lstm.hidden_size  # Get LSTM hidden_size

    hidden = torch.zeros(num_layers, batch_size, hidden_dim, device=device)
    cell = torch.zeros(num_layers, batch_size, hidden_dim, device=device)

    loss_function = nn.CrossEntropyLoss()
    total_loss = 0.0
    count = 0

    with torch.no_grad():
        for i in range(len(sentence_indices) - 1):  # Predict each word given previous ones
            input_word = sentence_tensor[:, i].unsqueeze(1)  # Shape: (1, 1)
            target_word = sentence_tensor[:, i + 1]  # Shape: (1,)

            output, (hidden, cell) = model(input_word, (hidden, cell))  # Forward pass
            output = output.squeeze(1)  # Remove sequence dimension (1, vocab_size)

            loss = loss_function(output, target_word)  # Compute loss
            total_loss += loss.item()
            count += 1

    avg_loss = total_loss / count if count > 0 else float('inf')
    perplexity = torch.exp(torch.tensor(avg_loss)).item()  # Convert loss to perplexity

    return perplexity


def save_perplexity_results_lstm(corpus_name, dataset_type, sentences_indices, perplexities):
    """Save perplexity results to file for LSTM"""
    file_name = f"2021102040_lstm_{corpus_name}_{dataset_type}-perplexity.txt"
    file_path = os.path.join('Perplexity', file_name)

    with open(file_path, 'w') as f:
        avg_perplexity = np.mean([p for p in perplexities if p != float('inf')])
        f.write(f"Overall Average Perplexity: {avg_perplexity:.2f}\n\n")
        
        for idx, (sentence, perp) in enumerate(zip(sentences_indices, perplexities), 1):
            f.write(f" {sentence} - Perplexity: {perp:.2f}\n")

def evaluate_and_save_perplexity_lstm(model, sentences_indices, corpus_name, dataset_type, device='cpu'):
    """Evaluate perplexity for each sentence using an LSTM and save results"""
    perplexities = []

    for sentence in sentences_indices:
        if len(sentence) > 1:  # Ensure valid sentence length
            perp = calculate_sentence_perplexity_lstm(model, sentence, device)
            if perp != float('inf'):
                perplexities.append(perp)

    save_perplexity_results_lstm(corpus_name, dataset_type, sentences_indices, perplexities)
    return np.mean(perplexities)


## 3.3 Training

In [8]:
def train_helper(model,train_vocab,X_train,y_train,num_epochs,batch_size):
    vocab_size = len(train_vocab)
    # Define loss and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding token
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Convert data to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.long).to(device)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)

    # Training loop
    num_epochs = 5
    batch_size = 32  # Define a batch size
    epoch_loss = []
    for epoch in range(num_epochs):
        model.train()  # Set to training mode
        total_loss = 0

        for i in range(0, len(X_train_tensor), batch_size):
            X_batch = X_train_tensor[i : i + batch_size]
            y_batch = y_train_tensor[i : i + batch_size]

            optimizer.zero_grad()
            output, _ = model(X_batch)

            # Reshape outputs for loss computation
            loss = criterion(output.view(-1, vocab_size), y_batch.view(-1))
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / (len(X_train_tensor) / batch_size)
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {avg_loss:.4f}")
        epoch_loss.append(avg_loss)
    return epoch_loss

### Training on Corpus 1

In [9]:
vocab_size = len(train_vocab1)
embedding_dim = 128
hidden_dim = 256
num_layers = 2
dropout = 0.3

# Instantiate model
model_lstm_1 = LSTMLanguageModel(vocab_size, embedding_dim, hidden_dim, num_layers, dropout)
model_lstm_1.to(device)

LSTMLanguageModel(
  (embedding): Embedding(6605, 128)
  (lstm): LSTM(128, 256, num_layers=2, batch_first=True)
  (fc): Linear(in_features=256, out_features=6605, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)

In [10]:
loss_epoch1 = train_helper(model_lstm_1,train_vocab1,X_train_lstm1,y_train_lstm1,num_epochs=5,batch_size=32)

  from .autonotebook import tqdm as notebook_tqdm
  X_train_tensor = torch.tensor(X_train, dtype=torch.long).to(device)
  y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)


Epoch 1/5 - Loss: 6.7679
Epoch 2/5 - Loss: 6.3088
Epoch 3/5 - Loss: 5.9350
Epoch 4/5 - Loss: 5.6547
Epoch 5/5 - Loss: 5.4485


In [37]:
model_path = "Models/2021102040_lstm_corpus1.pth"
os.makedirs("Models", exist_ok=True)
save_data = {
    "model_state_dict": model_lstm_1.state_dict(),  # Model weights
    "word_to_idx": train_vocab1,  # Vocabulary mapping
    "idx_to_word": idx_to_word1,  # Reverse mapping
    "hyperparams": {
        "embedding_dim": embedding_dim,
        "hidden_dim": hidden_dim,
        "vocab_size": len(train_vocab1),
        "num_layers": num_layers,  # Number of LSTM layers
    },
}
torch.save(save_data, model_path)
print(f"✅ LSTM Model and metadata saved at: {model_path}")

✅ LSTM Model and metadata saved at: Models/2021102040_lstm_corpus1.pth


### Training on Corpus 2

In [38]:
vocab_size = len(train_vocab2)
embedding_dim = 128
hidden_dim = 256
num_layers = 2
dropout = 0.3

# Instantiate model
model_lstm_2 = LSTMLanguageModel(vocab_size, embedding_dim, hidden_dim, num_layers, dropout)
model_lstm_2.to(device)

LSTMLanguageModel(
  (embedding): Embedding(28998, 128)
  (lstm): LSTM(128, 256, num_layers=2, batch_first=True)
  (fc): Linear(in_features=256, out_features=28998, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)

In [None]:
loss_epoch2 = train_helper(model_lstm_2,train_vocab2,X_train_lstm2,y_train_lstm2,num_epochs=5,batch_size=4)

  X_train_tensor = torch.tensor(X_train, dtype=torch.long).to(device)


In [None]:
model_path = "Models/2021102040_lstm_corpus2.pt"
os.makedirs("Models", exist_ok=True)
torch.save(model_lstm_2.state_dict(), model_path)
print(f"✅ Model saved at: {model_path}")

✅ Model saved at: Models/2021102040_lstm_corpus1.pt


## 3.4 Perplexity Calculation

### Perplexity for Corpus 1

In [None]:

# Compute and save train perplexity
train_perplexity1 = evaluate_and_save_perplexity_lstm(
    model_lstm_1, train_indices1, corpus_name="corpus1", dataset_type="train", device=device
)
print(f"Train Perplexity: {train_perplexity1:.2f}")



Train Perplexity: 7501.92


In [None]:
# Compute and save test perplexity
test_perplexity1 = evaluate_and_save_perplexity_lstm(
    model_lstm_1, test_indices1, corpus_name="corpus1", dataset_type="test", device=device
)
print(f"Test Perplexity: {test_perplexity1:.2f}")


Test Perplexity: 24496.74


### Perplexity for Corpus 2

In [None]:

# Compute and save train perplexity
train_perplexity2 = evaluate_and_save_perplexity_lstm(
    model_lstm_2, train_indices2, corpus_name="corpus2", dataset_type="train", device=device
)
print(f"Train Perplexity: {train_perplexity2:.2f}")


Train Perplexity: 7501.92


In [None]:
# Compute and save test perplexity
test_perplexity2 = evaluate_and_save_perplexity_lstm(
    model_lstm_2, test_indices2, corpus_name="corpus2", dataset_type="test", device=device
)
print(f"Test Perplexity: {test_perplexity2:.2f}")


Test Perplexity: 24496.74


## 3.5 Next Word Prediction

In [5]:
import torch
import torch.nn.functional as F

def predict_top_k_words_lstm(model, context_sentence, vocab, k, device='cpu'):
    """
    Predict the top-k next words for a given context sentence using the LSTM model.
    
    Args:
    - model: Trained LSTM model.
    - context_sentence: List of word indices representing the input sentence.
    - vocab: Dictionary mapping indices to words.
    - k: Number of top words to return.
    - device: 'cpu' or 'cuda'.
    
    Returns:
    - List of (word, probability) tuples.
    """
    model.eval()

    # Convert context sentence to tensor (batch_size=1, seq_len)
    context_tensor = torch.tensor(context_sentence, dtype=torch.long, device=device).unsqueeze(0)  # Shape: (1, seq_len)

    # Retrieve LSTM parameters
    num_layers = model.lstm.num_layers
    hidden_dim = model.lstm.hidden_size

    # Initialize hidden and cell states
    hidden = torch.zeros(num_layers, 1, hidden_dim, device=device)  # Shape: (num_layers, batch=1, hidden_dim)
    cell = torch.zeros(num_layers, 1, hidden_dim, device=device)

    with torch.no_grad():
        # Ensure input is correctly shaped for the model
        context_tensor = context_tensor.view(1, -1)  # Reshape to (batch=1, seq_len)
        
        # Pass through embedding layer manually if needed
        if hasattr(model, "embedding"):  
            context_tensor = model.embedding(context_tensor)  # Shape: (1, seq_len, embedding_dim)

        # Pass through LSTM
        output, _ = model.lstm(context_tensor, (hidden, cell))  

    # Get last word's output probabilities
    last_word_logits = output[:, -1, :]  # (1, vocab_size)
    probabilities = F.softmax(last_word_logits, dim=-1)  # Apply softmax to get probabilities

    # Get top-k words
    top_k_probs, top_k_indices = torch.topk(probabilities, k)  # (1, k)

    # Convert indices to words
    top_k_words = [(vocab[idx.item()], top_k_probs[0, i].item()) for i, idx in enumerate(top_k_indices[0])]

    return top_k_words


In [15]:

def sentence_to_tensor(sentence, word_to_idx, context_size, device='cpu'):
    """
    Convert a sentence into a tensor format suitable for the RNN model.
    
    Args:
        sentence (str): Input sentence.
        word_to_idx (dict): Mapping from words to indices.
        context_size (int): Number of words used as context.
        device (str): Device to place the tensor ('cpu' or 'cuda').

    Returns:
        torch.Tensor: Context tensor of shape (1, context_size).
    """
    sentence = sentence.lower()
    words = re.sub(r"[^a-zA-Z\s]", "", sentence).split()
    
    if len(words) < context_size:
        raise ValueError(f"Input sentence must have at least {context_size} words")
    
    # Extract the last `context_size` words
    context_words = words[-context_size:]
    
    # Convert words to indices
    context_indices = [word_to_idx.get(word, word_to_idx["<UNK>"]) for word in context_words]
    
    # Convert to tensor (1, context_size)
    context_tensor = torch.tensor([context_indices], dtype=torch.long).to(device)
    
    return context_tensor


def print_predictions(sentence, predictions):
    print(f"\nInput sentence: {sentence}")
    print("Top 5 predicted next words:")
    for word, prob in predictions:
        print(f"{word}: {prob:.4f}")

In [None]:

context_sentence = "I hate to"
context_tensor = sentence_to_tensor(context_sentence, train_vocab1, context_size=3, device='cpu')

# Fix: Convert tensor back to list of indices
top_words = predict_top_k_words_lstm(model_lstm_1, context_tensor.tolist()[0], idx_to_word1, k=3, device='cpu')
print(top_words)


[('of', 0.009940087795257568), ('intervals', 0.009896052069962025), ('elizabeth', 0.009778119623661041)]


In [14]:
# Load the saved checkpoint
model_path = "Models/2021102040_lstm_corpus1.pth"
checkpoint = torch.load(model_path, map_location="cpu")

# Restore vocabulary and prevent dynamic resizing
word_to_idx = checkpoint["word_to_idx"]
idx_to_word = checkpoint["idx_to_word"]

# Use stored vocab size
vocab_size = checkpoint["hyperparams"]["vocab_size"]

# Extract hyperparameters
embedding_dim = checkpoint["hyperparams"]["embedding_dim"]
hidden_dim = checkpoint["hyperparams"]["hidden_dim"]
num_layers = checkpoint["hyperparams"]["num_layers"]

# Build model with correct vocab size
model_lstm_loaded = LSTMLanguageModel(vocab_size, embedding_dim, hidden_dim, num_layers)

# Load model weights
model_lstm_loaded.load_state_dict(checkpoint["model_state_dict"])
model_lstm_loaded.eval()


LSTMLanguageModel(
  (embedding): Embedding(6620, 128)
  (lstm): LSTM(128, 256, num_layers=2, batch_first=True)
  (fc): Linear(in_features=256, out_features=6620, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)

In [21]:
context_sentence = "I know that"
context_tensor = sentence_to_tensor(context_sentence, word_to_idx=word_to_idx, context_size=3, device='cpu')

# Fix: Convert tensor back to list of indices
top_words = predict_top_k_words_lstm(model_lstm_loaded, context_tensor.tolist()[0], idx_to_word, k=5, device='cpu')
print_predictions(context_sentence,top_words)


Input sentence: I know that
Top 5 predicted next words:
intervals: 0.0095
little: 0.0093
f: 0.0092
declared: 0.0091
play: 0.0088
