# LSTM

## PREPROCESSING

In [1]:
from google.colab import files
import shutil
import re
import random

import torch
import torchtext
from nltk.corpus import brown
from collections import Counter


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# # preprocessing done on vs code, then uploaded cleaned corpus to colab

# # Define a function to clean and tokenize the text
# def clean_and_tokenize_text(input_file, output_file):
#     with open(input_file, 'r', encoding='utf-8') as file:
#         text = file.read()

#     # Tokenize the text into sentences
#     sentences = nltk.sent_tokenize(text)

#     # Filter and clean sentences
#     cleaned_sentences = []
#     for sentence in sentences:
#         # Remove extra spaces and tabs, and convert to lowercase
#         cleaned_sentence = ' '.join(sentence.split()).strip().lower()

#         # Check if the sentence contains alphanumeric characters
#         if any(c.isalnum() for c in cleaned_sentence):
#             # Check if the sentence contains the word "Chapter"
#             if "chapter" not in cleaned_sentence:
#                 # Add the cleaned sentence to the list
#                 cleaned_sentences.append(cleaned_sentence)

#     # Write cleaned sentences to the output file
#     with open(output_file, 'w', encoding='utf-8') as file:
#         for sentence in cleaned_sentences:
#             file.write(sentence + '\n')


# # Input and output file paths
# input_file = 'Auguste_Maquet.txt'
# output_file = 'cleaned_auguste_maquet.txt'

# # Clean and tokenize the text
# clean_and_tokenize_text(input_file, output_file)

# print(f'Cleaned sentences have been saved to {output_file}')


In [3]:
# Specify the path to the file in your Google Drive
file_path = "/content/drive/My Drive/cleaned_auguste_maquet.txt"

# Initialize a list to store the sentences
content = []

# Read the file line by line
with open(file_path, "r") as file:
    for line in file:
        # Remove leading and trailing whitespace and append the line as a sentence
        sentence = line.strip()
        content.append(sentence)

# Display the content
# print(content)

In [4]:
# Split sizes
validation_size = 10000
test_size = 20000

# Randomly shuffle the sentences
# random.shuffle(filtered_content)

# Split the data
validation_set_text = content[:validation_size]
test_set_text = content[validation_size:validation_size + test_size]
train_set = content
print(len(validation_set_text))
print(len(test_set_text))
print(len(train_set))


# for running perp against train set lets take 10 k sentences
train_sents_perp = content[:10000]

10000
20000
34860


## DATA

In [6]:
import torch
from torch.utils.data import Dataset, DataLoader, Subset
import spacy
from torchtext.vocab import GloVe
import torch.nn as nn
import torch.optim as optim
import math
nlp = spacy.load("en_core_web_sm")


In [17]:
# Tokenize the sentences using spaCy (you can use other tokenizers as well)
# nlp = spacy.load("en_core_web_sm")
tokenized_sentences = [token.text for sentence in content for token in nlp(sentence)]

# Flatten the list of tokens
corpus = tokenized_sentences

# Build the vocabulary (mapping from tokens to indices)
vocab = {token: idx for idx, token in enumerate(set(corpus))}


In [7]:
class LMData(Dataset):
    def __init__(self, corpus_tokens, vocab, seq_len):
        self.seq_len = seq_len
        self.vocab = vocab
        self.inputs, self.targets = self.process_corpus(corpus_tokens)

    def process_corpus(self, corpus_tokens):
        # Convert tokens to indices using the vocab
        indices = [self.vocab[token] for token in corpus_tokens]

        # Prepare input and target sequences
        inputs = []
        targets = []
        for i in range(0, len(indices) - self.seq_len, self.seq_len):
            inputs.append(indices[i:i + self.seq_len])
            targets.append(indices[i + self.seq_len])

        return inputs, targets

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        input_seq = torch.LongTensor(self.inputs[idx])
        target_token = torch.LongTensor([self.targets[idx]])
        return input_seq, target_token


In [8]:
# Set the sequence length
seq_len = 5

# Create the custom dataset
total_dataset = LMData(corpus, vocab, seq_len)


In [31]:
# Split the dataset into training, validation, and test sets
def create_train_val_test(dataset, split_val, split_test):
    total_size = len(dataset)
    val_size = int(total_size * split_val)
    test_size = int(total_size * split_test)
    train_size = total_size - val_size - test_size
    train_set, val_set, test_set = torch.utils.data.random_split(
        dataset, [train_size, val_size, test_size]
    )
    return train_set, val_set, test_set

In [33]:
split_val = 0.1  # Validation set size
split_test = 0.1  # Test set size
train_set, val_set, test_set = create_train_val_test(total_dataset, split_val, split_test)


In [34]:
# Create DataLoaders for training, validation, and test sets
batch_size = 16  # Adjust as needed
trn_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)


In [35]:
# Printing shapes of the first batch from each DataLoader
for name, loader in [("Train", trn_loader), ("Validation", val_loader), ("Test", test_loader)]:
    inputs, targets = next(iter(loader))
    print(f"{name} Input Shape: {inputs.shape}, Target Shape: {targets.shape}")


Train Input Shape: torch.Size([16, 5]), Target Shape: torch.Size([16, 1])
Validation Input Shape: torch.Size([16, 5]), Target Shape: torch.Size([16, 1])
Test Input Shape: torch.Size([16, 5]), Target Shape: torch.Size([16, 1])


In [36]:
# Accessing a sample from the dataset
sample_idx = 0  # Change this index to access different samples
sample_input, sample_target = total_dataset[sample_idx]

# Print the sample
print("Sample Input:", sample_input)
print("Sample Target:", sample_target)


Sample Input: tensor([17048,  8396, 10232,  7111, 20323])
Sample Target: tensor([10923])


## MODEL

In [23]:
global_vectors = GloVe(name='6B', dim=100)


.vector_cache/glove.6B.zip: 862MB [02:41, 5.33MB/s]                           
100%|█████████▉| 399999/400000 [00:26<00:00, 15314.52it/s]


In [19]:
# Define a function to create the embedding matrix
def create_embedding_matrix(vocab, global_vectors, embedding_dim):
    num_tokens = len(vocab)
    embedding_matrix = torch.randn(num_tokens, embedding_dim)

    for token, idx in vocab.items():
        if token in global_vectors.stoi:
            embedding_matrix[idx] = global_vectors[token]

    # Check if '<unk>' is in vocab; if not, add it
    unk_idx = vocab.get('<unk>')
    if unk_idx is None:
        unk_idx = len(vocab)  # Assign a new index
        vocab['<unk>'] = unk_idx
        embedding_matrix = torch.cat([embedding_matrix, torch.randn(1, embedding_dim)], dim=0)  # Add a random vector for '<unk>'

    return embedding_matrix


In [20]:
# Define the LSTMCell class
class LSTMCell(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(LSTMCell, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTMCell(embedding_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input, hidden):
        embedded = self.embedding(input)
        hx, cx = self.lstm(embedded, hidden)
        output = self.fc(hx)
        return output, (hx, cx)


In [21]:
# Define the LSTM class
class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers):
        super(LSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, input):
        embedded = self.embedding(input)
        output, _ = self.lstm(embedded)
        output = self.fc(output[:, -1, :])
        return output


In [24]:
# Hyperparameters
vocab_size = len(vocab)
embedding_dim = 100  # Adjust as needed
hidden_dim = 256  # Adjust as needed
output_dim = len(vocab)
n_layers = 2
learning_rate = 0.001
epochs = 10

# Initialize the LSTM model
embedding_matrix = create_embedding_matrix(vocab, global_vectors, embedding_dim)

model = LSTM(embedding_matrix.size(0), embedding_dim, hidden_dim, output_dim, n_layers)


In [25]:
# Load the pre-trained embedding weights into the model's embedding layer
model.embedding.weight.data.copy_(embedding_matrix)
model.embedding.weight.requires_grad = False  # Freeze the pre-trained embeddings


In [26]:
# Move the model to a specified device (e.g., GPU if available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Initialize the loss criterion (CrossEntropyLoss) and optimizer (Adam)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [25]:
# Create DataLoader instances for training and validation
batch_size = 64  # Adjust as needed
train_set, val_set, test_set = create_train_val_test(total_dataset, 0.1, 0.1)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size)

# Training loop
lowest_val_loss = float('inf')
m2_losses = {'train_loss': [], 'val_loss': []}


In [26]:
# Training loop

for epoch in range(epochs):
    model.train()
    train_loss = 0.0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)  # Move data to the same device as the model
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.squeeze())
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Validation loop
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)  # Move data to the same device as the model
            outputs = model(inputs)
            loss = criterion(outputs, targets.squeeze())
            val_loss += loss.item()

    train_loss /= len(train_loader)
    val_loss /= len(val_loader)
    m2_losses['train_loss'].append(train_loss)
    m2_losses['val_loss'].append(val_loss)

    # Save the model if validation loss is lower than the lowest so far
    if val_loss < lowest_val_loss:
        lowest_val_loss = val_loss
        torch.save(model.state_dict(), 'lstm_model.pt')

    print(f'Epoch [{epoch+1}/{epochs}] | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}')


Epoch [1/10] | Train Loss: 5.7433 | Val Loss: 5.2504
Epoch [2/10] | Train Loss: 4.9438 | Val Loss: 5.0098
Epoch [3/10] | Train Loss: 4.6128 | Val Loss: 4.9895
Epoch [4/10] | Train Loss: 4.3506 | Val Loss: 5.0253
Epoch [5/10] | Train Loss: 4.1098 | Val Loss: 5.0997
Epoch [6/10] | Train Loss: 3.8805 | Val Loss: 5.2106
Epoch [7/10] | Train Loss: 3.6602 | Val Loss: 5.3541
Epoch [8/10] | Train Loss: 3.4569 | Val Loss: 5.4913
Epoch [9/10] | Train Loss: 3.2669 | Val Loss: 5.5996
Epoch [10/10] | Train Loss: 3.0906 | Val Loss: 5.7658


In [27]:
# Define the path to save the model in your Google Drive
model_path = "/content/drive/My Drive/model_1b_lstm.pt"
torch.save(model.state_dict(), model_path)

print(f"Model saved to Google Drive at: {model_path}")

Model saved to Google Drive at: /content/drive/My Drive/model_1b_lstm.pt


In [10]:
import os

# Specify the path to your model directory on Google Drive
model_dir = '/content/drive/My Drive'

# Change the current working directory to the model directory
os.chdir(model_dir)


In [11]:
model_path = "/content/drive/My Drive/model_1b_lstm.pt"

model = torch.load('model_1b_lstm.pt')


## test and perplexity

In [7]:
import nltk
nltk.download('punkt')
from nltk.tokenize import word_tokenize


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [41]:
print(test_set_text[123])

section 3.


In [43]:
# Tokenize and clean the test sentences
test_tokens = [word_tokenize(sentence.lower()) for sentence in test_set_text]


In [13]:
test_tokens = [word_tokenize(sentence.lower()) for sentence in train_set]


In [14]:
def create_ngrams(tokens, n):
    ngrams = []
    for sentence_tokens in tokens:
        for i in range(len(sentence_tokens) - n + 1):
            ngram = sentence_tokens[i:i + n]
            ngrams.append(ngram)
    return ngrams

n = 5  # 5-grams
test_ngrams = create_ngrams(test_tokens, n)


In [15]:
import torch.nn.functional as F

def calculate_perplexity(model, ngrams, vocab):
    perplexity_scores = []
    total_ngrams = len(ngrams)

    for i, ngram in enumerate(ngrams):
        ngram_indices = [vocab.get(token, vocab['<unk>']) for token in ngram]
        input_seq = torch.LongTensor(ngram_indices).unsqueeze(0).to(device)
        with torch.no_grad():
            outputs = model(input_seq)
        probabilities = F.softmax(outputs, dim=1).squeeze()
        log_probabilities = torch.log(probabilities)
        perplexity = torch.exp(-torch.mean(log_probabilities)).item()
        perplexity_scores.append(perplexity)

        # Calculate and print the completion percentage
        completion_percentage = (i + 1) / total_ngrams * 100
        print(f'Completed: {completion_percentage:.2f}%')

    return perplexity_scores


In [27]:
# Calculate perplexity scores for the test set
test_perplexity_scores = calculate_perplexity(model, test_ngrams, vocab)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%
Completed: 99.52%

In [29]:
output_file = '/content/drive/My Drive/perp_1b_train.txt'

with open(output_file, 'w') as f:
    for sentence, perplexity in zip(train_set, test_perplexity_scores):
        f.write(f'Sentence: {sentence}\n')
        f.write(f'Perplexity: {perplexity:.4f}\n\n')


In [30]:
# Calculate and write the average perplexity score
average_perplexity = sum(test_perplexity_scores) / len(test_perplexity_scores)

with open(output_file, 'a') as f:
    f.write(f'Average Perplexity: {average_perplexity:.4f}')
print(average_perplexity)

22424.7169986074
