# In [None]:
"""
02_transform_orders.py — Transform raw orders into enriched, partitioned Delta table

Requirements:
- A Spark runtime; main() obtains its session via SparkSession.builder.getOrCreate() (an already active session is reused)
- Raw tables in Unity Catalog: pbi_project.raw_data.orders, pbi_project.raw_data.items, pbi_project.raw_data.targets
- Target schema: pbi_project.silver
"""

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, year, month, col
from pyspark.sql import functions as F

# ─── CONFIGURATION ──────────────────────────────────────────────────────────────

# Fully qualified schemas: raw inputs and the silver output.
RAW_SCHEMA = "pbi_project.raw_data"
SILVER_SCHEMA = "pbi_project.silver"

# Logical table name -> fully qualified raw table name.
RAW_TABLES = {
    table: f"{RAW_SCHEMA}.{table}"
    for table in ("orders", "items", "targets")
}

# Fully qualified name of the silver orders table.
SILVER_TABLE = ".".join((SILVER_SCHEMA, "orders"))

# ─── LOGGER SETUP ────────────────────────────────────────────────────────────────

def setup_logger() -> logging.Logger:
    """Return the "TransformOrders" logger, configured at INFO level.

    A single stream handler with a timestamp/level/message format is
    attached the first time this is called; later calls find the handler
    already present and return the same logger without adding another.
    """
    log = logging.getLogger("TransformOrders")
    log.setLevel(logging.INFO)

    # Already configured by an earlier call: avoid duplicate output lines.
    if log.handlers:
        return log

    stream = logging.StreamHandler()
    stream.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    )
    log.addHandler(stream)
    return log

# ─── MAIN FLOW ──────────────────────────────────────────────────────────────────

def main():
    """Build the silver orders table from the raw orders, items and targets tables.

    Steps:
      1. Create the silver schema if it does not exist.
      2. Load the three raw tables under the aliases ``o``, ``i`` and ``t``.
         On targets, ``category`` is renamed to ``establishment_category``
         and ``user_id`` to ``target_user_id`` so they stay distinguishable
         from the ``category`` and ``user_id`` columns selected below.
      3. Cast ``order_date`` to a date and derive ``order_year`` and
         ``order_month`` as that date truncated to the start of its year and
         month (both are DATE columns, not integers).
      4. Left-join items on ``product_id == item_id`` and targets on
         ``user_id == target_user_id``, then project the output columns.
      5. Overwrite the Delta table ``SILVER_TABLE``, partitioned by
         ``order_year`` and ``order_month``.

    Spark errors are not caught here; they propagate to the caller.
    """
    logger = setup_logger()
    spark = SparkSession.builder.appName("TransformOrders").getOrCreate()

    # 1) Ensure the silver schema exists.
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SILVER_SCHEMA}")
    logger.info("Schema ensured: %s", SILVER_SCHEMA)

    # 2) Load the raw tables under aliases; rename the targets columns that
    #    would otherwise share a name with columns projected in step 3.
    orders = spark.table(RAW_TABLES["orders"]).alias("o")
    items = spark.table(RAW_TABLES["items"]).alias("i")
    targets = (
        spark.table(RAW_TABLES["targets"])
        .withColumnRenamed("category", "establishment_category")
        .withColumnRenamed("user_id", "target_user_id")
        .alias("t")
    )

    # 3) Derive the date columns, join, and project the output columns.
    #    Join keys are qualified with the table aliases so the conditions
    #    stay unambiguous even if a joined table carries a same-named column.
    # NOTE(review): if targets can hold more than one row per user, the
    # second join multiplies order rows — confirm against the targets schema.
    enriched = (
        orders
        .withColumn("order_date", F.to_date(F.col("order_date")))
        .withColumn("order_year", F.to_date(F.date_trunc("year", F.col("order_date"))))
        .withColumn("order_month", F.to_date(F.date_trunc("month", F.col("order_date"))))
        .join(items, F.col("o.product_id") == F.col("i.item_id"), "left")
        .join(targets, F.col("o.user_id") == F.col("t.target_user_id"), "left")
        # No explicit drop of target_user_id is needed: the projection below
        # simply leaves it out.
        .select(
            "order_year",
            "order_month",
            "order_date",
            "order_id",
            "user_id",
            "product_id",
            "item_id",
            "category",
            "establishment_category",
            "city",
            "monthly_revenue_target",
        )
    )

    # 4) Overwrite the partitioned Delta table.
    (
        enriched.write
        .mode("overwrite")
        .format("delta")
        .partitionBy("order_year", "order_month")
        .saveAsTable(SILVER_TABLE)
    )

    logger.info("Written silver table: %s", SILVER_TABLE)
    logger.info("✔️ Transform job completed successfully")

# Run the job only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
