#Using BPE.

In [1]:
import string
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
import copy
import os

# Additional imports for BPE
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

In [2]:
def enhanced_clean_text(text):
    # Step 1: Find the actual start of the content (e.g., "Chapter 1")
    start_marker = "BOOK ONE: 1805"
    start = text.lower().find(start_marker.lower())
    if start != -1:
        text = text[start:]

    # Step 2: Remove URLs and metadata
    text = ' '.join([word for word in text.split() if not word.startswith('http')])

    # Step 3: Remove special formatting markers and transcriber notes
    text = text.replace('_', '')  # Remove underscores
    text = text.replace('^', '')  # Remove caret markers
    text = text.replace('{', '').replace('}', '')  # Remove curly brackets

    # Step 4: Remove punctuation, numbers, and convert to lowercase
    text = text.translate(str.maketrans("", "", string.punctuation))  # Remove punctuation
    text = ''.join([char for char in text if char.isalpha() or char.isspace()])  # Remove numbers
    text = text.lower()  # Convert to lowercase

    # Step 5: Remove extra spaces
    text = ' '.join(text.split())  # Remove redundant spaces

    return text

# Load raw text file
with open("War and Peace.txt", "r", encoding="utf-8") as file:
    raw_text = file.read()

# Apply the cleaning function
cleaned_text = enhanced_clean_text(raw_text)

# Output the first 500 characters of the cleaned text
print(cleaned_text[:500])


war and peace by leo tolstoytolstoi contents book one chapter i chapter ii chapter iii chapter iv chapter v chapter vi chapter vii chapter viii chapter ix chapter x chapter xi chapter xii chapter xiii chapter xiv chapter xv chapter xvi chapter xvii chapter xviii chapter xix chapter xx chapter xxi chapter xxii chapter xxiii chapter xxiv chapter xxv chapter xxvi chapter xxvii chapter xxviii book two chapter i chapter ii chapter iii chapter iv chapter v chapter vi chapter vii chapter viii chapter i


In [3]:
# Build a BPE tokenizer
tokenizer = Tokenizer(BPE(unk_token="<unk>"))
trainer = BpeTrainer(
    vocab_size=25000,
    min_frequency=2,
    special_tokens=["<unk>", "<pad>", "<s>", "</s>"]
)
tokenizer.pre_tokenizer = Whitespace()

# Train the tokenizer
tokenizer.train_from_iterator([cleaned_text], trainer)

# Tokenize the text
encoded = tokenizer.encode(cleaned_text)
text_as_int = np.array(encoded.ids)

# Define sequence length and prepare input-output pairs
sequence_length = 100
sequences = []
targets = []

for i in range(len(text_as_int) - sequence_length):
    sequences.append(text_as_int[i:i + sequence_length])
    targets.append(text_as_int[i + sequence_length])

sequences = np.array(sequences)
targets = np.array(targets)

In [4]:
tokenizer.get_vocab_size()

10000

In [5]:
# Custom dataset class
class TextDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = torch.tensor(sequences, dtype=torch.long)
        self.targets = torch.tensor(targets, dtype=torch.long)

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return self.sequences[idx], self.targets[idx]

# Split data into training and validation sets
train_size = int(0.9 * len(sequences))
val_size = len(sequences) - train_size

train_dataset = TextDataset(sequences[:train_size], targets[:train_size])
val_dataset = TextDataset(sequences[train_size:], targets[train_size:])

In [6]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device):
    """
    Train the given model and visualize training and validation loss over epochs.

    Args:
        model: The PyTorch model to train.
        train_loader: DataLoader for training data.
        val_loader: DataLoader for validation data.
        criterion: Loss function.
        optimizer: Optimizer for training.
        num_epochs: Number of training epochs.
        device: Device to train on (e.g., "cpu" or "cuda").

    Returns:
        model: Trained model.
        best_model_state_dict: State dict of the best model based on validation loss.
        train_losses: List of training losses for each epoch.
        val_losses: List of validation losses for each epoch.
    """
    import copy
    # Move model to the device
    model.to(device)

    # Calculate the number of trainable parameters
    total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total number of trainable parameters in the model: {total_params}")

    # Lists to store losses
    train_losses = []
    val_losses = []

    # Initialize the best validation loss to a large value
    best_val_loss = float('inf')
    best_model_state_dict = None

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            # Forward pass
            outputs, _ = model(inputs)
            loss = criterion(outputs, targets)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # Validation loop
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs, _ = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item()

        # Calculate average losses for the epoch
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)

        # Store losses
        train_losses.append(train_loss)
        val_losses.append(val_loss)

        # Check if this is the best validation loss so far
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state_dict = copy.deepcopy(model.state_dict())

        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    # Save the final trained model
    model_path = "lstm_model.pth"
    torch.save(model.state_dict(), model_path)
    print(f"Model saved to {model_path}")

    # Visualization
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, num_epochs + 1), train_losses, label='Train Loss')
    plt.plot(range(1, num_epochs + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.ylim(0)  # Start y-axis from 0
    plt.title('Training and Validation Loss over Epochs')
    plt.legend()
    plt.grid(True)
    plt.show()

    return model, best_model_state_dict, train_losses, val_losses

In [None]:
# Function to generate text
def generate_text(model, tokenizer, seed_text, length, device):
    model.eval()
    generated_ids = tokenizer.encode(seed_text).ids
    input_ids = torch.tensor(generated_ids, dtype=torch.long).unsqueeze(0).to(device)

    hidden = None

    for _ in range(length):
        with torch.no_grad():
            output, hidden = model(input_ids, hidden)
            probs = torch.nn.functional.softmax(output, dim=-1)
            # Sample from the distribution or take the argmax
            next_token_id = torch.multinomial(probs, num_samples=1).item()

        # Append the predicted token id to the generated_ids
        generated_ids.append(next_token_id)

        # Update input_ids to contain the new token
        input_ids = torch.tensor([[next_token_id]], dtype=torch.long).to(device)

    # Decode the generated ids to text
    generated_text = tokenizer.decode(generated_ids)
    return generated_text

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [7]:
train_loader = DataLoader(train_dataset, batch_size=512, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=512)

In [8]:
len(train_loader)

1050

In [9]:
# Define the LSTM model
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers=1, dropout=0):
        super(LSTMModel2, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        self.dropout = nn.Dropout(dropout)  # Dropout after LSTM output
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        x = self.embedding(x)  # Convert input to embeddings
        output, hidden = self.lstm(x, hidden)  # Pass through LSTM layers
        output = self.dropout(output)  # Apply dropout to LSTM outputs
        output = self.fc(output[:, -1, :])  # Use the last output for prediction
        return output, hidden

# Define model parameters
vocab_size = tokenizer.get_vocab_size()
embed_size = 128
hidden_size = 1024
num_layers = 1
dropout_rate = 0

model2_1 = LSTMModel(vocab_size, embed_size, hidden_size, num_layers=num_layers, dropout=dropout_rate)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer2_1 = optim.Adam(model2_1.parameters(), lr=0.001)

# Set device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train the model
trained_model2_1, best_model_state_dict2_1, train_losses, val_losses = train_model(
    model2_1, train_loader, val_loader, criterion, optimizer2_1, num_epochs=20, device=device
)

In [None]:
# Create models folder if it doesn't exist
models_folder = "models"
os.makedirs(models_folder, exist_ok=True)

# Save the last epoch model
last_epoch_model_path = os.path.join(models_folder, "lstm_model2_1.pth")
torch.save(trained_model2_1.state_dict(), last_epoch_model_path)
print(f"Last epoch model saved to {last_epoch_model_path}")

# Save the best model based on validation loss
best_model_path = os.path.join(models_folder, "lstm_model2_1_best.pth")
torch.save(best_model_state_dict2_1, best_model_path)
print(f"Best validation loss model saved to {best_model_path}")

Total number of trainable parameters in the model: 11927312
Epoch 1/20, Train Loss: 6.6545, Val Loss: 6.4536
Epoch 2/20, Train Loss: 5.8955, Val Loss: 6.0819
Epoch 3/20, Train Loss: 5.5284, Val Loss: 5.9421
Epoch 4/20, Train Loss: 5.2692, Val Loss: 5.8660
Epoch 5/20, Train Loss: 5.0452, Val Loss: 5.8466
Epoch 6/20, Train Loss: 4.8343, Val Loss: 5.8998
Epoch 7/20, Train Loss: 4.6269, Val Loss: 5.9612


KeyboardInterrupt: 

In [None]:
# Function to load the model
def load_model(model_class, model_path, vocab_size, embed_size, hidden_size, num_layers, dropout_rate, device):
    model = model_class(vocab_size, embed_size, hidden_size, num_layers=num_layers, dropout=dropout_rate)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    return model

# Load the best model
model_path = os.path.join(models_folder, "lstm_model2_1_best.pth")
model2_1_loaded = load_model(LSTMModel2, model_path, vocab_size, embed_size, hidden_size, num_layers, dropout_rate, device)

# Generate text using the loaded model
seed_text = "The soldiers marched forward and at a".lower()  # Convert to lowercase
generated_text = generate_text(model2_1_loaded, tokenizer, seed_text, length=200, device=device)
print(generated_text)

In [None]:
# Evaluate perplexity on the validation set
val_perplexity = calculate_perplexity(model4, val_loader)
print(f"Validation Perplexity: {val_perplexity:.2f}")

entropy = calculate_entropy(generated_text)
print(f"Entropy of Generated Text: {entropy:.2f}")