In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.metrics import confusion_matrix, classification_report
from collections import Counter
import re

PATH = "Project-III/data/sherlock-holm.es_stories_plain-text_advs.txt"

# Read the text file
with open(PATH, 'r', encoding='utf-8') as file:
    text = file.read()

In [2]:
# Tokenize the text
def tokenize(text):
    text = text.lower()
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    words = text.split()
    return words

tokens = tokenize(text)
word_counts = Counter(tokens)
vocab = sorted(word_counts, key=word_counts.get, reverse=True)
word_to_index = {word: index + 1 for index, word in enumerate(vocab)}
index_to_word = {index + 1: word for index, word in enumerate(vocab)}
total_words = len(word_to_index) + 1

In [3]:
# Create input-output pairs
input_sequences = []
for line in text.split('\n'):
    token_list = [word_to_index[word] for word in tokenize(line) if word in word_to_index]
    for i in range(1, len(token_list)):
        n_gram_sequence = token_list[:i+1]
        input_sequences.append(n_gram_sequence)

In [4]:
# Pad the sequences
max_sequence_len = max([len(seq) for seq in input_sequences])
input_sequences = np.array([np.pad(seq, (max_sequence_len - len(seq), 0), mode='constant') for seq in input_sequences])

In [5]:
# Split the sequences into input (X) and output (y)
X = input_sequences[:, :-1]
y = input_sequences[:, -1]

# Convert output to one-hot encoded vectors
y = np.array(torch.nn.functional.one_hot(torch.tensor(y), num_classes=total_words))

In [12]:
# Create a custom Dataset class
class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.long)
        self.y = torch.tensor(y, dtype=torch.float)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

dataset = TextDataset(X, y)

# Split dataset into training and validation sets (90% training, 10% validation)
train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# Define the model
class NextWordPredictor(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim):
        super(NextWordPredictor, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.lstm(x)
        x = x[:, -1, :]
        x = self.fc(x)
        x = self.softmax(x)
        return x

model = NextWordPredictor(total_words, 200, 256, total_words)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [18]:
from sklearn.model_selection import GridSearchCV
from sklearn.base import BaseEstimator, ClassifierMixin
class PyTorchModel(BaseEstimator, ClassifierMixin):
    def __init__(self, vocab_size, embed_dim=100, hidden_dim=150, output_dim=None, lr=0.001, epochs=10):
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.lr = lr
        self.epochs = epochs
        self.model = NextWordPredictor(vocab_size, embed_dim, hidden_dim, output_dim)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

    def fit(self, X, y):
        dataset = TextDataset(X, y)
        train_size = int(0.9 * len(dataset))
        val_size = len(dataset) - train_size
        train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
        train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        self.model.train()
        for epoch in range(self.epochs):
            for inputs, labels in train_dataloader:
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels.argmax(dim=1))
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
        return self

    def predict(self, X):
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(torch.tensor(X, dtype=torch.long))
            return outputs.argmax(dim=1).numpy()

    def score(self, X, y):
        self.model.eval()
        correct_predictions = 0
        total_predictions = 0
        with torch.no_grad():
            outputs = self.model(torch.tensor(X, dtype=torch.long))
            predicted = outputs.argmax(dim=1)
            actual = torch.tensor(y.argmax(axis=1), dtype=torch.long)
            correct_predictions = (predicted == actual).sum().item()
            total_predictions = len(y)
        return correct_predictions / total_predictions

# Define the parameter grid
param_grid = {
    'embed_dim': [50, 100, 200],
    'hidden_dim': [128, 256, 512],
    'lr': [0.001, 0.0001],
    'epochs': [50, 100, 150]
}

# Perform grid search
grid_search = GridSearchCV(PyTorchModel(vocab_size=total_words, output_dim=total_words), param_grid, cv=3, scoring='accuracy')
grid_search.fit(X, y)

print("Best parameters found: ", grid_search.best_params_)
print("Best score: ", grid_search.best_score_)

# After finding the best parameters, you can train the final model with them
best_params = grid_search.best_params_
best_model = PyTorchModel(vocab_size=total_words, output_dim=total_words, **best_params)
best_model.fit(X, y)

In [13]:
# Train the model
epochs = 2
for epoch in range(epochs):
    model.train()
    for inputs, labels in train_dataloader:
        outputs = model(inputs)
        loss = criterion(outputs, labels.argmax(dim=1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item()}')

torch.save(model.state_dict(), "model_state.pth")

Epoch 1/2, Loss: 8.98869514465332
Epoch 2/2, Loss: 9.038681030273438


In [14]:
# Evaluate the model on the validation set
model.eval()
correct_predictions = 0
total_predictions = 0

with torch.no_grad():
    for inputs, labels in val_dataloader:
        outputs = model(inputs)
        predicted = outputs.argmax(dim=1)
        actual = labels.argmax(dim=1)
        correct_predictions += (predicted == actual).sum().item()
        total_predictions += labels.size(0)

accuracy = correct_predictions / total_predictions
print(f'Validation Accuracy: {accuracy:.4f}')

perplexity  = torch.exp(loss)
print('Loss:', loss, 'PP:', perplexity)

Validation Accuracy: 0.0531
Loss: tensor(9.0387, grad_fn=<NllLossBackward0>) PP: tensor(8422.6602, grad_fn=<ExpBackward0>)


In [None]:
import time

# Evaluate the model on the validation set
model.eval()
correct_predictions = 0
total_predictions = 0

with torch.no_grad():
    for inputs, labels in val_dataloader:
        outputs = model(inputs)
        predicted = outputs.argmax(dim=1)
        actual = labels.argmax(dim=1)

        # Display results
        for i in range(len(inputs)):
            input_sentence = " ".join([index_to_word[idx.item()] for idx in inputs[i] if idx.item() != 0])
            predicted_word = index_to_word[predicted[i].item()]
            actual_word = index_to_word[actual[i].item()]
            correct = predicted[i].item() == actual[i].item()
            print(f"Sentence: {input_sentence}")
            print(f"Predicted: {predicted_word}")
            print(f"Actual: {actual_word}")
            print(f"Correct: {correct}")
            print()

            if correct:
                correct_predictions += 1
            total_predictions += 1

            time.sleep(3)  # Wait for 1 second between predictions

accuracy = correct_predictions / total_predictions
print(f'Validation Accuracy: {accuracy:.4f}')

In [15]:
# Generate predictions
def predict_next_words(model, tokenizer, seed_text, next_words):
    for _ in range(next_words):
        token_list = [word_to_index[word] for word in tokenize(seed_text)]
        token_list = np.pad(token_list, (max_sequence_len - len(token_list), 0), mode='constant')
        token_list = torch.tensor(token_list[-max_sequence_len+1:], dtype=torch.long).unsqueeze(0)

        with torch.no_grad():
            predicted = model(token_list).argmax(dim=1).item()

        output_word = index_to_word[predicted]
        seed_text += " " + output_word

    return seed_text

seed_text = "Scotland"
next_words = 1
print(predict_next_words(model, word_to_index, seed_text, next_words))

Scotland the
