In [1]:
import pandas as pd
import numpy as np
import os

# Pull Data

In [2]:
# Load the raw Nowplaying data and reduce it to one row per distinct (Artist, Title) pair.
raw = pd.read_parquet(
    'D:\\Projects\\cs224-multimodal-recommender-system\\datasets\\nowplaying\\HSP-L_Nowplay_Data.parquet'
)
# value_counts collapses duplicate pairs; the count column is discarded and only the keys are kept.
data = raw.value_counts(subset=['Artist', 'Title']).reset_index().loc[:, ['Artist', 'Title']]
# Same file that the embedding step further down in this notebook writes out.
lyrics_df = pd.read_parquet(
    'D:\\Projects\\cs224-multimodal-recommender-system\\processed_data\\nowplaying\\nowplaying_roberta_lyrics_embedding.parquet'
)

In [3]:
# Join the lyrics columns onto the raw rows by (Artist, Title) using the default
# inner join, so raw rows without a lyrics entry are dropped, then persist the result.
(
    pd.merge(raw, lyrics_df, on=["Artist", "Title"])
    .to_parquet('D:\\Projects\\cs224-multimodal-recommender-system\\processed_data\\nowplaying\\Lyrics_HSP-L_Nowplay_Data.parquet')
)

## Pull lyrics for train and test data from the Genius API (via the LyricsGenius client)

In [11]:
import pandas as pd
import lyricsgenius
import re

# Initialize Genius API
# The access token is read from the environment so the credential never lives in
# the notebook. Set GENIUS_ACCESS_TOKEN before running this cell.
# NOTE(review): a token was previously hardcoded on this line and should be
# treated as exposed - rotate it in the Genius API client settings.
import os

API_KEY = os.environ.get("GENIUS_ACCESS_TOKEN")
if not API_KEY:
    raise RuntimeError(
        "Set the GENIUS_ACCESS_TOKEN environment variable to your Genius API access token."
    )
genius = lyricsgenius.Genius(API_KEY)


def preprocess_string(s):
    """Normalize a name for loose comparison against Genius results.

    Every character that is not a word character (letter, digit, underscore)
    or whitespace is removed - i.e. punctuation and special symbols that could
    hurt the match rate - then surrounding whitespace is trimmed and the
    result is lowercased.

    Args:
        s (str): Text to normalize.

    Returns:
        str: The normalized text.
    """
    without_symbols = re.sub(r'[^\w\s]', '', s)
    trimmed = without_symbols.strip()
    return trimmed.lower()


import time
from functools import wraps

def retry_on_failure(max_attempts=3, initial_delay=1):
    """Build a decorator that re-invokes a function when it raises.

    The wrapped function is called up to ``max_attempts`` times. After each
    failed call except the last, the wrapper sleeps and then doubles the wait
    (exponential backoff). If no call succeeds, the last error is printed and
    ``None`` is returned instead of raising.

    Args:
        max_attempts (int): Maximum number of calls to make.
        initial_delay (int): Seconds to wait after the first failure.

    Returns:
        callable: A decorator to apply to the function that should be retried.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            delay = initial_delay

            for attempt in range(max_attempts):
                try:
                    outcome = func(*args, **kwargs)
                except Exception as caught:
                    last_exception = caught
                else:
                    return outcome

                # Nothing follows the final attempt, so skip the pointless sleep.
                if attempt >= max_attempts - 1:
                    break

                print(f"Attempt {attempt + 1} failed, retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2  # Exponential backoff

            print(f"All {max_attempts} attempts failed. Last error: {last_exception}")
            return None
        return wrapper
    return decorator

@retry_on_failure(max_attempts=3, initial_delay=1)
def fetch_lyrics(artist, track_title):
    """Search Genius for a song and return its lyrics if the hit matches the request.

    A hit is accepted when the normalized input artist is a substring of the
    normalized artist of the returned song, and likewise for the title
    (normalization is done by ``preprocess_string``).

    Errors raised by the search are logged and re-raised so that the
    ``retry_on_failure`` decorator can back off and retry; once every attempt
    has failed the decorator returns ``None``.

    Args:
        artist (str): Artist name to search for.
        track_title (str): Song title to search for.

    Returns:
        str or None: ``song.lyrics`` for an accepted hit; ``None`` when no song
        is found, the hit does not match, or all attempts raised.
    """
    try:
        song = genius.search_song(title=track_title, artist=artist)
    except Exception as e:
        # Keep the per-song log line, but let the error propagate: returning
        # None here would hide the failure from the retry decorator, which
        # would then never retry.
        print(f"Error fetching lyrics for {artist} - {track_title}: {e}")
        raise

    if not song:
        print(f"Song not found: {artist} - {track_title}")
        return None

    fetched_artist = preprocess_string(song.artist)
    fetched_title = preprocess_string(song.title)
    input_artist = preprocess_string(artist)
    input_title = preprocess_string(track_title)

    # Substring containment (not equality) tolerates extra text in the Genius
    # artist/title fields.
    if input_artist in fetched_artist and input_title in fetched_title:
        return song.lyrics

    print(f"No exact match for: {artist} - {track_title}")
    return None

# Fetch lyrics for every (Artist, Title) row of `data`, keeping row order so the
# resulting list lines up with the dataframe; failed lookups are stored as None.
lyrics_list = [
    fetch_lyrics(artist, track_title)
    for artist, track_title in zip(data['Artist'], data['Title'])
]


print("Lyrics fetching complete.")
    
      


Searching for "Here Without You" by 3 Doors Down...
Done.
Searching for "Bohemian Rhapsody" by Queen...
Done.
Searching for "Yellow" by Coldplay...
Done.
Searching for "Clocks" by Coldplay...
Done.
Searching for "Coward Of The County" by Kenny Rogers...
Done.
Searching for "December" by Collective Soul...
Done.
Searching for "Imagine" by John Lennon...
Done.
Searching for "Somebody Like You" by Keith Urban...
Done.
Searching for "Personal Jesus" by Depeche Mode...
Done.
Searching for "Sussudio" by Phil Collins...
Done.
Searching for "French Foreign Legion" by Frank Sinatra...
Done.
Searching for "A Blossom Fell" by Diana Krall...
Done.
Searching for "The Tide Is High" by Blondie...
Done.
Searching for "Atomic" by Blondie...
Done.
Searching for "Buffalo Stance" by Neneh Cherry...
Done.
Searching for "Mesmerized" by Faith Evans...
Done.
Searching for "Again" by Lenny Kravitz...
Done.
Searching for "Pharcyde" by The Pharcyde...
Done.
Searching for "Foolish" by Ashanti...
Done.
Searching f

In [13]:
# Attach the fetched lyrics (one entry per row of `data`, in row order) and
# persist the unprocessed result.
data = data.assign(lyrics=lyrics_list)
data.to_parquet('D:\\Projects\\cs224-multimodal-recommender-system\\processed_data\\nowplaying\\lyrics_raw.parquet')

In [17]:
import re

# ASCII-only, case-insensitive match for the word "lyrics". Matching on the
# original string (rather than searching a lowercased copy and reusing that
# index) matters because str.lower() can change a string's length - e.g. 'İ'
# lowercases to two code points - which would shift the cut position.
_LYRICS_MARKER = re.compile(r'lyrics', re.IGNORECASE | re.ASCII)


def _strip_through_first_lyrics_marker(text):
    """Drop everything up to and including the first "lyrics" in ``text``.

    Args:
        text: A lyrics string, or any non-string placeholder (e.g. None).

    Returns:
        The text after the first case-insensitive occurrence of "lyrics",
        with surrounding whitespace trimmed. Non-string values and strings
        that do not contain "lyrics" are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    marker = _LYRICS_MARKER.search(text)
    if marker is None:
        return text
    return text[marker.end():].strip()


# Work on a copy so `data` keeps the raw lyrics text.
# NOTE(review): presumably this removes a leading "<title> Lyrics" header from the
# fetched text - confirm against a few raw samples.
lyrics_df = data.copy()
lyrics_df['lyrics'] = lyrics_df['lyrics'].apply(_strip_through_first_lyrics_marker)
lyrics_df.to_parquet('D:\\Projects\\cs224-multimodal-recommender-system\\processed_data\\nowplaying\\lyrics_processed.parquet')

In [20]:
import torch
from transformers import RobertaTokenizer, RobertaModel
import nltk
from nltk.tokenize import sent_tokenize
from tqdm import tqdm

# Load the pre-trained RoBERTa tokenizer and model; the model is moved to the
# GPU when CUDA is available and stays on the CPU otherwise.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = RobertaModel.from_pretrained('roberta-base')
model = model.to(device)

def generate_roberta_embedding(text):
    """Embed a text with RoBERTa by mean-pooling the last hidden states.

    The text is tokenized (truncated to at most 512 tokens), run through the
    model without gradient tracking, and the last hidden states are averaged
    over dim 1 before singleton dimensions are squeezed away.

    Args:
        text: The text to embed. Anything that is not a ``str`` is skipped.

    Returns:
        torch.Tensor or None: The pooled embedding on ``device``, or ``None``
        when ``text`` is not a string.
    """
    if not isinstance(text, str):
        return None

    encoded = tokenizer(text, return_tensors='pt', truncation=True, max_length=512, padding=True).to(device)
    # Inference only, so no autograd graph is needed for the forward pass.
    with torch.no_grad():
        token_states = model(**encoded).last_hidden_state
    pooled = token_states.mean(dim=1)
    return pooled.squeeze()

# Embed every lyrics entry, in row order, with a progress bar. Entries that
# cannot be embedded (non-string lyrics) stay None; the rest are moved to the
# CPU and converted to NumPy arrays so they can be stored in the dataframe.
pooled_tensors = (generate_roberta_embedding(text) for text in tqdm(lyrics_df['lyrics']))
embeddings = [
    tensor.cpu().numpy() if tensor is not None else None
    for tensor in pooled_tensors
]

# Add the embeddings to the dataframe
lyrics_df['lyrics_embedding'] = embeddings


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
100%|██████████| 2471/2471 [00:12<00:00, 194.14it/s]


In [24]:
# Persist lyrics plus embeddings; this is the same path the "Pull Data" cell at
# the top of the notebook reads back into lyrics_df.
embedding_output_path = 'D:\\Projects\\cs224-multimodal-recommender-system\\processed_data\\nowplaying\\nowplaying_roberta_lyrics_embedding.parquet'
lyrics_df.to_parquet(embedding_output_path)

In [1]:
# Re-read the raw Nowplaying data. Needs the first cell's imports (pandas as pd, os)
# to have been run in the current kernel, otherwise this raises NameError.
nowplaying_path = os.path.normpath(r'D:\Projects\cs224-multimodal-recommender-system\datasets\nowplaying\HSP-L_Nowplay_Data.parquet')
df = pd.read_parquet(nowplaying_path)

NameError: name 'pd' is not defined