In [None]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# -------------------------------------------
# 0) INPUTS / PATHS
# -------------------------------------------
SOURCE_DELTA_PATH = "/Volumes/workspace/default/ita"
df = spark.read.format("delta").load(SOURCE_DELTA_PATH)





In [None]:
from typing import Union, List
from pyspark.sql import DataFrame
import datetime


def build_ml_dataset(
    df: DataFrame,
    obs_start: Union[str, datetime.date],
    obs_end: Union[str, datetime.date],
    lbl_start: Union[str, datetime.date],
    lbl_end: Union[str, datetime.date],
    key_cols: List[str] = ["language", "database_name", "is_mobile", "namespace", "page_title"]
) -> DataFrame:
    """
    Build a machine learning dataset for churn prediction.

    The function transforms raw pageview logs into a supervised dataset where each row
    corresponds to a unique entity (defined by `key_cols`). It computes activity,
    recency, seasonality, and diversity features over the observation window, and
    generates churn labels based on presence in the label window.

    Args:
        df (DataFrame):
            Input Spark DataFrame with at least the following columns:
            - file_date (date or string in 'yyyy-MM-dd' format)
            - domain_code (str)
            - page_title (str)
            - count_views (int)
        obs_start (str | datetime.date):
            Start date of the observation window (inclusive).
        obs_end (str | datetime.date):
            End date of the observation window (inclusive).
            All feature computations are restricted to this period.
        lbl_start (str | datetime.date):
            Start date of the label window (inclusive).
        lbl_end (str | datetime.date):
            End date of the label window (inclusive).
            Used to determine churn labels.
        key_cols (List[str], optional):
            Columns that uniquely identify an entity.
            Defaults to ["language", "database_name", "is_mobile", "namespace", "page_title"].

    Returns:
        DataFrame:
            Spark DataFrame with one row per entity, including:

            **Aggregate features**
                - days_active: number of active days in the observation window
                - views_total, views_mean, views_max, views_median, views_std
                - trend_slope: linear regression slope of activity over time

            **Recency / rolling features**
                - views_last_day: activity on the last observation day
                - sum_3d: rolling sums over last 3 days

            **Seasonality features**
                - unique_weekdays: number of distinct weekdays active
                - views_std_dow: variation across weekdays

            **Sparsity indicator**
                - sparsity_level: categorical label {very_sparse, sparse, medium, frequent}

            **Label**
                - churn: 1 if the entity disappeared in label window, else 0

    Notes:
        - Protects against data leakage by restricting features strictly to the
          observation window (`obs_start`–`obs_end`).
        - Label generation compares presence in the label window (`lbl_start`–`lbl_end`).
        - Assumes `file_date` is compatible with Spark date functions and
          comparable to the provided window boundaries.
    """
    # ----------------------
    # Filter obs + churn
    # ----------------------
    obs_df = df.filter(
        (F.col("file_date") >= F.lit(obs_start)) &
        (F.col("file_date") <= F.lit(obs_end))
    )
    lbl_df = df.filter(
        (F.col("file_date") >= F.lit(lbl_start)) &
        (F.col("file_date") <= F.lit(lbl_end))
    )

    # ----------------------
    # Daily aggregates
    # ----------------------
    obs_df = obs_df.withColumn("dow", F.dayofweek("file_date"))  # 1=Sunday, 7=Saturday

    daily = (
        obs_df.groupBy(["file_date","dow"] + key_cols)
        .agg(F.sum("count_views").alias("views_day"))
    )

    # ----------------------
    # Rolling sums (no leakage)
    # ----------------------
    time_w = Window.partitionBy(key_cols).orderBy(F.col("file_date")).rowsBetween(-6, 0)
    time_w3 = Window.partitionBy(key_cols).orderBy(F.col("file_date")).rowsBetween(-2, 0)
    daily = (daily
             .withColumn("sum_3d", F.sum("views_day").over(time_w3)))

    # ----------------------
    # Last day snapshot
    # ----------------------
    last_day = (daily.filter(F.col("file_date") == F.lit(obs_end))
                .select(key_cols + ["views_day","sum_3d"])
                .withColumnRenamed("views_day","views_last_day"))

    # ----------------------
    # Aggregate features
    # ----------------------
    agg_feats = (
        daily.groupBy(key_cols)
        .agg(
            F.countDistinct("file_date").alias("days_active"),
            F.sum("views_day").alias("views_total"),
            F.avg("views_day").alias("views_mean"),
            F.stddev_pop("views_day").alias("views_std"),
            F.countDistinct("dow").alias("unique_weekdays"),
            F.expr("stddev_pop(views_day)").alias("views_std_dow")
        )
    )


    # ----------------------
    # Merge features
    # ----------------------
    features = (agg_feats
                .join(last_day, on=key_cols, how="left")
                .fillna({
                    "views_last_day":0,
                    "sum_3d":0,
                })
                # --- Sparsity guards ---
                .withColumn("sparsity_level",
                            F.when(F.col("days_active")<=2,"very_sparse")
                             .when(F.col("days_active")<=5,"sparse")
                             .when(F.col("days_active")<=8,"medium")
                             .otherwise("frequent"))
               )

    # ----------------------
    # Labels
    # ----------------------
    alive_keys = lbl_df.select(key_cols).distinct().withColumn("alive_flag", F.lit(1))
    obs_keys = daily.select(key_cols).distinct()
    labels = (obs_keys.join(alive_keys, on=key_cols, how="left")
              .withColumn("churn", F.when(F.col("alive_flag").isNull(),1).otherwise(0))
              .drop("alive_flag"))

    # ----------------------
    # Join features + labels
    # ----------------------
    ml_dataset = features.join(labels, on=key_cols, how="inner")

    return ml_dataset

train_set = build_ml_dataset(
    df_ready,
    obs_start="2025-01-01", obs_end="2025-01-23",
    lbl_start="2025-01-24", lbl_end="2025-01-27"
)

test_set = build_ml_dataset(
    df_ready,
    obs_start="2025-01-05", obs_end="2025-01-27",
    lbl_start="2025-01-28", lbl_end="2025-01-31"
)

print("Train rows:", train_set.count())
print("Test rows:", test_set.count())


Train rows: 4697657
Test rows: 4762952


In [None]:
display(df_train)

language,database_name,is_mobile,namespace,page_title,days_active,views_total,views_mean,views_std,unique_weekdays,views_std_dow,views_last_day,sum_3d,sparsity_level,churn
Italian,wikipedia.org,False,Article,juninho pernambucano,23,617,26.82608695652174,13.41528062521646,7,13.41528062521646,68,143,frequent,0
Italian,wiktionary,False,Article,disobbedienza,3,3,1.0,0.0,3,0.0,0,0,sparse,1
Italian,wikipedia.org,True,Article,valigia,23,175,7.608695652173913,4.093888663072466,7,4.093888663072466,10,21,frequent,0
Italian,wikiquote,False,Article,central intelligence agency,7,14,2.0,1.0690449676496974,4,1.0690449676496974,1,4,medium,0
Italian,wikipedia.org,True,Article,francesco petrucci,19,63,3.3157894736842106,1.9749118988730223,7,1.9749118988730223,2,5,frequent,0
Italian,wikipedia.org,False,Article,baccara,23,210,9.130434782608695,2.878773105608148,7,2.878773105608148,7,26,frequent,0
Italian,wikipedia.org,False,Article,brossura,23,476,20.695652173913043,6.53708339837254,7,6.53708339837254,30,74,frequent,0
Italian,wikipedia.org,False,Article,melchiorre gioia,23,138,6.0,3.575277429186744,7,3.575277429186744,5,18,frequent,0
Italian,wikipedia.org,False,Article,bouvron,9,26,2.888888888888889,2.3306863292670035,6,2.3306863292670035,1,3,frequent,0
Italian,wikipedia.org,True,Article,mick foley,23,1335,58.04347826086956,14.5437219710522,7,14.5437219710522,59,189,frequent,0


In [None]:
df_train.columns

Index(['language', 'database_name', 'is_mobile', 'namespace', 'page_title',
       'days_active', 'views_total', 'views_mean', 'views_std',
       'unique_weekdays', 'views_std_dow', 'views_last_day', 'sum_3d',
       'sparsity_level', 'churn'],
      dtype='object')

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import numpy as np

# Target
y = df_train["churn"].astype(int)

# Features: drop target + maybe drop page_title (too sparse, high cardinality)
X = df_train.drop(columns=["churn"])

# Identify categorical and numerical
categorical_cols = ["page_title","language", "database_name", "namespace", "is_mobile","sparsity_level"]
numeric_cols = [c for c in X.columns if c not in categorical_cols]

# Preprocessor
categorical_transformer = OneHotEncoder(handle_unknown="ignore")
numeric_transformer = StandardScaler()

preprocessor = ColumnTransformer(
    transformers=[
        ("cat", categorical_transformer, categorical_cols),
        ("num", numeric_transformer, numeric_cols)
    ]
)

# Split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

In [None]:
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

clf = HistGradientBoostingClassifier(random_state=42)

param_dist = {
    "clf__max_depth": [3, 5, 7, None],
    "clf__learning_rate": [0.01, 0.05, 0.1, 0.2],
    "clf__max_iter": [100, 200, 500],
    "clf__min_samples_leaf": [10, 20, 50],
    "clf__l2_regularization": [0.0, 1.0, 5.0]
}

# Add a FunctionTransformer to ensure dense output
def to_dense(x):
    return x.toarray() if hasattr(x, "toarray") else x

pipe = Pipeline(steps=[
    ("preprocess", preprocessor),
    ("to_dense", FunctionTransformer(to_dense)),
    ("clf", clf)
])

search = RandomizedSearchCV(
    pipe,
    param_distributions=param_dist,
    n_iter=2,
    scoring="roc_auc",
    cv=3,
    verbose=2,
    n_jobs=-1,
    random_state=42
)

X_small = X_train.sample(n=10000, random_state=42)
y_small = y_train.loc[X_small.index]

search.fit(X_small, y_small)

print("Best ROC-AUC (CV):", search.best_score_)
print("Best params:", search.best_params_)

Fitting 3 folds for each of 2 candidates, totalling 6 fits
[CV] END clf__l2_regularization=0.0, clf__learning_rate=0.1, clf__max_depth=None, clf__max_iter=200, clf__min_samples_leaf=10; total time= 1.8min
[CV] END clf__l2_regularization=0.0, clf__learning_rate=0.1, clf__max_depth=None, clf__max_iter=200, clf__min_samples_leaf=10; total time= 1.8min
[CV] END clf__l2_regularization=5.0, clf__learning_rate=0.05, clf__max_depth=7, clf__max_iter=500, clf__min_samples_leaf=10; total time= 3.7min
Best ROC-AUC (CV): 0.8810553536809765
Best params: {'clf__min_samples_leaf': 10, 'clf__max_iter': 500, 'clf__max_depth': 7, 'clf__learning_rate': 0.05, 'clf__l2_regularization': 5.0}


In [None]:
from sklearn.metrics import roc_auc_score, average_precision_score, classification_report

best_model = search.best_estimator_

X_test_small = X_test.sample(n=1000, random_state=42)
y_test_small = y_test.loc[X_test_small.index]


y_pred_proba = best_model.predict_proba(X_test_small)[:,1]
y_pred = best_model.predict(X_test_small)

print("Test ROC-AUC:", roc_auc_score(y_test_small, y_pred_proba))
print("Test PR-AUC:", average_precision_score(y_test_small, y_pred))
print(classification_report(y_test_small, y_pred))

Test ROC-AUC: 0.9025041642642065
Test PR-AUC: 0.7487685640882781
              precision    recall  f1-score   support

           0       0.86      0.78      0.82       516
           1       0.79      0.87      0.83       484

    accuracy                           0.82      1000
   macro avg       0.83      0.83      0.82      1000
weighted avg       0.83      0.82      0.82      1000



In [None]:
from sklearn.metrics import roc_auc_score, average_precision_score, classification_report

# Split
X_train_set, X_test_set, y_train_set, y_test_set = train_test_split(
    X_set, y_set, test_size=0.2, stratify=y_set, random_state=42
)

best_model = search.best_estimator_

X_test_small_set = X_test_set.sample(n=1000, random_state=42)
y_test_small_set = y_test_set.loc[X_test_small_set.index]


y_pred_proba = best_model.predict_proba(X_test_small_set)[:,1]
y_pred = best_model.predict(X_test_small_set)

print("Test ROC-AUC:", roc_auc_score(y_test_small_set, y_pred_proba))
print("Test PR-AUC:", average_precision_score(y_test_small_set, y_pred))
print(classification_report(y_test_small_set, y_pred))

Test ROC-AUC: 0.875728046594982
Test PR-AUC: 0.7318763342030361
              precision    recall  f1-score   support

           0       0.84      0.76      0.79       504
           1       0.77      0.85      0.81       496

    accuracy                           0.80      1000
   macro avg       0.80      0.80      0.80      1000
weighted avg       0.80      0.80      0.80      1000



In [None]:
from sklearn.inspection import permutation_importance

r = permutation_importance(best_model, X_test_small, y_test_small, n_repeats=10, random_state=42, n_jobs=-1)
sorted_idx = r.importances_mean.argsort()[::-1]

for i in sorted_idx[:10]:
    print(f"{X.columns[i]}: {r.importances_mean[i]:.4f}")

days_active: 0.1240
views_total: 0.0893
views_mean: 0.0104
unique_weekdays: 0.0098
views_std: 0.0068
sum_3d: 0.0055
database_name: 0.0040
namespace: 0.0035
views_last_day: 0.0006
views_std_dow: 0.0000
