In [None]:
import umap
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from spotipy.oauth2 import SpotifyOAuth
from os import path
from sklearn.preprocessing import minmax_scale

pd.options.display.max_rows = 100
pd.options.display.max_columns = 100

# Save filepaths
PROCESSED_DATA_FP = r'C:\Users\Brandon\git\spotify-recommender-system\data\processed\spotify_playlist_track_features.csv'
FIG_SAVE_PATH = r"C:\Users\Brandon\git\spotify-recommender-system\reports\figures"

In [None]:
# Helper objects

# Mapping for last day of each month
month_last_date = {
    1: 31,
    2: 28, # Beware of leap years
    3: 31,
    4: 30,
    5: 31,
    6: 30,
    7: 31,
    8: 31,
    9: 30,
    10: 31,
    11: 30,
    12: 31
}

# Mapping of month to name
month_name = {
    1: 'Jan',
    2: 'Feb',
    3: 'Mar',
    4: 'Apr',
    5: 'May',
    6: 'Jun',
    7: 'Jul',
    8: 'Aug',
    9: 'Sep',
    10: 'Oct',
    11: 'Nov',
    12: 'Dec'
}

# Set columns to perform analysis on
feature_cols = [
    'danceability',
    'energy',
    # 'key',
    'loudness',
    'speechiness',
    'acousticness',
    'instrumentalness',
    'liveness',
    'valence',
    'tempo',
    # 'time_signature'
]

In [None]:
# Load in data from previous notebook
df = pd.read_csv(PROCESSED_DATA_FP)

# Convert date col to Timestamp dtype
df['playlist_date_added'] = pd.to_datetime(df['playlist_date_added'], utc=True)

# Add language as indicator columns
# Note that this isn't accurate but works for ~80-85% of tracks? 
for lang in ['Japanese', 'Korean', 'Cantonese']:
    lang_mask = df['playlist_name'] == lang
    lang_artist_ids = df[lang_mask]['artist_id'].unique()
    df.loc[(df['artist_id'].isin(lang_artist_ids)), f"lang_{lang[:3].lower()}"] = 1
    
df = df.fillna(0)
df['lang_eng'] = df.apply(lambda x: 1 if sum([x[f'lang_{lang}'] for lang in ['kor', 'jap', 'can']]) == 0 else 0, axis=1)
df[['lang_kor', 'lang_jap', 'lang_can']] = df[['lang_kor', 'lang_jap', 'lang_can']].astype(int)

In [None]:
# See how many tracks are in each playlist - 2019 & 2018 were the peak periods of activity
df['playlist_name'].value_counts()

In [None]:
df.head()

In [None]:
# Check proportion of NaNs in each column, only genre has missing values - about 11% missing
df.isna().mean()

In [None]:
# Make pair plot with 2d hist plots on upper, KDE plots on lower, and univariate hist on diagonal 
g = sns.PairGrid(df[feature_cols])
g.map_upper(sns.histplot)
g.map_lower(sns.kdeplot, fill=True)
g.map_diag(sns.histplot, kde=True)
g.tight_layout()

save_path = f"{FIG_SAVE_PATH}/track_feature_pair_plot_all_tracks.png"
if not path.exists(save_path):
    g.savefig()(save_path, bbox_inches='tight')

In [None]:
# Plot distribution of features for each of my '20XX Complete Round Up' playlists - how does my music change evolve across the years?
mask = df['playlist_name'].str.contains('Complete Round Up')

plt.figure(figsize=(25,25))
for idx, feat in enumerate(feature_cols, start=1):
    plt.subplot(3,3,idx)
    sns.boxenplot(y=feat, x='playlist_name', data=df[mask], order=[f'{year} Complete Round Up' for year in range(2017,2023)])
    plt.title(f'Distribution of {feat.capitalize()} across yearly playlists', fontsize=15)
    plt.ylabel(feat.capitalize(), fontsize=14)
    plt.xlabel('Playlist', fontsize=14)
    plt.xticks(rotation=10, fontsize=12)
    plt.yticks(fontsize=12)
    
plt.tight_layout()

save_path = f'{FIG_SAVE_PATH}/track_feature_dist_yearly_playlists.png'
if not path.exists(save_path):
    plt.savefig(save_path, bbox_inches='tight')

plt.show()

In [None]:
df[mask]

In [None]:
# Plot distribution for each of my language/genre based playlists
mask = (df['playlist_name'].isin([name for name in df['playlist_name'].unique() if ('Complete' not in name) and ('&' not in name) and ('playlist' not in name)]))

plt.figure(figsize=(25,25))
for idx, feat in enumerate(feature_cols, start=1):
    plt.subplot(3,3,idx)
    sns.boxenplot(y=feat, x='playlist_name', data=df[mask])
    plt.title(f'Distribution of {feat.capitalize()} across playlists', fontsize=15)
    plt.ylabel(feat.capitalize(), fontsize=14)
    plt.xlabel('Playlist', fontsize=14)
    plt.xticks(rotation=25)
    
plt.tight_layout()

save_path = f'{FIG_SAVE_PATH}/track_feature_dist_genre_playlists.png'
if not path.exists(save_path):
    plt.savefig(save_path, bbox_inches='tight')

plt.show()

In [None]:
# Categorise tracks by when I added them to playlists -> granularity of 2 months
# This will give me a more granualar view of how my music taste evolves in 2 month periods
bi_month_year_map = {}
counter = 0

for year in range(2016,2023):
    for month in range(1,12,2):
        
        # Needed for string substitution
        if len(str(month)) < 2:
            _month = f'0{month}'
        else:
            _month = month

        if len(str(month+1)) == 2:
            _month_plus_1 = month+1
        else:
            _month_plus_1 = f'0{month+1}'
        
        # Take into account 2020 leap year for Feb end date
        if (year == 2020) and (month+1 == 2):
            last_date_month = month_last_date[month+1] + 1
        else:
            last_date_month = month_last_date[month+1]
        
        # Generate date mask for 2 month period
        date_mask = (
            (df['playlist_date_added'] >= pd.Timestamp(f'{year}-{_month}-01T00:00:00+0000'))
            & (df['playlist_date_added'] <= pd.Timestamp(f'{year}-{_month_plus_1}-{last_date_month}T23:59:59+0000'))
        )
        
        # Make a mapping of counter -> period name
        bi_month_year_map[counter] =  f'{month_name[month]} & {month_name[month+1]} {year}'
        
        # Set 'bi_month_year' for period as counter, makes it easy to sort the dataframe
        # Can use the mapping when we want the period names
        df.loc[date_mask, 'bi_month_year'] = counter
        
        # Increment the counter
        counter += 1

In [None]:
# Perform a group by aggregation to get the median values of features by each 'bi_month_year' period
agg_df = (df
 .groupby(by='bi_month_year')
 .agg({
     'danceability': 'median',
     'energy': 'median',
     'key': 'median',
     'loudness': 'median',
     'speechiness': 'median',
     'acousticness': 'median',
     'instrumentalness': 'median',
     'liveness': 'median',
     'valence': 'median',
     'tempo': 'median'
 })
 .reset_index()
)

agg_df['bi_month_year'] = agg_df['bi_month_year'].astype(int)

In [None]:
# Make boxen plots for each track feature across all 2-month periods
# Can I notice any changes in music taste due to life events?
for feat in feature_cols:
    plt.figure(figsize=(30,12))
    
    sns.boxenplot(y=feat, x='bi_month_year', data=df.sort_values(by=['bi_month_year']))
    sns.lineplot(y=agg_df[feat], x=range(len(agg_df)), marker='o', linestyle='-.')
    
    plt.title(f'Distribution of {feat.capitalize()} across time', fontsize=18)
    plt.ylabel(feat.capitalize(), fontsize=16)
    plt.xlabel('Period', fontsize=16)
    plt.xticks(ticks=range(df['bi_month_year'].nunique()) ,labels=[bi_month_year_map.get(key) for key in df.sort_values(by=['bi_month_year'])['bi_month_year'].unique()], rotation=25, fontsize=14)
    plt.yticks(fontsize=14)
    
    save_path = f'{FIG_SAVE_PATH}/track_{feat}_dist_2016-2022.png'
    if not path.exists(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    
    plt.show()

In [None]:
agg_df_cp = (
    agg_df[['danceability', 'valence', 'energy', 'acousticness']]
    .copy()
    .T
    .rename(columns={k:v for k,v in bi_month_year_map.items()})
    .reset_index()
)

melted_agg_df = pd.DataFrame()
for col in agg_df_cp.columns:
    if col == 'index':
        continue
    tmp_df = agg_df_cp[['index', col]].rename(columns={col: 'values'})
    tmp_df['period'] = col
    melted_agg_df = pd.concat([melted_agg_df, tmp_df])

# melted_agg_df = melted_agg_df.reset_index(drop=True)
melted_agg_df['index'] = pd.Categorical(melted_agg_df['index'])

In [None]:
fig = px.line_polar(
    melted_agg_df,
    r='values',
    theta='index',
    animation_frame='period',
    animation_group='index',
    line_close=True,
    width=800,
    height=800
)

fig.update_traces(fill='toself')

fig.update_layout(
  polar=dict(
    radialaxis=dict(
      visible=True,
      range=[0, 0.8]
    )),
  showlegend=False
)

save_path = f"{FIG_SAVE_PATH}/track_features_radar_chart_2_monthly_periods.html"
if not path.exists(save_path):
    fig.write_html(f"{FIG_SAVE_PATH}/track_features_radar_chart_2_monthly_periods.html")
    
fig.show()

In [None]:
fig = px.scatter_3d(
    df.drop_duplicates(subset=['id']),
    x='danceability',
    y='energy',
    z='valence',
    color='tempo',
    width=800,
    height=800
)

fig.update_traces(
    marker=dict(size=2),
    selector=dict(mode='markers')
)

fig.show()

In [None]:
from sklearn.neighbors import NearestNeighbors

nn_feat_cols = [
    'danceability',
    'energy',
    'loudness',
    # 'speechiness',
    'acousticness',
    'instrumentalness',
    # 'liveness',
    'valence',
    'tempo',
    # 'key',
    # 'time_signature',
    'lang_eng',
    'lang_kor',
    'lang_jap',
    'lang_can',
]

# Manually set weightings on columns to bias them, higher weights puts higher bias on a feature
WEIGHTING = True
weighting = {
    'danceability': 1,
    'energy': 1,
    'loudness': 1,
    'valence': 1,
    'instrumentalness': 1,
    'acousticness': 1,
    'tempo': 1,
    'key': 1,
    'time_signature': 1,
    'lang_eng': 1,
    'lang_kor': 1,
    'lang_jap': 1,
    'lang_can': 1,
}

df_weighted = df.copy().drop_duplicates(subset=nn_feat_cols).reset_index(drop=True)

for col in ['tempo', 'loudness', 'time_signature', 'key']:
    if col in nn_feat_cols:
        df_weighted[col] = minmax_scale(df_weighted[col].values)
        
if WEIGHTING == True:
    for col in nn_feat_cols:
        df_weighted[col] = weighting[col] * np.abs(df_weighted[col])

In [None]:
df_weighted[nn_feat_cols]

In [None]:
nbrs = NearestNeighbors(n_neighbors=20, algorithm='auto').fit(df_weighted[nn_feat_cols].values)

input_track = df_weighted[nn_feat_cols].iloc[0].values.reshape(1, -1)
distances, indices = nbrs.kneighbors(input_track) 

In [None]:
[(idx, dist) for idx, dist in zip(indices[0], distances[0])]

In [None]:
df_weighted.iloc[indices[0]]

In [None]:
np.cov(df_weighted[feat_cols])

In [None]:
from sklearn.covariance import EmpiricalCovariance, MinCovDet

feat_cols = [
    'danceability',
    'energy',
    'loudness',
    # 'speechiness',
    'acousticness',
    'instrumentalness',
    # 'liveness',
    'valence',
    'tempo',
    # 'key',
    # 'time_signature',
    # 'lang_eng',
    # 'lang_kor',
    # 'lang_jap',
    # 'lang_can',
]

robust_cov = MinCovDet().fit(df_weighted[feat_cols].values)

reducer = umap.UMAP(metric='mahalanobis', metric_kwds={'V': robust_cov.covariance_})
embedding = reducer.fit_transform(df_weighted[feat_cols])

In [None]:
robust_cov = MinCovDet().fit(df_weighted[feat_cols].values)

In [None]:
robust_cov

In [None]:
plt.figure(figsize=(18,12))
plt.scatter(
    embedding[:, 0],
    embedding[:, 1],
    s=5,
    alpha=0.9
    # c=[sns.color_palette()[x] for x in penguins.species_short.map({"Adelie":0, "Chinstrap":1, "Gentoo":2})]
)
# plt.gca().set_aspect('equal', 'datalim')
plt.title('UMAP projection of the Penguin dataset', fontsize=24)

In [None]:
from sklearn.manifold import TSNE

perplexities = [5, 30, 50, 100]

for i, perplexity in enumerate(perplexities):
    tsne = TSNE(perplexity=50)
    Y = tsne.fit_transform(np.asarray(df_weighted[feat_cols].values, 'float64'))
    
    plt.figure(figsize=(18,12))
    plt.scatter(
        Y[:, 0],
        Y[:, 1],
        s=5,
        alpha=0.9
    )
    plt.show()