In [0]:
# Create the bronze, silver and gold schemas if they do not exist yet.
for layer_name in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS workspace.airbnb_{layer_name}")


In [0]:
# Row count per city in the raw bronze listings table.
bronze_rows_per_city = (
    spark.table("workspace.airbnb_bronze.listings_raw").groupBy("city").count()
)
display(bronze_rows_per_city)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# Fully-qualified table names; every table hangs off the same catalog.
CATALOG = "workspace"
BRONZE_TABLE = f"{CATALOG}.airbnb_bronze.listings_raw"
SILVER_LISTINGS_TABLE, SILVER_HOSTS_TABLE = (
    f"{CATALOG}.airbnb_silver.{table_name}" for table_name in ("listings", "hosts")
)

# Declare the "city" text widget (default "Paris") and read its current value.
dbutils.widgets.text("city", "Paris")
city_name = dbutils.widgets.get("city")
print(f"[silver] Transform for city={city_name}")


In [0]:
# Row count per city in the silver listings table, ordered by city.
silver_rows_per_city = (
    spark.table("workspace.airbnb_silver.listings")
    .groupBy("city")
    .count()
    .orderBy("city")
)
display(silver_rows_per_city)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

CATALOG = "workspace"
BRONZE_TABLE = f"{CATALOG}.airbnb_bronze.listings_raw"

# City inspected by this debugging cell. It is set here as a literal rather
# than read from the "city" widget; edit it to inspect another city.
city_name = "Venice"

# ---------------------------------------------------------
# Step 0: Load bronze data for this city
# ---------------------------------------------------------
bronze_city = (
    spark.table(BRONZE_TABLE)
         .where(F.col("city") == city_name)
)

# The labels are built from city_name so the output always names the city
# that was actually loaded.
print(f"Step 0: raw {city_name} rows in bronze")
print(bronze_city.count())

print(f"Columns in {city_name} bronze dataset:")
print(bronze_city.columns)

# ---------------------------------------------------------
# Step 0.1: Clean empty list-like strings
# ---------------------------------------------------------
def clean_empty_list_strings(df, cols):
    """Replace empty list-like placeholders in the given columns with NULL.

    For every name in ``cols`` that is a column of ``df``, a value becomes
    NULL when it is already NULL, is blank after trimming, equals ``[]`` or
    ``['']`` after trimming, or has an (untrimmed) length of 4 or less.
    All other values are kept as they are. Names that are not columns of
    ``df`` are skipped.

    Returns the DataFrame with the affected columns rewritten.
    """
    # NOTE(review): the length <= 4 rule also nulls any short value that is
    # not a placeholder - confirm this is intended.
    for name in cols:
        if name not in df.columns:
            continue
        raw = F.col(name)
        trimmed = F.trim(raw)
        is_placeholder = (
            raw.isNull()
            | (trimmed == "")
            | (trimmed == "[]")
            | (trimmed == "['']")
            | (F.length(raw) <= 4)
        )
        df = df.withColumn(name, F.when(is_placeholder, F.lit(None)).otherwise(raw))
    return df

# Null out empty placeholders in the list-like text columns.
cols_to_clean = ["host_verifications", "amenities"]
bronze_city = clean_empty_list_strings(bronze_city, cols_to_clean)
print(f"[silver] Cleaned empty placeholders in {cols_to_clean}")

# ---------------------------------------------------------
# Step 1: Identify dynamic columns (price, lat, lon)
# ---------------------------------------------------------
# For each logical field, keep the known spellings that exist in this
# DataFrame, in priority order.
bronze_cols = bronze_city.columns
price_candidates = [
    name for name in ("price", "price_x", "price_y", "cost", "nightly_price")
    if name in bronze_cols
]
lat_candidates = [
    name for name in ("latitude", "lat", "Latitude", "geo_lat")
    if name in bronze_cols
]
lon_candidates = [
    name for name in ("longitude", "lon", "Longitude", "lng", "geo_lon")
    if name in bronze_cols
]

print("Price candidates:", price_candidates)
print("Lat candidates:", lat_candidates)
print("Lon candidates:", lon_candidates)

# First match wins; None when no spelling is present.
price_col = next(iter(price_candidates), None)
lat_col = next(iter(lat_candidates), None)
lon_col = next(iter(lon_candidates), None)

# Pattern for a non-negative integer or decimal number; group 1 is the whole number.
num_regex = r'(\d+(\.\d+)?)'
bronze_city_typed = bronze_city

# ---------------------------------------------------------
# Step 2: Cast and clean numeric / text fields
# ---------------------------------------------------------
# bathrooms_clean:
#   text starting with "half" (case-insensitive) -> 0.5
#   NULL or blank text                           -> NULL
#   otherwise the first number in the text, or NULL when there is none
bronze_city_typed = bronze_city_typed.withColumn(
    "bathrooms_clean",
    F.when(
        F.col("bathrooms_text").rlike("(?i)^\\s*half"), F.lit(0.5)
    ).when(
        F.col("bathrooms_text").isNull() | (F.trim(F.col("bathrooms_text")) == ""),
        F.lit(None).cast("double")
    ).when(
        F.col("bathrooms_text").rlike(num_regex),
        F.regexp_extract("bathrooms_text", num_regex, 1).cast("double")
    ).otherwise(F.lit(None).cast("double"))
)

# price_clean: parse prices written with either US or European separators.
# Turning every comma into a dot would read "$1,234.00" as 1.234, so grouping
# separators are removed before the decimal comma is normalized.
if price_col is not None:
    # Step 1: keep only digits, commas and dots ("$1,234.00" -> "1,234.00",
    #         "1 234,50 EUR" -> "1234,50").
    tmp_price = F.regexp_replace(F.col(price_col), r"[^0-9,\.]", "")
    # Step 2: drop grouping separators - a comma/dot followed by exactly three
    #         digits and another separator ("1,234.00", "1.234,50"), or a comma
    #         followed by exactly three trailing digits ("1,234").
    #         NOTE(review): a bare "1,234" is treated as 1234, not 1.234.
    tmp_price = F.regexp_replace(tmp_price, r"[.,](?=\d{3}[.,])|,(?=\d{3}$)", "")
    # Step 3: any comma left is a decimal comma ("120,50" -> "120.50").
    tmp_price = F.regexp_replace(tmp_price, ",", ".")
    # Step 4: extract the first numeric token.
    bronze_city_typed = bronze_city_typed.withColumn(
        "price_number_str",
        F.regexp_extract(tmp_price, r"(\d+(\.\d+)?)", 1)
    )
    # Step 5: cast to double.
    bronze_city_typed = bronze_city_typed.withColumn(
        "price_clean",
        F.col("price_number_str").cast("double")
    )
else:
    # No price column was detected: keep the schema stable with a NULL price.
    bronze_city_typed = bronze_city_typed.withColumn(
        "price_clean", F.lit(None).cast("double")
    )

# ---------------------------------------------------------
# Step 3: Date and numeric conversions
# ---------------------------------------------------------
# Each pair is (new typed column, expression over the raw column); the raw
# columns themselves stay in place.
typed_columns = [
    ("last_scraped_dt", F.to_date("last_scraped")),
    ("host_since_dt", F.to_date("host_since")),
    ("availability_30_int", F.col("availability_30").cast("int")),
    ("availability_365_int", F.col("availability_365").cast("int")),
    ("review_scores_rating_dbl", F.col("review_scores_rating").cast("double")),
]
for typed_name, typed_expr in typed_columns:
    bronze_city_typed = bronze_city_typed.withColumn(typed_name, typed_expr)

print("Step 1: after type casting (bathrooms, price_clean, dates):")
print(bronze_city_typed.count())

# ---------------------------------------------------------
# Step 4: Keep only the latest snapshot per listing_id
# ---------------------------------------------------------
# Rows of one listing are ranked newest scrape date first (NULL dates last);
# only rank 1 is kept.
# NOTE(review): rows sharing the same last_scraped_dt have no tie-breaker here.
w_latest = Window.partitionBy("id").orderBy(F.col("last_scraped_dt").desc_nulls_last())

ranked_snapshots = bronze_city_typed.withColumn("rn", F.row_number().over(w_latest))
silver_latest_city = ranked_snapshots.where(F.col("rn") == 1).drop("rn")

print("Step 2: after keeping latest snapshot per listing_id:")
print(silver_latest_city.count())

# ---------------------------------------------------------
# Step 5: Apply quality filters
# ---------------------------------------------------------
# Coordinates as doubles; a NULL column stands in when no lat/lon column was found.
lat_source = F.col(lat_col) if lat_col is not None else F.lit(None)
lon_source = F.col(lon_col) if lon_col is not None else F.lit(None)
silver_latest_city = (
    silver_latest_city
    .withColumn("lat_tmp", lat_source.cast("double"))
    .withColumn("lon_tmp", lon_source.cast("double"))
)

# A row passes when it has a price strictly between 0 and 5000 and both coordinates.
price_is_usable = (
    F.col("price_clean").isNotNull()
    & ((F.col("price_clean") > 0) & (F.col("price_clean") < 5000))
)
has_coordinates = F.col("lat_tmp").isNotNull() & F.col("lon_tmp").isNotNull()
silver_filtered_preview = silver_latest_city.where(price_is_usable & has_coordinates)

print("Step 3: after applying quality filters we use in silver:")
print(silver_filtered_preview.count())

# ---------------------------------------------------------
# Step 6: Show sample data before/after filters
# ---------------------------------------------------------
passed_cols = ["id", "price_clean", "lat_tmp", "lon_tmp", "last_scraped_dt"]
print("Example rows that PASSED filters:")
display(silver_filtered_preview.select(*passed_cols).limit(20))

# Show the raw coordinate columns when they exist, otherwise a NULL
# placeholder column whose name says the column is missing.
raw_lat = lat_col if lat_col else F.lit(None).alias("lat_col_missing")
raw_lon = lon_col if lon_col else F.lit(None).alias("lon_col_missing")

print("Example rows BEFORE filters (to inspect raw columns):")
unfiltered_sample = silver_latest_city.select(
    "id",
    "price_clean",
    "bathrooms_text",
    "bathrooms_clean",
    raw_lat,
    raw_lon,
    "last_scraped_dt",
).limit(20)
display(unfiltered_sample)


In [0]:
# =========================
# Build / Update SILVER (per-city) from BRONZE
# =========================
from pyspark.sql import functions as F
from pyspark.sql import Window

CATALOG = "workspace"
BRONZE_TABLE = f"{CATALOG}.airbnb_bronze.listings_raw"
SILVER_LISTINGS_TABLE = f"{CATALOG}.airbnb_silver.listings"
SILVER_HOSTS_TABLE = f"{CATALOG}.airbnb_silver.hosts"

# ---- City param (widget) ----
# Read the "city" widget; if reading fails (presumably because the widget has
# not been created yet), create it with default "Paris" and read it again.
try:
    city_name = dbutils.widgets.get("city")
except Exception:
    dbutils.widgets.text("city", "Paris")
    city_name = dbutils.widgets.get("city")
print(f"[silver] Transform for city = {city_name}")

# ---------------------------------------------------------
# Step 0: Load bronze data for this city (and basic assertions)
# ---------------------------------------------------------
bronze_city = spark.table(BRONZE_TABLE).where(F.col("city") == city_name)

# Stop early when the city has no bronze rows at all.
row_cnt = bronze_city.count()
print(f"[silver] bronze rows for {city_name}: {row_cnt}")
if not row_cnt:
    raise ValueError(f"No Bronze rows for city='{city_name}'. Check BRONZE_TABLE or city spelling.")

print("[silver] Columns in bronze:", bronze_city.columns)

# ---------------------------------------------------------
# Step 0.1: Defensive cleaning of empty list-like strings
# ---------------------------------------------------------
def clean_empty_list_strings(df, cols):
    """Null out empty list-like placeholder strings in the selected columns.

    Each name in ``cols`` that exists in ``df`` is rewritten so that a value
    becomes NULL when it is NULL, blank after trimming, equal to ``[]`` or
    ``['']`` after trimming, or at most 4 characters long (untrimmed).
    Other values pass through unchanged; names missing from ``df`` are ignored.

    Returns the rewritten DataFrame.
    """
    # NOTE(review): the length <= 4 rule also nulls any short value that is
    # not a placeholder - confirm this is intended.
    for col_name in cols:
        if col_name not in df.columns:
            continue
        value = F.col(col_name)
        stripped = F.trim(value)
        looks_empty = (
            value.isNull()
            | (stripped == "")
            | (stripped == "[]")
            | (stripped == "['']")
            | (F.length(value) <= 4)
        )
        df = df.withColumn(col_name, F.when(looks_empty, F.lit(None)).otherwise(value))
    return df

bronze_city = clean_empty_list_strings(bronze_city, ["host_verifications", "amenities"])
print("[silver] Cleaned placeholders in: ['host_verifications', 'amenities']")

# ---------------------------------------------------------
# Step 0.2: Create robust PK and ingestion timestamp if missing
# ---------------------------------------------------------
# pk_listing_id is the string form of listing_id, else of id, else a SHA-256
# hash of listing_url; without any of the three the run is aborted.
cset = set(bronze_city.columns)
pk_source = next((name for name in ("listing_id", "id") if name in cset), None)
if pk_source is not None:
    bronze_city = bronze_city.withColumn("pk_listing_id", F.col(pk_source).cast("string"))
elif "listing_url" in cset:
    bronze_city = bronze_city.withColumn("pk_listing_id", F.sha2(F.col("listing_url"), 256))
else:
    raise ValueError("No suitable primary key available (need one of: listing_id, id, listing_url).")

# Fall back to the current timestamp when bronze carries no ingestion time.
if "ingestion_timestamp" not in cset:
    bronze_city = bronze_city.withColumn("ingestion_timestamp", F.current_timestamp())

# ---------------------------------------------------------
# Step 1: Identify dynamic columns (price / lat / lon)
# ---------------------------------------------------------
# Known spellings per logical field, in priority order, restricted to the
# columns that exist; the first one found is used, None when there is none.
bronze_cols = bronze_city.columns
price_candidates = [
    name for name in ("price", "price_x", "price_y", "cost", "nightly_price")
    if name in bronze_cols
]
lat_candidates = [
    name for name in ("latitude", "lat", "Latitude", "geo_lat")
    if name in bronze_cols
]
lon_candidates = [
    name for name in ("longitude", "lon", "Longitude", "lng", "geo_lon")
    if name in bronze_cols
]

price_col = next(iter(price_candidates), None)
lat_col = next(iter(lat_candidates), None)
lon_col = next(iter(lon_candidates), None)

print("[silver] Using columns:",
      "price_col =", price_col, "| lat_col =", lat_col, "| lon_col =", lon_col)

# ---------------------------------------------------------
# Step 2: Cast and clean numeric / text fields (guarded)
# ---------------------------------------------------------
# Pattern for a non-negative integer or decimal number; group 1 is the whole number.
num_regex = r'(\d+(\.\d+)?)'
df = bronze_city

# bathrooms_clean (only if bathrooms_text exists):
#   text starting with "half" (case-insensitive) -> 0.5
#   NULL or blank text                           -> NULL
#   otherwise the first number in the text, or NULL when there is none
if "bathrooms_text" in df.columns:
    df = df.withColumn(
        "bathrooms_clean",
        F.when(F.col("bathrooms_text").rlike("(?i)^\\s*half"), F.lit(0.5))
         .when(F.col("bathrooms_text").isNull() | (F.trim(F.col("bathrooms_text")) == ""), F.lit(None).cast("double"))
         .when(F.col("bathrooms_text").rlike(num_regex), F.regexp_extract("bathrooms_text", num_regex, 1).cast("double"))
         .otherwise(F.lit(None).cast("double"))
    )

# price_clean: parse prices written with either US or European separators.
# Turning every comma into a dot would read "$1,234.00" as 1.234, so grouping
# separators are removed before the decimal comma is normalized.
if price_col is not None:
    # Keep only digits, commas and dots ("$1,234.00" -> "1,234.00").
    tmp_price = F.regexp_replace(F.col(price_col), r"[^0-9,\.]", "")
    # Drop grouping separators: a comma/dot followed by exactly three digits
    # and another separator ("1,234.00", "1.234,50"), or a comma followed by
    # exactly three trailing digits ("1,234").
    # NOTE(review): a bare "1,234" is treated as 1234, not 1.234.
    tmp_price = F.regexp_replace(tmp_price, r"[.,](?=\d{3}[.,])|,(?=\d{3}$)", "")
    # Any comma left is a decimal comma ("120,50" -> "120.50").
    tmp_price = F.regexp_replace(tmp_price, ",", ".")
    df = df.withColumn("price_number_str", F.regexp_extract(tmp_price, num_regex, 1))
    df = df.withColumn("price_clean", F.col("price_number_str").cast("double"))
else:
    # No price column was detected: keep the schema stable with a NULL price.
    df = df.withColumn("price_clean", F.lit(None).cast("double"))

# dates & numerics (guarded)
# last_scraped_dt always exists because the latest-snapshot window orders by it.
if "last_scraped" in df.columns:
    df = df.withColumn("last_scraped_dt", F.to_date("last_scraped"))
else:
    df = df.withColumn("last_scraped_dt", F.lit(None).cast("date"))

if "host_since" in df.columns:
    df = df.withColumn("host_since_dt", F.to_date("host_since"))

if "availability_30" in df.columns:
    df = df.withColumn("availability_30_int", F.col("availability_30").cast("int"))
if "availability_365" in df.columns:
    df = df.withColumn("availability_365_int", F.col("availability_365").cast("int"))
if "review_scores_rating" in df.columns:
    df = df.withColumn("review_scores_rating_dbl", F.col("review_scores_rating").cast("double"))

# lat/lon clean: temporary double columns used by the quality filters;
# NULL when no coordinate column was detected.
if lat_col is not None:
    df = df.withColumn("lat_tmp", F.col(lat_col).cast("double"))
else:
    df = df.withColumn("lat_tmp", F.lit(None).cast("double"))

if lon_col is not None:
    df = df.withColumn("lon_tmp", F.col(lon_col).cast("double"))
else:
    df = df.withColumn("lon_tmp", F.lit(None).cast("double"))

print("[silver] After type casting:", df.count())

# ---------------------------------------------------------
# Step 3: Keep only the latest snapshot per listing within the city
# ---------------------------------------------------------
# Within each pk_listing_id, rows are ranked newest first: by scrape date,
# then by ingestion timestamp, with NULLs sorted last. Only rank 1 survives.
newest_first = [
    F.col(sort_col).desc_nulls_last()
    for sort_col in ("last_scraped_dt", "ingestion_timestamp")
]
w_latest = Window.partitionBy("pk_listing_id").orderBy(*newest_first)

ranked_snapshots = df.withColumn("rn", F.row_number().over(w_latest))
latest_only = ranked_snapshots.where(F.col("rn") == 1).drop("rn")

print(f"[silver] {city_name}: rows after latest-per-listing =", latest_only.count())

# ---------------------------------------------------------
# Step 4: Apply quality filters with safety checks
# ---------------------------------------------------------
# Each filter is applied only when at least one row would satisfy its
# non-null requirement; otherwise it is skipped with a warning so that a
# parsing failure does not silently empty the whole city.
has_price = F.col("price_clean").isNotNull()
has_geo = F.col("lat_tmp").isNotNull() & F.col("lon_tmp").isNotNull()

non_null_price_cnt = latest_only.where(has_price).count()
print(f"[silver] {city_name}: listings with non-null price_clean = {non_null_price_cnt}")

if non_null_price_cnt > 0:
    # Keep prices strictly between 0 and 5000.
    filtered_stage_a = (
        latest_only
        .where(has_price)
        .where((F.col("price_clean") > 0) & (F.col("price_clean") < 5000))
    )
else:
    print(f"[silver] {city_name}: WARNING price parsing failed for entire city; skipping price filter.")
    filtered_stage_a = latest_only

non_null_geo_cnt = filtered_stage_a.where(has_geo).count()
print(f"[silver] {city_name}: listings with valid lat/lon = {non_null_geo_cnt}")

if non_null_geo_cnt > 0:
    # non_null_geo_cnt is the row count of exactly this filter, so the result
    # is known to be non-empty and needs no second count() to confirm it.
    filtered_stage_b = filtered_stage_a.where(has_geo)
    final_filtered = filtered_stage_b
else:
    print(f"[silver] {city_name}: WARNING no usable lat/lon; skipping geo filter.")
    final_filtered = filtered_stage_a

# Helper columns are only needed for filtering and are not written to silver.
final_to_write = final_filtered.drop("price_number_str", "lat_tmp", "lon_tmp")
final_count = final_to_write.count()
print(f"[silver] {city_name}: FINAL rows to merge into silver = {final_count}")

# ---------------------------------------------------------
# Step 5: Upsert into SILVER listings (merge on listing key + city)
# ---------------------------------------------------------
if not spark.catalog.tableExists(SILVER_LISTINGS_TABLE):
    # First batch: create the Delta table, partitioned by city.
    (final_to_write
        .write
        .format("delta")
        .mode("overwrite")
        .partitionBy("city")
        .saveAsTable(SILVER_LISTINGS_TABLE))
    print(f"[silver] Created {SILVER_LISTINGS_TABLE} with first batch ({city_name})")
else:
    staging = final_to_write

    # If silver target uses 'listing_id' but staging only has 'pk_listing_id', map it
    if "listing_id" not in staging.columns and "pk_listing_id" in staging.columns:
        staging = staging.withColumn("listing_id", F.col("pk_listing_id"))

    staging.createOrReplaceTempView("silver_new_batch")
    s_cols = staging.columns
    t_cols = [f.name for f in spark.table(SILVER_LISTINGS_TABLE).schema]

    # Pick the listing key that exists on BOTH sides. A table created by the
    # branch above from a batch without 'listing_id' only has 'pk_listing_id'
    # (and 'id'), so always joining on 'listing_id' would leave city as the only
    # usable key and match every source row against every target row of that city.
    listing_key = next(
        (k for k in ("listing_id", "pk_listing_id", "id") if k in s_cols and k in t_cols),
        None,
    )
    if listing_key is None or "city" not in s_cols or "city" not in t_cols:
        # Fail loudly instead of merging on a partial key.
        raise RuntimeError(
            "Cannot build merge condition for listings: missing join key columns on target/source "
            f"(listing key found: {listing_key}; source columns: {s_cols}; target columns: {t_cols})."
        )
    join_keys = [listing_key, "city"]

    # Only columns present on both sides can be referenced in the MERGE;
    # the join keys are guaranteed to be among them by the check above.
    common_cols = [c for c in s_cols if c in t_cols]

    # Columns to update (don't include join keys)
    update_cols = [c for c in common_cols if c not in join_keys]

    # With no non-key column in common, assign the join keys to themselves:
    # a no-op update that keeps the WHEN MATCHED clause well-formed.
    set_updates = ", ".join(f"t.{c} = s.{c}" for c in (update_cols or join_keys))

    insert_cols_list = common_cols
    insert_cols = ", ".join(insert_cols_list)
    insert_vals = ", ".join(f"s.{c}" for c in insert_cols_list)

    merge_condition = " AND ".join(f"t.{k} = s.{k}" for k in join_keys)

    print("[silver] listings MERGE condition:", merge_condition)
    print("[silver] listings UPDATE cols:", update_cols)
    print("[silver] listings INSERT cols:", insert_cols_list)

    spark.sql(f"""
        MERGE INTO {SILVER_LISTINGS_TABLE} t
        USING silver_new_batch s
        ON {merge_condition}
        WHEN MATCHED THEN UPDATE SET {set_updates}
        WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
    """)
    print(f"[silver] Merged {city_name} rows into {SILVER_LISTINGS_TABLE}")


# ---------------------------------------------------------
# Step 6: Build / upsert SILVER hosts (robust merge)
# ---------------------------------------------------------
if "host_id" in final_to_write.columns:
    # The host attributes are optional upstream (host_since_dt is only created
    # when bronze has host_since), so add typed NULL stand-ins for missing ones
    # instead of failing on an unresolved column.
    # NOTE(review): "string" for host_name / host_is_superhost is an assumption
    # about the bronze types - confirm against the hosts table schema.
    hosts_source = final_to_write
    for opt_col, opt_type in (
        ("host_since_dt", "date"),
        ("host_name", "string"),
        ("host_is_superhost", "string"),
    ):
        if opt_col not in hosts_source.columns:
            hosts_source = hosts_source.withColumn(opt_col, F.lit(None).cast(opt_type))

    # One row per (city, host_id): earliest host_since date, number of distinct
    # listings in this batch, and the first non-null name / superhost flag.
    silver_hosts_city = (
        hosts_source
        .filter(F.col("host_id").isNotNull())
        .groupBy("city", "host_id")
        .agg(
            F.min("host_since_dt").alias("host_since_dt"),
            F.countDistinct("pk_listing_id").alias("listings_count"),
            F.first("host_name", ignorenulls=True).alias("host_name"),
            F.first("host_is_superhost", ignorenulls=True).alias("host_is_superhost")
        )
        .withColumn("host_since_year", F.year("host_since_dt"))
    )

    if not spark.catalog.tableExists(SILVER_HOSTS_TABLE):
        # First batch: create the Delta table, partitioned by city.
        (silver_hosts_city
            .write
            .format("delta")
            .mode("overwrite")
            .partitionBy("city")
            .saveAsTable(SILVER_HOSTS_TABLE))
        print(f"[silver] Created {SILVER_HOSTS_TABLE} with first batch ({city_name})")
    else:
        hosts_staging = silver_hosts_city
        hosts_staging.createOrReplaceTempView("hosts_new_batch")

        s_hcols = hosts_staging.columns
        t_hcols = [f.name for f in spark.table(SILVER_HOSTS_TABLE).schema]

        # Join keys for hosts. Every key must exist on both sides: dropping a
        # missing key would merge on a partial key and match unrelated rows.
        h_join_keys = ["host_id", "city"]
        missing_hkeys = [k for k in h_join_keys if k not in s_hcols or k not in t_hcols]
        if missing_hkeys:
            raise RuntimeError(
                "Cannot build merge condition for hosts: missing join key columns on target/source "
                f"(missing: {missing_hkeys})."
            )

        # Only columns present on both sides can be referenced in the MERGE.
        common_hcols = [c for c in s_hcols if c in t_hcols]

        # update columns (exclude join keys)
        update_hcols = [c for c in common_hcols if c not in h_join_keys]

        # With no non-key column in common, assign the join keys to themselves:
        # a no-op update that keeps the WHEN MATCHED clause well-formed.
        set_updates_hosts = ", ".join(f"t.{c} = s.{c}" for c in (update_hcols or h_join_keys))

        insert_cols_hosts_list = common_hcols
        insert_cols_hosts = ", ".join(insert_cols_hosts_list)
        insert_vals_hosts = ", ".join(f"s.{c}" for c in insert_cols_hosts_list)

        merge_condition_hosts = " AND ".join(f"t.{k} = s.{k}" for k in h_join_keys)

        print("[silver] hosts MERGE condition:", merge_condition_hosts)
        print("[silver] hosts UPDATE cols:", update_hcols)
        print("[silver] hosts INSERT cols:", insert_cols_hosts_list)

        spark.sql(f"""
            MERGE INTO {SILVER_HOSTS_TABLE} t
            USING hosts_new_batch s
            ON {merge_condition_hosts}
            WHEN MATCHED THEN UPDATE SET {set_updates_hosts}
            WHEN NOT MATCHED THEN INSERT ({insert_cols_hosts}) VALUES ({insert_vals_hosts})
        """)
        print(f"[silver] Merged {city_name} host rows into {SILVER_HOSTS_TABLE}")
else:
    print("[silver] WARNING: 'host_id' column not present; skipping hosts table.")


# ---------------------------------------------------------
# Step 7: Sanity checks
# ---------------------------------------------------------
# Per-city row counts of the silver tables after this run; the hosts table is
# only shown when it exists.
print("[silver] Row counts by city (listings):")
listings_by_city = spark.table(SILVER_LISTINGS_TABLE).groupBy("city").count().orderBy("city")
display(listings_by_city)

if spark.catalog.tableExists(SILVER_HOSTS_TABLE):
    print("[silver] Row counts by city (hosts):")
    hosts_by_city = spark.table(SILVER_HOSTS_TABLE).groupBy("city").count().orderBy("city")
    display(hosts_by_city)


In [0]:
# from pyspark.sql import functions as F
# from pyspark.sql import Window

# CATALOG = "workspace" 
# BRONZE_TABLE          = f"{CATALOG}.airbnb_bronze.listings_raw"
# SILVER_LISTINGS_TABLE = f"{CATALOG}.airbnb_silver.listings"
# SILVER_HOSTS_TABLE    = f"{CATALOG}.airbnb_silver.hosts"

# spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.airbnb_silver")

# # ---- city param ----
# dbutils.widgets.text("city", "Paris")
# city_name = dbutils.widgets.get("city")
# print(f"[silver] Transform for city={city_name}")

# # ---- load bronze for that city ----
# bronze_city = (
#     spark.table(BRONZE_TABLE)
#          .where(F.col("city") == city_name)
# )

# print(f"[silver] bronze rows for {city_name}: {bronze_city.count()}")

# # === INSERTED: clean empty list-like strings ('[]', "['']", blanks) ===
# def clean_empty_list_strings(df, cols):
#     for c in cols:
#         if c in df.columns:
#             df = df.withColumn(
#                 c,
#                 F.when(
#                     F.col(c).isNull() |
#                     (F.trim(F.col(c)) == "") |
#                     (F.trim(F.col(c)) == "[]") |
#                     (F.trim(F.col(c)) == "['']") |
#                     (F.length(F.col(c)) <= 4),
#                     F.lit(None)
#                 ).otherwise(F.col(c))
#             )
#     return df


# cols_to_clean = ["host_verifications", "amenities"]
# bronze_city = clean_empty_list_strings(bronze_city, cols_to_clean)
# print(f"[silver] Cleaned empty placeholders in: {cols_to_clean}")

# price_candidates = [c for c in ["price","price_x","price_y","cost","nightly_price"] if c in bronze_city.columns]
# lat_candidates   = [c for c in ["latitude","lat","Latitude","geo_lat"]               if c in bronze_city.columns]
# lon_candidates   = [c for c in ["longitude","lon","Longitude","lng","geo_lon"]       if c in bronze_city.columns]

# price_col = price_candidates[0] if price_candidates else None
# lat_col   = lat_candidates[0]   if lat_candidates else None
# lon_col   = lon_candidates[0]   if lon_candidates else None

# print("[silver] using columns:",
#       "price_col =", price_col,
#       "lat_col =", lat_col,
#       "lon_col =", lon_col)

# num_regex_bath = r'(\d+(\.\d+)?)' 

# df_typed = bronze_city

# df_typed = df_typed.withColumn(
#     "bathrooms_clean",
#     F.when(
#         F.col("bathrooms_text").rlike("(?i)^\\s*half"),
#         F.lit(0.5)
#     )
#     .when(
#         F.col("bathrooms_text").isNull() | (F.trim(F.col("bathrooms_text")) == ""),
#         F.lit(None).cast("double")
#     )
#     .when(
#         F.col("bathrooms_text").rlike(num_regex_bath),
#         F.regexp_extract("bathrooms_text", num_regex_bath, 1).cast("double")
#     )
#     .otherwise(F.lit(None).cast("double"))
# )

# if price_col is not None:
#     # normalize commas to dots
#     tmp_price = F.regexp_replace(F.col(price_col), ",", ".")
#     # remove currency symbols/letters/spaces, keep digits and dots
#     tmp_price = F.regexp_replace(tmp_price, r"[^0-9\.]", "")
#     # grab first numeric token
#     df_typed = df_typed.withColumn(
#         "price_number_str",
#         F.regexp_extract(tmp_price, r"(\d+(\.\d+)?)", 1)
#     )
#     # cast final numeric
#     df_typed = df_typed.withColumn(
#         "price_clean",
#         F.col("price_number_str").cast("double")
#     )
# else:
#     df_typed = df_typed.withColumn(
#         "price_clean", F.lit(None).cast("double")
#     )

# df_typed = (
#     df_typed
#     .withColumn("last_scraped_dt", F.to_date("last_scraped"))
#     .withColumn("host_since_dt",   F.to_date("host_since"))
#     .withColumn("availability_30_int",  F.col("availability_30").cast("int"))
#     .withColumn("availability_365_int", F.col("availability_365").cast("int"))
#     .withColumn("review_scores_rating_dbl", F.col("review_scores_rating").cast("double"))
# )

# w_latest = Window.partitionBy("id").orderBy(
#     F.col("last_scraped_dt").desc_nulls_last()
# )

# latest_only = (
#     df_typed
#     .withColumn("rn", F.row_number().over(w_latest))
#     .where(F.col("rn") == 1)
#     .drop("rn")
# )

# count_latest = latest_only.count()
# print(f"[silver] {city_name}: rows after latest snapshot per listing_id = {count_latest}")

# if lat_col is not None:
#     latest_only = latest_only.withColumn("lat_tmp", F.col(lat_col).cast("double"))
# else:
#     latest_only = latest_only.withColumn("lat_tmp", F.lit(None).cast("double"))

# if lon_col is not None:
#     latest_only = latest_only.withColumn("lon_tmp", F.col(lon_col).cast("double"))
# else:
#     latest_only = latest_only.withColumn("lon_tmp", F.lit(None).cast("double"))

# non_null_price_cnt = latest_only.where(F.col("price_clean").isNotNull()).count()
# print(f"[silver] {city_name}: listings with non-null price_clean = {non_null_price_cnt}")

# if non_null_price_cnt > 0:
#     filtered_stage_a = (
#         latest_only
#         .where(F.col("price_clean").isNotNull())
#         .where((F.col("price_clean") > 0) & (F.col("price_clean") < 5000))
#     )
# else:
#     print(f"[silver] {city_name}: WARNING price parsing failed for entire city, skipping price filter.")
#     filtered_stage_a = latest_only

# count_stage_a = filtered_stage_a.count()
# print(f"[silver] {city_name}: rows after price filter logic = {count_stage_a}")

# non_null_geo_cnt = filtered_stage_a.where(
#     F.col("lat_tmp").isNotNull() & F.col("lon_tmp").isNotNull()
# ).count()
# print(f"[silver] {city_name}: listings with valid lat/lon = {non_null_geo_cnt}")

# if non_null_geo_cnt > 0:
#     filtered_stage_b = (
#         filtered_stage_a
#         .where(F.col("lat_tmp").isNotNull())
#         .where(F.col("lon_tmp").isNotNull())
#     )
#     count_stage_b = filtered_stage_b.count()
#     print(f"[silver] {city_name}: rows after geo filter logic = {count_stage_b}")
#     final_filtered = filtered_stage_b if count_stage_b > 0 else filtered_stage_a
# else:
#     print(f"[silver] {city_name}: WARNING no usable lat/lon, skipping geo filter.")
#     final_filtered = filtered_stage_a

# final_count = final_filtered.count()
# print(f"[silver] {city_name}: FINAL rows to merge into silver = {final_count}")

# final_to_write = final_filtered.drop("lat_tmp", "lon_tmp", "price_number_str")

# if not spark.catalog.tableExists(SILVER_LISTINGS_TABLE):
#     (
#         final_to_write
#         .write
#         .format("delta")
#         .mode("overwrite")
#         .partitionBy("city")
#         .saveAsTable(SILVER_LISTINGS_TABLE)
#     )
#     print(f"[silver] Created {SILVER_LISTINGS_TABLE} with first batch ({city_name})")
# else:
#     final_to_write.createOrReplaceTempView("silver_new_batch")

#     cols = final_to_write.columns
#     merge_condition = "t.id = s.id AND t.city = s.city"

#     set_updates = ",\n".join([f"t.{c} = s.{c}" for c in cols])
#     insert_cols = ", ".join(cols)
#     insert_vals = ", ".join([f"s.{c}" for c in cols])

#     spark.sql(f"""
#         MERGE INTO {SILVER_LISTINGS_TABLE} t
#         USING silver_new_batch s
#         ON {merge_condition}
#         WHEN MATCHED THEN UPDATE SET
#         {set_updates}
#         WHEN NOT MATCHED THEN INSERT ({insert_cols})
#         VALUES ({insert_vals})
#     """)

#     print(f"[silver] Merged {city_name} rows into {SILVER_LISTINGS_TABLE}")

# # --- build / upsert hosts ---
# silver_hosts_city = (
#     final_to_write
#     .filter(F.col("host_id").isNotNull())
#     .groupBy("city", "host_id")
#     .agg(
#         F.min("host_since_dt").alias("host_since_dt"),
#         F.countDistinct("id").alias("listings_count"),
#         F.first("host_name").alias("host_name"),
#         F.first("host_is_superhost").alias("host_is_superhost")
#     )
#     .withColumn("host_since_year", F.year("host_since_dt"))
# )

# if not spark.catalog.tableExists(SILVER_HOSTS_TABLE):
#     (
#         silver_hosts_city
#         .write
#         .format("delta")
#         .mode("overwrite")
#         .partitionBy("city")
#         .saveAsTable(SILVER_HOSTS_TABLE)
#     )
#     print(f"[silver] Created {SILVER_HOSTS_TABLE} with first batch ({city_name})")
# else:
#     silver_hosts_city.createOrReplaceTempView("hosts_new_batch")

#     hcols = silver_hosts_city.columns
#     merge_condition_hosts = "t.host_id = s.host_id AND t.city = s.city"

#     set_updates_hosts = ",\n".join([f"t.{c} = s.{c}" for c in hcols])
#     insert_cols_hosts = ", ".join(hcols)
#     insert_vals_hosts = ", ".join([f"s.{c}" for c in hcols])

#     spark.sql(f"""
#         MERGE INTO {SILVER_HOSTS_TABLE} t
#         USING hosts_new_batch s
#         ON {merge_condition_hosts}
#         WHEN MATCHED THEN UPDATE SET
#         {set_updates_hosts}
#         WHEN NOT MATCHED THEN INSERT ({insert_cols_hosts})
#         VALUES ({insert_vals_hosts})
#     """)

#     print(f"[silver] Merged {city_name} host rows into {SILVER_HOSTS_TABLE}")

# # --- sanity check final silver ---
# display(
#     spark.table(SILVER_LISTINGS_TABLE)
#          .groupBy("city")
#          .count()
#          .orderBy("city")
# )

# display(
#     spark.table(SILVER_HOSTS_TABLE)
#          .groupBy("city")
#          .count()
#          .orderBy("city")
# )


In [0]:
from pyspark.sql import functions as F

silver = spark.table("workspace.airbnb_silver.listings")

# Per city, count the rows where each list-like column is NULL.
null_counts = [
    F.sum(F.col(col_name).isNull().cast("int")).alias(f"empty_{col_name}")
    for col_name in ("host_verifications", "amenities")
]
display(silver.groupBy("city").agg(*null_counts))
