# In [0]:
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Window
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# ---------------------------------------------
# Spark & schema
# ---------------------------------------------
# Bind `spark` explicitly instead of relying on a notebook-injected global:
# getOrCreate() hands back the already-active session when one exists and
# builds one otherwise, so this file also runs as a plain script.
spark = SparkSession.builder.getOrCreate()

# Every table read or written below lives in airbnb_project; create the schema
# on the first run and make it the session's current database.
spark.sql("CREATE SCHEMA IF NOT EXISTS airbnb_project")
spark.catalog.setCurrentDatabase("airbnb_project")

# ---------------------------------------------
# Load Bronze raw tables
# ---------------------------------------------
# One DataFrame per Bronze source, bound in the same order the names are listed.
raw_listings_df, raw_calendar_df, raw_reviews_df, raw_neigh_df = (
    spark.table(bronze_table)
    for bronze_table in (
        "airbnb_project.airbnb_raw_listings",
        "airbnb_project.airbnb_raw_calendar",
        "airbnb_project.airbnb_raw_reviews",
        "airbnb_project.airbnb_raw_neighbourhoods",
    )
)

# ============================================================
# 1) stg_listings: incremental upsert (Type 1) on listing_id
# ============================================================
# Normalize / clean from Bronze.
#
# Keep a single row per listing: the one with the latest snapshot_date.
# The window is partitioned on the *cast* id -- the same bigint value that is
# selected as listing_id below -- so raw ids that cast to the same bigint
# cannot leave more than one row per listing_id in the result.
# NOTE(review): rows that share the latest snapshot_date are tie-broken
# arbitrarily by row_number(); add a secondary sort key if that matters.
window_latest = (
    Window
    .partitionBy(F.col("id").cast("bigint"))
    .orderBy(F.col("snapshot_date").desc())
)
raw_listings_dedup = (
    raw_listings_df
    .withColumn("row_num", F.row_number().over(window_latest))
    .filter(F.col("row_num") == 1)
)
stg_listings_current = (
    raw_listings_dedup
    .select(
        F.col("id").cast("bigint").alias("listing_id"),
        F.col("host_id").cast("bigint").alias("host_id"),
        F.col("name").alias("name"),
        F.col("host_name").alias("host_name"),
        F.col("neighbourhood_cleansed").alias("neighbourhood"),
        # Typed NULL placeholder so the column exists with a string type.
        F.lit(None).cast("string").alias("neighbourhood_group"),
        F.col("latitude").cast("double").alias("latitude"),
        F.col("longitude").cast("double").alias("longitude"),
        F.col("room_type").alias("room_type"),
        # Strip "$" and "," before casting the price to double.
        F.regexp_replace(F.col("price").cast("string"), "[$,]", "").cast("double").alias("price"),
        F.col("snapshot_date").cast("date").alias("snapshot_date")
    )
    # Drop rows whose id is NULL after the bigint cast (missing or uncastable).
    .where(F.col("listing_id").isNotNull())
)

stg_listings_target = "airbnb_project.airbnb_stg_listings"

# Probe for the target on its own. Only a failed lookup means "first run";
# an AnalysisException raised later by the MERGE itself (for example a schema
# mismatch) must surface as an error instead of triggering a full overwrite
# of the existing table.
try:
    spark.table(stg_listings_target)
except AnalysisException:
    stg_listings_exists = False
else:
    stg_listings_exists = True

if stg_listings_exists:
    # Target exists: MERGE (Type 1) on listing_id. Matched rows are updated in
    # place with the source values, unmatched source rows are inserted.
    delta_target = DeltaTable.forName(spark, stg_listings_target)

    (
        delta_target.alias("t")
        .merge(
            stg_listings_current.alias("s"),
            "t.listing_id = s.listing_id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    print("Upserted airbnb_stg_listings incrementally (Type 1 on listing_id).")
else:
    # First run – create the table
    (
        stg_listings_current
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(stg_listings_target)
    )
    print("Created airbnb_stg_listings (initial load).")

# ============================================================
# 2) stg_calendar: incremental append by date watermark
# ============================================================
# "t" -> 1, "f" -> 0, anything else -> NULL.
availability_flag = (
    F.when(F.col("available") == "t", F.lit(1))
    .when(F.col("available") == "f", F.lit(0))
    .otherwise(F.lit(None))
)
# Strip "$" and "," before casting the price to double.
calendar_price = F.regexp_replace(
    F.col("price").cast("string"), "[$,]", ""
).cast("double")
# Evaluated against the selected (already cast) columns.
has_listing_and_date = F.col("listing_id").isNotNull() & F.col("date").isNotNull()

stg_calendar_current = raw_calendar_df.select(
    F.col("listing_id").cast("bigint").alias("listing_id"),
    F.col("date").cast("date").alias("date"),
    availability_flag.alias("is_available"),
    calendar_price.alias("price"),
    F.col("snapshot_date").cast("date").alias("snapshot_date"),
).where(has_listing_and_date)

stg_calendar_target = "airbnb_project.airbnb_stg_calendar"

# Probe for the target on its own. Only a failed lookup means "first run";
# an AnalysisException from the watermark query or the append (for example a
# schema mismatch) must surface instead of overwriting the existing table.
try:
    existing_cal = spark.table(stg_calendar_target)
except AnalysisException:
    existing_cal = None

if existing_cal is None:
    # First run – create the table
    (
        stg_calendar_current
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(stg_calendar_target)
    )
    print("Created airbnb_stg_calendar (initial load).")
else:
    # Watermark: the latest calendar date already stored (None if the table is
    # empty or holds only NULL dates).
    # NOTE(review): rows with date <= max_date are never appended, even when
    # they come from a newer snapshot_date -- confirm that is intended.
    max_date = existing_cal.agg(F.max("date")).collect()[0][0]

    if max_date is None:
        new_cal_rows = stg_calendar_current
    else:
        new_cal_rows = stg_calendar_current.filter(F.col("date") > F.lit(max_date))

    # count() is a full Spark job; run it once and reuse the result for both
    # the emptiness check and the log line.
    new_cal_count = new_cal_rows.count()

    if new_cal_count == 0:
        print("No new rows to append to airbnb_stg_calendar.")
    else:
        (
            new_cal_rows
            .write
            .format("delta")
            .mode("append")
            .saveAsTable(stg_calendar_target)
        )
        print(f"Appended {new_cal_count} new rows to airbnb_stg_calendar after {max_date}.")

# ============================================================
# 3) stg_reviews: incremental append by review_date watermark
# ============================================================
stg_reviews_current = (
    raw_reviews_df
    .select(
        F.col("listing_id").cast("bigint").alias("listing_id"),
        F.col("id").cast("bigint").alias("review_id"),
        F.col("date").cast("date").alias("review_date"),
        F.col("reviewer_id").cast("bigint").alias("reviewer_id"),
        F.col("reviewer_name").cast("string").alias("reviewer_name"),
        # cast, not alias: the previous .alias("string") only renamed the
        # column and left its type untouched.
        F.col("comments").cast("string").alias("comments"),
        F.col("snapshot_date").cast("date").alias("snapshot_date")
    )
    # Drop rows whose keys are NULL after the bigint casts.
    .where(F.col("listing_id").isNotNull() & F.col("review_id").isNotNull())
)

stg_reviews_target = "airbnb_project.airbnb_stg_reviews"

# Probe for the target on its own. Only a failed lookup means "first run";
# an AnalysisException from the watermark query or the append (for example a
# schema mismatch) must surface instead of overwriting the existing table.
try:
    existing_rev = spark.table(stg_reviews_target)
except AnalysisException:
    existing_rev = None

if existing_rev is None:
    # First run – create the table
    (
        stg_reviews_current
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(stg_reviews_target)
    )
    print("Created airbnb_stg_reviews (initial load).")
else:
    # Watermark: the latest review_date already stored (None if the table is
    # empty or holds only NULL review dates).
    # NOTE(review): the strict ">" skips reviews dated exactly max_rdate that
    # arrive in a later load -- confirm that is acceptable.
    max_rdate = existing_rev.agg(F.max("review_date")).collect()[0][0]

    if max_rdate is None:
        new_rev_rows = stg_reviews_current
    else:
        new_rev_rows = stg_reviews_current.filter(F.col("review_date") > F.lit(max_rdate))

    # count() is a full Spark job; run it once and reuse the result for both
    # the emptiness check and the log line.
    new_rev_count = new_rev_rows.count()

    if new_rev_count == 0:
        print("No new rows to append to airbnb_stg_reviews.")
    else:
        (
            new_rev_rows
            .write
            .format("delta")
            .mode("append")
            .saveAsTable(stg_reviews_target)
        )
        print(f"Appended {new_rev_count} new rows to airbnb_stg_reviews after {max_rdate}.")

# ============================================================
# 4) stg_neighbourhoods: small dim – overwrite is fine
# ============================================================
# Columns carried into the staging table; only snapshot_date is re-typed.
neigh_columns = [
    F.col("neighbourhood").alias("neighbourhood"),
    F.col("neighbourhood_group").alias("neighbourhood_group"),
    F.col("snapshot_date").cast("date").alias("snapshot_date"),
]
stg_neigh_current = raw_neigh_df.select(*neigh_columns)

# Full refresh on every run: the table is replaced rather than merged.
neigh_writer = stg_neigh_current.write.format("delta").mode("overwrite")
neigh_writer.saveAsTable("airbnb_project.airbnb_stg_neighbourhoods")

print("Updated airbnb_stg_neighbourhoods (overwrite).")
print("Silver layer updated.")