In [None]:
import os
import time
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from rich import print
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.ensemble import (
    GradientBoostingClassifier,
    RandomForestClassifier,
    StackingClassifier,
)
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    auc,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import (
    RandomizedSearchCV,
    RepeatedKFold,
    cross_val_predict,
    cross_val_score,
    cross_validate,
    train_test_split,
)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import (
    FunctionTransformer,
    OneHotEncoder,
    OrdinalEncoder,
    StandardScaler,
)
from sklearn.tree import DecisionTreeClassifier

# Load the `rich` IPython extension for this kernel session.
%load_ext rich
# Never truncate columns when a DataFrame is displayed.
pd.set_option("display.max_columns", None)
# NOTE(review): this suppresses every warning for the rest of the session,
# including pandas warnings about writing to slices/copies — consider
# narrowing the filter to specific categories.
warnings.filterwarnings("ignore")

In [None]:
# Read the Spotify songs training CSV (relative path).
data = pd.read_csv("../data/spotify_songs_train.csv")

In [None]:
def drop_month_day_null(X):
    """Keep only the rows whose release date parses to a full calendar date.

    The ``track_album_release_date`` column is parsed with
    ``pd.to_datetime(..., errors="coerce")``; values that cannot be parsed
    become ``NaT`` and the corresponding rows are dropped.

    Parameters
    ----------
    X : pandas.DataFrame
        Frame containing a ``track_album_release_date`` column. It is NOT
        modified; the parsing happens on a copy.

    Returns
    -------
    pandas.DataFrame
        The surviving rows, with ``track_album_release_date`` converted to
        datetime.
    """
    # Work on a copy: assigning the parsed column directly onto ``X`` would
    # silently change the caller's frame (and makes re-running the cell
    # depend on hidden state).
    parsed = X.copy()
    parsed["track_album_release_date"] = pd.to_datetime(
        parsed["track_album_release_date"], errors="coerce"
    )

    release = parsed["track_album_release_date"]
    has_full_date = release.dt.month.notnull() & release.dt.day.notnull()

    return parsed[has_full_date]


In [None]:
# Drop rows with any missing value, then rows whose release date does not
# parse to a full date.
data = drop_month_day_null(data.dropna())


In [None]:
# Features: every column except the popularity score.
X = data.drop(columns="track_popularity")
# Binary target: 1 when popularity is strictly greater than 50, otherwise 0.
y = data["track_popularity"].gt(50).astype(int)

y

In [None]:
# Hold out 20% of the rows for evaluation; the fixed seed keeps the split
# identical between runs.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    random_state=42,
    test_size=0.2,
)

print(f"Training set size: {len(X_train)}")
print(f"Test set size: {len(X_test)}")


## Feature engineering

In [None]:
class TopArtistTransformer(BaseEstimator, TransformerMixin):
    """Flag tracks whose artist is among the most prolific recent artists.

    During ``fit`` the transformer looks at tracks released within ten years
    of the newest release year in the data, counts tracks per
    ``track_artist`` and remembers the ``num_top_artists`` artists with the
    highest counts. ``transform`` emits a single boolean column telling
    whether each row's artist is in that remembered set.

    Parameters
    ----------
    num_top_artists : int, default=10
        How many artists to keep.

    Attributes
    ----------
    top_artists : pandas.Index or None
        Artist names learned by ``fit``; ``None`` until ``fit`` is called.
    """

    def __init__(self, num_top_artists=10):
        self.num_top_artists = num_top_artists
        self.top_artists = None

    def fit(self, X, y=None):
        """Learn the top artists from ``X``.

        ``X`` must provide the columns ``track_artist``,
        ``track_album_release_date`` and ``track_id``. ``X`` is not modified.
        Returns ``self``.
        """
        # Keep the derived year in a local Series instead of writing a new
        # column onto X: the estimator must not alter the data it is given.
        release_year = pd.to_datetime(
            X["track_album_release_date"], errors="coerce"
        ).dt.year
        in_last_decade = release_year >= release_year.max() - 10
        songs_last_decade = X[in_last_decade]

        # Non-null track ids per artist, most prolific first.
        tracks_per_artist = songs_last_decade.groupby("track_artist")[
            "track_id"
        ].count()
        top_artists = tracks_per_artist.sort_values(ascending=False).head(
            self.num_top_artists
        )

        self.top_artists = top_artists.index

        return self

    def transform(self, X):
        """Return a one-column frame ``is_top_artist`` (boolean) for ``X``."""
        is_top_artist = X["track_artist"].isin(self.top_artists)
        return is_top_artist.to_frame(name="is_top_artist")

    def get_feature_names_out(self, input_features=None):
        """Name of the single output column."""
        return ["is_top_artist"]


In [None]:
def get_num_playlists(X):
    """Count, for every row, the distinct playlists its track appears in.

    ``X`` needs ``track_id`` and ``playlist_id`` columns. The count is taken
    over the rows of ``X`` itself and returned as an ``(n_rows, 1)`` array.
    """
    playlists_per_track = X.groupby("track_id")["playlist_id"].transform("nunique")
    return playlists_per_track.to_numpy().reshape(-1, 1)


def playlist_name(X, feature_names):
    """Output feature name for ``get_num_playlists`` (both arguments are ignored)."""
    names = ["num_playlists"]
    return names


def release_date(X):
    """Derive season and part-of-month categories from release dates.

    Parameters
    ----------
    X : pandas.Series
        Release dates (strings or datetimes). Values that cannot be parsed
        are coerced to ``NaT``.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_rows, 2)`` with columns ``month_season``
        ("Winter"/"Spring"/"Summer"/"Fall") and ``day_category``
        ("First 10"/"Middle 10"/"Last 10"). Rows with an unparseable date
        get a missing value in BOTH columns.
    """
    dates = pd.to_datetime(X, errors="coerce")

    month = dates.dt.month
    day = dates.dt.day

    season_by_month = {
        12: "Winter",
        1: "Winter",
        2: "Winter",
        3: "Spring",
        4: "Spring",
        5: "Spring",
        6: "Summer",
        7: "Summer",
        8: "Summer",
        9: "Fall",
        10: "Fall",
        11: "Fall",
    }
    month_season = month.map(season_by_month)

    # Bins are (0, 10], (10, 20], (20, 31]. Unlike a chain of `<=` tests, a
    # missing day stays missing here instead of falling through to "Last 10".
    day_category = pd.cut(
        day,
        bins=[0, 10, 20, 31],
        labels=["First 10", "Middle 10", "Last 10"],
    ).astype(object)

    return np.column_stack([month_season, day_category])


def release_date_name(X, feature_names):
    """Output feature names for ``release_date`` (both arguments are ignored)."""
    names = ["month_season", "day_category"]
    return names


def get_is_remix(X):
    """Flag track names that contain the word "remix" (case-insensitive).

    Parameters
    ----------
    X : pandas.Series
        Track names. Missing names are treated as "not a remix".

    Returns
    -------
    numpy.ndarray
        Integer array of shape ``(n_rows, 1)`` holding 1 for remixes, else 0.
    """
    # na=False: without it a missing name yields NaN, and the integer cast
    # below raises instead of producing a 0.
    has_remix = X.str.contains("remix", case=False, na=False)
    return has_remix.astype(int).values.reshape(-1, 1)


def is_remix_name(X, feature_names):
    """Output feature name for ``get_is_remix`` (both arguments are ignored)."""
    names = ["is_remix"]
    return names


def get_is_weekend(X):
    """Flag release dates that fall on a Saturday or Sunday.

    ``X`` is a Series of dates; unparseable values are coerced to ``NaT``
    and come out as 0. Returns an integer array of shape ``(n_rows, 1)``.
    """
    dates = pd.to_datetime(X, errors="coerce")
    # dayofweek: Monday=0 ... Saturday=5, Sunday=6
    on_weekend = dates.dt.dayofweek.isin([5, 6])
    flags = on_weekend.astype(int).values
    return flags.reshape(-1, 1)


def is_weekend_name(X, feature_names):
    """Output feature name for ``get_is_weekend`` (both arguments are ignored)."""
    names = ["is_weekend"]
    return names


# Playlist count per track, then standardised.
num_playlist_pipeline = make_pipeline(
    FunctionTransformer(
        func=get_num_playlists,
        feature_names_out=playlist_name,
        validate=False,
    ),
    StandardScaler(),
)

# Season / part-of-month categories, then one-hot encoded; categories not
# seen during fit are ignored at transform time.
release_date_pipeline = make_pipeline(
    FunctionTransformer(
        func=release_date,
        feature_names_out=release_date_name,
        validate=False,
    ),
    OneHotEncoder(handle_unknown="ignore"),
)

# Plain standardisation for the numeric audio features.
num_pipeline = make_pipeline(StandardScaler())


# Column-wise feature engineering. Each entry is (name, transformer, columns);
# any input column not listed in an entry is dropped (remainder="drop").
feature_engineering = ColumnTransformer(
    [
        # Number of distinct playlists per track, standardised.
        (
            "num_playlists",
            num_playlist_pipeline,
            ["track_id", "playlist_id"],
        ),
        # Season + part-of-month categories, one-hot encoded.
        # The column is given as a bare string rather than a list; release_date
        # uses `.dt`, so it presumably relies on receiving a 1-D Series here.
        (
            "release_date",
            release_date_pipeline,
            "track_album_release_date",
        ),
        # 0/1 flag: released on a Saturday or Sunday.
        (
            "release_day",
            FunctionTransformer(
                get_is_weekend, validate=False, feature_names_out=is_weekend_name
            ),
            "track_album_release_date",
        ),
        # Boolean flag: artist is among the 50 learned top artists.
        (
            "top_artist",
            TopArtistTransformer(num_top_artists=50),
            [
                "track_artist",
                "track_album_release_date",
                "track_id",
            ],
        ),
        # One-hot encoded genre and subgenre; unseen categories are ignored.
        (
            "genres",
            OneHotEncoder(handle_unknown="ignore"),
            ["playlist_genre", "playlist_subgenre"],
        ),
        # 0/1 flag: track name contains "remix" (bare string column, see above).
        (
            "track_name",
            FunctionTransformer(
                get_is_remix, validate=False, feature_names_out=is_remix_name
            ),
            "track_name",
        ),
        # Standardised numeric audio features.
        (
            "numerical",
            num_pipeline,
            [
                "danceability",
                "energy",
                "loudness",
                "speechiness",
                "acousticness",
                "instrumentalness",
                "liveness",
                "valence",
                "tempo",
                "duration_ms",
            ],
        ),
        # Musical key, ordinal-encoded.
        (
            "key",
            OrdinalEncoder(),
            ["key"],
        ),
        # `mode` is forwarded unchanged.
        ("mode", "passthrough", ["mode"]),
    ],
    remainder="drop",
)


## Model building

In [None]:
# Base learners of the stack. The random forest is seeded so that repeated
# runs of the notebook produce the same model, matching the seeded
# train/test split.
estimators = [
    ("knn", KNeighborsClassifier()),
    ("rf", RandomForestClassifier(random_state=42)),
]

# Meta-learner trained on the base learners' cross-validated predictions.
final_estimator = LogisticRegression()

stacked_model = StackingClassifier(
    estimators=estimators, final_estimator=final_estimator, n_jobs=-1, verbose=2, cv=5
)

stacked_model


In [None]:
# Feature engineering followed by the stacked classifier. The step names are
# spelled out; they are the lower-cased class names make_pipeline would
# generate, and they are the prefixes used when addressing nested parameters.
pipeline = Pipeline(
    steps=[
        ("columntransformer", feature_engineering),
        ("stackingclassifier", stacked_model),
    ]
)
pipeline

In [None]:
# Text dump of the pipeline; `print` here is rich's print (imported at the top).
print(pipeline)

In [None]:
# Fit feature engineering and the stacked classifier on the training split.
pipeline.fit(X_train, y_train)
print("Model trained")
# Last expression: display the fitted pipeline.
pipeline

In [None]:
# Hard class predictions (0/1) for the held-out test split.
y_pred = pipeline.predict(X_test)

# Per-class precision / recall / F1 summary.
print(classification_report(y_test, y_pred))


In [None]:
# Threshold-based metrics are computed from the hard class predictions.
print(f'Accuracy: {accuracy_score(y_test, y_pred)}')
print(f'Precision: {precision_score(y_test, y_pred)}')
print(f'Recall: {recall_score(y_test, y_pred)}')
print(f'F1: {f1_score(y_test, y_pred)}')

# ROC AUC measures ranking quality, so it needs a continuous score for the
# positive class; feeding it 0/1 predictions collapses it to a single
# threshold and understates the model.
y_score = pipeline.predict_proba(X_test)[:, 1]
print(f'ROC AUC: {roc_auc_score(y_test, y_score)}')

In [None]:
# ROC curve and area for the positive class. The curve is built from the
# predicted probabilities so it sweeps every decision threshold; hard 0/1
# predictions would give a single operating point joined by straight lines.
y_score = pipeline.predict_proba(X_test)[:, 1]
fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(15, 10))
lw = 2
plt.plot(
    fpr, tpr, color="darkorange", lw=lw, label="ROC curve (area = %0.2f)" % roc_auc
)
# Diagonal = performance of a random classifier.
plt.plot([0, 1], [0, 1], color="navy", lw=lw, linestyle="--")
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC-AUC curve")
plt.legend(loc="lower right")
plt.show()


#### RandomizedSearchCV

In [None]:
param_grid = {
    "knn__n_neighbors": [3, 5, 7],
    'rf__n_estimators': [100, 200, 300],
}

search = RandomizedSearchCV(
    stacked_reg,
    param_distributions=param_grid,
    n_iter=5,
    random_state=42,
    n_jobs=-1,
    verbose=2,
    cv=5,
    scoring="neg_mean_squared_error",
)

search.fit(X_train, y_train)

print(f"Best params: {search.best_params_}")

In [None]:
train_rmse = np.sqrt(-search.best_score_)
print(f"Train RMSE: {train_rmse}")


In [None]:
# With the default refit=True the search has already refitted the best
# candidate on the full training split, so it is used as-is; fitting it a
# second time only repeats the work.
best_model = search.best_estimator_
best_model


In [None]:
# Evaluate the tuned model on the held-out split with classification
# metrics (the target is a 0/1 label, so RMSE is not meaningful here).
y_pred = best_model.predict(X_test)

print(classification_report(y_test, y_pred))


In [None]:
fig, ax = plt.subplots(figsize=(15, 10))

# Confusion matrix of the test predictions: rows are true labels, columns
# are predicted labels. An actual-vs-predicted scatter is a regression plot
# and carries no information for a binary target.
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False, ax=ax)
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
ax.set_title("Confusion matrix on the test set")

plt.show()
