# Next-word Generator for the Taylor Swift Lyrics dataset
 - Sourced from Kaggle: https://www.kaggle.com/datasets/ishikajohari/taylor-swift-all-lyrics-30-albums
 - The data was compiled and arranged into a single TaylorSwiftLyrics.txt file.
 
## Imports and Initial Configuration

In [6]:
import torch
import torch.nn.functional as F
from torch import nn
import os
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# Display PyTorch version and set device
print(f"PyTorch Version: {torch.__version__}")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Define dataset directory
dataset_dir = os.path.join(os.getcwd(), 'datasets')


PyTorch Version: 1.11.0+cu113
Using device: cuda


## Text Cleaning Function

We define a function to clean the text by handling punctuation more effectively and ensuring case insensitivity.

In [7]:
import os
import re

def clean_text(filename: str):
    """
    Reads and cleans text from a file.
    Handles punctuation by separating them as distinct tokens,
    replaces single or multiple newline boundaries with <PAR_BREAK>, 
    and converts all text to lowercase.
    """
    filepath = os.path.join(dataset_dir, filename)
    with open(filepath, encoding='utf-8') as file:
        text = file.read()
    
    # Convert to lowercase for case insensitivity
    text = text.lower()
    
    # Separate specified punctuation by adding spaces around them
    # Punctuation marks: ., ,, !, ?, -, ', "
    text = re.sub(r'([.,!?\'\"-])', r' \1 ', text)
    
    # Remove any unwanted characters except specified punctuation and alphanumerics
    text = re.sub(r'[^a-zA-Z0-9\s.,!?\'\"-]', ' ', text)
    
    # Replace any single or multiple newline characters with <PAR_BREAK>
    text = re.sub(r'\n+', ' <PAR_BREAK> ', text)
    # Remove extra spaces
    text = " ".join(text.split())
    
    return text


## Unique Words Extraction Function

Extracts unique words, including punctuation, and creates mappings between words and their indices.

In [8]:
# %%
def unique_words(text: str):
    """
    Extracts unique words and punctuation from the text.
    Creates mappings from string to index and vice versa.
    Includes special tokens for paragraph boundaries and <UNK>.
    """
    words = pd.Series(text.split())
    
    # Define allowed punctuation marks
    allowed_punctuations = {'.', ',', '!', '?', '-', '\'', '"'}
    
    # Filter words by length and ensure they are alphanumeric or specified punctuation
    words = words[((words.str.len() > 0) & (words.str.len() < 20))]
    words = words[words.isin(allowed_punctuations) | words.str.match(r'^[a-zA-Z0-9]+$')]
    
    # Drop duplicates and sort
    words = words.drop_duplicates(ignore_index=True)
    vocab = words.sort_values().to_list()
    
    # Initialize stoi with special tokens
    special_tokens = ['<PAR_BREAK>', '<UNK>']
    stoi = {token: i + 1 for i, token in enumerate(special_tokens)}
    
    # Add the remaining vocabulary words, starting from the next available index
    next_index = len(stoi) + 1
    for word in vocab:
        if word not in stoi:
            stoi[word] = next_index
            next_index += 1
    
    # Create the itos mapping based on updated stoi
    itos = {i: s for s, i in stoi.items()}
    
    return vocab, stoi, itos


## Data Preparation
We prepare the dataset by creating input-output pairs based on a context window.

In [9]:
def prepare_data(text: str, block_size: int, stoi):
    """
    Prepares input-output pairs for training.
    Each input consists of `block_size` tokens, and the target is the next token.
    Unknown words are mapped to the <UNK> token.
    """
    words = text.split()
    X, Y = [], []
    
    # Ensure <UNK> and <PAR_BREAK> tokens are in stoi
    unk_token = '<UNK>'
    par_break_token = '<PAR_BREAK>'
    if unk_token not in stoi:
        stoi[unk_token] = len(stoi) + 1
    if par_break_token not in stoi:
        stoi[par_break_token] = len(stoi) + 1

    unk_idx = stoi[unk_token]
    par_break_idx = stoi[par_break_token]
    
    for i in range(block_size, len(words)):
        context = words[i-block_size:i]
        target = words[i]
        
        # Convert context and target to indices, map unknown words to <UNK>
        context_ix = [stoi.get(word, unk_idx) for word in context]
        target_ix = stoi.get(target, unk_idx)
        
        X.append(context_ix)
        Y.append(target_ix)
    
    # Convert lists to tensors
    X = torch.tensor(X, dtype=torch.long).to(device)
    Y = torch.tensor(Y, dtype=torch.long).to(device)
    
    return X, Y

## Data Cleaning and Preparation
We clean the text, build the vocabulary, and prepare the data for training.

In [10]:
# Clean the text from the dataset
text = clean_text('lyrics.txt')

# Extract unique words and create mappings
vocab, stoi, itos = unique_words(text)

# Add the <UNK> and <PAR_BREAK> tokens to `stoi` and `itos` if not present
if '<UNK>' not in stoi:
    unk_idx = len(stoi)
    stoi['<UNK>'] = unk_idx
    itos[unk_idx] = '<UNK>'

if '<PAR_BREAK>' not in stoi:
    par_break_idx = len(stoi)
    stoi['<PAR_BREAK>'] = par_break_idx
    itos[par_break_idx] = '<PAR_BREAK>'

TypeError: drop_duplicates() got an unexpected keyword argument 'ignore_index'

## Training variants of models:
- Embedding size (embedding_dim): 64, 128
- Context window size (block size): 5, 10, 15
- Activation function: ReLU, Tanh

In [11]:
# Hyperparameter grids
embedding_dims = [64, 128]
block_sizes = [5, 10, 15]
activation_functions = {
    'ReLU': nn.ReLU(),
    'Tanh': nn.Tanh()
}

# Other hyperparameters
hidden_dim = 256
epochs = 500
learning_rate = 0.001
batch_size = 1024  # Adjust if necessary based on dataset size

## Defining the dataloader and model

In [12]:
# Prepare DataLoader for mini-batch gradient descent
def create_data_loader(X, Y, batch_size):
    dataset = TensorDataset(X, Y)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define the NextWord model with a flexible activation function
class NextWord(nn.Module):
    """
    A feedforward neural network with multiple hidden layers for next-word prediction.
    Utilizes a configurable activation function to improve gradient flow.
    """
    def __init__(self, block_size, vocab_size, embedding_dim, hidden_dim, activation_fn):
        super(NextWord, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lin1 = nn.Linear(embedding_dim * block_size, hidden_dim)
        self.lin2 = nn.Linear(hidden_dim, hidden_dim)
        self.lin3 = nn.Linear(hidden_dim, hidden_dim)
        self.lin4 = nn.Linear(hidden_dim, hidden_dim)
        self.activation = activation_fn
        self.lin_out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        embeds = self.embedding(x)  # Shape: (batch_size, block_size, embedding_dim)
        embeds = embeds.view(x.shape[0], -1)  # Flatten: (batch_size, block_size * embedding_dim)
        out = self.activation(self.lin1(embeds))
        out = self.activation(self.lin2(out))
        out = self.activation(self.lin3(out))
        out = self.activation(self.lin4(out))
        return self.lin_out(out)  # Shape: (batch_size, vocab_size)

## Training loop

In [13]:
# To store loss histories and labels for plotting
all_loss_histories = []
all_labels = []

# Iterate over all combinations of hyperparameters
for embedding_dim in embedding_dims:
    for block_size in block_sizes:
        # Prepare data for the current block_size
        X, Y = prepare_data(text, block_size, stoi)
        data_loader = create_data_loader(X, Y, batch_size)
        
        for act_name, act_fn in activation_functions.items():
            print(f"\nTraining model with Embedding Dim: {embedding_dim}, Block Size: {block_size}, Activation: {act_name}")
            
            # Initialize the model
            model = NextWord(
                block_size=block_size,
                vocab_size=len(stoi) + 1,  # +1 for unknown tokens
                embedding_dim=embedding_dim,
                hidden_dim=hidden_dim,
                activation_fn=act_fn
            ).to(device)
            
            # Initialize loss function and optimizer
            loss_fn = nn.CrossEntropyLoss()
            optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
            
            # List to store the average loss for each epoch
            loss_history = []
            
            # Training loop with mini-batch gradient descent
            for epoch in range(1, epochs + 1):
                model.train()  # Set model to training mode
                total_loss = 0  # Track total loss for the epoch
                
                for batch_X, batch_Y in data_loader:
                    # Move batches to device
                    batch_X, batch_Y = batch_X.to(device), batch_Y.to(device)
                    
                    # Forward pass
                    outputs = model(batch_X)
                    loss = loss_fn(outputs, batch_Y)
                    
                    # Backward pass and optimization
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    
                    # Accumulate loss
                    total_loss += loss.item()
                
                # Average loss per epoch
                avg_loss = total_loss / len(data_loader)
                loss_history.append(avg_loss)  # Store the average loss
            
                # Print progress every 100 epochs and at the first epoch
                if epoch % 100 == 0 or epoch == 1:
                    print(f"Epoch {epoch}/{epochs}, Loss: {avg_loss:.4f}")
            
            # Save the model with a unique filename
            model_save_path = f'models/lyrics_nextword_model_bs{block_size}_emb{embedding_dim}_act{act_name}.pth'
            torch.save(model.state_dict(), model_save_path)
            print(f"Model saved to {model_save_path}")
            
            # Store loss history and label for plotting
            label = f'Emb={embedding_dim}, BS={block_size}, Act={act_name}'
            all_loss_histories.append(loss_history)
            all_labels.append(label)


NameError: name 'stoi' is not defined

## Plotting losses for all models

In [15]:

# Plotting the training losses for all models
fig, axes = plt.subplots(3, 4, figsize=(20, 15))
fig.suptitle('Training Loss over Epochs for Various Model Configurations', fontsize=16)

for idx, (loss_history, label) in enumerate(zip(all_loss_histories, all_labels)):
    row = idx // 4
    col = idx % 4
    ax = axes[row, col]
    ax.plot(range(1, epochs + 1), loss_history, marker='o', markersize=2)
    ax.set_title(label)
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Average Loss")
    ax.grid(True)

# Hide any unused subplots if total models < subplots
total_models = len(all_loss_histories)
total_subplots = 3 * 4
if total_models < total_subplots:
    for idx in range(total_models, total_subplots):
        row = idx // 4
        col = idx % 4
        fig.delaxes(axes[row, col])

plt.tight_layout(rect=[0, 0.03, 1, 0.95])  # Adjust layout to make room for the main title
plt.show()

<Figure size 2000x1500 with 0 Axes>