In [0]:
from pyspark.sql import functions as F

# Database that holds the silver inputs read here and receives every table
# written further down in this script.
TARGET_DB = "bakehouse_jobs"

# Load each silver table as a DataFrame. The variable names on the left pair
# up positionally with the table names in the tuple on the right.
(
    silver_customers,
    silver_franchises,
    silver_suppliers,
    silver_tx,
    silver_reviews,
) = (
    spark.table(f"{TARGET_DB}.{silver_table}")
    for silver_table in (
        "silver_customers",
        "silver_franchises",
        "silver_suppliers",
        "silver_sales_transactions",
        "silver_reviews_chunked",
    )
)

# ---------- Dimension: customers ----------

# Project the customer attributes from silver. Everything is passed through
# under its silver name except postal_zip_code, which is exposed as zip_code.
dim_customers = silver_customers.select(
    "customer_id",
    "full_name",
    "email",
    "phone",
    "address",
    "city",
    "state",
    "country",
    "continent",
    F.col("postal_zip_code").alias("zip_code"),
    "gender",
)

# Keep a single row per customer_id. No ordering is applied beforehand, so
# which of several duplicate rows survives is not controlled here.
dim_customers = dim_customers.dropDuplicates(["customer_id"])

# Replace the contents of the Delta table with the freshly built dimension.
(
    dim_customers.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{TARGET_DB}.dim_customers")
)


# ---------- Dimension: franchises ----------

# Project the franchise attributes from silver. zipcode_str is exposed as
# zipcode; supplierID is kept because the sales fact joins on it later.
dim_franchises = silver_franchises.select(
    "franchise_id",
    "franchise_name",
    "city",
    "district",
    F.col("zipcode_str").alias("zipcode"),
    "country",
    "size",
    "longitude",
    "latitude",
    "supplierID",
)

# Keep a single row per franchise_id (the surviving duplicate is not chosen
# by any explicit ordering).
dim_franchises = dim_franchises.dropDuplicates(["franchise_id"])

# Replace the contents of the Delta table with the freshly built dimension.
(
    dim_franchises.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{TARGET_DB}.dim_franchises")
)


# ---------- Dimension: suppliers ----------

# Project the supplier attributes from silver; every column keeps its
# silver name.
dim_suppliers = silver_suppliers.select(
    "supplier_id",
    "supplier_name",
    "ingredient",
    "continent",
    "city",
    "district",
    "size",
    "longitude",
    "latitude",
    "approved_flag",
)

# Keep a single row per supplier_id (the surviving duplicate is not chosen
# by any explicit ordering).
dim_suppliers = dim_suppliers.dropDuplicates(["supplier_id"])

# Replace the contents of the Delta table with the freshly built dimension.
(
    dim_suppliers.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{TARGET_DB}.dim_suppliers")
)


# ---------- Fact: sales transactions ----------

# Step 1: attach the franchise row to each transaction. Joining on the column
# name (rather than an expression) yields a single franchise_id column.
# A left join keeps transactions whose franchise is not in the dimension.
fact_sales = silver_tx.alias("tx").join(
    dim_franchises.alias("f"),
    on="franchise_id",
    how="left",
)

# Step 2: resolve the supplier through the franchise's supplierID. Again a
# left join, so supplier_id is null when no matching supplier exists.
fact_sales = fact_sales.join(
    dim_suppliers.alias("s"),
    on=F.col("f.supplierID") == F.col("s.supplier_id"),
    how="left",
)

# Columns persisted in the fact table, in output order. The joined frame is
# wider than this (it also carries the dimension attributes), so the list
# narrows it to the transaction fields plus the two foreign keys.
fact_sales_columns = [
    "transaction_id",
    "date",
    "time",
    "dateTime",
    "customer_id",
    "franchise_id",
    "supplier_id",
    "product",
    "quantity",
    "unitPrice",
    "totalPrice",
    "paymentMethod",
    "card_hash",
]

# Replace the contents of the Delta fact table.
(
    fact_sales.select(*fact_sales_columns)
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{TARGET_DB}.fact_sales_transactions")
)


# ---------- Fact: chunked franchise reviews ----------

# Plain projection of the silver chunked-reviews table: no filtering,
# deduplication, or renaming happens at this stage.
review_chunk_columns = [
    "franchise_id",
    "review_date",
    "review_date_date",
    "chunk_id",
    "review_uri",
    "chunked_text",
]
fact_reviews = silver_reviews.select(*review_chunk_columns)

# Replace the contents of the Delta fact table.
(
    fact_reviews.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{TARGET_DB}.fact_franchise_reviews_chunked")
)


# ---------- Daily reviews aggregate ----------

# Metrics computed per (franchise_id, review_date_date) group:
#   num_reviews      - number of distinct review_uri values
#   num_chunks       - number of rows with a non-null chunk_id
#   avg_chunk_length - mean of F.length(chunked_text) across the group
distinct_review_count = F.countDistinct("review_uri").alias("num_reviews")
chunk_row_count = F.count("chunk_id").alias("num_chunks")
mean_chunk_length = F.avg(F.length("chunked_text")).alias("avg_chunk_length")

agg_reviews_daily = fact_reviews.groupBy(
    "franchise_id", "review_date_date"
).agg(
    distinct_review_count,
    chunk_row_count,
    mean_chunk_length,
)

# Replace the contents of the Delta aggregate table.
(
    agg_reviews_daily.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{TARGET_DB}.agg_franchise_reviews_daily")
)