In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, trim

# Unity Catalog coordinates used by every cell below.
CATALOG = "catalogo_progetto"
BRONZE = "bronze"
SILVER = "silver"

# Schema setup: create the silver schema if missing, then make
# catalogo_progetto / silver the session's current catalog and schema.
for _setup_stmt in (
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SILVER}",
    f"USE CATALOG {CATALOG}",
    f"USE {SILVER}",
):
    spark.sql(_setup_stmt)

In [0]:
# Silver hotels: remove only rows whose country is the placeholder code "XX".
# Adjust "Country" if the bronze column has a different name.
df_hotels_bz = spark.table(f"{CATALOG}.{BRONZE}.hotels")

# A plain `F.col("Country") != "XX"` evaluates to NULL when Country is NULL,
# and `filter` discards rows whose predicate is NULL, so hotels with a missing
# country were silently dropped too. `eqNullSafe` returns False (never NULL)
# for NULL values, so only the explicit "XX" rows are removed.
df_hotels_sv = df_hotels_bz.filter(~F.col("Country").eqNullSafe("XX"))

df_hotels_sv.write.format("delta").mode("overwrite").saveAsTable(f"{CATALOG}.{SILVER}.hotels")

In [0]:
df_customers_bz = spark.table(f"{CATALOG}.{BRONZE}.customers")

# Turn blank or "null"-string emails into real NULLs. The placeholder match is
# case-insensitive: the previous version caught "null" and "NULL" but not
# variants such as "Null". Any other value is kept exactly as it arrived.
df_customers_sv = df_customers_bz.withColumn(
    "email",
    F.when(F.lower(F.trim(F.col("email"))).isin("", "null"), F.lit(None))
     .otherwise(F.col("email"))
)

# Deduplicate on customer_id. If an ingestion_date column exists, keep the most
# recent row per customer (desc() places NULL dates after non-NULL ones).
# Otherwise fall back to dropDuplicates, which keeps an arbitrary row per key.
if "ingestion_date" in df_customers_sv.columns:
    w = Window.partitionBy("customer_id").orderBy(F.col("ingestion_date").desc())
    df_customers_sv = (
        df_customers_sv.withColumn("_rn", F.row_number().over(w))
                       .filter(F.col("_rn") == 1)
                       .drop("_rn")
    )
else:
    df_customers_sv = df_customers_sv.dropDuplicates(["customer_id"])

df_customers_sv.write.format("delta").mode("overwrite").saveAsTable(f"{CATALOG}.{SILVER}.customers")


In [0]:
# Silver rooms: keep one row per room_id (change the key if the bronze column
# is named differently).
df_rooms_bz = spark.table(f"{CATALOG}.{BRONZE}.rooms")
df_rooms_sv = df_rooms_bz.dropDuplicates(subset=["room_id"])

(
    df_rooms_sv.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{CATALOG}.{SILVER}.rooms")
)


In [0]:
# Bronze column names: adjust if the source uses different names
# (e.g. "checkin" / "checkout").
CHECKIN  = "checkin_date"
CHECKOUT = "checkout_date"
CURRENCY = "currency"
TOTAL    = "total_amount"
valid_curr = ["EUR", "USD", "GBP"]

df_bookings_bz = spark.table(f"{CATALOG}.{BRONZE}.bookings")
b = df_bookings_bz

# Cast string dates to DateType using to_date's default parsing
# (pass an explicit format to F.to_date if the source needs one).
if b.schema[CHECKIN].dataType.simpleString() == "string":
    b = b.withColumn(CHECKIN, F.to_date(CHECKIN))
if b.schema[CHECKOUT].dataType.simpleString() == "string":
    b = b.withColumn(CHECKOUT, F.to_date(CHECKOUT))

# Inverted dates: flag rows whose check-in is after check-out, then swap ONLY
# those rows. The previous least()/greatest() approach skips NULLs, so a row
# with one missing date ended up with BOTH dates set to the known one. It also
# moved both date columns to the end of the schema (drop + rename).
# A NULL comparison (either date missing) counts as "not swapped".
b = b.withColumn(
    "dq_swapped_dates",
    F.when(F.col(CHECKIN) > F.col(CHECKOUT), F.lit(True)).otherwise(F.lit(False))
)
_swap = F.col("dq_swapped_dates")
b = (b.withColumn("_orig_checkin", F.col(CHECKIN))  # snapshot before CHECKIN is overwritten
      .withColumn(CHECKIN,  F.when(_swap, F.col(CHECKOUT)).otherwise(F.col(CHECKIN)))
      .withColumn(CHECKOUT, F.when(_swap, F.col("_orig_checkin")).otherwise(F.col(CHECKOUT)))
      .drop("_orig_checkin"))

# Negative totals -> NULL, with a flag recording the correction.
b = (b.withColumn("dq_negative_total", F.when(F.col(TOTAL) < 0, F.lit(True)).otherwise(F.lit(False)))
      .withColumn(TOTAL, F.when(F.col(TOTAL) < 0, F.lit(None)).otherwise(F.col(TOTAL))))

# Currency outside valid_curr -> NULL, with a flag. A NULL currency is left
# NULL and not flagged (the isin test evaluates to NULL, so otherwise() applies).
b = (b.withColumn("dq_invalid_currency",
                  F.when(~F.col(CURRENCY).isin(valid_curr), F.lit(True)).otherwise(F.lit(False)))
      .withColumn(CURRENCY,
                  F.when(~F.col(CURRENCY).isin(valid_curr), F.lit(None)).otherwise(F.col(CURRENCY))))

df_bookings_sv = b

df_bookings_sv.write.format("delta").mode("overwrite").saveAsTable(f"{CATALOG}.{SILVER}.bookings")


In [0]:
import pyspark.sql.functions as F

PM_BOOKING_ID = "booking_id"
PM_AMOUNT     = "amount"
PM_CURRENCY   = "currency"
TOTAL         = "total_amount"
valid_curr    = ["EUR", "USD", "GBP"]

# Silver bookings reduced to the join key and the total. Columns get a "bk_"
# prefix so they cannot clash with payment columns.
# The bookings silver step does not deduplicate booking_id. A duplicated key
# would multiply every matching payment in the left join below, so collapse to
# one row per booking first. max() makes the choice deterministic and means a
# payment is only flagged as over-amount if it exceeds every version of the total.
bk = (spark.table(f"{CATALOG}.{SILVER}.bookings")
        .select(
            F.col("booking_id").alias("bk_booking_id"),
            F.col(TOTAL).alias("bk_total_amount")
        )
        .groupBy("bk_booking_id")
        .agg(F.max("bk_total_amount").alias("bk_total_amount")))

# Payments from bronze.
p0 = spark.table(f"{CATALOG}.{BRONZE}.payments")

# Left join: every payment is kept, with booking data attached when it matches.
p = p0.join(bk, p0[PM_BOOKING_ID] == F.col("bk_booking_id"), "left")

# Orphan payment: no matching booking (this includes a NULL payment booking_id).
p = p.withColumn(
    "dq_orphan_payment",
    F.when(F.col("bk_booking_id").isNull(), F.lit(True)).otherwise(F.lit(False))
)

# Over-amount: payment amount exceeds the booking total (only when the booking
# exists; a NULL total makes the comparison NULL, so the flag stays False).
p = p.withColumn(
    "dq_over_amount",
    F.when(
        (F.col("bk_booking_id").isNotNull()) & (F.col(PM_AMOUNT) > F.col("bk_total_amount")),
        F.lit(True)
    ).otherwise(F.lit(False))
)

# Currency outside valid_curr -> NULL, with a flag.
p = (p.withColumn(
        "dq_invalid_currency",
        F.when(~F.col(PM_CURRENCY).isin(valid_curr), F.lit(True)).otherwise(F.lit(False))
     )
     .withColumn(
        PM_CURRENCY,
        F.when(~F.col(PM_CURRENCY).isin(valid_curr), F.lit(None)).otherwise(F.col(PM_CURRENCY))
     )
)

# Drop the renamed bookings key. Note that bk_total_amount is still kept in
# the payments output.
p = p.drop("bk_booking_id")

# Write to silver.
df_payments_sv = p
df_payments_sv.write.format("delta").mode("overwrite").saveAsTable(f"{CATALOG}.{SILVER}.payments")
