In [9]:
import billboard
import pandas as pd
from datetime import date, timedelta
import animation
import os
import glob
import json
import spotipy
from spotipy import SpotifyClientCredentials
# from spotipy.oauth2 import SpotifyOAuth
from dotenv import load_dotenv
import string

from ytmusicapi import YTMusic
from pytube import YouTube as YTDownload
import re
import requests
import wget
import time
from math import floor

import librosa
import numpy as np
import warnings
warnings.filterwarnings('ignore')

# import matplotlib.pyplot as plt
# import cupy as cp
# import cudf

In [10]:
# Load environment variables (including the Spotify credentials read below) from the .env file one level up.
load_dotenv('../.env')
# OAuth scope string; only referenced by the commented-out SpotifyOAuth client at the end of this cell.
scope = "user-library-read"

# Module-level Spotify client authenticated with SpotifyClientCredentials; get_song_data uses it.
# os.environ.get returns None when a variable is unset, so missing credentials are not detected here.
spotify = spotipy.Spotify(auth_manager=SpotifyClientCredentials(client_id=os.environ.get('SPOTIPY_CLIENT_ID'),
                                                                client_secret=os.environ.get('SPOTIPY_CLIENT_SECRET')))
# spotify = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope))

In [11]:
def all_day_in_year(day=0, year=date.today().year):
    """Yield each date in ``year`` that falls on the weekday ``day``.

    ``day`` follows ``date.weekday()`` numbering:
    0 = Monday, 1 = Tuesday, 2 = Wednesday, 3 = Thursday,
    4 = Friday, 5 = Saturday, 6 = Sunday.

    Dates are produced in ascending order, one week apart, starting with the
    first matching weekday on or after 1 January.
    """
    one_week = timedelta(weeks=1)
    new_years_day = date(year, 1, 1)
    # Number of days from 1 January forward to the first requested weekday.
    days_ahead = (day - new_years_day.weekday()) % 7
    current = new_years_day + timedelta(days=days_ahead)
    while current.year == year:
        yield current
        current = current + one_week

def get_chart(chart_title='hot-100', week=date.today(), starting_id=0):
    """Fetch one Billboard chart and return it as a DataFrame.

    Args:
        chart_title: Billboard chart name passed to ``billboard.ChartData``.
        week: Chart date passed to ``billboard.ChartData`` as ``date``.
        starting_id: Running song counter before this chart.

    Returns:
        A tuple ``(chart_df, next_id)``. ``chart_df`` is indexed by song title
        and has the columns artist, image, peakPos, lastPos, weeks, rank, isNew
        and date; ``next_id`` is ``starting_id`` plus the number of chart entries.
    """
    chart = billboard.ChartData(chart_title, date=week, max_retries=10, fetch=True)
    column_names = ['artist', 'image', 'peakPos', 'lastPos', 'weeks', 'rank', 'isNew', 'date']
    titles = []
    rows = []
    for entry in chart:
        titles.append(entry.title)
        rows.append([
            entry.artist,
            entry.image,
            entry.peakPos,
            entry.lastPos,
            entry.weeks,
            entry.rank,
            entry.isNew,
            chart.date,
        ])
    chart_df = pd.DataFrame(data=rows, columns=column_names, index=titles)
    return chart_df, starting_id + len(chart)

@animation.wait('spinner', text='Fetching Billboard Charts', speed=0.2)
def get_billboard_data(start_year, end_year, chart_title='hot-100', output_dir=os.getcwd()):
    """Download one chart per Friday for every year from ``end_year`` down to ``start_year``.

    Each chart is written to ``<output_dir>/<chart_title>/<chart_title>_<date>.csv``.
    A one-row ``state.csv`` in the same directory is re-read and rewritten for
    every week; it records the last date fetched and running id / week / song counters.
    """
    state_cols = ['last_date', 'last_id', 'next_id', 'num_weeks', 'num_songs']
    output_path = mkdir(f"{output_dir}/{chart_title}")
    state_file = f"{output_path}/state.csv"

    # Walk the years newest-first; weekday 4 is Friday.
    for year in range(end_year, start_year - 1, -1):
        for week in all_day_in_year(4, year):
            if os.path.exists(state_file):
                state_df = pd.read_csv(state_file)
            else:
                state_df = pd.DataFrame(data=[[None, 0, 0, 0, 0]], columns=state_cols)

            previous = state_df.iloc[0]
            next_id = previous['next_id']
            num_weeks = previous['num_weeks']
            num_songs = previous['num_songs']

            chart_df, last_id = get_chart(chart_title=chart_title, week=week, starting_id=next_id)

            new_state = pd.DataFrame(
                data=[[week, last_id - 1, last_id, num_weeks + 1, num_songs + (last_id - next_id)]],
                columns=state_cols,
            )
            state_df.update(new_state)
            state_df.to_csv(state_file, index=False)
            chart_df.to_csv(f"{output_path}/{chart_title}_{week}.csv", index_label='title')

    print(f"Data written to {output_path}")

In [12]:
def merge_csvs_in_path(path, glob_pattern="hot-100_*.csv", output_path='../data/billboard', output_filename='merged_csv', index=False):
    """Concatenate every CSV in ``path`` matching ``glob_pattern`` into one CSV.

    Args:
        path: Directory containing the CSV files to merge.
        glob_pattern: Glob pattern, relative to ``path``, selecting the files.
        output_path: Directory the merged file is written to (created if missing).
        output_filename: Name of the merged file, without the ``.csv`` extension.
        index: Passed to ``DataFrame.to_csv``; whether to write the row index.

    Raises:
        FileNotFoundError: If no file matches ``glob_pattern`` in ``path``.
    """
    source_dir = os.path.abspath(path)
    # glob returns files in arbitrary order; sort so the merged output is reproducible.
    files = sorted(glob.glob(f'{source_dir}/{glob_pattern}'))
    if not files:
        raise FileNotFoundError(f"No files matching '{glob_pattern}' found in {source_dir}")

    # Read everything first and concatenate once instead of re-copying the frame per file.
    full_df = pd.concat([pd.read_csv(file) for file in files])

    if output_path:
        os.makedirs(output_path, exist_ok=True)
    full_df.to_csv(f"{output_path}/{output_filename}.csv", index=index)

In [13]:
def get_song_data(title, artist=None):
    """Look a song up on Spotify and collect track, artist and audio metadata.

    The search is first made with the title alone ("$" replaced by "s"); only
    if that returns nothing is it repeated with the text before "featuring" in
    the lower-cased ``artist`` appended.

    Returns:
        ``(song_data, audio_features, audio_analysis)``. ``song_data`` is the
        first search hit with ``artist_popularity`` and de-duplicated
        ``artist_genres`` added; ``audio_analysis`` is ``None`` when Spotify
        raises ``SpotifyException`` for it. ``(None, None, None)`` is returned
        when neither search finds a track.
    """
    def _search_tracks(query):
        return spotify.search(q=query, type='track')['tracks']['items']

    title = title.replace("$", "s")
    matches = _search_tracks('track:' + title)
    if not matches:
        # Second attempt: narrow by the primary artist.
        artist = ' ' + artist.lower().split('featuring')[0] if artist else ''
        matches = _search_tracks('track:' + title + artist)
        if not matches:
            return None, None, None

    track = matches[0]
    artist_ids = [member['id'] for member in track['artists']][:50]
    artist_records = spotify.artists(artist_ids)['artists']
    track['artist_popularity'] = [record['popularity'] for record in artist_records]
    # dict.fromkeys drops duplicate genres while keeping first-seen order.
    track['artist_genres'] = list(dict.fromkeys(
        genre for record in artist_records for genre in record['genres']
    ))

    features = spotify.audio_features([track['id']])
    try:
        analysis = spotify.audio_analysis(track['id'])
    except spotipy.SpotifyException:
        analysis = None

    return track, features[0], analysis

In [14]:
def remove_punctuation(val: str) -> str:
    """Return ``val`` with every character in ``string.punctuation`` removed."""
    # Mapping a code point to None tells str.translate to delete that character.
    deletions = dict.fromkeys(map(ord, string.punctuation))
    return val.translate(deletions)

def mkdir(path: str) -> str:
    """Create ``path`` (and any missing parents) if nothing exists there; return its absolute form."""
    absolute = os.path.abspath(path)
    if not os.path.exists(absolute):
        os.makedirs(absolute)
    return absolute

def open_or_create_csv(path, cols):
    """Read the CSV at ``path``, creating it with header ``cols`` if it does not exist.

    Args:
        path: Location of the CSV file; missing parent directories are created.
        cols: Column names used for the header when the file has to be created.

    Returns:
        The file's contents as a DataFrame. When the file contains no parsable
        columns (e.g. it was just created with ``cols=[]``), an empty DataFrame
        with columns ``cols`` is returned instead of raising.
    """
    path = os.path.abspath(path)
    # os.path.dirname keeps the root ("/file.csv" -> "/"); splitting on os.sep and
    # re-joining produced "" for root-level files, which os.makedirs rejects.
    os.makedirs(os.path.dirname(path), exist_ok=True)
    try:
        return pd.read_csv(path)
    except FileNotFoundError:
        pd.DataFrame(columns=cols).to_csv(path, index=False)
    except pd.errors.EmptyDataError:
        return pd.DataFrame(columns=cols)

    # Re-read the freshly created file so the result has the same shape and
    # dtypes as it will on later calls.
    try:
        return pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # A header with no columns cannot be parsed back.
        return pd.DataFrame(columns=cols)

@animation.wait('spinner', text='Fetching Spotify Song Data', speed=0.2)
def fetch_spotify_songs(song_dir, glob_pattern='*', output_dir='./', output_file='songs.csv', audio_analysis_dir=None, preview_audio_dir='../data/audio/previews', preview_format="m4a", verbose=False):
    """Fetch Spotify metadata for every song listed in the chart CSVs under ``song_dir``.

    For each chart file matching ``glob_pattern`` and each row in it, the song's
    ``title`` is looked up in the output CSV's ``billboard_name`` column. Songs not
    yet present are fetched with ``get_song_data``; one output row is then built
    from the track data, the audio features and the audio-analysis file name.
    The output CSV and a ``state.csv`` in ``output_dir`` are rewritten after every
    newly fetched song.

    Args:
        song_dir: Directory holding the chart CSVs (each needs ``title`` and ``artist`` columns).
        glob_pattern: Glob pattern, relative to ``song_dir``, selecting the chart files.
        output_dir: Directory for ``output_file`` and ``state.csv`` (created if missing).
        output_file: File name of the songs CSV inside ``output_dir``.
        audio_analysis_dir: Directory for the per-song audio-analysis JSON files;
            defaults to ``<output_dir>/audio_analysis``.
        preview_audio_dir: Directory the preview clips are downloaded into.
        preview_format: File extension given to downloaded preview clips.
        verbose: When true, print a line for every song that is already in the output CSV.
    """
    # Columns of state.csv: the title plus one flag per kind of data that was found.
    state_cols = ['billboard_name', 'spotify_name', 'song_data', 'audio_features', 'audio_analysis']
    spotify_song_cols = ['billboard_name', 'spotify_name', 'artist', 'duration_ms', 'spotify_id', 'spotify_uri', 'spotify_external_url', 'spotify_popularity', 'spotify_artist_popularity', 'spotify_artist_popularity_mean', 'explicit', 'preview_url', 'preview_url_audio', 'full_audio', 'full_audio_duration_s', 'artist_genres']
    audio_feature_cols = ['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']
    audio_analysis_cols = ['audio_analysis_file']
    song_dir = os.path.abspath(song_dir)
    audio_analysis_dir = os.path.abspath(audio_analysis_dir) if audio_analysis_dir is not None else f"{output_dir}/audio_analysis"
    audio_analysis_dir = mkdir(audio_analysis_dir)
    path_glob = glob.glob(f"{song_dir}/{glob_pattern}")
    # output_dir = os.path.abspath(output_dir)
    output_dir = mkdir(output_dir)
    # preview_audio_dir = os.path.abspath(preview_audio_dir)
    preview_audio_dir = mkdir(preview_audio_dir)
    for file in path_glob:
        weeks_songs = pd.read_csv(file)
        # Reload the accumulated output for each chart file so earlier results are seen.
        spotify_songs_df = open_or_create_csv(f'{output_dir}/{output_file}', spotify_song_cols)
        for i in range(len(weeks_songs)):
            song = weeks_songs.iloc[i]
            # A song counts as already fetched when its title matches an existing billboard_name.
            spotify_song = spotify_songs_df.loc[spotify_songs_df['billboard_name'] == song['title']]
            if spotify_song.empty:
                # if empty get song data
                song_data, audio_features, audio_analysis = get_song_data(song['title'], artist=song['artist'])
                # state collects one value per entry of state_cols, in order.
                state = [song['title']]

                if song_data is not None:
                    state.append(song_data['name'])
                    state.append(True)
                    # The preview clip is downloaded with wget while this row is built (only
                    # when preview_url is truthy); only the file name is stored.
                    # full_audio / full_audio_duration_s start as the placeholders
                    # "not_fetched" / -1.
                    s_df = pd.DataFrame(data=[[
                        song['title'],
                        song_data['name'],
                        song['artist'],
                        song_data['duration_ms'],
                        song_data['id'],
                        song_data['uri'],
                        song_data['external_urls']['spotify'],
                        song_data['popularity'],
                        song_data['artist_popularity'],
                        sum(song_data['artist_popularity'])/len(song_data['artist_popularity']),
                        song_data['explicit'],
                        song_data['preview_url'],
                         wget.download(song_data['preview_url'], out=f"{preview_audio_dir}/{remove_punctuation(song['title'])}.{preview_format}").split(os.sep)[-1] if song_data['preview_url'] else None,
                        "not_fetched",
                        -1,
                        song_data['artist_genres']]], columns=spotify_song_cols)
                else:
                    state.append(None)
                    state.append(False)
                    # No Spotify match: keep the title and fill every other column with None.
                    s_df = pd.DataFrame(data=[[song['title']] + [None for i in range(len(spotify_song_cols)-1)]], columns=spotify_song_cols)

                if audio_features is not None:
                    state.append(True)
                    # audio_feature_cols = ['spotify_' + item for item in audio_feature_cols]
                    af_df = pd.DataFrame(data=[[audio_features['danceability'],
                                                audio_features['energy'],
                                                audio_features['key'],
                                                audio_features['loudness'],
                                                audio_features['mode'],
                                                audio_features['speechiness'],
                                                audio_features['acousticness'],
                                                audio_features['instrumentalness'],
                                                audio_features['liveness'],
                                                audio_features['valence'],
                                                audio_features['tempo'],
                                                audio_features['time_signature']]], columns=audio_feature_cols)
                    # af_df = pd.DataFrame(data=[['sample feature 1']], columns=['audio_feature_1'])
                else:
                    state.append(False)
                    af_df = pd.DataFrame(data=[[None for i in range(len(audio_feature_cols))]], columns=audio_feature_cols)

                if audio_analysis is not None:
                    state.append(True)
                    # The analysis is stored as <title without punctuation>.json; the row keeps the file name only.
                    song_title = remove_punctuation(song['title'])
                    with open(f"{audio_analysis_dir}/{song_title}.json", "w+", encoding="utf-8") as json_file:
                        json.dump(audio_analysis, json_file, ensure_ascii=False, indent=4)
                        aa_df = pd.DataFrame(data=[[f"{song_title}.json"]], columns=audio_analysis_cols)
                    # aa_df = pd.DataFrame(data=[['sample analysis 1']], columns=['audio_analysis_1'])
                else:
                    state.append(False)
                    aa_df = pd.DataFrame(data=[[None for i in range(len(audio_analysis_cols))]], columns=audio_analysis_cols)
                    # aa_df = pd.DataFrame(data=[['sample analysis 1 EMPTY']], columns=['audio_analysis_1'])

                # Join the three single-row frames side by side, then append the row and
                # persist both the songs CSV and state.csv straight away.
                s_df = pd.concat([s_df, af_df, aa_df], axis=1)
                spotify_songs_df = pd.concat([spotify_songs_df, s_df])
                spotify_songs_df.to_csv(f'{output_dir}/{output_file}', index=False)
                state_df = open_or_create_csv(f'{output_dir}/state.csv', cols=state_cols)
                state_df = pd.concat([state_df, pd.DataFrame(data=[state], columns=state_cols)])
                state_df.to_csv(f'{output_dir}/state.csv', index=False)
            elif verbose:
                print(f"{spotify_song.iloc[0]['billboard_name']} - {song['artist']} skipped...")
    print(f"Data written to {output_file}")

In [15]:
def yt_get_query_string(song_name: str) ->  bytes:
    """Join the whitespace-separated words of ``song_name`` with "+" and return them UTF-8 encoded."""
    words = song_name.split()
    joined = "+".join(words)
    return bytes(joined, "utf-8")

def yt_query(video_title: str, all_ids: bool = False) -> str or None:
    """Search YouTube for ``video_title`` and return the matching video id(s).

    Args:
        video_title: Free-text search terms.
        all_ids: When true, return every id found on the results page (a list,
            in page order, possibly with repeats); otherwise only the first.

    Returns:
        The first 11-character video id, the list of all ids when ``all_ids``
        is true, or ``None`` when the results page contains no video link.
    """
    from urllib.parse import quote

    query = yt_get_query_string(video_title)
    # The helper returns bytes; formatting bytes into an f-string would put the
    # literal b'...' repr into the URL, so decode before building it.
    if isinstance(query, bytes):
        query = query.decode("utf-8")
    # Percent-encode everything except the "+" word separators, so characters
    # such as "&" or "#" in a title cannot cut the query short.
    url = f"https://www.youtube.com/results?search_query={quote(query, safe='+')}"
    html = requests.get(url)
    vid_ids = re.findall(r"watch\?v=(\S{11})", html.text)
    if not vid_ids:
        return None
    return vid_ids if all_ids else vid_ids[0]

def yt_download_audio(vid_id: str, output_dir=os.getcwd(), filename=None, file_type='m4a') -> str or None:
    """Download the audio-only stream of a YouTube video.

    Args:
        vid_id: YouTube video id.
        output_dir: Directory the audio file is saved into (created if missing).
        filename: Base name for the saved file; defaults to the video title.
            Punctuation is stripped from it either way.
        file_type: Extension appended to the file name.

    Returns:
        ``None`` when the video has no audio-only stream; otherwise a tuple
        ``(downloaded_path, length_seconds)`` where ``length_seconds`` is the
        ``lengthSeconds`` value from the track's ``videoDetails``.
    """
    track = YTMusic().get_song(videoId=vid_id)
    canonical_url = track['microformat']['microformatDataRenderer']['urlCanonical']
    audio_stream = YTDownload(canonical_url).streams.get_audio_only()
    if audio_stream is None:
        return None

    if filename is None:
        filename = track['videoDetails']['title']
    clean_name = remove_punctuation(filename)
    target_dir = mkdir(output_dir)
    saved_path = audio_stream.download(output_path=target_dir, filename=f"{clean_name}.{file_type}")
    return saved_path, track['videoDetails']['lengthSeconds']

In [16]:
@animation.wait('spinner', text='Fetching Audio Files', speed=0.2)
def fetch_audio_data(songs_csv_filepath="../data/spotify/songs.csv", output_dir="../data/audio/full"):
    """Download full-length audio from YouTube for every song still marked "not_fetched".

    For each row of the songs CSV whose ``full_audio`` is ``"not_fetched"``,
    YouTube is searched for "<billboard_name> <artist> lyrics" and the first
    hit's audio is downloaded into ``output_dir``. The row's ``full_audio`` is
    set to the downloaded file name and ``full_audio_duration_s`` to its length
    in seconds.

    A song that cannot be found or downloaded is reported and left as
    ``"not_fetched"`` so a later run retries it; the remaining songs are still
    processed. The CSV is written back once at the end, even if the run is
    interrupted.

    Args:
        songs_csv_filepath: Songs CSV to read and update in place.
        output_dir: Directory the audio files are saved into (created if missing).
    """
    songs_csv_filepath = os.path.abspath(songs_csv_filepath)
    output_dir = mkdir(output_dir)
    songs_df_full = pd.read_csv(songs_csv_filepath)
    audio_col = songs_df_full.columns.get_loc('full_audio')
    duration_col = songs_df_full.columns.get_loc('full_audio_duration_s')
    try:
        for i in range(len(songs_df_full)):
            song = songs_df_full.iloc[i]
            if song['full_audio'] != "not_fetched":
                continue
            # Isolate failures per song: one bad search or download must not
            # abort the rest of the list.
            try:
                query_string = f"{song['billboard_name']} {song['artist']} lyrics"
                yt_id = yt_query(query_string, all_ids=False)
                # print(f"{song['billboard_name']}: {yt_id}")
                result = yt_download_audio(yt_id, output_dir=output_dir) if yt_id is not None else None
                # yt_download_audio returns a bare None when no audio stream exists.
                if not result or result[0] is None:
                    print(f"No audio found for {song['billboard_name']}")
                    continue
                audio_file, duration_s = result
                audio_name = os.path.basename(audio_file)
                # lengthSeconds arrives as text; store a number to match the -1 placeholder.
                duration_value = int(duration_s) if duration_s is not None else -1
                songs_df_full.iat[i, audio_col] = audio_name
                songs_df_full.iat[i, duration_col] = duration_value
            except Exception as err:
                print(f"Skipping {song['billboard_name']}: {err}")
    finally:
        # Persist whatever was fetched, including on KeyboardInterrupt.
        songs_df_full.to_csv(songs_csv_filepath, index=False)
    print(f"Data written to {output_dir}")

In [17]:
# Wall-clock start of the pipeline run; the execution-time cell further down subtracts it.
start_time = time.time()

In [15]:
# Step 1: download the weekly hot-100 charts for 2020 and 2021 into ../data/billboard/hot-100.
get_billboard_data(2020, 2021, output_dir="../data/billboard")

Fetching Billboard Charts	/Data written to C:\msc_data_science_uwi_sta\semester_2\classes\comp_6940\project\comp-6940-project\data\billboard\hot-100
[K


In [16]:
# Step 2: combine the per-week chart CSVs into ../data/billboard/hot-100_all.csv.
merge_csvs_in_path('../data/billboard/hot-100', glob_pattern='hot-100_*.csv', output_path='../data/billboard', output_filename='hot-100_all')

In [17]:
# Step 3: fetch Spotify metadata, audio features, audio analysis and preview clips for every charted song.
# NOTE(review): the output directory is spelled 'spofity'; the later cells read from the same spelling,
# so rename it everywhere (and on disk) or not at all.
fetch_spotify_songs(song_dir="../data/billboard/hot-100/", glob_pattern="hot-100_*.csv", output_dir='../data/spofity', output_file='songs.csv', audio_analysis_dir='../data/spofity/audio_analysis', preview_audio_dir="../data/audio/previews")

Fetching Spotify Song Data	\

HTTP Error for GET to https://api.spotify.com/v1/audio-analysis/4LaGu95Ui2s4vprSQYWUAZ with Params: {} returned 404 due to analysis not found


\Data written to songs.csv
[K


In [18]:
# Step 4: download full-length audio from YouTube for the songs listed in songs.csv.
fetch_audio_data("../data/spofity/songs.csv", output_dir="../data/audio/full")

Fetching Audio Files	|Data written to C:\msc_data_science_uwi_sta\semester_2\classes\comp_6940\project\comp-6940-project\data\audio\full
[K


In [22]:
# Elapsed wall-clock time since start_time was set, printed in seconds and as minutes + seconds.
execution_time = time.time() - start_time
print(f"{execution_time} seconds\n{floor(execution_time/60)} mins, {execution_time%60} seconds")
# Execution time recorded on an earlier run (the printed output of this cell may differ)
# to get Billboard charts, Spotify data and YouTube audio data for 2020 and 2021
#                = 4776.834360599518 seconds
#                = 79 mins, 36.83436059951782 seconds

6874.429597616196 seconds
114 mins, 34.42959761619568 seconds


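### Features to extract (modelled on the GTZAN dataset):
    0. length of the analysed segment
    1. chroma STFT (chromagram computed from the short-time Fourier transform)
    2. RMS (root-mean-square energy)
    3. spectral centroid
    4. spectral bandwidth
    5. spectral rolloff
    6. zero-crossing rate
    7. harmony (harmonic component)
    8. perceptr (percussive component; name kept as in GTZAN)
    9. tempo
    10. MFCCs (Mel-frequency cepstral coefficients)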

In [18]:
def extract_audio_features(filepath:str, chunk_length: float or int = None, num_chunks: int = 1, n_fft:int=2048, hop_length:int=512):
    """Extract GTZAN-style summary features from an audio file.

    The file is analysed as a whole or split into equal-length chunks. Each
    chunk is loaded, has leading/trailing silence trimmed, and contributes one
    row holding the mean and variance of every feature plus the tempo.

    Args:
        filepath: Path to the audio file.
        chunk_length: Target chunk length in seconds. The file is cut into
            ``ceil(duration / chunk_length)`` chunks of equal length, so the
            actual length may be shorter than requested. Mutually exclusive
            with ``num_chunks > 1``.
        num_chunks: Number of equal-length chunks to cut the file into.
        n_fft: FFT window size used for the chroma feature.
        hop_length: Hop length used for the chroma feature.

    Returns:
        ``(features, cols)``: ``features`` is a DataFrame with one row per
        chunk (``None`` if no chunk was produced) and ``cols`` is the list of
        its column names. Rows are named after the file, with ``_<chunk>``
        appended when there is more than one chunk.
    """
    assert not (num_chunks > 1 and chunk_length is not None), "either chunk_length or num_chunks must be used"
    assert num_chunks > 0, "num_chunks must be at least 1"

    def _mean_var(values):
        return np.mean(values), np.var(values)

    filepath = os.path.abspath(filepath)
    filename = filepath.split(os.sep)[-1]
    y, sample_rate = librosa.load(filepath)
    duration_s = np.shape(y)[0]/sample_rate

    if chunk_length:
        num_chunks = int(np.ceil(duration_s/chunk_length))
    # Chunks always share the duration equally, so the last one is never a short remainder.
    if num_chunks > 1: chunk_length = duration_s/num_chunks
    else: chunk_length = duration_s

    frames = []
    cols = []
    for chunk in range(num_chunks):
        name = filename if num_chunks == 1 else f"{filename}_{chunk}"
        offset = chunk_length * chunk
        audio, sr = librosa.load(filepath, offset=offset, duration=chunk_length)
        audio, _ = librosa.effects.trim(audio)

        # Pass the signal as y=...: newer librosa releases make it keyword-only.
        # sr is passed everywhere so frequency-based features use the loaded rate.
        chroma_stft = librosa.feature.chroma_stft(y=audio, sr=sr, hop_length=hop_length, n_fft=n_fft)
        rms = librosa.feature.rms(y=audio)
        spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=sr)[0]
        spectral_bandwidth = librosa.feature.spectral_bandwidth(y=audio, sr=sr)
        rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sr)
        zero_crossing_rate = librosa.feature.zero_crossing_rate(y=audio)
        harmony, perceptr = librosa.effects.hpss(y=audio)
        tempo, _ = librosa.beat.beat_track(y=audio, sr=sr)
        # beat_track may return the tempo as a scalar or a one-element array.
        tempo = float(np.atleast_1d(tempo)[0])
        mfccs = librosa.feature.mfcc(y=audio, sr=sr)

        row = {'filename': name, 'length': chunk_length}
        summarised = (
            ('chroma_stft', chroma_stft),
            ('rms', rms),
            ('spectral_centroid', spectral_centroid),
            ('spectral_bandwidth', spectral_bandwidth),
            ('rolloff', rolloff),  # rolloff_var used to be computed with np.mean
            ('zero_crossing_rate', zero_crossing_rate),
            ('harmony', harmony),
            ('perceptr', perceptr),
        )
        for label, values in summarised:
            row[f"{label}_mean"], row[f"{label}_var"] = _mean_var(values)
        row['tempo'] = tempo
        for m, coefficient in enumerate(mfccs, start=1):
            row[f"mfcc{m}_mean"], row[f"mfcc{m}_var"] = _mean_var(coefficient)

        cols = list(row)
        frames.append(pd.DataFrame(columns=cols, data=[list(row.values())]))

    final_df = pd.concat(frames, axis=0) if frames else None
    return final_df, cols

In [32]:
@animation.wait('spinner', text='Fetching Audio Features', speed=0.2)
def fetch_audio_features(song_path='../data/spofity/songs.csv', output_path='../data/audio/audio_features_full.csv', audio_dir='../data/audio/full', audio_type='full'):
    """Extract audio features for every downloaded song and join them onto the songs CSV.

    Features are appended to ``output_path`` one file at a time, and each
    processed file name is recorded in ``<audio_type>_state.csv`` next to it,
    so an interrupted run resumes where it stopped. Rows with no usable audio
    file (missing value, the ``"not_fetched"`` placeholder, or a file that is
    not on disk) are skipped and get empty feature columns.

    Args:
        song_path: Songs CSV; rewritten with the feature columns appended.
        output_path: CSV accumulating one feature row per audio file.
        audio_dir: Directory containing the audio files.
        audio_type: ``'full'`` reads file names from ``full_audio``;
            ``'preview'`` reads them from ``preview_url_audio``.

    Returns:
        The accumulated feature table read back from ``output_path`` (an empty
        DataFrame when no features exist yet).
    """
    # NOTE(review): the default song_path spells the directory 'spofity' while
    # fetch_audio_data defaults to 'spotify'; left unchanged because existing data may live there.
    assert audio_type == 'full' or audio_type == 'preview', 'audio_type must either be full or preview'
    audio_field = "full_audio" if audio_type == "full" else 'preview_url_audio'
    song_path = os.path.abspath(song_path)
    output_path = os.path.abspath(output_path)
    audio_dir = os.path.abspath(audio_dir)
    songs_df = pd.read_csv(song_path)

    state_cols = ['billboard_name']
    status_file = f"{os.path.dirname(output_path)}/{audio_type}_state.csv"
    status_df = open_or_create_csv(status_file, state_cols)
    # The state column is called billboard_name but holds audio file names.
    processed = set(status_df['billboard_name'])

    for filename in songs_df[audio_field]:
        # Missing values are read as float NaN; "not_fetched" is the placeholder for songs without audio.
        if not isinstance(filename, str) or filename == "not_fetched" or filename in processed:
            continue
        filepath = f"{audio_dir}/{filename}"
        if not os.path.isfile(filepath):
            continue
        audio_features, cols = extract_audio_features(filepath)
        output_df = open_or_create_csv(output_path, cols)
        output_df = pd.concat([output_df, audio_features])
        output_df.to_csv(output_path, index=False)
        processed.add(filename)
        status_df = pd.concat([status_df, pd.DataFrame(data=[[filename]], columns=state_cols)])
        status_df.to_csv(status_file, index=False)

    if not os.path.exists(output_path):
        # Nothing was extracted now or earlier, so leave the songs CSV untouched.
        return pd.DataFrame()
    output_df = pd.read_csv(output_path)

    # Line the feature rows up with the songs by file name, not by row position:
    # skipped or resumed songs would otherwise shift every later row.
    features_by_file = output_df.drop_duplicates(subset='filename').set_index('filename', drop=False)
    aligned = features_by_file.reindex(songs_df[audio_field].to_numpy()).reset_index(drop=True)
    # NOTE(review): running this again appends the feature columns a second time.
    songs_df = pd.concat([songs_df.reset_index(drop=True), aligned], axis=1)
    songs_df.to_csv(song_path, index=False)
    return output_df

In [None]:
# Step 5: extract audio features from the downloaded full-length audio and join them onto songs.csv.
fetch_audio_features('../data/spofity/songs.csv')

In [None]:
# TODO: Move function definitions to separate module