In [None]:
import nltk
from nltk.util import ngrams
from collections import defaultdict, Counter
import random
from nltk.tokenize import word_tokenize

In [None]:
!wget https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-2-v1.zip
# !unzip wikitext-2-v1.zip -d wikitext-2
nltk.download('punkt')
def load_wikitext(file_path):
    """Read a UTF-8 text file and return its entire contents as a single string.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file contents as one `str`.
    """
    with open(file_path, encoding='utf-8') as corpus:
        return corpus.read()

# Raw training split as one string; kept under its own name so `words` is only ever a token list
raw_text = load_wikitext('wikitext-2/wikitext-2/wiki.train.tokens')

# Tokenize once: the token list is needed both for the word count and for the model data,
# and running word_tokenize over the whole corpus twice is wasted work
tokens = word_tokenize(raw_text)

num_characters = len(raw_text)
num_words = len(tokens)
num_lines = raw_text.count('\n')
print(f"Number of Characters: {num_characters}")
print(f"Number of Words: {num_words}")
print(f"Number of Lines: {num_lines}")

# Keep only purely alphabetic tokens, lower-cased (one pass instead of two identical filters)
words = [token.lower() for token in tokens if token.isalpha()]

# Split the data into training and test sets: first 60% of the tokens train, the rest test
split_index = int(0.6 * len(words))
training_words = words[:split_index]
test_words = words[split_index:]

In [None]:
# N-gram tuples of order 2, 3 and 4 over the training tokens
bigrams_train, trigrams_train, fourgrams_train = (
    list(ngrams(training_words, order)) for order in (2, 3, 4)
)

# Occurrence count of every distinct training n-gram
bigram_freq_train, trigram_freq_train, fourgram_freq_train = map(
    Counter, (bigrams_train, trigrams_train, fourgrams_train)
)

# The same n-gram orders over the held-out tokens
bigrams_test, trigrams_test, fourgrams_test = (
    list(ngrams(test_words, order)) for order in (2, 3, 4)
)

In [None]:
from collections import defaultdict

def build_ngram_model(ngram_freq):
    """Count, for every (N-1)-word prefix, how often each word follows it.

    Args:
        ngram_freq: Iterable of n-gram tuples. Every element visited adds one to
            the count of its (prefix, last word) pair, so repeated n-grams in a
            list accumulate.

    Returns:
        A `defaultdict(Counter)` mapping prefix tuple -> Counter of next words.
    """
    successors = defaultdict(Counter)
    for ngram in ngram_freq:
        successors[ngram[:-1]][ngram[-1]] += 1
    return successors

# One next-word count table per n-gram order
bigram_model, trigram_model, fourgram_model = map(
    build_ngram_model, (bigrams_train, trigrams_train, fourgrams_train)
)

def predict_next_word(model, context):
    """Return the most frequent continuation of `context`, or None.

    Args:
        model: Mapping from prefix tuple to a Counter of next words.
        context: Tuple of N-1 words.

    Returns:
        The top-ranked next word, or None when the context is absent from the
        model or has no recorded continuations.
    """
    if context not in model:
        return None
    ranked = model[context].most_common(1)
    if not ranked:
        return None
    best_word, _count = ranked[0]
    return best_word

In [None]:
def calculate_accuracy(model, test_ngrams):
    """Fraction of test n-grams whose last word equals the model's prediction.

    Args:
        model: Next-word model passed through to `predict_next_word`.
        test_ngrams: Iterable of n-gram tuples; the last element is the target
            and the preceding elements are the context.

    Returns:
        correct / total as a float, or 0 when `test_ngrams` is empty.
    """
    outcomes = [
        predict_next_word(model, ngram[:-1]) == ngram[-1]
        for ngram in test_ngrams
    ]
    if not outcomes:
        return 0
    return sum(outcomes) / len(outcomes)

In [None]:
import math

def calculate_perplexity(model, test_ngrams):
    """Perplexity of a next-word model over a collection of test n-grams.

    For each n-gram the probability of its last word given the preceding words
    is the stored weight of that word divided by the sum of all weights stored
    for the context. Dividing by the sum matters for models that hold raw
    counts (the log of a count is >= 0 and would yield perplexities below 1);
    for models that already hold probabilities the sum is ~1 and the division
    is a no-op.

    A target missing from its context falls back to that context's '<UNK>'
    weight. Unseen contexts and zero probabilities are floored at 1e-10.

    The model is only read: contexts are looked up with `.get`, so a
    `defaultdict` model does not gain an empty entry per unseen test context.

    Args:
        model: Mapping from prefix tuple to a Counter/dict of next-word weights.
        test_ngrams: Iterable of n-gram tuples (context words + target word).

    Returns:
        exp(-mean log-probability) as a float; `float('inf')` when
        `test_ngrams` is empty (perplexity is undefined without observations).
    """
    floor_probability = 1e-10
    context_totals = {}  # context -> sum of its weights, computed once per context
    log_prob_sum = 0.0
    word_count = 0

    for ngram in test_ngrams:
        context, actual_next_word = ngram[:-1], ngram[-1]
        distribution = model.get(context)

        probability = 0.0
        if distribution:
            if context not in context_totals:
                context_totals[context] = sum(distribution.values())
            total = context_totals[context]
            if total > 0:
                weight = distribution.get(actual_next_word, distribution.get('<UNK>', 0))
                probability = weight / total

        # log(0) is undefined, so zero-probability events use a small floor instead
        log_prob_sum += math.log(probability if probability > 0 else floor_probability)
        word_count += 1

    if word_count == 0:
        return float('inf')
    return math.exp(-log_prob_sum / word_count)

In [None]:
# (model, held-out n-grams) pairs in bigram, trigram, fourgram order
count_model_runs = (
    (bigram_model, bigrams_test),
    (trigram_model, trigrams_test),
    (fourgram_model, fourgrams_test),
)

# Perplexities are computed first, then accuracies, each in the order above
perplexity_2, perplexity_3, perplexity_4 = (
    calculate_perplexity(ngram_model, held_out) for ngram_model, held_out in count_model_runs
)
# Calculate accuracies
accuracy_bigram, accuracy_trigram, accuracy_fourgram = (
    calculate_accuracy(ngram_model, held_out) for ngram_model, held_out in count_model_runs
)

print(f"2-gram Model Accuracy: {accuracy_bigram * 100:.2f}%")
print(f"3-gram Model Accuracy: {accuracy_trigram * 100:.2f}%")
print(f"4-gram Model Accuracy: {accuracy_fourgram * 100:.2f}%")
print("perplexity of bigrams:", perplexity_2)
print("perplexity of trigrams:", perplexity_3)
print("perplexity of fourgrams:", perplexity_4)

In [None]:
def replace_rare_words(words, threshold=1):
    """Replace every word occurring at most `threshold` times with '<UNK>'.

    Args:
        words: Sequence of tokens.
        threshold: Words whose count is not strictly greater than this value
            are replaced.

    Returns:
        A new list of the same length with rare words swapped for '<UNK>'.
    """
    occurrences = Counter(words)
    replaced = []
    for token in words:
        replaced.append(token if occurrences[token] > threshold else '<UNK>')
    return replaced

# Replace rare words in the training data
# NOTE(review): bigrams_train / trigrams_train / fourgrams_train were created from
# `training_words` in an earlier cell, i.e. before this replacement, so they do not
# contain '<UNK>' unless they are rebuilt after this line.
threshold = 1  # This can be adjusted based on your dataset
# With threshold = 1, words seen exactly once in the training tokens become '<UNK>'
training_words = replace_rare_words(training_words, threshold)


def build_ngram_model_smoothing(ngram_freq, vocabulary_size, smoothing=0.01):
    """Build a next-word probability model with additive smoothing.

    For every prefix, each observed continuation gets `count + smoothing`, and
    an '<UNK>' entry holds the mass for unobserved continuations. If '<UNK>'
    was itself observed after the prefix, its smoothed count is kept (it is no
    longer reset to `smoothing`). Each prefix's weights are then divided by
    their sum so they add up to 1.

    Args:
        ngram_freq: Iterable of n-gram tuples; each element visited counts once.
        vocabulary_size: Accepted for interface compatibility; not used in the
            computation.
        smoothing: Constant added to every observed count and assigned to an
            absent '<UNK>' entry.

    Returns:
        A `defaultdict(Counter)` mapping prefix tuple -> Counter of next-word
        probabilities (including '<UNK>').
    """
    model = defaultdict(Counter)
    for ngram in ngram_freq:
        prefix, next_word = ngram[:-1], ngram[-1]
        model[prefix][next_word] += 1

    for successors in model.values():
        # Additive smoothing on the continuations that were actually observed
        for word in successors:
            successors[word] += smoothing
        # Reserve mass for unseen continuations without clobbering a real '<UNK>' count
        if '<UNK>' not in successors:
            successors['<UNK>'] = smoothing

        # Turn the smoothed counts into probabilities
        total_count = sum(successors.values())
        for word in successors:
            successors[word] /= total_count

    return model


# Vocabulary of the training tokens as they are now (i.e. after rare-word replacement)
vocabulary = set(training_words)
vocabulary_size = len(vocabulary)

# Build the smoothed models from n-grams of the *current* `training_words`.
# The earlier bigrams_train / trigrams_train / fourgrams_train lists were created before
# rare words were replaced, so models built from them never contain '<UNK>' and cannot
# match the '<UNK>'-mapped contexts produced at prediction time.
bigram_model_s = build_ngram_model_smoothing(list(ngrams(training_words, 2)), vocabulary_size)
trigram_model_s = build_ngram_model_smoothing(list(ngrams(training_words, 3)), vocabulary_size)
fourgram_model_s = build_ngram_model_smoothing(list(ngrams(training_words, 4)), vocabulary_size)


def predict_next_word(model, context):
    """Return the highest-weighted continuation of `context`, or '<UNK>'.

    Args:
        model: Mapping from prefix tuple to a Counter of next-word weights.
        context: Tuple of N-1 words.

    Returns:
        The top-ranked next word; '<UNK>' when the context is not in the model
        or has no continuations.
    """
    ranked = model[context].most_common(1) if context in model else []
    if not ranked:
        return '<UNK>'
    best_word, _weight = ranked[0]
    return best_word


def predict_with_unk_handling(model, context):
    """Predict the next word after mapping out-of-vocabulary context words to '<UNK>'.

    Args:
        model: Next-word model passed through to `predict_next_word`.
        context: Iterable of context words; words not in the module-level
            `vocabulary` set are replaced by '<UNK>' before the lookup.

    Returns:
        Whatever `predict_next_word` returns for the mapped context tuple.
    """
    mapped = []
    for word in context:
        mapped.append(word if word in vocabulary else '<UNK>')
    return predict_next_word(model, tuple(mapped))

def calculate_accuracy(model, test_ngrams):
    """Next-word accuracy with out-of-vocabulary context words mapped to '<UNK>'.

    Args:
        model: Next-word model passed through to `predict_with_unk_handling`.
        test_ngrams: Iterable of n-gram tuples; the last element is the target.

    Returns:
        Fraction of n-grams whose target equals the prediction, or 0 when
        `test_ngrams` is empty.
    """
    hits = 0
    seen = 0
    for ngram in test_ngrams:
        seen += 1
        if predict_with_unk_handling(model, ngram[:-1]) == ngram[-1]:
            hits += 1
    if seen == 0:
        return 0
    return hits / seen

# (smoothed model, held-out n-grams) pairs in bigram, trigram, fourgram order
smoothed_model_runs = (
    (bigram_model_s, bigrams_test),
    (trigram_model_s, trigrams_test),
    (fourgram_model_s, fourgrams_test),
)

accuracy_bigram_s, accuracy_trigram_s, accuracy_fourgram_s = (
    calculate_accuracy(ngram_model, held_out) for ngram_model, held_out in smoothed_model_runs
)

print(f"2-gram Model Accuracy with smoothing: {accuracy_bigram_s * 100:.2f}%")
print(f"3-gram Model Accuracy with smoothing: {accuracy_trigram_s * 100:.2f}%")
print(f"4-gram Model Accuracy with smoothing: {accuracy_fourgram_s * 100:.2f}%")

perplexity_2s, perplexity_3s, perplexity_4s = (
    calculate_perplexity(ngram_model, held_out) for ngram_model, held_out in smoothed_model_runs
)

print("perplexity of bigrams with smoothing:", perplexity_2s)
print("perplexity of trigrams with smoothing:", perplexity_3s)
print("perplexity of fourgrams with smoothing:", perplexity_4s)

In [None]:
def mask_word(word, probability=0.05):
    """Return '<MASK>' with the given probability, otherwise the word unchanged.

    One `random.random()` draw is made per call.
    """
    if random.random() < probability:
        return '<MASK>'
    return word
    
# Lower-cased alphabetic tokens, each independently masked with mask_word's default probability
noisy_words_mask = [mask_word(token.lower()) for token in words if token.isalpha()]

# Training portion: first 60% of the noisy tokens
split_index = int(0.6 * len(noisy_words_mask))
training_words_mask = noisy_words_mask[:split_index]

# N-grams of order 2-4 over the masked training tokens
bigrams_train_mask, trigrams_train_mask, fourgrams_train_mask = (
    list(ngrams(training_words_mask, order)) for order in (2, 3, 4)
)

# Count-based next-word models trained on the masked data
bigram_model_mask, trigram_model_mask, fourgram_model_mask = map(
    build_ngram_model, (bigrams_train_mask, trigrams_train_mask, fourgrams_train_mask)
)

In [None]:
def introduce_spelling_mistake(word, probability=0.05):
    """Swap two adjacent characters of `word` with the given probability.

    Only words longer than five characters are altered. A `random.random()`
    draw is made on every call; the swap position is drawn with
    `random.randint` only when the word is actually going to be changed.

    Returns:
        The word with characters idx and idx+1 exchanged, or the word unchanged.
    """
    triggered = random.random() < probability
    if not (triggered and len(word) > 5):
        return word
    idx = random.randint(0, len(word) - 2)
    chars = list(word)
    chars[idx], chars[idx + 1] = chars[idx + 1], chars[idx]
    return ''.join(chars)

# `words` is already a list of tokens by the time this cell runs (it was tokenized and
# filtered earlier), and lists have no .split(); only split when it is still a raw string.
if isinstance(words, str):
    words = words.split()

# Lower-cased alphabetic tokens, each given a chance of an adjacent-character swap
noisy_words_spelling = [introduce_spelling_mistake(word.lower()) for word in words if word.isalpha()]

# Training portion: first 60% of the noisy tokens
split_index = int(0.6 * len(noisy_words_spelling))
training_words_spelling = noisy_words_spelling[:split_index]

bigrams_train_spelling = list(ngrams(training_words_spelling, 2))
trigrams_train_spelling = list(ngrams(training_words_spelling, 3))
fourgrams_train_spelling = list(ngrams(training_words_spelling, 4))

# Count-based next-word models trained on the misspelled data
bigram_model_spelling = build_ngram_model(bigrams_train_spelling)
trigram_model_spelling = build_ngram_model(trigrams_train_spelling)
fourgram_model_spelling = build_ngram_model(fourgrams_train_spelling)

In [None]:
# Accuracy of the models trained on masked text, measured on the unmodified held-out n-grams
accuracy_bigram_mask, accuracy_trigram_mask, accuracy_fourgram_mask = (
    calculate_accuracy(ngram_model, held_out)
    for ngram_model, held_out in (
        (bigram_model_mask, bigrams_test),
        (trigram_model_mask, trigrams_test),
        (fourgram_model_mask, fourgrams_test),
    )
)

print(f"2-gram masked Model Accuracy: {accuracy_bigram_mask * 100:.2f}%")
print(f"3-gram masked Model Accuracy: {accuracy_trigram_mask * 100:.2f}%")
print(f"4-gram masked Model Accuracy: {accuracy_fourgram_mask * 100:.2f}%")

In [None]:
# Accuracy of the models trained on misspelled text, measured on the unmodified held-out n-grams
accuracy_bigram_spelling, accuracy_trigram_spelling, accuracy_fourgram_spelling = (
    calculate_accuracy(ngram_model, held_out)
    for ngram_model, held_out in (
        (bigram_model_spelling, bigrams_test),
        (trigram_model_spelling, trigrams_test),
        (fourgram_model_spelling, fourgrams_test),
    )
)

print(f"2-gram Model Accuracy include spelling mistakes: {accuracy_bigram_spelling * 100:.2f}%")
print(f"3-gram Model Accuracy include spelling mistakes: {accuracy_trigram_spelling * 100:.2f}%")
print(f"4-gram Model Accuracy include spelling mistakes: {accuracy_fourgram_spelling * 100:.2f}%")

In [None]:
# Perplexity of the masked-data models, then of the misspelled-data models,
# each on the unmodified held-out n-grams (bigram, trigram, fourgram order)
perplexity_2_mask, perplexity_3_mask, perplexity_4_mask = (
    calculate_perplexity(ngram_model, held_out)
    for ngram_model, held_out in (
        (bigram_model_mask, bigrams_test),
        (trigram_model_mask, trigrams_test),
        (fourgram_model_mask, fourgrams_test),
    )
)
perplexity_2_spelling, perplexity_3_spelling, perplexity_4_spelling = (
    calculate_perplexity(ngram_model, held_out)
    for ngram_model, held_out in (
        (bigram_model_spelling, bigrams_test),
        (trigram_model_spelling, trigrams_test),
        (fourgram_model_spelling, fourgrams_test),
    )
)
print("perplexity of bigrams with masks:", perplexity_2_mask)
print("perplexity of trigrams with masks:", perplexity_3_mask)
print("perplexity of fourgrams with masks:", perplexity_4_mask)
print("perplexity of bigrams with spelling mistakes included:", perplexity_2_spelling)
print("perplexity of trigrams with spelling mistakes included:", perplexity_3_spelling)
print("perplexity of fourgrams with spelling mistakes included:", perplexity_4_spelling)

In [None]:
import matplotlib.pyplot as plt

# Accuracies as percentages, in groups of three (bigram, trigram, fourgram):
# plain, smoothed, masked, misspelled
accuracy_values = [
    round(fraction * 100, 2)
    for fraction in (
        accuracy_bigram, accuracy_trigram, accuracy_fourgram,
        accuracy_bigram_s, accuracy_trigram_s, accuracy_fourgram_s,
        accuracy_bigram_mask, accuracy_trigram_mask, accuracy_fourgram_mask,
        accuracy_bigram_spelling, accuracy_trigram_spelling, accuracy_fourgram_spelling,
    )
]
labels = ['Bigram', 'Trigram', 'Fourgram'] * 4

labels_without_smoothing, accuracy_without_smoothing = labels[:3], accuracy_values[:3]
labels_with_smoothing, accuracy_with_smoothing = labels[3:6], accuracy_values[3:6]
labels_with_mask, accuracy_with_mask = labels[6:9], accuracy_values[6:9]
labels_with_spelling, accuracy_with_spelling = labels[9:], accuracy_values[9:]

fig, axes = plt.subplots(nrows=1, ncols=4, figsize=(16, 5))

# One panel per experiment; every panel uses the same y-range so bars are comparable
y_upper = max(accuracy_values) + 5
accuracy_panels = (
    ('Accuracy without Smoothing', labels_without_smoothing, accuracy_without_smoothing),
    ('Accuracy with Smoothing', labels_with_smoothing, accuracy_with_smoothing),
    ('Accuracy with mask words', labels_with_mask, accuracy_with_mask),
    ('Accuracy with spelling mistakes included', labels_with_spelling, accuracy_with_spelling),
)
for ax, (panel_title, panel_labels, panel_values) in zip(axes, accuracy_panels):
    ax.bar(panel_labels, panel_values, color=['blue', 'orange', 'green'])
    ax.set_title(panel_title)
    ax.set_ylabel('Percentage (%)')
    ax.set_ylim(0, y_upper)

# Write each bar's value just above it
for ax in axes:
    for bar in ax.patches:
        bar_top = bar.get_height()
        ax.annotate(f"{bar_top:.2f}%",
                    (bar.get_x() + bar.get_width() / 2, bar_top),
                    ha='center', va='bottom')

plt.tight_layout()
plt.show()

In [None]:
perplexities_without_smoothing = [perplexity_2, perplexity_3, perplexity_4]
perplexities_with_masks = [perplexity_2_mask, perplexity_3_mask, perplexity_4_mask]
perplexities_with_spelling = [perplexity_2_spelling, perplexity_3_spelling, perplexity_4_spelling]
perplexities_with_smoothing = [perplexity_2s, perplexity_3s, perplexity_4s]
labels = ['Bigram', 'Trigram', 'Fourgram']
fig, axes = plt.subplots(nrows=1, ncols=4, figsize=(16, 5), sharey=True)

# Panels left to right; only the first one carries the (shared) y-axis label
perplexity_panels = (
    ('Perplexity with Smoothing (Log Scale)', perplexities_with_smoothing),
    ('Perplexity without Smoothing (Log Scale)', perplexities_without_smoothing),
    ('Perplexity with masks (Log Scale)', perplexities_with_masks),
    ('Perplexity with spelling mistakes included (Log Scale)', perplexities_with_spelling),
)
for position, (ax, (panel_title, panel_values)) in enumerate(zip(axes, perplexity_panels)):
    ax.bar(labels, panel_values, color=['blue', 'orange', 'green'])
    ax.set_title(panel_title)
    ax.set_yscale('log')
    if position == 0:
        ax.set_ylabel('Perplexity (Log Scale)')
    ax.set_xlabel('N-Gram Model')

# Label every bar with its value; bars shorter than min_visible get the label
# anchored at min_visible instead of at their own top
min_visible = 10**4
for ax in axes:
    for bar in ax.patches:
        height = bar.get_height()
        anchor_y = min_visible if height < min_visible else height
        ax.annotate(f'{height:.2e}',
                    xy=(bar.get_x() + bar.get_width() / 2, anchor_y),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom', fontsize=8)

plt.tight_layout()
plt.show()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from collections import Counter
from sklearn.model_selection import train_test_split
import torch.nn.functional as F

In [None]:
# Tokenizer data for nltk's sentence/word tokenizers
nltk.download('punkt')


# Load data from wiki-text
def load_wikitext(file_path):
    """Return the whole contents of a UTF-8 text file as one string."""
    with open(file_path, mode='r', encoding='utf-8') as corpus:
        contents = corpus.read()
    return contents


# Raw text of the training split
text = load_wikitext('wikitext-2/wikitext-2/wiki.train.tokens')

In [None]:
sentences = sent_tokenize(text)

# Lower-cased, purely alphabetic tokens of the whole text
words = [token.lower() for token in word_tokenize(text) if token.isalpha()]
vocab = Counter(words)
vocab_size = len(vocab) + 1  # one extra id for '<pad>'

# Word ids follow the vocabulary's insertion order; '<pad>' takes the last id
pad_token = '<pad>'
word_to_idx = {word: index for index, word in enumerate(vocab)}
word_to_idx[pad_token] = vocab_size - 1

# Reverse lookup: id -> word
idx_to_word = dict(zip(word_to_idx.values(), word_to_idx.keys()))

def words_to_indices(sequence):
    """Map each word to its id in `word_to_idx`; unknown words get the '<pad>' id."""
    indices = []
    for word in sequence:
        indices.append(word_to_idx.get(word, word_to_idx[pad_token]))
    return indices


def process_sequences(sentences, seq_length):
    """Turn sentences into fixed-length sliding windows of word ids.

    Each sentence is tokenized, reduced to lower-cased alphabetic tokens and,
    if shorter than `seq_length`, right-padded with the pad token. Every
    contiguous window of `seq_length` tokens (stride 1) becomes one sequence.

    Args:
        sentences: Iterable of sentence strings.
        seq_length: Number of tokens per window.

    Returns:
        List of id lists, each of length `seq_length`.
    """
    windows = []
    for sentence in sentences:
        tokens = [tok.lower() for tok in word_tokenize(sentence) if tok.isalpha()]
        shortfall = seq_length - len(tokens)
        if shortfall > 0:
            tokens += [pad_token] * shortfall
        for start in range(len(tokens) - seq_length + 1):
            windows.append(tokens[start:start + seq_length])
    return [words_to_indices(window) for window in windows]

# Set sequence length
seq_length_train = 10  # Sequence size for training
seq_length_val_test = 5  # Sequence size for validation and testing

# Partition the SENTENCES before windowing (70% train / 15% validation / 15% test).
# Previously the validation and test windows were generated from the very same sentences
# as the training windows, so held-out metrics were measured on text the model had seen,
# and the X_test / y_test produced by the first split were silently overwritten.
train_end = int(0.7 * len(sentences))
val_end = int(0.85 * len(sentences))

# Generate training sequences
train_sequences = process_sequences(sentences[:train_end], seq_length_train)

# Generate validation and testing sequences from disjoint sentence ranges
val_sequences = process_sequences(sentences[train_end:val_end], seq_length_val_test)
test_sequences = process_sequences(sentences[val_end:], seq_length_val_test)

# Convert to tensors: every id but the last is the input, the last id is the target
X_train_seq = torch.tensor([seq[:-1] for seq in train_sequences], dtype=torch.long)
y_train_seq = torch.tensor([seq[-1] for seq in train_sequences], dtype=torch.long)

X_val_seq = torch.tensor([seq[:-1] for seq in val_sequences], dtype=torch.long)
y_val_seq = torch.tensor([seq[-1] for seq in val_sequences], dtype=torch.long)

X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

X_train, y_train = X_train_seq, y_train_seq
X_val, y_val = X_val_seq, y_val_seq
X_test, y_test = X_test_seq, y_test_seq

# Data Loader (only the training batches are shuffled)
batch_size = 64
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

In [None]:
# Sentence length in whitespace-separated tokens. len(sentence) counted characters,
# which contradicted the "(words)" axis label below.
sentence_lengths = [len(sentence.split()) for sentence in sentences]
# Bin edges run to max + 1 so every integer length gets its own bin; the final histogram
# bin is closed on both sides, so stopping at max would merge the two largest lengths.
plt.hist(sentence_lengths, bins=range(min(sentence_lengths), max(sentence_lengths) + 2, 1))
plt.title("Distribution of Sentence Lengths")
plt.xlabel("Length of Sentences (words)")
plt.ylabel("Frequency")
plt.show()

In [None]:
class LSTMModel(nn.Module):
    """Next-token classifier: embedding -> LSTM -> layer norm -> linear over the vocabulary.

    Args:
        vocab_size: Number of token ids (embedding rows and output logits).
        embedding_dim: Size of each token embedding.
        hidden_dim: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout value passed to `nn.LSTM`.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, dropout):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)  # Embedding
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=dropout)  # LSTM
        self.layer_norm = nn.LayerNorm(hidden_dim)  # Layer Normalization
        # fc1 is not used in forward(). It is kept so that parameter creation order and
        # state_dict keys stay the same as before.
        self.fc1 = nn.Linear(hidden_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, vocab_size)  # Fully Connected Layer

    def forward(self, x):
        """Return unnormalized logits of shape (batch, vocab_size) for a (batch, seq) id tensor.

        The output is computed from the LSTM output at the last time step. The
        former `F.softmax(self.fc1(...))` line was removed: its result was
        overwritten on the next line, so it only cost compute and never affected
        the returned logits.
        """
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        last_step = self.layer_norm(lstm_out[:, -1, :])
        return self.fc(last_step)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
def train(model, loader, criterion, optimizer):
    """Run one optimization pass over `loader`.

    Args:
        model: Module to train; put into training mode here.
        loader: Iterable of (inputs, targets) batches.
        criterion: Loss function called as criterion(output, targets).
        optimizer: Optimizer stepped once per batch.

    Returns:
        Sum of the per-batch loss values divided by the number of batches.
    """
    model.train()
    batch_losses = []
    for data, targets in loader:
        data = data.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        loss = criterion(model(data), targets)
        loss.backward()
        optimizer.step()
        batch_losses.append(loss.item())
    return sum(batch_losses) / len(loader)

# Evaluation
def evaluate(model, loader, criterion):
    """Compute loss and accuracy of `model` over `loader` without gradient tracking.

    Args:
        model: Module to evaluate; put into eval mode here.
        loader: Iterable of (inputs, targets) batches.
        criterion: Loss function called as criterion(output, targets).

    Returns:
        (sum of per-batch losses / number of batches,
         correctly predicted targets / total targets).
    """
    model.eval()
    loss_sum, hits, seen = 0, 0, 0
    with torch.no_grad():
        for data, targets in loader:
            data = data.to(device)
            targets = targets.to(device)
            output = model(data)
            loss_sum += criterion(output, targets).item()
            # Index of the largest logit along dim 1 is the predicted class
            predicted = torch.max(output.data, 1).indices
            hits += (predicted == targets).sum().item()
            seen += targets.size(0)
    return loss_sum / len(loader), hits / seen

In [None]:
# Hyperparameters for the LSTM experiments
embedding_dim = 256  # passed to LSTMModel as the embedding size
hidden_dim = 512  # passed to LSTMModel as the LSTM hidden size
num_layers = 2  # number of stacked LSTM layers
dropout = 0.5  # dropout value forwarded to nn.LSTM
learning_rate = 0.001  # learning rate given to the Adam optimizer
epochs = 20  # upper bound on training epochs (early stopping may end sooner)

In [None]:
model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers, dropout).to(device)

# Criterion and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early Stopping: stop after `patience` consecutive epochs without a new best validation loss
early_stopping = dict(patience=3, counter=0, best_val_loss=float('inf'))

# Per-epoch history, used for the learning curves
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []
train_perplexities, val_perplexities = [], []

for epoch in range(epochs):
    train_loss = train(model, train_loader, criterion, optimizer)
    val_loss, val_acc = evaluate(model, val_loader, criterion)
    # Training accuracy comes from a separate eval-mode pass over the training data
    train_acc = evaluate(model, train_loader, criterion)[1]

    # Perplexity is reported as exp(loss)
    train_perplexity, val_perplexity = np.exp(train_loss), np.exp(val_loss)

    for history, value in (
        (train_losses, train_loss),
        (val_losses, val_loss),
        (train_accuracies, train_acc),
        (val_accuracies, val_acc),
        (train_perplexities, train_perplexity),
        (val_perplexities, val_perplexity),
    ):
        history.append(value)

    print(f'Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f} - Train Accuracy: {train_acc:.4f} - Train Perplexity: {train_perplexity:.4f} - Val loss: {val_loss:.4f} - Val Accuracy: {val_acc:.4f} - Val Perplexity: {val_perplexity:.4f}')

    if val_loss < early_stopping['best_val_loss']:
        early_stopping.update(best_val_loss=val_loss, counter=0)
    else:
        early_stopping['counter'] += 1

    if early_stopping['counter'] >= early_stopping['patience']:
        break

In [None]:
# Three side-by-side panels: loss, accuracy and perplexity per epoch (train vs. validation)
plt.figure(figsize=(15, 5))
curve_panels = (
    ('Loss', 'Train Loss', train_losses, 'Val Loss', val_losses),
    ('Accuracy', 'Train Accuracy', train_accuracies, 'Val Accuracy', val_accuracies),
    ('Perplexity', 'Train Perplexity', train_perplexities, 'Val Perplexity', val_perplexities),
)
for panel_index, (y_label, train_label, train_curve, val_label, val_curve) in enumerate(curve_panels, start=1):
    plt.subplot(1, 3, panel_index)
    plt.plot(train_curve, label=train_label)
    plt.plot(val_curve, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Final metrics on the test loader; perplexity is reported as exp(average loss)
test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

Data Augmentation

In [None]:
import random
# Split text into sentences
sentences = sent_tokenize(text)

# Lower-cased, purely alphabetic tokens of the whole text
words = [token.lower() for token in word_tokenize(text) if token.isalpha()]
vocab = Counter(words)

# Three special tokens occupy the three highest ids
pad_token = '<pad>'
mask_token = '<mask>'
misspell_token = '<unk>'  # stands in for a misspelled word
vocab_size = len(vocab) + 3

word_to_idx = {word: index for index, word in enumerate(vocab)}
word_to_idx.update({
    pad_token: vocab_size - 1,       # '<pad>'
    mask_token: vocab_size - 2,      # '<mask>'
    misspell_token: vocab_size - 3,  # '<unk>'
})

# Reverse lookup: id -> word
idx_to_word = dict(zip(word_to_idx.values(), word_to_idx.keys()))

def words_to_indices(sequence):
    """Translate words to ids via `word_to_idx`, using the '<pad>' id for unknown words."""
    return list(map(lambda word: word_to_idx.get(word, word_to_idx[pad_token]), sequence))


def process_sequences(s, seq_length):
    """Convert sentences into stride-1 windows of `seq_length` word ids.

    Sentences are tokenized and reduced to lower-cased alphabetic tokens;
    those shorter than `seq_length` are right-padded with the pad token.

    Args:
        s: Iterable of sentence strings.
        seq_length: Number of tokens per window.

    Returns:
        List of id lists, each of length `seq_length`.
    """
    id_windows = []
    for sentence in s:
        tokens = [tok.lower() for tok in word_tokenize(sentence) if tok.isalpha()]
        missing = seq_length - len(tokens)
        if missing > 0:
            tokens.extend([pad_token] * missing)
        last_start = len(tokens) - seq_length
        id_windows.extend(tokens[start:start + seq_length] for start in range(last_start + 1))
    return [words_to_indices(window) for window in id_windows]


def modify(sequences, mask_token, misspell_token, mask_prob, misspell_prob):
    """Randomly replace elements of each sequence with a mask or misspell token.

    One `random.random()` value r is drawn per element, for every position of
    every sequence (the last position included):
      * r < mask_prob                      -> mask_token
      * r < mask_prob + misspell_prob      -> misspell_token
      * otherwise                          -> element kept

    Args:
        sequences: Iterable of sequences (lists of ids/words).
        mask_token: Replacement used for masking.
        misspell_token: Replacement used for misspelling.
        mask_prob: Probability of masking an element.
        misspell_prob: Probability of marking an element as misspelled.

    Returns:
        A new list of new lists; the inputs are not modified.
    """
    noisy_sequences = []
    for seq in sequences:
        noisy = []
        for item in seq:
            draw = random.random()
            if draw >= (mask_prob + misspell_prob):
                replacement = item
            elif draw < mask_prob:
                replacement = mask_token
            else:
                replacement = misspell_token
            noisy.append(replacement)
        noisy_sequences.append(noisy)
    return noisy_sequences

In [None]:
seq_length_train = 5  # Sequence size for training
seq_length_val_test = 5  # Sequence size for validation and testing

# Sentence-level boundaries: first 70% train, next 15% validation, last 15% test
s = int(0.7 * len(sentences))
ss = int(0.85 * len(sentences))

# Training windows, each id masked / marked misspelled with probability 0.0001 each
train_sequences = modify(
    process_sequences(sentences[:s], seq_length_train),
    vocab_size - 2, vocab_size - 3, 0.0001, 0.0001,
)

# Validation windows; both probabilities are 0, so the ids come back unchanged
val_sequences = modify(
    process_sequences(sentences[s:ss], seq_length_val_test),
    vocab_size - 2, vocab_size - 3, 0, 0,
)

test_sequences = process_sequences(sentences[ss:], seq_length_val_test)

# Convert to tensors: all ids but the last are the input, the last id is the target
X_train = X_train_seq = torch.tensor([seq[:-1] for seq in train_sequences], dtype=torch.long)
y_train = y_train_seq = torch.tensor([seq[-1] for seq in train_sequences], dtype=torch.long)

X_val = X_val_seq = torch.tensor([seq[:-1] for seq in val_sequences], dtype=torch.long)
y_val = y_val_seq = torch.tensor([seq[-1] for seq in val_sequences], dtype=torch.long)

X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader (no test loader is created in this cell)
batch_size = 64
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)

In [None]:
model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers, dropout).to(device)

# Criterion and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early Stopping: training ends once the validation loss has failed to reach a new
# minimum for `patience` epochs in a row
early_stopping = {'patience': 3, 'counter': 0, 'best_val_loss': float('inf')}

# Per-epoch history for the learning curves
train_losses, train_accuracies, train_perplexities = [], [], []
val_losses, val_accuracies, val_perplexities = [], [], []

for epoch in range(epochs):
    train_loss = train(model, train_loader, criterion, optimizer)
    val_loss, val_acc = evaluate(model, val_loader, criterion)
    # Training accuracy is measured in a separate eval-mode pass
    train_acc = evaluate(model, train_loader, criterion)[1]

    # Perplexity is reported as exp(loss)
    train_perplexity = np.exp(train_loss)
    val_perplexity = np.exp(val_loss)

    train_losses += [train_loss]
    val_losses += [val_loss]
    train_accuracies += [train_acc]
    val_accuracies += [val_acc]
    train_perplexities += [train_perplexity]
    val_perplexities += [val_perplexity]

    print(f'Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f} - Train Accuracy: {train_acc:.4f} - Train Perplexity: {train_perplexity:.4f} - Val loss: {val_loss:.4f} - Val Accuracy: {val_acc:.4f} - Val Perplexity: {val_perplexity:.4f}')

    if val_loss >= early_stopping['best_val_loss']:
        early_stopping['counter'] += 1
    else:
        early_stopping['best_val_loss'] = val_loss
        early_stopping['counter'] = 0

    if early_stopping['counter'] >= early_stopping['patience']:
        break

In [None]:
# Learning curves: one panel each for loss, accuracy and perplexity
plt.figure(figsize=(15, 5))
for panel_index, (y_label, series) in enumerate(
    (
        ('Loss', (('Train Loss', train_losses), ('Val Loss', val_losses))),
        ('Accuracy', (('Train Accuracy', train_accuracies), ('Val Accuracy', val_accuracies))),
        ('Perplexity', (('Train Perplexity', train_perplexities), ('Val Perplexity', val_perplexities))),
    ),
    start=1,
):
    plt.subplot(1, 3, panel_index)
    for curve_label, curve in series:
        plt.plot(curve, label=curve_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Clean test set: no masking, no misspelling
test_sequences = process_sequences(sentences[ss:], seq_length_val_test)

# Inputs are all ids but the last of each window; the last id is the target
X_test = X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test = y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader
batch_size = 64
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

In [None]:
# Test set with mask probability 0.05 and misspell probability 0
test_sequences = modify(
    process_sequences(sentences[ss:], seq_length_val_test),
    vocab_size - 2, vocab_size - 3, 0.05, 0,
)

# Inputs are all ids but the last of each window; the last id is the target
X_test = X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test = y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader
batch_size = 64
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

In [None]:
# Test set with mask probability 0.1 and misspell probability 0
test_sequences = modify(
    process_sequences(sentences[ss:], seq_length_val_test),
    vocab_size - 2, vocab_size - 3, 0.1, 0,
)

# Inputs are all ids but the last of each window; the last id is the target
X_test = X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test = y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader
batch_size = 64
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

In [None]:
# Test set with mask probability 0.2 and misspell probability 0
test_sequences = modify(
    process_sequences(sentences[ss:], seq_length_val_test),
    vocab_size - 2, vocab_size - 3, 0.2, 0,
)

# Inputs are all ids but the last of each window; the last id is the target
X_test = X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test = y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader
batch_size = 64
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

In [None]:
# Test set with mask probability 0 and misspell probability 0.05
test_sequences = modify(
    process_sequences(sentences[ss:], seq_length_val_test),
    vocab_size - 2, vocab_size - 3, 0, 0.05,
)

# Inputs are all ids but the last of each window; the last id is the target
X_test = X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test = y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader
batch_size = 64
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

In [None]:
# Test set with mask probability 0 and misspell probability 0.10
test_sequences = modify(
    process_sequences(sentences[ss:], seq_length_val_test),
    vocab_size - 2, vocab_size - 3, 0, 0.10,
)

# Inputs are all ids but the last of each window; the last id is the target
X_test = X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test = y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader
batch_size = 64
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

In [None]:
# Test set with mask probability 0 and misspell probability 0.2
test_sequences = modify(
    process_sequences(sentences[ss:], seq_length_val_test),
    vocab_size - 2, vocab_size - 3, 0, 0.2,
)

# Inputs are all ids but the last of each window; the last id is the target
X_test = X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test = y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader
batch_size = 64
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

In [None]:
# Test set with mask probability 0.05 and misspell probability 0.05
test_sequences = modify(
    process_sequences(sentences[ss:], seq_length_val_test),
    vocab_size - 2, vocab_size - 3, 0.05, 0.05,
)

# Inputs are all ids but the last of each window; the last id is the target
X_test = X_test_seq = torch.tensor([seq[:-1] for seq in test_sequences], dtype=torch.long)
y_test = y_test_seq = torch.tensor([seq[-1] for seq in test_sequences], dtype=torch.long)

# Data Loader
batch_size = 64
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

test_loss, test_acc = evaluate(model, test_loader, criterion)
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

Mask Words (Prob = 5%)

In [None]:
sentences = sent_tokenize(text)

# Lower-cased, purely alphabetic tokens of the whole text
words = [token.lower() for token in word_tokenize(text) if token.isalpha()]
vocab = Counter(words)

# Two special tokens occupy the two highest ids
pad_token = '<pad>'
mask_token = '<mask>'
vocab_size = len(vocab) + 2

word_to_idx = {word: index for index, word in enumerate(vocab)}
word_to_idx.update({
    pad_token: vocab_size - 1,   # '<pad>'
    mask_token: vocab_size - 2,  # '<mask>'
})

# Reverse lookup: id -> word
idx_to_word = dict(zip(word_to_idx.values(), word_to_idx.keys()))

def words_to_indices(sequence):
    """Map each token in ``sequence`` to its vocabulary id.

    Tokens missing from ``word_to_idx`` are mapped to the id of ``pad_token``.
    Returns a new list with one id per input token.
    """
    indices = []
    for token in sequence:
        indices.append(word_to_idx.get(token, word_to_idx[pad_token]))
    return indices


def process_sequences(sentences, seq_length):
    """Turn sentences into sliding windows of ``seq_length`` token ids.

    Each sentence is tokenized, reduced to lower-cased alphabetic tokens and,
    if shorter than ``seq_length``, right-padded with ``pad_token``. Every
    contiguous window of ``seq_length`` tokens is then converted to ids via
    ``words_to_indices``.

    Args:
        sentences: iterable of sentence strings.
        seq_length: number of tokens per window.

    Returns:
        A list of id lists, each of length ``seq_length``.
    """
    windows = []
    for sentence in sentences:
        tokens = [tok.lower() for tok in word_tokenize(sentence) if tok.isalpha()]
        shortfall = seq_length - len(tokens)
        if shortfall > 0:
            # Short sentences still contribute exactly one (padded) window.
            tokens += [pad_token] * shortfall
        for start in range(len(tokens) - seq_length + 1):
            windows.append(tokens[start:start + seq_length])
    return [words_to_indices(window) for window in windows]


def mask_words_in_training(sequences, mask_token=vocab_size - 2, mask_prob=0.05, mask_target=False):
    """Randomly replace input tokens with the mask token id.

    Each eligible position is masked independently with probability
    ``mask_prob``. The last element of every sequence is used as the
    prediction label in this cell (``seq[-1]``), so by default it is never
    masked; otherwise a fraction of the labels would be replaced by the mask
    id and the model would be trained to predict ``<mask>``.

    Args:
        sequences: list of token-id sequences.
        mask_token: id written into masked positions.
        mask_prob: per-position masking probability.
        mask_target: when True, the final position may be masked as well
            (the previous behaviour of this function).

    Returns:
        A new list of lists; the input sequences are not modified.
    """
    masked_sequences = []
    for seq in sequences:
        target_pos = len(seq) - 1
        masked_seq = [
            mask_token
            if (mask_target or pos != target_pos) and random.random() < mask_prob
            else word
            for pos, word in enumerate(seq)
        ]
        masked_sequences.append(masked_seq)
    return masked_sequences

# Set sequence length
seq_length_train = 10  # Sequence size for training
seq_length_val_test = 5  # Sequence size for validation and testing

# Split at sentence level BEFORE building windows. Building training and
# validation/test windows from the same sentences makes every held-out window
# a sub-window of training text (data leakage) and inflates the metrics.
train_sentence_count = int(0.7 * len(sentences))

# Generate training sequences from the first 70% of sentences
train_sequences = process_sequences(sentences[:train_sentence_count], seq_length_train)

# Apply random masking noise to the training sequences only
train_sequences = mask_words_in_training(train_sequences)

# Generate validation and testing sequences from the remaining 30%
val_test_sequences = process_sequences(sentences[train_sentence_count:], seq_length_val_test)

# Convert to tensors: all tokens but the last are inputs, the last is the label
X_train_seq = torch.tensor([seq[:-1] for seq in train_sequences], dtype=torch.long)
y_train_seq = torch.tensor([seq[-1] for seq in train_sequences], dtype=torch.long)

X_val_test_seq = torch.tensor([seq[:-1] for seq in val_test_sequences], dtype=torch.long)
y_val_test_seq = torch.tensor([seq[-1] for seq in val_test_sequences], dtype=torch.long)

# Split dataset: every training window is used (previously 30% of them were
# split off into a test set that was immediately overwritten and never used).
X_train = X_train_seq
y_train = y_train_seq
X_val, X_test, y_val, y_test = train_test_split(X_val_test_seq, y_val_test_seq, test_size=0.5, random_state=42)  # Validation and Testing

# Data Loader
batch_size = 64
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

In [None]:
# Initialize LSTM model
model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers, dropout).to(device)


# Criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early stopping: stop after `patience` consecutive epochs without a new best
# validation loss.
early_stopping = {
    'patience': 2,  # was 3
    'counter': 0,
    'best_val_loss': float('inf')
}
# Snapshot of the weights at the best validation loss. Without it the model
# left in memory after the loop is the one from `patience` epochs past the
# optimum, and that is the model the test cells would then evaluate.
best_model_state = None

train_losses, val_losses, train_accuracies, val_accuracies, train_perplexities, val_perplexities = [], [], [], [], [], []

for epoch in range(epochs):
    train_loss = train(model, train_loader, criterion, optimizer)
    val_loss, val_acc = evaluate(model, val_loader, criterion)
    train_acc = evaluate(model, train_loader, criterion)[1]

    # Perplexity is reported as exp(loss)
    train_perplexity = np.exp(train_loss)
    val_perplexity = np.exp(val_loss)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)
    train_perplexities.append(train_perplexity)
    val_perplexities.append(val_perplexity)

    print(f'Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f} - Train Accuracy: {train_acc:.4f} - Train Perplexity: {train_perplexity:.4f} - Val loss: {val_loss:.4f} - Val Accuracy: {val_acc:.4f} - Val Perplexity: {val_perplexity:.4f}')

    if val_loss < early_stopping['best_val_loss']:
        early_stopping['best_val_loss'] = val_loss
        early_stopping['counter'] = 0
        # Clone the tensors: state_dict() returns references to live parameters.
        best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
    else:
        early_stopping['counter'] += 1

    if early_stopping['counter'] >= early_stopping['patience']:
        break

# Restore the best checkpoint so downstream evaluation uses it.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

In [None]:
# One figure, three side-by-side panels: loss, accuracy and perplexity per epoch.
plt.figure(figsize=(15, 5))

# (train label, train series, val label, val series, y-axis label)
curve_specs = [
    ('Train Loss', train_losses, 'Val Loss', val_losses, 'Loss'),
    ('Train Accuracy', train_accuracies, 'Val Accuracy', val_accuracies, 'Accuracy'),
    ('Train Perplexity', train_perplexities, 'Val Perplexity', val_perplexities, 'Perplexity'),
]

for panel, (train_label, train_series, val_label, val_series, y_label) in enumerate(curve_specs, start=1):
    plt.subplot(1, 3, panel)
    plt.plot(train_series, label=train_label)
    plt.plot(val_series, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Score the current `model` on `test_loader`; `evaluate` yields a (loss, accuracy) pair.
test_loss, test_acc = evaluate(model, test_loader, criterion)
# Perplexity is reported as exp(loss) — assumes `evaluate` returns a mean cross-entropy; TODO confirm.
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

Mask Words (Prob = 10%)

In [None]:
# Sentence-level view of the corpus (used later to build fixed-length windows)
sentences = sent_tokenize(text)

# Word-level view: lower-cased, purely alphabetic tokens only
words = [token.lower() for token in word_tokenize(text) if token.isalpha()]
vocab = Counter(words)

# Two special tokens are appended after the real words: <mask>, then <pad>
pad_token = '<pad>'
mask_token = '<mask>'
vocab_size = len(vocab) + 2

# Real words receive ids 0..len(vocab)-1 in first-seen order
word_to_idx = {word: index for index, word in enumerate(vocab)}
word_to_idx[pad_token] = vocab_size - 1  # '<pad>' takes the last id
word_to_idx[mask_token] = vocab_size - 2  # '<mask>' takes the second-to-last id

# Reverse lookup: id -> token
idx_to_word = {index: word for word, index in word_to_idx.items()}

def words_to_indices(sequence):
    """Map each token in ``sequence`` to its vocabulary id.

    Tokens missing from ``word_to_idx`` are mapped to the id of ``pad_token``.
    Returns a new list with one id per input token.
    """
    indices = []
    for token in sequence:
        indices.append(word_to_idx.get(token, word_to_idx[pad_token]))
    return indices


def process_sequences(sentences, seq_length):
    """Turn sentences into sliding windows of ``seq_length`` token ids.

    Each sentence is tokenized, reduced to lower-cased alphabetic tokens and,
    if shorter than ``seq_length``, right-padded with ``pad_token``. Every
    contiguous window of ``seq_length`` tokens is then converted to ids via
    ``words_to_indices``.

    Args:
        sentences: iterable of sentence strings.
        seq_length: number of tokens per window.

    Returns:
        A list of id lists, each of length ``seq_length``.
    """
    windows = []
    for sentence in sentences:
        tokens = [tok.lower() for tok in word_tokenize(sentence) if tok.isalpha()]
        shortfall = seq_length - len(tokens)
        if shortfall > 0:
            # Short sentences still contribute exactly one (padded) window.
            tokens += [pad_token] * shortfall
        for start in range(len(tokens) - seq_length + 1):
            windows.append(tokens[start:start + seq_length])
    return [words_to_indices(window) for window in windows]


def mask_words_in_training(sequences, mask_token=vocab_size - 2, mask_prob=0.1, mask_target=False):
    """Randomly replace input tokens with the mask token id.

    Each eligible position is masked independently with probability
    ``mask_prob``. The last element of every sequence is used as the
    prediction label in this cell (``seq[-1]``), so by default it is never
    masked; otherwise a fraction of the labels would be replaced by the mask
    id and the model would be trained to predict ``<mask>``.

    Args:
        sequences: list of token-id sequences.
        mask_token: id written into masked positions.
        mask_prob: per-position masking probability.
        mask_target: when True, the final position may be masked as well
            (the previous behaviour of this function).

    Returns:
        A new list of lists; the input sequences are not modified.
    """
    masked_sequences = []
    for seq in sequences:
        target_pos = len(seq) - 1
        masked_seq = [
            mask_token
            if (mask_target or pos != target_pos) and random.random() < mask_prob
            else word
            for pos, word in enumerate(seq)
        ]
        masked_sequences.append(masked_seq)
    return masked_sequences

# Set sequence length
seq_length_train = 10  # Sequence size for training
seq_length_val_test = 5  # Sequence size for validation and testing

# Split at sentence level BEFORE building windows. Building training and
# validation/test windows from the same sentences makes every held-out window
# a sub-window of training text (data leakage) and inflates the metrics.
train_sentence_count = int(0.7 * len(sentences))

# Generate training sequences from the first 70% of sentences
train_sequences = process_sequences(sentences[:train_sentence_count], seq_length_train)

# Apply random masking noise to the training sequences only
train_sequences = mask_words_in_training(train_sequences)

# Generate validation and testing sequences from the remaining 30%
val_test_sequences = process_sequences(sentences[train_sentence_count:], seq_length_val_test)

# Convert to tensors: all tokens but the last are inputs, the last is the label
X_train_seq = torch.tensor([seq[:-1] for seq in train_sequences], dtype=torch.long)
y_train_seq = torch.tensor([seq[-1] for seq in train_sequences], dtype=torch.long)

X_val_test_seq = torch.tensor([seq[:-1] for seq in val_test_sequences], dtype=torch.long)
y_val_test_seq = torch.tensor([seq[-1] for seq in val_test_sequences], dtype=torch.long)

# Split dataset: every training window is used (previously 30% of them were
# split off into a test set that was immediately overwritten and never used).
X_train = X_train_seq
y_train = y_train_seq
X_val, X_test, y_val, y_test = train_test_split(X_val_test_seq, y_val_test_seq, test_size=0.5, random_state=42)  # Validation and Testing

# Data Loader
batch_size = 64
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

In [None]:
# Initialize LSTM model
model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers, dropout).to(device)


# Criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early stopping: stop after `patience` consecutive epochs without a new best
# validation loss.
early_stopping = {
    'patience': 2,  # was 3
    'counter': 0,
    'best_val_loss': float('inf')
}
# Snapshot of the weights at the best validation loss. Without it the model
# left in memory after the loop is the one from `patience` epochs past the
# optimum, and that is the model the test cells would then evaluate.
best_model_state = None

train_losses, val_losses, train_accuracies, val_accuracies, train_perplexities, val_perplexities = [], [], [], [], [], []

for epoch in range(epochs):
    train_loss = train(model, train_loader, criterion, optimizer)
    val_loss, val_acc = evaluate(model, val_loader, criterion)
    train_acc = evaluate(model, train_loader, criterion)[1]

    # Perplexity is reported as exp(loss)
    train_perplexity = np.exp(train_loss)
    val_perplexity = np.exp(val_loss)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)
    train_perplexities.append(train_perplexity)
    val_perplexities.append(val_perplexity)

    print(f'Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f} - Train Accuracy: {train_acc:.4f} - Train Perplexity: {train_perplexity:.4f} - Val loss: {val_loss:.4f} - Val Accuracy: {val_acc:.4f} - Val Perplexity: {val_perplexity:.4f}')

    if val_loss < early_stopping['best_val_loss']:
        early_stopping['best_val_loss'] = val_loss
        early_stopping['counter'] = 0
        # Clone the tensors: state_dict() returns references to live parameters.
        best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
    else:
        early_stopping['counter'] += 1

    if early_stopping['counter'] >= early_stopping['patience']:
        break

# Restore the best checkpoint so downstream evaluation uses it.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

In [None]:
# One figure, three side-by-side panels: loss, accuracy and perplexity per epoch.
plt.figure(figsize=(15, 5))

# (train label, train series, val label, val series, y-axis label)
curve_specs = [
    ('Train Loss', train_losses, 'Val Loss', val_losses, 'Loss'),
    ('Train Accuracy', train_accuracies, 'Val Accuracy', val_accuracies, 'Accuracy'),
    ('Train Perplexity', train_perplexities, 'Val Perplexity', val_perplexities, 'Perplexity'),
]

for panel, (train_label, train_series, val_label, val_series, y_label) in enumerate(curve_specs, start=1):
    plt.subplot(1, 3, panel)
    plt.plot(train_series, label=train_label)
    plt.plot(val_series, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Score the current `model` on `test_loader`; `evaluate` yields a (loss, accuracy) pair.
test_loss, test_acc = evaluate(model, test_loader, criterion)
# Perplexity is reported as exp(loss) — assumes `evaluate` returns a mean cross-entropy; TODO confirm.
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

Mask Words (Prob = 15%)

In [None]:
# Sentence-level view of the corpus (used later to build fixed-length windows)
sentences = sent_tokenize(text)

# Word-level view: lower-cased, purely alphabetic tokens only
words = [token.lower() for token in word_tokenize(text) if token.isalpha()]
vocab = Counter(words)

# Two special tokens are appended after the real words: <mask>, then <pad>
pad_token = '<pad>'
mask_token = '<mask>'
vocab_size = len(vocab) + 2

# Real words receive ids 0..len(vocab)-1 in first-seen order
word_to_idx = {word: index for index, word in enumerate(vocab)}
word_to_idx[pad_token] = vocab_size - 1  # '<pad>' takes the last id
word_to_idx[mask_token] = vocab_size - 2  # '<mask>' takes the second-to-last id

# Reverse lookup: id -> token
idx_to_word = {index: word for word, index in word_to_idx.items()}

def words_to_indices(sequence):
    """Map each token in ``sequence`` to its vocabulary id.

    Tokens missing from ``word_to_idx`` are mapped to the id of ``pad_token``.
    Returns a new list with one id per input token.
    """
    indices = []
    for token in sequence:
        indices.append(word_to_idx.get(token, word_to_idx[pad_token]))
    return indices


def process_sequences(sentences, seq_length):
    """Turn sentences into sliding windows of ``seq_length`` token ids.

    Each sentence is tokenized, reduced to lower-cased alphabetic tokens and,
    if shorter than ``seq_length``, right-padded with ``pad_token``. Every
    contiguous window of ``seq_length`` tokens is then converted to ids via
    ``words_to_indices``.

    Args:
        sentences: iterable of sentence strings.
        seq_length: number of tokens per window.

    Returns:
        A list of id lists, each of length ``seq_length``.
    """
    windows = []
    for sentence in sentences:
        tokens = [tok.lower() for tok in word_tokenize(sentence) if tok.isalpha()]
        shortfall = seq_length - len(tokens)
        if shortfall > 0:
            # Short sentences still contribute exactly one (padded) window.
            tokens += [pad_token] * shortfall
        for start in range(len(tokens) - seq_length + 1):
            windows.append(tokens[start:start + seq_length])
    return [words_to_indices(window) for window in windows]


def mask_words_in_training(sequences, mask_token=vocab_size - 2, mask_prob=0, mask_target=False):
    """Randomly replace input tokens with the mask token id.

    Each eligible position is masked independently with probability
    ``mask_prob``. The last element of every sequence is used as the
    prediction label in this cell (``seq[-1]``), so by default it is never
    masked; otherwise a fraction of the labels would be replaced by the mask
    id and the model would be trained to predict ``<mask>``.

    NOTE(review): the enclosing section is titled "Mask Words (Prob = 15%)"
    but the default ``mask_prob`` here is 0, so no masking happens unless a
    probability is passed explicitly — confirm the intended value.

    Args:
        sequences: list of token-id sequences.
        mask_token: id written into masked positions.
        mask_prob: per-position masking probability (0 disables masking).
        mask_target: when True, the final position may be masked as well
            (the previous behaviour of this function).

    Returns:
        A new list of lists; the input sequences are not modified.
    """
    masked_sequences = []
    for seq in sequences:
        target_pos = len(seq) - 1
        masked_seq = [
            mask_token
            if (mask_target or pos != target_pos) and random.random() < mask_prob
            else word
            for pos, word in enumerate(seq)
        ]
        masked_sequences.append(masked_seq)
    return masked_sequences

# Set sequence length
seq_length_train = 20  # Sequence size for training
seq_length_val_test = 5  # Sequence size for validation and testing

# Split at sentence level BEFORE building windows. Building training and
# validation/test windows from the same sentences makes every held-out window
# a sub-window of training text (data leakage) and inflates the metrics.
train_sentence_count = int(0.7 * len(sentences))

# Generate training sequences from the first 70% of sentences
train_sequences = process_sequences(sentences[:train_sentence_count], seq_length_train)

# Apply random masking noise to the training sequences only
train_sequences = mask_words_in_training(train_sequences)

# Generate validation and testing sequences from the remaining 30%
val_test_sequences = process_sequences(sentences[train_sentence_count:], seq_length_val_test)

# Convert to tensors: all tokens but the last are inputs, the last is the label
X_train_seq = torch.tensor([seq[:-1] for seq in train_sequences], dtype=torch.long)
y_train_seq = torch.tensor([seq[-1] for seq in train_sequences], dtype=torch.long)

X_val_test_seq = torch.tensor([seq[:-1] for seq in val_test_sequences], dtype=torch.long)
y_val_test_seq = torch.tensor([seq[-1] for seq in val_test_sequences], dtype=torch.long)

# Split dataset: every training window is used (previously 30% of them were
# split off into a test set that was immediately overwritten and never used).
X_train = X_train_seq
y_train = y_train_seq
X_val, X_test, y_val, y_test = train_test_split(X_val_test_seq, y_val_test_seq, test_size=0.5, random_state=42)  # Validation and Testing

# Data Loader
batch_size = 64
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

In [None]:
# Initialize LSTM model
model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers, dropout).to(device)


# Criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early stopping: stop after `patience` consecutive epochs without a new best
# validation loss.
early_stopping = {
    'patience': 2,  # was 3
    'counter': 0,
    'best_val_loss': float('inf')
}
# Snapshot of the weights at the best validation loss. Without it the model
# left in memory after the loop is the one from `patience` epochs past the
# optimum, and that is the model any later evaluation would use.
best_model_state = None

train_losses, val_losses, train_accuracies, val_accuracies, train_perplexities, val_perplexities = [], [], [], [], [], []

for epoch in range(epochs):
    train_loss = train(model, train_loader, criterion, optimizer)
    val_loss, val_acc = evaluate(model, val_loader, criterion)
    train_acc = evaluate(model, train_loader, criterion)[1]

    # Perplexity is reported as exp(loss)
    train_perplexity = np.exp(train_loss)
    val_perplexity = np.exp(val_loss)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)
    train_perplexities.append(train_perplexity)
    val_perplexities.append(val_perplexity)

    print(f'Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f} - Train Accuracy: {train_acc:.4f} - Train Perplexity: {train_perplexity:.4f} - Val loss: {val_loss:.4f} - Val Accuracy: {val_acc:.4f} - Val Perplexity: {val_perplexity:.4f}')

    if val_loss < early_stopping['best_val_loss']:
        early_stopping['best_val_loss'] = val_loss
        early_stopping['counter'] = 0
        # Clone the tensors: state_dict() returns references to live parameters.
        best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
    else:
        early_stopping['counter'] += 1

    if early_stopping['counter'] >= early_stopping['patience']:
        break

# Restore the best checkpoint so downstream evaluation uses it.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

In [None]:
# One figure, three side-by-side panels: loss, accuracy and perplexity per epoch.
plt.figure(figsize=(15, 5))

# (train label, train series, val label, val series, y-axis label)
curve_specs = [
    ('Train Loss', train_losses, 'Val Loss', val_losses, 'Loss'),
    ('Train Accuracy', train_accuracies, 'Val Accuracy', val_accuracies, 'Accuracy'),
    ('Train Perplexity', train_perplexities, 'Val Perplexity', val_perplexities, 'Perplexity'),
]

for panel, (train_label, train_series, val_label, val_series, y_label) in enumerate(curve_specs, start=1):
    plt.subplot(1, 3, panel)
    plt.plot(train_series, label=train_label)
    plt.plot(val_series, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Sentence-level view of the corpus (used later to build fixed-length windows)
sentences = sent_tokenize(text)

# Word-level view: lower-cased, purely alphabetic tokens only
words = [token.lower() for token in word_tokenize(text) if token.isalpha()]
vocab = Counter(words)

# Two special tokens are appended after the real words: <mask>, then <pad>
pad_token = '<pad>'
mask_token = '<mask>'
vocab_size = len(vocab) + 2

# Real words receive ids 0..len(vocab)-1 in first-seen order
word_to_idx = {word: index for index, word in enumerate(vocab)}
word_to_idx[pad_token] = vocab_size - 1  # '<pad>' takes the last id
word_to_idx[mask_token] = vocab_size - 2  # '<mask>' takes the second-to-last id

# Reverse lookup: id -> token
idx_to_word = {index: word for word, index in word_to_idx.items()}

def words_to_indices(sequence):
    """Map each token in ``sequence`` to its vocabulary id.

    Tokens missing from ``word_to_idx`` are mapped to the id of ``pad_token``.
    Returns a new list with one id per input token.
    """
    indices = []
    for token in sequence:
        indices.append(word_to_idx.get(token, word_to_idx[pad_token]))
    return indices


def process_sequences(sentences, seq_length):
    """Turn sentences into sliding windows of ``seq_length`` token ids.

    Each sentence is tokenized, reduced to lower-cased alphabetic tokens and,
    if shorter than ``seq_length``, right-padded with ``pad_token``. Every
    contiguous window of ``seq_length`` tokens is then converted to ids via
    ``words_to_indices``.

    Args:
        sentences: iterable of sentence strings.
        seq_length: number of tokens per window.

    Returns:
        A list of id lists, each of length ``seq_length``.
    """
    windows = []
    for sentence in sentences:
        tokens = [tok.lower() for tok in word_tokenize(sentence) if tok.isalpha()]
        shortfall = seq_length - len(tokens)
        if shortfall > 0:
            # Short sentences still contribute exactly one (padded) window.
            tokens += [pad_token] * shortfall
        for start in range(len(tokens) - seq_length + 1):
            windows.append(tokens[start:start + seq_length])
    return [words_to_indices(window) for window in windows]


def mask_words_in_training(sequences, mask_token=vocab_size - 2, mask_prob=0, mask_target=False):
    """Randomly replace input tokens with the mask token id.

    Each eligible position is masked independently with probability
    ``mask_prob``; the default of 0 leaves the sequences unchanged. The last
    element of every sequence is used as the prediction label in this cell
    (``seq[-1]``), so by default it is never masked; otherwise a fraction of
    the labels would be replaced by the mask id and the model would be
    trained to predict ``<mask>``.

    Args:
        sequences: list of token-id sequences.
        mask_token: id written into masked positions.
        mask_prob: per-position masking probability (0 disables masking).
        mask_target: when True, the final position may be masked as well
            (the previous behaviour of this function).

    Returns:
        A new list of lists; the input sequences are not modified.
    """
    masked_sequences = []
    for seq in sequences:
        target_pos = len(seq) - 1
        masked_seq = [
            mask_token
            if (mask_target or pos != target_pos) and random.random() < mask_prob
            else word
            for pos, word in enumerate(seq)
        ]
        masked_sequences.append(masked_seq)
    return masked_sequences

# Window sizes, in tokens
seq_length_train = 5  # Sequence size for training
seq_length_val_test = 5  # Sequence size for validation and testing

# Sentence-level hold-out: the first 70% of sentences feed training, the
# remainder feeds validation and testing.
s = int(0.7 * len(sentences))
train_sentences, heldout_sentences = sentences[:s], sentences[s:]

# Training windows, passed through the masking step
train_sequences = mask_words_in_training(
    process_sequences(train_sentences, seq_length_train)
)

# Validation / testing windows (never masked)
val_test_sequences = process_sequences(heldout_sentences, seq_length_val_test)

# Convert to tensors: all tokens but the last are inputs, the last is the label
X_train_seq = torch.tensor([seq[:-1] for seq in train_sequences], dtype=torch.long)
y_train_seq = torch.tensor([seq[-1] for seq in train_sequences], dtype=torch.long)

X_val_test_seq = torch.tensor([seq[:-1] for seq in val_test_sequences], dtype=torch.long)
y_val_test_seq = torch.tensor([seq[-1] for seq in val_test_sequences], dtype=torch.long)

# All training windows are used as-is; the held-out windows are halved into
# validation and test sets.
X_train, y_train = X_train_seq, y_train_seq
X_val, X_test, y_val, y_test = train_test_split(
    X_val_test_seq,
    y_val_test_seq,
    test_size=0.5,
    random_state=42,
)

# Data Loader
batch_size = 64
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

In [None]:
# Initialize LSTM model
model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers, dropout).to(device)


# Criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early stopping: stop after `patience` consecutive epochs without a new best
# validation loss.
early_stopping = {
    'patience': 2,  # was 3
    'counter': 0,
    'best_val_loss': float('inf')
}
# Snapshot of the weights at the best validation loss. Without it the model
# left in memory after the loop is the one from `patience` epochs past the
# optimum, and that is the model the test cells would then evaluate.
best_model_state = None

train_losses, val_losses, train_accuracies, val_accuracies, train_perplexities, val_perplexities = [], [], [], [], [], []

for epoch in range(epochs):
    train_loss = train(model, train_loader, criterion, optimizer)
    val_loss, val_acc = evaluate(model, val_loader, criterion)
    train_acc = evaluate(model, train_loader, criterion)[1]

    # Perplexity is reported as exp(loss)
    train_perplexity = np.exp(train_loss)
    val_perplexity = np.exp(val_loss)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)
    train_perplexities.append(train_perplexity)
    val_perplexities.append(val_perplexity)

    print(f'Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f} - Train Accuracy: {train_acc:.4f} - Train Perplexity: {train_perplexity:.4f} - Val loss: {val_loss:.4f} - Val Accuracy: {val_acc:.4f} - Val Perplexity: {val_perplexity:.4f}')

    if val_loss < early_stopping['best_val_loss']:
        early_stopping['best_val_loss'] = val_loss
        early_stopping['counter'] = 0
        # Clone the tensors: state_dict() returns references to live parameters.
        best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
    else:
        early_stopping['counter'] += 1

    if early_stopping['counter'] >= early_stopping['patience']:
        break

# Restore the best checkpoint so downstream evaluation uses it.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

In [None]:
# One figure, three side-by-side panels: loss, accuracy and perplexity per epoch.
plt.figure(figsize=(15, 5))

# (train label, train series, val label, val series, y-axis label)
curve_specs = [
    ('Train Loss', train_losses, 'Val Loss', val_losses, 'Loss'),
    ('Train Accuracy', train_accuracies, 'Val Accuracy', val_accuracies, 'Accuracy'),
    ('Train Perplexity', train_perplexities, 'Val Perplexity', val_perplexities, 'Perplexity'),
]

for panel, (train_label, train_series, val_label, val_series, y_label) in enumerate(curve_specs, start=1):
    plt.subplot(1, 3, panel)
    plt.plot(train_series, label=train_label)
    plt.plot(val_series, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Score the current `model` on `test_loader`; `evaluate` yields a (loss, accuracy) pair.
test_loss, test_acc = evaluate(model, test_loader, criterion)
# Perplexity is reported as exp(loss) — assumes `evaluate` returns a mean cross-entropy; TODO confirm.
test_perplexity = np.exp(test_loss)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test Perplexity: {test_perplexity:.4f}')

In [None]:
from transformers import GPT2Tokenizer, TFGPT2LMHeadModel

# Hold out the final 4% of each word list as the GPT-2 evaluation set.
split_index = int(0.96 * len(words))
split_index_mask = int(0.96 * len(noisy_words_mask))
split_index_spelling = int(0.96 * len(noisy_words_spelling))

test_words = words[split_index:]
# Slice each noisy list with its own split index. Previously both noisy test
# sets were sliced from the clean `words`, so they contained no noise at all.
test_words_mask = noisy_words_mask[split_index_mask:]
test_words_spelling = noisy_words_spelling[split_index_spelling:]

# Load the pre-trained GPT-2 tokenizer and model.
# NOTE: this rebinds `model`, replacing any previously trained model in the
# notebook namespace.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = TFGPT2LMHeadModel.from_pretrained('gpt2')

def predict_next_word(sequence, model, tokenizer):
    """Generate one token after ``sequence`` and return it decoded.

    Args:
        sequence: prompt text.
        model: object exposing ``generate`` (called with the encoded prompt,
            an all-ones attention mask and ``max_length`` = prompt length + 1).
        tokenizer: object exposing ``encode`` and ``decode``.

    Returns:
        The decoded text of the last token of the first generated sequence,
        with special tokens skipped.
    """
    prompt_ids = tokenizer.encode(sequence, return_tensors='tf')
    generated = model.generate(
        prompt_ids,
        # Every prompt position is real text, so nothing is masked out.
        attention_mask=tf.ones(prompt_ids.shape, dtype=tf.int32),
        max_length=len(prompt_ids[0]) + 1,
        num_return_sequences=1,
    )
    last_token_id = generated[0][-1]
    return tokenizer.decode(last_token_id, skip_special_tokens=True)

n = 5  # Number of words to use for predicting the next one
correct_predictions = 0

# Slide an n-word context over the test words and count exact
# (case-insensitive, whitespace-stripped) matches with the following word.
for i, target_word in enumerate(test_words[n:], start=n):
    prompt = ' '.join(test_words[i - n:i])
    predicted_word = predict_next_word(prompt, model, tokenizer)
    correct_predictions += predicted_word.strip().lower() == target_word.lower()

# One prediction is made for every position from n onward.
accuracy_GPT2 = correct_predictions / (len(test_words) - n)
print(f"Accuracy of GPT2: {accuracy_GPT2 * 100:.2f}%")
