In [None]:
import os
import glob
import pandas as pd
import json
import random
import time
import numpy as np
import librosa
from yt_dlp import YoutubeDL
import subprocess
import datetime

In [None]:
#Suppose your MPD is saved in a folder named MPD
# Keep only the JSON slice files and sort them: os.listdir returns entries in arbitrary
# order and may include stray files (e.g. .DS_Store) that json.load cannot parse.
mil_playlists=sorted(name for name in os.listdir("MPD") if name.endswith(".json"))
all_songs=[]
#Optional: Count every track occurrence in the MPD (duplicates included) to compare against the number of unique songs
def song_count(files, sample_size=100000, data_dir="MPD"):
    """
    Count every track occurrence (duplicates included) across the given slice files.

    Parameters:
      - files: Iterable of JSON file names located inside data_dir.
      - sample_size: Unused; kept only so existing calls keep working.
      - data_dir: Folder that contains the slice files (defaults to "MPD", the
        folder the notebook lists the files from).

    Returns:
      - Total number of track entries found in all playlists of all files.
    """
    total = 0  # Total count of songs
    for file in files:
        with open(os.path.join(data_dir, file), "r", encoding="utf-8") as f:
            data = json.load(f)
        # The file can be closed before counting; only the parsed data is needed.
        for playlist in data.get("playlists", []):
            total += len(playlist.get("tracks", []))
    return total
## Sample 100,000(or any number) unique songs from MPD
def reservoir_sampling_unique(files, sample_size=100000, data_dir="MPD"):
    """
    Draw a uniform random sample of unique songs from the given slice files
    using reservoir sampling (one pass, bounded reservoir size).

    Every track URI is remembered once it has been seen, so a song that was
    skipped or evicted earlier is never counted (or sampled) a second time.
    Without that, songs appearing in many playlists would be over-represented.

    Parameters:
      - files: Iterable of JSON file names located inside data_dir.
      - sample_size: Maximum number of unique songs to return.
      - data_dir: Folder that contains the slice files (defaults to "MPD").

    Returns:
      - List of dicts with keys "title", "artist" and "SpotifyID". Contains all
        unique songs when there are fewer than sample_size of them.
    """
    reservoir = []      # Current sample of unique songs
    seen_ids = set()    # Every track URI encountered so far (sampled or not)

    for file in files:
        with open(os.path.join(data_dir, file), "r", encoding="utf-8") as f:
            data = json.load(f)

        for playlist in data.get("playlists", []):
            for track in playlist.get("tracks", []):
                song_id = track["track_uri"]
                # Skip any song we have already considered, whether or not it is in the reservoir.
                if song_id in seen_ids:
                    continue
                seen_ids.add(song_id)

                song = {
                    "title": track["track_name"],
                    "artist": track["artist_name"],
                    "SpotifyID": song_id
                }

                if len(reservoir) < sample_size:
                    # Reservoir not yet full: always keep the song.
                    reservoir.append(song)
                else:
                    # Keep the k-th unique song with probability sample_size / k
                    # by drawing a slot in [0, k - 1] and replacing only if it
                    # falls inside the reservoir.
                    j = random.randint(0, len(seen_ids) - 1)
                    if j < sample_size:
                        reservoir[j] = song
    return reservoir




In [None]:
#Sample 100,000 songs and save them to a csv
# Seed the RNG so re-running the notebook reproduces the same sample.
SAMPLE_SEED = 42
random.seed(SAMPLE_SEED)
sampled_songs = reservoir_sampling_unique(mil_playlists, sample_size=100000)
sampled_df=pd.DataFrame(sampled_songs)
sampled_df.to_csv("sampled_df.csv")

---
## Audio processing with librosa and lyrics-embedding extraction with lyricsgenius and an LLM

In [None]:
def download_audio(song_title, artist, output_filename="downloaded_audio", max_duration=600, max_filesize=20*1024*1024):
    """
    Downloads audio from YouTube using the yt_dlp Python API and converts it to MP3.

    The search query is "<song_title> <artist> official audio". Results whose
    reported duration or approximate file size reach the limits are rejected, so a
    wrong match (e.g. a long mix) does not slow the whole run down. Metadata that
    is missing or reported as None is treated as 0, i.e. it passes the filter.

    Parameters:
      - song_title, artist: Used to build the search query.
      - output_filename: yt_dlp output template; ".%(ext)s" is appended when the
        template has no extension placeholder.
      - max_duration: Upper bound (exclusive) on duration in seconds (default 10 minutes).
      - max_filesize: Upper bound (exclusive) on approximate size in bytes (default 20 MiB).

    Returns:
      - Path of the resulting .mp3 file if it exists after the download; otherwise None.
        Any exception raised during the download is printed and None is returned.
    """
    # Ensure the output template includes the extension placeholder.
    ## Depending on your ffmpeg, you might need to re-encode your audio file differently.
    if "%(ext)s" not in output_filename:
        output_filename = output_filename + ".%(ext)s"

    def _within_limits(info_dict, *, incomplete=False):
        # `or 0` covers both a missing key and an explicit None value; comparing
        # None with an int would raise TypeError and abort the download.
        duration = info_dict.get('duration') or 0
        approx_size = info_dict.get('filesize_approx') or 0
        if duration < max_duration and approx_size < max_filesize:
            return None
        return 'Video too long or file too large'

    query = f"{song_title} {artist} official audio"
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': output_filename,
        'noplaylist': True,
        'quiet': True,
        'no_warnings': True,
        'default_search': 'ytsearch',
        'match_filter': _within_limits,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'prefer_ffmpeg': True
    }

    try:
        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(query, download=True)
            # A search query yields a playlist-style result; the actual video
            # metadata (needed to resolve the output template) is its first entry.
            if info_dict and 'entries' in info_dict:
                entries = [entry for entry in (info_dict['entries'] or []) if entry]
                info_dict = entries[0] if entries else None
            if not info_dict:
                print("No matching video found.")
                return None
            # Use prepare_filename to get the downloaded file, then change its extension to .mp3
            base_filename = os.path.splitext(ydl.prepare_filename(info_dict))[0]
            downloaded_file = base_filename + ".mp3"
            if os.path.exists(downloaded_file):
                return downloaded_file
            print("Downloaded file not found.")
    except Exception as e:
        print(f"Download error: {e}")
    return None

def extract_audio_embeddings(file_path):
    """
    Load an audio file with librosa and summarise it as a dict of features.

    The dict holds the beat-tracker tempo plus the mean of RMS, spectral
    centroid, 13 MFCCs, CENS chroma and tonnetz (computed on the harmonic part).
    Any exception is printed and turned into a None return value.
    """
    try:
        signal, sample_rate = librosa.load(file_path, sr=None)
        tempo, _beats = librosa.beat.beat_track(y=signal, sr=sample_rate)

        rms = librosa.feature.rms(y=signal)
        centroid = librosa.feature.spectral_centroid(y=signal, sr=sample_rate)
        mfccs = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=13)
        chroma = librosa.feature.chroma_cens(y=signal, sr=sample_rate)
        harmonic_part = librosa.effects.harmonic(signal)
        tonnetz = librosa.feature.tonnetz(y=harmonic_part, sr=sample_rate)

        return {
            'tempo': tempo,
            'rms_mean': np.mean(rms),
            'spectral_centroid_mean': np.mean(centroid),
            # Per-coefficient / per-bin means: average over the time axis only.
            'mfccs_mean': np.mean(mfccs, axis=1),
            'chroma_cens_mean': np.mean(chroma, axis=1),
            'tonnetz_mean': np.mean(tonnetz, axis=1),
        }
    except Exception as e:
        print(f"Embedding extraction error: {e}")
        return None

def get_audio_embeddings(song_title, artist):
    """
    Download a song, extract its audio features, then delete the audio file.

    Returns whatever extract_audio_embeddings returns for the downloaded file,
    or None (after printing a message) when the download did not yield a file.
    """
    audio_path = download_audio(song_title, artist)
    if audio_path:
        features = extract_audio_embeddings(audio_path)

        # The audio is only needed for feature extraction; remove it afterwards.
        if os.path.exists(audio_path):
            os.remove(audio_path)

        return features

    print(f"Failed to download '{song_title}' by '{artist}'.")
    return None


In [None]:
#Optional: Flatten out the audio embeddings to separate different features into their own columns
def flatten_features(features_dict):
    """
    Turn a dict of scalar / array features into a dict of scalar columns.

    Lists and NumPy arrays holding a single value (0-dim, or 1-dim of length 1)
    become one entry under the original key. Any other list/array is flattened
    and spread over keys "<key>_1", "<key>_2", ... Values of other types are
    copied unchanged.
    """
    flat = {}
    for name, raw in features_dict.items():
        if not isinstance(raw, (list, np.ndarray)):
            flat[name] = raw
            continue

        arr = np.array(raw)
        if arr.size == 1 and arr.ndim <= 1:
            # Single value: store it as a plain Python scalar.
            flat[name] = arr.item()
            continue

        # 1-D and N-D arrays alike: one numbered column per element.
        for position, element in enumerate(arr.ravel(), start=1):
            flat[f"{name}_{position}"] = element
    return flat

In [None]:
import datetime
def process_dataset(df, batch_size=1000, output_dir="output_batches", sleep_time=1):
    """
    Processes a table of songs with columns 'title' and 'artist' in batches.

    For each song in a batch:
      - Calls get_audio_embeddings(title, artist)
      - Flattens the returned embeddings using flatten_features
      - Adds a "failed" flag (True if the song couldn't be processed)

    Each batch is saved as a separate Parquet file in output_dir, so batches
    written before an interruption remain on disk.

    Parameters:
      - df: DataFrame with columns "title" and "artist", or a path to a CSV
            file containing those columns (it is read with pandas.read_csv).
      - batch_size: Number of songs to process per batch.
      - output_dir: Directory where batch Parquet files will be saved (created if missing).
      - sleep_time: Pause (in seconds) after processing each song.

    Returns:
      - List of paths of the Parquet files written, one per non-empty batch.
        Each file is indexed by "original_index" (the row's index label in df).
    """
    # Accept a CSV path as well as a DataFrame: the notebook saves the sample
    # to "sampled_df.csv" and passes that path straight to this function.
    if isinstance(df, (str, os.PathLike)):
        df = pd.read_csv(df)

    os.makedirs(output_dir, exist_ok=True)
    total = len(df)
    batch_files = []

    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        print(f"Processing batch from index {start} to {end} out of {total}")
        batch_results = []

        for idx, row in df.iloc[start:end].iterrows():
            title = row["title"]
            artist = row["artist"]
            print(f"Processing song {idx}: '{title}' by '{artist}'...")

            # Process the song and get embeddings.
            embeddings = get_audio_embeddings(title, artist)

            result = {"original_index": idx, "title": title, "artist": artist}
            if embeddings is not None:
                result.update(flatten_features(embeddings))
                result["failed"] = False
            else:
                # Failed songs keep only the identifying columns plus the flag.
                result["failed"] = True

            batch_results.append(result)
            time.sleep(sleep_time)

        if batch_results:
            batch_df = pd.DataFrame(batch_results).set_index("original_index")
            # The timestamp keeps a re-run from overwriting an earlier batch file.
            timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            output_file = os.path.join(output_dir, f"batch_{start}_{end}_{timestamp}.parquet")
            batch_df.to_parquet(output_file, index=True)
            print(f"Saved batch {start} to {end} with {len(batch_results)} records to {output_file}")
            batch_files.append(output_file)
        else:
            print(f"No records processed in batch {start} to {end}.")

    return batch_files

In [None]:
# process_dataset iterates over DataFrame rows, so load the saved sample first.
process_dataset(pd.read_csv("sampled_df.csv"))

In [None]:
# Combine all parquet files in the output_batches folder to create a complete dataset for modeling
import glob
# glob returns matches in arbitrary order; sort so the combined row order is reproducible.
parquet_files = sorted(glob.glob(os.path.join("output_batches", '*.parquet')))
if not parquet_files:
    # Fail with an actionable message instead of pd.concat's "No objects to concatenate".
    raise FileNotFoundError("No parquet batch files found in 'output_batches'; run process_dataset first.")

# Read each parquet file into a DataFrame and store them in a list
df_list = [pd.read_parquet(file) for file in parquet_files]

# Concatenate all the DataFrames and replace the per-batch index with a fresh 0..n-1 index
processed_sample = pd.concat(df_list, ignore_index=True)

# Show the size of the combined DataFrame as a quick sanity check
print(processed_sample.shape)

In [None]:
#Only save the successfully processed songs to a parquet file for lyric embeddings extraction
# .copy() makes this an independent DataFrame, so adding columns to it later
# does not operate on a slice of processed_sample (SettingWithCopyWarning).
successfully_processed_sample=processed_sample[processed_sample["failed"]==False].copy()
successfully_processed_sample.to_parquet("processed_sample.parquet")

---
## Lyrics Embeddings

In [None]:
import lyricsgenius
from sentence_transformers import SentenceTransformer

# Read the Genius API token from the GENIUS_ACCESS_TOKEN environment variable so the
# real secret never has to be written into the notebook; the placeholder is only a fallback.
genius=lyricsgenius.Genius(os.environ.get('GENIUS_ACCESS_TOKEN', 'YOUR-GENIUSAPI-ACCESS-TOKEN'),timeout=10,retries=2)

In [None]:
# Sentence-embedding model, created once here and reused by extract_lyrics_features for every song.
model = SentenceTransformer('all-mpnet-base-v2') #Transformer-based sentence-embedding model
def extract_lyrics_features(title, artist, embedding_dim=768):
    """
    Look up a song's lyrics on Genius and encode them as one embedding vector.

    Parameters:
      - title, artist: Passed to genius.search_song to find the song.
      - embedding_dim: Length of the all-zero fallback vector (default 768).

    Returns:
      - The result of model.encode(lyrics) when lyrics were found.
      - np.zeros(embedding_dim) when the lookup raises, no song is found, or
        the lyrics are missing or blank. A message is printed in each case.
    """
    try:
        song = genius.search_song(title, artist)
    except Exception as e:
        print(f"Error retrieving for '{title}' by '{artist}': {e}")
        return np.zeros(embedding_dim)

    # No match gives song = None; handle it explicitly rather than relying on
    # an AttributeError from `None.lyrics`.
    lyrics = getattr(song, "lyrics", None) if song is not None else None
    if not isinstance(lyrics, str) or not lyrics.strip():
        # Blank lyrics carry no information; embedding them would yield an
        # arbitrary non-zero vector indistinguishable from a real one.
        print(f"No lyrics found for '{title}' by '{artist}'.")
        return np.zeros(embedding_dim)

    # Using all-mpnet-base-v2 for sentence embeddings
    return model.encode(lyrics)


In [None]:
# Compute one embedding per song, then attach the column with .assign so a new
# DataFrame is produced instead of writing into a filtered slice
# (which triggers SettingWithCopyWarning).
lyrics_vectors = [
    extract_lyrics_features(title, artist)
    for title, artist in zip(successfully_processed_sample['title'], successfully_processed_sample['artist'])
]
successfully_processed_sample = successfully_processed_sample.assign(
    # dtype=object keeps each embedding array as a single cell value.
    lyrics_embeddings=pd.Series(lyrics_vectors, index=successfully_processed_sample.index, dtype=object)
)

In [None]:
# Convert each embedding from a numpy array to a plain list of floats before saving to parquet
def _embedding_to_list(value):
    """Return list/array values as a list of float64 numbers; leave anything else untouched."""
    if isinstance(value, (list, np.ndarray)):
        return np.asarray(value, dtype=np.float64).tolist()
    return value

successfully_processed_sample['lyrics_embeddings'] = successfully_processed_sample['lyrics_embeddings'].apply(_embedding_to_list)
# Overwrite the earlier parquet file with the final dataset (audio features + lyrics embeddings)
successfully_processed_sample.to_parquet("processed_sample.parquet")