# Train models once (US-only)

This notebook trains and saves all reusable artifacts **once**:
- KMeans geo clustering model (regions)
- PCA model / weights for WeightedQuality
- Region median price table
- RandomForest pricing model (with CV)



In [0]:
# Imports

from __future__ import annotations

import os
import time
import json
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

import pandas as pd

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType, StructType, StructField, StringType, DoubleType, IntegerType
)

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler, PCA as SparkPCA
from pyspark.ml.clustering import KMeans


In [0]:
# Configuration & setup

# Data sources
EVENTS_CSV_PATH = "/Workspace/Users/odelia.dov@campus.technion.ac.il/events_at_big_venues.csv"

# Azure storage (Databricks + ABFSS)
STORAGE_ACCOUNT = "lab94290"
CONTAINER = "airbnb"
AIRBNB_PARQUET_PATH = f"abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/airbnb_1_12_parquet"

# Output tables
VENUES_TABLE = "default.venues_clean"
EVENTS_TABLE = "default.events_clean"
LISTINGS_TABLE = "default.listings_clean"
PAIR_TABLE = "default.listing_upcoming_events"
OUT_TABLE = "default.listing_upcoming_events_scored_v4"

# Join + modeling parameters
MAX_RADIUS_KM = 10.0
LOOKAHEAD_DAYS = 150
SEED = 22

# Model CV grid 
CV_FOLDS = 3
RF_MAX_DEPTH_GRID = [5, 10]
RF_NUM_TREES_GRID = [50, 100]

# US-only geographic bounds (training restriction)
US_LAT_MIN = 19.50139
US_LAT_MAX = 64.85694
US_LON_MIN = -161.75583
US_LON_MAX = -68.01197

# Artifact output base (DBFS). Each run writes to a unique folder.
# /Workspace/Users/odelia.dov@campus.technion.ac.il/train_models_once_us
from datetime import datetime
RUN_ID = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
ARTIFACT_BASE = f"/Workspace/Users/odelia.dov@campus.technion.ac.il/{RUN_ID}"

KMEANS_MODEL_PATH = f"{ARTIFACT_BASE}/kmeans_region_model"
PCA_MODEL_PATH    = f"{ARTIFACT_BASE}/pca_quality_model"
PRICE_MODEL_PATH  = f"{ARTIFACT_BASE}/price_model"
REGION_STATS_PATH = f"{ARTIFACT_BASE}/region_price_stats"   # saved as Delta
CONFIG_JSON_PATH  = f"{ARTIFACT_BASE}/config.json"


In [0]:

# Storage + utility helpers


def configure_azure_abfss_with_sas(
    spark,
    storage_account: str,
    sas_token: Optional[str] = None,
    sas_env_var: str = "sp=rle&st=2025-12-24T17:37:04Z&se=2026-02-28T01:52:04Z&spr=https&sv=2024-11-04&sr=c&sig=a0lx%2BS6PuS%2FvJ9Tbt4NKdCJHLE9d1Y1D6vpE1WKFQtk%3D",
) -> None:

    if sas_token is None:
        sas_token = os.getenv(sas_env_var)

    if not sas_token:
        raise ValueError(
            f"Missing SAS token. Set env var {sas_env_var} or pass sas_token explicitly."
        )

    sas_token = sas_token.lstrip("?")
    spark.conf.set(
        f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS"
    )
    spark.conf.set(
        f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
    )
    spark.conf.set(
        f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", sas_token
    )


def haversine_km(lat1, lon1, lat2, lon2):
    """get circle distance between (lat, lon) points in km."""
    r = F.lit(6371.0)
    phi1 = F.radians(lat1)
    phi2 = F.radians(lat2)
    dphi = F.radians(lat2 - lat1)
    dlambda = F.radians(lon2 - lon1)
    a = (F.sin(dphi / 2) ** 2 +
         F.cos(phi1) * F.cos(phi2) * (F.sin(dlambda / 2) ** 2))
    return r * 2 * F.atan2(F.sqrt(a), F.sqrt(1 - a))


def safe_overwrite_table(df: DataFrame, table_name: str) -> None:
    (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )


In [0]:

# Load + clean: venues, events, listings

def load_events_raw(spark, csv_path: str) -> DataFrame:
    events_pd = pd.read_csv(csv_path)
    return spark.createDataFrame(events_pd)


def build_venues_clean(events_raw: DataFrame) -> DataFrame:
    return (
        events_raw
        .withColumn("venue_name", F.coalesce(F.col("tm_venue"), F.col("anchor_venue_name")))
        .withColumn("lat", F.col("tm_latitude").cast("double"))
        .withColumn("lon", F.col("tm_longitude").cast("double"))
        .withColumn("capacity", F.col("anchor_capacity").cast("double"))
        .filter("lat IS NOT NULL AND lon IS NOT NULL")
        .select("venue_name", "lat", "lon", "capacity")
        .dropDuplicates()
        .withColumn("venue_id", F.sha2(F.concat_ws("||", "venue_name", "lat", "lon"), 256))
    )


def build_events_clean(events_raw: DataFrame, venues_clean: DataFrame) -> DataFrame:
    return (
        events_raw
        .withColumn("venue_name", F.coalesce(F.col("tm_venue"), F.col("anchor_venue_name")))
        .withColumn("event_date", F.to_date(F.col("local_date")))
        .withColumn("event_type", F.col("genre"))
        .filter(F.col("event_date").isNotNull())
        .select("event_id", "event_name", "event_date", "event_type", "venue_name")
        .join(venues_clean, on="venue_name", how="inner")
        .filter(F.col("event_date") >= F.current_date())
    )


def build_listings_clean(airbnb_df: DataFrame) -> DataFrame:
    return (
        airbnb_df
        .withColumn("listing_id", F.col("property_id"))
        .withColumn("lat", F.col("lat").cast("double"))
        .withColumn("lon", F.col("long").cast("double"))
        .withColumn("total_price", F.regexp_extract(F.col("price").cast("string"), r"([0-9]+)", 1).cast("int"))
        .select(
            "listing_id",
            "lat",
            "lon",
            "total_price",
            "listing_title",
            "description",
            "amenities",
            "ratings",
            "category_rating",
            "reviews",
            "is_supperhost",
            "guests",
        )
        .filter("listing_id IS NOT NULL AND lat IS NOT NULL AND lon IS NOT NULL")
        .filter((F.col("lat") >= F.lit(US_LAT_MIN)) & (F.col("lat") <= F.lit(US_LAT_MAX)) &
                (F.col("lon") >= F.lit(US_LON_MIN)) & (F.col("lon") <= F.lit(US_LON_MAX)))
    )


In [0]:
# Feature engineering 

from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.ml.feature import VectorAssembler, PCA as SparkPCA
from pyspark.ml.clustering import KMeans

# JSON Schemas
AMENITIES_SCHEMA = ArrayType(
    StructType([
        StructField("group_name", StringType(), True),
        StructField("items", ArrayType(
            StructType([
                StructField("name", StringType(), True),
                StructField("value", StringType(), True),
            ])
        ), True),
    ])
)

CATEGORY_RATING_SCHEMA = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("value", StringType(), True),
    ])
)

CATEGORY_COLS = ["cleanliness", "accuracy", "check_in", "communication", "location", "value"]


def _extract_category_rating_cols(df: DataFrame) -> DataFrame:
    """
    Expects column: category_rating_json (array<struct<name,value>>)
    Produces numeric columns:
      cleanliness, accuracy, check_in, communication, location, value
    """
    df2 = df.withColumn(
        "cat_map",
        F.map_from_entries(
            F.transform(
                "category_rating_json",
                lambda x: F.struct(F.lower(x["name"]).alias("k"), x["value"].cast("double").alias("v"))
            )
        )
    )
    for c in CATEGORY_COLS:
        df2 = df2.withColumn(c, F.coalesce(F.col("cat_map").getItem(c), F.lit(0.0)))
    return df2.drop("cat_map")


# KMeans:

def fit_kmeans_region_model(
    listings_feats_base: DataFrame,
    lat_col: str = "l_lat",
    lon_col: str = "l_lon",
    k: int = 150,
    sample_fraction: float = 0.2,
    seed: int = 22,
    max_iter: int = 25,
) -> Any:
    df_geo = listings_feats_base.select("listing_id", lat_col, lon_col).dropna(subset=[lat_col, lon_col]).dropDuplicates(["listing_id"])
    assembler = VectorAssembler(inputCols=[lat_col, lon_col], outputCol="latlon_vec")
    df_vec = assembler.transform(df_geo)
    df_sample = df_vec.sample(withReplacement=False, fraction=sample_fraction, seed=seed)

    kmeans = KMeans(k=k, seed=seed, featuresCol="latlon_vec", predictionCol="region_id", maxIter=max_iter)
    return kmeans.fit(df_sample)


def apply_kmeans_region_model(
    df: DataFrame,
    kmeans_model: Any,
    lat_col: str = "l_lat",
    lon_col: str = "l_lon",
) -> DataFrame:
    assembler = VectorAssembler(inputCols=[lat_col, lon_col], outputCol="latlon_vec")
    df_geo = assembler.transform(df.select("listing_id", lat_col, lon_col).dropna(subset=[lat_col, lon_col]))
    regions = kmeans_model.transform(df_geo).select("listing_id", "region_id")
    return df.join(regions, on="listing_id", how="left")


def compute_region_price_stats(
    df_with_region: DataFrame,
    price_col: str = "total_price",
) -> DataFrame:
    return (
        df_with_region.dropna(subset=["region_id", price_col])
          .groupBy("region_id")
          .agg(F.expr(f"percentile_approx({price_col}, 0.5)").alias("median_price_region"))
    )


def add_region_relative_price_from_stats(
    df_with_region: DataFrame,
    region_price_stats: DataFrame,
    price_col: str = "total_price",
) -> DataFrame:
    return (
        df_with_region.join(region_price_stats, on="region_id", how="left")
        .withColumn(
            "rel_price_region",
            F.when(F.col("median_price_region") > 0, F.col(price_col) / F.col("median_price_region"))
             .otherwise(F.lit(0.0))
        )
    )


# PCA:

def fit_pca_quality_model(
    listings_with_cat_cols: DataFrame,
    sample_fraction: float = 0.2,
    seed: int = 22,
) -> Any:
    df_feat = listings_with_cat_cols.select("listing_id", *CATEGORY_COLS)
    assembler = VectorAssembler(inputCols=CATEGORY_COLS, outputCol="features_vec", handleInvalid="skip")
    df_vec = assembler.transform(df_feat).select("listing_id", "features_vec")
    df_sample = df_vec.sample(withReplacement=False, fraction=sample_fraction, seed=seed)

    pca = SparkPCA(k=1, inputCol="features_vec", outputCol="pca_features")
    return pca.fit(df_sample)


def pca_weights_from_model(pca_model: Any) -> List[float]:
    # first principal component loadings; flip sign like your earlier assignment
    w = pca_model.pc.toArray()[:, 0]
    w = (-w).tolist()
    s = float(sum(abs(x) for x in w)) or 1.0
    return [float(x / s) for x in w]


def add_weighted_quality_from_weights(df: DataFrame, weights: List[float]) -> DataFrame:
    expr = None
    for name, wi in zip(CATEGORY_COLS, weights):
        term = F.col(name) * F.lit(float(wi))
        expr = term if expr is None else (expr + term)
    return df.withColumn("WeightedQuality", F.coalesce(expr.cast("double"), F.lit(0.0)))


# Base features from data

def build_listings_feature_base(listings_clean: DataFrame) -> DataFrame:
    """
    Produces per-listing features that don't require learned artifacts.
    """
    # robust numeric parsing
    ratings_num = F.regexp_extract(F.col("ratings").cast("string"), r"([0-9]+(\.[0-9]+)?)", 1).cast("double")
    price_num   = F.regexp_replace(F.col("total_price").cast("string"), r"[^0-9.]", "").cast("double")
    reviews_num = F.regexp_extract(F.col("reviews").cast("string"), r"([0-9]+)", 1).cast("double")
    guests_num  = F.regexp_extract(F.col("guests").cast("string"), r"([0-9]+)", 1).cast("double")
    is_superhost_num = F.lower(F.col("is_supperhost").cast("string")).isin("true", "t", "1", "yes").cast("double")

    df = (
        listings_clean
        .withColumn("l_lat", F.col("lat").cast("double"))
        .withColumn("l_lon", F.col("lon").cast("double"))
        .withColumn("total_price", price_num)     # ensure numeric for modeling
        .withColumn("rating_overall", ratings_num)
        .withColumn("reviews_num", reviews_num)
        .withColumn("guests_num", guests_num)
        .withColumn("is_superhost_num", is_superhost_num)

        # Amenities
        .withColumn("amenities_json", F.from_json(F.col("amenities").cast("string"), AMENITIES_SCHEMA))
        .withColumn("amenities_groups", F.size(F.col("amenities_json")).cast("double"))
        .withColumn("amenities_items", F.flatten(F.transform("amenities_json", lambda g: g["items"])))
        .withColumn("amenities_total_items", F.size(F.col("amenities_items")).cast("double"))
        .withColumn("amenities_values", F.transform("amenities_items", lambda x: x["value"]))
        .withColumn("amenities_unique_values", F.size(F.array_distinct("amenities_values")).cast("double"))
        .withColumn("has_wifi", F.array_contains("amenities_values", F.lit("SYSTEM_WI_FI")).cast("double"))
        .withColumn("has_pool", F.array_contains("amenities_values", F.lit("SYSTEM_POOL")).cast("double"))
        .withColumn("has_gym",  F.array_contains("amenities_values", F.lit("SYSTEM_GYM")).cast("double"))

        # Category ratings
        .withColumn("category_rating_json", F.from_json(F.col("category_rating").cast("string"), CATEGORY_RATING_SCHEMA))
        .withColumn("category_rating_values", F.transform("category_rating_json", lambda x: x["value"].cast("double")))
        .withColumn(
            "category_rating_avg",
            F.expr("aggregate(category_rating_values, 0D, (acc, x) -> acc + x) / greatest(size(category_rating_values), 1)").cast("double")
        )
        .withColumn(
            "category_rating_min",
            F.expr("aggregate(category_rating_values, 10D, (acc, x) -> least(acc, x))").cast("double")
        )

        # Null safety
        .withColumn("amenities_groups", F.coalesce("amenities_groups", F.lit(0.0)))
        .withColumn("amenities_total_items", F.coalesce("amenities_total_items", F.lit(0.0)))
        .withColumn("amenities_unique_values", F.coalesce("amenities_unique_values", F.lit(0.0)))
        .withColumn("rating_overall", F.coalesce("rating_overall", F.lit(0.0)))
        .withColumn("category_rating_avg", F.coalesce("category_rating_avg", F.lit(0.0)))
        .withColumn("category_rating_min", F.coalesce("category_rating_min", F.lit(0.0)))
        .withColumn("reviews_num", F.coalesce("reviews_num", F.lit(0.0)))
        .withColumn("guests_num", F.coalesce("guests_num", F.lit(0.0)))
        .withColumn("is_superhost_num", F.coalesce("is_superhost_num", F.lit(0.0)))
        .withColumn("total_price", F.coalesce("total_price", F.lit(0.0)))
    )

    # Needed for PCA weights computation
    df = _extract_category_rating_cols(df)

    return df


def add_vfm_metric(df: DataFrame) -> DataFrame:
    """
    Your VFM:
      VFM = (WeightedQuality * log(1+reviews) * log(1+guests) * log(1+amenity_count)) / rel_price_region
    """
    return (
        df
        .withColumn("amenity_count", F.coalesce(F.col("amenities_unique_values"), F.lit(0.0)))
        .withColumn("review_log", F.log1p(F.col("reviews_num")))
        .withColumn("guest_log", F.log1p(F.col("guests_num")))
        .withColumn("amenity_log", F.log1p(F.col("amenity_count")))
        .withColumn(
            "vfm_score",
            F.when(
                F.col("rel_price_region") > 0,
                (F.col("WeightedQuality") * F.col("review_log") * F.col("guest_log") * F.col("amenity_log")) / F.col("rel_price_region")
            ).otherwise(F.lit(0.0))
        )
    )


def build_listings_features_with_artifacts(
    listings_clean: DataFrame,
    kmeans_model: Any,
    pca_weights: List[float],
    region_price_stats: DataFrame,
) -> DataFrame:
    base = build_listings_feature_base(listings_clean)

    with_region = apply_kmeans_region_model(base, kmeans_model)
    with_rel_price = add_region_relative_price_from_stats(with_region, region_price_stats, price_col="total_price")

    with_quality = add_weighted_quality_from_weights(with_rel_price, pca_weights)

    final = add_vfm_metric(with_quality)

    # Final feature set used
    return (
        final.select(
            "listing_id",
            "l_lat", "l_lon",
            "total_price",
            "listing_title",
            "description",
            # listing quality
            "rating_overall",
            "category_rating_avg",
            "category_rating_min",
            "reviews_num",
            "is_superhost_num",
            "guests_num",
            # amenities
            "amenities_groups",
            "amenities_total_items",
            "amenities_unique_values",
            "has_wifi",
            "has_pool",
            "has_gym",
            # learned / derived
            "region_id",
            "median_price_region",
            "rel_price_region",
            "WeightedQuality",
            "vfm_score",
        )
        .filter("listing_id IS NOT NULL AND l_lat IS NOT NULL AND l_lon IS NOT NULL")
    )


In [0]:
# create listing-event pair table


def build_listing_event_pairs(
    listings_feats: DataFrame,
    venues_clean: DataFrame,
    events_clean: DataFrame,
    max_radius_km: float,
    lookahead_days: int,
) -> DataFrame:
    venues_feats = (
        venues_clean
        .select(
            "venue_id",
            F.col("venue_name"),
            F.col("lat").cast("double").alias("v_lat"),
            F.col("lon").cast("double").alias("v_lon"),
            F.col("capacity").cast("double").alias("venue_capacity"),
        )
        .filter("venue_id IS NOT NULL AND v_lat IS NOT NULL AND v_lon IS NOT NULL")
    )

    lv = (
        listings_feats.alias("l")
        .crossJoin(venues_feats.alias("v"))
        .withColumn(
            "distance_km",
            haversine_km(F.col("l.l_lat"), F.col("l.l_lon"), F.col("v.v_lat"), F.col("v.v_lon"))
        )
        .filter(F.col("distance_km") <= F.lit(max_radius_km))
    )

    today = F.current_date()

    lv_events = (
        lv.join(events_clean.drop("venue_name"), on="venue_id", how="inner")
          .withColumn("days_until_event", F.datediff(F.col("event_date"), today).cast("double"))
          .filter((F.col("days_until_event") >= 0) & (F.col("days_until_event") <= F.lit(lookahead_days)))
          .withColumn("is_weekend", F.dayofweek(F.col("event_date")).isin([6, 7]).cast("double"))
    )

    return lv_events

In [0]:
# Model training (RandomForest + CV)

def train_price_model(
    ml_df: DataFrame,
    seed: int,
    cv_folds: int,
    max_depth_grid: List[int],
    num_trees_grid: List[int],
) -> Tuple[PipelineModel, Dict[str, float], List[Tuple[str, float]]]:
    """Train a RF regressor with CV. Returns (model, metrics, feature_importances)."""

    indexer = StringIndexer(inputCol="event_type", outputCol="event_type_idx", handleInvalid="keep")

    feature_cols = [
        # event/venue context
        "distance_km",
        "venue_capacity",
        "days_until_event",
        "is_weekend",
        # listing quality
        "rating_overall",
        "category_rating_avg",
        "category_rating_min",
        "reviews_num",
        "is_superhost_num",
        "guests_num",
        # amenities
        "amenities_groups",
        "amenities_total_items",
        "amenities_unique_values",
        "has_wifi",
        "has_pool",
        "has_gym",
        # score metrics
        "vfm_score",
        # event type
        "event_type_idx",
    ]

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol="label",
        predictionCol="prediction",
        seed=seed,
    )

    pipeline = Pipeline(stages=[indexer, assembler, rf])

    paramGrid = (
        ParamGridBuilder()
        .addGrid(rf.maxDepth, max_depth_grid)
        .addGrid(rf.numTrees, num_trees_grid)
        .build()
    )

    evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
    evaluator_mae  = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")

    train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=seed)

    cv = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator_rmse,
        numFolds=cv_folds,
        seed=seed,
    )

    cv_model = cv.fit(train_df)
    preds = cv_model.transform(test_df)

    rmse = evaluator_rmse.evaluate(preds)
    mae  = evaluator_mae.evaluate(preds)

    metrics = {"rmse": float(rmse), "mae": float(mae)}

    # Feature importance (best model's RF stage)
    best_pipeline_model = cv_model.bestModel
    rf_model = best_pipeline_model.stages[-1]
    importances = rf_model.featureImportances.toArray().tolist()
    fi = sorted(zip(feature_cols, importances), key=lambda x: x[1], reverse=True)

    return best_pipeline_model, metrics, fi


In [0]:
# Artifact saving utilities

import json

def save_json_to_dbfs(path: str, obj: dict) -> None:
    dbutils.fs.mkdirs(path.rsplit("/", 1)[0])
    dbutils.fs.put(path, json.dumps(obj, indent=2), overwrite=True)


def save_region_stats_delta(region_stats: DataFrame, path: str) -> None:
    (region_stats
     .write
     .mode("overwrite")
     .format("delta")
     .save(path))


In [0]:

# Train & save once

# configure
configure_azure_abfss_with_sas(spark, STORAGE_ACCOUNT, sas_token="sp=rle&st=2025-12-24T17:37:04Z&se=2026-02-28T01:52:04Z&spr=https&sv=2024-11-04&sr=c&sig=a0lx%2BS6PuS%2FvJ9Tbt4NKdCJHLE9d1Y1D6vpE1WKFQtk%3D")
print("finished step 1")

# load datasets
airbnb_df = spark.read.parquet(AIRBNB_PARQUET_PATH)
events_raw = load_events_raw(spark, EVENTS_CSV_PATH)
print("finished step 2")

# Clean datasets 
venues_clean = build_venues_clean(events_raw)
events_clean = build_events_clean(events_raw, venues_clean)
listings_clean = build_listings_clean(airbnb_df)


safe_overwrite_table(venues_clean, VENUES_TABLE)
safe_overwrite_table(events_clean, EVENTS_TABLE)
safe_overwrite_table(listings_clean, LISTINGS_TABLE)
print("Saved cleaned tables:", VENUES_TABLE, EVENTS_TABLE, LISTINGS_TABLE)
print("US-only listings count:", listings_clean.count())
print("finished step 3")

# Fit learned artifacts (KMeans + PCA)
base_for_fitting = build_listings_feature_base(listings_clean).cache()

kmeans_model = fit_kmeans_region_model(
    listings_feats_base=base_for_fitting,
    k=150,
    sample_fraction=0.2,
    seed=SEED,
    max_iter=25,
)

pca_model = fit_pca_quality_model(
    listings_with_cat_cols=base_for_fitting,
    sample_fraction=0.2,
    seed=SEED,
)
pca_weights = pca_weights_from_model(pca_model)
print("finished step 4")

# Region price (median per region)
with_region_tmp = apply_kmeans_region_model(base_for_fitting, kmeans_model)
region_price_stats = compute_region_price_stats(with_region_tmp, price_col="total_price")
print("finished step 5")

# Build final features 
listings_feats = build_listings_features_with_artifacts(
    listings_clean=listings_clean,
    kmeans_model=kmeans_model,
    pca_weights=pca_weights,
    region_price_stats=region_price_stats,
)
print("finished step 6")

# Pair listings with upcoming events
lv_events = build_listing_event_pairs(
    listings_feats=listings_feats,
    venues_clean=venues_clean,
    events_clean=events_clean,
    max_radius_km=MAX_RADIUS_KM,
    lookahead_days=LOOKAHEAD_DAYS,
)


safe_overwrite_table(lv_events, PAIR_TABLE)
print("Saved pair table:", PAIR_TABLE)
print("finished step 7")

# 8) Train price model (RandomForest + CV)
ml_df = (
    lv_events
    .withColumn("label", F.col("total_price").cast("double"))  # label: current listing price
    .withColumn("venue_capacity", F.coalesce(F.col("venue_capacity"), F.lit(0.0)))
    .withColumn("days_until_event", F.coalesce(F.col("days_until_event"), F.lit(LOOKAHEAD_DAYS)).cast("double"))
    .withColumn("distance_km", F.col("distance_km").cast("double"))
    .dropna(subset=["label", "event_type"])
)

price_model, metrics, feature_importance = train_price_model(
    ml_df=ml_df,
    seed=SEED,
    cv_folds=CV_FOLDS,
    max_depth_grid=RF_MAX_DEPTH_GRID,
    num_trees_grid=RF_NUM_TREES_GRID,
)

print("Model metrics:", metrics)
print("\nTop feature importances:")
for name, val in feature_importance[:15]:
    print(f"{name:24s} {val:.6f}")
print("finished step 8")

# 9) Save artifacts
dbutils.fs.mkdirs(ARTIFACT_BASE)

kmeans_model.write().overwrite().save(KMEANS_MODEL_PATH)
pca_model.write().overwrite().save(PCA_MODEL_PATH)
price_model.write().overwrite().save(PRICE_MODEL_PATH)

save_region_stats_delta(region_price_stats, REGION_STATS_PATH)

config_obj = {
    "run_id": RUN_ID,
    "created_utc": datetime.utcnow().isoformat() + "Z",
    "us_bounds": {
        "lat_min": US_LAT_MIN, "lat_max": US_LAT_MAX,
        "lon_min": US_LON_MIN, "lon_max": US_LON_MAX,
    },
    "data_sources": {
        "events_csv_path": EVENTS_CSV_PATH,
        "airbnb_parquet_path": AIRBNB_PARQUET_PATH,
    },
    "tables": {
        "venues_table": VENUES_TABLE,
        "events_table": EVENTS_TABLE,
        "listings_table": LISTINGS_TABLE,
        "pair_table": PAIR_TABLE,
    },
    "params": {
        "max_radius_km": MAX_RADIUS_KM,
        "lookahead_days": LOOKAHEAD_DAYS,
        "seed": SEED,
        "cv_folds": CV_FOLDS,
        "rf_max_depth_grid": RF_MAX_DEPTH_GRID,
        "rf_num_trees_grid": RF_NUM_TREES_GRID,
        "kmeans_k": 150,
        "kmeans_sample_fraction": 0.2,
        "kmeans_max_iter": 25,
        "pca_sample_fraction": 0.2,
        "pca_category_cols": CATEGORY_COLS,
    },
    "artifacts": {
        "artifact_base": ARTIFACT_BASE,
        "kmeans_model_path": KMEANS_MODEL_PATH,
        "pca_model_path": PCA_MODEL_PATH,
        "price_model_path": PRICE_MODEL_PATH,
        "region_stats_delta_path": REGION_STATS_PATH,
    },
    "pca_weights": pca_weights,
    "metrics": metrics,
}
print("finished step 9")
save_json_to_dbfs(CONFIG_JSON_PATH, config_obj)

print("\n Saved artifacts to:", ARTIFACT_BASE)
print("  - KMeans:", KMEANS_MODEL_PATH)
print("  - PCA:", PCA_MODEL_PATH)
print("  - Price model:", PRICE_MODEL_PATH)
print("  - Region stats (Delta):", REGION_STATS_PATH)
print("  - Config JSON:", CONFIG_JSON_PATH)


finished step 1
finished step 2
Saved cleaned tables: default.venues_clean default.events_clean default.listings_clean
US-only listings count: 747185
finished step 3


Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

🏃 View run exultant-dog-838 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416/runs/40cbbfb3b89b42a3976961bcd3a8637b
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416
🏃 View run handsome-mule-649 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416/runs/86afec6fd7874268944b1789e2c0f3b0
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416
finished step 4
finished step 5
finished step 6
Saved pair table: default.listing_upcoming_events
finished step 7
🏃 View run thoughtful-frog-639 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416/runs/17f5e72b1e5046f39fc05183f83b2714
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416
🏃 View run colorful-crab-421 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416

Downloading artifacts:   0%|          | 0/70 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/35 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

🏃 View run inquisitive-ape-108 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416/runs/d035552d7371401dbe86066f4785adc2
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/200762682747416
Model metrics: {'rmse': 3439.11090792496, 'mae': 1011.4834767695479}

Top feature importances:
vfm_score                0.325584
guests_num               0.102390
amenities_unique_values  0.072303
rating_overall           0.070877
amenities_total_items    0.070841
venue_capacity           0.058418
distance_km              0.055408
reviews_num              0.054188
category_rating_avg      0.040573
amenities_groups         0.039899
category_rating_min      0.033917
has_gym                  0.020611
event_type_idx           0.020579
is_superhost_num         0.015334
has_pool                 0.013203
finished step 8
finished step 9
Wrote 1901 bytes.

 Saved artifacts to: /Workspace/Users/odelia.dov@campus.technion.ac.il/20260125_17045