# Spotify Genre Classifier

## Setup

In [None]:
# imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json

from os import walk, path

from matplotlib.ticker import MaxNLocator

from sklearn import metrics
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold, cross_val_score
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.neighbors import KNeighborsClassifier
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score, precision_score, recall_score
from scipy import stats

from joblib import dump, load

from cf_matrix import make_confusion_matrix

In [None]:
# load data

# Collect the names of all files directly inside the "data" directory.
# The `break` stops `walk` after the top level, so subdirectories are ignored.
f = []
for (dirpath, dirnames, filenames) in walk("data"):
    f.extend(filenames)
    break

# `walk` yields names in a filesystem-dependent order. Sorting makes the row
# order of `raw_data` -- and therefore every seeded split derived from it --
# reproducible across machines and runs.
f.sort()

# Parse every file as JSON into its own DataFrame
frames = []
for file in f:
    data = pd.read_json(path.join("data", file))
    frames.append(data)

# Concatenate all frames into one DataFrame with a fresh 0..n-1 index
raw_data = pd.concat(frames, ignore_index=True)

## Visualization

### Clean Dataset

In [None]:
# Preview the first rows of the combined dataset
raw_data.head()

In [None]:
# List the column names of the combined dataset
raw_data.columns

In [None]:
# NOTE(review): same expression as the preceding cell -- looks like an accidental duplicate
raw_data.columns

In [None]:
# (number of rows, number of columns) of the combined dataset
raw_data.shape

In [None]:
# Print the per-column summary of the combined dataset
raw_data.info()

In [None]:
# Per-genre summary statistics for a single feature ("energy").
# To compare several features at once, select a list of columns instead, e.g.
# raw_data.groupby("genre").describe()[["feature1", "feature2", ...]]
raw_data.groupby(raw_data["genre"]).describe()["energy"]

### Plots

In [None]:
# Work on a copy for plotting so that `raw_data` itself keeps all columns
raw_data_plots = raw_data.copy()
# Remove the identifier/URL/metadata columns that are not plotted
raw_data_plots.drop(["uri", "id", "track_href", "analysis_url", "type", "playlist_id"], axis=1, inplace=True)

In [None]:
# Columns that remain available for plotting
raw_data_plots.columns

In [None]:
# Number of samples per genre, sorted ascending by count
sorted_list = raw_data_plots["genre"].value_counts().sort_values()
labels = sorted_list.index.tolist()
values = sorted_list.tolist()

# Bar chart of the class distribution; the title shows the total sample count
plt.bar(labels, values)
plt.title(f"No. of samples {raw_data.shape[0]}")
plt.ylabel("number of samples")
plt.show()

In [None]:
# One boxplot per feature, grouped by genre.
# NOTE(review): relies on the first 13 columns being the plottable features -- confirm
# against the column order of the loaded JSON files.
for feature in raw_data_plots.iloc[:,:13].columns:
    sns.boxplot(x="genre", y=feature, data=raw_data_plots)
    plt.show()

In [None]:
# Pairwise correlation matrix of the first 13 columns
raw_data_plots.iloc[:,:13].corr()

In [None]:
# Annotated heatmap of the same correlation matrix, colour scale fixed to [-1, 1]
fig = plt.figure()
heatmap = sns.heatmap(raw_data_plots.iloc[:,:13].corr(), vmin=-1, vmax=1, annot=True, cmap='BrBG')
heatmap.set_title('Correlation Heatmap', fontdict={'fontsize':18}, pad=12)
# Enlarge the figure after drawing so the annotations stay readable
fig.set_size_inches(15.5, 10.5, forward=True)

In [None]:
# Pairwise scatter plots of the plotting columns, coloured by genre
sns.pairplot(data=raw_data_plots, hue="genre")

## Preprocessing

In [None]:
# Columns treated as continuous values (scaled and clipped during preprocessing)
numeric_features = ["danceability", "energy", "loudness", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo", "duration_ms"]
# Columns treated as categories
categorical_features = ["key", "mode", "time_signature"]
# All model input columns before preprocessing
features = numeric_features + categorical_features

Get the maximum and minimum data value within the boxplot whiskers

In [None]:
# Default multiple of the IQR that defines the whisker length
iqr_factor = 1.5


def getQuartiles(data: pd.DataFrame) -> tuple:
    """Return the first quartile, third quartile and interquartile range.

    Args:
        data: Object exposing ``quantile`` (the whisker helpers below pass a
            ``pd.Series``).

    Returns:
        The tuple ``(Q1, Q3, IQR)`` with ``IQR = Q3 - Q1``.
    """
    Q1 = data.quantile(0.25)
    Q3 = data.quantile(0.75)
    return Q1, Q3, Q3 - Q1


def getMaxWhiskerValue(data: pd.Series, factor: "float | None" = None) -> float:
    """Return the upper whisker bound ``Q3 + factor * IQR``.

    Args:
        data: Values of a single feature.
        factor: IQR multiple to use. When ``None``, the module-level
            ``iqr_factor`` is read at call time.
    """
    if factor is None:
        factor = iqr_factor
    _, Q3, IQR = getQuartiles(data)
    return Q3 + (IQR * factor)


def getMinWhiskerValue(data: pd.Series, factor: "float | None" = None) -> float:
    """Return the lower whisker bound ``Q1 - factor * IQR``.

    Args:
        data: Values of a single feature.
        factor: IQR multiple to use. When ``None``, the module-level
            ``iqr_factor`` is read at call time.
    """
    if factor is None:
        factor = iqr_factor
    Q1, _, IQR = getQuartiles(data)
    return Q1 - (IQR * factor)

Find correlated features where the correlation coefficient is above a specific threshold

In [None]:
def find_correlated_features(data: pd.DataFrame, threshold: float) -> list:
    """Pick one feature to drop from every pair correlated above ``threshold``.

    For each pair of columns whose absolute correlation coefficient exceeds
    ``threshold``, the column with the higher mean absolute correlation to
    all columns is marked for removal (ties drop the later column).

    Args:
        data: Frame holding the candidate features.
        threshold: Absolute correlation above which a pair counts as correlated.

    Returns:
        Names of the columns to drop, in column order, without duplicates.
    """
    correlation_matrix = data.corr().abs()
    avg_correlation = correlation_matrix.mean(axis=1)
    n_features = len(correlation_matrix)

    drop = set()
    # Visit each unordered pair once (strict upper triangle of the matrix)
    for row in range(n_features - 1):
        for col in range(row + 1, n_features):
            if correlation_matrix.iloc[row, col] > threshold:
                if avg_correlation.iloc[row] > avg_correlation.iloc[col]:
                    drop.add(row)
                else:
                    drop.add(col)

    # Positions refer to the correlation matrix, so resolve the names through
    # its own columns rather than through `data.columns`; the two differ
    # whenever `corr()` leaves a column out.
    matrix_columns = correlation_matrix.columns
    return [matrix_columns[position] for position in sorted(drop)]

Remove samples with key == -1

In [None]:
def removeSamplesWithInvalidKey(data: pd.DataFrame) -> pd.DataFrame:
    """Remove, in place, every row whose ``key`` equals -1.

    Rebinding the local name to a filtered copy would leave the caller's frame
    untouched, so the rows are dropped from ``data`` itself. Rows are dropped
    by index label; with duplicate labels every row sharing a dropped label
    is removed as well.

    Callers that keep a separate label series for ``data`` must realign it to
    ``data.index`` afterwards.

    Args:
        data: Frame with a ``key`` column; modified in place.

    Returns:
        The same ``data`` object, for convenience.
    """
    old_len = len(data)
    invalid_rows = data.index[(data.key == -1).to_numpy()]
    data.drop(invalid_rows, inplace=True)
    print("Samples removed because of invalid key:", old_len - len(data))
    return data

Remove samples whose time_signature lies outside the valid range [3, 7]

In [None]:
def removeSamplesWithInvalidTimeSignature(data: pd.DataFrame) -> pd.DataFrame:
    """Remove, in place, every row whose ``time_signature`` is not within [3, 7].

    Rebinding the local name to a filtered copy would leave the caller's frame
    untouched, so the rows are dropped from ``data`` itself. Rows are dropped
    by index label; with duplicate labels every row sharing a dropped label
    is removed as well.

    Callers that keep a separate label series for ``data`` must realign it to
    ``data.index`` afterwards.

    Args:
        data: Frame with a ``time_signature`` column; modified in place.

    Returns:
        The same ``data`` object, for convenience.
    """
    old_len = len(data)
    # Negating the "valid" mask also discards missing values, which fail both comparisons
    valid = (data.time_signature >= 3) & (data.time_signature <= 7)
    data.drop(data.index[(~valid).to_numpy()], inplace=True)
    print("Samples removed because of invalid time_signature:", old_len - len(data))
    return data

**Perform all preprocessing steps on the training data**

In [None]:
def preprocessTrainingData(data: pd.DataFrame) -> tuple:
    """Fit all preprocessing steps on the training data and apply them.

    Steps, in order: invalid-sample helpers, removal of highly correlated
    numeric features, scaling plus one-hot encoding through a
    ``ColumnTransformer``, and clipping of the scaled numeric features to
    their boxplot whisker bounds.

    Note that ``data`` is modified in place: the correlated columns are
    dropped from the caller's frame.

    Args:
        data: Training samples. Must contain the module-level
            ``numeric_features`` and ``categorical_features`` columns and a
            ``playlist_id`` column.

    Returns:
        A 3-tuple of the preprocessed feature frame, the ``playlist_id``
        column, and a dict holding everything needed to preprocess test data
        the same way (fitted column transformer, feature name lists, whisker
        bounds).
    """
    preprocessing_numeric_features = numeric_features
    preprocessing_categorical_features = categorical_features
    preprocessing_one_hot_encoded_features = ["key", "time_signature"]

    # The helpers' return values are not captured, so any filtering only takes
    # effect if the helpers modify `data` in place.
    removeSamplesWithInvalidKey(data)
    removeSamplesWithInvalidTimeSignature(data)

    # drop features with high correlation coefficient
    preprocessing_correlated_features = find_correlated_features(data[preprocessing_numeric_features], .8)
    print(f'Drop these correlated features: {preprocessing_correlated_features}')
    data.drop(preprocessing_correlated_features, axis=1, inplace=True)

    # remove correlated features from numeric features
    # (builds a new list, so the module-level `numeric_features` is not modified)
    preprocessing_numeric_features = [e for e in preprocessing_numeric_features if e not in preprocessing_correlated_features]

    # create column transformer for scaling and one-hot-encoding;
    # all other columns are passed through unchanged
    preprocessing_column_transformer = ColumnTransformer([
        ("scaling", StandardScaler(), preprocessing_numeric_features),
        ("one-hot-encoding", OneHotEncoder(), preprocessing_one_hot_encoded_features)
    ], verbose=True, remainder='passthrough')

    # perform scaling and one-hot-encoding
    transformed_data = preprocessing_column_transformer.fit_transform(data)

    # list containing features which are not used in the column transformer
    feature_remainder = [e for e in data.columns if e not in preprocessing_numeric_features and e not in preprocessing_one_hot_encoded_features]  

    # update list of categorical features according to one-hot-encoding:
    # the encoded source columns are replaced by their generated indicator columns
    preprocessing_categorical_features = [e for e in preprocessing_categorical_features if e not in preprocessing_one_hot_encoded_features]
    one_hot_encoded_features = preprocessing_column_transformer.named_transformers_["one-hot-encoding"].get_feature_names_out(preprocessing_one_hot_encoded_features)
    preprocessing_categorical_features.extend(one_hot_encoded_features)

    # create feature name list containing the new one-hot-encoded features;
    # the order (scaled, one-hot, remainder) is the column order this code
    # assumes for the transformer output
    preprocessing_transformed_features = preprocessing_numeric_features.copy()
    preprocessing_transformed_features.extend(one_hot_encoded_features)
    preprocessing_transformed_features.extend(feature_remainder)

    # create new dataframe with transformed data (same row index as the input)
    data = pd.DataFrame(transformed_data, index=data.index, columns=preprocessing_transformed_features)

    # create dictionary which contains min and max whisker values for every feature and clip the data according to them
    preprocessing_features_info = {}
    preprocessing_features_info["max_whisker_value"] = {}
    preprocessing_features_info["min_whisker_value"] = {}

    # iterating a DataFrame yields its column names
    for feature_name in data[preprocessing_numeric_features]:
        max_whisker_value = getMaxWhiskerValue(data[feature_name])
        min_whisker_value = getMinWhiskerValue(data[feature_name])
        preprocessing_features_info["max_whisker_value"][feature_name] = max_whisker_value
        preprocessing_features_info["min_whisker_value"][feature_name] = min_whisker_value

        # set outliers to min/max whisker (bounds are computed on the scaled values)
        data[feature_name] = data[feature_name].clip(min_whisker_value, max_whisker_value)

    # create list containing all features for the training data
    preprocessing_features = preprocessing_numeric_features + preprocessing_categorical_features

    print("final features", preprocessing_features)

    # create a dict containing information which is needed to preprocess future test data
    preprocessing_pipeline = {
        "categorical_features": preprocessing_categorical_features,
        "numeric_features": preprocessing_numeric_features,
        "features" : preprocessing_features,
        "correlated_features": preprocessing_correlated_features,
        "transformed_features": preprocessing_transformed_features,
        "features_info": preprocessing_features_info,
        "column_transformer": preprocessing_column_transformer
    }

    return data[preprocessing_features], data.playlist_id, preprocessing_pipeline

**Perform all preprocessing steps on the test data**

In [None]:
def preprocessTestData(data: pd.DataFrame, preprocessing_pipeline: dict) -> pd.DataFrame:
    """Apply the preprocessing fitted on the training data to unseen samples.

    Nothing is re-fitted here: the dropped columns, the column transformer and
    the clipping bounds are all taken from ``preprocessing_pipeline``.
    ``data`` is modified in place (the correlated columns are dropped from it).

    Args:
        data: Samples to preprocess.
        preprocessing_pipeline: Dict produced by ``preprocessTrainingData``.

    Returns:
        Frame restricted to the final model features, indexed like ``data``.
    """
    # The helpers' return values are not used, so they only have an effect if
    # they modify `data` in place.
    removeSamplesWithInvalidKey(data)
    removeSamplesWithInvalidTimeSignature(data)

    # Drop the same correlated columns that were removed from the training set
    data.drop(preprocessing_pipeline["correlated_features"], axis=1, inplace=True)

    # Scale / one-hot encode with the already fitted transformer
    column_transformer = preprocessing_pipeline["column_transformer"]
    data = pd.DataFrame(
        column_transformer.transform(data),
        index=data.index,
        columns=preprocessing_pipeline["transformed_features"],
    )

    # Clip every numeric feature to the whisker bounds learned on the training set
    whisker_bounds = preprocessing_pipeline["features_info"]
    for numeric_feature in data[preprocessing_pipeline["numeric_features"]]:
        upper_bound = whisker_bounds["max_whisker_value"][numeric_feature]
        lower_bound = whisker_bounds["min_whisker_value"][numeric_feature]
        data[numeric_feature] = data[numeric_feature].clip(lower_bound, upper_bound)

    return data[preprocessing_pipeline["features"]]

## Train/test split

In [None]:
# Hold out 25% of the samples as test set, stratified by genre so both parts keep
# the class proportions; the fixed random_state makes the split repeatable.
# The feature frames still contain every raw column (including `genre`).
x_train, x_test, y_train, y_test = train_test_split(raw_data, raw_data.genre, test_size=0.25, stratify=raw_data.genre, random_state=1)

## Perform Preprocessing on Training & Test Set

In [None]:
# Fit the preprocessing on the training set and reuse the fitted pipeline for the test set
print("Preprocess training set")
x_train_preprocessed, x_train_playlists, preprocessing_pipeline  = preprocessTrainingData(x_train)

print("Preprocess test set")
x_test_preprocessed = preprocessTestData(x_test, preprocessing_pipeline)

# Bundle everything later cells need. The labels are selected by the index of
# the preprocessed features so that features and labels stay aligned row by
# row even when preprocessing discards samples.
data = {
    "x_train": x_train_preprocessed,
    "x_playlists": x_train_playlists,
    "x_test": x_test_preprocessed,
    "y_train": y_train.loc[x_train_preprocessed.index],
    "y_test": y_test.loc[x_test_preprocessed.index],
    "features": preprocessing_pipeline["features"],
    "numeric_features": preprocessing_pipeline["numeric_features"],
    "categorical_features": preprocessing_pipeline["categorical_features"],
    "target": "genre"
}

## Model Selection

### Nested Cross Validation 

In [None]:
# Candidate models and the hyper-parameter grids searched for each of them.
# The insertion order (knn, randomForest, svc) is the order they are evaluated in.
estimators = {
    "knn": {
        "estimator": KNeighborsClassifier(),
        "paramGrid": {
            # neighbour counts that are multiples of the number of classes are skipped
            "n_neighbors": [k for k in range(3, 40) if k % len(set(data["y_train"])) != 0],
            "weights": ["uniform", "distance"],
            "metric": ["euclidean", "manhattan"],
        },
    },
    "randomForest": {
        "estimator": RandomForestClassifier(),
        "paramGrid": {
            "max_depth": [30, 40, 50, 60],
            "max_features": [5, 10, 20],
            "min_samples_leaf": [1, 2, 3],
            "min_samples_split": [3, 5, 8],
            "n_estimators": [1000, 2000, 4000],
        },
    },
    "svc": {
        # probability=True so that predict_proba is available for the evaluation
        "estimator": SVC(probability=True),
        # two sub-grids: gamma is only searched together with the rbf kernel
        "paramGrid": [
            {"kernel": ["rbf"], "gamma": np.float_power(10, range(-4, 4)), "C": np.float_power(3, range(0, 6))},
            {"kernel": ["linear"], "C": np.float_power(3, range(0, 6))},
        ],
    },
}

# Nested cross-validation: the outer folds estimate generalisation accuracy,
# the inner folds (inside GridSearchCV) pick the hyper-parameters.
results = dict()
seed = 12345

for estimatorKey, estimatorValue in estimators.items():
    results[estimatorKey] = []
    
    inner_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
    outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)

    for train_index, test_index in outer_cv.split(X=data["x_train"], y=data["y_train"]):
        # Fold-local names: the global y_train / y_test produced by the
        # train/test split must not be overwritten with fold subsets.
        fold_x_train, fold_x_test = data["x_train"].iloc[train_index,:], data["x_train"].iloc[test_index,:]
        fold_y_train, fold_y_test = data["y_train"].iloc[train_index], data["y_train"].iloc[test_index]

        grid_search = GridSearchCV(
            estimator = estimatorValue['estimator'], 
            param_grid = estimatorValue['paramGrid'], 
            cv = inner_cv, 
            n_jobs = -1, 
            verbose = 1
        )

        # Tune on the outer-training part, score on the untouched outer-test part
        grid_search.fit(fold_x_train, fold_y_train)
        y_pred = grid_search.predict(fold_x_test)
        acc_score = accuracy_score(fold_y_test, y_pred)

        # One result record per outer fold
        res = {}
        res["acc_score"] = acc_score
        # stored as a JSON string so parameter sets can be compared and printed easily
        res["best_params"] = json.dumps(grid_search.best_params_)
        res["best_estimator"] = grid_search.best_estimator_

        results[estimatorKey].append(res)

    print(estimatorKey, results[estimatorKey])

### PCA

In [None]:
# Share of the variance the retained principal components should explain
pca_variance_threshold = 0.9

# Passing a float as the first argument (n_components) -- presumably selects as many
# components as are needed to reach that share; confirm against the sklearn PCA docs.
pca = PCA(pca_variance_threshold)
# Fit on the numeric training features only
pca.fit(data["x_train"][data["numeric_features"]])

# Cumulative explained variance over the number of components (integer x ticks)
ax = plt.gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.scatter(range(1, len(pca.explained_variance_ratio_)+1), np.cumsum(pca.explained_variance_ratio_))
plt.grid()
plt.title(f"PCA Variance - {np.sum(pca.explained_variance_ratio_)}")
plt.xlabel("no of PCA dimensions") 
plt.ylabel("% variance")
plt.show()

In [None]:
def pca_transformation(pca: PCA, input_data: pd.DataFrame):
    """Replace the numeric features of ``input_data`` by their PCA projection.

    The numeric and categorical column names are read from the global
    ``data`` dict. The categorical columns are kept unchanged and appended to
    the right of the component columns ``PC1`` ... ``PCn``.

    Args:
        pca: Fitted PCA used for the projection.
        input_data: Preprocessed samples to project.

    Returns:
        Frame with the component columns followed by the categorical columns,
        indexed like ``input_data``.
    """
    projected = pca.transform(input_data[data["numeric_features"]])
    n_components = len(pca.explained_variance_ratio_)
    component_names = [f"PC{number}" for number in range(1, n_components + 1)]
    component_frame = pd.DataFrame(projected, columns=component_names, index=input_data.index)
    categorical_frame = input_data[data["categorical_features"]]
    return pd.concat([component_frame, categorical_frame], axis=1)

In [None]:
# Train an SVC with fixed hyper-parameters on the PCA-transformed training set ...
svc_model = SVC(C=3, gamma=0.1, kernel="rbf")
train_pca = pca_transformation(pca, data["x_train"])
svc_model.fit(train_pca, data["y_train"])

# ... and report its accuracy on the identically transformed test set
test_pca = pca_transformation(pca, data["x_test"])
predicted = svc_model.predict(test_pca)
print("Accuracy:", metrics.accuracy_score(data["y_test"], predicted))

### Model Persistence

In [None]:
# The notebook's import cell only pulls `walk` and `path` out of `os`, so the
# module itself has to be imported before it can be used here.
import os

dir_name = "models/"
# exist_ok=True creates the directory if needed and is a no-op otherwise
os.makedirs(dir_name, exist_ok=True)

# Persist the best estimator of every outer fold as
# models/<estimator>_<fold>_<accuracy>.joblib
for estimator_key, estimators_results in results.items():
    for i, estimator_result in enumerate(estimators_results):
        file_name = dir_name + estimator_key + "_" + str(i) + "_" + str(estimator_result["acc_score"]) + ".joblib"
        dump(estimator_result["best_estimator"], file_name)

### Perform CV and train models on entire training set

In [None]:
from sklearn.base import clone

# For every tuned configuration: estimate its accuracy with 5-fold CV on the
# whole training set, then fit a final model on the whole training set.
for estimator_key, estimators_results in results.items():
    for estimator_result in estimators_results:
        # Unfitted copy carrying exactly the tuned hyper-parameters. This works
        # for any estimator type, so no per-model constructor branch is needed.
        new_model = clone(estimator_result["best_estimator"])

        if estimator_key == "svc":
            # Apply the fixed seed AFTER copying the tuned parameters; copying
            # them onto a pre-seeded model would reset random_state to the
            # tuned estimator's value.
            new_model.set_params(random_state=1)

        # cross_val_score fits clones, so new_model itself is still unfitted here
        cv_scores = cross_val_score(new_model, data["x_train"], data["y_train"], cv=5)

        new_model.fit(data["x_train"], data["y_train"])

        estimator_result["cv_training_acc"] = cv_scores
        estimator_result["final_model"] = new_model

        print(estimator_key, estimator_result["best_params"], cv_scores)


### Forward Feature Selection

In [None]:
# Forward feature selection for every subset size from 1 up to (number of features - 1).
# NOTE(review): the range stops one short of the full feature count -- presumably because
# the selector has to pick strictly fewer features than are available; confirm.
feature_selection_results = []
for i in range(1, len(data["features"])):

    # Fixed hyper-parameters; a fresh, unfitted SVC per subset size
    svc = SVC(C=3, gamma=0.1, kernel="rbf")
    # The selection is restarted from scratch for every i
    sfs_forward = SequentialFeatureSelector(svc, n_features_to_select=i, direction="forward").fit(data["x_train"], data["y_train"])

    feature_names = list(sfs_forward.get_feature_names_out())

    # 5-fold CV accuracy of the SVC restricted to the selected features
    cv_score = cross_val_score(svc, data["x_train"][feature_names], data["y_train"], cv=5)

    feature_selection_results.append(
        {
            "feature_cnt": i,
            "cv_score": cv_score
        }
    )

    print(f'No. of features: {i}, Features: {feature_names}, Score: {cv_score.mean()}')

In [None]:
#feature_selection_results

# Mean CV accuracy for every evaluated number of features
cnts = [x["feature_cnt"] for x in feature_selection_results]
scores = [x["cv_score"].mean() for x in feature_selection_results]

ax = sns.lineplot(x=cnts, y=scores)
ax.set_title("Forward Feature Selection")
ax.set_xlabel("Number of features")
ax.set_ylabel("CV Accuracy")
plt.savefig("feature_selection.png")

# Print every (count, score) pair. Iterating the results directly works for any
# number of results, whereas a fixed index range fails when there are fewer
# results and silently truncates when there are more.
for cnt, score in zip(cnts, scores):
    print(cnt)
    print(score)

## Evaluation of results

### Boxplots over CV accuracies

In [None]:
# Running configuration number. It is NOT reset per estimator, so the numbers
# used as boxplot labels continue across the estimators.
i = 0

for estimator_key, estimators_results in results.items():
    # configuration number -> CV scores / parameter string, for this estimator only
    result_cv_scores = {}
    params = {}    

    for estimator_result in estimators_results:
        # Show each distinct parameter set only once (folds may agree on the same one)
        if estimator_result["best_params"] not in params.values():
            params[i] = estimator_result["best_params"]
            result_cv_scores[i] = estimator_result["cv_training_acc"] 

        # The counter advances for skipped duplicates as well
        i += 1
        
    # One column per configuration, one row per CV fold
    result_cv_scores = pd.DataFrame.from_dict(result_cv_scores)

    ax = sns.boxplot(data=result_cv_scores, palette="magma")   
    ax.set_title(f"{estimator_key} CV accuracies")

    # Legend: which parameter set belongs to which configuration number
    for param_num, param_val in params.items():
        print(f"{param_num}: {param_val}")

    plt.show()

### Confusion matrices and dataframe containing results

In [None]:
# One summary row per tuned configuration, evaluated on the held-out test set
result_eval = [] 

for estimator_key, estimators_results in results.items():
    for estimator_result in estimators_results:
        # Predictions of the model that was refitted on the whole training set
        y_pred = estimator_result["final_model"].predict(data["x_test"])
        y_pred_proba = estimator_result["final_model"].predict_proba(data["x_test"])

        result_eval.append(
            {
                "model": estimator_key,
                "params": estimator_result["best_params"],
                # accuracy on the outer fold of the nested CV
                "nested_cv_training_acc": estimator_result["acc_score"],
                # mean CV accuracy on the whole training set
                "cv_training_acc": estimator_result["cv_training_acc"].mean(),
                "test_acc": accuracy_score(data["y_test"], y_pred),
                # one-vs-rest ROC AUC computed from the class probabilities
                "test_roc_auc": roc_auc_score(data["y_test"], y_pred_proba, multi_class="ovr"),
            }
        )

        # Confusion matrix on the test set, labelled with the model's class names
        cf_matrix_title = f"{estimator_key} {estimator_result['best_params']}"
        cf_matrix = confusion_matrix(data["y_test"], y_pred)
        make_confusion_matrix(cf_matrix, figsize=(8,6), cbar=False, title=cf_matrix_title, categories=estimator_result["final_model"].classes_)
         
# Display the collected results as a table
result_eval = pd.DataFrame(result_eval)
result_eval

In [None]:
# Predict the test set with the random forest of the first outer fold.
# NOTE(review): this uses "best_estimator" (fitted on an outer-fold subset during the
# nested CV), not the "final_model" refitted on the whole training set -- confirm
# which one is intended.
final_model = results["randomForest"][0]["best_estimator"]
y_pred = final_model.predict(data["x_test"])

print(y_pred)