#### Import Libraries

In [1]:
import nltk
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torch.optim import Adam
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from sklearn.model_selection import train_test_split
from gensim.models import Word2Vec
import multiprocessing
from torch.optim.lr_scheduler import ReduceLROnPlateau
import re
import pandas as pd
import spacy
from spacy.tokenizer import Tokenizer
from bs4 import BeautifulSoup
import string
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import words

#### Setup NLTK and Text Data Loading

In [2]:
nltk.download('punkt')
nltk.download('stopwords')

def load_text(filename):
    with open(filename, 'r', encoding='utf-8') as file:
        text = file.read().lower()
    return text

text_data = load_text("outs.txt")

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\HP\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\HP\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


#### Data Preprocessing and Tokenization

In [3]:
# Download the words corpus
nltk.download('words')

# Load the English words corpus
english_words = set(words.words())

def clean_text_and_split(text):
    # Remove newline characters
    cleaned_text = text.replace('\n', ' ')
    # Remove extra spaces
    cleaned_text = ' '.join(cleaned_text.split())
    # Add period to the end of sentences
    cleaned_text = re.sub(r'(?<=[a-zA-Z0-9])\n(?=[A-Z])', '. ', cleaned_text)
    # Add period to the end of text
    cleaned_text = re.sub(r'(?<=[a-zA-Z0-9])\n*$', '.', cleaned_text)
    # Remove punctuation and symbols
    cleaned_text = re.sub(r'[^\w\s]', ' ', cleaned_text)
    # Remove multiple spaces
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
    # Split the cleaned text into words
    tokens = word_tokenize(cleaned_text)
    # Spell check and filter out non-English words
    corrected_tokens = [word for word in tokens if word.lower() in english_words]
    
    return corrected_tokens

token = clean_text_and_split(text_data)

[nltk_data] Downloading package words to
[nltk_data]     C:\Users\HP\AppData\Roaming\nltk_data...
[nltk_data]   Package words is already up-to-date!


In [4]:
token

['text',
 'a',
 'b',
 'high',
 'performance',
 'was',
 'a',
 'competition',
 'pistol',
 'manufacturer',
 'included',
 'the',
 'limited',
 'class',
 'and',
 'open',
 'class',
 'semi',
 'automatic',
 'both',
 'available',
 'in',
 's',
 'w',
 'and',
 'a',
 'b',
 'sold',
 'directly',
 'to',
 'external',
 'links',
 'category',
 'defunct',
 'category',
 'defunct',
 'based',
 'in',
 'a',
 'c',
 'black',
 'is',
 'a',
 'book',
 'company',
 'since',
 'by',
 'the',
 'company',
 'is',
 'noted',
 'for',
 'who',
 's',
 'who',
 'since',
 'and',
 'the',
 'encyclopedia',
 'between',
 'and',
 'it',
 'a',
 'wide',
 'variety',
 'of',
 'in',
 'fiction',
 'and',
 'nonfiction',
 'and',
 'popular',
 'travel',
 'and',
 'science',
 'history',
 'the',
 'firm',
 'was',
 'in',
 'by',
 'and',
 'black',
 'in',
 'in',
 'the',
 'company',
 'the',
 'to',
 'sir',
 'walter',
 's',
 'for',
 'the',
 'company',
 'to',
 'the',
 'soho',
 'district',
 'of',
 'in',
 'during',
 'the',
 'the',
 'firm',
 'the',
 'seventh',
 'eight

In [5]:
len(token)

421316

#### In this step, we generate a vocabulary by collecting unique tokens from the tokenized text data. Each word in the vocabulary is then assigned an index to facilitate numerical representation. These indices are used to map between words and their corresponding numerical identifiers.

In [6]:
vocabulary = sorted(set(token))
word_to_index = {word: idx for idx, word in enumerate(vocabulary)}
index_to_word = {idx: word for word, idx in word_to_index.items()}
vocab_size = len(vocabulary)
output_size = vocab_size


#### Train a Word2Vec model on the tokenized text data (token). The trained model is then saved for future use.

In [7]:
# Train Word2Vec model
word2vec_model = Word2Vec(sentences=[token], vector_size=100, window=2, min_count=1, workers=multiprocessing.cpu_count())

# Save the trained Word2Vec model for later use
word2vec_model.save("word2vec_model.bin")

embedding_dim = word2vec_model.vector_size

#### Generate training sequences by sliding a window of length sequence_length (10) over the tokenized text (tokens). Each sequence consists of sequence_length tokens as input and the next token as the target. 

In [8]:
def create_training_sequences(tokens, sequence_length=10):
    sequences = []
    for i in range(len(tokens) - sequence_length):
        seq = tokens[i:i + sequence_length + 1]
        sequences.append(seq)
    return sequences

train_seq = create_training_sequences(token)

#### The second function, sequence_to_embeddings, converts the generated sequences into embedded representations using the pre-trained Word2Vec model (word2vec_model). It retrieves word embeddings for each token in the sequences, filtering out tokens not present in the Word2Vec model's vocabulary.

In [9]:
def sequence_to_embeddings(sequences, word2vec_model):
    embedded_sequences = []
    for sequence in sequences:
        embedded_seq = [word2vec_model.wv[word] for word in sequence if word in word2vec_model.wv]
        embedded_sequences.append(embedded_seq)
    return embedded_sequences

embedded_seq = sequence_to_embeddings(train_seq, word2vec_model)

#### Define a LSTM model for sequence prediction tasks. The model consists of multiple layers: an LSTM layer with num_layers stacked LSTM cells, followed by a sequence of fully connected (dense) layers (fc_layers) with ReLU activation functions. The final output layer produces predictions for the next token in the sequence. The dropout layer is applied to prevent overfitting during training.

In [10]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2, num_fc_layers=2, fc_hidden_size=64):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM layers
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        
        # Dropout layer
        self.dropout = nn.Dropout(p=0.2)
        
        # Fully connected layers
        self.fc_layers = nn.ModuleList([nn.Linear(hidden_size, fc_hidden_size)])
        for _ in range(num_fc_layers - 1):
            self.fc_layers.append(nn.Linear(fc_hidden_size, fc_hidden_size))
        
        # Final output layer
        self.output_layer = nn.Linear(fc_hidden_size, output_size)
    
    def forward(self, x):
        # Initialize hidden state and cell state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # LSTM layer
        lstm_out, _ = self.lstm(x, (h0, c0))
        
        # Only take the output from the final timestep
        x = self.dropout(lstm_out[:, -1, :])
        
        # Fully connected layers
        for fc_layer in self.fc_layers:
            x = F.relu(fc_layer(x))
        
        # Final output layer
        x = self.output_layer(x)
        
        return x

#### Calculate the accuracy of the model predictions given the model outputs and the corresponding ground truth labels. Compare the predicted labels with the actual labels to count the number of correct predictions. Finally, compute the accuracy by dividing the number of correct predictions by the total number of samples.

In [13]:
def calculate_accuracy(outputs, labels):
    _, predicted = torch.max(outputs, 1)
    correct = (predicted == labels).sum().item()
    total = labels.size(0)
    accuracy = correct / total
    return accuracy

#### Prepare the Predictor and Target for training the model. We convert the embedded sequences (embedded_seq) into PyTorch tensors (X) of data and their corresponding labels (y). The labels represent the index of the next word in each sequence. We split the data into training and testing sets using the train_test_split function, allocating 20% of the data for testing and the remaining 80% for training.

In [11]:
hidden_size = 64  # LSTM units

X = torch.tensor(embedded_seq, dtype=torch.float32)
y = torch.tensor([word_to_index.get(seq[-1], 0) for seq in train_seq], dtype=torch.long)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

  X = torch.tensor(embedded_seq, dtype=torch.float32)


#### Initialize the LSTM model (model) with the specified input size, hidden size, and output size. We also define the optimizer (optimizer) using the Adam optimizer with a learning rate of 0.001. Additionally, we specify the loss function (criterion) as the CrossEntropyLoss, which is commonly used for classification tasks.

In [12]:
model = LSTMModel(input_size=embedding_dim, hidden_size=hidden_size, output_size=output_size)
optimizer = Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

  _torch_pytree._register_pytree_node(


#### Train the LSTM model using the specified train_loader for a given number of epochs (num_epochs). It iterates through the data loader, calculates the loss and accuracy for each batch, performs backpropagation, and updates the model parameters using the optimizer. It also implements early stopping based on the validation loss, with a patience parameter to control the number of epochs to wait for improvement. Additionally, it utilizes a learning rate scheduler to adjust the learning rate during training based on the validation loss.

In [14]:
# Add a learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, verbose=True)

def train_model(model, train_loader, criterion, optimizer, scheduler, num_epochs=12):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    patience = 5
    best_loss = float('inf')
    epochs_no_improve = 0
    
    for epoch in range(num_epochs):
        running_loss = 0.0
        running_accuracy = 0.0
        total_samples = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
            # Calculate accuracy
            accuracy = calculate_accuracy(outputs, labels)
            running_accuracy += accuracy * inputs.size(0)
            total_samples += inputs.size(0)

        epoch_loss = running_loss / len(train_loader)
        epoch_accuracy = running_accuracy / total_samples
        print(f'Epoch {epoch+1} Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')
        
        # Early stopping
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            best_accuracy = epoch_accuracy
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
        
        if epochs_no_improve == patience:
            print('Early stopping!')
            break
            
        # Step the scheduler
        scheduler.step(epoch_loss)

    print(f'Best Loss: {best_loss:.4f}, Best Accuracy: {best_accuracy:.4f}')



#### Prepare the training dataset by creating a TensorDataset from the training features (X_train) and labels (y_train). Then, create a DataLoader object called train_loader to iterate over the training data in batches during model training. The batch_size parameter specifies the number of samples in each batch, and shuffle=True indicates that the data will be shuffled before creating batches to improve training efficiency and model generalization. Train the model after this.

In [15]:
batch_size = 32
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [16]:
# Train the model with the learning rate scheduler
train_model(model, train_loader, criterion, optimizer, scheduler)

Epoch 1 Loss: 4.5109, Accuracy: 0.2851
Epoch 2 Loss: 3.7365, Accuracy: 0.3720
Epoch 3 Loss: 3.5140, Accuracy: 0.3973
Epoch 4 Loss: 3.3521, Accuracy: 0.4134
Epoch 5 Loss: 3.2351, Accuracy: 0.4232
Epoch 6 Loss: 3.1403, Accuracy: 0.4321
Epoch 7 Loss: 3.0676, Accuracy: 0.4387
Epoch 8 Loss: 3.0080, Accuracy: 0.4447
Epoch 9 Loss: 2.9552, Accuracy: 0.4491
Epoch 10 Loss: 2.9000, Accuracy: 0.4580
Epoch 11 Loss: 2.8530, Accuracy: 0.4640
Epoch 12 Loss: 2.8134, Accuracy: 0.4713
Best Loss: 2.8134, Best Accuracy: 0.4713


#### Evaluate the performance of the trained LSTM model on the test dataset. It iterates over batches of test data using the provided test_loader and computes the average loss and accuracy for the entire test dataset. The model is set to evaluation mode (model.eval()) to disable dropout and batch normalization layers during evaluation. Finally, it prints the average test loss and accuracy.

In [17]:
def evaluate_model(model, test_loader, criterion):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()  # Set the model to evaluation mode
    test_loss = 0.0
    test_accuracy = 0.0
    total_samples = 0
    
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            
            # Calculate accuracy
            accuracy = calculate_accuracy(outputs, labels)
            test_accuracy += accuracy * inputs.size(0)
            total_samples += inputs.size(0)
    
    avg_test_loss = test_loss / len(test_loader)
    avg_test_accuracy = test_accuracy / total_samples
    
    print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {avg_test_accuracy:.4f}')

# Create test dataset and data loader
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Evaluate the model
evaluate_model(model, test_loader, criterion)

Test Loss: 3.5068, Test Accuracy: 0.4586


#### Generate the top 6 predicted next words given a seed text using the trained model.

In [67]:
def predict_next_words(seed_text, model, word_to_index, index_to_word, word2vec_model, sequence_length, top_n=6):
    seed_tokens = seed_text.split()
    embedded_tokens = [word2vec_model.wv[word] for word in seed_tokens if word in word2vec_model.wv]
    # Pad the sequence if needed
    if len(embedded_tokens) < sequence_length:
        padding = [np.zeros(word2vec_model.vector_size)] * (sequence_length - len(embedded_tokens))
        embedded_tokens = padding + embedded_tokens
    token_tensor = torch.tensor([embedded_tokens], dtype=torch.float32)
    device = next(model.parameters()).device
    
    token_tensor = token_tensor.to(device)

    with torch.no_grad():
        model.eval()
        output = model(token_tensor)
        probabilities = F.softmax(output, dim=1)
        top_probabilities, top_indices = torch.topk(probabilities, top_n, dim=1)

    top_words = [index_to_word[index.item()] for index in top_indices[0]]
    top_probabilities = top_probabilities.squeeze().tolist()

    return list((top_words))

# Example usage
sample_input = "I am going to University of Arizona to study"
top_predicted_words = predict_next_words(sample_input, model, word_to_index, index_to_word, word2vec_model, sequence_length=10, top_n=6)

print("Top predicted next words:", top_predicted_words)

Top predicted next words: ['comedy', 'how', 'war', 'smith', 'light', 'wife']


#### Define a Vanilla RNN model architecture. Initialize the model with specified input_size, hidden_size, and output_size. Then, define the loss function as Cross Entropy Loss and the optimizer as Adam. Additionally, a learning rate scheduler is defined to adjust the learning rate during training. Finally, reduce the learning rate if the validation loss does not improve for the specified patience.

In [19]:
# Define Vanilla RNN model
class VanillaRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(VanillaRNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # Initialize hidden state
        h0 = torch.zeros(1, x.size(0), self.hidden_size).to(x.device)
        
        # Forward propagate RNN
        out, _ = self.rnn(x, h0)
        
        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out 

# Define model parameters
embedding_dim = word2vec_model.vector_size
hidden_size = 64
output_size = vocab_size

# Initialize the model
model_rnn = VanillaRNN(input_size=embedding_dim, hidden_size=hidden_size, output_size=output_size)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model_rnn.parameters(), lr=0.001)

# Define learning rate scheduler
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2, verbose=True)

# Define DataLoader for training set
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Train the model
for epoch in range(12):  # Train for 10 epochs as an example
    model_rnn.train()  # Set the model to training mode
    total_correct = 0
    total_samples = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model_rnn(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        # Calculate training accuracy
        _, predicted = torch.max(outputs, 1)
        total_correct += (predicted == labels).sum().item()
        total_samples += labels.size(0)
    
    # Print training accuracy and loss for every epoch
    train_accuracy = total_correct / total_samples
    print(f'Epoch {epoch+1}, Loss: {loss.item()}, Training Accuracy: {train_accuracy}')
    
    # Reduce learning rate if validation loss does not improve for patience number of epochs
    scheduler.step(loss.item())


Epoch 1, Loss: 4.625983238220215, Training Accuracy: 0.25695754856932623
Epoch 2, Loss: 2.9773545265197754, Training Accuracy: 0.5054918645636771
Epoch 3, Loss: 0.5300394296646118, Training Accuracy: 0.7116756269211142
Epoch 4, Loss: 0.2931666374206543, Training Accuracy: 0.842076998848815
Epoch 5, Loss: 0.14371295273303986, Training Accuracy: 0.9033657326639846
Epoch 6, Loss: 0.401125431060791, Training Accuracy: 0.9350707919440785
Epoch 7, Loss: 1.1446062326431274, Training Accuracy: 0.9531693191393409
Epoch 8, Loss: 0.7587936520576477, Training Accuracy: 0.9633193292270446
Epoch 9, Loss: 0.1993234008550644, Training Accuracy: 0.9722439800144789
Epoch 10, Loss: 0.10240236669778824, Training Accuracy: 0.9728403413204211
Epoch 11, Loss: 0.20833894610404968, Training Accuracy: 0.9730836329974721
Epoch 12, Loss: 0.07712671905755997, Training Accuracy: 0.9732824200994529


#### Evaluate the Vanilla RNN model on the test set. Initialize a DataLoader for the test set with a batch size of 32. Set the model to evaluation mode using model_rnn.eval(). Within a no_grad context, it iterates over the test loader, calculates the loss and accuracy for each batch, and aggregates them over the entire test set. Print the average test loss and test accuracy.

In [23]:
# Define DataLoader for test set
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32)

# Evaluate the model on the test set
model_rnn.eval()
test_loss = 0
total_correct = 0
total_samples = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model_rnn(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item() * inputs.size(0)  # Multiply by batch size
        _, predicted = torch.max(outputs, 1)
        total_correct += (predicted == labels).sum().item()
        total_samples += labels.size(0)

# Calculate test accuracy
test_accuracy = total_correct / total_samples

# Calculate average test loss
avg_test_loss = test_loss / len(test_loader.dataset)

# Print test loss and accuracy
print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

Test Loss: 0.5311, Test Accuracy: 0.9597


#### Generate the top 6 predicted next words given a seed text using the trained model.

In [30]:
def predict_next_word(input_text, model, word2vec_model, word_to_index, index_to_word, top_k=6):
    # Tokenize input text
    input_tokens = word_tokenize(input_text.lower())
    
    # Convert tokens to embeddings
    input_embeddings = [word2vec_model.wv[word] for word in input_tokens if word in word2vec_model.wv]
    
    # Convert embeddings to PyTorch tensor
    input_tensor = torch.tensor([input_embeddings], dtype=torch.float32)
    
    # Get predictions from the model
    model.eval()
    with torch.no_grad():
        outputs = model(input_tensor)
    
    # Get top k predictions
    _, top_indices = torch.topk(outputs, top_k)
    
    # Convert indices to words
    predicted_words = [index_to_word[idx.item()] for idx in top_indices[0]]
    
    return predicted_words

In [66]:
input_text = "I am going to University of Arizona to study"
predicted_words = predict_next_word(input_text, model_rnn, word2vec_model, word_to_index, index_to_word, top_k=6)
print("Predicted words:", predicted_words)

Predicted words: ['study', 'guardian', 'overcome', 'marry', 'behave', 'invite']
