In [None]:
# Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from imblearn.pipeline import Pipeline as Pip
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, precision_score, recall_score, confusion_matrix
from spotify_cleaner import clean_data
from spotify_feature_engineering import create_binary_classification, GENRE_PRIORITY, assign_meta_genres, remove_meta_duplicates, engineer_features

In [None]:
# Load the raw track table and run it through the project's cleaning step.
df = pd.read_csv('data/dataset.csv', index_col=0)
df = clean_data(df)

# Reclassify the 'sleep' genre as a 'negative control group':
# its danceability is forced to 0.
df.loc[df['track_genre'] == 'sleep', 'danceability'] = 0

# Assign meta-genres, then remove duplicates (both helpers take GENRE_PRIORITY).
df = assign_meta_genres(df, GENRE_PRIORITY)
df = remove_meta_duplicates(df, GENRE_PRIORITY)

# General DataFrame information.
# DataFrame.info() writes its report itself and returns None, so it is called
# bare; wrapping it in print() would add a stray "None" line to the output.
df.info()
display(df.describe())
df.head()

In [None]:
# further feature engineering
# `engineer_features` comes from the project module spotify_feature_engineering;
# its body is not visible in this notebook.
# NOTE(review): presumably it adds the engineered columns that later cells select
# (beat_density, energy_tempo_interaction, party_factor, mood_score,
# tempo_category) — confirm against the module.
# The result is rebound to `df`, so re-running this cell applies the function
# to its own previous output.
df = engineer_features(df)

In [None]:
# Prepare modelling data: feature and target definition for the regression models.
features_cols = [
    'energy', 'loudness', 'speechiness', 'acousticness', 'instrumentalness', 'valence', 'tempo',
    # New engineered features:
    'beat_density',
    'energy_tempo_interaction',
    'party_factor',
    'mood_score',
]

# Kept as a one-element list so `target` is a single-column DataFrame;
# later cells flatten it with `.values.ravel()` / `.values.flatten()`.
target_col = ['danceability']

features = df[features_cols]
target = df[target_col]

# Sanity checks before modelling: per-column count of missing values ...
print(features.isna().sum())
# ... and of infinite values. Both +inf and -inf are counted; comparing against
# np.inf alone would let negative infinities slip through unnoticed.
print(features.isin([np.inf, -np.inf]).sum())

In [None]:
# Hold out 10 % of the rows as a test set; the fixed seed keeps the split reproducible.
(
    features_train,
    features_test,
    target_train,
    target_test,
) = train_test_split(features, target, test_size=0.1, random_state=42)

In [None]:
# Linear regression on standardised features expanded with pairwise
# interaction terms (degree 2, interaction-only, no bias column).
poly_lr_steps = [
    ('scaler', StandardScaler()),
    ('poly_features', PolynomialFeatures(degree=2, interaction_only=True, include_bias=False)),
    ('linear_regression', LinearRegression()),
]
poly_lr_pipeline = Pipeline(poly_lr_steps)
poly_lr_pipeline.fit(features_train, target_train)

# Score the fitted pipeline on the held-out rows.
target_pred_poly = poly_lr_pipeline.predict(features_test)
rmse_poly = np.sqrt(mean_squared_error(target_test, target_pred_poly))
r2_poly = r2_score(target_test, target_pred_poly)

for template, score in (('RMSE: {:.4f}', rmse_poly), ('R² Score: {:.4f}', r2_poly)):
    print(template.format(score))

In [None]:
# Scatter of held-out danceability against the polynomial-regression predictions.
fig, ax = plt.subplots(figsize=(6, 6))
sns.scatterplot(
    x=target_test.to_numpy().ravel(),
    y=target_pred_poly.ravel(),
    alpha=0.25,
    ax=ax,
)
# Dashed y = x reference line: a perfect prediction would sit on it.
ax.plot([0, 1], [0, 1], '--', color='red')
ax.set_xlabel('Actual Danceability')
ax.set_ylabel('Predicted Danceability')
ax.set_title('Actual vs. Predicted Danceability')

In [None]:
# Random-forest regressor with fixed hyperparameters
# (the original note calls this configuration the former 'best estimator').
rf_params = dict(
    n_estimators=400,
    max_depth=25,
    min_samples_split=5,
    min_samples_leaf=2,
    random_state=42,
)
best_rf_model = RandomForestRegressor(**rf_params)
# The target is a one-column DataFrame; ravel() hands the forest a 1-D array.
best_rf_model.fit(features_train, target_train.values.ravel())

# Predictions on both splits: test for evaluation, train for the overfit check.
target_pred_rf = best_rf_model.predict(features_test)
target_pred_train = best_rf_model.predict(features_train)

# Training-set metrics
rmse_train = np.sqrt(mean_squared_error(target_train, target_pred_train))
r2_train = r2_score(target_train, target_pred_train)

# Test-set metrics
rmse_rf = np.sqrt(mean_squared_error(target_test, target_pred_rf))
r2_rf = r2_score(target_test, target_pred_rf)

# Report training vs. test performance side by side.
for template, score in (
    ('Training RMSE: {:.4f}', rmse_train),
    ('Training R² Score: {:.4f}', r2_train),
    ('Random Forest RMSE: {:.4f}', rmse_rf),
    ('Random Forest R² Score: {:.4f}', r2_rf),
):
    print(template.format(score))

# Flag possible overfitting when training R² exceeds test R² by more than 0.1.
r2_gap = r2_train - r2_rf
print(
    'The model may be overfitting! Consider tuning hyperparameters or using a different model.'
    if r2_gap > 0.1
    else 'No major overfitting detected.'
)

In [None]:
# Random forest: actual vs. predicted danceability, drawn and saved in ONE cell.
# With the inline backend a figure is closed once its cell finishes, so a
# `plt.savefig` call in a later cell would write a new, empty canvas instead
# of this plot.
fig = plt.figure(figsize=(6, 6))
sns.scatterplot(x=target_test.values.flatten(), y=target_pred_rf.flatten(), alpha=0.25)
# Dashed y = x reference line: a perfect prediction would sit on it.
plt.plot([0, 1], [0, 1], '--', color='red')
plt.xlabel('Actual Danceability')
plt.ylabel('Predicted Danceability')
plt.title('Random Forest: Actual vs. Predicted Danceability')

# Save through the figure handle while the figure still exists.
fig.savefig('visuals/random_forest_regression.svg', format='svg', bbox_inches='tight')

In [None]:
# Pair every regression feature with its importance from the fitted forest
# and print the table from most to least important.
feature_importance = pd.DataFrame(
    {
        'Feature': features_cols,
        'Importance': best_rf_model.feature_importances_,
    }
)
ranked_importance = feature_importance.sort_values(by='Importance', ascending=False)
print(ranked_importance)

In [None]:
# Classification model: derive the binary 'danceable' label from 'danceability'.
# NOTE(review): the helper's thresholding logic is not visible here; the
# original note on the cutoff value reads "picked from top 25%" — confirm.
df = create_binary_classification(
    df,
    new_col_name='danceable',
    base_col='danceability',
    percentile_cutoff=0.693,
)

# Numeric inputs (scaled later) and categorical inputs (one-hot encoded later).
clf_num_cols = [
    'energy',
    'loudness',
    'speechiness',
    'acousticness',
    'instrumentalness',
    'valence',
    'tempo',
    'beat_density',
    'energy_tempo_interaction',
    'party_factor',
]
clf_cat_cols = ['tempo_category', 'meta_genre']

clf_features = df[clf_num_cols + clf_cat_cols]
clf_target = df['danceable']

# Stratified 90/10 split so both parts keep the same class proportions.
(
    clf_features_train,
    clf_features_test,
    clf_target_train,
    clf_target_test,
) = train_test_split(
    clf_features,
    clf_target,
    test_size=0.1,
    stratify=clf_target,
    random_state=42,
)

In [None]:
# Preprocessing: standardise the numeric columns, one-hot encode the categorical ones.
numeric_step = ('num', StandardScaler(), clf_num_cols)
categorical_step = ('cat', OneHotEncoder(drop='first', handle_unknown='ignore'), clf_cat_cols)
preprocessor = ColumnTransformer([numeric_step, categorical_step])

# Random-forest classifier; class_weight='balanced' reweights the classes.
rf_classifier = RandomForestClassifier(
    n_estimators=350,
    max_depth=20,
    min_samples_split=5,
    min_samples_leaf=2,
    class_weight='balanced',
    random_state=42,
)

# `Pip` is the imblearn Pipeline imported at the top of the notebook.
rf_clf_pipeline = Pip([
    ('preprocessor', preprocessor),
    ('classifier', rf_classifier),
])
rf_clf_pipeline.fit(clf_features_train, clf_target_train)

# Evaluate on the held-out rows.
rf_clf_target_pred = rf_clf_pipeline.predict(clf_features_test)

accuracy = accuracy_score(clf_target_test, rf_clf_target_pred)
precision = precision_score(clf_target_test, rf_clf_target_pred)
recall = recall_score(clf_target_test, rf_clf_target_pred)
conf_matrix = confusion_matrix(clf_target_test, rf_clf_target_pred)

for template, score in (
    ('Accuracy:  {:.4f}', accuracy),
    ('Precision: {:.4f}', precision),
    ('Recall: {:.4f}', recall),
):
    print(template.format(score))
print('Confusion Matrix:\n', conf_matrix)

In [None]:
# Pull the fitted classifier and the fitted one-hot encoder out of the pipeline.
fitted_classifier = rf_clf_pipeline.named_steps['classifier']
fitted_encoder = rf_clf_pipeline.named_steps['preprocessor'].named_transformers_['cat']

feature_importances = fitted_classifier.feature_importances_

# Column order follows the ColumnTransformer definition:
# numeric columns first, then the expanded one-hot columns.
feature_names = clf_num_cols + fitted_encoder.get_feature_names_out(clf_cat_cols).tolist()

# Importance table, most important feature first.
importance_df = pd.DataFrame(
    {'Feature': feature_names, 'Importance': feature_importances}
).sort_values(by='Importance', ascending=False)

# Horizontal bar chart; the y-axis is inverted so the top-ranked feature is at the top.
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(importance_df['Feature'], importance_df['Importance'])
ax.set_xlabel('Feature Importance Score')
ax.set_ylabel('Features')
ax.set_title('Random Forest Feature Importance for Danceability Prediction')
ax.invert_yaxis()
plt.show()
