# Data preprocessing notebook
This notebook is for audio pre-processing and Spotify audio feature extraction to prepare the dataset for manual annotation/labeling.

## Part 1: Audio Pre-processing
- The following code iterates through all MP3 files in the `input_dir` and performs the following operations:
  - Strips the silence from the audio.
  - Parses the MP3 metadata.
  - Exports the processed audio to the `output_dir` with a new filename based on a song ID counter.
  - Appends song information to the `song_data_list`.
  - Converts `song_data_list` to a pandas DataFrame and exports to a CSV file for further use.
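  - New songs can later be added to `input_dir`; the second processing cell skips songs already listed in the song data, processes the rest, and saves the combined song data to a new timestamped CSV.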

In [None]:
import os
import pandas as pd
from datetime import datetime
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
from tqdm import tqdm
from functools import reduce
from mutagen.mp3 import MP3
from mutagen.easyid3 import EasyID3

In [None]:
def strip_silence(sound, min_silence_len=1000, silence_thresh=-50):
    """
    Remove silent stretches from an audio segment.

    Args:
        sound: The audio segment to process.
        min_silence_len: Minimum silence length (in milliseconds) handed to
            `detect_nonsilent`.
        silence_thresh: Silence threshold (in dB) handed to `detect_nonsilent`.

    Returns:
        The non-silent ranges of `sound` concatenated in their original order,
        or `sound` itself when no non-silent range is detected.
    """

    kept_ranges = detect_nonsilent(sound, min_silence_len=min_silence_len, silence_thresh=silence_thresh)

    # Nothing audible was detected: hand the input back untouched
    if not kept_ranges:
        return sound

    # Stitch the audible slices together, starting from an empty segment
    stitched = AudioSegment.empty()
    for bounds in kept_ranges:
        stitched = stitched + sound[bounds[0]:bounds[1]]
    return stitched


def parse_mp3_metadata(file_path):
    """
    Read the title, artist and genre tags of an MP3 file.

    Args:
        file_path: The path to the MP3 file.

    Returns:
        A tuple `(track_title, artists, genre)`. Each element is the first
        value stored under the corresponding tag, or None when the tag is
        absent. If reading fails for any reason, the error is printed and
        `(None, None, None)` is returned.
    """

    wanted_tags = ('title', 'artist', 'genre')

    try:
        tagged_file = MP3(file_path, ID3=EasyID3)
        # Each tag maps to a list of values; keep only the first one
        track_title, artists, genre = (tagged_file.get(tag, [None])[0] for tag in wanted_tags)
    except Exception as e:
        print(f"Error reading metadata for {file_path}: {e}")
        return None, None, None

    return track_title, artists, genre

#### Run the cell below when processing songs for the ***first time***.

In [None]:
# Define directories (built with os.path.join so the notebook also runs outside Windows)
input_dir = os.path.join('..', 'data', 'audio_files', 'raw')
output_dir = os.path.join('..', 'data', 'audio_files', 'processed')
song_data_csv = os.path.join('..', 'data', 'dataframes', 'song_data.csv')

# Ensure the output directories exist
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.dirname(song_data_csv), exist_ok=True)

# Column layout of the song data CSV; passed explicitly so the header is written even with no songs
song_columns = ["SongID", "TrackName", "Artists", "Genre", "FilePath"]

# Initialize song ID counter and list for song data
song_id_counter = 1
song_data_list = []

# Sort the listing so song IDs are assigned in a reproducible order,
# and match the extension case-insensitively so '.MP3' files are not skipped
mp3_filenames = sorted(f for f in os.listdir(input_dir) if f.lower().endswith('.mp3'))

# Process audio files
for filename in tqdm(mp3_filenames, desc="Processing audio files", unit="file"):
    file_path = os.path.join(input_dir, filename)

    # Generate new filename using song ID counter
    new_filename = f"{song_id_counter}.mp3"
    output_file_path = os.path.join(output_dir, new_filename)

    # Load and strip silence from audio file
    sound = AudioSegment.from_file(file_path)
    stripped_sound = strip_silence(sound)

    # Parse metadata
    track_title, artists, genre = parse_mp3_metadata(file_path)

    # Only forward tags that were actually found: a None value would otherwise
    # be written into the exported file as the literal text "None"
    export_tags = {
        key: value
        for key, value in (("title", track_title), ("artist", artists), ("genre", genre))
        if value is not None
    }

    # Export stripped audio with the available metadata
    stripped_sound.export(output_file_path, format='mp3', tags=export_tags)

    # Create dictionary of song information and add to list
    song_data_list.append({
        "SongID": song_id_counter,
        "TrackName": track_title,
        "Artists": artists,
        "Genre": genre,
        "FilePath": output_file_path
    })

    # Increment song ID counter
    song_id_counter += 1

# Convert list of dictionaries to DataFrame and save to CSV
df = pd.DataFrame(song_data_list, columns=song_columns)
df.to_csv(song_data_csv, index=False)

Processing audio files: 100%|██████████| 554/554 [3:16:50<00:00, 21.32s/file]  


#### Run the cell below if processing additional songs at a later time.

In [None]:
# Define directories (built with os.path.join so the notebook also runs outside Windows)
input_dir = os.path.join('..', 'data', 'audio_files', 'raw')
output_dir = os.path.join('..', 'data', 'audio_files', 'processed')
song_data_csv = os.path.join('..', 'data', 'dataframes', 'song_data.csv')
dataframes_dir = os.path.dirname(song_data_csv)

# Ensure the output directories exist
os.makedirs(output_dir, exist_ok=True)
os.makedirs(dataframes_dir, exist_ok=True)


def _is_song_data_snapshot(name):
    """Return True for 'song_data.csv' and for 'song_data_<14-digit timestamp>.csv'."""
    if name == 'song_data.csv':
        return True
    prefix, suffix = 'song_data_', '.csv'
    stamp = name[len(prefix):-len(suffix)]
    return name.startswith(prefix) and name.endswith(suffix) and len(stamp) == 14 and stamp.isdigit()


# Load the most recent song data. This cell saves its result under a new timestamped
# name, so always reloading 'song_data.csv' would make a later run re-process songs
# and reuse their IDs. Sorted by name, 'song_data.csv' comes first and the timestamped
# snapshots follow in chronological order, so the last entry is the newest.
snapshots = sorted(f for f in os.listdir(dataframes_dir) if _is_song_data_snapshot(f))
if snapshots:
    existing_df = pd.read_csv(os.path.join(dataframes_dir, snapshots[-1]))
    print(f"Loaded existing song data from {snapshots[-1]}.")
else:
    existing_df = pd.DataFrame(columns=["SongID", "TrackName", "Artists", "Genre", "FilePath"])

# Initialize song ID counter with the next available ID
# (int() keeps the ID, and the filename built from it, integral even if the column was read as float)
song_id_counter = int(existing_df['SongID'].max()) + 1 if not existing_df.empty else 1
song_data_list = []

# Sort the listing so song IDs are assigned in a reproducible order,
# and match the extension case-insensitively so '.MP3' files are not skipped
mp3_filenames = sorted(f for f in os.listdir(input_dir) if f.lower().endswith('.mp3'))

# Process all new audio files in the input directory
for filename in tqdm(mp3_filenames, desc="Processing new audio files", unit="file"):
    file_path = os.path.join(input_dir, filename)

    # Parse metadata to compare with existing data
    track_title, artists, genre = parse_mp3_metadata(file_path)

    # Check if the song already exists in the DataFrame.
    # NOTE(review): a file whose title or artist tag is missing never matches here
    # (comparing against None is False for every row), so untagged files are
    # treated as new on every run.
    already_known = (
        (existing_df['TrackName'] == track_title) &
        (existing_df['Artists'] == artists)
    ).any()
    if already_known:
        continue  # Skip the file if it already exists

    new_filename = f"{song_id_counter}.mp3"
    output_file_path = os.path.join(output_dir, new_filename)

    # Only forward tags that were actually found: a None value would otherwise
    # be written into the exported file as the literal text "None"
    export_tags = {
        key: value
        for key, value in (("title", track_title), ("artist", artists), ("genre", genre))
        if value is not None
    }

    sound = AudioSegment.from_file(file_path)
    stripped_sound = strip_silence(sound)
    stripped_sound.export(output_file_path, format='mp3', tags=export_tags)

    # Append new song data to the list
    song_data_list.append({
        "SongID": song_id_counter,
        "TrackName": track_title,
        "Artists": artists,
        "Genre": genre,
        "FilePath": output_file_path
    })

    # Increment the song ID counter
    song_id_counter += 1

# If we have new songs, append them to the existing dataframe and save to a new CSV with a unique identifier
if song_data_list:
    new_songs_df = pd.DataFrame(song_data_list)
    updated_df = pd.concat([existing_df, new_songs_df], ignore_index=True)

    # Get the current timestamp to use as a unique identifier
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    new_csv_filename = f'song_data_{timestamp}.csv'
    new_song_data_csv = os.path.join(dataframes_dir, new_csv_filename)

    updated_df.to_csv(new_song_data_csv, index=False)
    print(f"Processed {len(song_data_list)} new songs and saved to {new_csv_filename}.")
else:
    print("No new songs to process.")

Processing new audio files: 100%|██████████| 869/869 [2:17:54<00:00,  9.52s/file]  

Processed 315 new songs and saved to song_data_20240109110632.csv.





## Part 2: Spotify Data Extraction

Using the DataFrame we just created, we'll search for each track on Spotify and extract various Spotify-generated audio features and metadata.

In [1]:
import pandas as pd
import json
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from tqdm import tqdm
import time

The Spotify client is initialized with credentials stored in a JSON file. These credentials include the `client_id` and `client_secret` necessary for accessing the Spotify Web API. The `SpotifyClientCredentials` manager handles the OAuth 2.0 flow for server-to-server authentication.

Using any text editor, create a new text file containing the following information. Save it as `spotify_credentials.json` and place it in the `../data/reference` folder. (The code below reads only `client_id` and `client_secret` from this file.)

```
{
  "client_id":"REPLACE WITH CLIENT ID",
  "client_secret":"REPLACE WITH SECRET",
  "user_id":"REPLACE WITH SPOTIFY USERNAME"
}
```
* Retrieve your client ID and secret here: https://developer.spotify.com/dashboard/
* Retrieve your username here: https://www.spotify.com/us/account/overview/

In [2]:
# Location of the JSON file holding the Spotify API credentials
credentials_path = r'../data/reference/spotify_credentials.json'  # Path to your spotify_credentials.json file

# Read the credentials from disk
with open(credentials_path, 'r') as creds_file:
    creds = json.load(creds_file)

# Build the client-credentials auth manager, then the Spotify client that uses it
auth_manager = SpotifyClientCredentials(
    client_id=creds['client_id'],
    client_secret=creds['client_secret'],
)
sp = spotipy.Spotify(auth_manager=auth_manager)

In [3]:
# In case rate-limited by Spotify
def make_spotify_request(sp, track_title, artists, retries=3, backoff_factor=0.5):
    """
    Search Spotify for a track, retrying on rate limiting and server errors.

    Args:
        sp: Spotify client exposing `search(q=..., type=..., limit=...)`.
        track_title: Title of the track to search for.
        artists: Artist names; only the first one is used in the query.
            A single string is accepted and treated as one artist name.
        retries: Maximum number of search attempts.
        backoff_factor: Base delay in seconds for server errors; the delay
            doubles with every attempt.

    Returns:
        The value returned by `sp.search`, or None if every attempt failed
        with a 429 or 5xx response.

    Raises:
        spotipy.exceptions.SpotifyException: For any other HTTP status; the
            error is re-raised immediately without retrying.
    """
    # A bare string is one artist name; indexing it would yield only its first character
    if isinstance(artists, str):
        artists = [artists]
    first_artist = artists[0].strip() if artists else ""

    # The query does not change between attempts, so build it once.
    # Without a usable artist name the search falls back to the track title alone.
    query = f"track:{track_title}"
    if first_artist:
        query += f" artist:{first_artist}"

    for attempt in range(retries):
        try:
            return sp.search(q=query, type='track', limit=1)
        except spotipy.exceptions.SpotifyException as e:
            if e.http_status == 429:
                # Rate limited: wait for the number of seconds given in the Retry-After header
                sleep_time = int((e.headers or {}).get('Retry-After', 1))
            elif 500 <= e.http_status < 600:
                # Server error: exponential backoff
                sleep_time = backoff_factor * (2 ** attempt)
            else:
                raise
            time.sleep(sleep_time)
    return None  # If all retries failed

def extract_spotify_metadata_features(track_title, artists, sp):
    """
    Extracts Spotify audio features and artist genres for a given track title and list of artists.

    The track is searched once per artist name, in order, and the first search
    that returns a result is used. Genres are collected from every artist
    credited on the matched track.

    Args:
    - track_title: Title of the track to search for.
    - artists: Iterable of artist names to try, in order.
    - sp: Spotify client exposing `search`, `artist` and `audio_features`.

    Returns:
    - DataFrame: A one-row pandas DataFrame with the track's audio features
                 (minus identifier/URL fields, `duration_ms` and `mode`) plus an
                 `sp_genre` column holding a sorted list of genres.
                 An empty DataFrame (no rows, no columns) is returned when no
                 track matches or Spotify has no audio features for the match.
    """

    # Audio-feature fields that are not kept in the result
    dropped_fields = {'type', 'id', 'uri', 'track_href', 'analysis_url', 'duration_ms', 'mode'}

    # Initialize variables
    track_id, artist_ids, unique_genres = None, None, set()

    # Search for track ID, artist IDs, and genres
    for artist_name in artists:
        query = f"track:{track_title} artist:{artist_name.strip()}"
        track_results = sp.search(q=query, type='track', limit=1)
        items = track_results['tracks']['items']
        if not items:
            continue

        track_item = items[0]
        track_id = track_item['id']
        artist_ids = [track_artist['id'] for track_artist in track_item['artists']]
        for artist_id in artist_ids:
            unique_genres.update(sp.artist(artist_id)['genres'])
        break  # Stop at the first artist name that yields a match

    # No match was found
    if not (track_id and artist_ids):
        return pd.DataFrame()

    # The features entry can be None when Spotify has no audio analysis for the track
    features_list = sp.audio_features(track_id)
    audio_features = features_list[0] if features_list else None
    if audio_features is None:
        return pd.DataFrame()

    # Copy the wanted fields instead of mutating the API response
    record = {key: value for key, value in audio_features.items() if key not in dropped_fields}
    # Sorted so the genre order is the same on every run (set order is not)
    record['sp_genre'] = sorted(unique_genres)

    return pd.DataFrame([record])

In [4]:
# Load the DataFrame (CSV) that we created in the step above
# (forward slashes work on every platform, matching the other paths in Part 2)
df = pd.read_csv(r'../data/dataframes/song_data_20240109110632.csv')

# Collect each matched track's features; they are concatenated once after the loop
# instead of growing a DataFrame row by row
track_feature_frames = []

# Loop through each row of the DataFrame
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="Fetching track features"):
    track_title = row['TrackName']
    artists_field = row['Artists']

    # Rows with a missing title or artist (e.g. songs whose MP3 tags could not be read)
    # cannot be searched for; calling .split on a missing value would raise
    if pd.isna(track_title) or pd.isna(artists_field):
        continue

    artists = [artist.strip() for artist in str(artists_field).split('/')]  # Split artists by '/' and strip whitespace

    # Use the function to extract metadata and audio features
    track_features_df = extract_spotify_metadata_features(track_title, artists, sp)

    # If the DataFrame is not empty, tag the results with the song's ID
    if not track_features_df.empty:
        track_features_df['SongID'] = row['SongID']
        track_features_df['sp_genre'] = ', '.join(track_features_df.loc[0, 'sp_genre'])  # Join genres into a single string
        track_feature_frames.append(track_features_df)

# Combine all tracks' info; pd.concat rejects an empty list, so fall back to an empty DataFrame
spotify_df = pd.concat(track_feature_frames, ignore_index=True) if track_feature_frames else pd.DataFrame()

Fetching track features: 100%|██████████| 869/869 [07:19<00:00,  1.98it/s]


In [7]:
# Spotify-derived feature columns get an 'sp_' prefix to mark where they came from
prefix_cols = ['danceability', 'energy', 'key', 'loudness', 'speechiness',
               'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
               'time_signature']

# Explicit old-name -> new-name mapping; columns outside prefix_cols are left untouched
sp_column_names = {col: 'sp_' + col for col in prefix_cols}
spotify_df.rename(columns=sp_column_names, inplace=True)
spotify_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 840 entries, 0 to 839
Data columns (total 13 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   sp_danceability      840 non-null    float64
 1   sp_energy            840 non-null    float64
 2   sp_key               840 non-null    int64  
 3   sp_loudness          840 non-null    float64
 4   sp_speechiness       840 non-null    float64
 5   sp_acousticness      840 non-null    float64
 6   sp_instrumentalness  840 non-null    float64
 7   sp_liveness          840 non-null    float64
 8   sp_valence           840 non-null    float64
 9   sp_tempo             840 non-null    float64
 10  sp_time_signature    840 non-null    int64  
 11  sp_genre             840 non-null    object 
 12  SongID               840 non-null    int64  
dtypes: float64(9), int64(3), object(1)
memory usage: 85.4+ KB


In [8]:
# Left-join the Spotify features onto the Part 1 song data (every song is kept), then save
merged_df = df.merge(spotify_df, how='left', on='SongID')
merged_df.to_csv(r'../data/dataframes/sp_merged.csv', index=False)