In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sb
%matplotlib inline

In [None]:
# Load the track table from 'data.csv' (relative path, resolved against the
# kernel's working directory). Later cells read the 'id', 'name' and
# 'release_date' columns as well as the audio-feature columns.
df = pd.read_csv('data.csv')

In [None]:
# Preview the first rows of the loaded table.
df.head()

In [None]:
# (rows, columns) of the loaded table.
df.shape

In [None]:
# plt.figure()
# for i, feature in enumerate(audio_feature_cols):
#     plt.subplot(4, 4, i+1)
#     plt.hist(df[feature])
#     plt.title(feature)

# plt.tight_layout()
# plt.show()

def plot(feat):
    """Show a histogram of one column of the notebook-level ``df``.

    Args:
        feat: Name of the column in ``df`` to plot; also used as the x label.
    """
    _, axis = plt.subplots()
    axis.hist(df[feat])
    axis.set_xlabel(feat)
    plt.show()

# Columns of `df` whose distributions are inspected below.
audio_feature_cols = ['popularity', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness',
                          'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms', 'time_signature']

# One histogram figure per listed column.
for feat in audio_feature_cols:
    plot(feat)

In [None]:
# Columns used for the pairwise scatter plots further down.
# NOTE(review): 'year' is written onto the frame by clean_df(); unless data.csv
# already provides a 'year' column (TODO confirm), clean_df() must have been run
# on `df` before these names are used to index `df` directly.
continuous_features_cols = ['danceability', 'energy', 'loudness', 'speechiness', 'acousticness',
                          'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms', 'year', 'popularity']

In [None]:
def clean_df(df):
    """Add a release ``year`` column and drop the columns not used as features.

    The year is the integer value of the first four characters of each
    ``release_date`` entry.

    Note:
        The ``year`` column is written onto the frame that was passed in, so the
        caller's DataFrame gains (or has overwritten) that column as a side
        effect. The returned frame is a separate object without the dropped
        columns.

    Args:
        df: DataFrame containing ``release_date`` plus the columns listed in
            ``unused_cols`` below.

    Returns:
        A new DataFrame holding the remaining columns, including ``year``.
    """
    df['year'] = [int(release_date[:4]) for release_date in df['release_date']]

    unused_cols = ['name', 'artist', 'album', 'key', 'mode', 'time_signature', 'release_date']
    return df.drop(columns=unused_cols)

In [None]:
from sklearn.preprocessing import MinMaxScaler

def scale_min_max(df):
    """Rescale every column of ``df`` with scikit-learn's ``MinMaxScaler``.

    Args:
        df: All-numeric DataFrame.

    Returns:
        A new DataFrame with the same column labels and the same index as
        ``df``, holding the scaled values.
    """
    scaled_features = MinMaxScaler().fit_transform(df)
    # fit_transform returns a bare array; re-attach the index as well as the
    # columns so rows can still be matched to the input frame.
    return pd.DataFrame(scaled_features, columns=df.columns, index=df.index)

In [None]:
from sklearn.preprocessing import StandardScaler

def scale_standard(df):
    """Standardise every column of ``df`` with scikit-learn's ``StandardScaler``.

    Args:
        df: All-numeric DataFrame.

    Returns:
        A new DataFrame with the same column labels and the same index as
        ``df``, holding the scaled values.
    """
    scaled_features = StandardScaler().fit_transform(df)
    # fit_transform returns a bare array; re-attach the index as well as the
    # columns so rows can still be matched to the input frame.
    return pd.DataFrame(scaled_features, columns=df.columns, index=df.index)

In [None]:
def scale_df(df, scaler):
    """Scale ``df`` with the scaler selected by name.

    Args:
        df: All-numeric DataFrame to scale.
        scaler: ``'minmax'`` to use ``scale_min_max`` or ``'standard'`` to use
            ``scale_standard``.

    Returns:
        The scaled DataFrame produced by the selected helper.

    Raises:
        ValueError: If ``scaler`` is neither ``'minmax'`` nor ``'standard'``.
    """
    if scaler == 'minmax':
        return scale_min_max(df)
    if scaler == 'standard':
        return scale_standard(df)
    # Previously an unknown name fell through and returned None, which only
    # surfaced later as an unrelated error in the caller.
    raise ValueError(
        "unknown scaler {!r}; expected 'minmax' or 'standard'".format(scaler)
    )

In [None]:
from sklearn.decomposition import PCA

def apply_PCA(df, n):
    """Fit a PCA with ``n`` components on ``df`` and return the transformed data.

    Args:
        df: Numeric table to decompose.
        n: Value passed to ``PCA`` as ``n_components``.

    Returns:
        The result of ``PCA.fit_transform`` on ``df``.
    """
    return PCA(n_components=n).fit_transform(df)

In [None]:
def preprocess_df(df, n_components=6):
    """Clean, standardise and PCA-reduce the track table.

    Pipeline: ``clean_df`` -> drop ``id`` -> ``scale_df(..., 'standard')`` ->
    ``apply_PCA``. Because ``clean_df`` is called on ``df`` directly, ``df``
    gains a ``year`` column as a side effect.

    Args:
        df: Raw track DataFrame (must satisfy ``clean_df`` and contain ``id``).
        n_components: Number of principal components to keep. Defaults to 6,
            the value that used to be hard-coded.

    Returns:
        DataFrame with columns ``'feature 1'`` .. ``'feature N'`` followed by
        ``'id'``, one row per input row, in input order, with a fresh
        0..n-1 index.
    """
    cleaned = clean_df(df)
    # Take the ids as a plain array: assigning the Series itself would align on
    # index labels and yield NaN/misplaced ids whenever ``df`` does not carry
    # the default 0..n-1 index (e.g. after filtering rows).
    track_ids = cleaned['id'].to_numpy()

    scaled = scale_df(cleaned.drop(columns='id'), scaler='standard')
    components = apply_PCA(scaled, n_components)

    column_names = ['feature {}'.format(i + 1) for i in range(components.shape[1])]
    reduced = pd.DataFrame(components, columns=column_names)
    reduced['id'] = track_ids
    return reduced

In [None]:
def fit_euclid_dist_recommender(df, given_track_id):
    """Rank every track by Euclidean distance to the given track.

    Distances are measured in the feature space produced by ``preprocess_df``.

    Args:
        df: Raw track DataFrame accepted by ``preprocess_df``.
        given_track_id: ``id`` of the reference track. If the id occurs more
            than once, the first occurrence is used.

    Returns:
        A list of ``(track_id, distance)`` tuples covering every row, sorted by
        ascending distance (ties keep their row order).

    Raises:
        ValueError: If ``given_track_id`` is not present in ``df``.
    """
    df_cont = preprocess_df(df)
    track_ids = df_cont['id'].to_numpy()
    features = df_cont.drop(columns='id').to_numpy(dtype=float)

    matches = np.flatnonzero(track_ids == given_track_id)
    if matches.size == 0:
        # Fail with a clear message instead of a shape-mismatch error from the
        # subtraction below.
        raise ValueError('track id {!r} not found in the data'.format(given_track_id))
    given_vector = features[matches[0]]

    # One vectorised pass instead of a Python loop doing an iloc lookup per row.
    distances = np.linalg.norm(features - given_vector, axis=1)

    return sorted(zip(track_ids, distances), key=lambda pair: pair[1])

In [None]:
def euclid_dist_recommender(df, sorted_tracks, num_tracks, closeness):
    """Pick recommended songs from a distance-sorted ranking.

    Args:
        df: DataFrame with at least ``id`` and ``name`` columns, used to look
            up the song names.
        sorted_tracks: Sequence of ``(track_id, distance)`` tuples, as returned
            by ``fit_euclid_dist_recommender``.
        num_tracks: Maximum number of songs to return.
        closeness: Where in the ranking to start. 0 starts at position 1 (the
            entry right after the first one); 1 starts at position
            ``len(df) - 1``.

    Returns:
        A list of up to ``num_tracks`` ``(name, id)`` tuples, in ranking order.
        Fewer are returned when the ranking runs out.
    """
    starting_index = int(closeness * (len(df) - 2)) + 1
    # Bound by the ranking actually being indexed as well as by len(df); the
    # old check looked at len(df) only and could run past a shorter ranking.
    stop_index = min(starting_index + num_tracks, len(df), len(sorted_tracks))

    rec_songs = []
    for position in range(starting_index, stop_index):
        track_id = sorted_tracks[position][0]
        song = df.loc[df['id'] == track_id].iloc[0]
        rec_songs.append((song['name'], song['id']))

    return rec_songs

In [None]:
# Show the row(s) whose name is exactly 'Call Me Maybe'
# (presumably the source of the test track id used below — TODO confirm).
df[df['name']=='Call Me Maybe']

In [None]:
# Track id used as the reference song for every recommender call below.
test_track_id = '20I6sIOMTCkB6w7ryavxtO'

In [None]:
# Rank every track by its distance to the test track, nearest first.
sorted_tracks = fit_euclid_dist_recommender(df, test_track_id)

In [None]:
# Take 5 songs from the ranking; closeness=0 starts at position 1, i.e. right
# after the first (nearest) entry.
rec_songs = euclid_dist_recommender(df, sorted_tracks, 5, 0)
print(rec_songs)

In [None]:
def get_ids(rec_songs):
    """Extract the track ids from a list of recommended songs.

    Args:
        rec_songs: Iterable of sequences whose second element is the track id
            (e.g. ``(name, id)`` tuples).

    Returns:
        A list with the second element of every entry, in the same order.
    """
    return [entry[1] for entry in rec_songs]

In [None]:
def plot_closeness_pca(given_song_id, rec_songs_id):
    """Scatter all tracks in a 2-component PCA projection and highlight picks.

    Works on the notebook-level ``df``: it is cleaned with ``clean_df``,
    standardised, and reduced to two components. All tracks are drawn first,
    then the recommended tracks in orange and the given track in red on top.

    Args:
        given_song_id: ``id`` of the reference track.
        rec_songs_id: Iterable of ``id`` values of the recommended tracks.
    """
    cleaned = clean_df(df)
    track_ids = cleaned['id']
    scaled = scale_df(cleaned.drop(columns='id'), scaler='standard')
    coords = apply_PCA(scaled, 2)

    # Select rows with positional boolean masks. This avoids index-label
    # alignment between the projection and the ids, and copes with ids that are
    # absent (they simply select nothing) instead of feeding empty Series to
    # scatter.
    is_given = (track_ids == given_song_id).to_numpy()
    is_rec = track_ids.isin(list(rec_songs_id)).to_numpy()

    plt.figure()
    plt.scatter(coords[:, 0], coords[:, 1])
    plt.scatter(coords[is_rec, 0], coords[is_rec, 1], c='orange')
    plt.scatter(coords[is_given, 0], coords[is_given, 1], c='red')
    plt.xlabel('x')
    plt.ylabel('y')
    plt.show()

In [None]:
# Show where the recommended songs fall relative to the test track in the
# 2-component PCA projection.
plot_closeness_pca(test_track_id, get_ids(rec_songs))

In [None]:
def plot_closeness(given_song_id, rec_songs_id, dims):
    """Scatter all tracks over two raw feature columns and highlight picks.

    Reads the notebook-level ``df``. Tracks that are not recommended are drawn
    first, then the recommended tracks in orange and the given track in red.

    Args:
        given_song_id: ``id`` of the reference track; must exist in ``df``.
        rec_songs_id: Iterable of ``id`` values of the recommended tracks.
        dims: Two column names of ``df``; ``dims[0]`` is the x axis and
            ``dims[1]`` the y axis.
    """
    x_dim, y_dim = dims[0], dims[1]

    given_row = df.loc[df['id'] == given_song_id].iloc[0]

    # Column-wise selection instead of one iloc lookup per row: this function is
    # called once per feature pair, and integer keys on a labelled Series
    # (row[0], row[1]) are deprecated positional access in recent pandas.
    is_rec = df['id'].isin(list(rec_songs_id))
    rec_rows = df.loc[is_rec]
    other_rows = df.loc[~is_rec]

    plt.figure()
    plt.scatter(other_rows[x_dim], other_rows[y_dim])
    plt.scatter(rec_rows[x_dim], rec_rows[y_dim], c='orange')
    plt.scatter(given_row[x_dim], given_row[y_dim], c='red')
    plt.xlabel(x_dim)
    plt.ylabel(y_dim)
    plt.show()

In [None]:
# One closeness plot per unordered pair of distinct continuous features:
# pairing each feature only with the ones listed after it visits every
# combination exactly once.
for i, feat_1 in enumerate(continuous_features_cols):
    for feat_2 in continuous_features_cols[i + 1:]:
        plot_closeness(test_track_id, get_ids(rec_songs), [feat_1, feat_2])

In [None]:
from numpy.random import default_rng

def init_centroids(feat_range, k, df, seed=None):
    """Draw ``k`` random starting centroids.

    Args:
        feat_range: ``(low, high)`` bounds of the uniform distribution each
            coordinate is drawn from.
        k: Number of centroids.
        df: DataFrame whose column count, minus one, sets the centroid
            dimensionality (callers pass a frame with one non-feature ``id``
            column).
        seed: Optional seed for ``numpy.random.default_rng``. ``None`` (the
            default) keeps the previous non-reproducible behaviour.

    Returns:
        A list of ``(label, vector)`` tuples with labels ``0 .. k-1``.
    """
    rng = default_rng(seed)
    low, high = feat_range[0], feat_range[1]
    n_dims = len(df.columns) - 1
    return [(label, rng.uniform(low=low, high=high, size=n_dims)) for label in range(k)]

In [None]:
def fit_k_means_clustering(df, k, stopping_dist, max_iter=100):
    """Cluster the tracks with a hand-written k-means.

    The data is first passed through ``preprocess_df``; centroids start from
    ``init_centroids((-1, 1), k, ...)``. Each iteration assigns every track to
    its nearest centroid and then moves each centroid to the mean of its
    members.

    Args:
        df: Raw track DataFrame accepted by ``preprocess_df``.
        k: Number of clusters (at least 1).
        stopping_dist: Iteration stops once every centroid moved less than this
            Euclidean distance in one update.
        max_iter: Upper bound on the number of iterations. Defaults to 100, the
            value that used to be hard-coded.

    Returns:
        A dictionary with ``'centroids'`` (list of ``k`` centroid vectors) and
        ``'labels'`` (list with one integer cluster label per row of ``df``, in
        row order).

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError('k must be at least 1')

    df_cont = preprocess_df(df)
    points = df_cont.drop(columns='id').to_numpy(dtype=float)

    # initialize centroids as a (k, n_features) array
    centroids = np.array(
        [vector for _, vector in init_centroids((-1, 1), k, df_cont)], dtype=float
    )
    labels = np.zeros(len(points), dtype=int)

    for iteration in range(1, max_iter + 1):
        print('iterations: {}'.format(iteration))

        # Distance of every track to every centroid, shape (n_tracks, k);
        # argmin picks the lowest label on ties, like the stable sort it replaces.
        distances = np.linalg.norm(points[:, None, :] - centroids[None, :, :], axis=2)
        labels = distances.argmin(axis=1)

        # Recompute each centroid as the mean of its members. A centroid that
        # attracted no track keeps its position (the old code crashed here).
        new_centroids = centroids.copy()
        for c in range(k):
            members = points[labels == c]
            if len(members) > 0:
                new_centroids[c] = members.mean(axis=0)

        centroid_shifts = np.linalg.norm(new_centroids - centroids, axis=1)
        centroids = new_centroids

        # terminate once all centroids moved less than stopping_dist
        if np.all(centroid_shifts < stopping_dist):
            break

    return {'centroids': list(centroids), 'labels': labels.tolist()}

In [None]:
from numpy.random import default_rng

def k_means_recommender_rand_sample(df, labels, given_track_id, num_tracks, seed=None):
    """Recommend random songs from the given track's cluster.

    Songs are sampled without replacement and the given track itself is never
    returned.

    Args:
        df: DataFrame with ``id`` and ``name`` columns, row-aligned with
            ``labels``.
        labels: One cluster label per row of ``df`` (list or array).
        given_track_id: ``id`` of the reference track.
        num_tracks: Maximum number of songs to return.
        seed: Optional seed for ``numpy.random.default_rng``.

    Returns:
        A list of up to ``num_tracks`` distinct ``(name, id)`` tuples; fewer
        when the cluster holds fewer other songs.

    Raises:
        IndexError: If ``given_track_id`` is not present in ``df``.
    """
    labels = np.asarray(labels)

    # Locate the track by row position: ``labels`` is positional, so an index
    # label must not be used to look it up.
    positions = np.flatnonzero((df['id'] == given_track_id).to_numpy())
    if positions.size == 0:
        raise IndexError('track id {!r} not found in the data'.format(given_track_id))
    given_position = positions[0]

    candidates = np.flatnonzero(labels == labels[given_position])
    candidates = candidates[candidates != given_position]

    n_pick = min(num_tracks, len(candidates))
    if n_pick <= 0:
        return []

    rng = default_rng(seed)
    chosen = rng.choice(candidates, size=n_pick, replace=False)

    rec_songs = []
    for position in chosen:
        rec_song = df.iloc[position]
        rec_songs.append((rec_song['name'], rec_song['id']))

    return rec_songs

In [None]:
def k_means_recommender_euclid_dist(df, labels, given_track_id, num_tracks, closeness):
    """Recommend songs from the given track's cluster, ranked by distance.

    The rows sharing the given track's cluster label are extracted, ranked with
    ``fit_euclid_dist_recommender`` and then sliced with
    ``euclid_dist_recommender``.

    Args:
        df: Raw track DataFrame, row-aligned with ``labels``.
        labels: One cluster label per row of ``df`` (list or array).
        given_track_id: ``id`` of the reference track.
        num_tracks: Maximum number of songs to return.
        closeness: Start position within the cluster ranking, forwarded to
            ``euclid_dist_recommender``.

    Returns:
        A list of ``(name, id)`` tuples.

    Raises:
        IndexError: If ``given_track_id`` is not present in ``df``.
    """
    labels = np.asarray(labels)

    # ``labels`` is positional, so find the track's row position rather than
    # its index label.
    positions = np.flatnonzero((df['id'] == given_track_id).to_numpy())
    if positions.size == 0:
        raise IndexError('track id {!r} not found in the data'.format(given_track_id))

    cluster_positions = np.flatnonzero(labels == labels[positions[0]])

    # Slice the rows directly (keeps column dtypes) and give the result a fresh
    # 0..n-1 index, as the downstream helpers expect.
    cluster_df = df.iloc[cluster_positions].reset_index(drop=True)

    sorted_tracks = fit_euclid_dist_recommender(cluster_df, given_track_id)
    return euclid_dist_recommender(cluster_df, sorted_tracks, num_tracks, closeness)

In [None]:
# Fit the hand-written k-means with k=4; iteration stops once every centroid
# moves less than 0.001 in one update (or the iteration cap is reached).
model = fit_k_means_clustering(df, 4, 0.001)

In [None]:
# 5 randomly chosen songs from the test track's cluster.
rec_songs_rand = k_means_recommender_rand_sample(df, model['labels'], test_track_id, 5)
print(rec_songs_rand)

In [None]:
# 5 songs from the test track's cluster, taken from the start of the
# within-cluster distance ranking (closeness=0).
rec_songs_euclid = k_means_recommender_euclid_dist(df, model['labels'], test_track_id, 5, 0)
print(rec_songs_euclid)

In [None]:
def plot_clusters_pca(labels):
    """Scatter every track in a 2-component PCA projection, coloured by cluster.

    Uses the notebook-level ``df``: it is cleaned, standardised and reduced to
    two principal components before plotting.

    Args:
        labels: One cluster label per row of ``df``, used as the point colours.
    """
    features = clean_df(df).drop('id', axis=1)
    scaled = scale_df(features, scaler='standard')
    components = apply_PCA(scaled, 2)

    column_names = ['feature {}'.format(idx + 1) for idx in range(len(components[0]))]
    projection = pd.DataFrame(components, columns=column_names)

    plt.figure()
    plt.scatter(projection['feature 1'], projection['feature 2'], c=labels, cmap='rainbow')
    plt.xlabel('x')
    plt.ylabel('y')
    plt.show()

In [None]:
# Visualise the hand-written k-means clusters in the 2-component PCA projection.
plot_clusters_pca(model['labels'])

In [None]:
def plot_clusters(dims, labels):
    """Scatter every track over two raw feature columns, coloured by cluster.

    Reads the notebook-level ``df``.

    Args:
        dims: Two column names of ``df``; ``dims[0]`` is the x axis and
            ``dims[1]`` the y axis.
        labels: One cluster label per row of ``df``, used as the point colours.
    """
    x_dim, y_dim = dims[0], dims[1]

    # Pass the columns directly; building the coordinate lists with one iloc
    # lookup per row was needlessly slow for a function called per feature pair.
    plt.figure()
    plt.scatter(df[x_dim], df[y_dim], c=labels, cmap='rainbow')
    plt.xlabel(x_dim)
    plt.ylabel(y_dim)
    plt.show()

In [None]:
# One cluster plot per unordered pair of distinct continuous features:
# pairing each feature only with the ones listed after it visits every
# combination exactly once.
for i, feat_1 in enumerate(continuous_features_cols):
    for feat_2 in continuous_features_cols[i + 1:]:
        plot_clusters([feat_1, feat_2], model['labels'])

In [None]:
from sklearn.cluster import KMeans

def fit_k_means_clustering_sk(k, random_state=None):
    """Cluster the notebook-level ``df`` with scikit-learn's ``KMeans``.

    The data is passed through ``preprocess_df`` and the ``id`` column is
    dropped before fitting.

    Args:
        k: Number of clusters.
        random_state: Optional seed forwarded to ``KMeans`` so the clustering
            can be reproduced. ``None`` (the default) keeps the previous
            unseeded behaviour.

    Returns:
        The fitted ``KMeans`` model.
    """
    features = preprocess_df(df).drop(columns='id')
    model = KMeans(n_clusters=k, random_state=random_state)
    model.fit(features)
    return model

In [None]:
# Fit scikit-learn's KMeans with the same k=4 as the hand-written model.
model_sk = fit_k_means_clustering_sk(4)

In [None]:
# 5 randomly chosen songs from the test track's cluster, using the
# scikit-learn labels.
rec_songs_rand_sk = k_means_recommender_rand_sample(df, model_sk.labels_, test_track_id, 5)
print(rec_songs_rand_sk)

In [None]:
# 5 songs from the start of the within-cluster distance ranking (closeness=0),
# using the scikit-learn labels.
rec_songs_euclid_sk = k_means_recommender_euclid_dist(df, model_sk.labels_, test_track_id, 5, 0)
print(rec_songs_euclid_sk)

In [None]:
# Visualise the scikit-learn clusters in the 2-component PCA projection.
plot_clusters_pca(model_sk.labels_)

In [None]:
# One cluster plot per unordered pair of distinct continuous features, coloured
# by the scikit-learn labels: pairing each feature only with the ones listed
# after it visits every combination exactly once.
for i, feat_1 in enumerate(continuous_features_cols):
    for feat_2 in continuous_features_cols[i + 1:]:
        plot_clusters([feat_1, feat_2], model_sk.labels_)