In [0]:
from pyspark.sql.functions import col, avg, stddev, max

# 1️⃣ Read historical KPIs from Gold table
df_gold = spark.table("hotel_gold.daily_kpis")

# Compute the historical ADR mean and standard deviation in one aggregation.
historical_stats = df_gold.agg(
    avg("avg_adr").alias("mean_adr"),
    stddev("avg_adr").alias("std_adr"),
).collect()[0]

# Extract mean and std
mean_adr = historical_stats["mean_adr"]
std_adr = historical_stats["std_adr"]

# avg() yields NULL (None in Python) when there are no non-null avg_adr
# values. Without a baseline nothing downstream is meaningful, so fail here
# with a clear message rather than with a TypeError from the format spec.
if mean_adr is None:
    raise ValueError(
        "hotel_gold.daily_kpis has no non-null avg_adr values; "
        "cannot compute the historical ADR baseline"
    )

# The sample standard deviation may be NULL when there are too few data
# points; it is only reported, so degrade to "n/a" instead of crashing.
std_adr_text = "n/a" if std_adr is None else f"{std_adr:.2f}"
print(f"Historical ADR mean: {mean_adr:.2f}, std: {std_adr_text}")

# 2️⃣ Get latest day’s bookings from Silver table
df_silver = spark.table("hotel_silver.bookings_clean")

# NOTE: `max` is pyspark.sql.functions.max (it shadows the builtin via the
# import at the top of this cell), so this is a column aggregate.
latest_date = df_silver.agg(max("reservation_status_date")).collect()[0][0]

# max() over an empty table (or an all-NULL column) is NULL; filtering on
# NULL would silently match no rows, so stop with an explicit error instead.
if latest_date is None:
    raise ValueError(
        "hotel_silver.bookings_clean has no non-null reservation_status_date; "
        "cannot determine the latest day"
    )

df_latest = df_silver.filter(col("reservation_status_date") == latest_date)

# Calculate current day ADR
current_day_adr = df_latest.agg(avg("adr").alias("current_day_adr")).collect()[0][0]

# avg() ignores NULLs and returns NULL when every adr on that day is NULL.
if current_day_adr is None:
    raise ValueError(
        f"No non-null adr values found for {latest_date}; "
        "cannot compute the current day ADR"
    )

print(f"Current day ADR ({latest_date}): {current_day_adr:.2f}")

# 3️⃣ Relative ADR drift: signed deviation of the latest day's ADR from the
# historical mean, expressed as a percentage of that mean.
drift_pct = (current_day_adr - mean_adr) / mean_adr * 100
print(f"ADR drift percentage: {drift_pct:.2f}%")

# 4️⃣ Optional: report whether the drift magnitude (either direction) exceeds
# the alert threshold.
threshold = 10  # percent
drift_is_significant = abs(drift_pct) > threshold
print("⚠️ ADR drift detected!" if drift_is_significant else "✅ ADR is stable")

# 5️⃣ Number of booking rows on the latest day
num_bookings = df_latest.count()
print(f"Number of bookings on {latest_date}: {num_bookings}")

# 6️⃣ Monetary impact of the drift: the per-booking ADR deviation from the
# historical mean, scaled by the number of bookings on that day.
# NOTE(review): count() includes rows whose adr is NULL, while the average
# above ignores them — confirm the Silver table has no NULL adr values.
adr_delta = current_day_adr - mean_adr
impact_usd = adr_delta * num_bookings
print(f"Monetary impact of ADR drift: ${impact_usd:.2f}")

import mlflow

# Close any run that is still active so the run started below is a fresh one.
if mlflow.active_run() is not None:
    mlflow.end_run()

# Record both drift figures as metrics of a single new run; the context
# manager ends the run when the block exits.
with mlflow.start_run():
    for metric_name, metric_value in (
        ("adr_drift_pct", drift_pct),
        ("adr_drift_usd", impact_usd),
    ):
        mlflow.log_metric(metric_name, metric_value)

