## Loading The Data

In [0]:
import utils
import importlib
importlib.reload(utils) # forces the system to load the updated version
from utils import *

In [0]:
ret_data = load_data_and_check_leak(spark, base_path="/FileStore/tables/paris_project/gold/",  test_needed=True,
                                    local_path="local_train_pool_v5_with_paris_features.parquet",
                                    test_path="test_set_v5_with_paris_features.parquet",
                                    global_path="global_train_features_v4.parquet")

global_train_df, paris_test_df, local_train_df = ret_data['global_train'], ret_data['test'], ret_data['local_train']

Loading data from: /FileStore/tables/paris_project/gold/...

✅ Global Train Count: 1,146,517
✅ Local Train Count:  40,333
✅ Test Set Count:      10,935
✅ No Data Leakage Found (Verified via Broadcast Join)


In [0]:
# 1. Analyze YOUR saved data
my_data_path = "/FileStore/tables/paris_project/gold/global_train_features_v4.parquet"
analyze_storage_layout(dbutils, my_data_path)

--- Analysis for: /FileStore/tables/paris_project/gold/global_train_features_v4.parquet ---
Total Parquet Files: 76
Total Size: 0.06 GB
Average File Size: 0.74 MB
Max File Size: 1.46 MB


## Train

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Imputer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, when, log1p, expm1
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import split, col, element_at
from pyspark.sql.functions import lower
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType

### Required Preprocecing

In [0]:
# --- Configuration ---
MODEL_NAME = "ֹglobal_pricing_spark_v5" # CHANGE TO READY MODEL NAME
TRAIN_SET = global_train_df #CHANGE TO READY DATASET
TARGET_COL = "log_price"
NUMERIC_COLS =  ['amenities_count_raw','is_supperhost', 'ratings', 'lat','long', 'guests', 'availability_365', 'hosts_year', 'property_number_of_reviews', 'rating_Accuracy', 'rating_Cleanliness', 'rating_Location', 'rating_Value', 'rating_Check-in', 'rating_Communication', 'num_beds', 'num_baths', 'has_dishwasher', 'has_washer']
CATEGORICAL_COLS = ['country',  'city', 'region', 'listing_type'] 
#VECTOR_COLS = ['shared_amenities_vector']
#SHARED_AMENITIES_COL = ['Baking sheet', 'Bathtub', 'Coffee maker', 'Dining table', 'Dishwasher', 'First aid kit', 'Hair dryer', 'Hangers', 'Hot water', 'Iron', 'Microwave', 'Oven', 'Refrigerator', 'Shampoo', 'Stove', 'TV', 'Toaster', 'Washer', 'Wifi']

#Shared: ['has_bakingsheet', 'has_bathtub', 'has_coffeemaker', 'has_diningtable', 'has_dishwasher', 'has_firstaidkit', 'has_hairdryer', 'has_hangers', 'has_hotwater', 'has_iron', 'has_microwave', 'has_oven', 'has_refrigerator', 'has_shampoo', 'has_stove', 'has_tv', 'has_toaster', 'has_washer', 'has_wifi']

In [0]:
# NOTE: This has placeholders needs to be filled in: [host_number_of_reviews, pos_reviews, is_guest_favorite]

def adapt_paris_to_global(local_df):
    print("🔧 Adapting Paris Local Set to Global Model Schema...")

    # 2. Complex Mapping & Transformation Pipeline
    adapted_df = (local_df
        # --- A. Simple Copying (Local -> Global, keep originals) ---
        .withColumn("lat", F.col("latitude"))
        .withColumn("long", F.col("longitude"))
        .withColumn("guests", F.col("accommodates"))
        .withColumn("num_beds", F.col("beds"))
        .withColumn("neighbourhood", F.col("neighbourhood_cleansed"))
        .withColumn("property_number_of_reviews", F.col("number_of_reviews"))
        .withColumn("num_baths", F.col("bathrooms"))
        # --- B. Ratings Copying (keep originals) ---
        .withColumn("ratings", F.col("review_scores_rating"))
        .withColumn("rating_Accuracy", F.col("review_scores_accuracy"))
        .withColumn("rating_Cleanliness", F.col("review_scores_cleanliness"))
        .withColumn("rating_Check-in", F.col("review_scores_checkin"))
        .withColumn("rating_Communication", F.col("review_scores_communication"))
        .withColumn("rating_Location", F.col("review_scores_location"))
        .withColumn("rating_Value", F.col("review_scores_value"))
        .withColumn("hosts_year", (F.col("host_since")))
        .withColumn("is_supperhost", F.col("host_is_superhost"))
        .withColumn("amenities_count_raw", F.col("amenities_count"))        
        
        # --- E. Location Hardcoding (Critical for Categorical Features) ---
        # Since this is the Paris dataset, we hardcode the location info
        .withColumn("country", F.lit("france"))
        .withColumn("city", F.lit("paris"))
        .withColumn("region", F.lit("ile-de-france"))
    )
        
    print("✅ Adaptation Mapping Complete.")
    print(f"Original Local Columns: {len(local_df.columns)}")
    print(f"Adapted Local Columns:  {len(adapted_df.columns)}")

    # Validation: Check for missing features required by the Global Model
    required_global = set(NUMERIC_COLS + CATEGORICAL_COLS)
    existing = set(adapted_df.columns)
    missing = required_global - existing

    if missing:
        print(f"🚨 WARNING: Still missing columns for Global Model: {missing}")
    else:
        print("✨ SUCCESS: All Global Features are present in the Adapted Local Set!")

    return adapted_df

In [0]:
def clean_cast_select(df, select=True):
    # Casting & Cleaning (Distributed)
    for c in NUMERIC_COLS:
        if c in df.columns:
            df_clean = df.withColumn(c, F.col(c).cast("double"))
        else:
            print(f"🚨 WARNING: Column {c} not found in the dataset.")

    # Lowercase 
    for c in CATEGORICAL_COLS:
        if c in df.columns:
            df_clean = df_clean.withColumn(c, lower(col(c).cast("string")))
        else:
            print(f"🚨 WARNING: Column {c} not found in the dataset.")

    # Fill nulls in categorical columns
    df_clean = df_clean.fillna('other', subset=CATEGORICAL_COLS)


    cols_to_use = NUMERIC_COLS + CATEGORICAL_COLS  + [TARGET_COL, 'id']
    return df_clean.select(cols_to_use) if select else df_clean

In [0]:
# --- 1. Clean Data (Distributed)---
df_spark = clean_cast_select(TRAIN_SET)
print("Train set sample:")
display(df_spark.limit(5))

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Imputer
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, rand
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# --- 2. Build Preprocessing Stages (Common for all models) ---
numeric_cols = [f.name for f in df_spark.schema.fields if isinstance(f.dataType, (DoubleType, IntegerType)) and f.name not in ['price', 'log_price', 'id']]
categorical_cols = [f.name for f in df_spark.schema.fields if isinstance(f.dataType, StringType) and f.name not in ['price', 'log_price', 'id']]
vector_cols = [f.name for f in df_spark.schema.fields if isinstance(f.dataType, VectorUDT) and f.name not in ['price', 'log_price', 'id']]

stages = []

# Imputer
imputed_numeric_cols = [f"imputed_{c}" for c in numeric_cols]
imputer = Imputer(inputCols=numeric_cols, outputCols=imputed_numeric_cols).setStrategy("median")
stages.append(imputer)

# Categorical
encoded_categorical_cols = []
for cat_col in categorical_cols:
    indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_idx", handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[f"{cat_col}_idx"], outputCols=[f"{cat_col}_vec"])
    stages.append(indexer)
    stages.append(encoder)
    encoded_categorical_cols.append(f"{cat_col}_vec")

# Assembler
assembler = VectorAssembler(
    inputCols=imputed_numeric_cols + encoded_categorical_cols + vector_cols,
    outputCol="features",
    handleInvalid="skip"
)
stages.append(assembler)

In [0]:
test_df_raw = adapt_paris_to_global(paris_test_df)
test_df_spark = clean_cast_select(test_df_raw)

🔧 Adapting Paris Local Set to Global Model Schema...
✅ Adaptation Mapping Complete.
Original Local Columns: 89
Adapted Local Columns:  109
✨ SUCCESS: All Global Features are present in the Adapted Local Set!


### Model Selection & Hyper Parameter Tuning

In [0]:
# ==============================================================================
# DEFINE MODEL CANDIDATES
# ==============================================================================
# Note: Ensure 'features' column exists in your data before this!
model_candidates = [
    {
        "name": "GBT_Deep",
        "estimator": GBTRegressor(featuresCol="features", labelCol="log_price", maxIter=50, maxDepth=7, seed=42)
    },
    {
        "name": "GBT_Shallow",
        "estimator": GBTRegressor(featuresCol="features", labelCol="log_price", maxIter=20, maxDepth=3, seed=42)
    },
    {
        "name": "GBT_Robust_MAE", 
        "estimator": GBTRegressor(featuresCol="features", labelCol="log_price", lossType="absolute", maxIter=50, maxDepth=7, seed=42)
    },
    {
        "name": "RandomForest_Baseline",
        "estimator": RandomForestRegressor(featuresCol="features", labelCol="log_price", numTrees=50, maxDepth=10, seed=42)
    }
]

In [0]:
import mlflow
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F

# ==============================================================================
# 1. PERFORMANCE OPTIMIZATION (CRITICAL FOR SMALL DATA)
# ==============================================================================
# Since your data is ~50MB, we force it into 4 partitions so all cores work.
# If we don't do this, 1 core does everything and the others sleep.
train_df, val_df = df_spark.randomSplit([0.8, 0.2], seed=42)
print("⚡ Optimizing Data Layout...")
train_df = train_df.repartition(4).cache()
val_df = val_df.repartition(4).cache()

# Trigger an action to materialize the cache immediately
print(f"   -> Training Rows: {train_df.count()}")
print(f"   -> Validation Rows: {val_df.count()}")

# ==============================================================================
# 1. THE TRAINING LOOP
# ==============================================================================
summary_results = []
EXPERIMENT_PATH = "/Users/ron.bartal@campus.technion.ac.il/Paris_Global_Spark"

mlflow.set_experiment(EXPERIMENT_PATH)
print(f"\n⚔️ Starting Battle Royale in: {EXPERIMENT_PATH}")

with mlflow.start_run(run_name="Global_Model_Comparison") as parent_run:
    
    for candidate in model_candidates:
        model_name = candidate['name']
        print(f"   🏃 Training: {model_name}...")
        
        with mlflow.start_run(run_name=f"Trial_{model_name}", nested=True):
            
            # A. Build Pipeline
            # We take the pre-defined prep stages and add the specific model at the end 
            # Ensure 'stages' includes your VectorAssembler!
            pipeline = Pipeline(stages=stages + [candidate['estimator']])
            
            # B. Train (Fit)
            model = pipeline.fit(train_df)
            
            # C. Predict (Transform)
            preds = model.transform(val_df)
            
            # D. Evaluate (Distributed Calculation - Fast!)
            # 1. R2 Score
            evaluator_r2 = RegressionEvaluator(labelCol="log_price", predictionCol="prediction", metricName="r2")
            r2_score = evaluator_r2.evaluate(preds)
            
            # 2. Business Metrics (MAE, MAPE, RMSE) calculated via Spark SQL
            metrics_df = preds.selectExpr(
                "abs(expm1(log_price) - expm1(prediction)) as error_dollar",
                "abs((expm1(log_price) - expm1(prediction)) / expm1(log_price)) * 100 as mape_pct"
            )
            
            stats = metrics_df.selectExpr(
                "mean(error_dollar) as mae",
                "percentile_approx(error_dollar, 0.5) as med_ae",
                "mean(mape_pct) as mape",
                "sqrt(mean(pow(error_dollar, 2))) as rmse"
            ).first()
            
            # E. Log to MLflow
            # Log Parameters
            mlflow.log_params(candidate['estimator'].extractParamMap())
            
            # Log Metrics
            mlflow.log_metrics({
                "MAE": stats['mae'],
                "MedAE": stats['med_ae'],
                "MAPE": stats['mape'],
                "RMSE": stats['rmse'],
                "R2_log": r2_score
            })
            
            # Save to local summary list
            summary_results.append({
                "Model": model_name,
                "MAE ($)": stats['mae'],
                "MedAE ($)": stats['med_ae'],
                "MAPE (%)": stats['mape'],
                "RMSE ($)": stats['rmse'],
                "R2 (log)": r2_score
            })
            
            print(f"      ✅ Done. MAE: ${stats['mae']:.2f} | R2: {r2_score:.3f}")

# ==============================================================================
# 4. FINAL LEADERBOARD
# ==============================================================================
leaderboard = pd.DataFrame(summary_results).sort_values(by="MAE ($)")
print("\n🏆 FINAL RESULTS LEADERBOARD:")
display(leaderboard)

### Train Full Model & valuate On Test Set

In [0]:
# --- USER INPUT REQUIRED ---
CHOSEN_ARCHITECTURE = "GBT_Deep" # <--- #TODO: Update this based on the leaderboard above!
chosen_model_name = f"{MODEL_NAME} ({CHOSEN_ARCHITECTURE})"
# ---------------------------

print(f"🚀 Training Final Production Model: {chosen_model_name}...")

# 1. Find the estimator config
selected_candidate = next(c for c in model_candidates if c['name'] == CHOSEN_ARCHITECTURE)

# 2. Retrain on FULL Data
final_pipeline = Pipeline(stages=stages + [selected_candidate['estimator']])
full_data = df_spark.repartition(4).cache()
print(f"✅ Training on {full_data.count()} rows...")
with mlflow.start_run(run_name="Final_Model_For_Eval"):
    final_model = final_pipeline.fit(full_data)
    #preds_test = final_model.transform(test_df_spark)
    
    # 3. Log Model
    mlflow.spark.log_model(final_model, "model", registered_model_name=chosen_model_name)

🔧 Adapting Paris Local Set to Global Model Schema...
✅ Adaptation Mapping Complete.
Original Local Columns: 89
Adapted Local Columns:  109
✨ SUCCESS: All Global Features are present in the Adapted Local Set!
🚀 Training Final Production Model: ֹglobal_pricing_spark_v5 (GBT_Deep)...
✅ Training on 1146517 rows...


2026/01/29 11:05:18 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/115 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Registered model 'ֹglobal_pricing_spark_v5 (GBT_Deep)' already exists. Creating a new version of this model...
2026/01/29 11:06:20 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: ֹglobal_pricing_spark_v5 (GBT_Deep), version 5
Created version '5' of model 'ֹglobal_pricing_spark_v5 (GBT_Deep)'.


🏃 View run Final_Model_For_Eval at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/3630736412863185/runs/d3485b339bda4ed28c1cce615457154b
🧪 View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/3630736412863185


In [0]:
import mlflow
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator

# ==========================================
# 1. CONFIGURATION
# ==========================================
raise Exception("TODO: Paste your Run ID below (Found in MLflow UI -> Experiment -> Run -> Artifacts) or exeption will raise")
RUN_ID = ""
MODEL_URI = f"runs:/{RUN_ID}/model"

# =========================================
# 2. Preprocess test data
# =========================================
print("🔧 Preprocessing test data...")
adapted_test_df = adapt_paris_to_global(paris_test_df)
test_clean = clean_cast_select(adapted_test_df)

# ==========================================
# 3. LOAD MODEL FROM STORAGE
# ==========================================
print(f"📥 Loading model from MLflow: {MODEL_URI}...")
final_model = mlflow.spark.load_model(MODEL_URI)
final_model_name = type(final_model.stages[-1]).__name__
print(f"✅ Model loaded successfully into memory: {final_model_name}")

# ==========================================
# 4. PREDICT & EVALUATE
# ==========================================
print("🚀 Running Inference on Test Set...")
preds_test = final_model.transform(test_clean)

# Calculate Metrics
metrics_df = preds_test.selectExpr(
    "abs(expm1(log_price) - expm1(prediction)) as error_dollar",
    "abs((expm1(log_price) - expm1(prediction)) / expm1(log_price)) * 100 as mape_pct"
)

stats = metrics_df.selectExpr(
    "mean(error_dollar) as mae",
    "percentile_approx(error_dollar, 0.5) as med_ae",
    "mean(mape_pct) as mape",
    "sqrt(mean(pow(error_dollar, 2))) as rmse"
).first()

# R2 Score
evaluator_r2 = RegressionEvaluator(labelCol="log_price", predictionCol="prediction", metricName="r2")
r2_score = evaluator_r2.evaluate(preds_test)

print("\n🏆 RESULTS FROM LOADED MODEL:")
print(f"   R2 (Log): {r2_score:.4f}")
print(f"   MAE:      ${stats['mae']:.2f}")
print(f"   MedAE:    ${stats['med_ae']:.2f} (Median Error)")
print(f"   MAPE:     {stats['mape']:.2f}%")
print(f"   RMSE:     ${stats['rmse']:.2f}")

# Optional: Generate plots again if needed
# log_visualizations(preds_test, title_prefix="Reloaded_Model_Check")
 # 4. Generate Standard Plots (Actual vs Pred)
log_visualizations(preds_test, final_model=final_model, chosen_model_name=final_model_name,title_prefix="Final_Production")

[0;31m---------------------------------------------------------------------------[0m
[0;31mException[0m                                 Traceback (most recent call last)
File [0;32m<command-5658576270839943>, line 17[0m
[1;32m     15[0m full_data [38;5;241m=[39m df_spark[38;5;241m.[39mrepartition([38;5;241m4[39m)[38;5;241m.[39mcache()
[1;32m     16[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124m✅ Training on [39m[38;5;132;01m{[39;00mfull_data[38;5;241m.[39mcount()[38;5;132;01m}[39;00m[38;5;124m rows...[39m[38;5;124m"[39m)
[0;32m---> 17[0m [38;5;28;01mwith[39;00m mlflow[38;5;241m.[39mstart_run(run_name[38;5;241m=[39m[38;5;124m"[39m[38;5;124mFinal_Model_For_Eval[39m[38;5;124m"[39m):
[1;32m     18[0m     final_model [38;5;241m=[39m final_pipeline[38;5;241m.[39mfit(full_data)
[1;32m     19[0m     [38;5;66;03m#preds_test = final_model.transform(test_df_spark)[39;00m
[1;32m     20[0m     
[1;32m     21[0m     [38

In [0]:
import mlflow
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator

# ==========================================
# 1. CONFIGURATION
# ==========================================
raise Exception("TODO: Paste your Run ID below (Found in MLflow UI -> Experiment -> Run -> Artifacts)")
RUN_ID = ""
MODEL_URI = f"runs:/{RUN_ID}/model"
OUTPUT_LOCAL_TRAIN_WITH_GLOBAL_PRED = "/FileStore/tables/paris_project/diamond/local_train_with_global_pred_v7.parquet"
OUTPUT_TEST_WITH_PRED = "/FileStore/tables/paris_project/diamond/test_set_with_global_pred_v7.parquet"
local_train_raw = local_train_df
test_raw = paris_test_df

# ==========================================
# 2. LOAD MODEL FROM STORAGE
# ==========================================
print(f"📥 Loading model from MLflow: {MODEL_URI}...")
final_model = mlflow.spark.load_model(MODEL_URI)
print("✅ Model loaded successfully into memory.")

# =========================================
# 3. Preprocess test data
# =========================================
print("🔧 Preprocessing train data...")
final_cols = local_train_raw.columns + ["global_pred_log"]
print(final_cols)
adapted_local_train = adapt_paris_to_global(local_train_raw)
local_train_clean = clean_cast_select(adapted_local_train, select=False)

# Predict using the loaded/trained Global Model

# Transform
train_preds = final_model.transform(local_train_clean)

# Just drop the heavy vector column ('features') and save everything else.
(train_preds
 .withColumnRenamed("prediction", "global_pred_log")
 .select(final_cols)
 .write.mode("overwrite").parquet(OUTPUT_LOCAL_TRAIN_WITH_GLOBAL_PRED)
)
print(f"✅ Saved Local Train with Global Preds to: {OUTPUT_LOCAL_TRAIN_WITH_GLOBAL_PRED}")


# --- 3. Process & Save Test Set ---
print("🔧 Preprocessing test data...")
adapted_test = adapt_paris_to_global(test_raw)
test_clean = clean_cast_select(adapted_test, select=False)

print("🚀 Processing Test Set...")
test_preds = final_model.transform(test_clean)

(test_preds
 .withColumnRenamed("prediction", "global_pred_log")
 .select(final_cols)
 .write.mode("overwrite").parquet(OUTPUT_TEST_WITH_PRED)
)
print(f"✅ Saved Test Set with Global Preds to: {OUTPUT_TEST_WITH_PRED}")

[0;31m---------------------------------------------------------------------------[0m
[0;31mException[0m                                 Traceback (most recent call last)
File [0;32m<command-5658576270839943>, line 17[0m
[1;32m     15[0m full_data [38;5;241m=[39m df_spark[38;5;241m.[39mrepartition([38;5;241m4[39m)[38;5;241m.[39mcache()
[1;32m     16[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124m✅ Training on [39m[38;5;132;01m{[39;00mfull_data[38;5;241m.[39mcount()[38;5;132;01m}[39;00m[38;5;124m rows...[39m[38;5;124m"[39m)
[0;32m---> 17[0m [38;5;28;01mwith[39;00m mlflow[38;5;241m.[39mstart_run(run_name[38;5;241m=[39m[38;5;124m"[39m[38;5;124mFinal_Model_For_Eval[39m[38;5;124m"[39m):
[1;32m     18[0m     final_model [38;5;241m=[39m final_pipeline[38;5;241m.[39mfit(full_data)
[1;32m     19[0m     [38;5;66;03m#preds_test = final_model.transform(test_df_spark)[39;00m
[1;32m     20[0m     
[1;32m     21[0m     [38