In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import kagglehub

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import make_scorer, fbeta_score, precision_score, recall_score

In [None]:
# Constants
POPULARITY_THRESHOLD = 85
SEED = 42
BETA = 1.5

RECALL_THRESHOLD = 0.7
PRECISION_THRESHOLD = 0.2

In [None]:
# Download latest version
data_path = kagglehub.dataset_download("amitanshjoshi/spotify-1million-tracks")

print("Path to dataset files:", data_path)

In [None]:
# I. Load data
data = pd.read_csv(f"{data_path}/spotify_data.csv")

In [None]:
data['artist_name'].nunique(), data['track_name'].nunique(), data['track_id'].nunique()
artists_popularity = data.groupby('artist_name')['popularity'].mean().sort_values(ascending=False)
# add columns names (artist_name, average_popularity)
artists_popularity = artists_popularity.reset_index()
artists_popularity.columns = ['artist_name', 'average_popularity']

artists_popularity = artists_popularity[artists_popularity['average_popularity'] > 0]
# get the 20 top artists based on average popularity of their songs
top_artists = data.groupby('artist_name')['popularity'].mean().sort_values(ascending=False).head(20)
top_artists.plot(kind='bar', figsize=(12, 6), title='Top 20 Artists by Average Popularity')
# add feature for every song, number of songs from the same artist
data['artist_song_count'] = data.groupby('artist_name')['track_id'].transform('count')

In [None]:
data['year'] = data['year'].astype(int)
yearly_thresholds = data.groupby('year')['popularity'].quantile(POPULARITY_THRESHOLD / 100).to_dict()
data['verdict'] = data.apply(lambda row: 1 if row['popularity'] >= yearly_thresholds[row['year']] else 0, axis=1)
data['verdict'].value_counts()
yearly_thresholds
# add a red median line
median_popularity = data['popularity'].median()

In [None]:
# verdict 1 percentage
verdict_1_percentage = (data['verdict'].sum() / data.shape[0]) * 100

In [None]:
# calculate the quantiles for duration_ms
Q1 = data['duration_ms'].quantile(0.25)
Q3 = data['duration_ms'].quantile(0.75)
Q4 = data['duration_ms'].quantile(0.95)
IQR = Q3 - Q1
print(f"Q1: {Q1}, Q3: {Q3}, Q4: {Q4}")

In [None]:
# add feature normal vs long duration
data['long_duration'] = data['duration_ms'].apply(lambda x: 1 if x > Q4 else 0)

# add feature normal vs short duration
data['short_duration'] = data['duration_ms'].apply(lambda x: 1 if x < Q1 else 0)

In [None]:
# percentage of tracks with 0 popularity
num_zero_popularity = data[data['popularity'] == 0].shape[0]
percentage_zero_popularity = (num_zero_popularity / data.shape[0]) * 100
print(f"Percentage of tracks with 0 popularity: {percentage_zero_popularity:.2f}%")

In [None]:
# nbr of tracks with popularity >=95
num_max_popularity = data[data['popularity'] >= 95].shape[0]
print(f"Number of tracks with popularity >=95: {num_max_popularity}")
percentage_max_popularity = (num_max_popularity / data.shape[0]) * 100
print(f"Percentage of tracks with popularity >=95: {percentage_max_popularity:.2f}%")

# III. Modeling

In [None]:
# create a model that randomly predicts the popularity based on the distribution of popularity in the dataset

mean_popularity = data['popularity'].mean()
std_popularity = data['popularity'].std()

data['random_popularity'] = np.random.normal(mean_popularity, std_popularity, size=len(data))
data['random_verdict'] = (data['random_popularity'] > POPULARITY_THRESHOLD).astype(int)

In [None]:
def hot_encode_column(df, column_name) -> pd.DataFrame:
    """
    One-hot encode a categorical column in the dataframe.
    """
    one_hot = pd.get_dummies(df[column_name], prefix=column_name).astype(int)
    df = df.drop(column_name, axis=1)
    df = df.join(one_hot)
    return df

In [None]:
# build a simple decision tree model
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
# from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

features = data.drop(columns=['popularity', 'verdict', 'random_popularity', 'random_verdict'])
features = features.drop(columns=['Unnamed: 0', 'artist_name', 'track_name', 'track_id', 'year'])
features.dropna(inplace=True)

features = hot_encode_column(features, 'genre')

X_train, X_test, y_train, y_test = train_test_split(features, data['verdict'], test_size=0.2, random_state=SEED)
features.head()

In [None]:
model = DecisionTreeClassifier(random_state=SEED, class_weight='balanced', max_depth=5)
model.fit(X_train, y_train)

In [None]:
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)

In [None]:
# build a random forest model
from sklearn.ensemble import RandomForestClassifier

rf_model = RandomForestClassifier(random_state=SEED, class_weight='balanced', n_estimators=100, max_depth=5, max_features='sqrt', min_samples_split=10)
rf_model.fit(X_train, y_train)

In [None]:
features = data.drop(columns=['popularity', 'verdict', 'random_popularity', 'random_verdict'])

In [None]:
from xgboost import XGBClassifier
xgb_model = XGBClassifier(random_state=SEED, 
                          objective='binary:logistic',
                          colsample_bytree=0.8, 
                          learning_rate=0.1, 
                          max_depth=6, 
                          n_estimators=1000, 
                          subsample=0.9, 
                          scale_pos_weight=6)

xgb_model.fit(X_train, y_train)

# Calculate the best Threshold for popularity

In [None]:
def constrained_score(y_true, y_pred):
    prec = precision_score(y_true, y_pred, pos_label=1)
    rec  = recall_score(y_true, y_pred, pos_label=1)

    # Hard constraints
    if rec < RECALL_THRESHOLD or prec < PRECISION_THRESHOLD:
        return 0.0

    # Smooth reward zone once constraints are satisfied
    # You can tweak the weights
    return 0.5 * prec + 0.5 * rec

In [None]:
# create a function that run an xgboost model for all thresholds from 50 to 70 and return the best threshold based on the average of recalls
def find_best_threshold(data, start=50, end=70):
    best_threshold = start
    best_recall_avg = 0.0
    delta_recalls = 0.0
    score = 0.0
    best_recall_avg_lst = []
    deltas_lst = []
    scores_lst = []
    scorer = make_scorer(constrained_score, greater_is_better=True)

    for threshold in range(start, end + 1):
        print(f"Evaluating threshold: {threshold}")
        data['verdict'] = (data['popularity'] > threshold).astype(int)
        X = data.drop(columns=['popularity', 'verdict', 'random_popularity', 'random_verdict'])
        X = X.drop(columns=['Unnamed: 0', 'artist_name', 'track_name', 'track_id', 'year'])
        X = hot_encode_column(X, 'genre')
        y = data['verdict']

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=SEED)

        xgb_model = XGBClassifier(random_state=SEED, 
                                  objective='binary:logistic',
                                  colsample_bytree=0.8, 
                                  learning_rate=0.1,
                                  max_depth=6, 
                                  n_estimators=600, 
                                  subsample=0.9, 
                                  scale_pos_weight=30, n_jobs=-1)
        
        xgb_model.fit(X_train, y_train)
        predictions = xgb_model.predict(X_test)

        report = classification_report(y_test, predictions, output_dict=True)
        recall_avg = (report['1']['recall'] + report['0']['recall']) / 2
        delta_recalls = report['1']['recall'] - report['0']['recall']
        score = scorer(xgb_model, X_test, y_test)

        best_recall_avg_lst.append(recall_avg)
        deltas_lst.append(delta_recalls)
        scores_lst.append(score)

        print(f"Threshold: {threshold}, Average Recall: {recall_avg:.2f}, Delta Recalls: {delta_recalls:.2f}, Score: {score:.2f}")

        if recall_avg > best_recall_avg:
            best_recall_avg = recall_avg
            best_threshold = threshold

    return best_threshold, best_recall_avg, best_recall_avg_lst, deltas_lst, scores_lst

## SearchGridCV

In [None]:
from sklearn.model_selection import GridSearchCV

xgb_model = XGBClassifier(random_state=SEED, 
                          objective='binary:logistic',
                          n_jobs=-1)

# scorer = make_scorer(fbeta_score, beta=2)
scorer = make_scorer(constrained_score, greater_is_better=True)

param_grid = {
    'n_estimators': [600, 800, 1000],
    'max_depth': [6],
    'learning_rate': [0.1],
    'subsample': [0.9],
    'colsample_bytree': [0.6, 0.8],
    'scale_pos_weight': [6, 12]
}

grid_search = GridSearchCV(estimator=xgb_model, param_grid=param_grid, scoring='recall_macro', cv=3, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)

In [None]:
best_params = grid_search.best_params_
best_model = grid_search.best_estimator_

y_pred = best_model.predict(X_test)