# Playlist Title Generation using RNN and Transformer Models

In this notebook, we explore the task of generating titles for music playlists by implementing and comparing two neural network architectures: Recurrent Neural Networks (RNN) and Transformer models. Our goal is to evaluate how well each model performs on this task and to understand their strengths and weaknesses.

## Setup

First, let's import all the necessary libraries we'll need to load the data, define our models, and train and evaluate them.


In [69]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
from nltk.tokenize import word_tokenize
from collections import Counter
from sklearn.model_selection import train_test_split
import json
import os
import torch.nn.functional as F

# Ensure you've downloaded the NLTK tokenizer's dataset
import nltk
nltk.download('punkt')


[nltk_data] Downloading package punkt to
[nltk_data]     /Users/bestricemossberg/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

## Load and Preprocess Data

We have preprocessed and split our dataset into training, validation, and test sets. Now, we'll load these datasets from their respective JSON files.


In [60]:
def load_data(path):
    with open(path, 'r') as f:
        return json.load(f)

train_data = load_data('../datasets/train.json')
val_data = load_data('../datasets/validation.json')
test_data = load_data('../datasets/test.json')

In [61]:
# Used to intialize the decoder model
max_seq_length = max(len(item['description']) for item in train_data) + 1  # Adding 1 for the EOS token
max_seq_length

65

### Preparing Data for Training
Convert the playlists and descriptions into sequences of tokens, pad them to uniform length, and create DataLoader instances for batching.

In [62]:
PAD_token = 0  # Used for padding short sequences
UNK_token = 1  # Represents unknown words not in the vocabulary
SOS_token = 2  # Start-of-sequence token
EOS_token = 3  # End-of-sequence token

def build_vocab(sequences):
    """Build a vocabulary from a list of sequences."""
    counter = Counter()
    for seq in sequences:
        counter.update(seq)
    # Start indexing from 4 to leave space for special tokens
    vocab = {word: i+4 for i, (word, _) in enumerate(counter.items())}
    vocab['<PAD>'] = PAD_token
    vocab['<UNK>'] = UNK_token
    vocab['<SOS>'] = SOS_token
    vocab['<EOS>'] = EOS_token
    return vocab


# Flatten lists of tokens for tracks and descriptions
all_tracks = [track for item in train_data for track in item['tracks']]
all_descriptions = [token for item in train_data for token in word_tokenize(' '.join(item['description']).lower())]

# Build vocabularies
track_vocab = build_vocab(all_tracks)
description_vocab = build_vocab(all_descriptions)

# Check the size of each vocabulary
track_vocab_size = len(track_vocab)
description_vocab_size = len(description_vocab)

In [63]:
# This function encodes a list of textual tokens into their corresponding numerical IDs based on a given vocabulary. 
# If a token is not found in the vocabulary, it uses a special <UNK> token to represent unknown words.
def encode_sequence(sequence, vocabulary, add_eos=False):
    """Encode a sequence of tokens into a sequence of numerical IDs."""
    encoded_sequence = [vocabulary.get(token, UNK_token) for token in sequence]
    if add_eos:
        encoded_sequence.append(EOS_token)
    return encoded_sequence

# This function takes a list of sequences and pads them to the same length by adding a specified pad_token 
# (default is 0) to the end of shorter sequences. This is necessary for batch processing in models that require 
# input sequences of uniform length.
def pad_sequences(sequences, pad_token=0):
    """Pad sequences to the same length with a pad_token."""
    max_length = max(len(seq) for seq in sequences) # Determine the maximum length of any sequence in the batch
    padded_sequences = [seq + [pad_token] * (max_length - len(seq)) for seq in sequences] # Pad each sequence to match the max length by appending the pad_token
    return padded_sequences


# Custom Dataset class for handling the playlist dataset. It requires the dataset itself and vocabularies for 
# both tracks and descriptions. The __getitem__ method returns the encoded and tensorized versions of the tracks and descriptions.
class PlaylistDataset(Dataset):
    def __init__(self, data, track_vocab, description_vocab):
        self.data = data
        self.track_vocab = track_vocab
        self.description_vocab = description_vocab
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        track_ids = encode_sequence(item['tracks'], self.track_vocab)
        description_ids = encode_sequence(item['description'], self.description_vocab)
        return torch.tensor(track_ids), torch.tensor(description_ids)

# Custom collate function for DataLoader. It pads the sequences in each batch to ensure they have the same length, 
# allowing them to be processed together as a batch. This function is passed to the DataLoader to be applied to each batch.
def collate_fn(batch):
    """Custom collate function for DataLoader."""
    # Extract tracks and descriptions from the batch
    tracks, descriptions = zip(*[(item[0], item[1]) for item in batch])
    # Encode and pad sequences
    tracks_encoded = [encode_sequence(track, track_vocab) for track in tracks]
    descriptions_encoded = [encode_sequence(description, description_vocab, add_eos=True) for description in descriptions]

    tracks_padded = pad_sequences(tracks_encoded, pad_token=track_vocab['<PAD>'])
    descriptions_padded = pad_sequences(descriptions_encoded, pad_token=description_vocab['<PAD>'])
    
    # Convert to tensors and return
    return torch.tensor(tracks_padded, dtype=torch.long), torch.tensor(descriptions_padded, dtype=torch.long)

# Setting the batch size for the DataLoader. The DataLoader iterates over the PlaylistDataset in batches, using the collate_fn to pad the sequences in each batch.
batch_size = 32

# Instantiating the DataLoader with the PlaylistDataset. The DataLoader facilitates efficient iteration over the dataset during the training process.
train_loader = DataLoader(PlaylistDataset(train_data, track_vocab, description_vocab),
                          batch_size=batch_size,
                          shuffle=True,
                          collate_fn=collate_fn)


### Encoder
The encoder will use a bidirectional GRU to process the input sequence of track IDs, producing a set of hidden states that represent the sequence.

In [71]:
class EncoderRNN(nn.Module):
    def __init__(self, input_size, embed_size, hidden_size, num_layers=2):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, embed_size)
        # Ensure bidirectional=True is compatible with your overall model design
        self.gru = nn.GRU(embed_size, hidden_size, num_layers, bidirectional=True)
        
    def forward(self, input_seq):
        embedded = self.embedding(input_seq)  # input_seq: [seq_len, batch_size]
        outputs, hidden = self.gru(embedded)
        # Sum bidirectional GRU outputs if using bidirectional. Adjust as needed.
        outputs = (outputs[:, :, :self.hidden_size] + outputs[:, :, self.hidden_size:])
        return outputs, hidden

    def initHidden(self, batch_size):
        # Determine the multiplier for bidirectional GRUs
        bidirectional_multiplier = 2 if self.gru.bidirectional else 1
        return torch.zeros(self.num_layers * bidirectional_multiplier, batch_size, self.hidden_size, device=device)



### Decoder with Attention Mechanism
The decoder uses a unidirectional GRU along with an attention mechanism. The attention mechanism allows the decoder to focus on different parts of the input sequence for each step of the output sequence.

In [73]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AttnDecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, output_size, max_length, num_layers=1, dropout_p=0.1):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length
        self.num_layers = num_layers

        self.embedding = nn.Embedding(output_size, embed_size)
        self.attn = nn.Linear(self.hidden_size * 2 + embed_size, max_length)  # Adjust for correct size
        self.attn_combine = nn.Linear(self.hidden_size * 2 + embed_size, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(embed_size + self.hidden_size * 2, hidden_size, num_layers=num_layers)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden, encoder_outputs):
        embedded = self.embedding(input).unsqueeze(1)  # [batch_size, 1, embed_size]
        embedded = self.dropout(embedded)
        
        # For attention calculation, ensure dimensions are compatible
        # Adjust hidden to have same dim as embedded for concatenation
        hidden_for_concat = hidden.transpose(0, 1).contiguous().view(1, hidden.size(1), -1)  # Reshape hidden for batch processing
        
        # Concatenate along the dimension that matches embedded size
        attn_input = torch.cat((embedded, hidden_for_concat), dim=2)  # [batch_size, 1, embed_size + hidden_size * num_layers * num_directions]
        
        attn_weights = F.softmax(self.attn(attn_input), dim=2)  # Adjust softmax dim if necessary
        attn_applied = torch.bmm(attn_weights, encoder_outputs.transpose(0, 1))  # Apply attention to encoder outputs
        
        # Combine attended encoder outputs with embedded input before sending to GRU
        output = torch.cat((embedded, attn_applied), dim=2)
        output = self.attn_combine(output)
        output = F.relu(output)

        output, hidden = self.gru(output, hidden)
        
        output = F.log_softmax(self.out(output.squeeze(1)), dim=1)  # Adjust for batch processing
        return output, hidden, attn_weights

    def initHidden(self, batch_size):
        bidirectional_multiplier = 2 if self.gru.bidirectional else 1  # Use 2 if your GRU is bidirectional; adjust as necessary
        return torch.zeros(self.num_layers * bidirectional_multiplier, batch_size, self.hidden_size, device=device)


#### Defining the Training Process

Before writing the training loop, define the loss function and optimization algorithm:

In [74]:
# Assuming SOS_token, EOS_token, epochs, device, and log_interval are defined
SOS_token = 2  # Arbitrary integer to represent the start of a sequence
EOS_token = 3  # Arbitrary integer to represent the end of a sequence
epochs = 10  # Number of training epochs, adjust based on your dataset and model performance
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
log_interval = 5  # Log training info every 5 epochs, adjust as needed

# Adjust embed_size and hidden_size as per specifications
embed_size = 128
hidden_size = 256

encoder = EncoderRNN(input_size=len(track_vocab), embed_size=embed_size, hidden_size=hidden_size).to(device)
decoder = AttnDecoderRNN(embed_size=embed_size, hidden_size=hidden_size, output_size=len(description_vocab), max_length=max_seq_length, num_layers=2).to(device)  # Ensure AttnDecoderRNN is defined similarly

encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.001)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.001)
criterion = nn.NLLLoss()

### Training Function and Training Loop
The training loop involves processing each batch of data through the encoder and decoder, calculating the loss, and updating the model parameters.

In [75]:
import random
import torch

def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length, batch_size):
    # Initialize the encoder's hidden state for the current batch size
    encoder_hidden = encoder.initHidden(batch_size).to(device)

    # Reset gradients
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    # Define the initial input and hidden state for the decoder
    decoder_input = torch.tensor([[SOS_token]] * batch_size, device=device)  # Adjust for batch_size
    decoder_hidden = encoder_hidden

    # Determine the lengths of the input and target sequences
    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)
    loss = 0

    # Forward pass through the encoder
    encoder_outputs, encoder_hidden = encoder(input_tensor)

    use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False

    if use_teacher_forcing:
        # Teacher forcing: Feed the target as the next input
        for di in range(target_length):
            decoder_output, decoder_hidden, _ = decoder(decoder_input, decoder_hidden, encoder_hidden)
            loss += criterion(decoder_output, target_tensor[di].unsqueeze(0))
            decoder_input = target_tensor[di]  # Next input comes from the teaching data
    else:
        # Without teacher forcing: use the decoder's own predictions as the next input
        for di in range(target_length):
            decoder_output, decoder_hidden, _ = decoder(decoder_input, decoder_hidden, encoder_hidden)
            topv, topi = decoder_output.topk(1)
            decoder_input = topi.squeeze(1).detach()  # Detach from history as input
            
            loss += criterion(decoder_output, target_tensor[di].unsqueeze(0))
            if decoder_input.item() == EOS_token:
                break

    # Perform backpropagation
    loss.backward()

    # Update the weights
    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length


In [76]:
# Assuming train_loader is defined and ready
epochs = 10
teacher_forcing_ratio = 0.5
print_every = 1000

for epoch in range(1, epochs + 1):
    print(f"Epoch {epoch}/{epochs}")
    for i, (input_tensor, target_tensor) in enumerate(train_loader, 1):
        input_tensor, target_tensor = input_tensor.to(device), target_tensor.to(device)
        batch_size = input_tensor.size(1)  # Assuming input_tensor is [seq_len, batch_size]
        loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_seq_length, batch_size=batch_size)  # Pass batch_size to train

        if i % print_every == 0:
            print(f"Step {i}, Loss: {loss:.4f}")



Epoch 1/10


RuntimeError: Tensors must have same number of dimensions: got 4 and 3

### Generating Playlist Descriptions
After training, you can use the model to generate playlist descriptions by processing a sequence of track IDs through the encoder and iteratively predicting the next word in the description with the decoder.

In [None]:
def generate_description(encoder, decoder, track_sequence, max_length):
    with torch.no_grad():
        input_tensor = track_sequence.to(device)
        input_length = input_tensor.size(0)
        encoder_hidden = encoder.initHidden().to(device)

        encoder_outputs, encoder_hidden = encoder(input_tensor, encoder_hidden)

        decoder_input = torch.tensor([[SOS_token]], device=device)
        decoder_hidden = encoder_hidden
        decoded_words = []

        for di in range(max_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs)
            topv, topi = decoder_output.data.topk(1)
            if topi.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            else:
                decoded_words.append(topi.item())

            decoder_input = topi.squeeze().detach()

        return decoded_words


def generate_description(encoder, decoder, track_sequence, max_length, vocab):
    # Assuming 'vocab' is a dictionary mapping IDs back to words
    with torch.no_grad():
        # similar setup as before
        decoded_words = []
        for di in range(max_length):
            # similar decoding steps as before
            if topi.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            else:
                decoded_words.append(vocab[topi.item()])  # Convert ID back to word

        return decoded_words