In [0]:
# 1. Setup Configuration
storage_account = ""
storage_account = ""
container_airbnb = "airbnb"
container_booking = "booking"
# New SAS tokens provided in the update
airbnb_sas = ""
booking_sas = ""

# Helper function to inject the token
def set_spark_token(token):
    spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
    spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
    spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", token)

# --- LOAD AIRBNB DATA ---
set_spark_token(airbnb_sas)
# Using abfss:// protocol to match the dfs.core.windows.net config
airbnb_path = f"abfss://airbnb@{storage_account}.dfs.core.windows.net/airbnb_1_12_parquet"
airbnb_df = spark.read.parquet(airbnb_path)
print(f"Airbnb Loaded: {airbnb_df.count()} rows")

# --- LOAD BOOKING DATA ---
# We must overwrite the config with the Booking token to access the second container
set_spark_token(booking_sas)
# Assuming the file structure matches Airbnb. If this fails, check the exact folder name in the repo.
fname = "booking_1_9.parquet" 
booking_path = f"abfss://{container_booking}@{storage_account}.dfs.core.windows.net/{fname}"
booking_df = spark.read.parquet(booking_path)
print(f"Booking Loaded: {booking_df.count()} rows")

Airbnb Loaded: 2098880 rows
Booking Loaded: 3239391 rows


In [0]:
from pyspark.sql.functions import col, lower
import os

# --- 1. SETUP CONFIGURATION ---
storage_account = ""
# Booking SAS Token (Different from Airbnb!)
booking_sas = ""

def set_booking_token():
    spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
    spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
    spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", booking_sas)

# Apply Token
set_booking_token()

# --- 2. LOAD DATA ---
# Using the filename 'booking_1_9.parquet' from your previous successful attempt
booking_path = f"abfss://booking@{storage_account}.dfs.core.windows.net/booking_1_9.parquet"
df_booking = spark.read.parquet(booking_path)

print("=== Booking.com Schema ===")
df_booking.printSchema()

# --- 3. IDENTIFY LOCATION COLUMN ---
# We check for likely column names automatically
columns = [c.lower() for c in df_booking.columns]
target_col = None

if "address" in columns:
    target_col = "address"
elif "hotel_address" in columns:
    target_col = "hotel_address"
elif "country" in columns:
    target_col = "country"
elif "location" in columns:
    target_col = "location"

print(f"\nDetected Filtering Column: '{target_col}'")

# --- 4. FILTER & SAVE ---
if target_col:
    # Robust filter for US/USA/United States
    df_usa_booking = df_booking.filter(
        lower(col(target_col)).contains("united states") | 
        lower(col(target_col)).contains(" usa") |
        lower(col(target_col)).endswith("usa")
    )
    
    count = df_usa_booking.count()
    print(f"Filtered Row Count: {count}")

    if count > 0:
        # Paths
        temp_folder = "Lodging_data/booking_usa_temp.parquet"
        final_path  = "Lodging_data/booking_usa_FINAL.parquet"

        # 1. Save to temp folder
        df_usa_booking.coalesce(1).write.mode("overwrite").parquet(temp_folder)
        
        # 2. Rename/Move the single file to the final location
        files = dbutils.fs.ls(temp_folder)
        part_file = [f.name for f in files if f.name.startswith("part-")][0]
        dbutils.fs.cp(f"{temp_folder}/{part_file}", final_path)
        
        # 3. Cleanup temp folder (Optional, keeps things clean)
        dbutils.fs.rm(temp_folder, recurse=True)

        print(f"\nSUCCESS! File saved at:\n{final_path}")
        
        # Verify
        print("\nSample Data:")
        df_usa_booking.select(target_col).show(5, truncate=False)
        
    else:
        print("Rows is 0. Check if the column selected actually contains 'United States'.")
else:
    print("ERROR: Could not automatically find an 'address' or 'country' column.")
    print("Please look at the Schema output above and replace 'target_col' manually in the code.")

=== Booking.com Schema ===
root
 |-- availability: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- bed_configuration: string (nullable = true)
 |    |    |-- max_number_of guests: string (nullable = true)
 |    |    |-- max_number_of_guests: long (nullable = true)
 |    |    |-- room_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- lan: double (nullable = true)
 |    |-- lon: double (nullable = true)
 |-- country: string (nullable = true)
 |-- description: string (nullable = true)
 |-- fine_print: string (nullable = true)
 |-- hotel_id: string (nullable = true)
 |-- house_rules: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- rule: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- location: string (nullable = true)
 |-- man

USA Lodging data path

In [0]:
path = "Lodging_data/booking_usa_FINAL.parquet"

In [0]:
from pyspark.sql.functions import col, lower
import os

# --- 1. SETUP CONFIGURATION ---
storage_account = ""
# Airbnb SAS Token (From your previous context)
airbnb_sas = ""

def set_airbnb_token():
    spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
    spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
    spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", airbnb_sas)

# Apply Token
set_airbnb_token()

# --- 2. LOAD DATA ---
print("--- Loading Airbnb Data ---")
airbnb_path = f"abfss://airbnb@{storage_account}.dfs.core.windows.net/airbnb_1_12_parquet"
df_airbnb = spark.read.parquet(airbnb_path)

print("=== Airbnb Schema ===")
df_airbnb.printSchema()

# --- 3. IDENTIFY LOCATION COLUMN ---
# Auto-detect the best column to filter by
columns = [c.lower() for c in df_airbnb.columns]
target_col = None

# Priority list for Airbnb datasets
if "country" in columns:
    target_col = "country"
elif "smart_location" in columns: # Common in Airbnb scrapes
    target_col = "smart_location"
elif "location" in columns:
    target_col = "location"
elif "address" in columns:
    target_col = "address"

print(f"\nDetected Filtering Column: '{target_col}'")

# --- 4. FILTER & SAVE ---
if target_col:
    print(f"Filtering for 'United States' in column: {target_col}...")
    
    # Filter for United States / USA / US
    df_usa_airbnb = df_airbnb.filter(
        lower(col(target_col)).contains("united states") | 
        lower(col(target_col)).contains(" usa") |
        (lower(col(target_col)) == "us")
    )
    
    count = df_usa_airbnb.count()
    print(f"Filtered Row Count: {count}")

    if count > 0:
        # Define Paths
        base_folder = "Lodging_data"
        temp_folder = f"{base_folder}/airbnb_usa_temp.parquet"
        final_path  = f"{base_folder}/airbnb_usa.parquet"

        # 1. Save to temp folder (Single File)
        print("Saving to temp location...")
        df_usa_airbnb.coalesce(1).write.mode("overwrite").parquet(temp_folder)
        
        # 2. Rename/Move to final location
        print("Renaming to final .parquet file...")
        files = dbutils.fs.ls(temp_folder)
        # Find the part file (it starts with "part-")
        part_file = [f.name for f in files if f.name.startswith("part-")][0]
        dbutils.fs.cp(f"{temp_folder}/{part_file}", final_path)
        
        # 3. Cleanup
        dbutils.fs.rm(temp_folder, recurse=True)

        print(f"\n‚úÖ SUCCESS! Airbnb USA data saved at:\n{final_path}")
        
        # Verify
        print("\nSample Data:")
        df_usa_airbnb.select(target_col).show(5, truncate=False)
        
    else:
        print("‚ö†Ô∏è Warning: Filter returned 0 rows. Check the casing or column content.")
else:
    print("‚ùå ERROR: Could not automatically find a location column.")
    print("Please check the schema printed above and update 'target_col' manually.")

--- Loading Airbnb Data ---
=== Airbnb Schema ===
root
 |-- name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- image: string (nullable = true)
 |-- description: string (nullable = true)
 |-- category: string (nullable = true)
 |-- availability: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- ratings: string (nullable = true)
 |-- seller_info: string (nullable = true)
 |-- breadcrumbs: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- guests: string (nullable = true)
 |-- pets_allowed: string (nullable = true)
 |-- description_items: string (nullable = true)
 |-- category_rating: string (nullable = true)
 |-- house_rules: string (nullable = true)
 |-- details: string (nullable = true)
 |-- highlights: string (nullable = true)
 |-- arrangement_details: string (nullable = true)
 |-- amenities: string (nullable = true)

Loading airbnb and booking.com data (made my own dataframes)

Preprocessing datasets

In [0]:
# --- BLOCK 1: ADVANCED PREPROCESSING ---
from pyspark.sql.functions import col, lit, split, element_at, regexp_extract, trim, size, when, lower, regexp_replace, length
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer

# 2. Airbnb Transformation (With Smart Feature Extraction)
df_airbnb_clean = df_airbnb.select(
    lit("Airbnb").alias("source"),
    # Clean Price
    regexp_replace(col("price"), "[\$,]", "").cast("double").alias("price"),
    
    # Location Extraction
    trim(element_at(split(col("location"), ","), -2)).alias("state"),
    trim(element_at(split(col("location"), ","), -3)).alias("city"), # <--- NEW: Added City
    
    # Smart Room Type Logic (The Fix for "Stays")
    when(lower(col("name")).contains("shared room"), "Shared Room")
    .when(lower(col("name")).contains("private room"), "Private Room")
    .when(lower(col("name")).contains("hotel"), "Hotel")
    # "Entire", "House", "Apt", "Villa" all imply the whole place
    .when(lower(col("name")).rlike("entire|house|apt|apartment|villa|condo"), "Entire Home/Apt")
    .otherwise("Private Room") # Safe fallback
    .alias("room_type"),
    
    # Other Features
    regexp_replace(col("guests"), "[^0-9]", "").cast("int").alias("guests"),
    regexp_extract(col("ratings"), "([0-9]+\.[0-9]+)", 1).cast("double").alias("rating"),
    regexp_extract(col("reviews"), "([0-9]+)", 1).cast("int").alias("review_count"),
    size(split(col("amenities"), ",")).alias("amenities_count"),
    regexp_extract(col("host_rating"), "([0-9]+\.[0-9]+)", 1).cast("double").alias("host_score"),
    length(col("description")).alias("desc_length")
)

# 3. Booking Transformation (Aligning Columns)
df_booking_clean = df_booking.select(
    lit("Booking").alias("source"),
    lit(None).cast("double").alias("price"),
    trim(element_at(split(col("location"), ","), -2)).alias("state"),
    trim(element_at(split(col("location"), ","), -3)).alias("city"), # <--- NEW
    # Booking usually has good room types, but we map them to match Airbnb if needed
    col("availability")[0]["room_type"].alias("room_type"),
    col("availability")[0]["max_number_of_guests"].cast("int").alias("guests"),
    col("review_score").alias("rating"),
    col("number_of_reviews").cast("int").alias("review_count"),
    size(col("most_popular_facilities")).alias("amenities_count"),
    (col("manager_score") / 2).alias("host_score"), 
    length(col("description")).alias("desc_length")
)

# 4. Merge
df_master = df_airbnb_clean.unionByName(df_booking_clean)

# --- FILTERING & CLEANING ---

# A. Filter for Training (Must have Price)
df_with_price = df_master.filter(col("price").isNotNull())

# B. Dynamic Outlier Removal (95th Percentile)
high_limit = df_with_price.stat.approxQuantile("price", [0.95], 0.01)[0]
print(f"Dynamic Price Cutoff (95%): ${high_limit:.2f}")

df_train = df_with_price.filter(
    (col("price") > 15) &  # Increased floor slightly to avoid errors
    (col("price") <= high_limit) & 
    (col("guests") > 0)
)

# C. Clean Text Columns (Remove "United States" as state, empty strings)
print("Cleaning text fields...")
df_train = df_train.filter(
    (col("state").isNotNull()) & (length(trim(col("state"))) > 0) & (col("state") != "United States") &
    (col("room_type").isNotNull()) & (length(trim(col("room_type"))) > 0)
)

# D. Impute Missing Values
defaults = {
    "rating": 4.5, "review_count": 0, "amenities_count": 5, 
    "host_score": 4.5, "desc_length": 100, "city": "Unknown"
}
df_train = df_train.na.fill(defaults)

print(f"Ready for Training: {df_train.count():,} rows")

# Check the new Room Types!
print("\n--- New Room Type Distribution ---")
df_train.groupBy("room_type").count().orderBy("count", ascending=False).show()

Dynamic Price Cutoff (95%): $1065.00
Cleaning text fields...
Ready for Training: 1,365,151 rows

--- New Room Type Distribution ---
+---------------+-------+
|      room_type|  count|
+---------------+-------+
|   Private Room|1062632|
|Entire Home/Apt| 272085|
|          Hotel|  30434|
+---------------+-------+



Training Model

Evaluation

In [0]:
import pandas as pd
import xgboost as xgb
import joblib
import numpy as np
from pyspark.sql.functions import col, lit, split, element_at, regexp_extract, trim, size, when, lower, regexp_replace, length
from sklearn.model_selection import train_test_split

# --- PART 1: PREPROCESSING (Your Exact Logic) ---
print("--- 1. Processing Data in Spark ---")

# 1. Airbnb Transformation
df_airbnb_clean = df_airbnb.select(
    lit("Airbnb").alias("source"),
    # Fix: Added 'r' to regex strings to avoid syntax warnings
    regexp_replace(col("price"), r"[\$,]", "").cast("double").alias("price"),
    trim(element_at(split(col("location"), ","), -2)).alias("state"),
    trim(element_at(split(col("location"), ","), -3)).alias("city"),
    
    # Smart Room Type Logic
    when(lower(col("name")).contains("shared room"), "Shared Room")
    .when(lower(col("name")).contains("private room"), "Private Room")
    .when(lower(col("name")).contains("hotel"), "Hotel")
    .when(lower(col("name")).rlike("entire|house|apt|apartment|villa|condo"), "Entire Home/Apt")
    .otherwise("Private Room").alias("room_type"),
    
    # Features
    regexp_replace(col("guests"), "[^0-9]", "").cast("int").alias("guests"),
    regexp_extract(col("ratings"), r"([0-9]+\.[0-9]+)", 1).cast("double").alias("rating"),
    regexp_extract(col("reviews"), r"([0-9]+)", 1).cast("int").alias("review_count"),
    size(split(col("amenities"), ",")).alias("amenities_count"),
    regexp_extract(col("host_rating"), r"([0-9]+\.[0-9]+)", 1).cast("double").alias("host_score"),
    length(col("description")).alias("desc_length")
)

# 2. Booking Transformation
df_booking_clean = df_booking.select(
    lit("Booking").alias("source"),
    lit(None).cast("double").alias("price"),
    trim(element_at(split(col("location"), ","), -2)).alias("state"),
    trim(element_at(split(col("location"), ","), -3)).alias("city"),
    col("availability")[0]["room_type"].alias("room_type"),
    col("availability")[0]["max_number_of_guests"].cast("int").alias("guests"),
    col("review_score").alias("rating"),
    col("number_of_reviews").cast("int").alias("review_count"),
    size(col("most_popular_facilities")).alias("amenities_count"),
    (col("manager_score") / 2).alias("host_score"), 
    length(col("description")).alias("desc_length")
)

# 3. Merge & Filter
df_master = df_airbnb_clean.unionByName(df_booking_clean)
df_with_price = df_master.filter(col("price").isNotNull())

# 4. Outlier Removal (95th Percentile)
high_limit = df_with_price.stat.approxQuantile("price", [0.95], 0.01)[0]
print(f"   -> Price Ceiling (95%): ${high_limit:.2f}")

df_train = df_with_price.filter(
    (col("price") > 15) & 
    (col("price") <= high_limit) & 
    (col("guests") > 0)
)

# 5. Clean Text & Impute
print("   -> Cleaning text and imputing defaults...")
df_train = df_train.filter(
    (col("state").isNotNull()) & (length(trim(col("state"))) > 0) & (col("state") != "United States") &
    (col("room_type").isNotNull()) & (length(trim(col("room_type"))) > 0)
)

defaults = {"rating": 4.5, "review_count": 0, "amenities_count": 5, "host_score": 4.5, "desc_length": 100, "city": "Unknown"}
df_train = df_train.na.fill(defaults)

row_count = df_train.count()
print(f"‚úÖ Ready for Training: {row_count:,} rows")


# --- PART 2: TRAINING (Pandas + XGBoost) ---
print("\n--- 2. Training XGBoost Model ---")

# 1. Convert to Pandas
# If dataset > 500k rows, sample it to prevent OOM errors on the driver
if row_count > 500000:
    print("   -> Dataset large, taking 500k sample for efficient training...")
    pdf = df_train.sample(False, 500000.0/row_count, seed=42).toPandas()
else:
    pdf = df_train.toPandas()

# 2. Feature Engineering (For the Laptop Agent)
# Keep Top 50 States, group others
top_states = pdf['state'].value_counts().nlargest(50).index.tolist()
pdf['clean_state'] = pdf['state'].apply(lambda x: x if x in top_states else 'Other')

# Create Input Matrix (One-Hot Encoding)
X = pd.get_dummies(pdf[[
    'clean_state', 'room_type', 'guests', 'rating', 
    'review_count', 'amenities_count', 'host_score', 'desc_length'
]], drop_first=True)
y = pdf['price']

# 3. Train Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 4. Train XGBoost 
# Using parameters: Max Depth = 5, Iterations (Estimators) = 50
print(f"   -> Training on {len(X_train)} rows with max_depth=5, n_estimators=50...")

model = xgb.XGBRegressor(
    objective='reg:squarederror',
    max_depth=5,        # From your hyperparameter search
    n_estimators=50,    # Matches "maxIter=50"
    learning_rate=0.1, 
    n_jobs=-1
)

model.fit(X_train, y_train)

# 5. Evaluate
rmse = float(np.sqrt(((y_test - model.predict(X_test)) ** 2).mean()))
r2 = model.score(X_test, y_test)
print(f"‚úÖ Model Results: RMSE=${rmse:.2f} | R2={r2:.3f}")

# --- PART 3: SAVE ARTIFACTS ---
print("\n--- 3. Saving Brain for Agent ---")

artifacts = {
    "lodging_model": model,
    "model_columns": X_train.columns.tolist(), # Vital for alignment
    "stats": {"rmse": rmse, "avg_price": float(y.mean())}
}

save_path = "/dbfs/Workspace/Users/gil.caplan@campus.technion.ac.il/lodging_cost_predictor_v1.pkl"
joblib.dump(artifacts, save_path)

print(f"üéâ SUCCESS! Model saved to: {save_path}")
print("You can now download this file to your laptop.")

--- 1. Processing Data in Spark ---
   -> Price Ceiling (95%): $1233.19
   -> Cleaning text and imputing defaults...
‚úÖ Ready for Training: 1,371,680 rows

--- 2. Training XGBoost Model ---
   -> Dataset large, taking 500k sample for efficient training...
   -> Training on 400472 rows with max_depth=5, n_estimators=50...


Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

üèÉ View run bemused-gnat-629 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/1072358835225203/runs/42208d559fe942e59702918f9d32d934
üß™ View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/1072358835225203
‚úÖ Model Results: RMSE=$143.12 | R2=0.327

--- 3. Saving Brain for Agent ---
üéâ SUCCESS! Model saved to: /dbfs/Workspace/Users/gil.caplan@campus.technion.ac.il/lodging_cost_predictor_v1.pkl
You can now download this file to your laptop.


Model training


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, trim, length

# 1. Clean & Split Data
# Ensure we don't have empty strings that crash the model
df_train_clean = df_train.filter(
    (col("state").isNotNull()) & (length(trim(col("state"))) > 0) &
    (col("room_type").isNotNull()) & (length(trim(col("room_type"))) > 0)
)

train_data, test_data = df_train_clean.randomSplit([0.8, 0.2], seed=42)

# 2. Define The Pipeline Stages
# These turn text (State, Room Type) into numbers the model understands
state_indexer = StringIndexer(inputCol="state", outputCol="state_index", handleInvalid="skip")
room_indexer  = StringIndexer(inputCol="room_type", outputCol="room_index", handleInvalid="keep")

state_encoder = OneHotEncoder(inputCols=["state_index"], outputCols=["state_vec"])
room_encoder  = OneHotEncoder(inputCols=["room_index"], outputCols=["room_vec"])

assembler = VectorAssembler(
    inputCols=["guests", "state_vec", "room_vec", "rating", "review_count", "amenities_count", "host_score", "desc_length"], 
    outputCol="features"
)

# 3. The Optimized Model (Hardcoded with Winners)
# We use maxDepth=5 and maxIter=50 directly
gbt = GBTRegressor(featuresCol="features", labelCol="price", maxDepth=5, maxIter=50, seed=42)

pipeline = Pipeline(stages=[state_indexer, room_indexer, state_encoder, room_encoder, assembler, gbt])

# 4. Train
print("Training Final Model (Depth=5, Iter=50)...")
final_model = pipeline.fit(train_data)
print("Training Complete!")

# 5. Validate
predictions = final_model.transform(test_data)
evaluator_rmse = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"\n--- FINAL MODEL SCORECARD ---")
print(f"Error (RMSE): ${rmse:.2f}")
print(f"Accuracy (R2): {r2:.3f} (Explains {r2*100:.1f}% of variance)")

Training Final Model (Depth=5, Iter=50)...


Downloading artifacts:   0%|          | 0/65 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

üèÉ View run tasteful-sloth-963 at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/1072358835225203/runs/f05c2b9bad554d75a4fc33c3a867c3d8
üß™ View experiment at: https://adb-983293358114278.18.azuredatabricks.net/ml/experiments/1072358835225203
Training Complete!

--- FINAL MODEL SCORECARD ---
Error (RMSE): $130.41
Accuracy (R2): 0.357 (Explains 35.7% of variance)


Calculate Lodging costs

In [0]:
# --- BLOCK 4: CALCULATOR ---
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

def predict_trip_cost(state, travelers, nights, room_type="Entire home/apt", rating=4.8, reviews=50, amenities=15, host_score=4.9, desc_len=500):
    
    # Schema must match Training Data exactly
    schema = StructType([
        StructField("state", StringType(), True),
        StructField("guests", IntegerType(), True),
        StructField("room_type", StringType(), True),
        StructField("rating", DoubleType(), True),
        StructField("review_count", IntegerType(), True),
        StructField("amenities_count", IntegerType(), True),
        StructField("host_score", DoubleType(), True),
        StructField("desc_length", IntegerType(), True)
    ])
    
    data = [(state, travelers, room_type, rating, reviews, amenities, host_score, desc_len)]
    input_df = spark.createDataFrame(data, schema)
    
    try:
        # Use the 'best_model' from Cross Validation
        result = final_model.transform(input_df)
        price_per_night = result.collect()[0]["prediction"]
        total = price_per_night * nights
        
        print(f"--- Estimate for {state} ({nights} nights) ---")
        print(f"Features: {travelers} Guests, {room_type}, {rating} Stars")
        print(f"Rate:     ${price_per_night:.2f} / night")
        print(f"TOTAL:    ${total:.2f}\n")
        return total
    except Exception as e:
        print(f"Error: {e}. State '{state}' might not be in the training data.")

# --- TRY IT OUT ---
predict_trip_cost("New York", travelers=2, nights=3)
predict_trip_cost("Texas", travelers=4, nights=5, room_type="Entire home/apt", amenities=20)

--- Estimate for New York (3 nights) ---
Features: 2 Guests, Entire home/apt, 4.8 Stars
Rate:     $124.79 / night
TOTAL:    $374.36

--- Estimate for Texas (5 nights) ---
Features: 4 Guests, Entire home/apt, 4.8 Stars
Rate:     $117.52 / night
TOTAL:    $587.61



587.6091215781203

In [0]:
def run_sanity_check():
    print("====== MODEL LOGIC CHECKS ======")
    
    # CHECK 1: The "New York vs. Kansas" Test (Location)
    # Logic: Expensive state should cost more than cheaper state for identical listing
    print("\n1. LOCATION SENSITIVITY (2 Guests, Entire Home)")
    p_ny = predict_trip_cost("New York", 2, 1, room_type="Entire Home/Apt")
    p_ks = predict_trip_cost("Kansas", 2, 1, room_type="Entire Home/Apt")
    
    diff = ((p_ny - p_ks) / p_ks) * 100
    print(f"   -> Result: NY is {diff:.1f}% more expensive than KS. " + ("‚úÖ PASS" if p_ny > p_ks else "‚ùå FAIL"))

    # CHECK 2: The "Mansion vs. Room" Test (Room Type)
    # Logic: Entire home must cost more than a private room
    print("\n2. TYPE SENSITIVITY (Texas, 2 Guests)")
    p_home = predict_trip_cost("Texas", 2, 1, room_type="Entire Home/Apt")
    p_room = predict_trip_cost("Texas", 2, 1, room_type="Private Room")
    
    diff = ((p_home - p_room) / p_room) * 100
    print(f"   -> Result: Entire Home is {diff:.1f}% more expensive than Room. " + ("‚úÖ PASS" if p_home > p_room else "‚ùå FAIL"))

    # CHECK 3: The "Crowd" Test (Guest Count)
    # Logic: 6 guests should cost more than 1 guest (usually implies bigger house)
    print("\n3. CAPACITY SENSITIVITY (Florida, Entire Home)")
    p_1g = predict_trip_cost("Florida", 1, 1, room_type="Entire Home/Apt")
    p_6g = predict_trip_cost("Florida", 6, 1, room_type="Entire Home/Apt")
    
    diff = ((p_6g - p_1g) / p_1g) * 100
    print(f"   -> Result: 6 Guests cost {diff:.1f}% more than 1 Guest. " + ("‚úÖ PASS" if p_6g > p_1g else "‚ùå FAIL"))

    # CHECK 4: The "Luxury" Test (Quality)
    # Logic: High rating (5.0) + Many Amenities (30) vs Low rating (3.0) + Few Amenities (5)
    print("\n4. LUXURY SENSITIVITY (California, Entire Home)")
    p_lux = predict_trip_cost("California", 2, 1, rating=5.0, amenities=30, reviews=100)
    p_basic = predict_trip_cost("California", 2, 1, rating=3.5, amenities=5, reviews=5)
    
    diff = ((p_lux - p_basic) / p_basic) * 100
    print(f"   -> Result: Luxury listing is {diff:.1f}% more expensive. " + ("‚úÖ PASS" if p_lux > p_basic else "‚ùå FAIL"))

# Run the checks
run_sanity_check()


1. LOCATION SENSITIVITY (2 Guests, Entire Home)
--- Estimate for New York (1 nights) ---
Features: 2 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $124.79 / night
TOTAL:    $124.79

--- Estimate for Kansas (1 nights) ---
Features: 2 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $81.14 / night
TOTAL:    $81.14

   -> Result: NY is 53.8% more expensive than KS. ‚úÖ PASS

2. TYPE SENSITIVITY (Texas, 2 Guests)
--- Estimate for Texas (1 nights) ---
Features: 2 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $81.14 / night
TOTAL:    $81.14

--- Estimate for Texas (1 nights) ---
Features: 2 Guests, Private Room, 4.8 Stars
Rate:     $71.42 / night
TOTAL:    $71.42

   -> Result: Entire Home is 13.6% more expensive than Room. ‚úÖ PASS

3. CAPACITY SENSITIVITY (Florida, Entire Home)
--- Estimate for Florida (1 nights) ---
Features: 1 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $104.13 / night
TOTAL:    $104.13

--- Estimate for Florida (1 nights) ---
Features: 6 Guests, Entire Home/Apt, 4.8 Stars
R

In [0]:
# List of diverse scenarios to test the model's "Economic IQ"
test_cases = [
    # 1. The Expensive West Coast
    {"state": "California", "city": "Los Angeles", "guests": 2, "nights": 1, "type": "Entire Home/Apt"},
    
    # 2. The Tech Hub / Pacific Northwest
    {"state": "Washington", "city": "Seattle", "guests": 2, "nights": 1, "type": "Entire Home/Apt"},
    
    # 3. The South (Should be cheaper than CA/WA)
    {"state": "Texas", "city": "Austin", "guests": 2, "nights": 1, "type": "Entire Home/Apt"},
    
    # 4. The Midwest (Usually the most affordable)
    {"state": "Ohio", "city": "Columbus", "guests": 2, "nights": 1, "type": "Entire Home/Apt"},
    
    # 5. Vacation State (High variance, usually high)
    {"state": "Florida", "city": "Miami", "guests": 2, "nights": 1, "type": "Entire Home/Apt"}
]

print(f"{'STATE':<15} | {'TYPE':<15} | {'EST. PRICE':<10}")
print("-" * 45)

for test in test_cases:
    # We call the model for each case
    # Note: We pass standard rating/amenities to keep it a fair comparison
    try:
        cost = predict_trip_cost(
            state=test["state"], 
            travelers=test["guests"], 
            nights=test["nights"], 
            room_type=test["type"],
            rating=4.8,
            amenities=15
        )
        # We perform the print inside the function, but let's grab the return value to compare
        # (The function prints detailed info, but we can verify relative order here)
    except:
        pass

STATE           | TYPE            | EST. PRICE
---------------------------------------------
--- Estimate for California (1 nights) ---
Features: 2 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $163.35 / night
TOTAL:    $163.35

--- Estimate for Washington (1 nights) ---
Features: 2 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $100.40 / night
TOTAL:    $100.40

--- Estimate for Texas (1 nights) ---
Features: 2 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $81.14 / night
TOTAL:    $81.14

--- Estimate for Ohio (1 nights) ---
Features: 2 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $81.14 / night
TOTAL:    $81.14

--- Estimate for Florida (1 nights) ---
Features: 2 Guests, Entire Home/Apt, 4.8 Stars
Rate:     $104.13 / night
TOTAL:    $104.13



In [0]:
# Define a permanent path in your workspace
model_path = "/Workspace/Users/gil.caplan@campus.technion.ac.il/Lodging_data/airbnb_price_model_v1"

# Save the model (Overwrite if it exists)
print(f"Saving model to {model_path}...")
final_model.write().overwrite().save(model_path)
print("SUCCESS: Model saved! You can now load this tomorrow without retraining.")

[0;31m---------------------------------------------------------------------------[0m
[0;31mNotADirectoryError[0m                        Traceback (most recent call last)
File [0;32m<command-7365365094603509>, line 43[0m
[1;32m     41[0m [38;5;66;03m# Run for both[39;00m
[1;32m     42[0m zip_and_move(airbnb_src, [38;5;124m"[39m[38;5;124mairbnb_usa_download[39m[38;5;124m"[39m)
[0;32m---> 43[0m zip_and_move(booking_src, [38;5;124m"[39m[38;5;124mbooking_usa_download[39m[38;5;124m"[39m)
[1;32m     45[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124müéâ Done. Go to your Workspace folder to download the .zip files.[39m[38;5;124m"[39m)

File [0;32m<command-7365365094603509>, line 32[0m, in [0;36mzip_and_move[0;34m(source_path, zip_name)[0m
[1;32m     30[0m [38;5;66;03m# Create zip in local /tmp folder[39;00m
[1;32m     31[0m base_name [38;5;241m=[39m os[38;5;241m.[39mpath[38;5;241m.[39mjoin(temp_dir, zip_name)
[0;32m-