![Spotify Logo](https://storage.googleapis.com/pr-newsroom-wp/1/2018/11/Spotify_Logo_RGB_Green.png)

# Spotify Song Popularity Prediction

This notebook implements a machine learning model to predict song popularity on Spotify using various audio features.

## Table of Contents
1. [Setup and Imports](#1-setup-and-imports)
2. [Data Loading and Exploration](#2-data-loading-and-exploration)
3. [Data Preprocessing & Feature Engineering](#3-data-preprocessing--feature-engineering)
4. [Model Training](#4-model-training)
5. [Model Evaluation](#5-model-evaluation)

## 1. Setup and Imports <a id='setup'></a>

In [43]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostRegressor
import joblib
from datetime import datetime

# Set display options
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

# Set plotting style
sns.set_theme(style="white")

## 2. Data Loading and Exploration <a id='data'></a>

In [None]:
# Load the dataset
df = pd.read_csv('../data/raw/spotify_songs.csv')

# Display basic information
print("Dataset Shape:", df.shape)
print("\nFirst few rows:")
df.head()

In [45]:
# Drop unnecessary columns
df.drop(columns=['track_id', 'track_name', 'track_artist', 'track_album_id', 'track_album_name', 'playlist_id', 'playlist_name'], inplace=True)

In [None]:
# Check data info
print("Dataset Info:")
df.info()

In [47]:
# Update data types
# Define data types for each column
dtypes = {
    'track_popularity': 'int32',
    'track_album_release_date': 'datetime64[ns]',
    'playlist_genre': 'category', 
    'playlist_subgenre': 'category',
    'danceability': 'float32',
    'energy': 'float32',
    'key': 'int8',
    'loudness': 'float32', 
    'mode': 'category',
    'speechiness': 'float32',
    'acousticness': 'float32',
    'instrumentalness': 'float32',
    'liveness': 'float32',
    'valence': 'float32',
    'tempo': 'float32',
    'duration_ms': 'int32'
}

# Update data types
df = df.astype(dtypes)

Note: **'Key'** is a categorical variable but is encoded as an integer. More on this in the [Data Preprocessing & Feature Selection](#3-data-preprocessing--feature-engineering) section.


In [None]:
# Calculate number of unique categories for categorical variables
print("Number of unique categories in categorical variables:")
categorical_features = df.select_dtypes(include=['category']).columns
for col in categorical_features:
    print(f"{col}: {df[col].nunique()}")

In [None]:
# Basic statistical description
df.describe()

In [None]:
# Check for missing values
print("\nMissing Values:")
print(df.isnull().sum())

The dataset appears to have no missing values. However, according to the data documentation, the `key` column has missing values encoded as `-1`. We will address this in the next section: [Data Preprocessing & Feature Selection](#3-data-preprocessing--feature-engineering).



In [None]:
# Create a distribution plot for each numeric column
df_numerical_features = df.select_dtypes(include=['int8', 'int32', 'float32']).columns

n_rows = (len(df_numerical_features) + 1) // 3
fig, axes = plt.subplots(nrows=n_rows, ncols=3, figsize=(15, 4 * n_rows))
axes = axes.flatten() 

for i, (ax, col) in enumerate(zip(axes, df_numerical_features)):
    sns.histplot(data=df, x=col, kde=True, ax=ax)
    ax.set_title(f'Distribution of {col}')

# Hide empty subplots if odd number of features
if len(df_numerical_features) % 2 != 0:
    axes[-1].set_visible(False)

plt.tight_layout()
plt.show()

In [None]:
# Visualize outliers in numerical columns

n_rows = (len(df_numerical_features) + 1) // 3
fig, axes = plt.subplots(nrows=n_rows, ncols=3, figsize=(15, 4 * n_rows))
axes = axes.flatten() 

for i, (ax, col) in enumerate(zip(axes, df_numerical_features)):
    sns.boxplot(data=df, x=col, ax=ax)
    ax.set_title(f'Boxplot of {col}')

# Hide empty subplots if odd number of features
if len(df_numerical_features) % 3 != 0:
    for ax in axes[len(df_numerical_features):]:
        ax.set_visible(False)

plt.tight_layout()
plt.show()


#### Do we need to clean the data for outliers?

1. **Tree-based models (XGBoost, LightGBM, CatBoost) are robust to outliers** since they split data into bins rather than fitting continuous functions.  
2. **Large dataset size minimizes the impact of extreme values**, making explicit outlier treatment unnecessary.  
3. **Outliers may contain valuable insights**, and removing them risks losing important patterns.  
4. **Categorical data is unaffected by outliers**, and encoding methods handle variations naturally.  
5. **Regularization and loss functions manage extreme values**, reducing their influence without manual removal.  

Given these factors, outlier cleaning is unnecessary for this dataset.


In [None]:
# Correlation matrix for numerical features
correlation_matrix = df[df_numerical_features].corr()

plt.figure(figsize=(12, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt='.2f')
plt.title('Correlation Matrix of Numerical Features')
plt.tight_layout()
plt.show()

In [None]:
# Visulize the relationship between numerical features and the target variable
plt.figure(figsize=(12, 8))
for i, feature in enumerate(df_numerical_features):
    plt.subplot(5, 3, i + 1)
    sns.scatterplot(data=df, x=feature, y='track_popularity')
    plt.title(f'{feature} vs Popularity')
plt.tight_layout()
plt.show()

## 3. Data Preprocessing & Feature Engineering <a id='preprocessing'></a>

### Encoding Cyclical Variables (Key & Month) with Sine & Cosine

**Key Definition**  
**`key`**: The estimated overall key of the track (0 = C, 1 = C♯/D♭, ..., 11 = B). If no key is detected, the value is `-1`.

Both **`key`** (0–11) and **release_month** (1–12) are naturally **cyclical**. To preserve adjacency (e.g., key 0 near key 11; month 12 near month 1), we encode them with **sine** and **cosine**:

$$
\text{key\_sin} = \sin\!\Bigl(2\pi \cdot \frac{\text{key}}{12}\Bigr), \quad
\text{key\_cos} = \cos\!\Bigl(2\pi \cdot \frac{\text{key}}{12}\Bigr);
$$

$$
\text{release\_month\_sin}(m) = \sin\!\Bigl(\frac{2\pi \times m}{12}\Bigr), \quad
\text{release\_month\_cos}(m) = \cos\!\Bigl(\frac{2\pi \times m}{12}\Bigr).
$$

**Why Both Sine & Cosine?**  
1. **2D Representation**  
   A single trigonometric function (e.g., only sine) flattens the circle to a line, losing adjacency (e.g., January next to December). Using **both** sine and cosine maps each key/month to a **unique 2D point**, preserving cyclical distance.

2. **Avoiding Ambiguities**  
   Sine alone can yield the same magnitude for angles \(\theta\) and \(\theta+\pi\). Adding cosine disambiguates these positions, preventing overlap.

**Handling `-1` for Key**  
Tracks with `key = -1` (no key detected) should have their `key_sin` and `key_cos` set to `NaN`, ensuring they don’t overlap with valid keys.


In [None]:
def preprocess_and_engineer_features(df):
    """
    Preprocess and engineer new features for the Spotify dataset.
    """
    df_featured = df.copy()
    
    # Encode key with sine & cosine
    if 'key' in df_featured.columns:
        df_featured['key_sin'] = np.sin(2 * np.pi * df_featured['key'] / 12)
        df_featured['key_cos'] = np.cos(2 * np.pi * df_featured['key'] / 12)
        df_featured.loc[df_featured['key'] == -1, ['key_sin', 'key_cos']] = np.nan
        df_featured.drop(columns='key', inplace=True)

    # Audio feature engineering
    if all(col in df_featured.columns for col in ['energy', 'danceability']):
        df_featured['energy_dance_ratio'] = df_featured['energy'] / df_featured['danceability']
        df_featured['energy_dance_ratio'].replace([np.inf, -np.inf], np.nan, inplace=True)
    
    if all(col in df_featured.columns for col in ['acousticness', 'energy']):
        df_featured['acoustic_energy_balance'] = df_featured['acousticness'] - df_featured['energy']
    
    if all(col in df_featured.columns for col in ['valence', 'energy']):
        df_featured['mood_score'] = df_featured['valence'] * df_featured['energy']
    
    # Tempo categories
    if 'tempo' in df_featured.columns:
        df_featured['tempo_category'] = pd.qcut(df_featured['tempo'], 4, 
                                              labels=['slow', 'medium', 'fast', 'very_fast'])

    # Release date features
    if 'track_album_release_date' in df_featured.columns:
        df_featured['track_album_release_date'] = pd.to_datetime(df_featured['track_album_release_date'], 
                                                               errors='coerce')
        
        df_featured['release_year'] = df_featured['track_album_release_date'].dt.year
        df_featured['release_month'] = df_featured['track_album_release_date'].dt.month
        df_featured['release_day'] = df_featured['track_album_release_date'].dt.day
        
        # Cyclical month encoding
        month = 2 * np.pi * df_featured['release_month'] / 12
        df_featured['release_month_sin'] = np.sin(month)
        df_featured['release_month_cos'] = np.cos(month)

        df_featured['track_age_years'] = datetime.now().year - df_featured['release_year']
        
        df_featured.drop(columns='track_album_release_date', inplace=True)

    return df_featured

# Engineer features
df_featured = preprocess_and_engineer_features(df)

# Display new features
print("New Features:")
new_features = ['key_sin', 'key_cos', 'energy_dance_ratio', 'acoustic_energy_balance', 'mood_score', 'tempo_category', 'release_month_sin', 'release_month_cos']
df_featured[new_features].head()

## 4. Model Training <a id='training'></a>

In [None]:
df_featured.dtypes

In [57]:
# Prepare features and target
X = df_featured.drop(columns='track_popularity')
y = df_featured['track_popularity']

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create preprocessing pipeline
df_f_numeric_features = X.select_dtypes(include=['int32', 'float32', 'float64']).columns
df_f_categorical_features = X.select_dtypes(include=['category']).columns


In [None]:
# Train XGBoost model with categorical features
xgb_pipeline = Pipeline([
    ('preprocessing', ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), df_f_numeric_features),
            ('cat', OneHotEncoder(sparse_output=False), df_f_categorical_features)
        ]
    )),
    ('regressor', xgb.XGBRegressor(random_state=42))
])

xgb_params = {
    'regressor__n_estimators': [100, 200],
    'regressor__max_depth': [3, 5, 7],
    'regressor__learning_rate': [0.01, 0.1]
}

xgb_grid = GridSearchCV(xgb_pipeline, xgb_params, cv=5, scoring='neg_mean_squared_error', n_jobs=-1)
xgb_grid.fit(X_train, y_train)

print("Best XGBoost parameters:", xgb_grid.best_params_)

In [None]:
# Train LightGBM model
lgb_pipeline = Pipeline([
    ('preprocessing', ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), df_f_numeric_features),
            ('cat', OneHotEncoder(sparse_output=False), df_f_categorical_features)
        ]
    )),
    ('regressor', lgb.LGBMRegressor(random_state=42))
])

lgb_params = {
    'regressor__n_estimators': [100, 200],
    'regressor__num_leaves': [31, 63],
    'regressor__learning_rate': [0.01, 0.1]
}

lgb_grid = GridSearchCV(lgb_pipeline, lgb_params, cv=5, scoring='neg_mean_squared_error', n_jobs=-1)
lgb_grid.fit(X_train, y_train)

print("Best LightGBM parameters:", lgb_grid.best_params_)

In [None]:
# Train CatBoost model
cat_pipeline = Pipeline([
    ('preprocessing', ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), df_f_numeric_features),
            ('cat', 'passthrough', df_f_categorical_features)
        ]
    )),
    ('regressor', CatBoostRegressor(
        random_state=42, 
        verbose=False, 
        task_type="CPU",
        cat_features=[i + len(df_f_numeric_features) for i in range(len(df_f_categorical_features))]
    ))
])

cat_params = {
    'regressor__iterations': [100, 200],
    'regressor__depth': [4, 6],
    'regressor__learning_rate': [0.01, 0.1]
}

cat_grid = GridSearchCV(cat_pipeline, cat_params, cv=5, scoring='neg_mean_squared_error', n_jobs=-1)
cat_grid.fit(X_train, y_train)

print("Best CatBoost parameters:", cat_grid.best_params_)

## 5. Model Evaluation <a id='evaluation'></a>

In [None]:
# Model evaluation function
def evaluate_model(model, X_test, y_test, model_name):
    """Evaluate model performance with comprehensive metrics."""
    y_pred = model.predict(X_test)
    
    # Basic metrics
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
    
    # Calculate residuals
    residuals = y_test - y_pred
    
    print(f"\n{model_name} Performance Metrics:")
    print(f"RMSE: {rmse:.2f}")
    print(f"R² Score: {r2:.3f}")
    print(f"MAE: {mae:.2f}")
    print(f"Residual Range: [{residuals.min():.2f}, {residuals.max():.2f}]")
    
    return y_pred, residuals

# Evaluate the three models 
models = {
    'XGBoost': xgb_grid,
    'LightGBM': lgb_grid,
    'CatBoost': cat_grid}
        

In [None]:
# Plot model comparisons function
def plot_model_comparisons(models, X_test, y_test):
    """Create comprehensive comparison visualizations for multiple models."""
    # Store results
    results = {}
    
    plt.figure(figsize=(16, 12))
    
    # Evaluate each model
    for i, (model_name, model) in enumerate(models.items()):
        y_pred, residuals = evaluate_model(model, X_test, y_test, model_name)
        results[model_name] = {'y_pred': y_pred, 'residuals': residuals}
        
        # Actual vs Predicted plot
        plt.subplot(3, 3, i+1)
        plt.scatter(y_test, y_pred, alpha=0.5)
        plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
        plt.title(f'{model_name}: Actual vs Predicted')
        plt.xlabel('Actual Popularity')
        plt.ylabel('Predicted Popularity')
        
        # Residuals plot
        plt.subplot(3, 3, i+4)
        plt.scatter(y_pred, residuals, alpha=0.5)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.title(f'{model_name}: Residuals')
        plt.xlabel('Predicted Popularity')
        plt.ylabel('Residuals')
        
        # Residual distribution
        plt.subplot(3, 3, i+7)
        sns.histplot(residuals, kde=True)
        plt.axvline(x=0, color='r', linestyle='--')
        plt.title(f'{model_name}: Residual Distribution')
        plt.xlabel('Residual Value')
    
    plt.tight_layout()
    plt.show()
    
    return results

# Plot model comparisons
plot_model_comparisons(models, X_test, y_test)