In [None]:
import ast
import re
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.cluster import OPTICS, KMeans, DBSCAN, AffinityPropagation, AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler, normalize

In [None]:
# Directory the input CSV files are read from (relative path).
DATA_DIR = Path("data")

In [None]:
# Load the per-song records and the per-artist records; the artist table is
# what lets us look up a genre for a specific song later on.
df = pd.read_csv(DATA_DIR.joinpath("data.csv"))
df_genres = pd.read_csv(DATA_DIR.joinpath("data_w_genres.csv"))

In [None]:
#function to filter out corrupt names and also remasters
def is_corrupt(value: str) -> int:
    """Flag a text field as unusable.

    Returns 1 when ``value`` is not a string (e.g. a missing cell read as NaN),
    contains no ASCII letter or digit, or mentions "remaster" (case-insensitive).
    Returns 0 otherwise.
    """
    # Missing cells are not strings; re.search would raise TypeError on them.
    if not isinstance(value, str):
        return 1
    has_alnum = re.search("[0-9A-Za-z]", value) is not None
    is_remaster = "remaster" in value.lower()
    return int(is_remaster or not has_alnum)

#function for getting a genre for a specific song (heuristic)
#uses only the last word in a genre name
#gives priority to genres already present in the mapping
genre_mappings = {}


def _parse_list_repr(value) -> list:
    """Turn a stringified list such as "['a', 'b']" into a list of clean strings.

    Non-string input yields an empty list. Text that is not a valid Python
    list/tuple literal falls back to a plain comma split with surrounding
    quotes removed. Whitespace is stripped and empty items are discarded.
    """
    if not isinstance(value, str):
        return []
    try:
        # literal_eval only accepts literals, and it keeps commas and
        # apostrophes that are part of an item intact.
        parsed = ast.literal_eval(value)
    except (ValueError, SyntaxError):
        parsed = None
    if not isinstance(parsed, (list, tuple)):
        parsed = [item.strip().strip("'\"") for item in value.strip().strip("[]").split(",")]
    cleaned = (str(item).strip() for item in parsed)
    return [item for item in cleaned if item]


def get_genre(value: str, genres_df=None) -> int:
    """Return an integer genre id for a song, or -1 when none can be derived.

    Parameters
    ----------
    value : str
        Stringified list of the song's artists, e.g. "['A', 'B']".
    genres_df : DataFrame, optional
        Table with an ``artists`` column and a ``genres`` column holding
        stringified genre lists. Defaults to the module-level ``df_genres``.

    The first artist that has at least one genre decides the result. Each
    genre is reduced to its last word; the first such word already present in
    ``genre_mappings`` is used, otherwise the last genre's last word is
    registered in ``genre_mappings`` under the next free id.
    """
    if genres_df is None:
        genres_df = df_genres
    for artist in _parse_list_repr(value):
        # Explicit length check instead of relying on the truth value of an array.
        matches = genres_df.loc[genres_df["artists"] == artist, "genres"].tolist()
        if not matches:
            continue
        # Only the first matching row is considered.
        last_words = [genre.split()[-1] for genre in _parse_list_repr(matches[0]) if genre.split()]
        if not last_words:
            continue
        chosen = next((word for word in last_words if word in genre_mappings), last_words[-1])
        if chosen not in genre_mappings:
            genre_mappings[chosen] = len(genre_mappings)
        return genre_mappings[chosen]
    return -1

# Flag rows whose artists/name fields are corrupt; the flags are kept as
# helper columns on the frame.
df["artists_corrupt"] = df["artists"].apply(is_corrupt)
df["name_corrupt"] = df["name"].apply(is_corrupt)
# Keep only rows that are not flagged, whose artists are specified,
# and whose popularity is above 0.
keep_rows = (
    (df["artists_corrupt"] == 0)
    & (df["name_corrupt"] == 0)
    & (df["artists"] != "['Unspecified']")
    & (df["popularity"] > 0)
)
df = df[keep_rows]

In [None]:
# Drop "vocal only" records (speeches etc.) and restrict to the year 2000 onwards
# (the year cut reduces the number of samples, because clustering is very slow otherwise).
is_not_speech = df["speechiness"] < 0.66
is_recent = df["year"] >= 2000
# Some songs were entered twice with different feature values; keep one row
# per (artists, name) pair.
df = df[is_not_speech & is_recent].drop_duplicates(subset=["artists", "name"])

In [None]:
#random sample of at most 10000 records (again, this is for performance reasons)
#could be less in my opinion
# The sample size is capped at the number of rows available, because sampling
# without replacement raises when asked for more rows than exist (which also
# happens when this cell is run a second time). The seed is fixed so the same
# songs are drawn on every run; .copy() gives an independent frame so the
# column assignment below does not warn about writing to a slice.
df = df.sample(min(len(df), 10000), random_state=42).copy()
#insert genres into dataframe and remove songs with no genre
df["genre"] = df["artists"].apply(get_genre)
df = df[df["genre"] != -1].copy()

In [None]:
# Training features: the song frame minus identifiers, helper flags and the
# columns that are deliberately left out of the clustering.
df_feats = df.drop(
    columns=[
        "artists", "name", "artists_corrupt", "name_corrupt", "id",
        "year", "release_date", "key",
        "instrumentalness", "popularity", "energy",
    ]
)

In [None]:
# Two normalization passes: first every feature column on its own (axis=0),
# then every sample row (axis=1).
# (per the author this yielded the best results and put all values on the same scale)
df_scaled_feats = normalize(normalize(df_feats, axis=0), axis=1)

In [None]:
#Reduce dimensionality to 2 features (I also experimented with 3, but 2 seemed good enough)
# random_state is fixed so the projection is reproducible; it only has an
# effect when scikit-learn picks a randomized SVD solver for this input.
pca = PCA(n_components=2, random_state=42)
df_pca_feats = pca.fit_transform(df_scaled_feats)

In [None]:
#Plot the PCA-reduced features in 2D
fig, ax = plt.subplots(figsize=(16, 9))
ax.scatter(df_pca_feats[:, 0], df_pca_feats[:, 1], marker='o')
# Label the figure so it can be read on its own.
ax.set_xlabel("Principal component 1")
ax.set_ylabel("Principal component 2")
# Trailing semicolon keeps the returned object's repr out of the cell output.
ax.set_title("Songs projected onto the first two principal components");

In [None]:
#Run KMeans Clustering
#I experimented a lot with other clustering methods (OPTICS, DBScan, Agglomerative, etc.)
#In the end KMeans turned out to give the best results, all factors considered
# - n_clusters is capped at the number of samples, since KMeans cannot form
#   more clusters than there are points.
# - n_init is pinned because the library default differs between scikit-learn
#   releases; random_state makes the labelling reproducible.
cluster = KMeans(n_clusters=min(120, len(df_pca_feats)), n_init=10, random_state=42)
cluster.fit(df_pca_feats)
# assign() returns a new frame, which avoids writing a column into a frame
# that may itself be a filtered slice of another one.
df = df.assign(clusters=cluster.labels_)

In [None]:
# Silhouette score (euclidean metric) of the cluster labels on the PCA features
score = silhouette_score(X=df_pca_feats, labels=cluster.labels_, metric="euclidean")
score

In [None]:
# Features inspected when analysing the clusters
CHECK_FEATS = [
    "acousticness",
    "danceability",
    "liveness",
    "loudness",
    "genre",
]

In [None]:
# Wrap the scaled features (normalization produced a plain array) in a
# DataFrame that carries the column names and row index of df_feats.
df_scaled_feats = pd.DataFrame(
    df_scaled_feats,
    columns=df_feats.columns.tolist(),
    index=df_feats.index.tolist(),
)

In [None]:
# Overall maximum and minimum of every feature of interest, keyed by feature name
max_values = {feat: column.max() for feat, column in df_scaled_feats[CHECK_FEATS].items()}
min_values = {feat: column.min() for feat, column in df_scaled_feats[CHECK_FEATS].items()}

In [None]:
#analyse clusters
#up to 50 distinct clusters are drawn at random and analysed until 10 summaries are collected
#for each cluster up to 30 songs are sampled and the 3 features with the lowest
#standard deviation across those songs are reported
#every summary holds: the cluster label, those feature names, their means in the sample,
#the overall min/max of those features, and the sampled song names and artist names
ANALYSIS_SEED = 42
MAX_CANDIDATE_CLUSTERS = 50
MAX_REPORTED_CLUSTERS = 10
SONGS_PER_CLUSTER = 30
TOP_FEATS = 3

# One seeded generator drives both the cluster draw and the song sampling so
# the report is reproducible.
analysis_rng = np.random.RandomState(ANALYSIS_SEED)
cluster_labels = df["clusters"].unique()
# replace=False: without it the same cluster could be drawn more than once.
candidate_labels = analysis_rng.choice(
    cluster_labels,
    size=min(len(cluster_labels), MAX_CANDIDATE_CLUSTERS),
    replace=False,
)

cluster_info = {}
# The loop variable is deliberately not called `cluster`, so the fitted
# clustering model stored under that name is not overwritten.
for cluster_idx, cluster_label in enumerate(candidate_labels):
    # ">=" so that exactly MAX_REPORTED_CLUSTERS summaries are produced.
    if len(cluster_info) >= MAX_REPORTED_CLUSTERS:
        break
    cluster_df = df[df["clusters"] == int(cluster_label)]
    idxs = cluster_df.sample(
        min(len(cluster_df), SONGS_PER_CLUSTER), random_state=analysis_rng
    ).index
    df_feats_chunk = df_scaled_feats.loc[idxs, CHECK_FEATS]

    chunk_stds = df_feats_chunk.std().to_numpy()
    chunk_means = df_feats_chunk.mean().to_numpy()

    # Positions of the features with the smallest spread inside the sample.
    # A single-song sample has NaN stds, which argsort places last.
    lowest_std = np.argsort(chunk_stds, kind="stable")[:TOP_FEATS]

    feat_names = [CHECK_FEATS[i] for i in lowest_std]
    song_names = cluster_df.loc[idxs, "name"].values.tolist()
    artist_names = cluster_df.loc[idxs, "artists"].apply(
        lambda x: x[1:-1].replace("'", "").replace(", ", ",").split(",")
    ).values.tolist()
    cluster_info[cluster_idx] = {"cluster": int(cluster_label),
                                 "feats": feat_names,
                                 "means": chunk_means[lowest_std].tolist(),
                                 "mins": [min_values[feat] for feat in feat_names],
                                 "maxes": [max_values[feat] for feat in feat_names],
                                 "song_names": song_names, "artists": artist_names}

In [None]:
# Display the collected per-cluster summaries as this cell's output.
cluster_info