In [0]:
from pyspark.sql import functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.functions import vector_to_array
from pyspark.ml import PipelineModel

In [0]:
from xgboost.spark import SparkXGBClassifier

# --- Reproducibility / cluster settings ---
SEED = 42          # passed to randomSplit and to the XGBoost estimator
NUM_WORKERS = 3    # number of XGBoost workers requested from Spark

# --- Prediction targets: one binary classifier is trained per column ---
TARGETS = [
    "label_family",
    "label_remote",
    "label_tourist",
]

# --- Inputs (parquet) ---
FEATURES_PATH = "dbfs:/tmp/booking_stage4/final_features_assembled_no_labels"
LABELS_PATH = "dbfs:/tmp/booking_stage4/teacher_labels_multilabel_desc_reviews_v2"

# --- Outputs: fitted pipelines are saved under <dir>/<target> ---
OUT_ROOT = "dbfs:/tmp/booking_stage5"
MODELS_WITH_DIR = f"{OUT_ROOT}/models_with_enrichment_v1"
MODELS_WITHOUT_DIR = f"{OUT_ROOT}/models_without_enrichment_v1"

In [0]:
# X: feature table, Y: label table (both parquet).
X = spark.read.parquet(FEATURES_PATH)
Y = spark.read.parquet(LABELS_PATH)

# Each count() is its own Spark action over the corresponding table.
features_row_count = X.count()
print("Features rows:", features_row_count)
labels_row_count = Y.count()
print("Labels rows:", labels_row_count)


Features rows: 3239391
Labels rows: 1402841


In [0]:
# Keep only the key and the target columns from the label table, then attach
# the features. Inner join: rows without a match on hotel_id are dropped.
labels_only = Y.select("hotel_id", *TARGETS)
df = labels_only.join(X, on="hotel_id", how="inner").cache()

# First action on df; this also materialises the cache.
print("Joined labeled rows:", df.count())

# Mean of each target column, reported as its positive rate.
pos_rate_exprs = [
    F.mean(F.col(target)).alias(f"{target}_pos_rate")
    for target in TARGETS
]
df.select(F.count("*").alias("rows"), *pos_rate_exprs).show(truncate=False)


Joined labeled rows: 1402841
+-------+---------------------+---------------------+----------------------+
|rows   |label_family_pos_rate|label_remote_pos_rate|label_tourist_pos_rate|
+-------+---------------------+---------------------+----------------------+
|1402841|0.2644098654088382   |0.7067800271021448   |0.35968153197689545   |
+-------+---------------------+---------------------+----------------------+



In [0]:
# Columns that must never be fed to the model: the join key, the targets
# themselves, and grid_id.
EXCLUDE = {"hotel_id", "grid_id", *TARGETS}

# Everything else in the joined frame is treated as a feature, in frame order.
all_feature_cols = [name for name in df.columns if name not in EXCLUDE]

def is_enrichment_col(c: str) -> bool:
    """
    Return True when column name `c` matches one of the enrichment naming rules:

    - exactly "has_enrichment"
    - starts with "count_" or "log1p_count_"
    - ends with "_within_200m" or "_within_500m"
    - starts with "inv_" and ends with "_dist"

    Classification is purely by name; the column's contents are not inspected.
    """
    if c == "has_enrichment":
        return True
    if c.startswith(("count_", "log1p_count_")):
        return True
    if c.endswith(("_within_200m", "_within_500m")):
        return True
    return c.startswith("inv_") and c.endswith("_dist")

# Partition the feature list by name rule; relative order is preserved in both halves.
enrich_cols = list(filter(is_enrichment_col, all_feature_cols))
base_cols = [name for name in all_feature_cols if not is_enrichment_col(name)]

print("All features:", len(all_feature_cols))
print("Enrichment features:", len(enrich_cols))
print("Base (no enrichment) features:", len(base_cols))


All features: 69
Enrichment features: 46
Base (no enrichment) features: 23


In [0]:
# Seeded 80/20 random split; both parts are cached because every
# model fit/evaluation below reuses them.
train_df, test_df = (
    part.cache() for part in df.randomSplit([0.8, 0.2], seed=SEED)
)

train_row_count = train_df.count()
test_row_count = test_df.count()
print("Train:", train_row_count, "Test:", test_row_count)

Train: 1122311 Test: 280530


In [0]:
# Both evaluators read the same columns and differ only in the metric they report.
_evaluator_kwargs = dict(labelCol="label", rawPredictionCol="rawPrediction")

roc_eval = BinaryClassificationEvaluator(metricName="areaUnderROC", **_evaluator_kwargs)
pr_eval = BinaryClassificationEvaluator(metricName="areaUnderPR", **_evaluator_kwargs)


In [0]:
def metrics_at_threshold(pred_df, thr: float):
    """
    Confusion-matrix counts and precision / recall / F1 at a probability threshold.

    pred_df must include columns: label (double), probability (vector).
    A row is predicted positive when probability[1] >= thr.

    All four confusion-matrix cells are computed in a single aggregation (one
    Spark job) rather than four separate filter().count() actions. This function
    is called once per candidate threshold, so the saving multiplies.

    Returns a dict with keys: thr, precision, recall, f1, tn, fp, fn, tp.
    precision / recall / f1 are 0.0 when their denominator is zero. Rows whose
    label or thresholded prediction is not 0/1 (e.g. null) fall in no cell.
    """
    thr = float(thr)

    tmp = (
        pred_df
        .withColumn("p_pos", vector_to_array("probability")[1])
        .withColumn("pred_thr", (F.col("p_pos") >= F.lit(thr)).cast("int"))
    )

    def _cell(label_value: int, pred_value: int):
        # Conditional count of one confusion-matrix cell. A null comparison takes
        # the otherwise(0) branch; coalesce covers an empty pred_df, where sum() is null.
        hit = (F.col("label") == label_value) & (F.col("pred_thr") == pred_value)
        return F.coalesce(F.sum(F.when(hit, 1).otherwise(0)), F.lit(0))

    counts = tmp.agg(
        _cell(1, 1).alias("tp"),
        _cell(0, 1).alias("fp"),
        _cell(1, 0).alias("fn"),
        _cell(0, 0).alias("tn"),
    ).first()

    tp, fp, fn, tn = counts["tp"], counts["fp"], counts["fn"], counts["tn"]

    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall    = tp / (tp + fn) if (tp + fn) else 0.0
    f1        = (2*precision*recall/(precision+recall)) if (precision+recall) else 0.0

    return {
        "thr": thr,
        "precision": float(precision),
        "recall": float(recall),
        "f1": float(f1),
        "tn": int(tn), "fp": int(fp), "fn": int(fn), "tp": int(tp)
    }


In [0]:
def fit_eval_xgb(train_df, test_df, feature_cols, target_col, exp_name, threshold_grid=None):
    """
    Fit a VectorAssembler + SparkXGBClassifier pipeline for one binary target
    and evaluate it on test_df.

    Parameters
    ----------
    train_df, test_df : Spark DataFrames containing feature_cols and target_col.
    feature_cols      : list of column names assembled into the "features" vector.
    target_col        : name of the target column; cast to double as "label".
    exp_name          : free-form tag copied into the result dict as "experiment".
    threshold_grid    : optional iterable of probability thresholds to scan for
                        the best F1. Defaults to 0.10..0.90 in steps of 0.05.
                        An empty grid falls back to the 0.5 threshold metrics.

    Returns
    -------
    (model, metrics) where model is the fitted PipelineModel and metrics is a
    flat dict: experiment, target, pos_rate, roc_auc, pr_auc, the
    precision/recall/F1 at threshold 0.5 (*_default) and at the best-F1
    threshold from the grid (*_best).

    NOTE(review): the best-F1 threshold is selected on the same test_df it is
    reported on, so the *_best metrics are optimistic. Consider selecting the
    threshold on a separate validation split.
    """
    tr = train_df.withColumn("label", F.col(target_col).cast("double"))
    te = test_df.withColumn("label", F.col(target_col).cast("double"))

    # handleInvalid="keep" retains rows with invalid feature values instead of
    # raising or dropping them; XGBoost is told below to treat NaN as missing.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")

    xgb = SparkXGBClassifier(
        features_col="features",
        label_col="label",
        prediction_col="prediction",
        probability_col="probability",
        raw_prediction_col="rawPrediction",

        eval_metric="aucpr",

        num_workers=NUM_WORKERS,
        max_depth=6,
        learning_rate=0.1,
        n_estimators=300,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_lambda=1.0,
        min_child_weight=1.0,
        missing=float("nan"),
        random_state=SEED
    )

    model = Pipeline(stages=[assembler, xgb]).fit(tr)

    # Materialise the grid up front so any iterable (including a generator) works.
    if threshold_grid is None:
        threshold_grid = [round(i / 100, 2) for i in range(10, 91, 5)]  # 0.10..0.90 step 0.05
    else:
        threshold_grid = list(threshold_grid)

    # Keep probability for threshold tuning. The frame is cached because it is
    # scanned once per metric and once per threshold.
    pred = model.transform(te).select("label", "rawPrediction", "probability").cache()
    try:
        pred.count()  # force the cache to materialise

        pos_rate = te.select(F.mean("label").alias("pos")).first()["pos"]
        roc_auc = roc_eval.evaluate(pred)
        pr_auc  = pr_eval.evaluate(pred)

        # Metrics at default threshold 0.5
        m05 = metrics_at_threshold(pred, 0.5)

        # Best threshold search (max F1); the first threshold reaching the max wins.
        best = None
        for thr in threshold_grid:
            # 0.5 has already been evaluated above; reuse it instead of rescanning.
            m = m05 if float(thr) == 0.5 else metrics_at_threshold(pred, thr)
            if (best is None) or (m["f1"] > best["f1"]):
                best = m

        # Empty grid: report the default threshold rather than failing on best[...].
        if best is None:
            best = m05
    finally:
        # Always release the cached predictions, even when evaluation raises.
        pred.unpersist()

    return model, {
        "experiment": exp_name,
        "target": target_col,
        "pos_rate": float(pos_rate),
        "roc_auc": float(roc_auc),
        "pr_auc": float(pr_auc),

        # default 0.5
        "thr_default": 0.5,
        "precision_default": m05["precision"],
        "recall_default": m05["recall"],
        "f1_default": m05["f1"],

        # best-F1 threshold
        "thr_best": best["thr"],
        "precision_best": best["precision"],
        "recall_best": best["recall"],
        "f1_best": best["f1"],
    }


In [0]:
# One model per target on the full feature set (base + enrichment columns).
# Fitted pipelines stay in memory in models_with and are also saved per target.
models_with, results_full = {}, []

for t in TARGETS:
    print("WITH enrichment:", t)

    model, metrics = fit_eval_xgb(
        train_df=train_df,
        test_df=test_df,
        feature_cols=all_feature_cols,
        target_col=t,
        exp_name="with_enrichment",
    )
    results_full.append(metrics)
    models_with[t] = model

    # overwrite() replaces a model saved by an earlier run at the same path.
    save_path = f"{MODELS_WITH_DIR}/{t}"
    model.write().overwrite().save(save_path)
    print("Saved model to:", save_path)


WITH enrichment: label_family


INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 6, 'min_child_weight': 1.0, 'random_state': 42, 'reg_lambda': 1.0, 'subsample': 0.8, 'eval_metric': 'aucpr', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 300}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


🏃 View run magnificent-ram-501 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232/runs/d3466a80029e4715b4d6c61d70b0df93
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232
Saved model to: dbfs:/tmp/booking_stage5/models_with_enrichment_v1/label_family
WITH enrichment: label_remote


INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 6, 'min_child_weight': 1.0, 'random_state': 42, 'reg_lambda': 1.0, 'subsample': 0.8, 'eval_metric': 'aucpr', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 300}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


🏃 View run legendary-turtle-354 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232/runs/933c81b8ab6547c78235c044e2f90bfd
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232
Saved model to: dbfs:/tmp/booking_stage5/models_with_enrichment_v1/label_remote
WITH enrichment: label_tourist


INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 6, 'min_child_weight': 1.0, 'random_state': 42, 'reg_lambda': 1.0, 'subsample': 0.8, 'eval_metric': 'aucpr', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 300}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


🏃 View run welcoming-lynx-668 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232/runs/a5a201e8c26543acbf8facf2fabd41d2
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232
Saved model to: dbfs:/tmp/booking_stage5/models_with_enrichment_v1/label_tourist


In [0]:
# Baseline: one model per target using only the base (non-enrichment) columns.
# Fitted pipelines stay in memory in models_without and are also saved per target.
models_without, results_base = {}, []

for t in TARGETS:
    print("WITHOUT enrichment:", t)

    model, metrics = fit_eval_xgb(
        train_df=train_df,
        test_df=test_df,
        feature_cols=base_cols,
        target_col=t,
        exp_name="without_enrichment",
    )
    results_base.append(metrics)
    models_without[t] = model

    # overwrite() replaces a model saved by an earlier run at the same path.
    save_path = f"{MODELS_WITHOUT_DIR}/{t}"
    model.write().overwrite().save(save_path)
    print("Saved model to:", save_path)


WITHOUT enrichment: label_family


INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 6, 'min_child_weight': 1.0, 'random_state': 42, 'reg_lambda': 1.0, 'subsample': 0.8, 'eval_metric': 'aucpr', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 300}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


🏃 View run victorious-fly-266 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232/runs/763ae173a7084cb39a25d398f2eed0a1
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232
Saved model to: dbfs:/tmp/booking_stage5/models_without_enrichment_v1/label_family
WITHOUT enrichment: label_remote


INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 6, 'min_child_weight': 1.0, 'random_state': 42, 'reg_lambda': 1.0, 'subsample': 0.8, 'eval_metric': 'aucpr', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 300}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


🏃 View run bouncy-kit-995 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232/runs/9bc5247120a243449bf31f9275c629af
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232
Saved model to: dbfs:/tmp/booking_stage5/models_without_enrichment_v1/label_remote
WITHOUT enrichment: label_tourist


INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 6, 'min_child_weight': 1.0, 'random_state': 42, 'reg_lambda': 1.0, 'subsample': 0.8, 'eval_metric': 'aucpr', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 300}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


🏃 View run thundering-skunk-641 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232/runs/5c8be9522a964a39a301c77422d07966
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/2752243540419232
Saved model to: dbfs:/tmp/booking_stage5/models_without_enrichment_v1/label_tourist


In [0]:
# One row per (target, experiment); the schema is inferred from the metric dicts.
res_df = spark.createDataFrame(results_full + results_base)

# Fixed column order for the report: identifiers, ranking metrics, then the
# default-threshold and best-threshold operating points.
report_cols = [
    "target", "experiment", "pos_rate",
    "roc_auc", "pr_auc",
    "thr_default", "precision_default", "recall_default", "f1_default",
    "thr_best", "precision_best", "recall_best", "f1_best",
]

display(res_df.select(*report_cols).orderBy("target", "experiment"))


target,experiment,pos_rate,roc_auc,pr_auc,thr_default,precision_default,recall_default,f1_default,thr_best,precision_best,recall_best,f1_best
label_family,with_enrichment,0.2633586425694221,0.7796285370853345,0.5729610837501546,0.5,0.6630247268545141,0.2965213860314022,0.409779182761104,0.3,0.4741088398432954,0.6617758527341635,0.552439492892816
label_family,without_enrichment,0.2633586425694221,0.7754186181356733,0.566399701228637,0.5,0.6554160125588697,0.2938548998375744,0.4057792232066091,0.3,0.4711988760536996,0.6537087168381158,0.547648206105139
label_remote,with_enrichment,0.7077175346665241,0.7680358980880517,0.8815766218070779,0.5,0.7728832759922799,0.9278367651206834,0.8433010739889579,0.45,0.7598042362002567,0.9540033041866464,0.845900995051539
label_remote,without_enrichment,0.7077175346665241,0.7605083903050003,0.8784200007940002,0.5,0.7670604551682653,0.9248801224966756,0.8386097917427839,0.4,0.7412713687091939,0.9738838296329128,0.8418039371055764
label_tourist,with_enrichment,0.3593091647952091,0.7759079574900503,0.6991143446754837,0.5,0.7262635658914729,0.4647360536523904,0.566785846083111,0.3,0.5486845937287642,0.7289502663769755,0.6261002939798048
label_tourist,without_enrichment,0.3593091647952091,0.7625572625802253,0.6567031820690785,0.5,0.6893092660860369,0.4656190164389813,0.5558016153099169,0.3,0.532246552659377,0.7398236058612856,0.6190988148855359


In [0]:
# Pivot the two experiments side by side: a = with enrichment, b = without.
a = res_df.where("experiment = 'with_enrichment'").select(
    "target",
    res_df["roc_auc"].alias("roc_with"),
    res_df["pr_auc"].alias("pr_with"),
    res_df["f1_best"].alias("f1_with_best"),
)
b = res_df.where("experiment = 'without_enrichment'").select(
    "target",
    res_df["roc_auc"].alias("roc_without"),
    res_df["pr_auc"].alias("pr_without"),
    res_df["f1_best"].alias("f1_without_best"),
)

# delta_* = with - without, so a positive value means enrichment scored higher.
delta = a.join(b, on="target")
for delta_col, with_col, without_col in [
    ("delta_roc", "roc_with", "roc_without"),
    ("delta_pr", "pr_with", "pr_without"),
    ("delta_f1_best", "f1_with_best", "f1_without_best"),
]:
    delta = delta.withColumn(delta_col, F.col(with_col) - F.col(without_col))
delta = delta.orderBy("target")

display(delta)


target,roc_with,pr_with,f1_with_best,roc_without,pr_without,f1_without_best,delta_roc,delta_pr,delta_f1_best
label_family,0.7796285370853345,0.5729610837501546,0.552439492892816,0.7754186181356733,0.566399701228637,0.547648206105139,0.0042099189496612,0.0065613825215176,0.004791286787677
label_remote,0.7680358980880517,0.8815766218070779,0.845900995051539,0.7605083903050003,0.8784200007940002,0.8418039371055764,0.0075275077830514,0.0031566210130776,0.0040970579459626
label_tourist,0.7759079574900503,0.6991143446754837,0.6261002939798048,0.7625572625802253,0.6567031820690785,0.6190988148855359,0.0133506949098249,0.0424111626064052,0.0070014790942689
