In [0]:
import pyspark.sql.functions as F

# 1. Target variable: one row per track with the number of streams counted
#    for it in FactStream (COUNT over stream_id, grouped by track_id).
popularity_query = """
    SELECT 
        track_id, 
        COUNT(stream_id) as total_streams
    FROM spotify_cata.gold.factstream
    GROUP BY track_id
"""
df_popularity = spark.sql(popularity_query)

# 2. Feature columns: track attributes from dimtrack combined with the genre
#    and country of the artist matched on artist_id in dimartist.
features_query = """
    SELECT 
        t.track_id,
        t.duration_sec,
        t.release_date,
        a.genre,
        a.country as artist_country
    FROM spotify_cata.gold.dimtrack t
    JOIN spotify_cata.gold.dimartist a ON t.artist_id = a.artist_id
"""
df_features = spark.sql(features_query)

# 3. Final Dataset
# Left join keeps every track in df_features; a track with no rows in
# df_popularity comes back with a null total_streams, which means "0 streams".
# Only that column is filled: a bare fillna(0) would also turn nulls in the
# other numeric columns (e.g. duration_sec) into fabricated zero values.
# 4. release_year is derived from release_date in the same chain so df_full is
#    assigned once and re-running the cell is idempotent.
df_full = (
    df_features
    .join(df_popularity, "track_id", "left")
    .fillna(0, subset=["total_streams"])
    .withColumn("release_year", F.year("release_date"))
)

# Convert to Pandas for Scikit-Learn (Limit to 100k rows if data is huge)
# NOTE(review): limit() is applied without an ordering, so which rows survive
# is not pinned down once the table exceeds 100k rows -- confirm whether a
# deterministic sample is required.
pdf = df_full.limit(100000).toPandas()

In [0]:
# Preview the first five rows of the pandas frame built for scikit-learn.
pdf.head(5)

Unnamed: 0,track_id,duration_sec,release_date,genre,artist_country,total_streams,release_year
0,469,189,2023-07-16,Classical,Suriname,3,2023
1,226,102,2024-10-24,Hip-Hop,Anguilla,3,2024
2,184,244,2023-10-14,Jazz,Argentina,1,2023
3,51,307,2025-04-20,Rock,Greenland,2,2025
4,250,111,2022-06-03,Pop,Saudi Arabia,2,2022


In [0]:
import mlflow
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score

# Define Target
# A track is labelled a hit (1) when its stream count is strictly above the
# median of total_streams, otherwise 0. The column is written onto pdf.
# NOTE(review): the recorded value_counts show 329 zeros vs 173 ones, so ties
# at the median leave the positive class well under half -- confirm that this
# imbalance is intended.
threshold = pdf['total_streams'].median()
pdf['is_hit'] = (pdf['total_streams'] > threshold).astype(int)

# Define Features
numeric_features = ['duration_sec', 'release_year']
categorical_features = ['genre', 'artist_country']

# Model inputs are selected by explicit column list so that only the declared
# features reach the estimator; columns such as release_date, track_id and the
# target-derived total_streams / is_hit stay out of X.
X = pdf[numeric_features + categorical_features]
y = pdf['is_hit']

# --- Build Preprocessing Pipeline ---
# Numeric columns are standardised; categorical columns are one-hot encoded.
# handle_unknown='ignore' lets the encoder accept categories at predict time
# that were not present when it was fitted.
numeric_step = ('num', StandardScaler(), numeric_features)
categorical_step = ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)

preprocessor = ColumnTransformer(transformers=[numeric_step, categorical_step])

# --- Train Model with MLflow ---
# Turn on MLflow autologging for scikit-learn fits before any model is trained.
mlflow.sklearn.autolog()

# 80/20 split, stratified on the target so both parts keep the same hit ratio.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

with mlflow.start_run(run_name="Spotify_Metadata_Predictor"):

    # Random Forest Pipeline
    # random_state seeds the forest the same way the split is seeded, so a
    # rerun produces the same model and the same logged metrics.
    # NOTE(review): the recorded report shows this model predicting only
    # class 0; class_weight='balanced' may be worth trying -- confirm.
    clf = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42))
    ])

    # Train
    clf.fit(X_train, y_train)

    # Evaluate
    y_pred = clf.predict(X_test)




Model Accuracy: 0.6534653465346535

Classification Report:
               precision    recall  f1-score   support

           0       0.65      1.00      0.79        66
           1       0.00      0.00      0.00        35

    accuracy                           0.65       101
   macro avg       0.33      0.50      0.40       101
weighted avg       0.43      0.65      0.52       101



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [0]:
    # Report hold-out accuracy and the per-class precision/recall/F1 table for
    # the predictions currently stored in y_pred.
    # NOTE(review): this cell is indented as if it still sat inside a `with`
    # block; presumably it runs only because the notebook strips a cell's
    # common leading indentation -- confirm before reusing it in a script.
    test_accuracy = accuracy_score(y_test, y_pred)
    print("Model Accuracy:", test_accuracy)

    report_text = classification_report(y_test, y_pred)
    print("\nClassification Report:\n", report_text)

Model Accuracy: 0.6534653465346535

Classification Report:
               precision    recall  f1-score   support

           0       0.65      1.00      0.79        66
           1       0.00      0.00      0.00        35

    accuracy                           0.65       101
   macro avg       0.33      0.50      0.40       101
weighted avg       0.43      0.65      0.52       101



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [0]:
# Class balance check: number of rows per value of the binary is_hit target.
hit_counts = pdf['is_hit'].value_counts()
print(hit_counts)

is_hit
0    329
1    173
Name: count, dtype: int64


In [0]:
# Install xgboost with pip through a shell escape. The notice recorded under
# this cell says the kernel may need a restart before updated packages are used.
!pip install xgboost

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
from xgboost import XGBClassifier

In [0]:
# Second candidate: same preprocessing step, gradient-boosted trees as the
# classifier, with the same tree count and depth settings as the first model.
xgb_model = XGBClassifier(n_estimators=100, max_depth=10)
clf2 = Pipeline(steps=[('preprocessor', preprocessor), ('classifier', xgb_model)])

# Fit on the training split created earlier in the notebook.
clf2.fit(X_train, y_train)


2026/01/18 15:54:36 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '6873dada79d2415e9704b3e6327bd89c', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current sklearn workflow


Model Accuracy: 0.6336633663366337

Classification Report:
               precision    recall  f1-score   support

           0       0.69      0.79      0.74        66
           1       0.46      0.34      0.39        35

    accuracy                           0.63       101
   macro avg       0.58      0.57      0.57       101
weighted avg       0.61      0.63      0.62       101



In [0]:
# Score the XGBoost pipeline on the held-out split. y_pred is overwritten with
# this model's predictions before the metrics are printed.
y_pred = clf2.predict(X_test)

xgb_accuracy = accuracy_score(y_test, y_pred)
print("Model Accuracy:", xgb_accuracy)

xgb_report = classification_report(y_test, y_pred)
print("\nClassification Report:\n", xgb_report)



Model Accuracy: 0.6336633663366337

Classification Report:
               precision    recall  f1-score   support

           0       0.69      0.79      0.74        66
           1       0.46      0.34      0.39        35

    accuracy                           0.63       101
   macro avg       0.58      0.57      0.57       101
weighted avg       0.61      0.63      0.62       101

