# Data scraping and pre processing


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lower, avg, sum, regexp_replace, split, trim, lit, udf
)
from pyspark.sql.types import StringType, DoubleType
import pandas as pd

# Initialize Spark Session
spark = SparkSession.builder.appName("MultiSourceDataIntegration").getOrCreate()

# Define DBFS data directory
data_dir = "/FileStore/data/"

print("Spark Session initialized successfully.")

Spark Session initialized successfully.


In [0]:
try:
    print("Loading datasets from DBFS...\n")

    # 1. NHTSA FARS Accident Data (2023)
    accident_df = spark.read.csv(f"{data_dir}accident.csv", header=True, inferSchema=True)

    # 2. SAMHSA Substance Abuse Data
    map_data_df = spark.read.csv(f"{data_dir}map_data.csv", header=True, inferSchema=True)

    # 3. FBI UCR Crime Data (2024)
    url = "https://www.beautifydata.com/united-states-crimes/fbi-ucr/2024/ranking-of-us-cities-by-crime-rate-by-crime-type/violent?utm_source=chatgpt.com"
    tables = pd.read_html(url, flavor='lxml')
    crime_df = tables[0] if tables else None

    # 4. Kaggle US Household Income Data
    kaggle_income_df = spark.read.csv(f"{data_dir}kaggle_income.csv", header=True, inferSchema=True)

    # 5. CDC PLACES County Health Data (2025)
    county_health_df = spark.read.csv(
        f"{data_dir}PLACES__Local_Data_for_Better_Health__County_Data__2025_release (1).csv",
        header=True,
        inferSchema=True
    )

    # 6. EPA Air Quality Index Data (2025)
    aqi_df = spark.read.csv(f"{data_dir}annual_aqi_by_county_2025.csv", header=True, inferSchema=True)

    print("All datasets loaded successfully.\n")
except:
    pass

Loading datasets from DBFS...

All datasets loaded successfully.



In [0]:
# Create state abbreviation to full name mapping
state_abbreviations = {
    'AL': 'Alabama', 'AK': 'Alaska', 'AZ': 'Arizona', 'AR': 'Arkansas', 'CA': 'California',
    'CO': 'Colorado', 'CT': 'Connecticut', 'DE': 'Delaware', 'FL': 'Florida', 'GA': 'Georgia',
    'HI': 'Hawaii', 'ID': 'Idaho', 'IL': 'Illinois', 'IN': 'Indiana', 'IA': 'Iowa',
    'KS': 'Kansas', 'KY': 'Kentucky', 'LA': 'Louisiana', 'ME': 'Maine', 'MD': 'Maryland',
    'MA': 'Massachusetts', 'MI': 'Michigan', 'MN': 'Minnesota', 'MS': 'Mississippi', 'MO': 'Missouri',
    'MT': 'Montana', 'NE': 'Nebraska', 'NV': 'Nevada', 'NH': 'New Hampshire', 'NJ': 'New Jersey',
    'NM': 'New Mexico', 'NY': 'New York', 'NC': 'North Carolina', 'ND': 'North Dakota', 'OH': 'Ohio',
    'OK': 'Oklahoma', 'OR': 'Oregon', 'PA': 'Pennsylvania', 'RI': 'Rhode Island', 'SC': 'South Carolina',
    'SD': 'South Dakota', 'TN': 'Tennessee', 'TX': 'Texas', 'UT': 'Utah', 'VT': 'Vermont',
    'VA': 'Virginia', 'WA': 'Washington', 'WV': 'West Virginia', 'WI': 'Wisconsin', 'WY': 'Wyoming'
}

state_name_udf = udf(lambda abbr: state_abbreviations.get(abbr, abbr), StringType())

print("State standardization mapping created.\n")

State standardization mapping created.



In [0]:
try:
    print("Processing crime data...")

    # Convert crime data to Spark DataFrame
    crime_spark_df = spark.createDataFrame(crime_df)

    # Extract City and State from combined 'City' column (format: "CityName, StateCode")
    crime_spark_df = (
        crime_spark_df
        .withColumn("City_State_Split", split(col("City"), ", "))
        .withColumn("City", lower(trim(col("City_State_Split").getItem(0))))
        .withColumn("State", lower(state_name_udf(trim(col("City_State_Split").getItem(1)))))
        .select(
            col("City"),
            col("State"),
            col("Crime Rate Per 100K").alias("Crime_Rate_Per_100K")
        )
    )

    # Create city-to-county mapping from income data
    city_county_mapping_df = (
        kaggle_income_df
        .select(
            lower(col("City")).alias("City"),
            lower(col("State_Name")).alias("State"),
            lower(regexp_replace(col("County"), "(?i) county$", "")).alias("County")
        )
        .distinct()
    )

    # Map counties to crime data
    crime_with_county_df = (
        crime_spark_df
        .join(city_county_mapping_df, on=["City", "State"], how="left")
        .filter(col("County").isNotNull())
        .select("City", "State", "County", "Crime_Rate_Per_100K")
    )

    print("Crime data processed.\n")

except:
    pass

Processing crime data...
Crime data processed.



In [0]:
try: 
    print("Processing accident data...")

    accident_fatalities_df = (
        accident_df
        .select(
            col("STATENAME").alias("State"),
            col("COUNTYNAME").alias("County"),
            col("CITYNAME").alias("City"),
            col("FATALS")
        )
        .groupBy("State", "County", "City")
        .agg(sum("FATALS").alias("Total_Fatalities"))
        # Clean county names: remove FIPS codes in parentheses
        .withColumn("County", regexp_replace(col("County"), "\\s*\\(\\d+\\)", ""))
        # Standardize to lowercase
        .withColumn("State", lower(col("State")))
        .withColumn("County", lower(col("County")))
        .withColumn("City", lower(col("City")))
    )

    print("Accident data processed.\n")
except:
    pass

Processing accident data...
Accident data processed.



In [0]:
print("Processing income data...")

income_df = (
    kaggle_income_df
    .select(
        col("State_Name").alias("State"),
        col("County"),
        col("City"),
        col("Median")
    )
    .groupBy("State", "County", "City")
    .agg(avg("Median").alias("Median_Income"))
    # Clean county names: remove ' County' suffix
    .withColumn("County", regexp_replace(col("County"), "(?i) County$", ""))
    # Standardize to lowercase
    .withColumn("State", lower(col("State")))
    .withColumn("County", lower(col("County")))
    .withColumn("City", lower(col("City")))
)

print("Income data processed.\n")

Processing income data...
Income data processed.



In [0]:
print("Processing health and air quality data...")

# Health Data Processing (County Level)
prepared_health_df = (
    county_health_df
    .filter(col("Measure") == "Any disability among adults")
    .select(
        lower(col("StateDesc")).alias("State"),
        lower(col("LocationName")).alias("County"),
        col("Data_Value").alias("Disability_Rate")
    )
    .groupBy("State", "County")
    .agg(avg("Disability_Rate").alias("Disability_Rate"))
)

# Air Quality Data Processing (County Level)
prepared_aqi_df = (
    aqi_df
    .select(
        lower(col("State")).alias("State"),
        lower(col("County")).alias("County"),
        col("Median AQI").alias("Median_AQI")
    )
    .groupBy("State", "County")
    .agg(avg("Median_AQI").alias("Median_AQI"))
)

print("Health and air quality data processed.\n")

Processing health and air quality data...
Health and air quality data processed.



In [0]:
print("Building integrated dataset...")

# Start with city-level crime data as base
mega_df = crime_with_county_df.select(
    col("City"),
    col("State"),
    col("County"),
    col("Crime_Rate_Per_100K").alias("City_Crime_Rate_Per_100K")
)

# Join city-level accident data
mega_df = mega_df.join(
    accident_fatalities_df,
    on=["State", "County", "City"],
    how="outer"
)

# Join city-level income data
mega_df = mega_df.join(
    income_df,
    on=["State", "County", "City"],
    how="outer"
)

# Join county-level health data
if prepared_health_df is not None and not prepared_health_df.isEmpty():
    mega_df = mega_df.join(
        prepared_health_df,
        on=["State", "County"],
        how="outer"
    )
else:
    print("Warning: Health data unavailable. Adding null Disability_Rate column.")
    mega_df = mega_df.withColumn("Disability_Rate", lit(None).cast(DoubleType()))

# Join county-level air quality data
mega_df = mega_df.join(
    prepared_aqi_df,
    on=["State", "County"],
    how="outer"
)

print("Integrated dataset built successfully.\n")

Building integrated dataset...
Integrated dataset built successfully.



In [0]:
print("=" * 80)
print("FINAL INTEGRATED DATASET (mega_df)")
print("=" * 80)

print("\nSchema:")
mega_df.printSchema()

print("\nSample Data (first 10 rows):")
mega_df.show(10, truncate=False)

print("\nDataset Statistics:")
print(f"Total rows: {mega_df.count()}")
print(f"Total columns: {len(mega_df.columns)}")

print("\nColumn Summary:")
for column in mega_df.columns:
    non_null_count = mega_df.filter(col(column).isNotNull()).count()
    print(f"  {column}: {non_null_count} non-null values")

FINAL INTEGRATED DATASET (mega_df)

Schema:
root
 |-- State: string (nullable = true)
 |-- County: string (nullable = true)
 |-- City: string (nullable = true)
 |-- City_Crime_Rate_Per_100K: double (nullable = true)
 |-- Total_Fatalities: long (nullable = true)
 |-- Median_Income: double (nullable = true)
 |-- Disability_Rate: double (nullable = true)
 |-- Median_AQI: double (nullable = true)


Sample Data (first 10 rows):
+-------+-------+--------------+------------------------+----------------+------------------+------------------+----------+
|State  |County |City          |City_Crime_Rate_Per_100K|Total_Fatalities|Median_Income     |Disability_Rate   |Median_AQI|
+-------+-------+--------------+------------------------+----------------+------------------+------------------+----------+
|alabama|autauga|abbeville     |NULL                    |NULL            |25216.0           |33.099999999999994|NULL      |
|alabama|autauga|adamsville    |NULL                    |NULL            |474

In [0]:
# Azure Storage Configuration
storage_account = "lab94290"
container = "airbnb"
data_path = "airbnb_1_12_parquet"

# SAS token for authentication (new token from email)
sas_token = "sp=rle&st=2025-12-24T17:37:04Z&se=2026-02-28T01:52:04Z&spr=https&sv=2024-11-04&sr=c&sig=a0lx%2BS6PuS%2FvJ9Tbt4NKdCJHLE9d1Y1D6vpE1WKFQtk%3D"

In [0]:
# Configure Spark to use SAS token authentication for ADLS Gen2
# This is the exact configuration from the instructor's notebook
spark.conf.set(
    f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net",
    "SAS"
)
spark.conf.set(
    f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"
)
spark.conf.set(
    f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net",
    sas_token
)

print("✓ Azure Storage connection configured successfully")
print(f"  Storage Account: {storage_account}")
print(f"  Container: {container}")

✓ Azure Storage connection configured successfully
  Storage Account: lab94290
  Container: airbnb


In [0]:
# Use abfss:// protocol with dfs.core.windows.net (ADLS Gen2)
path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{data_path}"

print(f"\nLoading Airbnb data from: {path}")

# Read the parquet dataset
airbnb_df = spark.read.parquet(path)

print(f"✓ Loaded {airbnb_df.count():,} rows")
print(f"  Columns: {len(airbnb_df.columns)}")

airbnb_df.show(5)


Loading Airbnb data from: abfss://airbnb@lab94290.dfs.core.windows.net/airbnb_1_12_parquet
✓ Loaded 2,098,880 rows
  Columns: 50
+--------------------+-----+--------------------+--------------------+--------+------------+--------+--------------------+-------+--------------------+--------------------+--------------------+---------+---------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+-----------------------+--------------------+---------------------+-------------+----------------------+-----------+----------+------------------+-----------------+--------------------+--------------------+-----------+--------+--------------------+--------------------------+-------+----------------+--------------------+----------

In [0]:
# Debug mode - sample data for faster development
DEBUG = False

if DEBUG:
    original_count = airbnb_df.count()
    airbnb_df = airbnb_df.sample(fraction=4000/original_count, seed=42).limit(4000)
    print(f"\n  DEBUG MODE: Sampled to {airbnb_df.count():,} rows for faster processing")
else:
    print(f"\n Using full dataset: {airbnb_df.count():,} rows")


 Using full dataset: 2,098,880 rows


In [0]:
from pyspark.sql.functions import col, when, expr, lit, count as spark_count
from pyspark.sql.types import DoubleType

print("="*80)
print("PRICE SANITY CHECK: FILTER UNREALISTIC LOW PRICES")
print("="*80)

PRICE SANITY CHECK: FILTER UNREALISTIC LOW PRICES


In [0]:
print("\n1. Analyzing price distribution...")

# Cast price to double
airbnb_df = airbnb_df.withColumn("price_numeric", col("price").cast(DoubleType()))

# Get overall statistics (distributed computation)
total_rows = airbnb_df.count()
valid_price_rows = airbnb_df.filter(col("price_numeric").isNotNull()).count()

print(f"   Total rows: {total_rows:,}")
print(f"   Rows with valid numeric price: {valid_price_rows:,}")
print(f"   Null prices: {total_rows - valid_price_rows:,}")

# Compute price percentiles (distributed using approx_percentile)
price_percentiles = airbnb_df.filter(col("price_numeric").isNotNull()).select(
    expr("approx_percentile(price_numeric, array(0.01, 0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99))").alias("percentiles")
).collect()[0]['percentiles']

print(f"\n   Price percentiles:")
percentile_labels = ['1st', '5th', '10th', '25th', '50th', '75th', '90th', '95th', '99th']
for label, value in zip(percentile_labels, price_percentiles):
    print(f"     {label:5s}: ${value:,.2f}")


1. Analyzing price distribution...
   Total rows: 2,098,880
   Rows with valid numeric price: 1,479,472
   Null prices: 619,408

   Price percentiles:
     1st  : $7.00
     5th  : $28.62
     10th : $41.85
     25th : $75.57
     50th : $136.00
     75th : $253.00
     90th : $544.00
     95th : $1,499.00
     99th : $11,684.17


In [0]:
print("\n2. Identifying unrealistic prices...")

# Strategy: Use multiple criteria
# Criterion 1: Hard minimum threshold (e.g., $10 - no realistic nightly rental is < $10)
hard_min_threshold = 10.0

# Criterion 2: Adaptive threshold - 1st percentile (catches extreme outliers)
adaptive_threshold = price_percentiles[0]  # 1st percentile

# Use the more conservative (higher) of the two
min_realistic_price = max(hard_min_threshold, adaptive_threshold)

print(f"\n   Hard minimum threshold: ${hard_min_threshold:.2f}")
print(f"   Adaptive threshold (1st percentile): ${adaptive_threshold:.2f}")
print(f"   Final minimum realistic price: ${min_realistic_price:.2f}")

# Count unrealistic prices
unrealistic_count = airbnb_df.filter(
    (col("price_numeric").isNotNull()) & 
    (col("price_numeric") < min_realistic_price)
).count()

print(f"\n   Unrealistic prices found: {unrealistic_count:,} ({unrealistic_count/valid_price_rows*100:.2f}% of valid prices)")

# Show examples of unrealistic prices
print("\n   Sample of unrealistic prices:")
unrealistic_sample = airbnb_df.filter(
    (col("price_numeric").isNotNull()) & 
    (col("price_numeric") < min_realistic_price)
).select(
    "property_id", "listing_name", "location", "price", "price_numeric"
).orderBy("price_numeric").limit(10)

display(unrealistic_sample)


2. Identifying unrealistic prices...

   Hard minimum threshold: $10.00
   Adaptive threshold (1st percentile): $7.00
   Final minimum realistic price: $10.00

   Unrealistic prices found: 15,540 (1.05% of valid prices)

   Sample of unrealistic prices:


property_id,listing_name,location,price,price_numeric
29964171.0,"Entire home in Conyers, Georgia, United States","Conyers, Georgia, United States",0.0,0.0
,,Aeroport,0.0,0.0
1.2141114818461158e+18,"Entire rental unit in Niagara Falls, Canada","Niagara Falls, Ontario, Canada",0.1,0.1
6.756307901881956e+17,"Entire home in Fresno, California, United States","Fresno, California, United States",1.0,1.0
1.1291447556883072e+18,"Entire guest suite in Moorpark, California, United States","Moorpark, California, United States",1.0,1.0
41955923.0,"Entire rental unit in Long Beach, California, United States","Long Beach, California, United States",1.0,1.0
8.972549468433427e+17,"Entire home in Indio, California, United States","Indio, California, United States",1.0,1.0
1.1354634225659628e+18,"Entire rental unit in Duluth, Minnesota, United States","Duluth, Minnesota, United States",1.0,1.0
1.0635393585308438e+18,"Entire rental unit in Santa Ana, California, United States","Santa Ana, California, United States",1.0,1.0
44408051.0,"Entire rental unit in Hawthorne, California, United States","Hawthorne, California, United States",1.0,1.0


In [0]:
print("\n3. Adding price sanity flag...")

airbnb_df = airbnb_df.withColumn(
    "price_sanity_flag",
    when(
        col("price_numeric").isNull(),
        "null_price"
    ).when(
        col("price_numeric") < min_realistic_price,
        "unrealistic_low"
    ).when(
        col("price_numeric") >= min_realistic_price,
        "valid"
    ).otherwise("unknown")
)

print("   ✓ Price sanity flag added")

# Show flag distribution
print("\n   Price sanity flag distribution:")
flag_dist = airbnb_df.groupBy("price_sanity_flag").agg(
    spark_count("*").alias("count")
).orderBy(col("count").desc())

display(flag_dist)


3. Adding price sanity flag...
   ✓ Price sanity flag added

   Price sanity flag distribution:


price_sanity_flag,count
valid,1463932
null_price,619408
unrealistic_low,15540


In [0]:
print("\n4. Filtering to realistic prices only...")

# Create filtered dataset
airbnb_df_filtered = airbnb_df.filter(
    col("price_sanity_flag") == "valid"
)

filtered_count = airbnb_df_filtered.count()
removed_count = total_rows - filtered_count

print(f"   Rows after filtering: {filtered_count:,}")
print(f"   Rows removed: {removed_count:,} ({removed_count/total_rows*100:.2f}%)")

# Replace price with price_numeric for consistency
airbnb_df_filtered = airbnb_df_filtered.withColumn("price", col("price_numeric"))

print("   ✓ Price column updated to numeric type")


4. Filtering to realistic prices only...
   Rows after filtering: 1,463,932
   Rows removed: 634,948 (30.25%)
   ✓ Price column updated to numeric type


In [0]:
print("\n5. Verifying filtered price distribution...")

# Recompute percentiles on filtered data
filtered_percentiles = airbnb_df_filtered.select(
    expr("approx_percentile(price, array(0.01, 0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99))").alias("percentiles")
).collect()[0]['percentiles']

print(f"\n   Filtered price percentiles:")
for label, value in zip(percentile_labels, filtered_percentiles):
    print(f"     {label:5s}: ${value:,.2f}")


5. Verifying filtered price distribution...

   Filtered price percentiles:
     1st  : $17.48
     5th  : $31.01
     10th : $44.00
     25th : $78.00
     50th : $137.79
     75th : $255.15
     90th : $550.00
     95th : $1,513.00
     99th : $11,833.00


In [0]:
print("\n" + "="*80)
print("PRICE FILTERING COMPLETE")
print("="*80)

print(f"\nFiltered dataset: airbnb_df_filtered")
print(f"  Rows: {filtered_count:,}")
print(f"  Minimum price: ${min_realistic_price:.2f}")
print(f"  Removed: {removed_count:,} unrealistic prices")

print(f"\nPrice range after filtering:")
print(f"  Min: ${filtered_percentiles[0]:.2f}")
print(f"  Median: ${filtered_percentiles[4]:.2f}")
print(f"  Max: ${filtered_percentiles[-1]:.2f}")

print("\n✓ Dataset ready for downstream anomaly detection!")
print("\nNext steps:")
print("  - Use 'airbnb_df_filtered' instead of 'airbnb_df' in subsequent cells")
print("  - All unrealistic prices have been removed")
print("  - Price column is now numeric (DoubleType)")

# Show sample of filtered data
print("\nSample of filtered data:")
display(airbnb_df_filtered.select(
    "property_id", "listing_name", "location", "price", "price_sanity_flag"
).limit(10))


PRICE FILTERING COMPLETE

Filtered dataset: airbnb_df_filtered
  Rows: 1,463,932
  Minimum price: $10.00
  Removed: 634,948 unrealistic prices

Price range after filtering:
  Min: $17.48
  Median: $137.79
  Max: $11833.00

✓ Dataset ready for downstream anomaly detection!

Next steps:
  - Use 'airbnb_df_filtered' instead of 'airbnb_df' in subsequent cells
  - All unrealistic prices have been removed
  - Price column is now numeric (DoubleType)

Sample of filtered data:


property_id,listing_name,location,price,price_sanity_flag
40458495,"Entire rental unit in Broadbeach, Australia","Broadbeach, Australia",238.0,valid
17988678,"Cycladic home in Aliki, Greece","Aliki, Greece",15118.0,valid
17857973,"Entire home in Guía, Spain","Guía, Canarias, Spain",174.0,valid
53932806,"Private room in guesthouse in Rovereto, Italy","Rovereto, Trentino-Alto Adige, Italy",2367.0,valid
18535615,"Entire rental unit in Peniscola, Spain","Peniscola, Comunidad Valenciana, Spain",116.0,valid
23049623,"Entire rental unit in Sallanches, France","Sallanches, Auvergne-Rhône-Alpes, France",237.0,valid
14160725,"Entire home in Chania, Greece","Chania, Greece",375.0,valid
18058823,"Casa particular in Cienfuegos, Cuba","Cienfuegos, Cuba",20.0,valid
53816734,"Room in boutique hotel in Isla Mujeres, Mexico","Isla Mujeres, Quintana Roo, Mexico",139.0,valid
13850728,"Entire home in Lostanges, France","Lostanges, Aquitaine Limousin Poitou-Charentes, France",139.0,valid


In [0]:
from pyspark.sql.functions import col, split, trim, lower

# Filter for United States listings - USE FILTERED DATA
usa_df = airbnb_df_filtered.filter(col("location").contains("United States"))

print(f"✓ USA listings: {usa_df.count():,} rows ({usa_df.count()/airbnb_df_filtered.count()*100:.1f}% of total)")

✓ USA listings: 453,841 rows (31.0% of total)


In [0]:
# Parse location into city, state, country
# Format: "City, State, Country"
usa_df = usa_df.withColumn("location_split", split(col("location"), ", "))

usa_df = usa_df.withColumn("city", trim(col("location_split")[0])) \
               .withColumn("state", trim(col("location_split")[1])) \
               .withColumn("country_parsed", trim(col("location_split")[2])) \
               .drop("location_split")

In [0]:
# DBTITLE 1,Filter USA Listings and Normalize Location
from pyspark.sql.functions import col, split, trim, lower

# Filter for United States listings
usa_df = airbnb_df_filtered.filter(col("location").contains("United States"))

print(f"✓ USA listings: {usa_df.count():,} rows ({usa_df.count()/airbnb_df_filtered.count()*100:.1f}% of total)")

# Parse location into city, state, country (with lowercase normalization)
usa_df = usa_df.withColumn("location_split", split(col("location"), ", "))

usa_df = usa_df.withColumn("city", lower(trim(col("location_split")[0]))) \
               .withColumn("state", lower(trim(col("location_split")[1]))) \
               .withColumn("country_parsed", trim(col("location_split")[2])) \
               .drop("location_split")

print("✓ Location parsed and normalized to lowercase")

# COMMAND ----------

# DBTITLE 1,Join USA Data with Environmental Data
print("Enriching USA data with environmental indicators...")

# Prepare environmental data
env_df_prepared = mega_df.select(
    lower(col("City")).alias("city"),
    lower(col("State")).alias("state"),
    col("County").alias("county"),
    col("City_Crime_Rate_Per_100K"),
    col("Total_Fatalities"),
    col("Median_Income"),
    col("Disability_Rate"),
    col("Median_AQI")
)

# Join on city and state
airbnb_enriched_df = usa_df.join(
    env_df_prepared,
    on=["city", "state"],
    how="left"
)

print(f"✓ Environmental data enriched: {airbnb_enriched_df.count():,} rows")

# Verify the join worked
matches = airbnb_enriched_df.filter(
    col("City_Crime_Rate_Per_100K").isNotNull()
).count()
total = airbnb_enriched_df.count()
match_rate = (matches / total * 100) if total > 0 else 0
print(f"  Matched records: {matches:,} ({match_rate:.1f}%)")
print(f"  Unmatched records: {total - matches:,} ({100-match_rate:.1f}%)")

# COMMAND ----------

# DBTITLE 1,Fill Null Crime Values (NOW SHOULD WORK)
from pyspark.sql.functions import col, when, lit, count

print("\n1. Filling missing crime data...")

# Check if there are any non-null crime values at all
crime_non_null_count = airbnb_enriched_df.filter(
    col("City_Crime_Rate_Per_100K").isNotNull()
).count()

if crime_non_null_count == 0:
    print("   ⚠️  WARNING: No crime data found! Join may have failed.")
    print("   Debug info:")
    print(f"     Total rows: {airbnb_enriched_df.count():,}")
    print("\n   Sample of airbnb_enriched_df:")
    airbnb_enriched_df.select("city", "state", "City_Crime_Rate_Per_100K").show(10)
    raise Exception("No crime data after join. Check join logic.")
else:
    # Calculate minimum non-null crime rate
    min_crime_rate = airbnb_enriched_df.filter(
        col("City_Crime_Rate_Per_100K").isNotNull()
    ).agg({"City_Crime_Rate_Per_100K": "min"}).collect()[0][0]
    
    if min_crime_rate is not None:
        print(f"   Minimum crime rate: {min_crime_rate:.2f}")
    else:
        raise Exception("Cannot compute minimum crime rate")
    
    # Fill null crime values
    airbnb_enriched_df = airbnb_enriched_df.withColumn(
        "City_Crime_Rate_Per_100K",
        when(col("City_Crime_Rate_Per_100K").isNull(), lit(min_crime_rate))
        .otherwise(col("City_Crime_Rate_Per_100K"))
    )
    
    null_count_before = airbnb_enriched_df.filter(col('City_Crime_Rate_Per_100K').isNull()).count()
    print(f"   ✓ Filled {null_count_before:,} null crime values with minimum rate")
    print(f"   ✓ Crime data now complete for all {airbnb_enriched_df.count():,} listings")

✓ USA listings: 453,841 rows (31.0% of total)
✓ Location parsed and normalized to lowercase
Enriching USA data with environmental indicators...
✓ Environmental data enriched: 730,949 rows
  Matched records: 153,680 (21.0%)
  Unmatched records: 577,269 (79.0%)

1. Filling missing crime data...
   Minimum crime rate: 249.31
   ✓ Filled 0 null crime values with minimum rate
   ✓ Crime data now complete for all 730,949 listings


In [0]:
from pyspark.sql.functions import col, when, lit, percentile_approx
from pyspark.sql.window import Window

print("="*80)
print("CREATING RISK INDICATORS")
print("="*80)

CREATING RISK INDICATORS


In [0]:
# =============================================================================
# STEP 2: Calculate thresholds for top 10% (90th percentile)
# =============================================================================

print("\n2. Calculating risk thresholds (top 10%)...")

# Crime threshold (90th percentile of non-null values)
crime_threshold = airbnb_enriched_df.filter(col("City_Crime_Rate_Per_100K").isNotNull()) \
    .approxQuantile("City_Crime_Rate_Per_100K", [0.9], 0.01)[0]

print(f"   Crime threshold (90th percentile): {crime_threshold:.2f}")

# Fatalities threshold (90th percentile, excluding nulls and zeros)
fatalities_threshold = airbnb_enriched_df.filter(
    (col("Total_Fatalities").isNotNull()) & (col("Total_Fatalities") > 0)
).approxQuantile("Total_Fatalities", [0.9], 0.01)[0]

print(f"   Fatalities threshold (90th percentile): {fatalities_threshold:.0f}")

# AQI threshold (10th percentile = worst air quality, excluding nulls)
# Lower AQI is better, so 10th percentile means bottom 10% (worst)
aqi_threshold = airbnb_enriched_df.filter(col("Median_AQI").isNotNull()) \
    .approxQuantile("Median_AQI", [0.1], 0.01)[0]

print(f"   AQI threshold (10th percentile - worst): {aqi_threshold:.0f}")


2. Calculating risk thresholds (top 10%)...
   Crime threshold (90th percentile): 539.85
   Fatalities threshold (90th percentile): 89
   AQI threshold (10th percentile - worst): 26


In [0]:
# =============================================================================
# STEP 3: Create binary risk indicator column
# =============================================================================

print("\n3. Creating high-risk indicator...")

# Create individual risk flags
airbnb_with_risk = airbnb_enriched_df.withColumn(
    "is_high_crime",
    when(
        col("City_Crime_Rate_Per_100K").isNotNull() & 
        (col("City_Crime_Rate_Per_100K") >= crime_threshold),
        1
    ).otherwise(0)
).withColumn(
    "is_high_fatalities",
    when(
        col("Total_Fatalities").isNotNull() & 
        (col("Total_Fatalities") > 0) &
        (col("Total_Fatalities") >= fatalities_threshold),
        1
    ).otherwise(0)
).withColumn(
    "is_poor_aqi",
    when(
        col("Median_AQI").isNotNull() & 
        (col("Median_AQI") <= aqi_threshold),
        1
    ).otherwise(0)
)

# Create combined high-risk indicator (1 if ANY of the risk factors is present)
airbnb_final = airbnb_with_risk.withColumn(
    "is_high_risk_area",
    when(
        (col("is_high_crime") == 1) | 
        (col("is_high_fatalities") == 1) | 
        (col("is_poor_aqi") == 1),
        1
    ).otherwise(0)
)

print("   ✓ Risk indicators created")


3. Creating high-risk indicator...
   ✓ Risk indicators created


In [0]:
# =============================================================================
# STEP 4: Display statistics
# =============================================================================

print("\n" + "="*80)
print("RISK INDICATOR STATISTICS")
print("="*80)

total_count = airbnb_final.count()
high_crime_count = airbnb_final.filter(col("is_high_crime") == 1).count()
high_fatalities_count = airbnb_final.filter(col("is_high_fatalities") == 1).count()
poor_aqi_count = airbnb_final.filter(col("is_poor_aqi") == 1).count()
high_risk_count = airbnb_final.filter(col("is_high_risk_area") == 1).count()

print(f"\nTotal listings: {total_count:,}")
print(f"\nIndividual Risk Factors:")
print(f"  High crime areas: {high_crime_count:,} ({high_crime_count/total_count*100:.1f}%)")
print(f"  High fatalities areas: {high_fatalities_count:,} ({high_fatalities_count/total_count*100:.1f}%)")
print(f"  Poor air quality areas: {poor_aqi_count:,} ({poor_aqi_count/total_count*100:.1f}%)")
print(f"\nCombined High-Risk Areas: {high_risk_count:,} ({high_risk_count/total_count*100:.1f}%)")


RISK INDICATOR STATISTICS

Total listings: 730,949

Individual Risk Factors:
  High crime areas: 82,231 (11.2%)
  High fatalities areas: 29,857 (4.1%)
  Poor air quality areas: 41,170 (5.6%)

Combined High-Risk Areas: 132,016 (18.1%)


In [0]:
# =============================================================================
# STEP 5: Show sample data with risk indicators
# =============================================================================

print("\nSample of data with risk indicators:")
display(
    airbnb_final.select(
        "city", "state", "listing_name", "price", "ratings",
        "City_Crime_Rate_Per_100K",
        "Total_Fatalities", "Median_AQI",
        "is_high_crime", "is_high_fatalities", "is_poor_aqi", "is_high_risk_area"
    ).limit(20)
)

# Show high-risk listings
print("\nSample of HIGH-RISK listings:")
display(
    airbnb_final.filter(col("is_high_risk_area") == 1)
    .select(
        "city", "state", "listing_name", "price", "ratings",
        "City_Crime_Rate_Per_100K", "Total_Fatalities", "Median_AQI",
        "is_high_crime", "is_high_fatalities", "is_poor_aqi"
    )
    .limit(20)
)

print("\n✓ Risk analysis complete!")
print(f"\nFinal dataset: airbnb_final ({airbnb_final.count():,} rows, {len(airbnb_final.columns)} columns)")


Sample of data with risk indicators:


city,state,listing_name,price,ratings,City_Crime_Rate_Per_100K,Total_Fatalities,Median_AQI,is_high_crime,is_high_fatalities,is_poor_aqi,is_high_risk_area
benton,tennessee,"Dome in Benton, Tennessee, United States",128.0,5.0,249.31,,42.0,0,0,0,0
carrabassett valley,maine,"Entire condo in Carrabassett Valley, Maine, United States",225.0,4.96,249.31,1.0,,0,0,0,0
bonita springs,florida,"Entire rental unit in Bonita Springs, Florida, United States",88.0,4.91,249.31,4.0,42.0,0,0,0,0
bonita springs,florida,"Entire rental unit in Bonita Springs, Florida, United States",88.0,4.91,249.31,,37.0,0,0,0,0
gordonville,pennsylvania,"Entire home in Gordonville, Pennsylvania, United States",255.0,4.89,249.31,,,0,0,0,0
kittery,maine,"Entire rental unit in Kittery, Maine, United States",225.0,5.0,249.31,1.0,36.0,0,0,0,0
innsbrook,missouri,"Entire chalet in Innsbrook, Missouri, United States",264.0,4.8,249.31,,,0,0,0,0
eden,utah,"Entire rental unit in Eden, Utah, United States",110.0,4.83,249.31,1.0,47.0,0,0,0,0
auburn,alabama,"Room in Auburn, Alabama, United States",605.36,4.64,249.31,8.0,,0,0,0,0
auburn,alabama,"Room in Auburn, Alabama, United States",605.36,4.64,249.31,,,0,0,0,0



Sample of HIGH-RISK listings:


city,state,listing_name,price,ratings,City_Crime_Rate_Per_100K,Total_Fatalities,Median_AQI,is_high_crime,is_high_fatalities,is_poor_aqi
winslow,arizona,"Entire home in Winslow, Arizona, United States",95.0,5.0,249.31,5.0,12.0,0,0,1
winslow,arizona,"Entire home in Winslow, Arizona, United States",95.0,5.0,249.31,,14.0,0,0,1
holbrook,arizona,"Campsite in Holbrook, Arizona, United States",13.0,4.72,249.31,6.0,12.0,0,0,1
holbrook,arizona,"Campsite in Holbrook, Arizona, United States",13.0,4.72,249.31,,14.0,0,0,1
baker city,oregon,"Entire home in Baker City, Oregon, United States",150.0,4.97,249.31,,25.0,0,0,1
butte,montana,"Room in Butte, Montana, United States",732.0,0.0,249.31,,12.0,0,0,1
baker city,oregon,"Entire cottage in Baker City, Oregon, United States",92.0,4.74,249.31,,25.0,0,0,1
houston,texas,"Entire home in Houston, Texas, United States",6233.0,5.0,1148.17,296.0,63.0,1,1,0
houston,texas,"Entire home in Houston, Texas, United States",6233.0,5.0,1148.17,,,1,0,0
houston,texas,"Entire home in Houston, Texas",200.0,4.88,1148.17,296.0,63.0,1,1,0



✓ Risk analysis complete!

Final dataset: airbnb_final (730,949 rows, 65 columns)


In [0]:
from pyspark.sql.functions import col, count, when, isnan, isnull, mean, stddev, min as spark_min, max as spark_max, countDistinct
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

print("="*80)
print("COMPREHENSIVE ANALYSIS: USA ENRICHED AIRBNB DATASET")
print("="*80)

COMPREHENSIVE ANALYSIS: USA ENRICHED AIRBNB DATASET


In [0]:
# =============================================================================
# 1. BASIC DATASET INFORMATION
# =============================================================================

print("\n" + "="*80)
print("1. BASIC DATASET INFORMATION")
print("="*80)

total_rows = airbnb_enriched_df.count()
total_cols = len(airbnb_enriched_df.columns)

print(f"\nDataset: airbnb_enriched_df")
print(f"  Total rows: {total_rows:,}")
print(f"  Total columns: {total_cols}")
print(f"\nColumn names:")
for i, col_name in enumerate(airbnb_enriched_df.columns, 1):
    print(f"  {i:2d}. {col_name}")


1. BASIC DATASET INFORMATION

Dataset: airbnb_enriched_df
  Total rows: 730,949
  Total columns: 61

Column names:
   1. city
   2. state
   3. name
   4. price
   5. image
   6. description
   7. category
   8. availability
   9. discount
  10. reviews
  11. ratings
  12. seller_info
  13. breadcrumbs
  14. location
  15. lat
  16. long
  17. guests
  18. pets_allowed
  19. description_items
  20. category_rating
  21. house_rules
  22. details
  23. highlights
  24. arrangement_details
  25. amenities
  26. images
  27. available_dates
  28. url
  29. final_url
  30. listing_title
  31. property_id
  32. listing_name
  33. location_details
  34. description_by_sections
  35. description_html
  36. location_details_html
  37. is_supperhost
  38. host_number_of_reviews
  39. host_rating
  40. hosts_year
  41. host_response_rate
  42. is_guest_favorite
  43. travel_details
  44. pricing_details
  45. total_price
  46. currency
  47. cancellation_policy
  48. property_number_of_reviews


In [0]:
# =============================================================================
# 2. DATA COMPLETENESS ANALYSIS
# =============================================================================

print("\n" + "="*80)
print("2. DATA COMPLETENESS ANALYSIS")
print("="*80)

print("\nNull/Missing value counts per column:")
print(f"\n{'Column':<35} {'Non-Null':<12} {'Null':<12} {'Null %':<10}")
print("-" * 70)

for col_name in airbnb_enriched_df.columns:
    null_count = airbnb_enriched_df.filter(col(col_name).isNull()).count()
    non_null_count = total_rows - null_count
    null_pct = (null_count / total_rows * 100) if total_rows > 0 else 0
    print(f"{col_name:<35} {non_null_count:<12,} {null_count:<12,} {null_pct:<10.1f}%")


2. DATA COMPLETENESS ANALYSIS

Null/Missing value counts per column:

Column                              Non-Null     Null         Null %    
----------------------------------------------------------------------
city                                730,949      0            0.0       %
state                               730,805      144          0.0       %
name                                730,949      0            0.0       %
price                               730,949      0            0.0       %
image                               730,949      0            0.0       %
description                         729,846      1,103        0.2       %
category                            730,949      0            0.0       %
availability                        730,947      2            0.0       %
discount                            0            730,949      100.0     %
reviews                             680,117      50,832       7.0       %
ratings                             730,948  

In [0]:
# =============================================================================
# 3. NUMERIC COLUMN STATISTICS
# =============================================================================

print("\n" + "="*80)
print("3. NUMERIC COLUMN STATISTICS")
print("="*80)

# Identify numeric columns
numeric_cols = [
    'price', 'ratings', 'property_number_of_reviews', 'total_price',
    'lat', 'long', 'City_Crime_Rate_Per_100K', 'Total_Fatalities',
    'Median_Income', 'Disability_Rate', 'Median_AQI'
]

# Filter to only columns that exist
numeric_cols = [c for c in numeric_cols if c in airbnb_enriched_df.columns]

print("\nSummary statistics for numeric columns:")
print(f"\n{'Column':<35} {'Count':<10} {'Mean':<12} {'Std':<12} {'Min':<12} {'Max':<12}")
print("-" * 95)

for col_name in numeric_cols:
    # Cast to double for statistics
    stats = airbnb_enriched_df.select(
        count(when(col(col_name).isNotNull(), 1)).alias('count'),
        mean(col(col_name).cast('double')).alias('mean'),
        stddev(col(col_name).cast('double')).alias('std'),
        spark_min(col(col_name).cast('double')).alias('min'),
        spark_max(col(col_name).cast('double')).alias('max')
    ).collect()[0]
    
    print(f"{col_name:<35} {stats['count']:<10,} {stats['mean']:<12.2f} {stats['std']:<12.2f} {stats['min']:<12.2f} {stats['max']:<12.2f}")


3. NUMERIC COLUMN STATISTICS

Summary statistics for numeric columns:

Column                              Count      Mean         Std          Min          Max         
-----------------------------------------------------------------------------------------------
price                               730,949    715.42       3896.74      10.00        751966.00   
ratings                             730,948    4.48         1.26         0.00         5.00        
property_number_of_reviews          401,289    64.84        94.89        0.00         3439.00     
total_price                         716,326    1527.52      5563.37      9.00         1234959.00  
lat                                 730,931    35.74        6.51         -90.00       71.29       
long                                730,931    -94.84       19.18        -176.64      155.22      
City_Crime_Rate_Per_100K            730,949    333.16       217.44       249.31       2501.28     
Total_Fatalities                    273,

In [0]:
# =============================================================================
# 4. CATEGORICAL COLUMN DISTRIBUTIONS
# =============================================================================

print("\n" + "="*80)
print("4. CATEGORICAL COLUMN DISTRIBUTIONS")
print("="*80)

# City distribution
print("\nTop 20 cities by listing count:")
city_dist = airbnb_enriched_df.groupBy('city').count().orderBy(col('count').desc()).limit(20)
display(city_dist)

# State distribution
print("\nTop 20 states by listing count:")
state_dist = airbnb_enriched_df.groupBy('state').count().orderBy(col('count').desc()).limit(20)
display(state_dist)

# Superhost distribution
print("\nSuperhost distribution:")
superhost_dist = airbnb_enriched_df.groupBy('is_supperhost').count().orderBy('is_supperhost')
display(superhost_dist)

# Guest favorite distribution
print("\nGuest favorite distribution:")
guest_fav_dist = airbnb_enriched_df.groupBy('is_guest_favorite').count().orderBy('is_guest_favorite')
display(guest_fav_dist)


4. CATEGORICAL COLUMN DISTRIBUTIONS

Top 20 cities by listing count:


city,count
kissimmee,20589
austin,8669
davenport,7843
miami,6666
dallas,6578
san francisco,6522
panama city beach,6212
las vegas,6200
atlanta,6004
houston,5769



Top 20 states by listing count:


state,count
florida,132498
california,75625
texas,61167
new york,32985
colorado,30462
north carolina,26801
arizona,23215
georgia,22283
washington,21269
tennessee,21159



Superhost distribution:


is_supperhost,count
,2609
False,327098
True,401242



Guest favorite distribution:


is_guest_favorite,count
False,435785
True,295164


In [0]:
# =============================================================================
# 5. PRICE ANALYSIS
# =============================================================================

print("\n" + "="*80)
print("5. PRICE ANALYSIS")
print("="*80)

# Price statistics
price_stats = airbnb_enriched_df.filter(col('price').isNotNull()).select(
    col('price').cast('double')
).summary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max')

print("\nPrice distribution:")
display(price_stats)

# Price by state (top 10)
print("\nAverage price by state (top 10 most expensive):")
price_by_state = airbnb_enriched_df.filter(col('price').isNotNull()).groupBy('state').agg(
    mean(col('price').cast('double')).alias('avg_price'),
    count('*').alias('listing_count')
).orderBy(col('avg_price').desc()).limit(10)
display(price_by_state)


5. PRICE ANALYSIS

Price distribution:


summary,price
count,730949.0
mean,715.4150576442402
stddev,3896.7367493797647
min,10.0
25%,122.0
50%,197.0
75%,350.0
max,751966.0



Average price by state (top 10 most expensive):


state,avg_price,listing_count
los osos,9317.0,1
clearwater,7837.0,1
queens,5968.0,1
united states,4905.724221212121,3300
los angeles,3388.75,4
az,2751.5,2
la,2000.0,1
henderson,1651.26,2
"seacrest,",1648.0,1
auberge resorts collection,1646.0,1


In [0]:
# =============================================================================
# 6. ENVIRONMENTAL DATA ANALYSIS
# =============================================================================

print("\n" + "="*80)
print("6. ENVIRONMENTAL DATA ANALYSIS")
print("="*80)

# Crime rate analysis
print("\nCrime rate statistics (listings with data):")
crime_with_data = airbnb_enriched_df.filter(col('City_Crime_Rate_Per_100K').isNotNull())
print(f"  Listings with crime data: {crime_with_data.count():,}")

crime_stats = crime_with_data.select(
    mean('City_Crime_Rate_Per_100K').alias('mean'),
    stddev('City_Crime_Rate_Per_100K').alias('std'),
    spark_min('City_Crime_Rate_Per_100K').alias('min'),
    spark_max('City_Crime_Rate_Per_100K').alias('max')
).collect()[0]

print(f"  Mean crime rate: {crime_stats['mean']:.2f} per 100K")
print(f"  Std deviation: {crime_stats['std']:.2f}")
print(f"  Min: {crime_stats['min']:.2f}")
print(f"  Max: {crime_stats['max']:.2f}")

# Income analysis
print("\nMedian income statistics (listings with data):")
income_with_data = airbnb_enriched_df.filter(col('Median_Income').isNotNull())
print(f"  Listings with income data: {income_with_data.count():,}")

income_stats = income_with_data.select(
    mean('Median_Income').alias('mean'),
    stddev('Median_Income').alias('std'),
    spark_min('Median_Income').alias('min'),
    spark_max('Median_Income').alias('max')
).collect()[0]

print(f"  Mean median income: ${income_stats['mean']:,.2f}")
print(f"  Std deviation: ${income_stats['std']:,.2f}")
print(f"  Min: ${income_stats['min']:,.2f}")
print(f"  Max: ${income_stats['max']:,.2f}")

# AQI analysis
print("\nAir Quality Index (AQI) statistics (listings with data):")
aqi_with_data = airbnb_enriched_df.filter(col('Median_AQI').isNotNull())
print(f"  Listings with AQI data: {aqi_with_data.count():,}")

aqi_stats = aqi_with_data.select(
    mean('Median_AQI').alias('mean'),
    stddev('Median_AQI').alias('std'),
    spark_min('Median_AQI').alias('min'),
    spark_max('Median_AQI').alias('max')
).collect()[0]

print(f"  Mean AQI: {aqi_stats['mean']:.2f}")
print(f"  Std deviation: {aqi_stats['std']:.2f}")
print(f"  Min: {aqi_stats['min']:.2f}")
print(f"  Max: {aqi_stats['max']:.2f}")

# Fatalities analysis
print("\nTraffic fatalities statistics (listings with data):")
fatalities_with_data = airbnb_enriched_df.filter(col('Total_Fatalities').isNotNull())
print(f"  Listings with fatalities data: {fatalities_with_data.count():,}")

fatalities_stats = fatalities_with_data.select(
    mean('Total_Fatalities').alias('mean'),
    stddev('Total_Fatalities').alias('std'),
    spark_min('Total_Fatalities').alias('min'),
    spark_max('Total_Fatalities').alias('max')
).collect()[0]

print(f"  Mean fatalities: {fatalities_stats['mean']:.2f}")
print(f"  Std deviation: {fatalities_stats['std']:.2f}")
print(f"  Min: {fatalities_stats['min']:.0f}")
print(f"  Max: {fatalities_stats['max']:.0f}")


6. ENVIRONMENTAL DATA ANALYSIS

Crime rate statistics (listings with data):
  Listings with crime data: 730,949
  Mean crime rate: 333.16 per 100K
  Std deviation: 217.44
  Min: 249.31
  Max: 2501.28

Median income statistics (listings with data):
  Listings with income data: 442,580
  Mean median income: $85,565.31
  Std deviation: $60,829.50
  Min: $0.00
  Max: $300,000.00

Air Quality Index (AQI) statistics (listings with data):
  Listings with AQI data: 423,063
  Mean AQI: 41.78
  Std deviation: 12.21
  Min: 3.00
  Max: 90.00

Traffic fatalities statistics (listings with data):
  Listings with fatalities data: 273,329
  Mean fatalities: 31.47
  Std deviation: 60.30
  Min: 1
  Max: 329


In [0]:
# =============================================================================
# 7. GEOGRAPHIC COVERAGE
# =============================================================================

print("\n" + "="*80)
print("7. GEOGRAPHIC COVERAGE")
print("="*80)

num_unique_cities = airbnb_enriched_df.select(countDistinct('city')).collect()[0][0]
num_unique_states = airbnb_enriched_df.select(countDistinct('state')).collect()[0][0]

print(f"\nUnique locations:")
print(f"  Unique cities: {num_unique_cities:,}")
print(f"  Unique states: {num_unique_states:,}")

# Listings with complete environmental data
complete_env_data = airbnb_enriched_df.filter(
    col('City_Crime_Rate_Per_100K').isNotNull() &
    col('Median_Income').isNotNull() &
    col('Median_AQI').isNotNull() &
    col('Total_Fatalities').isNotNull()
).count()

print(f"\nData completeness:")
print(f"  Listings with ALL environmental data: {complete_env_data:,} ({complete_env_data/total_rows*100:.1f}%)")
# print(f"  Listings with ANY environmental data: {matched_listings:,} ({match_rate:.1f}%)")
# print(f"  Listings with NO environmental data: {total_rows - matched_listings:,} ({100-match_rate:.1f}%)")

print("\n" + "="*80)
print("ANALYSIS COMPLETE")
print("="*80)

print(f"\nKey Insights:")
print(f"  • Dataset contains {total_rows:,} USA Airbnb listings across {num_unique_states} states")
print(f"  • {match_rate:.1f}% of listings have environmental data (crime, income, AQI, fatalities)")
print(f"  • Average price: ${float(price_stats.collect()[1][1]):.2f}")
print(f"  • {crime_with_data.count():,} listings have crime rate data")
print(f"  • Geographic coverage spans {num_unique_cities:,} unique cities")


7. GEOGRAPHIC COVERAGE

Unique locations:
  Unique cities: 12,695
  Unique states: 184

Data completeness:
  Listings with ALL environmental data: 70,487 (9.6%)

ANALYSIS COMPLETE

Key Insights:
  • Dataset contains 730,949 USA Airbnb listings across 184 states
  • 21.0% of listings have environmental data (crime, income, AQI, fatalities)
  • Average price: $715.42
  • 730,949 listings have crime rate data
  • Geographic coverage spans 12,695 unique cities


In [0]:
from pyspark.sql.functions import (
    col, log as spark_log, when, size, split, length, lit, 
    regexp_replace, lower, trim, expr
)
from pyspark.sql.types import DoubleType, IntegerType

print("="*80)
print("SCALABLE PRICE ANOMALY DETECTION - OPTION A: BUCKETED LOCAL QUANTILES")
print("="*80)

SCALABLE PRICE ANOMALY DETECTION - OPTION A: BUCKETED LOCAL QUANTILES


In [0]:
print("\n1. Filtering for valid prices...")

# Filter for non-null, positive prices
df_valid = airbnb_enriched_df.filter(
    (col("price").isNotNull()) & 
    (col("price").cast(DoubleType()) > 0)
)

original_count = airbnb_enriched_df.count()
valid_count = df_valid.count()

print(f"   Original rows: {original_count:,}")
print(f"   Valid price rows: {valid_count:,} ({valid_count/original_count*100:.1f}%)")


1. Filtering for valid prices...
   Original rows: 730,949
   Valid price rows: 730,949 (100.0%)


In [0]:
print("\n2. Creating target variable: log(price)...")

df_valid = df_valid.withColumn(
    "log_price",
    spark_log(col("price").cast(DoubleType()))
)

print("   ✓ log_price created")


2. Creating target variable: log(price)...
   ✓ log_price created


In [0]:
print("\n3. Preparing features...")

# Cast coordinates to double
df_valid = df_valid.withColumn("lat", col("lat").cast(DoubleType()))
df_valid = df_valid.withColumn("long", col("long").cast(DoubleType()))

# Create amenities_count
df_valid = df_valid.withColumn(
    "amenities_count",
    when(
        col("amenities").isNotNull(),
        size(split(col("amenities"), ","))
    ).otherwise(0)
)

# Extract room_type from listing_name
df_valid = df_valid.withColumn(
    "room_type",
    when(col("listing_name").contains("Entire"), "entire")
    .when(col("listing_name").contains("Private room"), "private")
    .when(col("listing_name").contains("Shared room"), "shared")
    .otherwise("other")
)

# Extract property_type from listing_name (simplified buckets)
df_valid = df_valid.withColumn(
    "property_type",
    when(col("listing_name").contains("home"), "home")
    .when(col("listing_name").contains("apartment") | col("listing_name").contains("rental unit"), "apartment")
    .when(col("listing_name").contains("condo"), "condo")
    .otherwise("other")
)

# Cast and fill reputation features
df_valid = df_valid.withColumn("ratings", col("ratings").cast(DoubleType()))
df_valid = df_valid.withColumn(
    "property_number_of_reviews",
    when(col("property_number_of_reviews").isNull(), 0)
    .otherwise(col("property_number_of_reviews").cast(IntegerType()))
)
df_valid = df_valid.withColumn("is_supperhost", col("is_supperhost").cast(IntegerType()))

df_valid = df_valid.fillna({
    'ratings': 0.0,
    'property_number_of_reviews': 0,
    'is_supperhost': 0
})

print("   ✓ Features prepared")


3. Preparing features...
   ✓ Features prepared


In [0]:
print("\n4. Creating similarity buckets...")

# Geohash bucket (precision 6 for ~1.2km x 0.6km cells)
# Using a simple lat/long bucketing approach since geohash function may not be available
# Bucket lat/long into ~0.01 degree bins (roughly 1km)
df_valid = df_valid.withColumn(
    "lat_bucket",
    (col("lat") * 100).cast(IntegerType())
)
df_valid = df_valid.withColumn(
    "long_bucket",
    (col("long") * 100).cast(IntegerType())
)

# Combine into geohash-like bucket
df_valid = df_valid.withColumn(
    "geo_bucket",
    expr("concat(lat_bucket, '_', long_bucket)")
)

print("   ✓ Geographic buckets created (precision ~1km)")

# Amenities bucket (0-5, 6-10, 11-20, 20+)
df_valid = df_valid.withColumn(
    "amenities_bucket",
    when(col("amenities_count") <= 5, "0-5")
    .when(col("amenities_count") <= 10, "6-10")
    .when(col("amenities_count") <= 20, "11-20")
    .otherwise("20+")
)

print("   ✓ Amenities buckets created (0-5, 6-10, 11-20, 20+)")

# Create composite similarity bucket
df_valid = df_valid.withColumn(
    "similarity_bucket",
    expr("concat(geo_bucket, '|', room_type, '|', property_type, '|', amenities_bucket)")
)

print("   ✓ Composite similarity bucket created")


4. Creating similarity buckets...
   ✓ Geographic buckets created (precision ~1km)
   ✓ Amenities buckets created (0-5, 6-10, 11-20, 20+)
   ✓ Composite similarity bucket created


In [0]:
print("\n" + "="*80)
print("DATA PREPARATION COMPLETE")
print("="*80)

print(f"\nDataset: df_valid")
print(f"  Rows: {df_valid.count():,}")

print(f"\nSimilarity bucketing:")
print(f"  - Geographic: lat/long bucketed to ~1km cells")
print(f"  - Room type: entire, private, shared, other")
print(f"  - Property type: home, apartment, condo, other")
print(f"  - Amenities: 0-5, 6-10, 11-20, 20+")

num_buckets = df_valid.select("similarity_bucket").distinct().count()
print(f"\nTotal unique similarity buckets: {num_buckets:,}")

avg_bucket_size = valid_count / num_buckets if num_buckets > 0 else 0
print(f"Average listings per bucket: {avg_bucket_size:.1f}")

print("\nSample data:")
display(df_valid.select(
    "property_id", "city", "state", "price", "log_price",
    "geo_bucket", "room_type", "property_type", "amenities_bucket", "similarity_bucket"
).limit(10))

print("\n✓ Ready for local quantile computation!")


DATA PREPARATION COMPLETE

Dataset: df_valid
  Rows: 730,949

Similarity bucketing:
  - Geographic: lat/long bucketed to ~1km cells
  - Room type: entire, private, shared, other
  - Property type: home, apartment, condo, other
  - Amenities: 0-5, 6-10, 11-20, 20+

Total unique similarity buckets: 189,121
Average listings per bucket: 3.9

Sample data:


property_id,city,state,price,log_price,geo_bucket,room_type,property_type,amenities_bucket,similarity_bucket
902367758953614183,benton,tennessee,128.0,4.852030263919617,3517_-8463,other,other,20+,3517_-8463|other|other|20+
48672316,carrabassett valley,maine,225.0,5.41610040220442,4505_-7031,entire,condo,20+,4505_-7031|entire|condo|20+
643423129076681645,bonita springs,florida,88.0,4.477336814478207,2633_-8180,entire,apartment,20+,2633_-8180|entire|apartment|20+
643423129076681645,bonita springs,florida,88.0,4.477336814478207,2633_-8180,entire,apartment,20+,2633_-8180|entire|apartment|20+
20800206,gordonville,pennsylvania,255.0,5.541263545158426,4003_-7610,entire,home,20+,4003_-7610|entire|home|20+
867572983915994824,kittery,maine,225.0,5.41610040220442,4308_-7074,entire,apartment,20+,4308_-7074|entire|apartment|20+
875581428033264239,innsbrook,missouri,264.0,5.575949103146316,3877_-9103,entire,other,20+,3877_-9103|entire|other|20+
791404048769330143,eden,utah,110.0,4.700480365792417,4132_-11182,entire,apartment,20+,4132_-11182|entire|apartment|20+
42024153,auburn,alabama,605.36,6.405823322386266,3264_-8551,other,other,20+,3264_-8551|other|other|20+
42024153,auburn,alabama,605.36,6.405823322386266,3264_-8551,other,other,20+,3264_-8551|other|other|20+



✓ Ready for local quantile computation!


In [0]:
from pyspark.sql.functions import col, expr, count as spark_count, when
from pyspark.sql.functions import mean, stddev, min as spark_min, max as spark_max
from pyspark.sql import Window

print("="*80)
print("STEP 2: COMPUTE LOCAL QUANTILES PER SIMILARITY BUCKET")
print("="*80)

STEP 2: COMPUTE LOCAL QUANTILES PER SIMILARITY BUCKET


In [0]:
# =============================================================================
# 1. Compute 25th percentile of log_price per bucket
# =============================================================================

print("\n1. Computing 25th percentile of log_price per bucket...")
print("   (Using Spark SQL approx_percentile - fully distributed)")

# Group by similarity bucket and compute 25th percentile
bucket_quantiles = df_valid.groupBy("similarity_bucket").agg(
    expr("approx_percentile(log_price, 0.25)").alias("bucket_q25_log_price"),
    spark_count("*").alias("bucket_size")
)

print(f"   ✓ Computed quantiles for {bucket_quantiles.count():,} buckets")

# Show bucket size distribution
print("\nBucket size statistics:")
bucket_stats = bucket_quantiles.select(
    expr("min(bucket_size)").alias("min"),
    expr("approx_percentile(bucket_size, 0.5)").alias("median"),
    expr("avg(bucket_size)").alias("mean"),
    expr("max(bucket_size)").alias("max")
).collect()[0]

print(f"   Min bucket size: {bucket_stats['min']}")
print(f"   Median bucket size: {bucket_stats['median']:.0f}")
print(f"   Mean bucket size: {bucket_stats['mean']:.1f}")
print(f"   Max bucket size: {bucket_stats['max']}")


1. Computing 25th percentile of log_price per bucket...
   (Using Spark SQL approx_percentile - fully distributed)
   ✓ Computed quantiles for 189,121 buckets

Bucket size statistics:
   Min bucket size: 1
   Median bucket size: 2
   Mean bucket size: 3.9
   Max bucket size: 1069


In [0]:
# =============================================================================
# 2. Join quantiles back to listings
# =============================================================================

print("\n2. Joining local quantiles back to listings...")

df_with_quantiles = df_valid.join(
    bucket_quantiles,
    on="similarity_bucket",
    how="left"
)

print("   ✓ Quantiles joined")


2. Joining local quantiles back to listings...
   ✓ Quantiles joined


In [0]:
# =============================================================================
# 3. Compute raw price anomaly score
# =============================================================================

print("\n3. Computing raw price anomaly score...")

# s_price_raw = max(0, q25_bucket - log_price)
df_with_anomaly = df_with_quantiles.withColumn(
    "s_price_raw",
    when(
        (col("bucket_q25_log_price").isNotNull()) &
        (col("bucket_q25_log_price") > col("log_price")),
        col("bucket_q25_log_price") - col("log_price")
    ).otherwise(0.0)
)

print("   ✓ Raw price anomaly computed")


3. Computing raw price anomaly score...
   ✓ Raw price anomaly computed


In [0]:
# =============================================================================
# 4. Statistics on anomalies
# =============================================================================

print("\n4. Analyzing price anomalies...")

anomalies_positive = df_with_anomaly.filter(col("s_price_raw") > 0)
num_anomalies = anomalies_positive.count()
total = df_with_anomaly.count()
pct_anomalies = (num_anomalies / total * 100) if total > 0 else 0

stats = df_with_anomaly.select(
    mean("s_price_raw").alias("mean"),
    stddev("s_price_raw").alias("std"),
    spark_min("s_price_raw").alias("min"),
    spark_max("s_price_raw").alias("max")
).collect()[0]

print(f"   Listings with positive anomaly: {num_anomalies:,} ({pct_anomalies:.1f}%)")
print(f"   Mean raw anomaly: {stats['mean']:.4f}")
print(f"   Std raw anomaly: {stats['std']:.4f}")
print(f"   Max raw anomaly: {stats['max']:.4f}")


4. Analyzing price anomalies...
   Listings with positive anomaly: 62,788 (8.6%)
   Mean raw anomaly: 0.0247
   Std raw anomaly: 0.1286
   Max raw anomaly: 5.5738


In [0]:
# =============================================================================
# Summary
# =============================================================================

print("\n" + "="*80)
print("LOCAL QUANTILE COMPUTATION COMPLETE")
print("="*80)

print(f"\nDataset: df_with_anomaly")
print(f"  Rows: {df_with_anomaly.count():,}")
print(f"  New columns: bucket_q25_log_price, bucket_size, s_price_raw")

print("\nSample data:")
display(df_with_anomaly.select(
    "property_id", "city", "state", "price", "log_price",
    "similarity_bucket", "bucket_q25_log_price", "bucket_size", "s_price_raw"
).limit(10))

print("\nTop 10 anomalies (most underpriced):")
display(df_with_anomaly.orderBy(col("s_price_raw").desc()).select(
    "property_id", "city", "state", "price", "log_price",
    "bucket_q25_log_price", "s_price_raw", "bucket_size"
).limit(10))

print("\n✓ Ready for normalization!")


LOCAL QUANTILE COMPUTATION COMPLETE

Dataset: df_with_anomaly
  Rows: 730,949
  New columns: bucket_q25_log_price, bucket_size, s_price_raw

Sample data:


property_id,city,state,price,log_price,similarity_bucket,bucket_q25_log_price,bucket_size,s_price_raw
902367758953614183,benton,tennessee,128.0,4.852030263919617,3517_-8463|other|other|20+,4.852030263919617,2,0.0
48672316,carrabassett valley,maine,225.0,5.41610040220442,4505_-7031|entire|condo|20+,5.0238805208462765,12,0.0
42024153,auburn,alabama,605.36,6.405823322386266,3264_-8551|other|other|20+,6.405823322386266,2,0.0
42024153,auburn,alabama,605.36,6.405823322386266,3264_-8551|other|other|20+,6.405823322386266,2,0.0
643423129076681645,bonita springs,florida,88.0,4.477336814478207,2633_-8180|entire|apartment|20+,4.204692619390966,4,0.0
643423129076681645,bonita springs,florida,88.0,4.477336814478207,2633_-8180|entire|apartment|20+,4.204692619390966,4,0.0
867572983915994824,kittery,maine,225.0,5.41610040220442,4308_-7074|entire|apartment|20+,5.41610040220442,1,0.0
791404048769330143,eden,utah,110.0,4.700480365792417,4132_-11182|entire|apartment|20+,4.859812404361672,13,0.1593320385692553
875581428033264239,innsbrook,missouri,264.0,5.575949103146316,3877_-9103|entire|other|20+,5.575949103146316,3,0.0
20800206,gordonville,pennsylvania,255.0,5.541263545158426,4003_-7610|entire|home|20+,5.236441962829949,5,0.0



Top 10 anomalies (most underpriced):


property_id,city,state,price,log_price,bucket_q25_log_price,s_price_raw,bucket_size
47890017,new york,united states,18.0,2.8903717578961645,8.464214266625351,5.573842508729187,59
34749828,beverly hills,california,30.0,3.4011973816621555,8.797095076549056,5.3958976948869,6
711469173030352391,orlando,florida,90.0,4.499809670330265,8.879611609982035,4.37980193965177,38
711469173030352391,orlando,florida,90.0,4.499809670330265,8.879611609982035,4.37980193965177,38
1233533517487378591,jersey city,new jersey,115.0,4.74493212836325,8.989943046329998,4.245010917966748,50
1233533517487378591,jersey city,new jersey,115.0,4.74493212836325,8.989943046329998,4.245010917966748,50
2977386,beverly hills,california,10.0,2.302585092994046,6.415096959171596,4.11251186617755,6
1177414235621227668,jersey city,new jersey,150.0,5.010635294096256,8.989943046329998,3.9793077522337423,50
1177414235621227668,jersey city,new jersey,150.0,5.010635294096256,8.989943046329998,3.9793077522337423,50
1156886612042523980,new york,united states,95.0,4.553876891600541,8.504107951867582,3.9502310602670407,37



✓ Ready for normalization!


In [0]:
from pyspark.sql.functions import col, exp, log as spark_log, when, least, greatest, percent_rank
from pyspark.sql.functions import mean, stddev, lit
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

print("="*80)
print("STEP 3: NORMALIZE ANOMALY & COMPUTE REPUTATION SIGNALS")
print("="*80)

STEP 3: NORMALIZE ANOMALY & COMPUTE REPUTATION SIGNALS


In [0]:
print("\n1. Normalizing price anomaly score...")

# Create window for percentile rank
window_spec = Window.orderBy(col("s_price_raw"))

# Compute percentile rank (0 to 1)
df_with_price_score = df_with_anomaly.withColumn(
    "s_price",
    percent_rank().over(window_spec)
)

print("   ✓ Price anomaly normalized to [0, 1] using percent_rank()")


1. Normalizing price anomaly score...
   ✓ Price anomaly normalized to [0, 1] using percent_rank()


In [0]:
print("\n2. Computing review count signal...")

# s_reviews = exp(-reviews / tau)
tau = 12.0

df_with_signals = df_with_price_score.withColumn(
    "s_reviews",
    exp(-col("property_number_of_reviews").cast(DoubleType()) / lit(tau))
)

print(f"   ✓ Review signal computed (tau={tau})")


2. Computing review count signal...
   ✓ Review signal computed (tau=12.0)


In [0]:
print("\n3. Computing rating signal...")

# effective_rating = rating × min(1, log(reviews+1) / log(20))
df_with_signals = df_with_signals.withColumn(
    "effective_rating",
    col("ratings") * least(
        lit(1.0),
        spark_log(col("property_number_of_reviews") + 1) / spark_log(lit(20.0))
    )
)

# s_rating = max(0, 4.2 - effective_rating) / 4.2
df_with_signals = df_with_signals.withColumn(
    "s_rating",
    greatest(lit(0.0), lit(4.2) - col("effective_rating")) / lit(4.2)
)

print("   ✓ Rating signal computed")


3. Computing rating signal...
   ✓ Rating signal computed


In [0]:
print("\n4. Computing superhost signal...")

# s_superhost = 1 if not superhost, 0 if superhost
df_with_signals = df_with_signals.withColumn(
    "s_superhost",
    when(col("is_supperhost") == 0, 1.0).otherwise(0.0)
)

print("   ✓ Superhost signal computed")


4. Computing superhost signal...
   ✓ Superhost signal computed


In [0]:
print("\n5. Analyzing risk signals...")

signal_stats = df_with_signals.select(
    mean("s_price").alias("s_price_mean"),
    stddev("s_price").alias("s_price_std"),
    mean("s_reviews").alias("s_reviews_mean"),
    stddev("s_reviews").alias("s_reviews_std"),
    mean("s_rating").alias("s_rating_mean"),
    stddev("s_rating").alias("s_rating_std"),
    mean("s_superhost").alias("s_superhost_mean")
).collect()[0]

print("\nRisk signal statistics:")
print(f"\n  s_price (price anomaly):")
print(f"    Mean: {signal_stats['s_price_mean']:.4f}")
print(f"    Std:  {signal_stats['s_price_std']:.4f}")

print(f"\n  s_reviews (review count):")
print(f"    Mean: {signal_stats['s_reviews_mean']:.4f}")
print(f"    Std:  {signal_stats['s_reviews_std']:.4f}")

print(f"\n  s_rating (rating quality):")
print(f"    Mean: {signal_stats['s_rating_mean']:.4f}")
print(f"    Std:  {signal_stats['s_rating_std']:.4f}")

print(f"\n  s_superhost:")
print(f"    Mean: {signal_stats['s_superhost_mean']:.4f}")


5. Analyzing risk signals...

Risk signal statistics:

  s_price (price anomaly):
    Mean: 0.0822
    Std:  0.2683

  s_reviews (review count):
    Mean: 0.5952
    Std:  0.4421

  s_rating (rating quality):
    Mean: 0.5298
    Std:  0.4731

  s_superhost:
    Mean: 1.0000


In [0]:
from pyspark.sql.functions import col, lit, percent_rank, when, mean, min as spark_min, max as spark_max
from pyspark.sql.window import Window

print("="*80)
print("STEP 4: FINAL RISK SCORE, RANKING, AND FLAGGING")
print("="*80)

STEP 4: FINAL RISK SCORE, RANKING, AND FLAGGING


In [0]:
print("\n1. Computing weighted final risk score...")

# Weights
w1 = 0.40  # price anomaly
w2 = 0.25  # review count
w3 = 0.25  # rating quality
w4 = 0.10  # superhost

print(f"   Weights: price={w1}, reviews={w2}, rating={w3}, superhost={w4}")

# Compute weighted risk score
df_with_risk = df_with_signals.withColumn(
    "risk_score_raw",
    (lit(w1) * col("s_price")) +
    (lit(w2) * col("s_reviews")) +
    (lit(w3) * col("s_rating")) +
    (lit(w4) * col("s_superhost"))
)

print("   ✓ Raw risk score computed")


1. Computing weighted final risk score...
   Weights: price=0.4, reviews=0.25, rating=0.25, superhost=0.1
   ✓ Raw risk score computed


In [0]:
print("\n2. Normalizing risk score to [0, 1]...")

# Get min and max for normalization
risk_stats = df_with_risk.select(
    spark_min("risk_score_raw").alias("min"),
    spark_max("risk_score_raw").alias("max")
).collect()[0]

min_risk = risk_stats['min']
max_risk = risk_stats['max']

print(f"   Risk score range: [{min_risk:.4f}, {max_risk:.4f}]")

# Normalize
df_with_risk = df_with_risk.withColumn(
    "final_risk_score",
    (col("risk_score_raw") - lit(min_risk)) / lit(max_risk - min_risk)
)

print("   ✓ Risk score normalized")


2. Normalizing risk score to [0, 1]...
   Risk score range: [0.1000, 1.0000]
   ✓ Risk score normalized


In [0]:
print("\n3. Computing risk percentile rank...")

# Create window for percentile rank
risk_window = Window.orderBy(col("final_risk_score"))

df_with_risk = df_with_risk.withColumn(
    "risk_rank_percentile",
    percent_rank().over(risk_window)
)

print("   ✓ Risk percentile computed")


3. Computing risk percentile rank...
   ✓ Risk percentile computed


In [0]:
print("\n4. Flagging high-risk listings...")

# Flag top 10%
df_final = df_with_risk.withColumn(
    "high_risk",
    when(col("risk_rank_percentile") >= 0.90, True).otherwise(False)
)

# Count high-risk listings
high_risk_count = df_final.filter(col("high_risk") == True).count()
total_count = df_final.count()
pct_high_risk = (high_risk_count / total_count * 100) if total_count > 0 else 0

print(f"   ✓ Flagged {high_risk_count:,} listings as high-risk ({pct_high_risk:.1f}%)")


4. Flagging high-risk listings...
   ✓ Flagged 37,859 listings as high-risk (5.2%)


In [0]:
# DBTITLE 1,Verify All Columns Preserved
print("\n" + "="*80)
print("FINAL COLUMN VERIFICATION")
print("="*80)

print(f"\nColumn count progression:")
print(f"  airbnb_df (original):        {len(airbnb_df.columns)} columns")
print(f"  usa_df (after parsing):      {len(usa_df.columns)} columns")
print(f"  airbnb_enriched_df:          {len(airbnb_enriched_df.columns)} columns")
print(f"  df_valid (filtered):         {len(df_valid.columns)} columns")
print(f"  df_final (with anomalies):   {len(df_final.columns)} columns")

print(f"\nAll columns in df_final:")
for i, col_name in enumerate(df_final.columns, 1):
    print(f"  {i:2d}. {col_name}")

# Verify original columns are still there
original_cols = set(airbnb_df.columns)
final_cols = set(df_final.columns)
missing_cols = original_cols - final_cols

if missing_cols:
    print(f"\n⚠️  WARNING: {len(missing_cols)} original columns were dropped!")
    for col_name in sorted(missing_cols):
        print(f"    - {col_name}")
else:
    print(f"\n✅ SUCCESS: All {len(original_cols)} original columns are preserved in df_final!")
    
added_cols = final_cols - original_cols
print(f"\nNew columns added during processing: {len(added_cols)}")
for col_name in sorted(added_cols):
    print(f"  + {col_name}")


FINAL COLUMN VERIFICATION

Column count progression:
  airbnb_df (original):        52 columns
  usa_df (after parsing):      55 columns
  airbnb_enriched_df:          61 columns
  df_valid (filtered):         70 columns
  df_final (with anomalies):   82 columns

All columns in df_final:
   1. similarity_bucket
   2. city
   3. state
   4. name
   5. price
   6. image
   7. description
   8. category
   9. availability
  10. discount
  11. reviews
  12. ratings
  13. seller_info
  14. breadcrumbs
  15. location
  16. lat
  17. long
  18. guests
  19. pets_allowed
  20. description_items
  21. category_rating
  22. house_rules
  23. details
  24. highlights
  25. arrangement_details
  26. amenities
  27. images
  28. available_dates
  29. url
  30. final_url
  31. listing_title
  32. property_id
  33. listing_name
  34. location_details
  35. description_by_sections
  36. description_html
  37. location_details_html
  38. is_supperhost
  39. host_number_of_reviews
  40. host_rating
  4

In [0]:
print("\n5. Creating final output DataFrame...")

# Select output columns
output_df = df_final.select(
    col("property_id").alias("listing_id"),
    col("city"),
    col("state"),
    col("price"),
    col("log_price"),
    col("bucket_q25_log_price").alias("predicted_q25_price"),
    col("s_price"),
    col("s_reviews"),
    col("s_rating"),
    col("s_superhost"),
    col("final_risk_score"),
    col("risk_rank_percentile"),
    col("high_risk").alias("high_risk_flag"),
    col("similarity_bucket"),
    col("bucket_size")
)

print(f"   ✓ Output DataFrame created: {output_df.count():,} rows")


5. Creating final output DataFrame...
   ✓ Output DataFrame created: 730,949 rows


In [0]:
print("\n" + "="*80)
print("ANOMALY DETECTION COMPLETE")
print("="*80)

print(f"\nFinal dataset: output_df")
print(f"  Total listings: {total_count:,}")
print(f"  High-risk listings: {high_risk_count:,} ({pct_high_risk:.1f}%)")

print("\nRisk score distribution:")
risk_dist = output_df.select(
    spark_min("final_risk_score").alias("min"),
    mean("final_risk_score").alias("mean"),
    spark_max("final_risk_score").alias("max")
).collect()[0]

print(f"  Min:  {risk_dist['min']:.4f}")
print(f"  Mean: {risk_dist['mean']:.4f}")
print(f"  Max:  {risk_dist['max']:.4f}")

print("\nTop 20 highest-risk listings:")
display(output_df.orderBy(col("final_risk_score").desc()).limit(20))

print("\nHigh-risk listings only (sample):")
display(output_df.filter(col("high_risk_flag") == True).orderBy(col("final_risk_score").desc()).limit(20))

print("\n✓ Analysis complete!")
print("\nOutput DataFrame 'output_df' contains:")
print("  - listing_id, city, state, price")
print("  - predicted_q25_price (local 25th percentile from bucket)")
print("  - s_price, s_reviews, s_rating, s_superhost")
print("  - final_risk_score, risk_rank_percentile, high_risk_flag")
print("  - similarity_bucket, bucket_size (for explainability)")


ANOMALY DETECTION COMPLETE

Final dataset: output_df
  Total listings: 730,949
  High-risk listings: 37,859 (5.2%)

Risk score distribution:
  Min:  0.0000
  Mean: 0.3490
  Max:  1.0000

Top 20 highest-risk listings:


listing_id,city,state,price,log_price,predicted_q25_price,s_price,s_reviews,s_rating,s_superhost,final_risk_score,risk_rank_percentile,high_risk_flag,similarity_bucket,bucket_size
47890017,new york,united states,18.0,2.8903717578961645,8.464214266625351,1.0,1.0,1.0,1.0,1.0,1.0,True,4076_-7398|entire|apartment|20+,59
34749828,beverly hills,california,30.0,3.4011973816621555,8.797095076549056,0.9999986319136244,1.0,1.0,1.0,0.9999993919616108,0.9999986319136244,True,3414_-11841|entire|other|20+,6
711469173030352391,orlando,florida,90.0,4.499809670330265,8.879611609982035,0.9999958957408734,1.0,1.0,1.0,0.9999981758848324,0.9999958957408734,True,2838_-8147|private|other|20+,38
711469173030352391,orlando,florida,90.0,4.499809670330265,8.879611609982035,0.9999958957408734,1.0,1.0,1.0,0.9999981758848324,0.9999958957408734,True,2838_-8147|private|other|20+,38
1233533517487378591,jersey city,new jersey,115.0,4.74493212836325,8.989943046329998,0.9999931595681224,1.0,1.0,1.0,0.9999969598080544,0.9999931595681224,True,4072_-7403|entire|apartment|20+,50
1233533517487378591,jersey city,new jersey,115.0,4.74493212836325,8.989943046329998,0.9999931595681224,1.0,1.0,1.0,0.9999969598080544,0.9999931595681224,True,4072_-7403|entire|apartment|20+,50
2977386,beverly hills,california,10.0,2.302585092994046,6.415096959171596,0.999991791481747,1.0,1.0,1.0,0.9999963517696652,0.999991791481747,True,3407_-11838|entire|condo|20+,6
1177414235621227668,jersey city,new jersey,150.0,5.010635294096256,8.989943046329998,0.999989055308996,1.0,1.0,1.0,0.9999951356928872,0.999989055308996,True,4072_-7403|entire|apartment|20+,50
1177414235621227668,jersey city,new jersey,150.0,5.010635294096256,8.989943046329998,0.999989055308996,1.0,1.0,1.0,0.9999951356928872,0.999989055308996,True,4072_-7403|entire|apartment|20+,50
1156886612042523980,new york,united states,95.0,4.553876891600541,8.504107951867582,0.9999876872226204,1.0,1.0,1.0,0.999994527654498,0.9999876872226204,True,4073_-7398|entire|apartment|20+,37



High-risk listings only (sample):


listing_id,city,state,price,log_price,predicted_q25_price,s_price,s_reviews,s_rating,s_superhost,final_risk_score,risk_rank_percentile,high_risk_flag,similarity_bucket,bucket_size
47890017,new york,united states,18.0,2.8903717578961645,8.464214266625351,1.0,1.0,1.0,1.0,1.0,1.0,True,4076_-7398|entire|apartment|20+,59
34749828,beverly hills,california,30.0,3.4011973816621555,8.797095076549056,0.9999986319136244,1.0,1.0,1.0,0.9999993919616108,0.9999986319136244,True,3414_-11841|entire|other|20+,6
711469173030352391,orlando,florida,90.0,4.499809670330265,8.879611609982035,0.9999958957408734,1.0,1.0,1.0,0.9999981758848324,0.9999958957408734,True,2838_-8147|private|other|20+,38
711469173030352391,orlando,florida,90.0,4.499809670330265,8.879611609982035,0.9999958957408734,1.0,1.0,1.0,0.9999981758848324,0.9999958957408734,True,2838_-8147|private|other|20+,38
1233533517487378591,jersey city,new jersey,115.0,4.74493212836325,8.989943046329998,0.9999931595681224,1.0,1.0,1.0,0.9999969598080544,0.9999931595681224,True,4072_-7403|entire|apartment|20+,50
1233533517487378591,jersey city,new jersey,115.0,4.74493212836325,8.989943046329998,0.9999931595681224,1.0,1.0,1.0,0.9999969598080544,0.9999931595681224,True,4072_-7403|entire|apartment|20+,50
2977386,beverly hills,california,10.0,2.302585092994046,6.415096959171596,0.999991791481747,1.0,1.0,1.0,0.9999963517696652,0.999991791481747,True,3407_-11838|entire|condo|20+,6
1177414235621227668,jersey city,new jersey,150.0,5.010635294096256,8.989943046329998,0.999989055308996,1.0,1.0,1.0,0.9999951356928872,0.999989055308996,True,4072_-7403|entire|apartment|20+,50
1177414235621227668,jersey city,new jersey,150.0,5.010635294096256,8.989943046329998,0.999989055308996,1.0,1.0,1.0,0.9999951356928872,0.999989055308996,True,4072_-7403|entire|apartment|20+,50
1156886612042523980,new york,united states,95.0,4.553876891600541,8.504107951867582,0.9999876872226204,1.0,1.0,1.0,0.999994527654498,0.9999876872226204,True,4073_-7398|entire|apartment|20+,37



✓ Analysis complete!

Output DataFrame 'output_df' contains:
  - listing_id, city, state, price
  - predicted_q25_price (local 25th percentile from bucket)
  - s_price, s_reviews, s_rating, s_superhost
  - final_risk_score, risk_rank_percentile, high_risk_flag
  - similarity_bucket, bucket_size (for explainability)


# Create Features for Machine Learning and Recommendation Algorithm


In [0]:
try:
    load_path = "dbfs:/Workspace/Users/eliezerm@campus.technion.ac.il/eliezer_data"
    df_reloaded = spark.read.parquet(load_path)
except:
    pass

In [0]:
try:  
    # COMMAND ----------
    # DBTITLE 1,Imports and Configuration

    from pyspark.sql.functions import (
        col, lit, when, coalesce, create_map, array,
        isnan, isnull, lower, trim, to_json, size,
        concat_ws, struct, regexp_replace,
        # ADD THESE NEW IMPORTS:
        length, split, regexp_extract, 
        expr, udf, array_contains
    )
    from pyspark.sql.types import (
        DoubleType, StringType, IntegerType,
        # ADD THIS:
        FloatType
    )

    print("="*80)
    print("COMPREHENSIVE FEATURES VECTOR PIPELINE")
    print("="*80)
    print("Extracts ALL features into a single dictionary per listing")
    print("="*80)

    # COMMAND ----------
    # DBTITLE 1,Define All Feature Categories

    # =============================================================================
    # TEXT/DESCRIPTION FEATURES (9)
    # =============================================================================
    TEXT_DESCRIPTION_FEATURES = [
        "description",
        "description_html",
        "description_by_sections",
        "description_items",
        "highlights",
        "house_rules",
        "listing_name",
        "listing_title",
        "details",
    ]

    # =============================================================================
    # LOCATION FEATURES (6)
    # =============================================================================
    LOCATION_FEATURES = [
        "country",
        "country_parsed",
        "city",
        "state",
        "county",
        "location",
    ]

    # =============================================================================
    # CATEGORICAL/BUCKETING RAW FEATURES (2)
    # =============================================================================
    CATEGORICAL_RAW_FEATURES = [
        "category",
        "category_rating",
    ]

    # =============================================================================
    # PRICE/DISCOUNT FEATURES (2)
    # =============================================================================
    PRICE_DISCOUNT_FEATURES = [
        "price",
        "discount",
    ]

    # =============================================================================
    # REVIEW FEATURES (1)
    # =============================================================================
    REVIEW_FEATURES = [
        "reviews",
    ]

    # =============================================================================
    # URL/METADATA FEATURES (5)
    # =============================================================================
    URL_METADATA_FEATURES = [
        "url",
        "final_url",
        "postcode_map_url",
        "breadcrumbs",
        "arrangement_details",
    ]

    # =============================================================================
    # MEDIA FEATURES (3)
    # =============================================================================
    MEDIA_FEATURES = [
        "image",
        "images",
        "host_image",
    ]

    # =============================================================================
    # HOST/SELLER FEATURES (3)
    # =============================================================================
    HOST_SELLER_FEATURES = [
        "host_details",
        "seller_info",
        "name",
    ]

    # =============================================================================
    # OTHER FEATURES (3)
    # =============================================================================
    OTHER_FEATURES = [
        "currency",
        "amenities",
        "travel_details",
    ]

    # =============================================================================
    # NUMERICAL FEATURES (30) - From Original Pipeline
    # =============================================================================
    NUMERICAL_FEATURES = [
        ("price_numeric", 0.0),
        ("total_price", 0.0),
        ("log_price", 0.0),
        ("bucket_q25_log_price", 0.0),
        ("lat", 0.0),
        ("long", 0.0),
        ("guests", 0.0),
        ("amenities_count", 0.0),
        ("bucket_size", 0.0),
        ("ratings", 0.0),
        ("property_number_of_reviews", 0.0),
        ("effective_rating", 0.0),
        ("host_number_of_reviews", 0.0),
        ("host_rating", 0.0),
        ("hosts_year", 0.0),
        ("host_response_rate", 0.0),
        ("is_supperhost", 0.0),
        ("City_Crime_Rate_Per_100K", 0.0),
        ("Total_Fatalities", 0.0),
        ("Median_Income", 0.0),
        ("Disability_Rate", 0.0),
        ("Median_AQI", 0.0),
        ("s_price_raw", 0.0),
        ("s_price", 0.0),
        ("s_reviews", 0.0),
        ("s_rating", 0.0),
        ("s_superhost", 0.0),
        ("risk_score_raw", 0.0),
        ("final_risk_score", 0.0),
        ("risk_rank_percentile", 0.0),
    ]

    # =============================================================================
    # CATEGORICAL ONE-HOT FEATURES (8 categories -> 24 one-hot features)
    # =============================================================================
    CATEGORICAL_ONEHOT_FEATURES = [
        ("room_type", ["entire", "private", "shared", "other"]),
        ("property_type", ["home", "apartment", "condo", "other"]),
        ("amenities_bucket", ["0-5", "6-10", "11-20", "20+"]),
        ("pets_allowed", ["true", "false"]),
        ("is_guest_favorite", ["true", "false"]),
        ("availability", ["true", "false"]),
        ("high_risk", ["true", "false"]),
        ("price_sanity_flag", ["valid", "unrealistic_low", "null_price", "unknown"]),
    ]

    # Count totals
    total_text = len(TEXT_DESCRIPTION_FEATURES)
    total_location = len(LOCATION_FEATURES)
    total_cat_raw = len(CATEGORICAL_RAW_FEATURES)
    total_price_discount = len(PRICE_DISCOUNT_FEATURES)
    total_review = len(REVIEW_FEATURES)
    total_url = len(URL_METADATA_FEATURES)
    total_media = len(MEDIA_FEATURES)
    total_host = len(HOST_SELLER_FEATURES)
    total_other = len(OTHER_FEATURES)
    total_numerical = len(NUMERICAL_FEATURES)
    total_onehot = sum(len(values) for _, values in CATEGORICAL_ONEHOT_FEATURES)

    print(f"\nFeature Categories Defined:")
    print(f"  Text/Description: {total_text}")
    print(f"  Location: {total_location}")
    print(f"  Categorical Raw: {total_cat_raw}")
    print(f"  Price/Discount: {total_price_discount}")
    print(f"  Review: {total_review}")
    print(f"  URL/Metadata: {total_url}")
    print(f"  Media: {total_media}")
    print(f"  Host/Seller: {total_host}")
    print(f"  Other: {total_other}")
    print(f"  Numerical: {total_numerical}")
    print(f"  Categorical One-Hot: {total_onehot}")
    print(f"\n  TOTAL FEATURES: {total_text + total_location + total_cat_raw + total_price_discount + total_review + total_url + total_media + total_host + total_other + total_numerical + total_onehot}")

    # COMMAND ----------
    # DBTITLE 1,Text-Derived Recommender Features Definition

    # =============================================================================
    # TEXT-DERIVED RECOMMENDER FEATURES (15 NEW)
    # =============================================================================
    # These features are derived from textual analysis of reviews and descriptions
    # to help the recommender system provide better apartment suggestions to users

    TEXT_DERIVED_FEATURES = [
        ("description_word_count", 0.0),
        ("description_quality_score", 0.0),
        ("review_sentiment_positive", 0.0),
        ("review_sentiment_negative", 0.0),
        ("cleanliness_score_text", 0.0),
        ("location_quality_text", 0.0),
        ("host_responsiveness_text", 0.0),
        ("value_for_money_text", 0.0),
        ("family_friendly_score", 0.0),
        ("business_travel_score", 0.0),
        ("luxury_indicator_score", 0.0),
        ("outdoor_space_score", 0.0),
        ("quietness_score", 0.0),
        ("accessibility_score", 0.0),
        ("pet_friendliness_text", 0.0),
    ]

    # Keyword dictionaries for text analysis
    POSITIVE_SENTIMENT_WORDS = [
        "great", "excellent", "amazing", "wonderful", "fantastic", "perfect", 
        "lovely", "beautiful", "clean", "comfortable", "cozy", "spacious",
        "friendly", "helpful", "responsive", "recommend", "loved", "enjoyed",
        "best", "outstanding", "superb", "brilliant", "awesome", "pleasant"
    ]

    NEGATIVE_SENTIMENT_WORDS = [
        "dirty", "noisy", "loud", "smell", "smelly", "broken", "old", "worn",
        "uncomfortable", "disappointing", "terrible", "horrible", "awful", "bad",
        "worst", "rude", "unresponsive", "problem", "issue", "complaint", "bugs",
        "cockroach", "mice", "mold", "stain", "unsafe", "dangerous"
    ]

    CLEANLINESS_WORDS = [
        "clean", "spotless", "immaculate", "tidy", "pristine", "fresh",
        "sanitized", "hygienic", "neat", "dirty", "dusty", "stain", "mold"
    ]

    LOCATION_QUALITY_WORDS = [
        "central", "convenient", "walkable", "close", "nearby", "accessible",
        "quiet neighborhood", "safe area", "great location", "perfect location",
        "metro", "subway", "bus stop", "restaurant", "shop", "downtown"
    ]

    HOST_RESPONSIVENESS_WORDS = [
        "responsive", "quick", "fast reply", "helpful host", "great host",
        "communicative", "attentive", "welcoming", "accommodating", "friendly host",
        "easy check-in", "smooth check-in", "flexible"
    ]

    VALUE_WORDS = [
        "value", "worth", "affordable", "reasonable", "good price", "great price",
        "bargain", "deal", "expensive", "overpriced", "cheap", "budget"
    ]

    FAMILY_FRIENDLY_WORDS = [
        "family", "kid", "kids", "child", "children", "baby", "infant", "crib",
        "high chair", "family-friendly", "child-friendly", "safe for kids",
        "playground", "toys", "games"
    ]

    BUSINESS_TRAVEL_WORDS = [
        "wifi", "wi-fi", "workspace", "desk", "work from home", "business",
        "laptop", "office", "quiet", "work-friendly", "fast internet",
        "dedicated workspace", "professional"
    ]

    LUXURY_WORDS = [
        "luxury", "luxurious", "premium", "high-end", "elegant", "designer",
        "upscale", "exclusive", "sophisticated", "stunning", "gorgeous",
        "marble", "modern", "stylish", "chic", "boutique"
    ]

    OUTDOOR_SPACE_WORDS = [
        "balcony", "terrace", "patio", "garden", "yard", "outdoor", "deck",
        "rooftop", "pool", "bbq", "barbecue", "grill", "courtyard", "veranda"
    ]

    QUIETNESS_WORDS = [
        "quiet", "peaceful", "tranquil", "serene", "silent", "calm",
        "noisy", "loud", "noise", "street noise", "traffic", "party"
    ]

    ACCESSIBILITY_WORDS = [
        "accessible", "wheelchair", "elevator", "lift", "ground floor",
        "step-free", "accessible bathroom", "grab bars", "ramp", "disability",
        "mobility", "handicap"
    ]

    PET_FRIENDLY_WORDS = [
        "pet", "pets", "dog", "dogs", "cat", "cats", "pet-friendly",
        "pet friendly", "animals", "furry", "dog park", "pet fee"
    ]

    total_text_derived = len(TEXT_DERIVED_FEATURES)

    print(f"\nText-Derived Recommender Features Defined: {total_text_derived}")
    print("  These features extract signals from reviews and descriptions")
    print("  to improve recommendation quality for users")

    # COMMAND ----------
    # DBTITLE 1,Text/Description Feature Extractors

    def extract_description(df_col):
        """Extract full property description as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_description_html(df_col):
        """Extract HTML version of description as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_description_by_sections(df_col):
        """Extract sectioned description as JSON string"""
        # Column is already a string, so just coalesce with default
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    def extract_description_items(df_col):
        """Extract parsed description items as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    def extract_highlights(df_col):
        """Extract key property highlights as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    def extract_house_rules(df_col):
        """Extract property house rules as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    def extract_listing_name(df_col):
        """Extract property listing name as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_listing_title(df_col):
        """Extract property title as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_details(df_col):
        """Extract additional property details as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    # Registry for text extractors
    TEXT_EXTRACTORS = {
        "description": extract_description,
        "description_html": extract_description_html,
        "description_by_sections": extract_description_by_sections,
        "description_items": extract_description_items,
        "highlights": extract_highlights,
        "house_rules": extract_house_rules,
        "listing_name": extract_listing_name,
        "listing_title": extract_listing_title,
        "details": extract_details,
    }

    print("✓ Text/Description feature extractors defined (9 functions)")

    # COMMAND ----------
    # DBTITLE 1,Location Feature Extractors

    def extract_country(df_col):
        """Extract country name as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_country_parsed(df_col):
        """Extract parsed country as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_city(df_col):
        """Extract city name as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_state(df_col):
        """Extract state name as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_county(df_col):
        """Extract county name as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_location(df_col):
        """Extract location string as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    # Registry for location extractors
    LOCATION_EXTRACTORS = {
        "country": extract_country,
        "country_parsed": extract_country_parsed,
        "city": extract_city,
        "state": extract_state,
        "county": extract_county,
        "location": extract_location,
    }

    print("✓ Location feature extractors defined (6 functions)")

    # COMMAND ----------
    # DBTITLE 1,Categorical Raw Feature Extractors

    def extract_category(df_col):
        """Extract property category as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_category_rating(df_col):
        """Extract category-level rating as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    # Registry for categorical raw extractors
    CATEGORICAL_RAW_EXTRACTORS = {
        "category": extract_category,
        "category_rating": extract_category_rating,
    }

    print("✓ Categorical raw feature extractors defined (2 functions)")

    # COMMAND ----------
    # DBTITLE 1,Price/Discount Feature Extractors

    def extract_price_raw(df_col):
        """Extract raw price as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_discount(df_col):
        """Extract discount amount/percentage as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    # Registry for price/discount extractors
    PRICE_DISCOUNT_EXTRACTORS = {
        "price": extract_price_raw,
        "discount": extract_discount,
    }

    print("✓ Price/Discount feature extractors defined (2 functions)")

    # COMMAND ----------
    # DBTITLE 1,Review Feature Extractors

    def extract_reviews(df_col):
        """Extract full review text/data as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    # Registry for review extractors
    REVIEW_EXTRACTORS = {
        "reviews": extract_reviews,
    }

    print("✓ Review feature extractors defined (1 function)")

    # COMMAND ----------
    # DBTITLE 1,URL/Metadata Feature Extractors

    def extract_url(df_col):
        """Extract original listing URL as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_final_url(df_col):
        """Extract final/processed URL as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_postcode_map_url(df_col):
        """Extract map URL for postcode as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_breadcrumbs(df_col):
        """Extract navigation breadcrumbs as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_arrangement_details(df_col):
        """Extract arrangement/booking details as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    # Registry for URL/metadata extractors
    URL_METADATA_EXTRACTORS = {
        "url": extract_url,
        "final_url": extract_final_url,
        "postcode_map_url": extract_postcode_map_url,
        "breadcrumbs": extract_breadcrumbs,
        "arrangement_details": extract_arrangement_details,
    }

    print("✓ URL/Metadata feature extractors defined (5 functions)")

    # COMMAND ----------
    # DBTITLE 1,Media Feature Extractors

    def extract_image(df_col):
        """Extract property image URL as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_images(df_col):
        """Extract multiple images as JSON array string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    def extract_host_image(df_col):
        """Extract host profile image URL as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    # Registry for media extractors
    MEDIA_EXTRACTORS = {
        "image": extract_image,
        "images": extract_images,
        "host_image": extract_host_image,
    }

    print("✓ Media feature extractors defined (3 functions)")

    # COMMAND ----------
    # DBTITLE 1,Host/Seller Feature Extractors

    def extract_host_details(df_col):
        """Extract detailed host information as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("{}"))

    def extract_seller_info(df_col):
        """Extract seller information as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("{}"))

    def extract_name(df_col):
        """Extract host/listing name as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    # Registry for host/seller extractors
    HOST_SELLER_EXTRACTORS = {
        "host_details": extract_host_details,
        "seller_info": extract_seller_info,
        "name": extract_name,
    }

    print("✓ Host/Seller feature extractors defined (3 functions)")

    # COMMAND ----------
    # DBTITLE 1,Other Feature Extractors

    def extract_currency(df_col):
        """Extract currency code as string"""
        return coalesce(col(df_col).cast(StringType()), lit(""))

    def extract_amenities(df_col):
        """Extract raw amenities data as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("[]"))

    def extract_travel_details(df_col):
        """Extract travel details as JSON string"""
        return coalesce(col(df_col).cast(StringType()), lit("{}"))

    # Registry for other extractors
    OTHER_EXTRACTORS = {
        "currency": extract_currency,
        "amenities": extract_amenities,
        "travel_details": extract_travel_details,
    }

    print("✓ Other feature extractors defined (3 functions)")

    # COMMAND ----------
    # DBTITLE 1,Text-Derived Recommender Feature Extractors

    def count_keyword_matches(text_col, keywords_list):
        """
        Count how many keywords from the list appear in the text.
        Returns a column expression.
        """
        # Build a regex pattern for all keywords (case-insensitive)
        pattern = "|".join([f"\\b{word}\\b" for word in keywords_list])
        # Use regexp_replace to mark matches, then count
        # Alternative: count occurrences using size and split
        lowered = lower(coalesce(text_col, lit("")))
        # Count matches by checking each keyword
        match_count = lit(0)
        for keyword in keywords_list:
            match_count = match_count + when(
                lowered.contains(keyword.lower()), lit(1)
            ).otherwise(lit(0))
        return match_count

    def extract_description_word_count(df):
        """
        Extract word count from description.
        Longer descriptions often provide more useful information to users.
        """
        desc_col = coalesce(col("description").cast(StringType()), lit(""))
        # Count words by splitting on whitespace
        word_count = size(split(trim(desc_col), "\\s+"))
        # Handle empty strings
        return when(trim(desc_col) == "", lit("0.0")).otherwise(word_count.cast(StringType()))

    def extract_description_quality_score(df):
        """
        Score description quality based on length and presence of key info.
        Combines description, highlights, and details for comprehensive score.
        """
        desc = coalesce(col("description").cast(StringType()), lit(""))
        highlights = coalesce(col("highlights").cast(StringType()), lit(""))
        details = coalesce(col("details").cast(StringType()), lit(""))
        
        combined = concat_ws(" ", desc, highlights, details)
        combined_lower = lower(combined)
        
        # Base score from length (0-5 points based on word count)
        word_count = size(split(trim(combined), "\\s+"))
        length_score = when(word_count > 200, lit(5.0)
            ).when(word_count > 100, lit(4.0)
            ).when(word_count > 50, lit(3.0)
            ).when(word_count > 20, lit(2.0)
            ).when(word_count > 0, lit(1.0)
            ).otherwise(lit(0.0))
        
        # Bonus for mentioning key aspects (0-5 points)
        info_score = lit(0.0)
        info_keywords = ["bedroom", "bathroom", "kitchen", "wifi", "parking", 
                        "check-in", "checkout", "location", "amenities", "neighborhood"]
        for kw in info_keywords:
            info_score = info_score + when(combined_lower.contains(kw), lit(0.5)).otherwise(lit(0.0))
        
        total_score = length_score + info_score
        # Normalize to 0-10 scale
        normalized = when(total_score > 10, lit(10.0)).otherwise(total_score)
        
        return normalized.cast(StringType())

    def extract_review_sentiment_positive(df):
        """
        Count positive sentiment indicators in reviews.
        Higher score = more positive guest experiences.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        positive_count = lit(0.0)
        for word in POSITIVE_SENTIMENT_WORDS:
            positive_count = positive_count + when(
                reviews_col.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return positive_count.cast(StringType())

    def extract_review_sentiment_negative(df):
        """
        Count negative sentiment indicators in reviews.
        Higher score = more complaints/issues reported.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        negative_count = lit(0.0)
        for word in NEGATIVE_SENTIMENT_WORDS:
            negative_count = negative_count + when(
                reviews_col.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return negative_count.cast(StringType())

    def extract_cleanliness_score_text(df):
        """
        Extract cleanliness signals from reviews.
        Key factor for user satisfaction.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        # Positive cleanliness words add points
        clean_score = lit(0.0)
        positive_clean = ["clean", "spotless", "immaculate", "tidy", "pristine", "fresh", "sanitized"]
        negative_clean = ["dirty", "dusty", "stain", "mold", "filthy", "grimy"]
        
        for word in positive_clean:
            clean_score = clean_score + when(reviews_col.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        for word in negative_clean:
            clean_score = clean_score - when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        # Normalize: negative scores become 0
        final_score = when(clean_score < 0, lit(0.0)).otherwise(clean_score)
        
        return final_score.cast(StringType())

    def extract_location_quality_text(df):
        """
        Extract location quality signals from reviews and description.
        Helps users find well-located properties.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        location_score = lit(0.0)
        for word in LOCATION_QUALITY_WORDS:
            location_score = location_score + when(
                combined.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return location_score.cast(StringType())

    def extract_host_responsiveness_text(df):
        """
        Extract host responsiveness signals from reviews.
        Good hosts = better guest experience.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        host_score = lit(0.0)
        for word in HOST_RESPONSIVENESS_WORDS:
            host_score = host_score + when(
                reviews_col.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return host_score.cast(StringType())

    def extract_value_for_money_text(df):
        """
        Extract value perception from reviews.
        Helps users find good deals.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        value_score = lit(0.0)
        positive_value = ["value", "worth it", "affordable", "reasonable", "good price", "great price", "bargain", "deal"]
        negative_value = ["expensive", "overpriced", "not worth"]
        
        for word in positive_value:
            value_score = value_score + when(reviews_col.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        for word in negative_value:
            value_score = value_score - when(reviews_col.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        # Keep as-is (can be negative to indicate poor value)
        return value_score.cast(StringType())

    def extract_family_friendly_score(df):
        """
        Extract family-friendliness signals.
        Helps families find suitable properties.
        """
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        combined = concat_ws(" ", desc_col, amenities_col, reviews_col)
        
        family_score = lit(0.0)
        for word in FAMILY_FRIENDLY_WORDS:
            family_score = family_score + when(
                combined.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return family_score.cast(StringType())

    def extract_business_travel_score(df):
        """
        Extract business travel suitability signals.
        Helps business travelers find work-friendly spaces.
        """
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        highlights_col = lower(coalesce(col("highlights").cast(StringType()), lit("")))
        combined = concat_ws(" ", desc_col, amenities_col, highlights_col)
        
        business_score = lit(0.0)
        for word in BUSINESS_TRAVEL_WORDS:
            business_score = business_score + when(
                combined.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return business_score.cast(StringType())

    def extract_luxury_indicator_score(df):
        """
        Extract luxury/premium property signals.
        Helps users seeking upscale accommodations.
        """
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        highlights_col = lower(coalesce(col("highlights").cast(StringType()), lit("")))
        combined = concat_ws(" ", desc_col, highlights_col)
        
        luxury_score = lit(0.0)
        for word in LUXURY_WORDS:
            luxury_score = luxury_score + when(
                combined.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return luxury_score.cast(StringType())

    def extract_outdoor_space_score(df):
        """
        Extract outdoor space availability signals.
        Important for many travelers.
        """
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", desc_col, amenities_col)
        
        outdoor_score = lit(0.0)
        for word in OUTDOOR_SPACE_WORDS:
            outdoor_score = outdoor_score + when(
                combined.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return outdoor_score.cast(StringType())

    def extract_quietness_score(df):
        """
        Extract quietness/noise level signals.
        Net positive = quiet property, negative = noise issues.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        quiet_score = lit(0.0)
        positive_quiet = ["quiet", "peaceful", "tranquil", "serene", "silent", "calm"]
        negative_quiet = ["noisy", "loud", "noise", "street noise", "traffic noise", "party"]
        
        for word in positive_quiet:
            quiet_score = quiet_score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        for word in negative_quiet:
            quiet_score = quiet_score - when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return quiet_score.cast(StringType())

    def extract_accessibility_score(df):
        """
        Extract accessibility feature signals.
        Helps users with mobility needs.
        """
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", desc_col, amenities_col)
        
        accessibility_score = lit(0.0)
        for word in ACCESSIBILITY_WORDS:
            accessibility_score = accessibility_score + when(
                combined.contains(word.lower()), lit(1.0)
            ).otherwise(lit(0.0))
        
        return accessibility_score.cast(StringType())

    def extract_pet_friendliness_text(df):
        """
        Extract pet-friendliness signals beyond the boolean flag.
        Helps pet owners find truly welcoming properties.
        """
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        house_rules_col = lower(coalesce(col("house_rules").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", desc_col, house_rules_col, amenities_col)
        
        pet_score = lit(0.0)
        positive_pet = ["pet friendly", "pet-friendly", "pets welcome", "pets allowed", "dog friendly", "dog park"]
        negative_pet = ["no pets", "no animals", "pet free"]
        
        for word in positive_pet:
            pet_score = pet_score + when(combined.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        # General pet mentions (weaker signal)
        general_pet = ["pet", "dog", "cat"]
        for word in general_pet:
            pet_score = pet_score + when(combined.contains(word), lit(0.5)).otherwise(lit(0.0))
        
        for word in negative_pet:
            pet_score = pet_score - when(combined.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return pet_score.cast(StringType())

    # Registry for text-derived feature extractors
    # These extractors take the full dataframe and return column expressions
    TEXT_DERIVED_EXTRACTORS = {
        "description_word_count": extract_description_word_count,
        "description_quality_score": extract_description_quality_score,
        "review_sentiment_positive": extract_review_sentiment_positive,
        "review_sentiment_negative": extract_review_sentiment_negative,
        "cleanliness_score_text": extract_cleanliness_score_text,
        "location_quality_text": extract_location_quality_text,
        "host_responsiveness_text": extract_host_responsiveness_text,
        "value_for_money_text": extract_value_for_money_text,
        "family_friendly_score": extract_family_friendly_score,
        "business_travel_score": extract_business_travel_score,
        "luxury_indicator_score": extract_luxury_indicator_score,
        "outdoor_space_score": extract_outdoor_space_score,
        "quietness_score": extract_quietness_score,
        "accessibility_score": extract_accessibility_score,
        "pet_friendliness_text": extract_pet_friendliness_text,
    }

    print("✓ Text-Derived Recommender feature extractors defined (15 functions)")

    # COMMAND ----------
    # DBTITLE 1,Numerical Feature Extractors (Existing)

    def extract_numerical_feature(df_col, default_val=0.0):
        """Generic numerical feature extractor - returns string representation"""
        return coalesce(col(df_col).cast(DoubleType()).cast(StringType()), lit(str(default_val)))

    # Create individual extractors for each numerical feature
    def extract_price_numeric(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_total_price(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_log_price(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_bucket_q25_log_price(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_lat(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_long(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_guests(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_amenities_count(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_bucket_size(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_ratings(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_property_number_of_reviews(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_effective_rating(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_host_number_of_reviews(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_host_rating(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_hosts_year(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_host_response_rate(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_is_supperhost(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_city_crime_rate(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_total_fatalities(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_median_income(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_disability_rate(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_median_aqi(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_s_price_raw(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_s_price(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_s_reviews(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_s_rating(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_s_superhost(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_risk_score_raw(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_final_risk_score(df_col):
        return extract_numerical_feature(df_col, 0.0)

    def extract_risk_rank_percentile(df_col):
        return extract_numerical_feature(df_col, 0.0)

    # Registry for numerical extractors
    NUMERICAL_EXTRACTORS = {
        "price_numeric": extract_price_numeric,
        "total_price": extract_total_price,
        "log_price": extract_log_price,
        "bucket_q25_log_price": extract_bucket_q25_log_price,
        "lat": extract_lat,
        "long": extract_long,
        "guests": extract_guests,
        "amenities_count": extract_amenities_count,
        "bucket_size": extract_bucket_size,
        "ratings": extract_ratings,
        "property_number_of_reviews": extract_property_number_of_reviews,
        "effective_rating": extract_effective_rating,
        "host_number_of_reviews": extract_host_number_of_reviews,
        "host_rating": extract_host_rating,
        "hosts_year": extract_hosts_year,
        "host_response_rate": extract_host_response_rate,
        "is_supperhost": extract_is_supperhost,
        "City_Crime_Rate_Per_100K": extract_city_crime_rate,
        "Total_Fatalities": extract_total_fatalities,
        "Median_Income": extract_median_income,
        "Disability_Rate": extract_disability_rate,
        "Median_AQI": extract_median_aqi,
        "s_price_raw": extract_s_price_raw,
        "s_price": extract_s_price,
        "s_reviews": extract_s_reviews,
        "s_rating": extract_s_rating,
        "s_superhost": extract_s_superhost,
        "risk_score_raw": extract_risk_score_raw,
        "final_risk_score": extract_final_risk_score,
        "risk_rank_percentile": extract_risk_rank_percentile,
    }

    print("✓ Numerical feature extractors defined (30 functions)")

    # COMMAND ----------
    # DBTITLE 1,Categorical One-Hot Feature Extractors (Existing)

    # Room Type
    def extract_room_type_entire(df_col):
        return when(lower(trim(col(df_col))) == "entire", lit("1.0")).otherwise(lit("0.0"))

    def extract_room_type_private(df_col):
        return when(lower(trim(col(df_col))) == "private", lit("1.0")).otherwise(lit("0.0"))

    def extract_room_type_shared(df_col):
        return when(lower(trim(col(df_col))) == "shared", lit("1.0")).otherwise(lit("0.0"))

    def extract_room_type_other(df_col):
        return when(lower(trim(col(df_col))) == "other", lit("1.0")).otherwise(lit("0.0"))

    # Property Type
    def extract_property_type_home(df_col):
        return when(lower(trim(col(df_col))) == "home", lit("1.0")).otherwise(lit("0.0"))

    def extract_property_type_apartment(df_col):
        return when(lower(trim(col(df_col))) == "apartment", lit("1.0")).otherwise(lit("0.0"))

    def extract_property_type_condo(df_col):
        return when(lower(trim(col(df_col))) == "condo", lit("1.0")).otherwise(lit("0.0"))

    def extract_property_type_other(df_col):
        return when(lower(trim(col(df_col))) == "other", lit("1.0")).otherwise(lit("0.0"))

    # Amenities Bucket
    def extract_amenities_bucket_0_5(df_col):
        return when(col(df_col) == "0-5", lit("1.0")).otherwise(lit("0.0"))

    def extract_amenities_bucket_6_10(df_col):
        return when(col(df_col) == "6-10", lit("1.0")).otherwise(lit("0.0"))

    def extract_amenities_bucket_11_20(df_col):
        return when(col(df_col) == "11-20", lit("1.0")).otherwise(lit("0.0"))

    def extract_amenities_bucket_20_plus(df_col):
        return when(col(df_col) == "20+", lit("1.0")).otherwise(lit("0.0"))

    # Boolean Features
    def extract_pets_allowed_true(df_col):
        return when(
            (lower(col(df_col).cast(StringType())) == "true") | (col(df_col) == True),
            lit("1.0")
        ).otherwise(lit("0.0"))

    def extract_pets_allowed_false(df_col):
        return when(
            (lower(col(df_col).cast(StringType())) == "false") | (col(df_col) == False) | col(df_col).isNull(),
            lit("1.0")
        ).otherwise(lit("0.0"))

    def extract_is_guest_favorite_true(df_col):
        return when(
            (lower(col(df_col).cast(StringType())) == "true") | (col(df_col) == True),
            lit("1.0")
        ).otherwise(lit("0.0"))

    def extract_is_guest_favorite_false(df_col):
        return when(
            (lower(col(df_col).cast(StringType())) == "false") | (col(df_col) == False) | col(df_col).isNull(),
            lit("1.0")
        ).otherwise(lit("0.0"))

    def extract_availability_true(df_col):
        return when(
            (lower(col(df_col).cast(StringType())) == "true") | (col(df_col) == True),
            lit("1.0")
        ).otherwise(lit("0.0"))

    def extract_availability_false(df_col):
        return when(
            (lower(col(df_col).cast(StringType())) == "false") | (col(df_col) == False) | col(df_col).isNull(),
            lit("1.0")
        ).otherwise(lit("0.0"))

    def extract_high_risk_true(df_col):
        return when(
            (lower(col(df_col).cast(StringType())) == "true") | (col(df_col) == True),
            lit("1.0")
        ).otherwise(lit("0.0"))

    def extract_high_risk_false(df_col):
        return when(
            (lower(col(df_col).cast(StringType())) == "false") | (col(df_col) == False) | col(df_col).isNull(),
            lit("1.0")
        ).otherwise(lit("0.0"))

    # Price Sanity Flag
    def extract_price_sanity_valid(df_col):
        return when(lower(trim(col(df_col))) == "valid", lit("1.0")).otherwise(lit("0.0"))

    def extract_price_sanity_unrealistic_low(df_col):
        return when(lower(trim(col(df_col))) == "unrealistic_low", lit("1.0")).otherwise(lit("0.0"))

    def extract_price_sanity_null_price(df_col):
        return when(lower(trim(col(df_col))) == "null_price", lit("1.0")).otherwise(lit("0.0"))

    def extract_price_sanity_unknown(df_col):
        return when(
            (lower(trim(col(df_col))) == "unknown") | col(df_col).isNull(),
            lit("1.0")
        ).otherwise(lit("0.0"))

    # Registry for categorical one-hot extractors
    # Format: {output_feature_name: (source_column, extractor_function)}
    CATEGORICAL_ONEHOT_EXTRACTORS = {
        "room_type_entire": ("room_type", extract_room_type_entire),
        "room_type_private": ("room_type", extract_room_type_private),
        "room_type_shared": ("room_type", extract_room_type_shared),
        "room_type_other": ("room_type", extract_room_type_other),
        "property_type_home": ("property_type", extract_property_type_home),
        "property_type_apartment": ("property_type", extract_property_type_apartment),
        "property_type_condo": ("property_type", extract_property_type_condo),
        "property_type_other": ("property_type", extract_property_type_other),
        "amenities_bucket_0_5": ("amenities_bucket", extract_amenities_bucket_0_5),
        "amenities_bucket_6_10": ("amenities_bucket", extract_amenities_bucket_6_10),
        "amenities_bucket_11_20": ("amenities_bucket", extract_amenities_bucket_11_20),
        "amenities_bucket_20_plus": ("amenities_bucket", extract_amenities_bucket_20_plus),
        "pets_allowed_true": ("pets_allowed", extract_pets_allowed_true),
        "pets_allowed_false": ("pets_allowed", extract_pets_allowed_false),
        "is_guest_favorite_true": ("is_guest_favorite", extract_is_guest_favorite_true),
        "is_guest_favorite_false": ("is_guest_favorite", extract_is_guest_favorite_false),
        "availability_true": ("availability", extract_availability_true),
        "availability_false": ("availability", extract_availability_false),
        "high_risk_true": ("high_risk", extract_high_risk_true),
        "high_risk_false": ("high_risk", extract_high_risk_false),
        "price_sanity_valid": ("price_sanity_flag", extract_price_sanity_valid),
        "price_sanity_unrealistic_low": ("price_sanity_flag", extract_price_sanity_unrealistic_low),
        "price_sanity_null_price": ("price_sanity_flag", extract_price_sanity_null_price),
        "price_sanity_unknown": ("price_sanity_flag", extract_price_sanity_unknown),
    }

    print("✓ Categorical one-hot feature extractors defined (24 functions)")

    # COMMAND ----------
    # DBTITLE 1,Validate Features Against DataFrame

    def validate_features_against_df(df, extractors_dict, feature_type=""):
        """
        Validate that source columns exist in the dataframe.
        Returns dict of valid extractors.
        """
        df_columns = set(df.columns)
        valid = {}
        missing = []
        
        for feature_name, extractor in extractors_dict.items():
            # For one-hot extractors, extractor is (source_col, func)
            if isinstance(extractor, tuple):
                source_col = extractor[0]
            else:
                source_col = feature_name
            
            if source_col in df_columns:
                valid[feature_name] = extractor
            else:
                missing.append(source_col)
        
        if missing:
            unique_missing = list(set(missing))
            print(f"  ⚠️  {feature_type}: {len(unique_missing)} source columns not found: {unique_missing[:5]}{'...' if len(unique_missing) > 5 else ''}")
        
        print(f"  ✓ {feature_type}: {len(valid)} valid features")
        return valid

    print("\n" + "="*80)
    print("VALIDATING FEATURES AGAINST df_reloaded")
    print("="*80)

    # Validate all feature categories
    VALID_TEXT = validate_features_against_df(df_reloaded, TEXT_EXTRACTORS, "Text/Description")
    VALID_LOCATION = validate_features_against_df(df_reloaded, LOCATION_EXTRACTORS, "Location")
    VALID_CATEGORICAL_RAW = validate_features_against_df(df_reloaded, CATEGORICAL_RAW_EXTRACTORS, "Categorical Raw")
    VALID_PRICE_DISCOUNT = validate_features_against_df(df_reloaded, PRICE_DISCOUNT_EXTRACTORS, "Price/Discount")
    VALID_REVIEW = validate_features_against_df(df_reloaded, REVIEW_EXTRACTORS, "Review")
    VALID_URL_METADATA = validate_features_against_df(df_reloaded, URL_METADATA_EXTRACTORS, "URL/Metadata")
    VALID_MEDIA = validate_features_against_df(df_reloaded, MEDIA_EXTRACTORS, "Media")
    VALID_HOST_SELLER = validate_features_against_df(df_reloaded, HOST_SELLER_EXTRACTORS, "Host/Seller")
    VALID_OTHER = validate_features_against_df(df_reloaded, OTHER_EXTRACTORS, "Other")
    VALID_NUMERICAL = validate_features_against_df(df_reloaded, NUMERICAL_EXTRACTORS, "Numerical")
    VALID_ONEHOT = validate_features_against_df(df_reloaded, CATEGORICAL_ONEHOT_EXTRACTORS, "Categorical One-Hot")

    # Validate text-derived features (these use multiple source columns)
    def validate_text_derived_features(df):
        """
        Validate text-derived features. These use multiple source columns
        so we check if the key source columns exist.
        """
        df_columns = set(df.columns)
        required_cols = ["description", "reviews", "highlights", "details", "amenities", "house_rules"]
        
        missing = [c for c in required_cols if c not in df_columns]
        available = [c for c in required_cols if c in df_columns]
        
        if missing:
            print(f"  ⚠️  Text-Derived: Some source columns not found: {missing}")
        
        # All text-derived extractors are valid if at least description and reviews exist
        valid_extractors = {}
        if "description" in df_columns or "reviews" in df_columns:
            valid_extractors = TEXT_DERIVED_EXTRACTORS.copy()
        
        print(f"  ✓ Text-Derived: {len(valid_extractors)} features (using: {available})")
        return valid_extractors

    VALID_TEXT_DERIVED = validate_text_derived_features(df_reloaded)

    # COMMAND ----------
    # DBTITLE 1,Build Comprehensive Features Map

    # COMMAND ----------
    # DBTITLE 1,Build Comprehensive Features Map

    def build_comprehensive_features_map(df, 
                                        valid_text,
                                        valid_location,
                                        valid_categorical_raw,
                                        valid_price_discount,
                                        valid_review,
                                        valid_url_metadata,
                                        valid_media,
                                        valid_host_seller,
                                        valid_other,
                                        valid_numerical,
                                        valid_onehot,
                                        valid_text_derived):
        """
        Build a single map column containing ALL features.
        All values are stored as StringType for uniformity.
        
        Returns:
            Column expression for comprehensive features map
        """
        map_args = []
        
        # 1. Text/Description Features
        for feature_name, extractor_func in valid_text.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 2. Location Features
        for feature_name, extractor_func in valid_location.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 3. Categorical Raw Features
        for feature_name, extractor_func in valid_categorical_raw.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 4. Price/Discount Features
        for feature_name, extractor_func in valid_price_discount.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 5. Review Features
        for feature_name, extractor_func in valid_review.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 6. URL/Metadata Features
        for feature_name, extractor_func in valid_url_metadata.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 7. Media Features
        for feature_name, extractor_func in valid_media.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 8. Host/Seller Features
        for feature_name, extractor_func in valid_host_seller.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 9. Other Features
        for feature_name, extractor_func in valid_other.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 10. Numerical Features
        for feature_name, extractor_func in valid_numerical.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(feature_name))
        
        # 11. Categorical One-Hot Features
        for output_name, (source_col, extractor_func) in valid_onehot.items():
            map_args.append(lit(output_name))
            map_args.append(extractor_func(source_col))
        
        # 12. Text-Derived Recommender Features (NEW)
        for feature_name, extractor_func in valid_text_derived.items():
            map_args.append(lit(feature_name))
            map_args.append(extractor_func(df))
        
        # Create the map column
        features_map_col = create_map(*map_args)
        
        return features_map_col

    print("✓ Comprehensive features map builder defined (updated with text-derived features)")

    # COMMAND ----------
    # DBTITLE 1,Create Features Vector DataFrame

    print("\n" + "="*80)
    print("CREATING FEATURES VECTOR DATAFRAME")
    print("="*80)

    # Build the comprehensive features map expression
    print("\n1. Building comprehensive features map expression...")
    features_map_expr = build_comprehensive_features_map(
        df_reloaded,
        VALID_TEXT,
        VALID_LOCATION,
        VALID_CATEGORICAL_RAW,
        VALID_PRICE_DISCOUNT,
        VALID_REVIEW,
        VALID_URL_METADATA,
        VALID_MEDIA,
        VALID_HOST_SELLER,
        VALID_OTHER,
        VALID_NUMERICAL,
        VALID_ONEHOT,
        VALID_TEXT_DERIVED  # NEW PARAMETER
    )
    print("   ✓ Features map expression created")

    # Create the features vector dataframe
    print("\n2. Creating features vector dataframe...")
    df_features_vector = df_reloaded.select(
        col("property_id").alias("listing_id"),
        features_map_expr.alias("features_dict")
    )

    # Cache for performance if needed multiple times
    df_features_vector = df_features_vector.persist()

    print("   ✓ Features vector dataframe created and cached")

    # COMMAND ----------
    # DBTITLE 1,Create Feature Names Mapping

    # Create comprehensive mapping of all feature names
    FEATURE_NAMES_MAPPING = {}
    idx = 0

    # Track categories for reference
    FEATURE_CATEGORIES = {
        "text_description": [],
        "location": [],
        "categorical_raw": [],
        "price_discount": [],
        "review": [],
        "url_metadata": [],
        "media": [],
        "host_seller": [],
        "other": [],
        "numerical": [],
        "categorical_onehot": [],
        "text_derived": [],  # NEW CATEGORY

    }

    # Add features by category
    for feature_name in VALID_TEXT.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["text_description"].append(feature_name)
        idx += 1

    for feature_name in VALID_LOCATION.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["location"].append(feature_name)
        idx += 1

    for feature_name in VALID_CATEGORICAL_RAW.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["categorical_raw"].append(feature_name)
        idx += 1

    for feature_name in VALID_PRICE_DISCOUNT.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["price_discount"].append(feature_name)
        idx += 1

    for feature_name in VALID_REVIEW.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["review"].append(feature_name)
        idx += 1

    for feature_name in VALID_URL_METADATA.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["url_metadata"].append(feature_name)
        idx += 1

    for feature_name in VALID_MEDIA.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["media"].append(feature_name)
        idx += 1

    for feature_name in VALID_HOST_SELLER.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["host_seller"].append(feature_name)
        idx += 1

    for feature_name in VALID_OTHER.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["other"].append(feature_name)
        idx += 1

    for feature_name in VALID_NUMERICAL.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["numerical"].append(feature_name)
        idx += 1

    for feature_name in VALID_ONEHOT.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["categorical_onehot"].append(feature_name)
        idx += 1

    # Add text-derived features to mapping
    for feature_name in VALID_TEXT_DERIVED.keys():
        FEATURE_NAMES_MAPPING[feature_name] = idx
        FEATURE_CATEGORIES["text_derived"] = FEATURE_CATEGORIES.get("text_derived", [])
        FEATURE_CATEGORIES["text_derived"].append(feature_name)
        idx += 1

    print("\nFeature Names Mapping Created:")
    print("-" * 60)
    for category, features in FEATURE_CATEGORIES.items():
        if features:
            print(f"\n{category.upper()} ({len(features)} features):")
            for f in features[:5]:
                print(f"  {FEATURE_NAMES_MAPPING[f]:3d}: {f}")
            if len(features) > 5:
                print(f"  ... and {len(features) - 5} more")

    # COMMAND ----------
    # DBTITLE 1,Verify Output Schema

    print("\n" + "="*80)
    print("OUTPUT VERIFICATION")
    print("="*80)

    print("\nSchema of df_features_vector:")
    df_features_vector.printSchema()

    total_features = len(FEATURE_NAMES_MAPPING)
    print(f"\nTotal features in dictionary: {total_features}")

    print("\nFeatures by category:")
    for category, features in FEATURE_CATEGORIES.items():
        print(f"  {category}: {len(features)}")

    # COMMAND ----------
    # DBTITLE 1,Display Sample Output

    print("\nSample output (first 3 rows):")
    display(df_features_vector.limit(3))

    # Show a single record in detail
    print("\nDetailed view of single record features:")
    sample_row = df_features_vector.limit(1).collect()[0]
    print(f"\nListing ID: {sample_row['listing_id']}")
    print(f"\nFeatures Dictionary ({len(sample_row['features_dict'])} features):")

    # Show sample of each category
    for category, features in FEATURE_CATEGORIES.items():
        if features:
            print(f"\n--- {category.upper()} ---")
            for f in features[:3]:
                value = sample_row['features_dict'].get(f, 'N/A')
                # Truncate long values for display
                if isinstance(value, str) and len(value) > 100:
                    value = value[:100] + "..."
                print(f"  {f}: {value}")
            if len(features) > 3:
                print(f"  ... and {len(features) - 3} more features")

    # COMMAND ----------
    # DBTITLE 1,Summary Statistics

    print("\n" + "="*80)
    print("PIPELINE SUMMARY")
    print("="*80)

    # Count total rows (use action to trigger computation)
    total_rows = df_features_vector.count()

    print(f"""
    Pipeline Configuration:
    - Source DataFrame: df_reloaded
    - Output DataFrame: df_features_vector
    
    Output Schema:
    - listing_id: string (primary key from property_id)
    - features_dict: map<string, string> (dictionary of all feature values)

    Features Extracted ({total_features} total):

    TEXT/DESCRIPTION ({len(FEATURE_CATEGORIES['text_description'])} features):
        {', '.join(FEATURE_CATEGORIES['text_description'][:5])}{'...' if len(FEATURE_CATEGORIES['text_description']) > 5 else ''}

    LOCATION ({len(FEATURE_CATEGORIES['location'])} features):
        {', '.join(FEATURE_CATEGORIES['location'])}

    CATEGORICAL RAW ({len(FEATURE_CATEGORIES['categorical_raw'])} features):
        {', '.join(FEATURE_CATEGORIES['categorical_raw'])}

    PRICE/DISCOUNT ({len(FEATURE_CATEGORIES['price_discount'])} features):
        {', '.join(FEATURE_CATEGORIES['price_discount'])}

    REVIEW ({len(FEATURE_CATEGORIES['review'])} features):
        {', '.join(FEATURE_CATEGORIES['review'])}

    URL/METADATA ({len(FEATURE_CATEGORIES['url_metadata'])} features):
        {', '.join(FEATURE_CATEGORIES['url_metadata'])}

    MEDIA ({len(FEATURE_CATEGORIES['media'])} features):
        {', '.join(FEATURE_CATEGORIES['media'])}

    HOST/SELLER ({len(FEATURE_CATEGORIES['host_seller'])} features):
        {', '.join(FEATURE_CATEGORIES['host_seller'])}

    OTHER ({len(FEATURE_CATEGORIES['other'])} features):
        {', '.join(FEATURE_CATEGORIES['other'])}

    NUMERICAL ({len(FEATURE_CATEGORIES['numerical'])} features):
        Price, Location, Property, Ratings, Host, Environmental, Risk scores

    CATEGORICAL ONE-HOT ({len(FEATURE_CATEGORIES['categorical_onehot'])} features):
        Room type, Property type, Amenities bucket, Boolean flags, Price sanity

        TEXT-DERIVED RECOMMENDER ({len(FEATURE_CATEGORIES.get('text_derived', []))} features):
        Sentiment analysis, cleanliness, location quality, host responsiveness,
        value perception, family/business suitability, luxury indicators,
        outdoor space, quietness, accessibility, pet-friendliness

    Total Listings Processed: {total_rows:,}

    Mappings Available:
    - FEATURE_NAMES_MAPPING: feature_name -> index ({len(FEATURE_NAMES_MAPPING)} entries)
    - FEATURE_CATEGORIES: category -> list of feature names
    """)

    print("✓ Pipeline complete!")

    # COMMAND ----------
    # DBTITLE 1,Output Reference

    # =============================================================================
    # FINAL OUTPUT DATAFRAMES AND MAPPINGS
    # =============================================================================

    # 1. df_features_vector - Main output DataFrame
    #    Schema: listing_id (string), features_dict (map<string, string>)
    #    Contains ALL features: text, location, categorical, numerical, one-hot

    # 2. FEATURE_NAMES_MAPPING - Dict mapping feature names to indices
    #    Usage: FEATURE_NAMES_MAPPING['description'] -> 0
    #           FEATURE_NAMES_MAPPING['price_numeric'] -> 45

    # 3. FEATURE_CATEGORIES - Dict mapping category names to feature lists
    #    Usage: FEATURE_CATEGORIES['numerical'] -> ['price_numeric', 'lat', ...]

    # 4. Valid feature registries by category:
    #    - VALID_TEXT, VALID_LOCATION, VALID_CATEGORICAL_RAW
    #    - VALID_PRICE_DISCOUNT, VALID_REVIEW, VALID_URL_METADATA
    #    - VALID_MEDIA, VALID_HOST_SELLER, VALID_OTHER
    #    - VALID_NUMERICAL, VALID_ONEHOT

    print("="*80)
    print("OUTPUT REFERENCE")
    print("="*80)
    print("\nMain Output:")
    print("  • df_features_vector - DataFrame with listing_id and features_dict")
    print(f"    Rows: {total_rows:,}")
    print(f"    Features per row: {total_features}")

    print("\nMappings:")
    print("  • FEATURE_NAMES_MAPPING - feature_name -> index")
    print("  • FEATURE_CATEGORIES - category -> list of features")

    print("\nData Types in features_dict:")
    print("  • Text fields: stored as strings")
    print("  • Numerical fields: stored as string representation of numbers")
    print("  • Arrays/Lists: stored as JSON strings")
    print("  • Nested objects: stored as JSON strings")
    print("  • One-hot encoded: stored as '1.0' or '0.0'")

    print("\n✓ Ready for downstream processing!")
except:
    pass

In [0]:
try: 
    from pyspark.sql.functions import col, lit, when, lower, coalesce, concat_ws, create_map, map_concat
    from pyspark.sql.types import StringType

    # COMMAND ----------
    # DBTITLE 1,User-Interest Features and Composite Scores - Definitions

    # =============================================================================
    # 30 USER-INTEREST FEATURES
    # =============================================================================
    # These features are specifically designed to help users understand key aspects
    # of the apartment they are considering renting on Airbnb

    USER_INTEREST_FEATURES = [
        # Comfort & Quality Features (1-6)
        ("bed_comfort_score", 0.0),           # Mentions of bed/sleep quality
        ("bathroom_quality_score", 0.0),      # Bathroom condition signals
        ("kitchen_quality_score", 0.0),       # Kitchen usability signals
        ("temperature_control_score", 0.0),   # AC/heating mentions
        ("natural_light_score", 0.0),         # Light/brightness mentions
        ("space_efficiency_score", 0.0),      # Space/room size perception
        
        # Location & Neighborhood Features (7-12)
        ("public_transport_score", 0.0),      # Transit accessibility
        ("walkability_score", 0.0),           # Walking distance to amenities
        ("nightlife_proximity_score", 0.0),   # Bars/clubs/entertainment nearby
        ("restaurant_proximity_score", 0.0),  # Dining options nearby
        ("grocery_proximity_score", 0.0),     # Supermarket/store access
        ("neighborhood_safety_score", 0.0),   # Safety perception from reviews
        
        # Experience & Atmosphere Features (13-18)
        ("view_quality_score", 0.0),          # View mentions (city, ocean, etc.)
        ("privacy_level_score", 0.0),         # Privacy vs shared space
        ("local_authenticity_score", 0.0),    # Local/authentic experience
        ("modern_amenities_score", 0.0),      # Modern/updated features
        ("aesthetic_appeal_score", 0.0),      # Design/style/decor mentions
        ("noise_insulation_score", 0.0),      # Soundproofing quality
        
        # Service & Process Features (19-24)
        ("checkin_ease_score", 0.0),          # Check-in process smoothness
        ("listing_accuracy_score", 0.0),      # Matches description/photos
        ("communication_quality_score", 0.0), # Host communication quality
        ("response_speed_score", 0.0),        # How fast host responds
        ("flexibility_score", 0.0),           # Flexible policies
        ("local_tips_score", 0.0),            # Host provides local recommendations
        
        # Trip Type Suitability Features (25-30)
        ("solo_traveler_score", 0.0),         # Good for solo travelers
        ("couple_romantic_score", 0.0),       # Good for couples/romantic trips
        ("group_gathering_score", 0.0),       # Good for groups/parties
        ("long_stay_score", 0.0),             # Suitable for extended stays
        ("first_time_visitor_score", 0.0),    # Good for first-time visitors
        ("repeat_guest_indicator", 0.0),      # Mentions of return visits
    ]

    # =============================================================================
    # 10 COMPOSITE SCORES
    # =============================================================================
    # These aggregate scores provide high-level insights across multiple dimensions

    COMPOSITE_SCORES = [
        ("overall_quality_score", 0.0),       # Overall apartment quality
        ("location_convenience_score", 0.0),  # Location & accessibility
        ("host_excellence_score", 0.0),       # Host quality & service
        ("value_perception_score", 0.0),      # Value for money
        ("comfort_index_score", 0.0),         # Physical comfort
        ("family_suitability_score", 0.0),    # Family-friendliness
        ("business_ready_score", 0.0),        # Work/business suitability
        ("social_experience_score", 0.0),     # Social/entertainment
        ("relaxation_wellness_score", 0.0),   # Peace & relaxation
        ("local_immersion_score", 0.0),       # Authentic local experience
    ]

    print("="*80)
    print("USER-INTEREST FEATURES AND COMPOSITE SCORES")
    print("="*80)
    print(f"User-Interest Features: {len(USER_INTEREST_FEATURES)}")
    print(f"Composite Scores: {len(COMPOSITE_SCORES)}")
    print(f"Total New Features: {len(USER_INTEREST_FEATURES) + len(COMPOSITE_SCORES)}")

    # COMMAND ----------
    # DBTITLE 1,Keyword Dictionaries for User-Interest Features

    # =============================================================================
    # KEYWORD DICTIONARIES FOR FEATURE EXTRACTION
    # =============================================================================

    # Comfort & Quality Keywords
    BED_COMFORT_WORDS_POS = ["comfortable bed", "great sleep", "slept well", "comfy bed", 
                            "soft mattress", "quality mattress", "cozy bed", "restful sleep",
                            "good night sleep", "plush", "memory foam"]
    BED_COMFORT_WORDS_NEG = ["uncomfortable bed", "hard mattress", "bad sleep", "couldn't sleep",
                            "lumpy", "squeaky bed", "thin mattress", "back pain"]

    BATHROOM_WORDS_POS = ["clean bathroom", "nice bathroom", "great shower", "good water pressure",
                        "modern bathroom", "spacious bathroom", "hot water", "fresh towels",
                        "bathroom amenities", "toiletries provided"]
    BATHROOM_WORDS_NEG = ["dirty bathroom", "mold", "no hot water", "weak shower", "small bathroom",
                        "bathroom issues", "plumbing problems", "broken toilet"]

    KITCHEN_WORDS_POS = ["full kitchen", "well-equipped kitchen", "cooking", "great kitchen",
                        "modern kitchen", "kitchen amenities", "pots and pans", "coffee maker",
                        "fully stocked", "appliances", "dishwasher"]
    KITCHEN_WORDS_NEG = ["no kitchen", "limited kitchen", "dirty kitchen", "missing utensils",
                        "broken appliances", "no cooking"]

    TEMPERATURE_WORDS_POS = ["great ac", "good air conditioning", "comfortable temperature",
                            "central air", "heating works", "warm in winter", "cool in summer",
                            "climate control", "thermostat"]
    TEMPERATURE_WORDS_NEG = ["no ac", "broken ac", "too hot", "too cold", "no heating",
                            "freezing", "sweltering", "stuffy"]

    NATURAL_LIGHT_WORDS = ["bright", "natural light", "sunny", "sunlight", "windows",
                        "light-filled", "airy", "well-lit", "morning light", "sunset view"]

    SPACE_WORDS_POS = ["spacious", "roomy", "large", "plenty of space", "open floor plan",
                    "generous space", "big rooms", "lots of room"]
    SPACE_WORDS_NEG = ["cramped", "tiny", "small", "tight", "compact", "no space", "cluttered"]

    # Location & Neighborhood Keywords
    TRANSPORT_WORDS = ["metro", "subway", "bus stop", "train station", "public transport",
                    "transit", "uber", "lyft", "tram", "bus line", "underground",
                    "easy to get around", "well connected"]

    WALKABILITY_WORDS = ["walkable", "walking distance", "steps away", "walk to", "on foot",
                        "pedestrian", "stroll", "within walking", "easy walk", "short walk"]

    NIGHTLIFE_WORDS = ["bars", "clubs", "nightlife", "nightclub", "pub", "live music",
                    "entertainment", "party area", "vibrant", "lively area", "happening"]

    RESTAURANT_WORDS = ["restaurants", "cafes", "dining", "eateries", "food options",
                        "coffee shops", "brunch", "breakfast spot", "great food nearby",
                        "foodie", "cuisine", "bistro"]

    GROCERY_WORDS = ["grocery", "supermarket", "convenience store", "market", "shopping",
                    "groceries nearby", "food store", "whole foods", "trader joe"]

    SAFETY_WORDS_POS = ["safe", "secure", "safe area", "safe neighborhood", "well-lit streets",
                        "security", "gated", "doorman", "secure building", "felt safe"]
    SAFETY_WORDS_NEG = ["unsafe", "sketchy", "dangerous", "crime", "scary", "wouldn't walk",
                        "shady", "not safe", "security concerns"]

    # Experience & Atmosphere Keywords
    VIEW_WORDS = ["view", "overlook", "skyline", "ocean view", "city view", "mountain view",
                "water view", "scenic", "panoramic", "beautiful view", "stunning view",
                "sunrise", "sunset", "balcony view", "window view"]

    PRIVACY_WORDS_POS = ["private", "privacy", "secluded", "own space", "separate entrance",
                        "private entrance", "no shared", "entire place", "all to yourself"]
    PRIVACY_WORDS_NEG = ["shared", "thin walls", "noise from neighbors", "heard everything",
                        "no privacy", "common area"]

    LOCAL_AUTHENTIC_WORDS = ["local", "authentic", "neighborhood", "hidden gem", "off the beaten path",
                            "local favorite", "like a local", "real experience", "community",
                            "local spots", "neighborhood feel", "not touristy"]

    MODERN_WORDS = ["modern", "updated", "renovated", "new", "contemporary", "sleek",
                    "state of the art", "recently renovated", "brand new", "upgraded"]

    AESTHETIC_WORDS = ["beautiful", "stylish", "decorated", "design", "aesthetic", "instagram",
                    "cute", "charming", "artistic", "gorgeous", "lovely decor", "tasteful"]

    SOUNDPROOF_WORDS_POS = ["quiet", "soundproof", "peaceful", "silent", "no noise"]
    SOUNDPROOF_WORDS_NEG = ["noisy", "loud", "noise", "thin walls", "street noise", "heard neighbors"]

    # Service & Process Keywords
    CHECKIN_WORDS_POS = ["easy check-in", "smooth check-in", "self check-in", "keyless",
                        "straightforward", "simple check-in", "lockbox", "smart lock",
                        "24/7 check-in", "flexible check-in"]
    CHECKIN_WORDS_NEG = ["check-in issues", "late check-in", "difficult check-in",
                        "couldn't find", "problems getting in", "key issues"]

    ACCURACY_WORDS_POS = ["as described", "accurate", "exactly as pictured", "matched photos",
                        "what we expected", "true to listing", "no surprises", "as advertised"]
    ACCURACY_WORDS_NEG = ["not as described", "misleading", "different from photos",
                        "false advertising", "smaller than expected", "disappointment"]

    COMMUNICATION_WORDS_POS = ["great communication", "responsive host", "quick response",
                            "helpful host", "answered questions", "available", "attentive",
                            "excellent communication", "always available"]
    COMMUNICATION_WORDS_NEG = ["no response", "hard to reach", "unresponsive", "slow response",
                            "poor communication", "never replied", "ghosted"]

    FLEXIBILITY_WORDS = ["flexible", "accommodating", "understanding", "early check-in",
                        "late checkout", "luggage storage", "worked with us", "adjusted"]

    LOCAL_TIPS_WORDS = ["recommendations", "local tips", "guidebook", "suggested restaurants",
                        "showed us around", "local knowledge", "insider tips", "helpful guide",
                        "pointed us to", "told us about"]

    # Trip Type Keywords
    SOLO_WORDS = ["solo", "alone", "single traveler", "by myself", "solo trip",
                "perfect for one", "single person", "independent"]

    COUPLE_WORDS = ["romantic", "couples", "honeymoon", "anniversary", "date night",
                    "romantic getaway", "partner", "loved one", "intimate", "cozy for two"]

    GROUP_WORDS = ["group", "friends", "party", "gathering", "celebration", "bachelor",
                "bachelorette", "reunion", "large group", "everyone", "hosted"]

    LONG_STAY_WORDS = ["long stay", "extended stay", "monthly", "long term", "week",
                    "stayed for", "lived here", "home away from home", "settled in"]

    FIRST_TIME_WORDS = ["first time", "first visit", "new to", "tourist", "exploring",
                        "sightseeing", "attractions", "landmarks", "must see"]

    REPEAT_GUEST_WORDS = ["return", "come back", "again", "second time", "every time",
                        "always stay", "regular", "will be back", "definitely return"]

    print("✓ Keyword dictionaries defined for all user-interest features")

    # COMMAND ----------
    # DBTITLE 1,User-Interest Feature Extraction Functions (1-10)

    def extract_bed_comfort_score(df):
        """
        Score indicating bed/sleep comfort quality from reviews.
        Helps users who prioritize good sleep.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        score = lit(0.0)
        for word in BED_COMFORT_WORDS_POS:
            score = score + when(reviews_col.contains(word), lit(1.5)).otherwise(lit(0.0))
        for word in BED_COMFORT_WORDS_NEG:
            score = score - when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_bathroom_quality_score(df):
        """
        Score indicating bathroom quality and cleanliness.
        Important factor for guest satisfaction.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in BATHROOM_WORDS_POS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        for word in BATHROOM_WORDS_NEG:
            score = score - when(combined.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_kitchen_quality_score(df):
        """
        Score indicating kitchen usability and equipment.
        Critical for guests planning to cook.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col, amenities_col)
        
        score = lit(0.0)
        for word in KITCHEN_WORDS_POS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        for word in KITCHEN_WORDS_NEG:
            score = score - when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_temperature_control_score(df):
        """
        Score indicating climate control quality (AC/heating).
        Essential for comfort in various climates.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, amenities_col)
        
        score = lit(0.0)
        for word in TEMPERATURE_WORDS_POS:
            score = score + when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        for word in TEMPERATURE_WORDS_NEG:
            score = score - when(combined.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_natural_light_score(df):
        """
        Score indicating natural light and brightness.
        Affects mood and livability.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in NATURAL_LIGHT_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_space_efficiency_score(df):
        """
        Score indicating perceived space and room size.
        Helps users understand actual livable space.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in SPACE_WORDS_POS:
            score = score + when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        for word in SPACE_WORDS_NEG:
            score = score - when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_public_transport_score(df):
        """
        Score indicating public transportation accessibility.
        Key for travelers without cars.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in TRANSPORT_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_walkability_score(df):
        """
        Score indicating walking accessibility to amenities.
        Important for exploring on foot.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in WALKABILITY_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_nightlife_proximity_score(df):
        """
        Score indicating proximity to nightlife and entertainment.
        For travelers seeking vibrant nightlife.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in NIGHTLIFE_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_restaurant_proximity_score(df):
        """
        Score indicating proximity to restaurants and cafes.
        For food lovers and convenience seekers.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in RESTAURANT_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())

    print("✓ User-interest feature extractors defined (1-10)")

    # COMMAND ----------
    # DBTITLE 1,User-Interest Feature Extraction Functions (11-20)

    def extract_grocery_proximity_score(df):
        """
        Score indicating proximity to grocery stores.
        Essential for longer stays and cooking.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in GROCERY_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_neighborhood_safety_score(df):
        """
        Score indicating perceived neighborhood safety.
        Critical factor for all travelers.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        score = lit(0.0)
        for word in SAFETY_WORDS_POS:
            score = score + when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        for word in SAFETY_WORDS_NEG:
            score = score - when(reviews_col.contains(word), lit(3.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_view_quality_score(df):
        """
        Score indicating view quality from the property.
        Premium feature for many travelers.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        highlights_col = lower(coalesce(col("highlights").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col, highlights_col)
        
        score = lit(0.0)
        for word in VIEW_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_privacy_level_score(df):
        """
        Score indicating privacy level of the accommodation.
        Important for guests valuing personal space.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in PRIVACY_WORDS_POS:
            score = score + when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        for word in PRIVACY_WORDS_NEG:
            score = score - when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_local_authenticity_score(df):
        """
        Score indicating authentic local experience potential.
        For travelers seeking genuine cultural immersion.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in LOCAL_AUTHENTIC_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_modern_amenities_score(df):
        """
        Score indicating modern/updated amenities.
        For guests preferring contemporary accommodations.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in MODERN_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_aesthetic_appeal_score(df):
        """
        Score indicating visual appeal and design quality.
        For design-conscious travelers.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in AESTHETIC_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_noise_insulation_score(df):
        """
        Score indicating noise/sound insulation quality.
        Critical for light sleepers.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        score = lit(0.0)
        for word in SOUNDPROOF_WORDS_POS:
            score = score + when(reviews_col.contains(word), lit(1.5)).otherwise(lit(0.0))
        for word in SOUNDPROOF_WORDS_NEG:
            score = score - when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_checkin_ease_score(df):
        """
        Score indicating ease of check-in process.
        Reduces arrival stress for guests.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        score = lit(0.0)
        for word in CHECKIN_WORDS_POS:
            score = score + when(reviews_col.contains(word), lit(1.5)).otherwise(lit(0.0))
        for word in CHECKIN_WORDS_NEG:
            score = score - when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_listing_accuracy_score(df):
        """
        Score indicating how accurately listing matches reality.
        Builds trust and sets correct expectations.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        score = lit(0.0)
        for word in ACCURACY_WORDS_POS:
            score = score + when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        for word in ACCURACY_WORDS_NEG:
            score = score - when(reviews_col.contains(word), lit(3.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())

    print("✓ User-interest feature extractors defined (11-20)")

    # COMMAND ----------
    # DBTITLE 1,User-Interest Feature Extraction Functions (21-30)

    def extract_communication_quality_score(df):
        """
        Score indicating host communication quality.
        Key factor in guest experience.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        score = lit(0.0)
        for word in COMMUNICATION_WORDS_POS:
            score = score + when(reviews_col.contains(word), lit(1.5)).otherwise(lit(0.0))
        for word in COMMUNICATION_WORDS_NEG:
            score = score - when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_response_speed_score(df):
        """
        Score indicating host response speed.
        Uses host_response_rate if available, plus review mentions.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        # Base score from response-related mentions
        score = lit(0.0)
        quick_words = ["quick response", "fast response", "responded immediately", 
                    "replied quickly", "prompt", "within minutes"]
        slow_words = ["slow response", "took forever", "never responded", "waited"]
        
        for word in quick_words:
            score = score + when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        for word in slow_words:
            score = score - when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_flexibility_score(df):
        """
        Score indicating host/policy flexibility.
        Valuable for travelers with changing plans.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        score = lit(0.0)
        for word in FLEXIBILITY_WORDS:
            score = score + when(reviews_col.contains(word), lit(1.5)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_local_tips_score(df):
        """
        Score indicating quality of local recommendations from host.
        Enhances travel experience.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in LOCAL_TIPS_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_solo_traveler_score(df):
        """
        Score indicating suitability for solo travelers.
        For independent travelers.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in SOLO_WORDS:
            score = score + when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        
        # Bonus for safety and good location (important for solo travelers)
        for word in SAFETY_WORDS_POS:
            score = score + when(combined.contains(word), lit(0.5)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_couple_romantic_score(df):
        """
        Score indicating suitability for couples/romantic trips.
        For romantic getaways.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in COUPLE_WORDS:
            score = score + when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        
        # Bonus for privacy and views
        for word in PRIVACY_WORDS_POS:
            score = score + when(combined.contains(word), lit(0.5)).otherwise(lit(0.0))
        for word in VIEW_WORDS:
            score = score + when(combined.contains(word), lit(0.3)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_group_gathering_score(df):
        """
        Score indicating suitability for groups/gatherings.
        For friend trips and celebrations.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in GROUP_WORDS:
            score = score + when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        
        # Bonus for space
        for word in SPACE_WORDS_POS:
            score = score + when(combined.contains(word), lit(0.5)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_long_stay_score(df):
        """
        Score indicating suitability for extended stays.
        For digital nomads and long-term travelers.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col, amenities_col)
        
        score = lit(0.0)
        for word in LONG_STAY_WORDS:
            score = score + when(combined.contains(word), lit(1.5)).otherwise(lit(0.0))
        
        # Bonus for kitchen and laundry (important for long stays)
        long_stay_amenities = ["washer", "dryer", "laundry", "dishwasher", "full kitchen"]
        for word in long_stay_amenities:
            score = score + when(combined.contains(word), lit(0.5)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_first_time_visitor_score(df):
        """
        Score indicating suitability for first-time visitors.
        For tourists new to the area.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        score = lit(0.0)
        for word in FIRST_TIME_WORDS:
            score = score + when(combined.contains(word), lit(1.0)).otherwise(lit(0.0))
        
        # Bonus for good location and local tips
        for word in LOCATION_QUALITY_WORDS:
            score = score + when(combined.contains(word), lit(0.3)).otherwise(lit(0.0))
        for word in LOCAL_TIPS_WORDS:
            score = score + when(combined.contains(word), lit(0.5)).otherwise(lit(0.0))
        
        return score.cast(StringType())


    def extract_repeat_guest_indicator(df):
        """
        Score indicating mentions of return visits.
        Strong quality signal.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        score = lit(0.0)
        for word in REPEAT_GUEST_WORDS:
            score = score + when(reviews_col.contains(word), lit(2.0)).otherwise(lit(0.0))
        
        return score.cast(StringType())

    print("✓ User-interest feature extractors defined (21-30)")

    # COMMAND ----------
    # DBTITLE 1,Composite Score Calculation Functions

    def calculate_overall_quality_score(df):
        """
        OVERALL QUALITY SCORE (0-100 scale)
        Combines: cleanliness, accuracy, amenities, and general positive sentiment.
        Provides a single quality metric for quick assessment.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        # Component 1: Cleanliness (25 points max)
        clean_score = lit(0.0)
        for word in ["clean", "spotless", "immaculate", "tidy", "pristine"]:
            clean_score = clean_score + when(reviews_col.contains(word), lit(5.0)).otherwise(lit(0.0))
        clean_score = when(clean_score > 25, lit(25.0)).otherwise(clean_score)
        
        # Component 2: Accuracy (25 points max)
        accuracy_score = lit(0.0)
        for word in ACCURACY_WORDS_POS:
            accuracy_score = accuracy_score + when(reviews_col.contains(word), lit(5.0)).otherwise(lit(0.0))
        accuracy_score = when(accuracy_score > 25, lit(25.0)).otherwise(accuracy_score)
        
        # Component 3: Amenities quality (25 points max)
        amenity_score = lit(0.0)
        amenity_quality_words = ["well-equipped", "everything we needed", "all amenities", 
                                "fully stocked", "great amenities", "modern appliances"]
        for word in amenity_quality_words:
            amenity_score = amenity_score + when(reviews_col.contains(word), lit(5.0)).otherwise(lit(0.0))
        amenity_score = when(amenity_score > 25, lit(25.0)).otherwise(amenity_score)
        
        # Component 4: General satisfaction (25 points max)
        satisfaction_score = lit(0.0)
        satisfaction_words = ["highly recommend", "would stay again", "perfect", "exceeded expectations",
                            "amazing stay", "wonderful experience"]
        for word in satisfaction_words:
            satisfaction_score = satisfaction_score + when(reviews_col.contains(word), lit(5.0)).otherwise(lit(0.0))
        satisfaction_score = when(satisfaction_score > 25, lit(25.0)).otherwise(satisfaction_score)
        
        # Negative deductions
        deduction = lit(0.0)
        negative_words = ["disappointing", "dirty", "broken", "misleading", "avoid", "worst"]
        for word in negative_words:
            deduction = deduction + when(reviews_col.contains(word), lit(10.0)).otherwise(lit(0.0))
        
        total = clean_score + accuracy_score + amenity_score + satisfaction_score - deduction
        # Ensure score is between 0 and 100
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())


    def calculate_location_convenience_score(df):
        """
        LOCATION CONVENIENCE SCORE (0-100 scale)
        Combines: transport, walkability, proximity to amenities, and neighborhood quality.
        Essential for travelers prioritizing accessibility.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        # Component 1: Public transport (25 points max)
        transport_score = lit(0.0)
        for word in TRANSPORT_WORDS:
            transport_score = transport_score + when(combined.contains(word), lit(3.0)).otherwise(lit(0.0))
        transport_score = when(transport_score > 25, lit(25.0)).otherwise(transport_score)
        
        # Component 2: Walkability (25 points max)
        walk_score = lit(0.0)
        for word in WALKABILITY_WORDS:
            walk_score = walk_score + when(combined.contains(word), lit(3.0)).otherwise(lit(0.0))
        walk_score = when(walk_score > 25, lit(25.0)).otherwise(walk_score)
        
        # Component 3: Nearby amenities (25 points max)
        amenity_score = lit(0.0)
        for word in RESTAURANT_WORDS + GROCERY_WORDS:
            amenity_score = amenity_score + when(combined.contains(word), lit(2.0)).otherwise(lit(0.0))
        amenity_score = when(amenity_score > 25, lit(25.0)).otherwise(amenity_score)
        
        # Component 4: Location quality mentions (25 points max)
        location_score = lit(0.0)
        for word in LOCATION_QUALITY_WORDS:
            location_score = location_score + when(combined.contains(word), lit(3.0)).otherwise(lit(0.0))
        location_score = when(location_score > 25, lit(25.0)).otherwise(location_score)
        
        total = transport_score + walk_score + amenity_score + location_score
        final = when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())


    def calculate_host_excellence_score(df):
        """
        HOST EXCELLENCE SCORE (0-100 scale)
        Combines: communication, responsiveness, check-in ease, and helpfulness.
        Reflects the quality of host interaction.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        # Component 1: Communication (25 points max)
        comm_score = lit(0.0)
        for word in COMMUNICATION_WORDS_POS:
            comm_score = comm_score + when(reviews_col.contains(word), lit(4.0)).otherwise(lit(0.0))
        comm_score = when(comm_score > 25, lit(25.0)).otherwise(comm_score)
        
        # Component 2: Responsiveness (25 points max)
        resp_score = lit(0.0)
        for word in HOST_RESPONSIVENESS_WORDS:
            resp_score = resp_score + when(reviews_col.contains(word), lit(4.0)).otherwise(lit(0.0))
        resp_score = when(resp_score > 25, lit(25.0)).otherwise(resp_score)
        
        # Component 3: Check-in (25 points max)
        checkin_score = lit(0.0)
        for word in CHECKIN_WORDS_POS:
            checkin_score = checkin_score + when(reviews_col.contains(word), lit(4.0)).otherwise(lit(0.0))
        checkin_score = when(checkin_score > 25, lit(25.0)).otherwise(checkin_score)
        
        # Component 4: Local tips & flexibility (25 points max)
        extra_score = lit(0.0)
        for word in LOCAL_TIPS_WORDS + FLEXIBILITY_WORDS:
            extra_score = extra_score + when(reviews_col.contains(word), lit(3.0)).otherwise(lit(0.0))
        extra_score = when(extra_score > 25, lit(25.0)).otherwise(extra_score)
        
        # Negative deductions
        deduction = lit(0.0)
        for word in COMMUNICATION_WORDS_NEG:
            deduction = deduction + when(reviews_col.contains(word), lit(15.0)).otherwise(lit(0.0))
        
        total = comm_score + resp_score + checkin_score + extra_score - deduction
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())


    def calculate_value_perception_score(df):
        """
        VALUE PERCEPTION SCORE (0-100 scale)
        Combines: value mentions, price-quality perception, and deal indicators.
        Helps users understand bang for their buck.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        
        # Component 1: Positive value mentions (50 points max)
        value_score = lit(0.0)
        positive_value = ["great value", "worth every penny", "good value", "reasonable price",
                        "affordable", "bargain", "good deal", "excellent value", "worth it"]
        for word in positive_value:
            value_score = value_score + when(reviews_col.contains(word), lit(8.0)).otherwise(lit(0.0))
        value_score = when(value_score > 50, lit(50.0)).otherwise(value_score)
        
        # Component 2: Quality for price (50 points max)
        quality_score = lit(0.0)
        quality_value = ["exceeded expectations", "more than expected", "pleasantly surprised",
                        "better than expected", "incredible for the price"]
        for word in quality_value:
            quality_score = quality_score + when(reviews_col.contains(word), lit(10.0)).otherwise(lit(0.0))
        quality_score = when(quality_score > 50, lit(50.0)).otherwise(quality_score)
        
        # Negative deductions
        deduction = lit(0.0)
        negative_value = ["overpriced", "too expensive", "not worth", "rip off", "overcharged"]
        for word in negative_value:
            deduction = deduction + when(reviews_col.contains(word), lit(20.0)).otherwise(lit(0.0))
        
        total = value_score + quality_score - deduction
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())


    def calculate_comfort_index_score(df):
        """
        COMFORT INDEX SCORE (0-100 scale)
        Combines: bed quality, temperature, space, noise, and overall comfort.
        Essential for quality sleep and relaxation.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, amenities_col)
        
        # Component 1: Bed comfort (25 points max)
        bed_score = lit(0.0)
        for word in BED_COMFORT_WORDS_POS:
            bed_score = bed_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        bed_score = when(bed_score > 25, lit(25.0)).otherwise(bed_score)
        
        # Component 2: Temperature control (25 points max)
        temp_score = lit(0.0)
        for word in TEMPERATURE_WORDS_POS:
            temp_score = temp_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        temp_score = when(temp_score > 25, lit(25.0)).otherwise(temp_score)
        
        # Component 3: Space (25 points max)
        space_score = lit(0.0)
        for word in SPACE_WORDS_POS:
            space_score = space_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        space_score = when(space_score > 25, lit(25.0)).otherwise(space_score)
        
        # Component 4: Quietness (25 points max)
        quiet_score = lit(0.0)
        for word in SOUNDPROOF_WORDS_POS:
            quiet_score = quiet_score + when(combined.contains(word), lit(5.0)).otherwise(lit(0.0))
        quiet_score = when(quiet_score > 25, lit(25.0)).otherwise(quiet_score)
        
        # Negative deductions
        deduction = lit(0.0)
        for word in BED_COMFORT_WORDS_NEG + TEMPERATURE_WORDS_NEG + SPACE_WORDS_NEG + SOUNDPROOF_WORDS_NEG:
            deduction = deduction + when(combined.contains(word), lit(8.0)).otherwise(lit(0.0))
        
        total = bed_score + temp_score + space_score + quiet_score - deduction
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())

    print("✓ Composite score functions defined (1-5)")

    # COMMAND ----------
    # DBTITLE 1,Composite Score Calculation Functions (6-10)

    def calculate_family_suitability_score(df):
        """
        FAMILY SUITABILITY SCORE (0-100 scale)
        Combines: family mentions, safety, space, and child-friendly amenities.
        Essential for traveling families.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col, amenities_col)
        
        # Component 1: Family mentions (30 points max)
        family_score = lit(0.0)
        for word in FAMILY_FRIENDLY_WORDS:
            family_score = family_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        family_score = when(family_score > 30, lit(30.0)).otherwise(family_score)
        
        # Component 2: Safety (30 points max)
        safety_score = lit(0.0)
        for word in SAFETY_WORDS_POS:
            safety_score = safety_score + when(combined.contains(word), lit(6.0)).otherwise(lit(0.0))
        safety_score = when(safety_score > 30, lit(30.0)).otherwise(safety_score)
        
        # Component 3: Space (20 points max)
        space_score = lit(0.0)
        for word in SPACE_WORDS_POS:
            space_score = space_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        space_score = when(space_score > 20, lit(20.0)).otherwise(space_score)
        
        # Component 4: Child amenities (20 points max)
        child_amenity_score = lit(0.0)
        child_amenities = ["crib", "high chair", "toys", "games", "playground", "child-safe",
                        "baby", "stroller", "pack n play"]
        for word in child_amenities:
            child_amenity_score = child_amenity_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        child_amenity_score = when(child_amenity_score > 20, lit(20.0)).otherwise(child_amenity_score)
        
        # Negative deductions
        deduction = lit(0.0)
        not_family = ["adults only", "no children", "not suitable for children", "party house"]
        for word in not_family:
            deduction = deduction + when(combined.contains(word), lit(30.0)).otherwise(lit(0.0))
        
        total = family_score + safety_score + space_score + child_amenity_score - deduction
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())


    def calculate_business_ready_score(df):
        """
        BUSINESS READY SCORE (0-100 scale)
        Combines: WiFi quality, workspace, quiet environment, and professional amenities.
        For remote workers and business travelers.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col, amenities_col)
        
        # Component 1: WiFi/Internet (30 points max)
        wifi_score = lit(0.0)
        wifi_words = ["fast wifi", "fast internet", "wifi", "wi-fi", "high-speed internet",
                    "reliable internet", "good wifi", "great wifi"]
        for word in wifi_words:
            wifi_score = wifi_score + when(combined.contains(word), lit(5.0)).otherwise(lit(0.0))
        wifi_score = when(wifi_score > 30, lit(30.0)).otherwise(wifi_score)
        
        # Component 2: Workspace (30 points max)
        workspace_score = lit(0.0)
        workspace_words = ["desk", "workspace", "work from home", "office", "dedicated workspace",
                        "work-friendly", "laptop-friendly", "work area"]
        for word in workspace_words:
            workspace_score = workspace_score + when(combined.contains(word), lit(5.0)).otherwise(lit(0.0))
        workspace_score = when(workspace_score > 30, lit(30.0)).otherwise(workspace_score)
        
        # Component 3: Quiet environment (25 points max)
        quiet_score = lit(0.0)
        for word in SOUNDPROOF_WORDS_POS:
            quiet_score = quiet_score + when(combined.contains(word), lit(5.0)).otherwise(lit(0.0))
        quiet_score = when(quiet_score > 25, lit(25.0)).otherwise(quiet_score)
        
        # Component 4: Professional amenities (15 points max)
        prof_score = lit(0.0)
        prof_amenities = ["coffee maker", "printer", "iron", "self check-in", "24/7"]
        for word in prof_amenities:
            prof_score = prof_score + when(combined.contains(word), lit(3.0)).otherwise(lit(0.0))
        prof_score = when(prof_score > 15, lit(15.0)).otherwise(prof_score)
        
        # Negative deductions
        deduction = lit(0.0)
        anti_work = ["no wifi", "slow internet", "wifi issues", "noisy"]
        for word in anti_work:
            deduction = deduction + when(combined.contains(word), lit(20.0)).otherwise(lit(0.0))
        
        total = wifi_score + workspace_score + quiet_score + prof_score - deduction
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())


    def calculate_social_experience_score(df):
        """
        SOCIAL EXPERIENCE SCORE (0-100 scale)
        Combines: group suitability, nightlife access, entertainment, and social atmosphere.
        For travelers seeking social/party experiences.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        # Component 1: Group/Party mentions (30 points max)
        group_score = lit(0.0)
        for word in GROUP_WORDS:
            group_score = group_score + when(combined.contains(word), lit(5.0)).otherwise(lit(0.0))
        group_score = when(group_score > 30, lit(30.0)).otherwise(group_score)
        
        # Component 2: Nightlife access (30 points max)
        nightlife_score = lit(0.0)
        for word in NIGHTLIFE_WORDS:
            nightlife_score = nightlife_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        nightlife_score = when(nightlife_score > 30, lit(30.0)).otherwise(nightlife_score)
        
        # Component 3: Entertainment (25 points max)
        entertainment_score = lit(0.0)
        entertainment_words = ["tv", "netflix", "streaming", "games", "game room", "pool table",
                            "entertainment", "smart tv", "cable"]
        for word in entertainment_words:
            entertainment_score = entertainment_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        entertainment_score = when(entertainment_score > 25, lit(25.0)).otherwise(entertainment_score)
        
        # Component 4: Social atmosphere (15 points max)
        social_score = lit(0.0)
        social_words = ["vibrant", "lively", "happening", "fun area", "great vibe"]
        for word in social_words:
            social_score = social_score + when(combined.contains(word), lit(3.0)).otherwise(lit(0.0))
        social_score = when(social_score > 15, lit(15.0)).otherwise(social_score)
        
        # Negative deductions
        deduction = lit(0.0)
        anti_social = ["no parties", "quiet hours", "no events", "strict rules"]
        for word in anti_social:
            deduction = deduction + when(combined.contains(word), lit(10.0)).otherwise(lit(0.0))
        
        total = group_score + nightlife_score + entertainment_score + social_score - deduction
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())


    def calculate_relaxation_wellness_score(df):
        """
        RELAXATION & WELLNESS SCORE (0-100 scale)
        Combines: quietness, views, outdoor space, spa-like amenities, and peaceful atmosphere.
        For travelers seeking rest and rejuvenation.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        amenities_col = lower(coalesce(col("amenities").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col, amenities_col)
        
        # Component 1: Quietness/Peace (30 points max)
        quiet_score = lit(0.0)
        peace_words = ["quiet", "peaceful", "tranquil", "serene", "calm", "relaxing", "zen"]
        for word in peace_words:
            quiet_score = quiet_score + when(combined.contains(word), lit(5.0)).otherwise(lit(0.0))
        quiet_score = when(quiet_score > 30, lit(30.0)).otherwise(quiet_score)
        
        # Component 2: Views/Nature (25 points max)
        view_score = lit(0.0)
        nature_words = VIEW_WORDS + ["nature", "garden", "trees", "greenery", "park view"]
        for word in nature_words:
            view_score = view_score + when(combined.contains(word), lit(2.0)).otherwise(lit(0.0))
        view_score = when(view_score > 25, lit(25.0)).otherwise(view_score)
        
        # Component 3: Outdoor space (25 points max)
        outdoor_score = lit(0.0)
        for word in OUTDOOR_SPACE_WORDS:
            outdoor_score = outdoor_score + when(combined.contains(word), lit(3.0)).otherwise(lit(0.0))
        outdoor_score = when(outdoor_score > 25, lit(25.0)).otherwise(outdoor_score)
        
        # Component 4: Wellness amenities (20 points max)
        wellness_score = lit(0.0)
        wellness_words = ["hot tub", "jacuzzi", "sauna", "spa", "bathtub", "soaking tub",
                        "massage", "yoga", "gym", "pool"]
        for word in wellness_words:
            wellness_score = wellness_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        wellness_score = when(wellness_score > 20, lit(20.0)).otherwise(wellness_score)
        
        # Negative deductions
        deduction = lit(0.0)
        anti_relax = ["noisy", "loud", "busy street", "construction", "traffic noise"]
        for word in anti_relax:
            deduction = deduction + when(combined.contains(word), lit(15.0)).otherwise(lit(0.0))
        
        total = quiet_score + view_score + outdoor_score + wellness_score - deduction
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())


    def calculate_local_immersion_score(df):
        """
        LOCAL IMMERSION SCORE (0-100 scale)
        Combines: local authenticity, neighborhood character, local tips, and cultural experience.
        For travelers seeking genuine local experiences.
        """
        reviews_col = lower(coalesce(col("reviews").cast(StringType()), lit("")))
        desc_col = lower(coalesce(col("description").cast(StringType()), lit("")))
        combined = concat_ws(" ", reviews_col, desc_col)
        
        # Component 1: Local authenticity (30 points max)
        authentic_score = lit(0.0)
        for word in LOCAL_AUTHENTIC_WORDS:
            authentic_score = authentic_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        authentic_score = when(authentic_score > 30, lit(30.0)).otherwise(authentic_score)
        
        # Component 2: Local tips from host (25 points max)
        tips_score = lit(0.0)
        for word in LOCAL_TIPS_WORDS:
            tips_score = tips_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        tips_score = when(tips_score > 25, lit(25.0)).otherwise(tips_score)
        
        # Component 3: Neighborhood character (25 points max)
        neighborhood_score = lit(0.0)
        neighborhood_words = ["neighborhood", "local shops", "local restaurants", "markets",
                            "community", "residential", "locals", "culture"]
        for word in neighborhood_words:
            neighborhood_score = neighborhood_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        neighborhood_score = when(neighborhood_score > 25, lit(25.0)).otherwise(neighborhood_score)
        
        # Component 4: Cultural experience (20 points max)
        culture_score = lit(0.0)
        culture_words = ["culture", "history", "historic", "museum", "art", "traditional",
                        "authentic experience", "local experience"]
        for word in culture_words:
            culture_score = culture_score + when(combined.contains(word), lit(4.0)).otherwise(lit(0.0))
        culture_score = when(culture_score > 20, lit(20.0)).otherwise(culture_score)
        
        # Negative for overly touristy
        deduction = lit(0.0)
        touristy = ["touristy", "tourist trap", "crowded with tourists", "very commercial"]
        for word in touristy:
            deduction = deduction + when(combined.contains(word), lit(15.0)).otherwise(lit(0.0))
        
        total = authentic_score + tips_score + neighborhood_score + culture_score - deduction
        final = when(total < 0, lit(0.0)).when(total > 100, lit(100.0)).otherwise(total)
        
        return final.cast(StringType())

    print("✓ Composite score functions defined (6-10)")

    # COMMAND ----------
    # DBTITLE 1,Register All New Feature Extractors

    # =============================================================================
    # REGISTRY FOR USER-INTEREST FEATURES
    # =============================================================================
    USER_INTEREST_EXTRACTORS = {
        # Comfort & Quality Features (1-6)
        "bed_comfort_score": extract_bed_comfort_score,
        "bathroom_quality_score": extract_bathroom_quality_score,
        "kitchen_quality_score": extract_kitchen_quality_score,
        "temperature_control_score": extract_temperature_control_score,
        "natural_light_score": extract_natural_light_score,
        "space_efficiency_score": extract_space_efficiency_score,
        
        # Location & Neighborhood Features (7-12)
        "public_transport_score": extract_public_transport_score,
        "walkability_score": extract_walkability_score,
        "nightlife_proximity_score": extract_nightlife_proximity_score,
        "restaurant_proximity_score": extract_restaurant_proximity_score,
        "grocery_proximity_score": extract_grocery_proximity_score,
        "neighborhood_safety_score": extract_neighborhood_safety_score,
        
        # Experience & Atmosphere Features (13-18)
        "view_quality_score": extract_view_quality_score,
        "privacy_level_score": extract_privacy_level_score,
        "local_authenticity_score": extract_local_authenticity_score,
        "modern_amenities_score": extract_modern_amenities_score,
        "aesthetic_appeal_score": extract_aesthetic_appeal_score,
        "noise_insulation_score": extract_noise_insulation_score,
        
        # Service & Process Features (19-24)
        "checkin_ease_score": extract_checkin_ease_score,
        "listing_accuracy_score": extract_listing_accuracy_score,
        "communication_quality_score": extract_communication_quality_score,
        "response_speed_score": extract_response_speed_score,
        "flexibility_score": extract_flexibility_score,
        "local_tips_score": extract_local_tips_score,
        
        # Trip Type Suitability Features (25-30)
        "solo_traveler_score": extract_solo_traveler_score,
        "couple_romantic_score": extract_couple_romantic_score,
        "group_gathering_score": extract_group_gathering_score,
        "long_stay_score": extract_long_stay_score,
        "first_time_visitor_score": extract_first_time_visitor_score,
        "repeat_guest_indicator": extract_repeat_guest_indicator,
    }

    # =============================================================================
    # REGISTRY FOR COMPOSITE SCORES
    # =============================================================================
    COMPOSITE_SCORE_EXTRACTORS = {
        "overall_quality_score": calculate_overall_quality_score,
        "location_convenience_score": calculate_location_convenience_score,
        "host_excellence_score": calculate_host_excellence_score,
        "value_perception_score": calculate_value_perception_score,
        "comfort_index_score": calculate_comfort_index_score,
        "family_suitability_score": calculate_family_suitability_score,
        "business_ready_score": calculate_business_ready_score,
        "social_experience_score": calculate_social_experience_score,
        "relaxation_wellness_score": calculate_relaxation_wellness_score,
        "local_immersion_score": calculate_local_immersion_score,
    }

    print("✓ All feature extractors registered")
    print(f"  User-Interest Features: {len(USER_INTEREST_EXTRACTORS)}")
    print(f"  Composite Scores: {len(COMPOSITE_SCORE_EXTRACTORS)}")

    # COMMAND ----------
    # DBTITLE 1,Build Extended Features Map Function

    def build_extended_features_map(df, 
                                    existing_features_expr,
                                    user_interest_extractors,
                                    composite_score_extractors):
        """
        Extend the existing features map with new user-interest features and composite scores.
        
        Args:
            df: Source DataFrame
            existing_features_expr: Existing features map column expression
            user_interest_extractors: Dict of user-interest feature extractors
            composite_score_extractors: Dict of composite score extractors
        
        Returns:
            Extended features map column expression
        """
        # Start with additional map arguments for new features
        new_map_args = []
        
        # Add user-interest features
        for feature_name, extractor_func in user_interest_extractors.items():
            new_map_args.append(lit(feature_name))
            new_map_args.append(extractor_func(df))
        
        # Add composite scores
        for score_name, score_func in composite_score_extractors.items():
            new_map_args.append(lit(score_name))
            new_map_args.append(score_func(df))
        
        # Create the new features map
        new_features_map = create_map(*new_map_args)
        
        # Combine with existing features using map_concat
        # Use map_concat to merge maps (available in Spark 3.0+)
        from pyspark.sql.functions import map_concat
        combined_map = map_concat(existing_features_expr, new_features_map)
        
        return combined_map

    print("✓ Extended features map builder defined")

    # COMMAND ----------
    # DBTITLE 1,Update df_features_vector with New Features

    print("\n" + "="*80)
    print("UPDATING FEATURES VECTOR WITH USER-INTEREST FEATURES AND COMPOSITE SCORES")
    print("="*80)

    # Import map_concat for combining maps
    from pyspark.sql.functions import map_concat, lit

    # Step 1: Create the new features map expression
    print("\n1. Building new features map expression...")
    new_features_args = []

    # Add user-interest features
    for feature_name, extractor_func in USER_INTEREST_EXTRACTORS.items():
        new_features_args.append(lit(feature_name))
        new_features_args.append(extractor_func(df_reloaded))

    # Add composite scores
    for score_name, score_func in COMPOSITE_SCORE_EXTRACTORS.items():
        new_features_args.append(lit(score_name))
        new_features_args.append(score_func(df_reloaded))

    new_features_map = create_map(*new_features_args)
    print(f"   ✓ New features map created with {len(USER_INTEREST_EXTRACTORS) + len(COMPOSITE_SCORE_EXTRACTORS)} features")

    # Step 2: Rebuild the complete features vector with all features
    print("\n2. Rebuilding complete features vector...")

    # Build complete features map including all existing + new features
    complete_features_map_expr = build_comprehensive_features_map(
        df_reloaded,
        VALID_TEXT,
        VALID_LOCATION,
        VALID_CATEGORICAL_RAW,
        VALID_PRICE_DISCOUNT,
        VALID_REVIEW,
        VALID_URL_METADATA,
        VALID_MEDIA,
        VALID_HOST_SELLER,
        VALID_OTHER,
        VALID_NUMERICAL,
        VALID_ONEHOT,
        VALID_TEXT_DERIVED
    )

    # Combine existing features with new features
    combined_features_map = map_concat(complete_features_map_expr, new_features_map)

    # Step 3: Create updated features vector dataframe
    print("\n3. Creating updated features vector dataframe...")
    df_features_vector_updated = df_reloaded.select(
        col("property_id").alias("listing_id"),
        combined_features_map.alias("features_dict")
    )

    # Unpersist old cached dataframe and cache new one
    df_features_vector.unpersist()
    df_features_vector = df_features_vector_updated.persist()

    print("   ✓ Features vector updated and cached")

    # COMMAND ----------
    # DBTITLE 1,Update Feature Mappings

    # Update FEATURE_NAMES_MAPPING with new features
    current_idx = len(FEATURE_NAMES_MAPPING)

    # Add user-interest features to mapping
    FEATURE_CATEGORIES["user_interest"] = []
    for feature_name in USER_INTEREST_EXTRACTORS.keys():
        FEATURE_NAMES_MAPPING[feature_name] = current_idx
        FEATURE_CATEGORIES["user_interest"].append(feature_name)
        current_idx += 1

    # Add composite scores to mapping
    FEATURE_CATEGORIES["composite_scores"] = []
    for score_name in COMPOSITE_SCORE_EXTRACTORS.keys():
        FEATURE_NAMES_MAPPING[score_name] = current_idx
        FEATURE_CATEGORIES["composite_scores"].append(score_name)
        current_idx += 1

    print("\nFeature Mappings Updated:")
    print("-" * 60)

    print(f"\nUSER-INTEREST FEATURES ({len(FEATURE_CATEGORIES['user_interest'])} features):")
    for i, f in enumerate(FEATURE_CATEGORIES["user_interest"]):
        print(f"  {FEATURE_NAMES_MAPPING[f]:3d}: {f}")

    print(f"\nCOMPOSITE SCORES ({len(FEATURE_CATEGORIES['composite_scores'])} scores):")
    for f in FEATURE_CATEGORIES["composite_scores"]:
        print(f"  {FEATURE_NAMES_MAPPING[f]:3d}: {f}")

    # COMMAND ----------
    # DBTITLE 1,Verify Updated Features Vector

    print("\n" + "="*80)
    print("VERIFICATION OF UPDATED FEATURES VECTOR")
    print("="*80)

    print("\nUpdated Schema of df_features_vector:")
    df_features_vector.printSchema()

    total_features_updated = len(FEATURE_NAMES_MAPPING)
    print(f"\nTotal features in dictionary: {total_features_updated}")

    print("\nFeatures by category:")
    for category, features in FEATURE_CATEGORIES.items():
        if features:
            print(f"  {category}: {len(features)}")

    # COMMAND ----------
    # DBTITLE 1,Display Sample with New Features

    print("\nSample output with new features (first 2 rows):")
    display(df_features_vector.limit(2))

    # Show new features in detail for single record
    print("\nDetailed view of NEW features for single record:")
    sample_row = df_features_vector.limit(1).collect()[0]
    print(f"\nListing ID: {sample_row['listing_id']}")

    print(f"\n--- USER-INTEREST FEATURES ({len(FEATURE_CATEGORIES['user_interest'])}) ---")
    for f in FEATURE_CATEGORIES["user_interest"]:
        value = sample_row['features_dict'].get(f, 'N/A')
        print(f"  {f}: {value}")

    print(f"\n--- COMPOSITE SCORES ({len(FEATURE_CATEGORIES['composite_scores'])}) ---")
    for f in FEATURE_CATEGORIES["composite_scores"]:
        value = sample_row['features_dict'].get(f, 'N/A')
        print(f"  {f}: {value}")

    # COMMAND ----------
    # DBTITLE 1,Final Summary

    print("\n" + "="*80)
    print("FINAL PIPELINE SUMMARY - USER-INTEREST FEATURES AND COMPOSITE SCORES")
    print("="*80)

    total_rows = df_features_vector.count()

    print(f"""
    =============================================================================
    ADDED FEATURES SUMMARY
    =============================================================================

    30 USER-INTEREST FEATURES:
    --------------------------
    These features help users understand specific aspects of the apartment:

    COMFORT & QUALITY (6 features):
        • bed_comfort_score - Sleep quality indicators
        • bathroom_quality_score - Bathroom condition
        • kitchen_quality_score - Kitchen usability
        • temperature_control_score - AC/heating quality
        • natural_light_score - Brightness/light
        • space_efficiency_score - Room size perception

    LOCATION & NEIGHBORHOOD (6 features):
        • public_transport_score - Transit access
        • walkability_score - Walking accessibility
        • nightlife_proximity_score - Entertainment nearby
        • restaurant_proximity_score - Dining options
        • grocery_proximity_score - Shopping access
        • neighborhood_safety_score - Safety perception

    EXPERIENCE & ATMOSPHERE (6 features):
        • view_quality_score - Views from property
        • privacy_level_score - Privacy vs shared
        • local_authenticity_score - Local experience
        • modern_amenities_score - Modern features
        • aesthetic_appeal_score - Design/decor
        • noise_insulation_score - Soundproofing

    SERVICE & PROCESS (6 features):
        • checkin_ease_score - Check-in smoothness
        • listing_accuracy_score - Matches description
        • communication_quality_score - Host communication
        • response_speed_score - Response time
        • flexibility_score - Policy flexibility
        • local_tips_score - Local recommendations

    TRIP TYPE SUITABILITY (6 features):
        • solo_traveler_score - Solo travel fit
        • couple_romantic_score - Couples/romance
        • group_gathering_score - Groups/parties
        • long_stay_score - Extended stays
        • first_time_visitor_score - Tourist friendly
        • repeat_guest_indicator - Return visits

    10 COMPOSITE SCORES (0-100 scale):
    ----------------------------------
    These aggregate scores provide high-level insights:

        • overall_quality_score - Combined quality metric
        • location_convenience_score - Location & accessibility
        • host_excellence_score - Host quality & service
        • value_perception_score - Value for money
        • comfort_index_score - Physical comfort
        • family_suitability_score - Family-friendliness
        • business_ready_score - Work/business suitability
        • social_experience_score - Social/entertainment
        • relaxation_wellness_score - Peace & relaxation
        • local_immersion_score - Authentic local experience

    =============================================================================
    TOTAL FEATURES IN VECTOR
    =============================================================================

    Previous features: {total_features_updated - 40}
    New user-interest features: 30
    New composite scores: 10
    ---------------------
    TOTAL FEATURES: {total_features_updated}

    Total Listings Processed: {total_rows:,}

    =============================================================================
    HOW THESE FEATURES HELP USERS
    =============================================================================

    1. QUICK ASSESSMENT: Composite scores (0-100) give instant understanding
    of apartment quality across different dimensions.

    2. PERSONALIZED MATCHING: Users can filter/sort by features that matter
    to them (e.g., business travelers can prioritize business_ready_score).

    3. TRIP-TYPE RECOMMENDATIONS: Scores like family_suitability_score or
    couple_romantic_score help match apartments to specific trip types.

    4. DETAILED INSIGHTS: Individual features provide granular information
    about specific aspects like bed comfort or kitchen quality.

    5. TRUST INDICATORS: listing_accuracy_score and repeat_guest_indicator
    help users identify trustworthy listings.

    =============================================================================
    """)

    print("✓ All features successfully added to df_features_vector!")
    print("✓ Ready for use in recommendation system!")
except:
    pass

In [0]:
try:
    # COMMAND ----------
    # DBTITLE 1,Convert Features Dictionary to Wide Column Format (Optimized)

    from pyspark.sql.functions import col

    # =============================================================================
    # CONFIGURATION
    # =============================================================================

    # Get sorted feature names once
    all_features = sorted(
        FEATURE_NAMES_MAPPING.keys(), 
        key=lambda x: FEATURE_NAMES_MAPPING[x]
    )
    num_features = len(all_features)

    # =============================================================================
    # BUILD WIDE FORMAT DATAFRAME
    # =============================================================================

    # Build select expression: listing_id + all feature columns extracted from map
    select_cols = [col("listing_id")] + [
        col("features_dict").getItem(feature_name).alias(feature_name)
        for feature_name in all_features
    ]

    # Create wide format dataframe and cache
    df_features_wide = (
        df_features_vector
        .select(*select_cols)
        .cache()
    )

    # Trigger cache materialization with single count action
    row_count = df_features_wide.count()

    # =============================================================================
    # VERIFICATION (Single pass - no additional actions)
    # =============================================================================

    # Get schema info (metadata only - no action needed)
    columns_list = df_features_wide.columns
    num_columns = len(columns_list)

    print("=" * 80)
    print("FEATURES WIDE FORMAT - CONVERSION COMPLETE")
    print("=" * 80)
    print(f"\n  Rows:            {row_count:,}")
    print(f"  Total Columns:   {num_columns}")
    print(f"  Feature Columns: {num_features}")
    print(f"\n  Columns (first 20):")

    for i, col_name in enumerate(columns_list[:20], 1):
        print(f"    {i:2d}. {col_name}")

    if num_columns > 20:
        print(f"\n    ... and {num_columns - 20} more columns")

    # COMMAND ----------
    # DBTITLE 1,Display Sample Data

    # Display sample - Databricks optimizes this automatically
    display(df_features_wide.limit(5))

    # COMMAND ----------
    # DBTITLE 1,Summary Statistics

    print("\n" + "=" * 80)
    print("CONVERSION SUMMARY")
    print("=" * 80)

    print(f"""
    ✓ CONVERSION COMPLETE

    DataFrame:     df_features_wide
    Schema:        listing_id + {num_features} feature columns
    Dimensions:    {num_columns} columns × {row_count:,} rows
    Data Type:     All feature values are StringType
    Status:        Cached in memory

    Features by Category:""")

    for category, features in FEATURE_CATEGORIES.items():
        if features:
            print(f"  • {category}: {len(features)} columns")

    print("""
    Ready for:
    ✓ ML pipelines (VectorAssembler)
    ✓ Export to Parquet/Delta
    ✓ Statistical analysis
    ✓ Feature transformations
    """)

    # COMMAND ----------
    # DBTITLE 1,Export Options (Optional - Uncomment as needed)

    # Delta format (recommended for Databricks)
    # df_features_wide.write.format("delta").mode("overwrite").saveAsTable("features_wide")

    # Parquet format
    # df_features_wide.write.mode("overwrite").parquet("/path/to/features_wide")

    # CSV format (use repartition for better parallelism on write)
    # df_features_wide.repartition(10).write.mode("overwrite").option("header", "true").csv("/path/to/features_csv")

    print(f"✓ df_features_wide ready: {num_columns} cols × {row_count:,} rows")
except:
    pass

In [0]:
try: 
    import json
    import os
    from pyspark.sql.functions import col, mean, min as spark_min, max as spark_max, stddev
    from pyspark.sql.types import NumericType, IntegerType, DoubleType, LongType, FloatType

    def generate_df_summary(df, output_path="/Workspace/Users/avital.ido@campus.technion.ac.il/app_data/df_summary.json"):
        """
        Generates a statistical summary of the DataFrame and saves it to a JSON file.
        """
        
        # 1. Define relevant columns for the app (to ensure nothing is missed)
        # These match the keys in SCHEMA_METADATA inside app.py
        relevant_columns = [
            'city', 'state', 'location', 'ratings', 'property_number_of_reviews',
            'City_Crime_Rate_Per_100K', 'Total_Fatalities', 'Median_Income',
            'Disability_Rate', 'Median_AQI', 'amenities_count', 'price',
            # New score columns used in the UI bar
            'overall_quality_score', 'location_convenience_score', 'host_excellence_score',
            'value_perception_score', 'comfort_index_score', 'family_suitability_score',
            'business_ready_score', 'social_experience_score', 'relaxation_wellness_score',
            'local_immersion_score'
        ]
        
        # Check which columns actually exist in the Spark DataFrame
        actual_cols = [c for c in relevant_columns if c in df.columns]
        
        numeric_types = (IntegerType, DoubleType, LongType, FloatType)
        
        # Identify numeric columns only from the relevant list
        numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, numeric_types) and f.name in actual_cols]
        
        # Calculate statistics (Min, Max, Mean, StdDev)
        agg_exprs = []
        for c in numeric_cols:
            agg_exprs.append(spark_min(col(c)).alias(f"{c}_min"))
            agg_exprs.append(spark_max(col(c)).alias(f"{c}_max"))
            agg_exprs.append(mean(col(c)).alias(f"{c}_mean"))
            agg_exprs.append(stddev(col(c)).alias(f"{c}_std"))

        # Execute aggregation
        if agg_exprs:
            stats_row = df.select(agg_exprs).collect()[0].asDict()
        else:
            stats_row = {}

        summary_dict = {}
        
        # Build the final dictionary structure
        for field in df.schema.fields:
            col_name = field.name
            
            # Process only relevant columns
            if col_name in actual_cols:
                col_type = field.dataType.simpleString()
                
                summary_dict[col_name] = {
                    "type": col_type
                }
                
                # Add stats if it's a numeric column
                if col_name in numeric_cols:
                    summary_dict[col_name].update({
                        "min": stats_row.get(f"{col_name}_min"),
                        "max": stats_row.get(f"{col_name}_max"),
                        "mean": stats_row.get(f"{col_name}_mean"),
                        "std": stats_row.get(f"{col_name}_std")
                    })

        # Ensure output directory exists
        output_dir = os.path.dirname(output_path)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        # Save dictionary to JSON file
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(summary_dict, f, indent=4, ensure_ascii=False)
            
        print(f"✅ Summary saved successfully to: {output_path}")
        return summary_dict

    res_dict = generate_df_summary(df_features_wide)
except:
    pass

In [0]:
try:
    import os
    import json
    import shutil
    from pyspark.sql.functions import col, lower, trim, regexp_replace

    # 1. Define Paths
    base_output_dir = "/Workspace/Users/avital.ido@campus.technion.ac.il/final_app_data"
    data_dir = os.path.join(base_output_dir, "data")
    temp_dir = os.path.join(base_output_dir, "_temp_partitioned")

    # Create directories
    os.makedirs(data_dir, exist_ok=True)

    # 2. Load & Clean Data in Spark
    print("⏳ Loading data from Spark...")
    df_spark = df_features_wide
    selected_columns = [
        'name', 
        'listing_name', 
        'url', 
        'city', 
        'state', 
        'location',
        'county',
        'price', 
        'guests', 
        'pets_allowed',
        'amenities_count',
        'description', 
        'highlights', 
        'image',      
        'images',      
        'seller_info', 
        'reviews', 
        'ratings', 
        'property_number_of_reviews',
        'risk_score_raw', 
        'City_Crime_Rate_Per_100K',
        'Total_Fatalities',
        'Median_Income',
        'Disability_Rate',
        'Median_AQI',
        'overall_quality_score',
        'location_convenience_score',
        'host_excellence_score',
        'value_perception_score',
        'comfort_index_score',
        'family_suitability_score',
        'business_ready_score',
        'social_experience_score',
        'relaxation_wellness_score',
        'local_immersion_score'
    ]
    
    df_clean = df_spark.select(*[col(c) for c in selected_columns if c in df_spark.columns]) \
    .withColumn("price", col("price").cast("double")) \
    .withColumn("final_score", col("effective_rating")) \
    .withColumn("state_clean", lower(trim(regexp_replace(col("state"), "\\.$", "")))) \
    .withColumn("city_clean", lower(trim(col("city")))) \
    .filter(col("state_clean").isNotNull() & (col("state_clean") != ""))

    # Cache the DataFrame since we use it twice (mapping + write)
    df_clean.cache()

    # Trigger cache and get count
    total_rows = df_clean.count()
    print(f"📊 Total rows to process: {total_rows:,}")

    # 3. Create Location Mapping (City -> State File)
    print("🗺️ Building location map...")

    # Get distinct city-state pairs efficiently
    mapping_rows = (
        df_clean
        .select("city_clean", "state_clean")
        .distinct()
        .collect()
    )

    location_map = {}
    unique_states = set()

    for row in mapping_rows:
        city = row["city_clean"]
        state = row["state_clean"]
        if city and state:
            location_map[city] = state
            location_map[state] = state
            unique_states.add(state)

    # Save location map
    map_path = os.path.join(base_output_dir, "location_map.json")
    with open(map_path, "w") as f:
        json.dump(location_map, f)

    print(f"✅ Location map saved. Mapped {len(location_map)} locations.")

    # 4. Write ALL state parquet files in ONE parallel Spark operation
    print(f"💾 Writing parquet files for {len(unique_states)} states using Spark...")

    # Clean temp directory if exists
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)

    # Write partitioned by state - Spark handles parallelism automatically
    # Repartition ensures one partition per state for single-file output
    (
        df_clean
        .repartition(len(unique_states), "state_clean")
        .write
        .mode("overwrite")
        .partitionBy("state_clean")
        .parquet(temp_dir)
    )

    print("📁 Reorganizing files to final structure...")

    # 5. Move files from partitioned structure to flat {state}.parquet format
    # This is just file I/O - very fast
    for state in unique_states:
        partition_path = os.path.join(temp_dir, f"state_clean={state}")
        
        if os.path.exists(partition_path):
            # Find the parquet file(s) in the partition directory
            parquet_files = [
                f for f in os.listdir(partition_path) 
                if f.endswith(".parquet") and not f.startswith("_")
            ]
            
            if parquet_files:
                src_file = os.path.join(partition_path, parquet_files[0])
                dst_file = os.path.join(data_dir, f"{state}.parquet")
                
                # If multiple part files exist, merge them
                if len(parquet_files) > 1:
                    # Read all parts and write as single file
                    (
                        spark.read.parquet(partition_path)
                        .coalesce(1)
                        .write
                        .mode("overwrite")
                        .parquet(dst_file + "_tmp")
                    )
                    # Move the actual file
                    tmp_dir = dst_file + "_tmp"
                    for f in os.listdir(tmp_dir):
                        if f.endswith(".parquet") and not f.startswith("_"):
                            shutil.move(os.path.join(tmp_dir, f), dst_file)
                            break
                    shutil.rmtree(tmp_dir)
                else:
                    shutil.copy2(src_file, dst_file)

    # 6. Cleanup
    shutil.rmtree(temp_dir, ignore_errors=True)
    df_clean.unpersist()

    print(f"🎉 All Done! Saved {len(unique_states)} state files to {data_dir}")
except:
    pass

# Creates the Application


In [0]:
%pip install scikit-learn openai pandas streamlit

dbutils.library.restartPython()

import openai
import json
import pandas as pd
import numpy as np
import os
from typing import Dict, List, Tuple
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics.pairwise import euclidean_distances


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%%writefile app.py
import streamlit as st
import pandas as pd
import json
import ast
import openai
import numpy as np
import os
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from typing import Dict, List, Any, Optional
from sklearn.metrics.pairwise import euclidean_distances
# --- Configuration & Setup ---

try:
    api_key = st.secrets["OPENAI_API_KEY"]
except:
    api_key = st.text_input("Enter OpenAI API Key", type="password")

if not api_key:
    st.warning("Please provide an API Key to continue.")
    st.stop()

client = openai.OpenAI(api_key=api_key)

@st.cache_data
def load_schema_metadata() -> Dict[str, Any]:
    """
    Loads metadata (min/max/mean) from the JSON generated by the Data Engineering pipeline.
    """
    if os.path.exists(SUMMARY_FILE):
        try:
            with open(SUMMARY_FILE, 'r') as f:
                return json.load(f)
        except Exception as e:
            # Fallback if file is corrupt
            return {}
            
    # Fallback to hardcoded defaults if file is missing (Optional)
    return {
        'price': {'type': 'double', 'mean': 200.0},
        'ratings': {'type': 'double', 'mean': 4.5}
    }




# --- Paths ---
BASE_DIR = "/Workspace/Users/avital.ido@campus.technion.ac.il/final_app_data"
DATA_DIR = os.path.join(BASE_DIR, "data")
MAP_FILE = os.path.join(BASE_DIR, "location_map.json")
SUMMARY_FILE = os.path.join(BASE_DIR, "df_summary.json")
SCHEMA_METADATA = load_schema_metadata()

# --- Helper Functions ---

def safe_parse(value: Any, default: Any = None) -> Any:
    if value is None or value == "": return default
    if not isinstance(value, str): return value
    try: return json.loads(value)
    except:
        try: return ast.literal_eval(value)
        except: return default

def get_score_color(score: float) -> str:
    try: score = float(score)
    except: return "#808080"
    score = max(0, min(10, score))
    if score < 5: return "#21c354"
    elif score < 7.5: return "#ffa500"
    else: return "#ff4b4b"

def get_gradient_color(score_val: float) -> str:
    """
    Calculates a color from Red (0) to Yellow (50) to Green (100).
    Returns a hex string.
    """
    try:
        val = float(score_val)
    except:
        return "#e0e0e0" # Gray for N/A
    
    val = max(0, min(10, val)) 
    
    if val <= 5:
        # Interpolate Red -> Yellow
        ratio = val / 5.0
        r = int(255 + (255 - 255) * ratio) # 255 -> 255
        g = int(75 + (193 - 75) * ratio)   # 75 -> 193
        b = int(75 + (7 - 75) * ratio)     # 75 -> 7
    else:
        # Interpolate Yellow -> Green
        ratio = (val - 5) / 5.0
        r = int(255 + (33 - 255) * ratio)  # 255 -> 33
        g = int(193 + (195 - 193) * ratio) # 193 -> 195
        b = int(7 + (84 - 7) * ratio)      # 7 -> 84
        
    return f"#{r:02x}{g:02x}{b:02x}"

# --- Optimized Data Loading ---

@st.cache_data
def load_location_map() -> Dict[str, str]:
    """
    Loads the JSON mapping that links cities/states to their filename code.
    Example: {"miami": "fl", "florida": "fl"}
    """
    if os.path.exists(MAP_FILE):
        with open(MAP_FILE, 'r') as f:
            return json.load(f)
        
    fallback_path = "/Workspace/Users/avital.ido@campus.technion.ac.il/unique_locations_list.json"
    if os.path.exists(fallback_path):
        try:
            with open(fallback_path, 'r') as f:
                data = json.load(f)
                if isinstance(data, list):
                    return {str(item).lower(): str(item).lower() for item in data}
                
                return data
        except Exception:
            return {} 
            
    return {}


    

def load_data_for_state(state_code: str):
    """
    Loads data and performs strict deduplication based on Title/Name and URL.
    """
    file_path = os.path.join(DATA_DIR, f"{state_code}.parquet")
    
    if not os.path.exists(file_path):
        st.error(f"Data file for state '{state_code}' not found.")
        return pd.DataFrame()
        
    try:
        df = pd.read_parquet(file_path)
        
        # Standardize the Name column for deduplication
        # Checks 'listing_title', then 'name', then 'listing_name'
        if 'listing_title' in df.columns:
            df['dup_check_name'] = df['listing_title']
        elif 'name' in df.columns:
            df['dup_check_name'] = df['name']
        else:
            df['dup_check_name'] = df.get('listing_name', 'Unknown')

        # Drop duplicates based on URL first (fastest)
        if 'url' in df.columns:
            df = df.drop_duplicates(subset=['url'])
            
        # Drop duplicates based on Name (stricter)
        # This ensures we don't see the same apartment with a slightly different link
        if 'dup_check_name' in df.columns:
            df = df.drop_duplicates(subset=['dup_check_name'])
            
        return df
    except Exception as e:
        st.error(f"Error loading data: {e}")
        return pd.DataFrame()

# --- Logic ---

def find_similar_properties(property_data: pd.DataFrame, target_record: Dict, top_k: int = 50):
    top_k = min(top_k, len(property_data))
    # FIXED: Uncommented and fixed logic to ensure it returns a DataFrame
    if property_data.empty: return property_data
    
    numeric_cols = [col for col, meta in SCHEMA_METADATA.items() if meta.get('type') in ['double', 'int', 'bigint', 'float'] and col in property_data.columns]
    
    df_numeric = property_data[numeric_cols].copy().fillna(0)
    target_vector = []
    valid_cols = []
    
    for col in numeric_cols:
        val = target_record.get(col)
        if val is not None and val != "":
            try: target_vector.append(float(val)); valid_cols.append(col)
            except: pass
            
    if not valid_cols: 
        return property_data.head(top_k)

    X = df_numeric[valid_cols]
    scaler = StandardScaler()
    try:
        X_scaled = scaler.fit_transform(X)
        target_vector_scaled = scaler.transform([target_vector])
        
        k = min(len(X), top_k)
        nn = NearestNeighbors(n_neighbors=k, metric='euclidean')
        nn.fit(X_scaled)
        
        dists, idxs = nn.kneighbors(target_vector_scaled)
        
        sim_props = property_data.iloc[idxs[0]].copy()
        sim_props['match_score'] = 1 / (1 + dists[0])
        
        return sim_props
    except: 
        return property_data.head(top_k)

def update_best_match(df, row, decision, selections=None):
    """
    Updates the recommendation queue based on user interaction.
    If user LIKES a property, the remaining queue is re-sorted to show similar items first.
    """
    if selections is None: selections = []
    
    # Normalize the name for the selections list (using logic from user request)
    current_name = row.get('listing_title', row.get('name', 'Unknown'))
    
    # Handle LIKE decision
    if decision == "like":
        prop_data = row.to_dict() if hasattr(row, 'to_dict') else row
        
        # Ensure name consistency in saved data
        prop_data['name'] = current_name
        
        # Add to favorites (Double check against existing names/urls in selections)
        existing_identifiers = {item.get('url') for item in selections} | {item.get('name') for item in selections}
        
        if prop_data.get('url') not in existing_identifiers and current_name not in existing_identifiers:
            selections.append(prop_data)
            
        # --- SMART RE-RANKING LOGIC ---
        # If user liked this, we find remaining items that are closest to this one
        # This creates a "More like this" effect dynamically
        
        # Define features for similarity (Must allow for missing columns)
        potential_features = [
            'price', 'ratings', 'property_number_of_reviews', 
            'amenities_count', 'City_Crime_Rate_Per_100K', 
            'Median_Income', 'overall_quality_score'
        ]
        
        valid_features = [c for c in potential_features if c in df.columns]
        
        # We need at least 2 rows (current + others) and valid features to re-rank
        if valid_features and len(df) > 1:
            try:
                # 1. Get vector of the LIKED property
                target_vector = row[valid_features].fillna(0).values.reshape(1, -1)
                
                # 2. Get vectors of REMAINING properties (excluding current)
                remaining_df = df.iloc[1:].copy()
                candidate_vectors = remaining_df[valid_features].fillna(0).values
                
                # 3. Calculate distance (Lower is better/more similar)
                dists = euclidean_distances(target_vector, candidate_vectors).flatten()
                
                # 4. Sort remaining DF by distance
                remaining_df['sim_dist'] = dists
                remaining_df = remaining_df.sort_values('sim_dist', ascending=True).drop(columns=['sim_dist'])
                
                # Return re-ordered list
                return remaining_df.reset_index(drop=True), selections
            except:
                pass # If math fails, fall back to standard behavior

    # Standard behavior: Just remove the current item and move to next
    if len(df) > 0:
        return df.iloc[1:].reset_index(drop=True), selections
        
    return df, selections

def load_filtered_data(max_price: float, location_filter: str, required_columns: list):
    """
    Loads data and removes duplicates immediately to prevent same property appearing twice.
    """
    data_path = "/dbfs/FileStore/Users/avital.ido@campus.technion.ac.il/eliezer_data"
    try:
        
        # filters = [('price', '<=', float(max_price))]
        filters = [
                [('price', '<=', float(max_price)), ('city', '==', location_filter)],
                [('price', '<=', float(max_price)), ('state', '==', location_filter)],
                [('price', '<=', float(max_price)), ('county', '==', location_filter)]
            ]
        df = pd.read_parquet(data_path, columns=required_columns, filters=filters, engine='pyarrow')
        
        # Drop Duplicates Logic
        if 'url' in df.columns:
            df = df.drop_duplicates(subset=['url'])
        else:
            cols_to_check = [c for c in ['name', 'city', 'price'] if c in df.columns]
            if cols_to_check:
                df = df.drop_duplicates(subset=cols_to_check)
        
        return df
    except Exception as e:
        st.error(f"Error loading filtered data: {e}")
        return pd.DataFrame()


def find_best_match(user_location: str, max_price: float, requirements: str):
    # 1. Resolve State File
    location_map = load_location_map()
    search_term = user_location.lower().strip()
    
    # Try to find the state code from the map
    state_code = location_map.get(search_term)
    
    if not state_code:
        # Fallback logic
        required_cols = list(SCHEMA_METADATA.keys()) + ['name', 'listing_name', 'image', 'url', 'seller_info', 'guests', 'highlights', 'description', 'reviews', 'images', 'pets_allowed', 'listing_title']
        return load_filtered_data(max_price, user_location, required_cols)

    # 2. Load SPECIFIC State Data (Now deduped by name)
    df = load_data_for_state(state_code)
    
    if df.empty: return pd.DataFrame()
    
    # 3. Filter by Price
    df = df[df['price'] <= float(max_price)]
    
    # 4. Filter by specific City
    if search_term != state_code:
         df = df[
            df['city'].astype(str).str.lower().str.contains(search_term, na=False) | 
            df['state'].astype(str).str.lower().str.contains(search_term, na=False)
        ]
    
    # --- ENHANCED FILTERING: Remove items already in Favorites by Name AND URL ---
    if 'selections' in st.session_state and st.session_state.selections:
        liked_urls = {item.get('url') for item in st.session_state.selections}
        liked_names = {item.get('name') for item in st.session_state.selections} # Collect names
        
        # Filter by URL
        if 'url' in df.columns:
            df = df[~df['url'].isin(liked_urls)]
            
        # Filter by Name/Title (Handles the user request)
        # We check against 'name', 'listing_name', or 'listing_title'
        cols_to_check = [c for c in ['name', 'listing_name', 'listing_title'] if c in df.columns]
        for c in cols_to_check:
             df = df[~df[c].isin(liked_names)]
    # ---------------------------------------------------------------------------

    if df.empty: return pd.DataFrame()
    # 5. LLM Logic
    system_prompt = f"""
    You are a highly intelligent data matching assistant for an Airbnb search engine.
    Your task is to translate user requirements into a structured JSON record that follows the provided schema.

    RULES FOR VALUE ASSIGNMENT:
    1. UNDERSTAND CONTEXT: Analyze the user's free text. If they ask for "safety", reduce 'City_Crime_Rate_Per_100K'. If they want "luxury", increase 'Median_Income' and 'amenities_count'.
    2. NUMERIC FIELDS:
       - If the user provides a requirement that maps to a numeric field, set a realistic value based on the [min, max, mean, std] provided.
       - IMPORTANT: If the user DOES NOT mention or imply anything about a numeric field, you MUST set its value to exactly the 'mean' provided in the schema.
    3. STRING FIELDS:
       - If information is provided or can be inferred (like City/State from location), fill it.
       - If NO information is provided for a string field, set it to an empty string "".
    4. CONSISTENCY: Ensure all keys from the schema are present in the output.

    Schema Metadata:
    {list(SCHEMA_METADATA.keys())}

    Return ONLY a valid JSON object.
    """
    
    try:
        res = client.chat.completions.create(model="gpt-4o", messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": f"Loc: {user_location}, Max: {max_price}, Req: {requirements}"}], response_format={"type": "json_object"}, temperature=0.7)
        target = json.loads(res.choices[0].message.content)
        target['price'] = float(max_price)
    except: target = {'price': float(max_price)}
    
    return find_similar_properties(df, target)

# --- CALLBACK FUNCTION ---

def handle_swipe(decision):
    if st.session_state.current_df is None or len(st.session_state.current_df) == 0:
        return

    current_row = st.session_state.current_df.iloc[0]
    
    new_df, new_selections = update_best_match(
        st.session_state.current_df, 
        current_row, 
        decision, 
        st.session_state.selections
    )
    
    st.session_state.current_df = new_df
    st.session_state.selections = new_selections
    st.session_state.gallery_idx = 0

# --- UI ---

def main():
    st.set_page_config(page_title="Airbnb Matcher", page_icon="🏡", layout="wide")
    
    if 'page' not in st.session_state: st.session_state.page = 'form'
    if 'current_df' not in st.session_state: st.session_state.current_df = None
    if 'selections' not in st.session_state: st.session_state.selections = []
    if 'gallery_idx' not in st.session_state: st.session_state.gallery_idx = 0
    if 'last_prop_id' not in st.session_state: st.session_state.last_prop_id = None
        
    st.markdown("""<style>
        .block-container { padding-top: 2rem; }
        .header-container { text-align: center; margin-bottom: 30px; }
        .header-title { 
            font-size: 4rem; 
            font-weight: 800; 
            background: linear-gradient(90deg, #FF5A5F 0%, #FF385C 100%);
            -webkit-background_clip: text;
            -webkit-text-fill-color: transparent;
            margin-bottom: 0;
            cursor: pointer;
            text-align: center;
        }
        .header-subtitle { font-size: 1.2rem; color: #717171; font-weight: 500; margin-top: -10px; text-align: center; }
        .main-card { background: white; padding: 25px; border-radius: 15px; box-shadow: 0 4px 15px rgba(0,0,0,0.08); margin-bottom: 20px; }
        .score-badge { font-size: 2em; font-weight: bold; padding: 10px; border-radius: 12px; color: white; text-align: center; display: block; }
        .highlight-item { background: #f8f9fa; border-left: 4px solid #FF5A5F; padding: 10px; margin-bottom: 8px; border-radius: 4px; }
        .reviews-container { max-height: 250px; overflow-y: auto; padding: 10px; border: 1px solid #eee; border-radius: 8px; margin-top: 10px; }
        .review-bubble { background: #f7f7f7; padding: 12px; border-radius: 12px; margin-bottom: 10px; font-style: italic; border-left: 3px solid #ddd; font-size: 0.9em; color: #444; }
        .custom-img { width: 100%; border-radius: 10px; object-fit: cover; height: 400px; display: block; margin-left: auto; margin-right: auto; }
        div.stButton > button.title-btn { background: none; border: none; font-size: 4rem; font-weight: 800; color: #FF5A5F; width: 100%; }
        .about-card { padding: 20px; border-radius: 10px; background: #f9f9f9; margin-bottom: 15px; border-left: 5px solid #FF5A5F; }
        button[data-testid="baseButton-secondary"] { border-color: #ff4b4b !important; color: #ff4b4b !important; font-weight: bold; }
        button[data-testid="baseButton-secondary"]:hover { background-color: #ff4b4b !important; color: white !important; }
        button[data-testid="baseButton-primary"] { background-color: #21c354 !important; border-color: #21c354 !important; font-weight: bold; }
        button[data-testid="baseButton-primary"]:hover { background-color: #1a9c43 !important; border-color: #1a9c43 !important; }

        .score-container {
            display: flex;
            flex-direction: row;
            width: 100%;          /* Force container to take full width */
            gap: 5px;             /* Small gap to fit all 10 items */
            padding: 5px 0;
            margin-bottom: 15px;
            /* overflow-x is removed so items shrink to fit instead of scroll */
        }
        .score-container::-webkit-scrollbar {
            display: none; /* Hide scrollbar for Chrome/Safari */
        }
        .score-card {
            flex: 1;              /* Grow to fill space equally */
            min-width: 0;         /* Allow item to shrink below content size if needed */
            text-align: center;
            padding: 6px 2px;
            border-radius: 6px;
            color: #333;
            box-shadow: 0 1px 3px rgba(0,0,0,0.1);
            border: 1px solid rgba(0,0,0,0.05);
            display: flex;
            flex-direction: column;
            justify-content: center;
            align-items: center;
        }
        
        .score-val { 
            font-weight: 800; 
            font-size: 0.9em; 
            line-height: 1.1;
        }
        
        .score-label { 
            font-size: 0.55em;    /* Slightly smaller to prevent text overflow */
            line-height: 1.1; 
            font-weight: 600; 
            text-transform: uppercase; 
            margin-top: 2px;
            white-space: nowrap;  /* Keep text on one line */
            overflow: hidden;     /* Hide overflow */
            text-overflow: ellipsis; /* Add ... if text is too long */
            width: 100%;
        }

        .score-card {
            /* ... (keep existing properties) ... */
            position: relative; /* Needed to anchor the tooltip */
            cursor: pointer;    /* Indicates interactivity */
        }

        /* The hidden text box */
        .score-card .tooltip-text {
            visibility: hidden;
            width: 160px;
            background-color: #333;
            color: #fff;
            text-align: center;
            border-radius: 6px;
            padding: 8px;
            font-size: 0.7rem;
            font-weight: normal;
            line-height: 1.2;
            
            /* Position the tooltip */
            position: absolute;
            z-index: 10;
            bottom: 110%; /* Place above the card */
            left: 50%;
            margin-left: -80px; /* Center it (half of width) */
            
            /* Fade effect */
            opacity: 0;
            transition: opacity 0.3s;
            box-shadow: 0 4px 6px rgba(0,0,0,0.3);
        }

        /* Show the tooltip on hover */
        .score-card:hover .tooltip-text {
            visibility: visible;
            opacity: 1;
        }
        
        /* Little arrow at the bottom of the tooltip */
        .score-card .tooltip-text::after {
            content: "";
            position: absolute;
            top: 100%;
            left: 50%;
            margin-left: -5px;
            border-width: 5px;
            border-style: solid;
            border-color: #333 transparent transparent transparent;
        }
    </style>""", unsafe_allow_html=True)
    
    # Header
    _, col_center, _ = st.columns([1, 6, 1])
    with col_center:
        if st.button("🏡 AirBNB Matcher", key="home_btn", use_container_width=True):
            st.session_state.selections = []
            st.session_state.current_df = None
            st.session_state.page = 'form'
            st.rerun()
        st.markdown('<p class="header-subtitle">Developed by Ido Avital & Eliezer Mashivoc & Mike Gruntov & Evgeny Mishlyakov</p>', unsafe_allow_html=True)

    # PAGE 1: FORM
    if st.session_state.page == 'form':
        left_col, center_col, right_col = st.columns([1, 2, 1])
        with center_col:
            st.markdown("<h3 style='text-align: center;'>Find your perfect stay</h3><br>", unsafe_allow_html=True)
            
            # Load locations from map
            location_map = load_location_map()
            available_locs = sorted(list(location_map.keys()))
            
            if available_locs:
                loc_display = [l.title() for l in available_locs]
                selected_loc_disp = st.selectbox("Select Target Location", options=loc_display)
                loc = selected_loc_disp.lower() 
            else:
                loc = st.text_input("Target Location").lower()
            
            price = st.number_input("Maximum Price per Night ($)", min_value=50, value=1000, step=10)
            req = st.text_area("Describe your dream vacation", height=145, placeholder="e.g. Quiet apartment near the beach...")
            st.markdown("<br>", unsafe_allow_html=True)
            
            if st.button("🚀 Start Search", use_container_width=True):
                loading_msg = "🔍 Scanning Airbnb ecosystem... (This may take up to 90s)"
                with st.spinner(loading_msg):
                    res = find_best_match(loc, price, req)
                    if not res.empty:
                        st.session_state.current_df = res
                        st.session_state.page = 'swipe'
                        st.rerun()
                    else: st.error("No matches found. Please try different criteria.")
            
            st.markdown("<div style='height: 10px'></div>", unsafe_allow_html=True)
            if st.button("ℹ️ About This App", use_container_width=True):
                st.session_state.page = 'about'
                st.rerun()

    # PAGE 4: ABOUT
    elif st.session_state.page == 'about':
        st.subheader("ℹ️ About AirBNB Matcher")
        st.markdown("""
        <div class="about-card"><h3>🎯 The Mission</h3><p>Finding the perfect Airbnb can be overwhelming. This tool uses <b>AI and Big Data</b> to match you with properties that fit your specific needs—not just your budget.</p></div>
        <div class="about-card"><h3>⚙️ How It Works</h3><ul><li><b>Smart Filtering:</b> We scan partitioned data for maximum speed.</li><li><b>AI Analysis:</b> GPT-4 understands your text description.</li><li><b>Similarity Matching:</b> KNN algorithms find properties that match your profile.</li></ul></div>
        <div class="about-card"><h3>👨‍💻 Developers</h3><p>Built by <b>Ido Avital, Eliezer Mashivoc, Mike Gruntov, Evgeny Mishlyakov</b>.</p></div>
        """, unsafe_allow_html=True)
        if st.button("⬅️ Back to Search", use_container_width=True):
            st.session_state.page = 'form'
            st.rerun()

    # PAGE 2: SWIPE
    elif st.session_state.page == 'swipe':
        if st.session_state.current_df is None or len(st.session_state.current_df) == 0:
            st.success("🎉 Reviewed all!"); st.session_state.page = 'results'; st.rerun(); return

        row = st.session_state.current_df.iloc[0]
        listing_name = row.get('name', row.get('listing_name', 'Listing'))
        parts = listing_name.split(' · ', 2)
        name = parts[0]
        rating = parts[1]
        short_description = parts[2]

        title = row.get('listing_title', name)

        pets_allowed = row.get('pets_allowed', 'No')
        if pets_allowed != 'true':
            pets_allowed = 'No Pets Allowed'
        else:
            pets_allowed = 'Pets Allowed'
            
        df_len = len(st.session_state.current_df)
        prop_key = f"{df_len}_{name[:10]}" 
        
        if st.session_state.last_prop_id != name:
            st.session_state.gallery_idx = 0
            st.session_state.last_prop_id = name
        
        main_img = row.get('image', None)
        extra_imgs = safe_parse(row.get('images'), [])
        gallery_images = (extra_imgs if isinstance(extra_imgs, list) else [])
        if not gallery_images: gallery_images = ["https://via.placeholder.com/800x600?text=No+Image"]
        
        idx = st.session_state.gallery_idx
        if idx >= len(gallery_images): idx = 0
        
        with st.container():
            st.markdown('<div class="main-card">', unsafe_allow_html=True)
            st.markdown(f"<h2>{title}</h2>", unsafe_allow_html=True)
            st.markdown(f"📍 {row.get('city')}, {row.get('state')}<hr>", unsafe_allow_html=True)
            
            score_columns = [
                ('Overall', 'overall_quality_score'),
                ('Location', 'location_convenience_score'),
                ('Host', 'host_excellence_score'),
                ('Value', 'value_perception_score'),
                ('Comfort', 'comfort_index_score'),
                ('Family', 'family_suitability_score'),
                ('Business', 'business_ready_score'),
                ('Social', 'social_experience_score'),
                ('Wellness', 'relaxation_wellness_score'),
                ('Local', 'local_immersion_score')
            ]
            SCORE_EXPLANATIONS = {
                'overall_quality_score': "Aggregated score combining all metrics for a final verdict.",
                'location_convenience_score': "Proximity to city center, transit, and major landmarks.",
                'host_excellence_score': "Based on host response time, superhost status, and ratings.",
                'value_perception_score': "Price-per-night relative to amenities and location quality.",
                'comfort_index_score': "Evaluates bed count, bathroom privacy, and space.",
                'family_suitability_score': "Safety, kid-friendly amenities, and spaciousness.",
                'business_ready_score': "Wifi quality, dedicated workspace, and self check-in.",
                'social_experience_score': "Nightlife proximity and dining options nearby.",
                'relaxation_wellness_score': "Quietness, views, and spa-like amenities.",
                'local_immersion_score': "Authentic neighborhood feel vs tourist trap vibes."
            }
            # Construct the HTML for the score bar
            scores_html = '<div class="score-container">'
            for label, col_name in score_columns:
                try:
                    raw_val = row.get(col_name, 90)
                    raw_val = float(raw_val) if raw_val is not None else 90
                except: raw_val = 0
                raw_val = max(5, min(100.0, raw_val))
                normalized = raw_val / 100.0
                boost_factor = 0.2
                val = (normalized ** boost_factor) * 10

                bg_color = get_gradient_color(val)
                tooltip_text = SCORE_EXPLANATIONS.get(col_name, "Analysis of property features.")

                scores_html += f'<div class="score-card" style="background-color: {bg_color};">'
                scores_html += f'<div class="score-val">{val:.0f}</div>'
                scores_html += f'<div class="score-label">{label}</div>'
                scores_html += f'<span class="tooltip-text">{tooltip_text}</span>'
                scores_html += '</div>'
            scores_html += '</div>'
            st.markdown(scores_html, unsafe_allow_html=True)

            c_img, c_stats = st.columns([1.5, 1])
            with c_img:
                st.markdown(f'<img src="{gallery_images[idx]}" class="custom-img">', unsafe_allow_html=True)
                if len(gallery_images) > 1:
                    st.markdown("<div style='height: 10px;'></div>", unsafe_allow_html=True)
                    c_spacer_l, c_prev, c_text, c_next, c_spacer_r = st.columns([3, 1, 3, 1, 3])
                    with c_prev:
                        if st.button("⬅️", key=f"prev_{prop_key}", use_container_width=True): st.session_state.gallery_idx = (idx - 1) % len(gallery_images); st.rerun()
                    with c_text:
                        st.markdown(f"<div style='text-align:center;color:#555;font-weight:500;padding-top:8px;'>{idx + 1} / {len(gallery_images)}</div>", unsafe_allow_html=True)
                    with c_next:
                        if st.button("➡️", key=f"next_{prop_key}", use_container_width=True): st.session_state.gallery_idx = (idx + 1) % len(gallery_images); st.rerun()

            with c_stats:
                score = float(row.get('risk_score_raw', 0) or 0)*10
                st.markdown("<br>", unsafe_allow_html=True)
                st.markdown(f'<div class="score-badge" style="background:{get_score_color(score)}">Fraud Risk Score: {score:.1f}/10</div>', unsafe_allow_html=True)
                st.write(f"")
                st.write(f"**💰 Price:** ${row.get('price')}")
                st.write(f"**🏠 Arrangement:** {short_description}")
                st.write(f"**👥 Guests:** {row.get('guests', 'N/A')}")
                st.write(f"**⭐ Rating:** {row.get('ratings')} ({row.get('property_number_of_reviews')} reviews)")
                st.write(f"**🐕 Pets:** {pets_allowed}")
                
                st.markdown("<br>", unsafe_allow_html=True)
                st.link_button("🏠 View in Airbnb", row.get('url', '#'))
                s = safe_parse(row.get('seller_info'), {})
                st.link_button("👤 Host details", s.get('url', '#') if isinstance(s, dict) else '#')

            hls = safe_parse(row.get('highlights'), [])
            if hls and isinstance(hls, list):
                st.markdown("#### ✨ Highlights")
                ch = st.columns(min(3, len(hls)))
                for i, h in enumerate(hls[:3]):
                    if isinstance(h, dict):
                        with ch[i]: st.markdown(f'<div class="highlight-item"><b>{h.get("name")}</b><br><small>{h.get("value")}</small></div>', unsafe_allow_html=True)

            with st.expander("📝 Description"): st.write(row.get('description', 'No desc.'))
            revs = safe_parse(row.get('reviews'), [])
            st.markdown(f"#### 💬 Reviews ({len(revs) if isinstance(revs, list) else 0})")
            if isinstance(revs, list) and revs:
                rh = "".join([f'<div class="review-bubble">"{r}"</div>' for r in revs])
                st.markdown(f'<div class="reviews-container">{rh}</div>', unsafe_allow_html=True)
            else: st.info("No text reviews.")
            st.markdown('</div>', unsafe_allow_html=True)
        
        ca1, ca2, ca3 = st.columns([1, 2, 1])
        with ca1:
            st.button("❌ Pass", on_click=handle_swipe, args=("dislike",), use_container_width=True, key=f"pass_btn_{prop_key}", type="secondary")
        with ca2:
            if st.button("📋 Favorites", use_container_width=True): st.session_state.page = 'results'; st.rerun()
        with ca3:
            st.button("❤️ Like", on_click=handle_swipe, args=("like",), use_container_width=True, type="primary", key=f"like_btn_{prop_key}")

    # PAGE 3: RESULTS
    elif st.session_state.page == 'results':
        st.subheader("🎉 Your Favorites")
        
        if not st.session_state.selections: 
            st.info("No likes yet.")
        else:
            for i, item in enumerate(st.session_state.selections):
                # Display logic... (Keep your existing expander code here)
                with st.expander(f"{i+1}. {item.get('name', 'Prop')} - ${item.get('price')}", expanded=True):
                    cr1, cr2 = st.columns([1, 3])
                    with cr1: 
                        img = item.get('image')
                        if img: st.markdown(f'<img src="{img}" class="custom-img" style="height: 150px;">', unsafe_allow_html=True)
                    with cr2:
                        st.write(f"Location: {item.get('city')}")
                        st.link_button("Book", item.get('url', '#'))
        
        st.markdown("---")
        cb1, cb2 = st.columns(2)
        
        with cb1: 
            # --- UI LOGIC CHANGE: Check if any properties remain ---
            if st.session_state.current_df is None or st.session_state.current_df.empty:
                st.info("✅ You have viewed all properties.")
            else:
                if st.button("⬅️ Back to Swipe"): 
                    st.session_state.page = 'swipe'
                    st.rerun()
            # -------------------------------------------------------
            
        with cb2:
            if st.button("🔄 New Search"): 
                st.session_state.selections = []; st.session_state.current_df = None; st.session_state.page = 'form'; st.rerun()

if __name__ == "__main__":
    main()

Overwriting app.py


In [0]:
import subprocess
import time
import os

# 1. Kill old streamlit processes to free the port
try:
    subprocess.run(["pkill", "-f", "streamlit"], check=False)
    time.sleep(2)
except:
    pass

log_file = open("streamlit.log", "w")

# 2. Run the streamlit app in the background
process = subprocess.Popen(
    ["streamlit", "run", "app.py", "--server.port", "8501", "--server.address", "0.0.0.0", "--server.headless", "true"],
    # stdout=subprocess.PIPE,
    # stderr=subprocess.PIPE
    stdout=log_file, 
    stderr=log_file
)

print("🚀 Starting Streamlit server...")
time.sleep(5) # Wait for server to initialize

# 3. Construct the Driver Proxy URL
try:
    cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
    org_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId")
    
    # Try getting workspace URL dynamically
    workspace_url = spark.conf.get("spark.databricks.workspaceUrl", None)
    
    if workspace_url:
        dashboard_url = f"https://{workspace_url}/driver-proxy/o/{org_id}/{cluster_id}/8501/"
    else:
        dashboard_url = f"https://adb-{org_id}.{cluster_id[0:2]}.azuredatabricks.net/driver-proxy/o/{org_id}/{cluster_id}/8501/"

    print("-" * 80)
    print(f"✅ App is running! Click the link below to open:")
    print(f"\n🔗 {dashboard_url}\n")
    print("-" * 80)

except Exception as e:
    print(f"Error constructing URL: {e}")

🚀 Starting Streamlit server...
--------------------------------------------------------------------------------
✅ App is running! Click the link below to open:

🔗 https://adb-983293358114278.18.azuredatabricks.net/driver-proxy/o/983293358114278/1223-140823-io2f79uq/8501/

--------------------------------------------------------------------------------
