In [0]:
import dlt
from pyspark.sql.functions import *

# Schema da cui leggere i dati Silver
silver_schema = "catalog_progetto_finale.silver_schema_pf"

# Lettura delle tabelle Silver come DataFrame Spark standard
bookings_silver_df = spark.read.table(f"{silver_schema}.bookings_silver")
customers_silver_df = spark.read.table(f"{silver_schema}.customers_silver")
payments_silver_df = spark.read.table(f"{silver_schema}.payments_silver")

#Calcolare il fatturato totale e il numero di prenotazioni per ogni singolo giorno
@dlt.table(name="kpi_daily_revenue_gold", comment="KPI 1: Fatturato giornaliero.")
def kpi_daily_revenue_gold():
    return (
        bookings_silver_df.filter(col("status") == "confirmed")
        .groupBy(to_date("checkin_date").alias("date"))
        .agg(
            sum("total_amount").alias("gross_revenue"),
            count("booking_id").alias("bookings_count")
        )
    )
#.agg(...) -> Per ogni gruppo (ogni giorno), calcola due aggregati: la somma di total_amount (il fatturato) e il conteggio dei booking_id (il numero di prenotazioni)

#Capire quali canali di prenotazione hanno il tasso di cancellazione più alto
@dlt.table(name="kpi_cancellation_rate_by_source_gold", comment="KPI 2: Tasso di cancellazione per canale.")
def kpi_cancellation_rate_by_source_gold():
    return (
        bookings_silver_df.groupBy("source")
        .agg(
            count("booking_id").alias("total_bookings"),
            sum(when(col("status") == "cancelled", 1).otherwise(0)).alias("cancelled")
        )
        .withColumn("cancellation_rate_pct", (col("cancelled") / col("total_bookings")) * 100)
    )

#Per ogni hotel, confrontare il valore totale delle prenotazioni con i pagamenti effettivamente ricevuti
@dlt.table(name="kpi_collection_rate_by_hotel_gold", comment="KPI 3: Tasso di incasso per hotel.")
def kpi_collection_rate_by_hotel_gold():
    #valore totale prenotato per ogni hotel
    bookings_value = bookings_silver_df.groupBy("hotel_id").agg(sum("total_amount").alias("total_bookings_value"))
    
    #valore totale pagato per ogni hotel
    payments_with_hotel = payments_silver_df.join(bookings_silver_df.select("booking_id", "hotel_id"), "booking_id", "left")
    payments_value = payments_with_hotel.groupBy("hotel_id").agg(sum("amount").alias("total_payments_value"))
    
    kpi = bookings_value.join(payments_value, "hotel_id", "left").na.fill(0)
    return kpi.withColumn("collection_rate", col("total_payments_value") / col("total_bookings_value"))

#Identificare i casi in cui la stessa stanza è stata prenotata da persone diverse in periodi che si sovrappongono
@dlt.table(name="kpi_overbooking_alerts_gold", comment="KPI 4: Segnalazione di prenotazioni sovrapposte.")
def kpi_overbooking_alerts_gold():
    bookings = bookings_silver_df.select("room_id", "booking_id", "checkin_date", "checkout_date")
    b1 = bookings.alias("b1")
    b2 = bookings.alias("b2")
    
    overlap_filter = (
        (col("b1.room_id") == col("b2.room_id")) &
        #evitare di confrontare una prenotazione con se stessa 
        (col("b1.booking_id") < col("b2.booking_id")) &
        (col("b1.checkin_date") < col("b2.checkout_date")) &
        (col("b1.checkout_date") > col("b2.checkin_date"))
    )
    
    return (
        b1.join(b2, overlap_filter)
        .select(
            col("b1.room_id").alias("room_id"),
            col("b1.booking_id").alias("booking_id_1"),
            col("b2.booking_id").alias("booking_id_2"),
            greatest(col("b1.checkin_date"), col("b2.checkin_date")).alias("overlap_start"),
            least(col("b1.checkout_date"), col("b2.checkout_date")).alias("overlap_end")
        )
    )

#Calcolare il valore totale generato da ogni cliente e il suo scontrino medio
@dlt.table(name="kpi_customer_value_gold", comment="KPI 5: Valore generato da ogni cliente.")
def kpi_customer_value_gold():
    merged_df = bookings_silver_df.join(customers_silver_df, "customer_id")
    
    return (
        merged_df.groupBy("customer_id", "first_name", "last_name", "email")
        .agg(
            count("booking_id").alias("bookings_count"),
            sum("total_amount").alias("revenue_sum")
        )
        .withColumn("avg_ticket", col("revenue_sum") / col("bookings_count"))
    )