Data loading and cleaning

In [1]:
import pandas as pd
import numpy as np
import math
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

df = pd.read_csv('./spotify_songs.csv') 
df = df.dropna()

In [2]:
print(df.columns)

Index(['track_id', 'track_name', 'track_artist', 'track_popularity',
       'track_album_id', 'track_album_name', 'track_album_release_date',
       'playlist_name', 'playlist_id', 'playlist_genre', 'playlist_subgenre',
       'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
       'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
       'duration_ms'],
      dtype='object')


In [3]:
print(df.describe())

       track_popularity  danceability        energy           key  \
count      32828.000000  32828.000000  32828.000000  32828.000000   
mean          42.483551      0.654850      0.698603      5.373949   
std           24.980476      0.145092      0.180916      3.611572   
min            0.000000      0.000000      0.000175      0.000000   
25%           24.000000      0.563000      0.581000      2.000000   
50%           45.000000      0.672000      0.721000      6.000000   
75%           62.000000      0.761000      0.840000      9.000000   
max          100.000000      0.983000      1.000000     11.000000   

           loudness          mode   speechiness  acousticness  \
count  32828.000000  32828.000000  32828.000000  32828.000000   
mean      -6.719529      0.565737      0.107053      0.175352   
std        2.988641      0.495667      0.101307      0.219644   
min      -46.448000      0.000000      0.000000      0.000000   
25%       -8.171250      0.000000      0.041000      

First we build a random forest classifier to demonstrate genre classification. The we propose a new entropy based clustering scheme: Root-Clustering which modifies the random forest classifier to cluster data in an unsupervised manner. This results in a fork of hierachical clustering whereby it is top down and the splitting decision is based on entropy instead of desicion based. This provides a few benefits:

- It is more interpretable (each cluster can be traced)
- It works on mixed data types (categorical and numerical) without special transformations

The pitfall of this approach is that it does not compute the covariance matrix therefore assumes independence of features and works for such datasets.

In the end we compare our proposed algortihm to traditional algorithms (KMeans, AgglomerativeClustering [Hierachical Clustering], DBSCAN) using the sillhouette score and interpretability metrics such as number of clusters, feature usage, and average rule length. We also provide some visualisations.

Finally we fit a hold-out set to Root-Clustering to generate novel playlists and do song recommendation by deducing which songs from the online spotify archive fits it to each playlist.

In [4]:
X = df[['danceability', 'energy', 'loudness', 'speechiness', 'acousticness', 'instrumentalness', 'valence', 'tempo']].values
y = df['playlist_genre'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [5]:

class RandomForest: 
    def __init__(self, trees, n_trees, max_feature, prediction_aggregation_calculation):
        self.n_estimators = n_trees # Number of trees
        self.max_features = max_feature # Number of features to consider when looking for the best split
        self.tree_feature_indexes = [] # To store the feature indexes used for each tree
        self.prediction_aggregation_calculation = prediction_aggregation_calculation # Function to aggregate predictions from all trees
        self.trees = trees

    # Bootstrapping the data without mixing types
    def _make_random_subset(self, X, y, n_subsets, replacement=True):
        subsets = []
        sample_size = X.shape[0]
        
        for i in range(n_subsets):
            # Sample indices and then select rows from X and y separately
            indices = np.random.choice(sample_size, size=sample_size, replace=replacement)
            subsets.append({"X": X[indices], "y": y[indices]})
        
        return subsets

    def train(self, X, y): 
        n_features = X.shape[1]
        if self.max_features is None:
            self.max_features = int(math.sqrt(n_features)) #

        subsets = self._make_random_subset(X, y, self.n_estimators) # Create n_estimators subsets of data

        for i, subset in enumerate(subsets):
            X_subset, y_subset = subset["X"], subset["y"]
            
            # Bagging: randomly select a subset of features for each tree
            idx = np.random.choice(range(n_features), size=self.max_features, replace=True)
            self.tree_feature_indexes.append(idx)
            X_subset = X_subset[:, idx]
            # Fit the tree on the subset; y_subset is already 1D
            self.trees[i].fit(X_subset, y_subset)

    def predict(self, test_X):
        # Prepare an empty array for predictions (dtype=object for string labels)
        y_preds = np.empty((test_X.shape[0], self.n_estimators), dtype=object)
        # Get predictions from each tree
        for i, tree in enumerate(self.trees):
            features_index = self.tree_feature_indexes[i]
            X_selected_features = test_X[:, features_index]
            y_preds[:, i] = tree.predict(X_selected_features)
        # Aggregate predictions using the provided aggregation method
        y_pred = self.prediction_aggregation_calculation(y_preds)
        return y_pred

In [6]:
from collections import Counter

class RandomForestClassifier(RandomForest): 
    def __init__(self, max_feature, max_depth, n_trees=100, min_samples_split=2):
        self.prediction_aggregation_calculation = self._maximum_vote_calculation
        self.trees = []
        for _ in range(n_trees): # Create n_trees Decision Trees
            self.trees.append(DecisionTreeClassifier(min_samples_split=min_samples_split, max_depth=max_depth))
        
        super().__init__(
            trees=self.trees,
            n_trees=n_trees,
            max_feature=max_feature,
            prediction_aggregation_calculation=self.prediction_aggregation_calculation
        ) 
    
    def _maximum_vote_calculation(self, y_preds): # Aggregate predictions using majority voting
        y_pred = np.empty(y_preds.shape[0], dtype=object)
        for i, sample_predictions in enumerate(y_preds):
            counter = Counter(sample_predictions)
            y_pred[i] = counter.most_common(1)[0][0]
        return y_pred

In [7]:
print(X_train.shape, y_train.shape)

(26262, 8) (26262,)


In [8]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from itertools import product
from sklearn.ensemble import RandomForestClassifier as SklearnRandomForest


n_trees_options = [150,300,500,1000]
max_feature_options = [2,3,4]
min_samples_split_options = [2,4,6,8]
max_depth_options = [8,10,12,14,16]

best_f1 = 0
best_params = None

# Iterate through all combinations of hyperparameters and train the Random Forest Classifier
for n_trees, max_feature, min_samples_split, max_depth in product(
    n_trees_options, max_feature_options, min_samples_split_options, max_depth_options
):
    random_forest_classifier = RandomForestClassifier(
        n_trees=n_trees,
        max_feature=max_feature,
        min_samples_split=min_samples_split,
        max_depth=max_depth
    )
    random_forest_classifier.train(X_train, y_train)
    y_pred = random_forest_classifier.predict(X_test)
    f1 = f1_score(y_test, y_pred, average='weighted')
    
    if f1 > best_f1:
        best_f1 = f1
        best_params = (n_trees, max_feature, min_samples_split, max_depth)

print(f"Best F1 Score: {best_f1}")
print(f"Best Parameters: Trees: {best_params[0]}, Max Features: {best_params[1]}, Min Samples Split: {best_params[2]}, Max Depth: {best_params[3]}")
y_pred = random_forest_classifier.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

model_random_forest = SklearnRandomForest(n_estimators=best_params[0], max_depth=best_params[-1], min_samples_split=best_params[-2], max_features=best_params[1], random_state=42)
model_random_forest.fit(X_train, y_train)
y_pred = model_random_forest.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')
print(f"Library Random Forest Accuracy: {accuracy}")
print(f"Library Random Forest Precision: {precision}")
print(f"Library Random Forest Recall: {recall}")
print(f"Library Random Forest F1 Score: {f1}")

Best F1 Score: 0.5283615843596279
Best Parameters: Trees: 500, Max Features: 4, Min Samples Split: 8, Max Depth: 16
Accuracy: 0.5290892476393543
Precision: 0.5182378085430867
Recall: 0.5290892476393543
F1 Score: 0.5216076966395825
Library Random Forest Accuracy: 0.5523911056960098
Library Random Forest Precision: 0.5456045098311852
Library Random Forest Recall: 0.5523911056960098
Library Random Forest F1 Score: 0.5475747657663256
