
# 1. Setup



In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt

# Notebook-wide pandas settings: show every column when a frame is rendered,
# and switch pandas to copy-on-write mode.
pd.options.display.max_columns = None
pd.set_option('mode.copy_on_write', True)

# Read the raw table from "dataset.csv" (a path relative to the working directory).
spotify_original = pd.read_csv(filepath_or_buffer="dataset.csv")

# Keep every column except the first one.
# NOTE(review): presumably the first column is a saved row index - confirm against the CSV.
spotify_original_reshape = spotify_original.iloc[:, slice(1, None)]

# 2. Data Cleaning

In [None]:
# Collect the rows that contain at least one missing value so they can be inspected.
has_missing_value = spotify_original_reshape.isnull().any(axis=1)
missing_data_rows = spotify_original_reshape[has_missing_value]

# A bare expression is only rendered when it is the last line of a cell;
# this one sits mid-cell, so it has to be displayed explicitly.
display(missing_data_rows)

# Drop every row that has a missing value in any column.
spotify_original_reshape_drop = spotify_original_reshape.dropna()

# Shapes before and after dropping, to show how many rows were removed.
print(spotify_original_reshape.shape)
print(spotify_original_reshape_drop.shape)

In [None]:
# Build normalised keys (trimmed, lowercase) for track_name and artists so that
# differently formatted copies of the same song compare equal.
spotify_original_reshape_drop['track_name_clean'] = spotify_original_reshape_drop['track_name'].str.strip().str.lower()
spotify_original_reshape_drop['artists_clean'] = spotify_original_reshape_drop['artists'].str.strip().str.lower()

# Genre preference used to pick a winner among duplicates (earlier = preferred).
# The labels must be spelled the way track_genre spells them: the genre-merge
# step in this notebook keys on 'hip-hop' and 'r-n-b', so the former spellings
# 'hip hop' and 'r&b' could never match and those priorities were dead.
# NOTE(review): 'rap' does not appear among the genre-merge keys either - confirm
# whether the dataset actually contains it.
genre_priority = ['pop', 'rock', 'hip-hop', 'rap', 'reggaeton', 'latin', 'electronic', 'r-n-b', 'reggae', 'dance', 'classical']

# Rank lookup; genres outside the list all share the lowest priority.
genre_rank = {genre: rank for rank, genre in enumerate(genre_priority)}
unranked = len(genre_priority)
spotify_original_reshape_drop['genre_priority'] = spotify_original_reshape_drop['track_genre'].map(
    lambda genre: genre_rank.get(genre, unranked)
)

# Sort so that, within each (track, artists) group, the row to keep comes first:
# preferred genre first, then popularity ascending, then longest duration first.
# NOTE(review): popularity is sorted ascending, so the LEAST popular copy of a
# duplicated track is the one that survives - confirm this is intended.
duplicate_key = ['track_name_clean', 'artists_clean']
spotify_data_sorted = spotify_original_reshape_drop.sort_values(
    by=duplicate_key + ['genre_priority', 'popularity', 'duration_ms'],
    ascending=[True, True, True, True, False],
)

# Keep the first row of every (track, artists) group.
spotify_cleaned = spotify_data_sorted.drop_duplicates(subset=duplicate_key, keep='first')
assert not spotify_cleaned.duplicated(subset=duplicate_key).any()

# Check size
print(f"Shape of the dataset before cleaning: {spotify_original_reshape_drop.shape}")
print(f"Shape of the dataset after cleaning: {spotify_cleaned.shape}")

# Remove the helper columns that were only needed for de-duplication.
spotify_cleaned_final = spotify_cleaned.drop(columns=['track_name_clean', 'artists_clean', 'genre_priority'])

# Check size again
print(f"Shape of the dataset after removing extra columns: {spotify_cleaned_final.shape}")


spotify_cleaned_final

In [None]:
"""
Merge Genres
"""


# Merged category -> raw track_genre labels that belong to it.
#
# The previous flat dict literal listed six genres twice ('pagode', 'afrobeat',
# 'brazil', 'detroit-techno', 'idm', 'study'). Python keeps the LAST value for a
# repeated key, so the earlier entries were dead. Each of those genres now
# appears once, under the category that was actually in effect.
# 'j-rock' sat in the Rock section but was mapped to 'Electronic/Dance'; it is
# now mapped to 'Rock' like its neighbours.
genre_groups = {
    'Pop': [
        'pop', 'power-pop', 'synth-pop', 'indie-pop', 'k-pop', 'j-pop',
        'cantopop', 'mandopop', 'british', 'spanish', 'latino', 'pop-film',
        'j-idol', 'sad',
    ],
    'Rock': [
        'rock', 'rock-n-roll', 'alt-rock', 'indie', 'hard-rock', 'punk-rock',
        'garage', 'psych-rock', 'grunge', 'guitar', 'ska', 'emo', 'punk',
        'death-metal', 'hardcore', 'metal', 'heavy-metal', 'black-metal',
        'metalcore', 'j-rock', 'rockabilly', 'alternative',
    ],
    'Electronic/Dance': [
        'electronic', 'edm', 'house', 'deep-house', 'progressive-house',
        'techno', 'minimal-techno', 'trance', 'dubstep', 'drum-and-bass',
        'breakbeat', 'club', 'dancehall', 'j-dance', 'disco', 'hardstyle',
        'chill', 'electro', 'dance',
    ],
    'Hip-Hop/R&B': [
        'hip-hop', 'r-n-b', 'funk', 'reggaeton', 'dub', 'groove', 'reggae',
    ],
    'Jazz/Blues': [
        'jazz', 'blues', 'bluegrass', 'gospel', 'soul',
    ],
    'Classical/Instrumental': [
        'classical', 'piano', 'opera', 'ambient', 'trip-hop', 'new-age',
        'singer-songwriter',
    ],
    'Folk/World': [
        'folk', 'acoustic', 'country', 'honky-tonk', 'turkish', 'samba',
        'forro', 'indian', 'iranian', 'malay', 'afrobeat', 'world-music',
    ],
    'Latin': [
        'latin', 'salsa', 'tango', 'pagode', 'mpb', 'sertanejo', 'brazil',
    ],
    'Children\'s/Family': [
        'kids', 'children', 'disney', 'show-tunes', 'romance', 'happy',
    ],
    'Experimental/Alternative': [
        'industrial', 'grindcore', 'goth', 'detroit-techno', 'idm',
    ],
    'Other/Functional': [
        'anime', 'study', 'party', 'sleep', 'comedy', 'french', 'german',
        'swedish', 'chicago-house',
    ],
}

# Invert to raw genre -> merged category, refusing duplicates so that a genre
# can never again be silently re-assigned by a later entry.
genre_mapping = {}
for merged_label, raw_genres in genre_groups.items():
    for raw_genre in raw_genres:
        if raw_genre in genre_mapping:
            raise ValueError(
                f"Genre {raw_genre!r} is listed under both "
                f"{genre_mapping[raw_genre]!r} and {merged_label!r}"
            )
        genre_mapping[raw_genre] = merged_label

# Apply mapping. Labels missing from genre_mapping become NaN, and get_dummies
# (dummy_na defaults to False) then leaves all merged_genre_* columns unset for
# those rows, so report them instead of letting that happen silently.
merged_genre = spotify_cleaned_final['track_genre'].map(genre_mapping)
unmapped_genres = sorted(spotify_cleaned_final.loc[merged_genre.isna(), 'track_genre'].unique())
if unmapped_genres:
    print(f"track_genre values without a merged category: {unmapped_genres}")
spotify_cleaned_final['merged_genre'] = merged_genre

# drop track_genre column
# (this rebinds spotify_cleaned_final, so the cell can only be run once per
# run of the de-duplication cell that creates it)
spotify_cleaned_final = spotify_cleaned_final.drop(columns=['track_genre'])

# Add one-hot encoding for merged_genre
spotify_cleaned_final = pd.get_dummies(spotify_cleaned_final, columns=['merged_genre'])
print(spotify_cleaned_final.columns)

# 3. Decision Tree


In [None]:
"""
Train a model to predict the popularity of a song based on its audio features.
"""


import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
import sklearn.metrics as metrics

# Popularity strictly above this value is labelled popular (1), otherwise unpopular (0).
POPULARITY_THRESHOLD = 35

# Work on a copy: binarising the column on an alias of spotify_cleaned_final
# overwrote the raw scores, so re-running this cell thresholded the 0/1 labels
# again and turned every row into class 0.
spotify_processed = spotify_cleaned_final.copy()
spotify_processed['popularity'] = (spotify_processed['popularity'] > POPULARITY_THRESHOLD).astype(int)
print(spotify_processed['popularity'].value_counts())

# Target: the binary popularity label.
Y = spotify_processed['popularity']
# Alternative feature set that also includes the one-hot merged genre columns:
# X = spotify_processed[['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 
#                        'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 
#                        'time_signature', 'merged_genre_Classical/Instrumental', 
#                        'merged_genre_Electronic/Dance', 'merged_genre_Folk/World', 'merged_genre_Hip-Hop/R&B', 
#                        'merged_genre_Jazz/Blues', 'merged_genre_Latin', 'merged_genre_Other/Functional', 
#                        'merged_genre_Pop', 'merged_genre_Rock']]
# Features actually used: audio features only.
X = spotify_processed[['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 
                       'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 
                       'time_signature']]

# Column groupings. They are only referenced by the commented-out scaling code
# below; category_features also names genre columns that are not part of X.
numerical_features = ['danceability', 'energy', 'key', 'loudness', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']
category_features = ['mode', 'merged_genre_Classical/Instrumental', 'merged_genre_Electronic/Dance', 'merged_genre_Folk/World', 'merged_genre_Hip-Hop/R&B', 'merged_genre_Jazz/Blues', 'merged_genre_Latin', 'merged_genre_Other/Functional', 'merged_genre_Pop', 'merged_genre_Rock']

# 80/20 hold-out split with a fixed seed.
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

# Don't Standardize because it does not affect the performance of Decision Trees
# scaler = StandardScaler()
# X_train[numerical_features] = scaler.fit_transform(X_train[numerical_features])
# X_test[numerical_features] = scaler.transform(X_test[numerical_features])

# Decision tree with default settings (no depth limit is passed).
model = DecisionTreeClassifier(random_state=42)
model.fit(X_train, Y_train)


In [None]:
"""
Evaluate the model, compare train vs validation performance
"""

# Evaluate the model on the Train set
print("Train set:") 
train_predictions = model.predict(X_train)
train_accuracy = metrics.accuracy_score(Y_train, train_predictions)
print(f"Accuracy: {train_accuracy}")
# Named train_conf_matrix (not confusion_matrix) so the result does not shadow
# the sklearn function that this notebook also imports under that name; this
# matches the naming used by the random-forest evaluation cells.
train_conf_matrix = metrics.confusion_matrix(Y_train, train_predictions)
print(f"Confusion matrix:\n{train_conf_matrix}")
precision = metrics.precision_score(Y_train, train_predictions)
print(f"Precision: {precision}")
recall = metrics.recall_score(Y_train, train_predictions)
print(f"Recall: {recall}")
f1 = metrics.f1_score(Y_train, train_predictions)
print(f"F1 Score: {f1}")

# Evaluate the model on the Test set (rows held out by the train/test split)
print("\nTest set:")
test_predictions = model.predict(X_test)
test_accuracy = metrics.accuracy_score(Y_test, test_predictions)
print(f"Accuracy: {test_accuracy}")
test_conf_matrix = metrics.confusion_matrix(Y_test, test_predictions)
print(f"Confusion matrix:\n{test_conf_matrix}")
precision = metrics.precision_score(Y_test, test_predictions)
print(f"Precision: {precision}")
recall = metrics.recall_score(Y_test, test_predictions)
print(f"Recall: {recall}")
f1 = metrics.f1_score(Y_test, test_predictions)
print(f"F1 Score: {f1}")



# 4. Random Forest
- Because the decision tree's performance on the training set and on the test set differ so much, we try a random forest to mitigate this.

In [None]:
"""
Create Random Forest Model
"""

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from sklearn.model_selection import StratifiedKFold, GridSearchCV



# Random forest with default hyper-parameters and a fixed random_state,
# fitted on the training split (fit returns the estimator itself).
rf_model = RandomForestClassifier(random_state=8743).fit(X_train, Y_train)

# Record the depth each tree in the forest grew to, then report the mean.
tree_depths = []
for fitted_tree in rf_model.estimators_:
    tree_depths.append(fitted_tree.tree_.max_depth)
average_tree_depth = np.mean(tree_depths)
print(f"Average Tree Depth: {average_tree_depth}")

In [None]:
"""
Evaluate the Random Forest Model on the Train set
"""

# Score the fitted forest on the same rows it was trained on.
print("##### Train set: #####")
train_predictions = rf_model.predict(X_train)

# Compute every metric first ...
train_accuracy = metrics.accuracy_score(Y_train, train_predictions)
train_conf_matrix = metrics.confusion_matrix(Y_train, train_predictions)
precision = metrics.precision_score(Y_train, train_predictions)
recall = metrics.recall_score(Y_train, train_predictions)
f1 = metrics.f1_score(Y_train, train_predictions)

# ... then print the report, one metric per line.
print(
    f"Accuracy: {train_accuracy}",
    f"Confusion matrix:\n{train_conf_matrix}",
    f"Precision: {precision}",
    f"Recall: {recall}",
    f"F1 Score: {f1}",
    sep="\n",
)


Use cross-validation to compare the random forest's performance on unseen folds with its performance on the training set.

In [None]:
"""
Cross-Validation
"""

# 5-fold stratified cross-validation over the full feature matrix X and labels Y.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Initialize lists to store metrics
accuracies = []
precisions = []
recalls = []
f1_scores = []
conf_matrices = []

# Cross-validation loop.
# Fold data uses fold-local names: rebinding the global X_train / X_test here
# left them pointing at the last fold, while Y_train / Y_test still held the
# hold-out split, so later cells paired features and labels from different rows.
for fold_train_index, fold_test_index in skf.split(X, Y):
    # Split data
    X_fold_train, X_fold_test = X.iloc[fold_train_index], X.iloc[fold_test_index]
    y_fold_train, y_fold_test = Y.iloc[fold_train_index], Y.iloc[fold_test_index]

    # Train a fresh forest with rf_model's settings, so the rf_model fitted on
    # the training split is not overwritten by a fold model.
    fold_model = RandomForestClassifier(**rf_model.get_params())
    fold_model.fit(X_fold_train, y_fold_train)

    # Make predictions
    y_pred = fold_model.predict(X_fold_test)

    # Calculate metrics.
    # Precision/recall/F1 use average='weighted' here, whereas the train-set
    # cells use the default average, so those figures are not directly comparable.
    accuracies.append(accuracy_score(y_fold_test, y_pred))
    precisions.append(precision_score(y_fold_test, y_pred, average='weighted'))
    recalls.append(recall_score(y_fold_test, y_pred, average='weighted'))
    f1_scores.append(f1_score(y_fold_test, y_pred, average='weighted'))
    conf_matrices.append(confusion_matrix(y_fold_test, y_pred))

# The hyper-parameter tuning cell reads lowercase y_train / y_test; keep those
# names bound, now to the hold-out labels that match X_train / X_test.
y_train, y_test = Y_train, Y_test

# Convert results to numpy arrays for easy calculation of averages
accuracies = np.array(accuracies)
precisions = np.array(precisions)
recalls = np.array(recalls)
f1_scores = np.array(f1_scores)

print("\n##### Cross-validation results: #####")
print(f"Accuracy: Mean={accuracies.mean():.4f}, Std={accuracies.std():.4f}")
print(f"Precision: Mean={precisions.mean():.4f}, Std={precisions.std():.4f}")
print(f"Recall: Mean={recalls.mean():.4f}, Std={recalls.std():.4f}")
print(f"F1 Score: Mean={f1_scores.mean():.4f}, Std={f1_scores.std():.4f}")

# Print the confusion matrix of each fold
for i, conf_matrix in enumerate(conf_matrices):
    print(f"Confusion Matrix for Fold {i+1}:\n{conf_matrix}\n")

Hyperparameter Tuning

In [None]:
"""
Hyper-Parameters Tuning
"""

# Candidate values searched exhaustively (3 x 3 x 3 combinations).
param_grid = dict(
    n_estimators=[50, 100, 200],          # number of trees in the forest
    max_depth=[10, 20, None],             # maximum depth of each tree
    max_features=['sqrt', 'log2', None],  # features considered at each split
)

# Search configuration: 5-fold CV, accuracy as the selection metric,
# all available cores, progress logging.
search_settings = {
    'cv': 5,
    'scoring': 'accuracy',
    'n_jobs': -1,
    'verbose': 2,
}
grid_search = GridSearchCV(rf_model, param_grid, **search_settings)

# Run the search on the current X_train / y_train
# (lowercase y_train / y_test are bound by the cross-validation cell).
grid_search.fit(X_train, y_train)

# Winning combination and its mean cross-validated accuracy.
print(f"Best Hyperparameters: {grid_search.best_params_}")
print(f"Best CV Accuracy: {grid_search.best_score_:.4f}")

# Score the best estimator found by the search on X_test / y_test.
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)

print("\nClassification Report on Test Set:")
print(classification_report(y_test, y_pred))


With the tuned hyperparameters, we train a new model and inspect its feature importances.

In [None]:
# Re-derive the hold-out split (same test_size and random_state as the split
# used for the decision tree) so that X_train / X_test and Y_train / Y_test are
# guaranteed to describe the same rows here, whatever earlier cells have bound
# to these names. Fitting features from one split against labels from another
# either fails on a length mismatch or silently trains on misaligned labels.
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

# Random forest with fixed hyper-parameters.
# NOTE(review): presumably these are the values reported by the grid search - confirm.
new_model = RandomForestClassifier(n_estimators=200, max_depth=20, max_features='sqrt', random_state=8743)
new_model.fit(X_train, Y_train)

# Impurity-based feature importances, largest first.
impurity_importances = pd.DataFrame({ 'variable': X_train.columns, 'importance': new_model.feature_importances_ })
print("\nVariable Importances:")
impurity_importances = impurity_importances.sort_values('importance', ascending=False)
print(impurity_importances)

# Plot the variable importances as a bar chart
plt.figure(figsize=(10, 6))
plt.barh(impurity_importances['variable'], impurity_importances['importance'])
plt.xlabel('Importance')
plt.ylabel('Variable')
plt.title('Variable Importances')
plt.show()

# Evaluate the model on the Train set
print("##### Train set: #####")
train_predictions = new_model.predict(X_train)
train_accuracy = metrics.accuracy_score(Y_train, train_predictions)
print(f"Accuracy: {train_accuracy}")
train_conf_matrix = metrics.confusion_matrix(Y_train, train_predictions)
print(f"Confusion matrix:\n{train_conf_matrix}")
precision = metrics.precision_score(Y_train, train_predictions)
print(f"Precision: {precision}")
recall = metrics.recall_score(Y_train, train_predictions)
print(f"Recall: {recall}")
f1 = metrics.f1_score(Y_train, train_predictions)
print(f"F1 Score: {f1}")


# Evaluate the model on the Test set
print("\n##### Test set: #####")
test_predictions = new_model.predict(X_test)
test_accuracy = metrics.accuracy_score(Y_test, test_predictions)
print(f"Accuracy: {test_accuracy}")
test_conf_matrix = metrics.confusion_matrix(Y_test, test_predictions)
print(f"Confusion matrix:\n{test_conf_matrix}")
precision = metrics.precision_score(Y_test, test_predictions)
print(f"Precision: {precision}")
recall = metrics.recall_score(Y_test, test_predictions)
print(f"Recall: {recall}")
f1 = metrics.f1_score(Y_test, test_predictions)
print(f"F1 Score: {f1}")

