In [0]:
from pyspark.sql import functions as F
import uuid

# Catalog and schema names used to build fully qualified table names
# of the form <CATALOG>.<schema>.<table>.
CATALOG = "airbnb_lab3"
SILVER_DB = "airbnb_silver"  # schema the per-city input tables are read from
GOLD_DB = "airbnb_gold"  # schema the aggregated output tables are written to

# Make sure the output schema exists before any table is written into it.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{GOLD_DB}")
def fq(db, t):
    """Return the fully qualified name ``<CATALOG>.<db>.<t>`` for a table."""
    return f"{CATALOG}.{db}.{t}"
def city_tbl(db, base, city):
    """Return the fully qualified name of the per-city table ``<base>_<city>`` in ``db``."""
    table_name = f"{base}_{city}"
    return f"{CATALOG}.{db}.{table_name}"

In [0]:
def recreate_partitioned(table_fqn: str, df, partition_cols=("yyyymm",)):
    """Drop ``table_fqn`` if it exists and recreate it as a Delta table holding ``df``.

    Args:
        table_fqn: Fully qualified table name; interpolated verbatim into the SQL.
        df: DataFrame whose rows become the table contents.
        partition_cols: Column name, or iterable of column names, to partition by.
            An empty iterable or ``None`` creates an unpartitioned table.

    The DataFrame is exposed to SQL through a uniquely named temp view, which is
    always dropped again, even when the DROP/CREATE statements fail.
    """
    # A bare string would otherwise be joined character by character ("y,y,y,y,m,m").
    if isinstance(partition_cols, str):
        partition_cols = (partition_cols,)
    partition_cols = tuple(partition_cols or ())
    # "PARTITIONED BY ()" is not valid SQL, so omit the clause when there are no columns.
    partition_clause = f"PARTITIONED BY ({','.join(partition_cols)})" if partition_cols else ""

    tmpv = f"tmp_{uuid.uuid4().hex[:8]}"
    df.createOrReplaceTempView(tmpv)
    try:
        spark.sql(f"DROP TABLE IF EXISTS {table_fqn}")
        spark.sql(f"""
      CREATE TABLE {table_fqn}
      USING DELTA
      {partition_clause}
      AS SELECT * FROM {tmpv}
    """)
    finally:
        # Do not leave the temp view behind in the session if the rebuild fails.
        spark.catalog.dropTempView(tmpv)

In [0]:
# ---------- Monthly city metrics (incl. occupancy) ----------
def monthly_metrics_for(city):
    """Build one row per (city, yyyymm) with listing, price, review and occupancy metrics.

    Args:
        city: Suffix of the silver tables to read (``silver_listings_<city>`` and
            ``silver_calendar_<city>``); its upper-cased form becomes the ``city`` column.

    Returns:
        DataFrame with columns ``city``, ``yyyymm``, ``total_listings``,
        ``avg_nightly_price``, ``avg_review_score`` and ``avg_occupancy_rate``;
        nulls in the four metric columns are replaced by zero.
    """
    listings = spark.table(city_tbl(SILVER_DB, "silver_listings", city)).alias("l")
    calendar = spark.table(city_tbl(SILVER_DB, "silver_calendar", city)).alias("c")
    calendar_by_month = calendar.withColumn("yyyymm", F.date_format("date", "yyyy-MM"))

    # Step 1: collapse the daily calendar to one row per listing per month.
    occupied_days = F.sum(F.col("is_occupied").cast("int")).alias("occupied_days")
    total_days = F.count(F.lit(1)).alias("total_days")
    avg_price = F.avg("price").alias("avg_price")
    listing_month = calendar_by_month.groupBy("listing_id", "yyyymm").agg(
        occupied_days, total_days, avg_price
    )

    # Step 2: attach the review score and derive a per-listing occupancy rate.
    ratings = listings.select("listing_id", "review_scores_rating")
    raw_rate = F.when(
        F.col("total_days") > 0, F.col("occupied_days") / F.col("total_days")
    ).otherwise(F.lit(0.0))
    # Clamp the rate into [0.0, 1.0].
    clamped_rate = F.least(F.lit(1.0), F.greatest(F.lit(0.0), F.col("occupancy_rate")))
    enriched = (
        listing_month.join(ratings, "listing_id", "left")
        .withColumn("city", F.lit(city.upper()))
        .withColumn("occupancy_rate", raw_rate)
        .withColumn("occupancy_rate", clamped_rate)
    )

    # Step 3: roll the listing-level rows up to the city/month level.
    city_month_metrics = enriched.groupBy("city", "yyyymm").agg(
        F.countDistinct("listing_id").alias("total_listings"),
        F.avg("avg_price").alias("avg_nightly_price"),
        F.avg("review_scores_rating").alias("avg_review_score"),
        F.avg("occupancy_rate").alias("avg_occupancy_rate"),
    )

    # Replace any remaining nulls in the metric columns with zero.
    zero_defaults = {
        "total_listings": 0,
        "avg_nightly_price": 0.0,
        "avg_review_score": 0.0,
        "avg_occupancy_rate": 0.0,
    }
    return city_month_metrics.na.fill(zero_defaults)

In [0]:
# Combine both cities: the cross-city pivot further down expects "LA" and "NYC" rows.
# (Unioning "nyc" with itself duplicated every NYC row and left LA out entirely.)
city_month = monthly_metrics_for("la").unionByName(monthly_metrics_for("nyc"), allowMissingColumns=True)
recreate_partitioned(fq(GOLD_DB,"gold_city_month_metrics"), city_month)

In [0]:
# ---------- Neighbourhood by month ----------
def neighbourhood_metrics_for(city):
    """Build one row per (city, yyyymm, neighbourhood) with listing count, price and occupancy.

    Args:
        city: Suffix of the silver tables to read; its upper-cased form becomes
            the ``city`` column.

    Returns:
        DataFrame with columns ``city``, ``yyyymm``, ``neighbourhood``,
        ``listing_count``, ``avg_price`` and ``avg_occupancy_rate``. Null metrics
        are replaced by zero and a null neighbourhood by ``"unknown"``.
    """
    listings = spark.table(city_tbl(SILVER_DB, "silver_listings", city))
    calendar = spark.table(city_tbl(SILVER_DB, "silver_calendar", city)).withColumn(
        "yyyymm", F.date_format("date", "yyyy-MM")
    )

    # One row per listing per month.
    listing_month = calendar.groupBy("listing_id", "yyyymm").agg(
        F.sum(F.col("is_occupied").cast("int")).alias("occupied_days"),
        F.count(F.lit(1)).alias("total_days"),
        F.avg("price").alias("avg_price"),
    )

    # Left join keeps listing-months whose listing has no row in the listings table.
    neighbourhoods = listings.select("listing_id", "neighbourhood")
    with_neighbourhood = listing_month.join(neighbourhoods, "listing_id", "left")

    occupancy_rate = F.when(
        F.col("total_days") > 0, F.col("occupied_days") / F.col("total_days")
    ).otherwise(F.lit(0.0))
    summary = with_neighbourhood.groupBy(
        F.lit(city.upper()).alias("city"), "yyyymm", "neighbourhood"
    ).agg(
        F.countDistinct("listing_id").alias("listing_count"),
        F.avg("avg_price").alias("avg_price"),
        F.avg(occupancy_rate).alias("avg_occupancy_rate"),
    )

    defaults = {
        "listing_count": 0,
        "avg_price": 0.0,
        "avg_occupancy_rate": 0.0,
        "neighbourhood": "unknown",
    }
    return summary.na.fill(defaults)


In [0]:
# Combine both cities; unioning "nyc" with itself duplicated every NYC row and omitted LA.
neigh_all = neighbourhood_metrics_for("la").unionByName(neighbourhood_metrics_for("nyc"))
recreate_partitioned(fq(GOLD_DB,"gold_neighbourhood_metrics"), neigh_all)

In [0]:
# ---------- Room type share by month (active listings) ----------
def roomtype_share_for(city):
    """Compute each room type's share of the listings that have calendar rows in a month.

    Args:
        city: Suffix of the silver tables to read; its upper-cased form becomes
            the ``city`` column.

    Returns:
        DataFrame with columns ``city``, ``yyyymm``, ``room_type``, ``cnt``
        (distinct listings of that room type), ``total`` (sum of ``cnt`` over the
        city/month) and ``share`` (``cnt / total``, or 0.0 when ``total`` is not
        positive). Nulls are replaced by zero / ``"unknown"``.
    """
    listings = spark.table(city_tbl(SILVER_DB, "silver_listings", city))
    calendar = spark.table(city_tbl(SILVER_DB, "silver_calendar", city)).withColumn(
        "yyyymm", F.date_format("date", "yyyy-MM")
    )

    # Listings with at least one calendar row in the month, tagged with their room type.
    # The default (inner) join drops calendar listings that have no listings row.
    listing_months = calendar.select("listing_id", "yyyymm").distinct()
    room_types = listings.select("listing_id", "room_type")
    active = listing_months.join(room_types, "listing_id")

    per_room_type = active.groupBy(
        F.lit(city.upper()).alias("city"), "yyyymm", "room_type"
    ).agg(F.countDistinct("listing_id").alias("cnt"))
    per_month_total = per_room_type.groupBy("city", "yyyymm").agg(
        F.sum("cnt").alias("total")
    )

    share = F.when(F.col("total") > 0, F.col("cnt") / F.col("total")).otherwise(F.lit(0.0))
    with_share = per_room_type.join(per_month_total, ["city", "yyyymm"], "left").withColumn(
        "share", share
    )

    defaults = {"cnt": 0, "total": 0, "share": 0.0, "room_type": "unknown"}
    return with_share.na.fill(defaults)


In [0]:
# Combine both cities; unioning "nyc" with itself duplicated every NYC row and omitted LA.
roomtype_all = roomtype_share_for("la").unionByName(roomtype_share_for("nyc"))
recreate_partitioned(fq(GOLD_DB,"gold_roomtype_share"), roomtype_all)

In [0]:
def table_exists(db_3part: str, table_name: str) -> bool:
    """Return True when ``SHOW TABLES`` in ``db_3part`` lists at least one match for ``table_name``."""
    matches = spark.sql(f"SHOW TABLES IN {db_3part} LIKE '{table_name}'")
    found = matches.count()
    return found > 0


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

def reviews_monthly_for(city):
    """Count reviews per (city, yyyymm) from ``silver_reviews_<city>``.

    Args:
        city: Suffix of the silver reviews table; its upper-cased form becomes
            the ``city`` column.

    Returns:
        DataFrame with columns ``city``, ``yyyymm`` and ``review_count``. When the
        reviews table does not exist, an empty DataFrame with those three columns
        is returned instead.
    """
    silver_schema = f"{CATALOG}.{SILVER_DB}"
    reviews_table = f"silver_reviews_{city}"

    if not table_exists(silver_schema, reviews_table):
        # No reviews table for this city: hand back an empty frame with the
        # same columns so callers can still union the per-city results.
        placeholder_schema = StructType([
            StructField("city", StringType(), True),
            StructField("yyyymm", StringType(), True),
            StructField("review_count", LongType(), True),
        ])
        return spark.createDataFrame([], placeholder_schema)

    reviews = spark.table(f"{silver_schema}.{reviews_table}")
    reviews_by_month = reviews.withColumn("yyyymm", F.date_format("review_date", "yyyy-MM"))
    city_col = F.lit(city.upper()).alias("city")
    return reviews_by_month.groupBy(city_col, "yyyymm").agg(
        F.count("*").alias("review_count")
    )

In [0]:
# Combine both cities; unioning "nyc" with itself doubled the NYC rows and omitted LA.
reviews_month = reviews_monthly_for("la").unionByName(reviews_monthly_for("nyc"), allowMissingColumns=True).na.fill({"review_count": 0})
recreate_partitioned(fq(GOLD_DB,"gold_reviews_monthly"), reviews_month)

In [0]:
# ---------- Cross-city compare (month-aligned) ----------
# Read back the city/month gold table and put LA and NYC side by side per month.
g = spark.table(fq(GOLD_DB, "gold_city_month_metrics"))
cross = (
    g.groupBy("yyyymm")
    .pivot("city", ["LA", "NYC"])
    .agg(
        F.first("total_listings").alias("total_listings"),
        F.first("avg_nightly_price").alias("avg_price"),
        F.first("avg_occupancy_rate").alias("occ_rate"),
        F.first("avg_review_score").alias("review"),
    )
    # A month missing for one city yields nulls in its columns; turn those into 0.
    .na.fill(0)
    # Append the LA-minus-NYC differences after the pivoted columns.
    .select(
        "*",
        (F.col("LA_total_listings") - F.col("NYC_total_listings")).alias("delta_listings"),
        (F.col("LA_avg_price") - F.col("NYC_avg_price")).alias("delta_avg_price"),
        (F.col("LA_occ_rate") - F.col("NYC_occ_rate")).alias("delta_occ_rate"),
        (F.col("LA_review") - F.col("NYC_review")).alias("delta_review"),
    )
)
recreate_partitioned(fq(GOLD_DB, "gold_cross_city_compare"), cross)

In [0]:
# City-level monthly metrics: union of LA and NYC, minus rows containing any null.
city_month = monthly_metrics_for("la").unionByName(
    monthly_metrics_for("nyc"), allowMissingColumns=True
).dropna(how="any")
# Overwrite the gold table with the cleaned result.
city_month.write.saveAsTable(
    fq(GOLD_DB, "gold_city_month_metrics"), format="delta", mode="overwrite"
)

# Neighbourhood-level monthly metrics: union of LA and NYC, minus rows containing any null.
neigh = neighbourhood_metrics_for("la").unionByName(
    neighbourhood_metrics_for("nyc"), allowMissingColumns=True
).dropna(how="any")
# Overwrite the gold table with the cleaned result.
neigh.write.saveAsTable(
    fq(GOLD_DB, "gold_neighbourhood_metrics"), format="delta", mode="overwrite"
)

# Room-type shares per month: union of LA and NYC, minus rows containing any null.
room = roomtype_share_for("la").unionByName(
    roomtype_share_for("nyc"), allowMissingColumns=True
).dropna(how="any")
# Overwrite the gold table with the cleaned result.
room.write.saveAsTable(
    fq(GOLD_DB, "gold_roomtype_share"), format="delta", mode="overwrite"
)

# Monthly review counts: union of LA and NYC, minus rows containing any null.
reviews = reviews_monthly_for("la").unionByName(
    reviews_monthly_for("nyc"), allowMissingColumns=True
).dropna(how="any")
# Overwrite the gold table with the cleaned result.
reviews.write.saveAsTable(
    fq(GOLD_DB, "gold_reviews_monthly"), format="delta", mode="overwrite"
)

# Cross-city comparison rebuilt from the freshly written city/month table.
g = spark.table(fq(GOLD_DB,"gold_city_month_metrics"))
cross = (
    g.groupBy("yyyymm")
     .pivot("city", ["LA","NYC"])
     .agg(
         F.first("total_listings").alias("total_listings"),
         F.first("avg_nightly_price").alias("avg_price"),
         F.first("avg_occupancy_rate").alias("occ_rate"),
         F.first("avg_review_score").alias("review")
     )
     # Keep only months where both cities have every metric.
     .na.drop("any")
     # Emit the same delta_* columns as the earlier build of this table, so the
     # overwrite does not leave the table without its LA-minus-NYC differences.
     .withColumn("delta_listings", F.col("LA_total_listings")-F.col("NYC_total_listings"))
     .withColumn("delta_avg_price", F.col("LA_avg_price")-F.col("NYC_avg_price"))
     .withColumn("delta_occ_rate", F.col("LA_occ_rate")-F.col("NYC_occ_rate"))
     .withColumn("delta_review", F.col("LA_review")-F.col("NYC_review"))
)
cross.write.format("delta").mode("overwrite").saveAsTable(fq(GOLD_DB,"gold_cross_city_compare"))
