In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
import nltk
from collections import Counter
import numpy as np
import random
import re
nltk.download('punkt_tab')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


In [2]:
def clean_text(text):
     # Remove URLs
    text = re.sub(r'http[s]?://\S+|www\.\S+', '', text)

    # Remove HTML/XML tags
    text = re.sub(r'<[^>]+>', '', text)

    # Remove chapter headings (e.g., "Chapter 1", "CHAPTER ONE", "CH 1")
    text = re.sub(r'\b(chapter|ch)\b[\s\divx]+', '', text, flags=re.IGNORECASE)

    # Remove copyright and legal notices (e.g., "Copyright © 2023", "All rights reserved")
    text = re.sub(r'\b(copyright|©|all rights reserved|no part of this book)\b.*', '', text, flags=re.IGNORECASE)

    # Remove long sequences of digits (e.g., page numbers or codes)
    text = re.sub(r'\b\d{4,}\b', '', text)

    # Remove special characters, numbers, and punctuation except for basic ones
    text = re.sub(r'[^\w\s.,!?]', ' ', text)

    text = text.replace("_", " ")
    text = text.replace("--", " ")
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def tokenizer(sentences, n):
    tokens = []
    padding = ["<s>"] * (n - 1)
    for sentence in sentences:
        sentence = clean_text(sentence.lower())
        tokens += padding + nltk.word_tokenize(sentence) + ["</s>"]
    return tokens


N = 3
corpus_path = "/content/Ulysses - James Joyce.txt"
with open(corpus_path, 'r', encoding='utf-8') as file:
        raw_corpus = file.read()
sentences = nltk.sent_tokenize(raw_corpus)
test_sentences, train_sentences = train_test_split(sentences, test_size=(len(sentences) - 1000) / len(sentences), random_state=42)
tokens = tokenizer(train_sentences, N)

In [3]:
word_counts = Counter(tokens)

# Step 2: Identify words that occur only once
unique_words = [word for word, count in word_counts.items() if count == 1]

# Step 3: Randomly select 5% of the total vocabulary as <UNK> with a fixed seed
random.seed(42)  # Ensures reproducibility
num_unk = max(1, int(len(word_counts) * 0.05))  # Ensure at least 1 word is selected
if len(unique_words) < num_unk:
    random_unk_words = set(unique_words)
else:
    random.seed(42)  # Ensure reproducibility
    random_unk_words = set(random.sample(unique_words, num_unk))

# Step 4: Build vocabulary, replacing selected words with <UNK>
vocab = {"<UNK>": 0}  # Start with <UNK>
for word in word_counts.keys():
    if word not in random_unk_words:
        vocab[word] = len(vocab)  # Assign index

vocab_size = len(vocab)

# Step 6: Generate trigrams
def generate_trigrams(tokens):
    return [([tokens[i], tokens[i+1]], tokens[i+2]) for i in range(len(tokens) - 2)]

def generate_5grams(tokens):
    return [([tokens[i], tokens[i+1], tokens[i+2], tokens[i+3]], tokens[i+4]) for i in range(len(tokens) - 4)]

dataset_trigram = generate_trigrams(tokens)
dataset_5gram = generate_5grams(tokens)

# Step 7: Convert words to indices, replacing rare words with <UNK>
def encode_dataset_3(dataset, vocab, random_unk_words):
    encoded = []
    for (w1, w2), w3 in dataset:
        # Replace rare words with <UNK>
        w1 = w1 if w1 not in random_unk_words else "<UNK>"
        w2 = w2 if w2 not in random_unk_words else "<UNK>"
        w3 = w3 if w3 not in random_unk_words else "<UNK>"
        encoded.append(([vocab[w1], vocab[w2]], vocab[w3]))
    return encoded

def encode_dataset_5(dataset, vocab, random_unk_words):
    encoded = []
    for (w1, w2, w3, w4), w5 in dataset:
        # Replace rare words with <UNK>
        w1 = w1 if w1 not in random_unk_words else "<UNK>"
        w2 = w2 if w2 not in random_unk_words else "<UNK>"
        w3 = w3 if w3 not in random_unk_words else "<UNK>"
        w4 = w4 if w4 not in random_unk_words else "<UNK>"
        w5 = w5 if w5 not in random_unk_words else "<UNK>"
        encoded.append(([vocab[w1], vocab[w2], vocab[w3], vocab[w4]], vocab[w5]))
    return encoded

# encoded_sequences = encode_sequence(sequences, vocab, random_unk_words)
encoded_dataset_3 = encode_dataset_3(dataset_trigram, vocab, random_unk_words)
encoded_dataset_5 = encode_dataset_5(dataset_5gram, vocab, random_unk_words)

In [4]:
class TrigramDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        context, target = self.data[idx]
        return torch.tensor(context, dtype=torch.long), torch.tensor(target, dtype=torch.long)

class FFNNLanguageModelTrigram(nn.Module):
    def __init__(self, vocab_size, embed_size=100, hidden_size=50):
        """
        A simple feed-forward neural network language model.

        Args:
            vocab_size (int): Size of the vocabulary.
            embed_size (int): Dimension of the embedding vectors.
            hidden_size (int): Number of neurons in the hidden layer.
        """
        super(FFNNLanguageModelTrigram, self).__init__()
        # Embedding layer converts word indices to vectors.
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # For a trigram model, the context is 2 words (hence embed_size * 2)
        self.fc1 = nn.Linear(embed_size * 2, hidden_size)
        self.relu = nn.ReLU()
        # Output layer produces a score for each word in the vocabulary.
        self.fc2 = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        """
        Forward pass through the network.

        Args:
            x (torch.Tensor): Tensor of shape (batch_size, context_length)
                              where context_length = 2 for trigrams.
        Returns:
            torch.Tensor: Output logits of shape (batch_size, vocab_size).
        """
        # Get embeddings for each word in the context.
        embeds = self.embedding(x)              # Shape: (batch_size, 2, embed_size)
        embeds = embeds.view(embeds.size(0), -1)  # Flatten to (batch_size, 2 * embed_size)
        hidden = self.relu(self.fc1(embeds))      # Hidden layer with ReLU activation
        output = self.fc2(hidden)                 # Output logits for each vocabulary word
        return output



batch_size = 32
epochs = 3
learning_rate = 0.001

train_dataset = TrigramDataset(encoded_dataset_3)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

model = FFNNLanguageModelTrigram(vocab_size).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    total_loss = 0
    for context, target in train_loader:
        context, target = context.to(device), target.to(device)  # Move to GPU
        optimizer.zero_grad()
        output = model(context)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}")

Epoch 1, Loss: 5.7211
Epoch 2, Loss: 5.2195
Epoch 3, Loss: 5.0614


In [5]:
torch.save(model, "Uly_FFNN_3.pt")

In [6]:
import torch
import torch.nn.functional as F
import numpy as np
import nltk

# Make sure necessary downloads are done
# nltk.download('punkt')

# Assume model and vocab are already defined, and device is set:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = ... (loaded or defined and set to eval mode)
model.eval()
N = 3

def calculate_trigram_log_probability(model, trigram, vocab):
    """
    Compute the log probability for a single trigram.
    trigram: a tuple of three tokens, e.g. ("the", "cat", "sat")
    """
    # Since trigram is exactly 3 tokens, we can directly unpack:
    w1, w2, w3 = trigram

    # Replace OOV words with <UNK>
    w1 = w1 if w1 in vocab else "<UNK>"
    w2 = w2 if w2 in vocab else "<UNK>"
    w3 = w3 if w3 in vocab else "<UNK>"

    # Create context tensor from the first two words
    context_tensor = torch.tensor([[vocab[w1], vocab[w2]]], dtype=torch.long).to(device)

    # Forward pass: add batch dimension
    output = model(context_tensor.unsqueeze(0))  # shape: (1, vocab_size)

    # Convert logits to log probabilities
    log_probs = F.log_softmax(output, dim=1)

    # Get log probability for the actual next word
    target_idx = vocab[w3]
    word_log_prob = log_probs[0, target_idx].item()

    return word_log_prob

# Example usage: compute perplexity for each sentence in train_sentences
perplexities = []
for sentence in train_sentences:
    # Sentence may need to be split into sentences if it contains multiple sentences:
    sentence_list = nltk.sent_tokenize(sentence)
    # For simplicity, let's assume each 'sentence' is processed separately
    for sent in sentence_list:
        tokens = tokenizer([sent], N)  # tokenizer expects a list of sentences
        # Build trigrams: sliding window of size N
        ngrams = [tuple(tokens[i:i+N]) for i in range(len(tokens) - N + 1)]
        log_prob = 0.0
        for trigram in ngrams:
            trigram_log_prob = calculate_trigram_log_probability(model, trigram, vocab)
            log_prob += trigram_log_prob
        # Number of predictions is len(tokens) - (N - 1)
        num_predictions = len(tokens) - (N - 1)
        perplexity = np.exp(-log_prob / num_predictions)
        perplexities.append(perplexity)

# Compute average perplexity over all sentences
avg_perplexity = np.mean(perplexities)

# Write results to file
type_str = "train"
with open(f"2022102078_Uly_FFNN_{N}_{type_str}_perplexity.txt", 'w', encoding='utf-8') as f:
    f.write(f"{avg_perplexity}\n")
    # Write each sentence and its perplexity
    # Assuming train_sentences corresponds 1-to-1 with computed perplexities:
    for sentence, perp in zip(train_sentences, perplexities):
        f.write(f"{sentence}\t{perp:.4f}\n")



# Example usage: compute perplexity for each sentence in train_sentences
perplexities = []
for sentence in test_sentences:
    # Sentence may need to be split into sentences if it contains multiple sentences:
    sentence_list = nltk.sent_tokenize(sentence)
    # For simplicity, let's assume each 'sentence' is processed separately
    for sent in sentence_list:
        tokens = tokenizer([sent], N)  # tokenizer expects a list of sentences
        # Build trigrams: sliding window of size N
        ngrams = [tuple(tokens[i:i+N]) for i in range(len(tokens) - N + 1)]
        log_prob = 0.0
        for trigram in ngrams:
            trigram_log_prob = calculate_trigram_log_probability(model, trigram, vocab)
            log_prob += trigram_log_prob
        # Number of predictions is len(tokens) - (N - 1)
        num_predictions = len(tokens) - (N - 1)
        perplexity = np.exp(-log_prob / num_predictions)
        perplexities.append(perplexity)

# Compute average perplexity over all sentences
avg_perplexity = np.mean(perplexities)

# Write results to file
type_str = "test"
with open(f"2022102078_Uly_FFNN_{N}_{type_str}_perplexity.txt", 'w', encoding='utf-8') as f:
    f.write(f"{avg_perplexity}\n")
    # Write each sentence and its perplexity
    # Assuming train_sentences corresponds 1-to-1 with computed perplexities:
    for sentence, perp in zip(test_sentences, perplexities):
        f.write(f"{sentence}\t{perp:.4f}\n")

In [7]:
class FivegramDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        context, target = self.data[idx]
        return torch.tensor(context, dtype=torch.long), torch.tensor(target, dtype=torch.long)

class FFNNLanguageModelFivegram(nn.Module):
    def __init__(self, vocab_size, embed_size=100, hidden_size = 50):
        super(FFNNLanguageModelFivegram, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.fc1 = nn.Linear(embed_size * 4, hidden_size)
        self.relu = nn.ReLU()
        # self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_size, vocab_size)
        # self.fc3 = nn.Linear(hidden2, vocab_size)

    def forward(self, x):
        x = self.embedding(x)
        x = x.view(x.size(0), -1)  # Flatten embedding
        x = self.relu(self.fc1(x))
        # x = self.dropout(x)
        x = self.fc2(x)
        # x = self.relu(x)
        # x = self.dropout(x)
        # x = self.fc3(x)
        return x

batch_size = 32
epochs = 3
learning_rate = 0.001

train_dataset = FivegramDataset(encoded_dataset_5)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

model = FFNNLanguageModelFivegram(vocab_size).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    total_loss = 0
    for context, target in train_loader:
        context, target = context.to(device), target.to(device)  # Move to GPU
        optimizer.zero_grad()
        output = model(context)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}")

Epoch 1, Loss: 5.8358
Epoch 2, Loss: 5.4054
Epoch 3, Loss: 5.2829


In [8]:
torch.save(model, "Uly_FFNN_5.pt")

In [9]:
model.eval()
N = 5

def calculate_5gram_log_probability(model, fgram, vocab):
    """
    Compute the log probability for a single trigram.
    trigram: a tuple of three tokens, e.g. ("the", "cat", "sat")
    """
    # Since trigram is exactly 3 tokens, we can directly unpack:
    w1, w2, w3, w4, w5 = fgram

    # Replace OOV words with <UNK>
    w1 = w1 if w1 in vocab else "<UNK>"
    w2 = w2 if w2 in vocab else "<UNK>"
    w3 = w3 if w3 in vocab else "<UNK>"
    w4 = w4 if w4 in vocab else "<UNK>"
    w5 = w5 if w5 in vocab else "<UNK>"

    # Create context tensor from the first two words
    context_tensor = torch.tensor([[vocab[w1], vocab[w2], vocab[w3], vocab[w4]]], dtype=torch.long).to(device)

    # Forward pass: add batch dimension
    output = model(context_tensor.unsqueeze(0))  # shape: (1, vocab_size)

    # Convert logits to log probabilities
    log_probs = F.log_softmax(output, dim=1)

    # Get log probability for the actual next word
    target_idx = vocab[w5]
    word_log_prob = log_probs[0, target_idx].item()

    return word_log_prob

# Example usage: compute perplexity for each sentence in train_sentences
perplexities = []
for sentence in train_sentences:
    # Sentence may need to be split into sentences if it contains multiple sentences:
    sentence_list = nltk.sent_tokenize(sentence)
    # For simplicity, let's assume each 'sentence' is processed separately
    for sent in sentence_list:
        tokens = tokenizer([sent], N)  # tokenizer expects a list of sentences
        # Build trigrams: sliding window of size N
        ngrams = [tuple(tokens[i:i+N]) for i in range(len(tokens) - N + 1)]
        log_prob = 0.0
        for fgram in ngrams:
            fgram_log_prob = calculate_5gram_log_probability(model, fgram, vocab)
            log_prob += fgram_log_prob
        # Number of predictions is len(tokens) - (N - 1)
        num_predictions = len(tokens) - (N - 1)
        perplexity = np.exp(-log_prob / num_predictions)
        perplexities.append(perplexity)

# Compute average perplexity over all sentences
avg_perplexity = np.mean(perplexities)


# Write results to file
type_str = "train"
with open(f"2022102078_Uly_FFNN_{N}_{type_str}_perplexity.txt", 'w', encoding='utf-8') as f:
    f.write(f"{avg_perplexity}\n")
    # Write each sentence and its perplexity
    # Assuming train_sentences corresponds 1-to-1 with computed perplexities:
    for sentence, perp in zip(train_sentences, perplexities):
        f.write(f"{sentence}\t{perp:.4f}\n")


# Example usage: compute perplexity for each sentence in train_sentences
perplexities = []
for sentence in test_sentences:
    # Sentence may need to be split into sentences if it contains multiple sentences:
    sentence_list = nltk.sent_tokenize(sentence)
    # For simplicity, let's assume each 'sentence' is processed separately
    for sent in sentence_list:
        tokens = tokenizer([sent], N)  # tokenizer expects a list of sentences
        # Build trigrams: sliding window of size N
        ngrams = [tuple(tokens[i:i+N]) for i in range(len(tokens) - N + 1)]
        log_prob = 0.0
        for fgram in ngrams:
            fgram_log_prob = calculate_5gram_log_probability(model, fgram, vocab)
            log_prob += fgram_log_prob
        # Number of predictions is len(tokens) - (N - 1)
        num_predictions = len(tokens) - (N - 1)
        perplexity = np.exp(-log_prob / num_predictions)
        perplexities.append(perplexity)

# Compute average perplexity over all sentences
avg_perplexity = np.mean(perplexities)


# Write results to file
type_str = "test"
with open(f"2022102078_Uly_FFNN_{N}_{type_str}_perplexity.txt", 'w', encoding='utf-8') as f:
    f.write(f"{avg_perplexity}\n")
    # Write each sentence and its perplexity
    # Assuming train_sentences corresponds 1-to-1 with computed perplexities:
    for sentence, perp in zip(test_sentences, perplexities):
        f.write(f"{sentence}\t{perp:.4f}\n")


In [4]:
# Define a Dataset for our token sequences
class TextDataset(Dataset):
    def __init__(self, sequences):
        """
        sequences: list of (input_seq, target_seq) pairs,
                   where each sequence is a list of token indices.
        """
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        input_seq, target_seq = self.sequences[idx]
        return torch.tensor(input_seq, dtype=torch.long), torch.tensor(target_seq, dtype=torch.long)

# Define the vanilla RNN language model
class RNNLanguageModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1):
        super(RNNLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Use vanilla RNN here
        self.rnn = nn.RNN(embed_dim, hidden_dim, num_layers, batch_first=True)
        # Fully connected layer to map RNN outputs to vocabulary logits
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden=None):
        # x shape: (batch_size, seq_length)
        embed = self.embedding(x)            # (batch_size, seq_length, embed_dim)
        output, hidden = self.rnn(embed, hidden)  # (batch_size, seq_length, hidden_dim)
        logits = self.fc(output)             # (batch_size, seq_length, vocab_size)
        return logits, hidden

In [14]:
vocab_size    = len(vocab)
embed_dim     = 200   # Embedding dimensionality
hidden_dim    = 50    # Hidden state dimensionality
num_layers    = 1
batch_size    = 128
num_epochs    = 10
learning_rate = 0.001
accumulation_steps = 4  # Update every 4 batches

def create_sequences(tokens, seq_length):
    sequences = []
    for i in range(0, len(tokens) - seq_length):
        input_seq = tokens[i:i+seq_length]
        target_seq = tokens[i+1:i+seq_length+1]  # shifted by one
        sequences.append((input_seq, target_seq))
    return sequences

# Example usage:
seq_length = 50  # or any other fixed length suitable for your model/memory
sequences = create_sequences(tokens, seq_length)

def encode_sequence(sequences, vocab, random_unk_words):
    encoded_sequences = []
    for input_seq, target_seq in sequences:
        # Encode input sequence: replace rare words with <UNK> and map to indices
        encoded_input = [
            vocab[token] if token not in random_unk_words else vocab["<UNK>"]
            for token in input_seq
        ]
        # Encode target sequence similarly
        encoded_target = [
            vocab[token] if token not in random_unk_words else vocab["<UNK>"]
            for token in target_seq
        ]
        encoded_sequences.append((encoded_input, encoded_target))
    return encoded_sequences

encoded_sequences = encode_sequence(sequences, vocab, random_unk_words)
# Assume `encoded_sequences` is produced by your updated encode_dataset function.
# Each element in encoded_sequences is a tuple: (input_seq, target_seq)
dataset = TextDataset(encoded_sequences)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model, loss function, and optimizer
model = RNNLanguageModel(vocab_size, embed_dim, hidden_dim, num_layers).to(device)

# Use SparseAdam if using nn.Embedding
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Mixed precision scaler
scaler = torch.amp.GradScaler('cuda')

# Training loop using teacher forcing
model.train()
for epoch in range(num_epochs):
    total_loss = 0.0
    for i, (batch_inputs, batch_targets) in enumerate(dataloader):
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)

        optimizer.zero_grad()

        # Mixed Precision Forward Pass
        with torch.amp.autocast('cuda'):
            logits, _ = model(batch_inputs)  # logits: (batch_size, seq_length, vocab_size)
            logits = logits.view(-1, vocab_size)       # Shape: (batch_size * seq_length, vocab_size)
            batch_targets = batch_targets.view(-1)     # Shape: (batch_size * seq_length)
            loss = F.cross_entropy(logits, batch_targets) / accumulation_steps

        # Backward pass and optimizer step
        scaler.scale(loss).backward()

        # Gradient Accumulation Step
        if (i + 1) % accumulation_steps == 0:
            # Gradient Clipping for stability
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()

        total_loss += loss.detach().item() * accumulation_steps  # Scale back the accumulated loss

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

Epoch [1/10], Loss: 5.9905
Epoch [2/10], Loss: 5.0764
Epoch [3/10], Loss: 4.9134
Epoch [4/10], Loss: 4.8279
Epoch [5/10], Loss: 4.7681
Epoch [6/10], Loss: 4.7133
Epoch [7/10], Loss: 4.6797
Epoch [8/10], Loss: 4.6514
Epoch [9/10], Loss: 4.6255
Epoch [10/10], Loss: 4.6095


In [15]:
torch.save(model, "Uly_RNN.pt")

In [16]:
import torch
import torch.nn.functional as F

def calculate_sentence_probability(model, sentence, vocab, random_unk_words, device='cpu'):
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        # Tokenize and encode the sentence
        tokens = tokenizer(sentence, N)  # Split sentence into tokens
        # print(tokens)
        if len(tokens) < 2:
            return 0.0  # Probability of a single word or empty sentence is undefined or zero

        # Convert tokens to indices
        encoded_sentence = [
            vocab[token] if token in vocab else vocab["<UNK>"]
            for token in tokens
        ]
        # print(encoded_sentence)
        # Prepare input and target sequences
        input_seq = torch.tensor(encoded_sentence[:-1], dtype=torch.long).unsqueeze(0).to(device)
        target_seq = torch.tensor(encoded_sentence[1:], dtype=torch.long).to(device)

        # Forward pass through the model
        logits, _ = model(input_seq)  # logits shape: (1, seq_length, vocab_size)

        # Calculate probabilities using softmax
        log_probs = F.log_softmax(logits, dim=-1)  # (1, seq_length, vocab_size)

        # Get the log probabilities of the target words
        target_log_probs = log_probs[0, torch.arange(len(target_seq)), target_seq]
        log_prob_sentence = torch.sum(target_log_probs)
        # print(log_prob_sentence)
        num_predictions = len(encoded_sentence) - 1
        # print(num_predictions)
        # Convert to probability
        # sentence_prob = torch.exp(log_prob_sentence).item()

    return log_prob_sentence, num_predictions


perplexities = []
for sentence in train_sentences:
    # Sentence may need to be split into sentences if it contains multiple sentences:
    sentence_list = nltk.sent_tokenize(sentence)
    # For simplicity, let's assume each 'sentence' is processed separately
    log_prob, num_predictions = calculate_sentence_probability(model, sentence_list, vocab, random_unk_words, device)
    perplexity = np.exp(-log_prob.cpu().item() / num_predictions)
    perplexities.append(perplexity)

# Compute average perplexity over all sentences
avg_perplexity = np.mean(perplexities)

# Write results to file
type_str = "train"
with open(f"2022102078_Uly_RNN_{type_str}_perplexity.txt", 'w', encoding='utf-8') as f:
    f.write(f"{avg_perplexity}\n")
    # Write each sentence and its perplexity
    # Assuming train_sentences corresponds 1-to-1 with computed perplexities:
    for sentence, perp in zip(train_sentences, perplexities):
        f.write(f"{sentence}\t{perp:.4f}\n")

perplexities = []
for sentence in test_sentences:
    # Sentence may need to be split into sentences if it contains multiple sentences:
    sentence_list = nltk.sent_tokenize(sentence)
    # For simplicity, let's assume each 'sentence' is processed separately
    log_prob, num_predictions = calculate_sentence_probability(model, sentence_list, vocab, random_unk_words, device)
    perplexity = np.exp(-log_prob.cpu().item() / num_predictions)
    perplexities.append(perplexity)

# Compute average perplexity over all sentences
avg_perplexity = np.mean(perplexities)

# Write results to file
type_str = "test"
with open(f"2022102078_Uly_RNN_{type_str}_perplexity.txt", 'w', encoding='utf-8') as f:
    f.write(f"{avg_perplexity}\n")
    # Write each sentence and its perplexity
    # Assuming train_sentences corresponds 1-to-1 with computed perplexities:
    for sentence, perp in zip(test_sentences, perplexities):
        f.write(f"{sentence}\t{perp:.4f}\n")

In [6]:

class LSTMLanguageModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1):
        super(LSTMLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Use LSTM here
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)
        # Fully connected layer to map LSTM outputs to vocabulary logits
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden=None):
        # x shape: (batch_size, seq_length)
        embed = self.embedding(x)               # (batch_size, seq_length, embed_dim)
        output, hidden = self.lstm(embed, hidden)  # (batch_size, seq_length, hidden_dim)
        logits = self.fc(output)                # (batch_size, seq_length, vocab_size)
        return logits, hidden

    def init_hidden(self, batch_size):
        # Initialize hidden state and cell state to zeros
        # Hidden and cell state shapes: (num_layers, batch_size, hidden_dim)
        return (torch.zeros(self.lstm.num_layers, batch_size, self.lstm.hidden_size).to(next(self.parameters()).device),
                torch.zeros(self.lstm.num_layers, batch_size, self.lstm.hidden_size).to(next(self.parameters()).device))

In [12]:
vocab_size    = len(vocab)
embed_dim     = 200   # Embedding dimensionality
hidden_dim    = 10   # Hidden state dimensionality
num_layers    = 1
batch_size    = 128
num_epochs    = 10
learning_rate = 0.001


def create_sequences(tokens, seq_length):
    sequences = []
    for i in range(0, len(tokens) - seq_length):
        input_seq = tokens[i:i+seq_length]
        target_seq = tokens[i+1:i+seq_length+1]  # shifted by one
        sequences.append((input_seq, target_seq))
    return sequences

# Example usage:
seq_length = 50  # or any other fixed length suitable for your model/memory
sequences = create_sequences(tokens, seq_length)

def encode_sequence(sequences, vocab, random_unk_words):
    encoded_sequences = []
    for input_seq, target_seq in sequences:
        # Encode input sequence: replace rare words with <UNK> and map to indices
        encoded_input = [
            vocab[token] if token not in random_unk_words else vocab["<UNK>"]
            for token in input_seq
        ]
        # Encode target sequence similarly
        encoded_target = [
            vocab[token] if token not in random_unk_words else vocab["<UNK>"]
            for token in target_seq
        ]
        encoded_sequences.append((encoded_input, encoded_target))
    return encoded_sequences

encoded_sequences = encode_sequence(sequences, vocab, random_unk_words)
# Assume `encoded_sequences` is produced by your updated encode_dataset function.
# Each element in encoded_sequences is a tuple: (input_seq, target_seq)
dataset = TextDataset(encoded_sequences)

model = LSTMLanguageModel(vocab_size, embed_dim, hidden_dim, num_layers)
model.to(device)
model.train()

# DataLoader for batching
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    total_loss = 0

    for batch_idx, (input_seq, target_seq) in enumerate(dataloader):
        input_seq, target_seq = input_seq.to(device), target_seq.to(device)

        # Initialize hidden state
        batch_size_actual = input_seq.size(0)
        hidden = model.init_hidden(batch_size_actual)

        # Detach hidden state to prevent backprop through entire history
        hidden = tuple(h.detach() for h in hidden)

        # Forward pass
        logits, hidden = model(input_seq, hidden)

        # Reshape logits and targets for CrossEntropyLoss
        logits = logits.view(-1, vocab_size)   # (batch_size * seq_length, vocab_size)
        target_seq = target_seq.view(-1)       # (batch_size * seq_length)

        # Compute loss
        loss = criterion(logits, target_seq)
        total_loss += loss.item()

        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()

        # Gradient clipping (optional but recommended)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)

        optimizer.step()

    # Average loss for the epoch
    avg_loss = total_loss / len(dataloader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

Epoch [1/10], Loss: 5.8005
Epoch [2/10], Loss: 5.0039
Epoch [3/10], Loss: 4.7634
Epoch [4/10], Loss: 4.6290
Epoch [5/10], Loss: 4.5430
Epoch [6/10], Loss: 4.4823
Epoch [7/10], Loss: 4.4361
Epoch [8/10], Loss: 4.3996
Epoch [9/10], Loss: 4.3690
Epoch [10/10], Loss: 4.3427


In [9]:
torch.save(model, "Uly_LSTM.pt")

In [14]:
import torch
import torch.nn.functional as F

def calculate_sentence_probability_lstm(model, sentence, vocab, random_unk_words, device='cpu'):
    """
    Calculate the log probability of a given sentence using an LSTM language model.

    Args:
        model (nn.Module): The trained LSTM language model.
        sentence (str): The input sentence whose probability is to be calculated.
        vocab (dict): Mapping from tokens to indices.
        random_unk_words (list): List of words to be replaced with <UNK> for robustness.
        device (str): Device to run the model on ('cpu' or 'cuda').

    Returns:
        log_prob_sentence (float): Log probability of the sentence.
        num_predictions (int): Number of predictions made (for calculating average log prob).
    """
    model.eval()  # Set model to evaluation mode

    with torch.no_grad():
        # Tokenize and encode the sentence
        tokens = tokenizer(sentence, N) # Adjust if using a custom tokenizer
        if len(tokens) < 2:
            return 0.0, 0  # Probability of a single word or empty sentence is undefined or zero

        # Convert tokens to indices, mapping unknown words to "<UNK>"
        encoded_sentence = [
            vocab[token] if token in vocab else vocab["<UNK>"]
            for token in tokens
        ]

        # Prepare input and target sequences
        input_seq = torch.tensor(encoded_sentence[:-1], dtype=torch.long).unsqueeze(0).to(device)
        target_seq = torch.tensor(encoded_sentence[1:], dtype=torch.long).to(device)

        # Initialize hidden and cell states for LSTM
        batch_size = input_seq.size(0)
        hidden = model.init_hidden(batch_size)

        # Forward pass through the model
        logits, _ = model(input_seq, hidden)  # logits shape: (1, seq_length, vocab_size)

        # Calculate log probabilities using softmax
        log_probs = F.log_softmax(logits, dim=-1)  # (1, seq_length, vocab_size)

        # Get the log probabilities of the target words
        target_log_probs = log_probs[0, torch.arange(len(target_seq)), target_seq]
        log_prob_sentence = torch.sum(target_log_probs).item()

        # Count the number of predictions
        num_predictions = len(encoded_sentence) - 1

    return log_prob_sentence, num_predictions



perplexities = []
for sentence in train_sentences:
    # Sentence may need to be split into sentences if it contains multiple sentences:
    sentence_list = nltk.sent_tokenize(sentence)
    # For simplicity, let's assume each 'sentence' is processed separately
    log_prob, num_predictions = calculate_sentence_probability_lstm(model, sentence_list, vocab, random_unk_words, device)
    perplexity = np.exp(-log_prob / num_predictions)
    perplexities.append(perplexity)

# Compute average perplexity over all sentences
avg_perplexity = np.mean(perplexities)
# print(avg_perplexity)

# Write results to file
type_str = "train"
with open(f"2022102078_Uly_LSTM_{type_str}_perplexity.txt", 'w', encoding='utf-8') as f:
    f.write(f"{avg_perplexity}\n")
    # Write each sentence and its perplexity
    # Assuming train_sentences corresponds 1-to-1 with computed perplexities:
    for sentence, perp in zip(train_sentences, perplexities):
        f.write(f"{sentence}\t{perp:.4f}\n")


perplexities = []
for sentence in test_sentences:
    # Sentence may need to be split into sentences if it contains multiple sentences:
    sentence_list = nltk.sent_tokenize(sentence)
    # For simplicity, let's assume each 'sentence' is processed separately
    log_prob, num_predictions = calculate_sentence_probability_lstm(model, sentence_list, vocab, random_unk_words, device)
    perplexity = np.exp(-log_prob / num_predictions)
    perplexities.append(perplexity)

# Compute average perplexity over all sentences
avg_perplexity = np.mean(perplexities)
# print(avg_perplexity)

# Write results to file
type_str = "test"
with open(f"2022102078_Uly_LSTM_{type_str}_perplexity.txt", 'w', encoding='utf-8') as f:
    f.write(f"{avg_perplexity}\n")
    # Write each sentence and its perplexity
    # Assuming train_sentences corresponds 1-to-1 with computed perplexities:
    for sentence, perp in zip(test_sentences, perplexities):
        f.write(f"{sentence}\t{perp:.4f}\n")