In [1]:
%matplotlib inline
import os
import sys
import json
import spotipy
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from dotenv import load_dotenv
from spotipy.oauth2 import SpotifyClientCredentials
from spotipy.client import SpotifyException

from sklearn import decomposition
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy import sparse

# models
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import plot_tree, DecisionTreeClassifier

from spotirecs import get_playlist_tracks, get_features, Playlists
from spotirecs.utils import get_playlists
from spotirecs.plotting import plot_features

# Load variables from a local .env file into the process environment.
# NOTE(review): presumably this provides the Spotify API credentials that
# SpotifyClientCredentials() picks up later — confirm against the .env file.
load_dotenv() 

# pandas settings
# Limit the displayed width of each column (long track names get truncated).
pd.set_option('display.max_colwidth', 30)
# NOTE(review): 0 is documented as "auto-detect" for terminals; verify the effect inside Jupyter.
pd.set_option('display.max_columns', 0)
# Do not wrap wide frames over several blocks of lines.
pd.set_option('display.expand_frame_repr', False)

In [2]:
def dict_to_json(data: dict, path: str):
    """Write ``data`` to ``path`` as JSON.

    The file is opened in text mode with UTF-8 encoding and is overwritten
    if it already exists.
    """
    with open(path, mode="w", encoding="utf-8") as handle:
        json.dump(data, fp=handle)

In [3]:
# Spotify user id whose playlists are fetched; playlists owned by this user
# (other than the one named below) supply the negatively labelled tracks.
username_id = '113586775'

# Name of the playlist whose tracks are labelled as liked (rating 1).
playlist_of_interest_name = 'Favorites'

## Initialization

In [4]:
# Create the API client using the client-credentials flow.
credentials = SpotifyClientCredentials()
spotify = spotipy.Spotify(client_credentials_manager=credentials, requests_timeout=10)

# Fetch the user's playlists and split them into the playlist of interest
# and every other playlist owned by the same user.
playlists = get_playlists(spotify, username_id)

playlist_of_interest = None
playlists_of_no_interest = []

for playlist in playlists.items:
    if playlist.name != playlist_of_interest_name:
        # Playlists that are merely followed (different owner) are ignored.
        if playlist.owner.id == username_id:
            playlists_of_no_interest.append(playlist)
        continue
    playlist_of_interest = playlist


In [5]:
if playlist_of_interest is None:
    # Fail with a regular exception instead of sys.exit(): exiting the
    # interpreter is meant for scripts, while an exception gives the notebook
    # user a clear traceback pointing at the misconfigured name.
    raise ValueError(
        f"Given playlist name can not be found: {playlist_of_interest_name!r}"
    )

# Tracks of the playlist of interest are the positive examples.
good_track_ids, good_track_names = get_playlist_tracks(spotify, playlist_of_interest)

# Tracks from every other owned playlist are the negative examples.
bad_track_ids = []
bad_track_names = []

# Every id that already has a label; a set keeps the duplicate check O(1)
# instead of scanning two growing lists for each track.
seen_track_ids = set(good_track_ids)

for playlist in playlists_of_no_interest:
    tmp_ids, tmp_names = get_playlist_tracks(spotify, playlist)

    for tmp_id, tmp_name in zip(tmp_ids, tmp_names):
        if tmp_id in seen_track_ids:
            # Already liked, or already collected from another playlist.
            continue
        seen_track_ids.add(tmp_id)
        bad_track_ids.append(tmp_id)
        bad_track_names.append(tmp_name)

ValidationError: 1 validation error for Tracks
previous
  Input should be a valid integer, unable to parse string as an integer [type=int_parsing, input_value='https://api.spotify.com/...&additional_types=track', input_type=str]
    For further information visit https://errors.pydantic.dev/2.6/v/int_parsing

In [None]:
# Liked tracks come first, followed by the remaining tracks; ids, names and
# ratings share this ordering.
track_ids = [*good_track_ids, *bad_track_ids]
track_names = [*good_track_names, *bad_track_names]
# 1 = track from the playlist of interest, 0 = track from any other playlist.
ratings = [1 for _ in good_track_ids] + [0 for _ in bad_track_ids]

## Feature extraction

In [None]:
# CSV file used to persist the audio features between runs.
data_file = "track_features.csv"

# Request the audio features of every collected track and index them by
# track name; the label is appended as the 'rating' column.
features = get_features(spotify, track_ids)
favorites_df = pd.DataFrame(features, index=track_names).assign(rating=ratings)

In [None]:
if os.path.isfile(data_file):
    # A cache file exists: reuse it and only request features for tracks
    # that are not in it yet.
    print("Audio features found")
    read_df = pd.read_csv(data_file, index_col=0)
    # Set lookup instead of scanning the id column once per track.
    known_ids = set(read_df['id'])

    print("\tFinding new tracks ...")
    tmp_indices = [i for i, track_id in enumerate(track_ids) if track_id not in known_ids]

    new_ids = [track_ids[i] for i in tmp_indices]
    new_names = [track_names[i] for i in tmp_indices]
    new_ratings = [ratings[i] for i in tmp_indices]

    # Skip the API request entirely when the cache is already complete.
    if new_ids:
        new_features = get_features(spotify, new_ids)
        new_features_df = pd.DataFrame(new_features, index=new_names)
        new_features_df['rating'] = new_ratings

        # The original appended new_features_df to the file here (mode='a').
        # That wrote a second header row into the CSV and was overwritten by
        # the full to_csv() below anyway, so the append is dropped.
        print("\tCreating audio features dataframe ...")
        read_df = pd.concat([read_df, new_features_df])
        # Keep only tracks that are still present in the playlists.
        favorites_df = read_df[read_df['id'].isin(track_ids)].copy()
        # Cached ratings may be stale (a track can move in or out of the
        # playlist of interest), so refresh them from the current labels.
        rating_by_id = dict(zip(track_ids, ratings))
        favorites_df['rating'] = favorites_df['id'].map(rating_by_id)

# Persist the up-to-date feature table (creates the cache on the first run).
favorites_df.to_csv(data_file)

print("Done!")
favorites_df

## Pre-processing

In [None]:
# Audio-feature columns used for modelling, followed by the label column.
training_columns = [
    "acousticness", "danceability", "duration_ms", "energy",
    "instrumentalness", "key", "liveness", "loudness", "mode",
    "speechiness", "tempo", "valence",
    "rating",
]
training_df = favorites_df.loc[:, training_columns]
training_df

In [None]:
# Feature names: every training column except the trailing 'rating' label.
column_names = training_df.columns[:-1]

ratings = training_df['rating'].to_numpy()
# Convert only the feature columns so the arrays have exactly one column per
# entry in column_names (the original also passed the 'rating' column, which
# has no matching name).
training_array = training_df[column_names].to_numpy()

# Compare the feature distributions of rating-0 rows against rating-1 rows.
fig, _ = plot_features(column_names, training_array[ratings == 0], training_array[ratings == 1])

In [None]:
X_train = training_df.drop('rating', axis=1)
y_train = training_df['rating']

# Standardize the features before PCA, then fit a full PCA to inspect how
# much variance each additional component explains.
X_scaled = StandardScaler().fit_transform(X_train)
pca = decomposition.PCA().fit(X_scaled)

variance_ratio = pca.explained_variance_ratio_
cum_var = np.cumsum(variance_ratio)
total_components = len(cum_var)
threshold = 0.95
# Smallest number of components whose cumulative variance exceeds the
# threshold; fall back to all components if rounding keeps the total at or
# below it (next() without a default would raise StopIteration).
n_components = next(
    (index for index, value in enumerate(cum_var) if value > threshold),
    total_components - 1,
) + 1
print(f"Number of components for {threshold*100:.0f}% variance: {n_components}")

plt.figure(figsize=(15,5))
# Derive the x-axis from the actual number of components instead of a
# hard-coded 12: PCA keeps min(n_samples, n_features) components, so a fixed
# range fails with a shape mismatch whenever that number differs.
plt.plot(range(1, total_components + 1), cum_var, lw=2)
plt.xlabel('Number of components')
plt.ylabel('Total variance explained')
plt.xlim(1, total_components)
plt.ylim(0, 1)
plt.grid()
plt.axvline(n_components, c='k')
plt.axhline(threshold, c='r')


In [None]:
# Refit the PCA with the selected number of components and project the
# standardized training features onto them.
pca = decomposition.PCA(n_components=n_components)
X_train_pca = pca.fit_transform(X_scaled)

# Compare the component distributions of rating-0 rows against rating-1 rows.
column_names = [f"Component #{number}" for number in range(1, n_components + 1)]
disliked_rows = X_train_pca[y_train == 0]
liked_rows = X_train_pca[y_train == 1]
plot_features(column_names, disliked_rows, liked_rows)

In [None]:
# TF-IDF features built from the track names (word n-grams up to length 6).
v = TfidfVectorizer(sublinear_tf=True, ngram_range=(1, 6), max_features=10000)

# Take the names from the index of training_df rather than from track_names:
# the rows of X_train_pca follow training_df, whose order (and length) differs
# from track_names once the frame has been rebuilt from the cached CSV.
training_track_names = training_df.index.astype(str).tolist()
X_names_sparse = v.fit_transform(training_track_names)

# Final training matrix: PCA components followed by the name features.
X_train = sparse.csr_matrix(sparse.hstack([X_train_pca, X_names_sparse]))
X_train.shape

## Training

In [None]:
# K-nearest neighbor classification
n_splits = 5
max_neighbors = 50

# A fixed random_state makes the shuffled folds reproducible and guarantees
# that every grid search sharing `skf` is evaluated on the same splits;
# without it each call to split() reshuffles differently.
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

# Evaluate every neighbourhood size from 1 to max_neighbors.
knc_params = {'n_neighbors': range(1, max_neighbors + 1)}
knc = KNeighborsClassifier(n_jobs=-1)

knc_grid = GridSearchCV(knc, knc_params, n_jobs=-1, cv=skf, verbose=1)
knc_grid.fit(X_train, y_train)
print("Best score: ", knc_grid.best_score_)

grid_results = pd.DataFrame(knc_grid.cv_results_)
grid_results

In [None]:
# Test score per fold (dashed) and the mean over folds (bold) as a function
# of the number of neighbors.
knn_fig, knn_ax = plt.subplots(figsize=(15, 5))
neighbor_counts = grid_results['param_n_neighbors']
for split_index in range(n_splits):
    split_scores = grid_results[f'split{split_index}_test_score']
    knn_ax.plot(neighbor_counts, split_scores, label=f'Split {split_index}', ls='--', lw=1)
knn_ax.plot(neighbor_counts, grid_results['mean_test_score'], label='Mean', lw=3)
knn_ax.legend()
knn_ax.set_xlabel('Number of neighbors')
knn_ax.set_ylabel('Test score')
knn_ax.set_xlim(0, max_neighbors)

In [None]:
# Random Forest classification

# Hyper-parameter grid explored by the cross-validated search.
rfc_parameters = {
    'max_features': [4, 6, 8, 10], 
    'min_samples_leaf': [1, 3, 5, 7],
    'max_depth': [3, 5, 7]
}
# A fixed random_state makes the bootstrap samples and feature sub-sampling
# reproducible, so scores can be compared between runs and between candidates.
rfc = RandomForestClassifier(n_estimators=100, n_jobs=-1, oob_score=True, random_state=42)
forest_grid = GridSearchCV(rfc, rfc_parameters, n_jobs=-1, cv=skf, verbose=1)
forest_grid.fit(X_train, y_train)
print("Best score: ", forest_grid.best_score_)

grid_results = pd.DataFrame(forest_grid.cv_results_)
grid_results

In [None]:
# Test score per fold (dashed) and the mean over folds (bold) for every
# parameter combination, in the order stored in grid_results.
plt.figure(figsize=(15, 5))
for split_index in range(n_splits):
    split_scores = grid_results[f'split{split_index}_test_score']
    plt.plot(split_scores, label=f'Split {split_index}', ls='--', lw=1)
mean_scores = grid_results['mean_test_score']
plt.plot(mean_scores, label='Mean', lw=3)
plt.legend()
plt.xlabel('Parameters')
plt.ylabel('Test score')
last_candidate = len(grid_results.index) - 1
plt.xlim(0, last_candidate)
# The x positions are only candidate indices, so tick labels are hidden.
plt.xticks([])

In [None]:
# Decision tree classification

# Hyper-parameter grid explored by the cross-validated search.
tree_parameters = {
    'max_depth': range(1,11),
    'max_features': range(4, 11)
}
# With max_features below the number of features the tree samples candidate
# features at random; a fixed random_state keeps the results reproducible.
tree = DecisionTreeClassifier(random_state=42)
tree_grid = GridSearchCV(tree, tree_parameters, cv=skf, n_jobs=-1, verbose=True)
tree_grid.fit(X_train, y_train)
print("Best score: ", tree_grid.best_score_)

grid_results = pd.DataFrame(tree_grid.cv_results_)
grid_results

In [None]:
# One dashed line per cross-validation fold plus the bold mean, plotted over
# the parameter combinations in the order stored in grid_results.
plt.figure(figsize=(15, 5))
fold_columns = [(fold, f'split{fold}_test_score') for fold in range(n_splits)]
for fold, column in fold_columns:
    plt.plot(grid_results[column], label=f'Split {fold}', ls='--', lw=1)
plt.plot(grid_results['mean_test_score'], label='Mean', lw=3)
plt.legend()
plt.xlabel('Parameters')
plt.ylabel('Test score')
candidate_count = len(grid_results.index)
plt.xlim(0, candidate_count - 1)
# The x positions are only candidate indices, so tick labels are hidden.
plt.xticks([])

In [None]:
# Draw the best decision tree found by the grid search.
tree_fig, tree_ax = plt.subplots(figsize=(15, 10))
plot_tree(tree_grid.best_estimator_, ax=tree_ax, fontsize=10)
# Ending on print() keeps the list of annotations from being echoed as cell output.
print()

## Testing

In [None]:
# Number of recommendations requested per seed track, and the maximum number
# of recommended tracks to collect in total.
rec_tracks_per_track = 1
max_training_size = 50

rec_tracks = []
for seed_id in favorites_df['id']:
    try:
        rec_tracks.extend(
            spotify.recommendations(seed_tracks=[seed_id], limit=rec_tracks_per_track)['tracks']
        )
    except Exception as err:
        # Still best-effort (keep whatever was collected so far), but report
        # why the loop stopped and no longer swallow KeyboardInterrupt or
        # SystemExit the way a bare `except:` does.
        print(f"Stopped requesting recommendations after {len(rec_tracks)} tracks: {err!r}")
        break

    if len(rec_tracks) >= max_training_size:
        # Trim any overshoot from the last request and stop.
        rec_tracks = rec_tracks[:max_training_size]
        break


In [None]:
# Split the recommended track objects into parallel id and name lists.
rec_track_ids = [track['id'] for track in rec_tracks]
rec_track_names = [track['name'] for track in rec_tracks]

rec_features = get_features(spotify, rec_track_ids)

# Index the features by track name and keep the first occurrence of each id;
# the name list is then rebuilt so it matches the de-duplicated rows.
rec_playlist_df = pd.DataFrame(rec_features, index=rec_track_names).drop_duplicates(subset='id')
rec_track_names = rec_playlist_df.index.tolist()
rec_playlist_df

In [None]:
# Same audio-feature columns, in the same order, as the training features
# (no 'rating' column: these tracks are unlabelled).
testing_columns = [
    "acousticness", "danceability", "duration_ms", "energy",
    "instrumentalness", "key", "liveness", "loudness", "mode",
    "speechiness", "tempo", "valence",
]
testing_df = rec_playlist_df.loc[:, testing_columns]
testing_df

In [None]:
estimators = [knc_grid, forest_grid, tree_grid]

# Standardize the test features with the statistics of the TRAINING features.
# The PCA and the models were fitted on training-scaled data, so fitting a new
# scaler on the test set (as before) would put the two sets on different
# scales. Refitting on the training features reproduces the scaler that was
# used during training.
feature_scaler = StandardScaler().fit(training_df.drop('rating', axis=1))
testing_df_scaled = feature_scaler.transform(testing_df)

# Apply the already-fitted PCA and TF-IDF vectorizer, then stack the blocks
# in the same order as the training matrix.
X_test = pca.transform(testing_df_scaled)
X_test_names = v.transform(rec_track_names)

X_test = sparse.csr_matrix(sparse.hstack([X_test, X_test_names]))
# Start with every track liked; a track stays liked only if all models agree.
y_pred_final = np.ones(X_test_names.shape[0], dtype=int)

for estimator in estimators:
    # NOTE(review): GridSearchCV refits best_estimator_ on the full data by
    # default, so this fit looks redundant; it is kept to preserve behavior.
    estimator.best_estimator_.fit(X_train, y_train)
    y_pred = estimator.best_estimator_.predict(X_test)

    # Multiplying 0/1 predictions acts as a logical AND across the models.
    y_pred_final = y_pred_final * y_pred
    print("Number of disliked tracks by model: ", np.count_nonzero(y_pred == 0))
    print("Number of disliked tracks: ", np.count_nonzero(y_pred_final == 0))
    print("Number of liked tracks: ", np.count_nonzero(y_pred_final == 1))
    print()

In [None]:
# Keep only the recommended tracks that every model predicted as liked.
liked_mask = y_pred_final.astype(bool)
final_tracks = testing_df[liked_mask]
final_tracks