In [1]:
from pymongo import MongoClient
from getpass import getpass
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import davies_bouldin_score
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pickle
import random
from datetime import datetime
import sys
from IPython.display import clear_output, HTML
import re

### Create MongoDB client

In [None]:
# Prompt for the credentials at run time so they are never stored in the notebook.
mongo_db_username = getpass(prompt='Enter MongoDB username: ')
mongo_db_password = getpass(prompt='Enter MongoDB password: ')

# Pass the credentials as keyword arguments instead of formatting them into the
# URI: inside a URI, characters such as '@', ':', '/' or '%' in a username or
# password would have to be percent-encoded, otherwise the URI is parsed wrongly.
mongo_db_client = MongoClient(
    'mongodb://localhost:27018',
    username=mongo_db_username,
    password=mongo_db_password,
)

# Handle on the 'spotify-db' database
db = mongo_db_client['spotify-db']

# Handle on the 'song-collection' collection inside that database
song_collection = db['song-collection']

### Load MongoDB songs into Dataframe

In [None]:
# Fetch only the songs that have audio features, projecting just the fields used below.
songs_with_features = {"audio_features": {"$ne": None} }
projected_fields = {'_id': 1, 'name': 1, 'artists': 1, 'audio_features': 1}
song_documents = list(song_collection.find(songs_with_features, projected_fields))

# Flatten the nested documents into columns, then shuffle the rows and renumber the index.
data = pd.json_normalize(song_documents)
data = data.sample(frac=1).reset_index(drop=True)
data.head()

### Clean up Dataframe

In [None]:
# Show the column names of the loaded frame (used to decide which ones to drop).
data.columns

In [None]:
# Identifier/URL columns of the audio-features payload that are not used as features.
columns_to_drop = [
    'audio_features.type',
    'audio_features.id',
    'audio_features.uri',
    'audio_features.track_href',
    'audio_features.analysis_url',
]
data = data.drop(columns=columns_to_drop)
data.head()

In [None]:
# Each 'artists' cell holds a list of artist records; collapse it to a single
# comma-separated string of their names.
data['artists'] = data['artists'].map(
    lambda artist_records: ', '.join(record['name'] for record in artist_records)
)
data.head()

In [None]:
# Number of distinct values in each column.
data.nunique()

In [None]:
# Number of missing values in each column.
data.isna().sum()

In [None]:
# Keep only the rows that have a value in every column.
data = data.dropna(axis=0)

In [None]:
# (rows, columns) of the frame at this point.
data.shape

### Split Labels and Features

In [None]:
# Song title is kept as the label; every column except the identifying ones is a feature.
y = data['name']
X = data.drop(columns=['_id', 'name', 'artists'])

In [None]:
# Preview the feature columns.
X.head()

### Standardize numerical columns

In [None]:
# Standardize every feature column; the scaler returns a plain array, so it is
# wrapped in a DataFrame with the original column names only for the preview.
feature_scaler = StandardScaler()
X_prep = feature_scaler.fit_transform(X)
pd.DataFrame(X_prep, columns=X.columns).head()

### Find the best K for K-Means with the Davies-Bouldin index

In [None]:
# Candidate cluster counts; the same range drives both the fits and the plot.
k_values = range(2, 21)

# Davies-Bouldin score for each candidate K (lower means better-separated clusters)
scores = []

for k in k_values:
    print(f"Starting with K='{k}'")
    # A fixed seed and an explicit n_init make the curve reproducible, so the K
    # read off the plot still describes the data on the next run.
    kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
    labels = kmeans.fit_predict(X_prep)
    score = davies_bouldin_score(X_prep, labels)
    scores.append(score)
    print(f"Done with K='{k}'")

# Plot the Davies-Bouldin scores for different K values
plt.plot(k_values, scores, 'bx-')
plt.title('Davies-Bouldin Index')
plt.xlabel('Number of clusters')
plt.ylabel('Score')
plt.show()

In [None]:
# Number of clusters chosen for the final model.
best_k = 9

# A fixed seed and an explicit n_init make the final clustering reproducible:
# re-running the cell yields the same cluster ids for the same data.
kmeans_model = KMeans(n_clusters=best_k, n_init=10, random_state=42)
kmeans_model.fit(X_prep)

In [None]:
# Assign every song to a cluster using the final model fitted with best_k.
# `kmeans` must not be used here: it is the leftover loop variable of the K
# search (last fitted with K=20), not the model that gets saved.
clusters = kmeans_model.predict(X_prep)

# Check the size of the clusters
pd.Series(clusters).value_counts().sort_index()

### Save Model into kmeans_model_{current_date}.pkl file

In [None]:
# Timestamp (minute resolution) shared by the files exported from this run.
current_date = datetime.now().strftime("%Y-%m-%d %H-%M")

# Persist the fitted model under a timestamped name.
model_path = f'data/kmeans_model_{current_date}.pkl'
with open(model_path, 'wb') as model_file:
    pickle.dump(kmeans_model, model_file)

### Concatenate Data with Clusters

In [None]:
# `clusters` is a positional array with one entry per remaining row of `data`.
# Rows were dropped earlier without renumbering the index, so an index-aligned
# concat would pair labels with the wrong songs and create all-NaN rows.
# Renumber the index and assign the labels by position instead; assigning also
# keeps the cell safe to re-run (no duplicate 'cluster' column).
data = data.reset_index(drop=True).assign(cluster=clusters)
data.head()

### Save Data into data_{current_date}.csv

In [None]:
# Export the clustered songs under the same timestamp as the model file.
data_csv_path = f'data/data_{current_date}.csv'
data.to_csv(data_csv_path)

### Clean top_100_songs_billboard.csv

In [None]:
# Only the title and artists columns of the Billboard export are needed.
top_100_songs = pd.read_csv(
    'data/top_100_songs_billboard.csv',
    usecols=['title', 'artists'],
)
top_100_songs.head()

In [None]:
# Rewrite the different ways the chart joins several artists into the same
# ", " separator used by the 'artists' column of `data`.
artist_separators = " & | and | Featuring | X "
top_100_songs['artists'] = top_100_songs['artists'].str.replace(artist_separators, ", ", regex=True)
top_100_songs.head()

In [None]:
# Lowercase both text columns so they can be compared case-insensitively.
top_100_songs[['title', 'artists']] = top_100_songs[['title', 'artists']].apply(
    lambda text_column: text_column.str.lower()
)
top_100_songs.head()

### Lower case name and artists columns in Data

In [None]:
# Lowercase the song name and artists so they line up with the lowercased chart columns.
data[['name', 'artists']] = data[['name', 'artists']].apply(
    lambda text_column: text_column.str.lower()
)
data.head()

### Verify if Top 100 Songs are in Data

In [None]:
# Left join: every chart entry is kept; entries with no (title, artists) match
# in `data` get missing values, and the 'exist' column records the match status.
top_100_songs = pd.merge(
    top_100_songs,
    data,
    how='left',
    left_on=['title', 'artists'],
    right_on=['name', 'artists'],
    indicator='exist',
)
top_100_songs.head()

In [None]:
# 'name' duplicates 'title' after the merge and 'exist' is only the merge indicator.
top_100_songs = top_100_songs.drop(columns=['name', 'exist'])
top_100_songs.head()

In [None]:
# How each column is collapsed when a title occurs on several rows, listed in
# output column order: continuous features are averaged, the rest take the
# first value of the group.
column_aggregations = [
    ('_id', 'first'),
    ('artists', 'first'),
    ('audio_features.danceability', 'mean'),
    ('audio_features.energy', 'mean'),
    ('audio_features.key', 'first'),
    ('audio_features.loudness', 'mean'),
    ('audio_features.mode', 'first'),
    ('audio_features.speechiness', 'mean'),
    ('audio_features.acousticness', 'mean'),
    ('audio_features.instrumentalness', 'mean'),
    ('audio_features.liveness', 'mean'),
    ('audio_features.valence', 'mean'),
    ('audio_features.tempo', 'mean'),
    ('audio_features.duration_ms', 'mean'),
    ('audio_features.time_signature', 'first'),
    ('cluster', 'first'),
]

songs_by_title = top_100_songs.groupby('title')
top_100_songs = songs_by_title.agg(dict(column_aggregations)).reset_index()

top_100_songs.head()

### Save top_100_songs into top_100_songs_billboard_cleaned_{current_date}.csv

In [None]:
# Export the cleaned chart under the same timestamp as the other files of this run.
top_100_csv_path = f'data/top_100_songs_billboard_cleaned_{current_date}.csv'
top_100_songs.to_csv(top_100_csv_path)

### Load top_100_songs from top_100_songs_billboard_cleaned_{current_date}.csv

In [2]:
# Reload the cleaned chart from one specific earlier export (timestamp is fixed in the path).
top_100_songs = pd.read_csv(
    filepath_or_buffer='data/top_100_songs_billboard_cleaned_2023-01-16 19-20.csv'
)

### Load Data

In [3]:
# Reload the clustered songs from one specific earlier export (timestamp is fixed in the path).
data = pd.read_csv(
    filepath_or_buffer='data/data_2023-01-16 19-20.csv'
)

### Load kmeans_model_{current_date}.pkl file

In [4]:
# Reload the fitted model from one specific earlier export.
# NOTE: unpickling executes code stored in the file; only load files you produced yourself.
model_path = 'data/kmeans_model_2023-01-16 19-20.pkl'
with open(model_path, "rb") as model_file:
    kmeans_model = pickle.load(model_file)

### Build Song Recommender

In [5]:
def find_in_song_top_100(top_100_songs, user_input, match):
    """Let the user pick a matching Top 100 song and recommend a song from its cluster.

    Args:
        top_100_songs: The Top 100 frame. Accepted for call compatibility; not read here.
        user_input: The text the user searched for; only echoed back.
        match: Rows of the Top 100 that matched the search. Must provide the
            '_id', 'title', 'artists' and 'cluster' columns.

    Returns:
        A one-row DataFrame sampled from the module-level ``data`` frame among the
        songs that share the picked song's cluster, never the picked song itself.
        An empty DataFrame when the user answers 'no', enters an ``_id`` that is
        not in the list, the picked song has no cluster, or no other song shares
        its cluster.
    """
    clear_output()
    print(f"'{user_input}' seems to be part of the Top 100.")
    # Print list of matching items and ask user to pick one
    display(match[['_id', 'title', 'artists']])

    user_pick = input("Please enter the _id of the song you want or type 'no' if it's not in the list.").strip()
    if user_pick == 'no':
        return pd.DataFrame()

    clear_output()
    # Compare ids as text so the typed value matches whatever dtype the column has.
    picked = match[match['_id'].astype(str) == user_pick]
    # Unknown id, or a chart song with no cluster value: nothing to recommend from.
    if picked.empty or pd.isna(picked['cluster'].iloc[0]):
        return pd.DataFrame()

    cluster = picked['cluster'].iloc[0]
    # The pick is an _id, so the picked song must be excluded by _id (not by name).
    # NOTE: reads the module-level `data` frame; it is not a parameter of this function.
    candidates = data[(data['cluster'] == cluster) & (data['_id'].astype(str) != user_pick)]
    if candidates.empty:
        return pd.DataFrame()
    return candidates.sample()


def find_in_data(data, user_input, match):
    """Let the user pick a matching song from `data` and recommend a song from its cluster.

    Args:
        data: Frame of all songs; must provide the '_id' and 'cluster' columns.
        user_input: The text the user searched for; only echoed back.
        match: Rows of `data` that matched the search. Must provide the
            '_id', 'name', 'artists' and 'cluster' columns.

    Returns:
        A one-row DataFrame sampled from the songs of `data` that share the picked
        song's cluster, never the picked song itself. An empty DataFrame when the
        user answers 'no', enters an ``_id`` that is not in the list, the picked
        song has no cluster, or no other song shares its cluster.
    """
    clear_output()
    print(f"'{user_input}' seems to be part of Data.")
    # Print list of matching items and ask user to pick one
    display(match[['_id', 'name', 'artists']])

    user_pick = input("Please enter the _id of the song you want or type 'no' if it's not in the list.").strip()
    if user_pick == 'no':
        return pd.DataFrame()

    clear_output()
    # Compare ids as text so the typed value matches whatever dtype the column has.
    picked = match[match['_id'].astype(str) == user_pick]
    # Unknown id or missing cluster: report "nothing found" instead of raising.
    if picked.empty or pd.isna(picked['cluster'].iloc[0]):
        return pd.DataFrame()

    cluster = picked['cluster'].iloc[0]
    # Same cluster, excluding the picked song itself (identified by its _id).
    candidates = data[(data['cluster'] == cluster) & (data['_id'].astype(str) != user_pick)]
    if candidates.empty:
        return pd.DataFrame()
    return candidates.sample()


def find_song(top_100_songs, data, user_input):
    """Look a title up in the Top 100 first, then in `data`, and return a recommendation.

    Args:
        top_100_songs: Top 100 frame; its 'title' column is searched.
        data: Frame of all songs; its 'name' column is searched.
        user_input: Text typed by the user. It is matched literally and
            case-insensitively as a substring of the title/name.

    Returns:
        The DataFrame returned by the first lookup that yields a recommendation,
        or an empty DataFrame when the input is blank or nothing is found.
    """
    # A blank query would match every row; treat it as "nothing to search for".
    if not user_input or not user_input.strip():
        clear_output()
        return pd.DataFrame()

    # regex=False: titles routinely contain '(', '?', '.' etc., which would crash
    # or mis-match if the text were compiled as a regular expression.
    # na=False: rows without a title/name simply do not match.
    # Search for song in top_100_songs
    match = top_100_songs[top_100_songs['title'].str.contains(user_input, case=False, regex=False, na=False)]
    if not match.empty:
        result = find_in_song_top_100(top_100_songs, user_input, match)
        if result.shape[0] > 0:
            return result

    # Search for song in data
    print(f"Please wait, Data has '{data.shape[0]}' rows...")
    match = data[data['name'].str.contains(user_input, case=False, regex=False, na=False)]
    if not match.empty:
        result = find_in_data(data, user_input, match)
        if result.shape[0] > 0:
            return result

    # If song was not found
    clear_output()
    return pd.DataFrame()


In [11]:
# Interactive session: ask for a title, show one recommendation (or an apology),
# then ask whether to go on. Typing 'exit' at either prompt ends the session.
while True:
    clear_output()
    print("Welcome! Please enter a song title you like (or 'exit' to quit).")

    # First prompt: the title to search for, or 'exit'
    user_input = input()
    if user_input == 'exit':
        clear_output()
        break

    result = find_song(top_100_songs=top_100_songs, data=data, user_input=user_input)
    if len(result) == 0:
        print('I am sorry, I could not find any recommendation.. :\'(')
    else:
        print(f"Based on '{user_input}', you can try:")
        display(result[['_id', 'name', 'artists']])

    # Second prompt: anything other than 'exit' starts another round
    user_input = input("Continue or 'exit'?")
    if user_input == 'exit':
        clear_output()
        break