# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, sum, avg, countDistinct,
    datediff, date_trunc, expr
)

from google.colab import drive
# Make the Google Drive folder visible to this runtime.
drive.mount("/content/drive")

# Local Spark session (master "local[*]"); the number of shuffle partitions
# is lowered to 4 through spark.sql.shuffle.partitions.
_builder = SparkSession.builder.appName("GoldPipeline").master("local[*]")
_builder = _builder.config("spark.sql.shuffle.partitions", "4")
spark = _builder.getOrCreate()

# Input (silver) and output (gold) folders share the same Drive root.
BASE = "/content/drive/MyDrive/dadosfera"
SILVER = f"{BASE}/cdm_silver"
GOLD = f"{BASE}/cdm_gold"

# ------------------------------------------------------
# Load Silver
# ------------------------------------------------------
# Each silver table is a CSV file whose first line is a header row. No schema
# is passed to the reader; the columns that need a type are cast further down.
order = spark.read.csv(f"{SILVER}/order.csv", header=True)
order_item = spark.read.csv(f"{SILVER}/order_item.csv", header=True)
product = spark.read.csv(f"{SILVER}/product.csv", header=True)
review = spark.read.csv(f"{SILVER}/review.csv", header=True)
customer = spark.read.csv(f"{SILVER}/customer.csv", header=True)

# ------------------------------------------------------
# Safe casts
# ------------------------------------------------------
# Turn the three order date columns into timestamps.
_ORDER_TIMESTAMP_COLUMNS = (
    "created_at",
    "delivered_customer_at",
    "estimated_delivery_at",
)
for _ts_column in _ORDER_TIMESTAMP_COLUMNS:
    order = order.withColumn(_ts_column, col(_ts_column).cast("timestamp"))

# Number of days from the estimated delivery date to the actual delivery
# date (datediff(end, start)): positive when the order arrived after the
# estimate.
order = order.withColumn(
    "delivery_delay_days",
    datediff(col("delivered_customer_at"), col("estimated_delivery_at")),
)

# Monetary columns of order_item become doubles.
for _money_column in ("item_price", "freight_value", "total_item_value"):
    order_item = order_item.withColumn(
        _money_column, col(_money_column).cast("double")
    )

# The review score goes through try_cast, so a value that cannot be parsed
# as an integer becomes NULL instead of failing the query.
_score_as_int = expr("try_cast(score as int)")
review = review.withColumn("score", _score_as_int)

# ------------------------------------------------------
# Aliases
# ------------------------------------------------------
# Short qualifiers so that columns of the joined tables can be referenced
# unambiguously as "<alias>.<column>".
o, oi = order.alias("o"), order_item.alias("oi")
p, r, c = product.alias("p"), review.alias("r"), customer.alias("c")

# ------------------------------------------------------
# Fact table
# ------------------------------------------------------
# One row per order item, enriched with order, product, review and customer
# attributes.
#
# Reviews are collapsed to a single row per order_id before the join. Joining
# the raw review table would repeat every item row once per review whenever
# an order has more than one review, inflating every sum()/avg() of
# total_item_value computed from this table. The collapsed frame keeps the
# "r" alias and a "score" column (mean score of the order), so references to
# r.order_id and r.score keep resolving. Other review columns are not carried
# into the fact table; the outputs below only read r.score.
review_per_order = (
    review
    .groupBy("order_id")
    .agg(avg(col("score")).alias("score"))
    .alias("r")
)

fact_sales = (
    oi
    .join(o, col("oi.order_id") == col("o.order_id"))
    .join(p, col("oi.product_id") == col("p.product_id"), "left")
    .join(review_per_order, col("oi.order_id") == col("r.order_id"), "left")
    .join(c, col("o.customer_id") == col("c.customer_id"), "left")
    # The table feeds several independent aggregations; caching avoids
    # re-reading the CSV files and redoing the joins for each of them.
    .cache()
)

# ======================================================
# GOLD 1 – Monthly revenue
# ======================================================
# Bucket every item row by the month in which its order was created.
sales_by_month = fact_sales.withColumn(
    "month", date_trunc("month", col("o.created_at"))
)
revenue_monthly = (
    sales_by_month
    .groupBy("month")
    .agg(sum(col("oi.total_item_value")).alias("revenue"))
    .sort("month")
)
# coalesce(1) reduces the result to one partition before it is written.
revenue_monthly.coalesce(1).write.mode("overwrite").csv(
    f"{GOLD}/revenue_monthly", header=True
)

# ======================================================
# GOLD 2 – Revenue by category (Top 10)
# ======================================================
revenue_by_category = (
    fact_sales
    .groupBy(col("p.category_en"))
    .agg(sum(col("oi.total_item_value")).alias("revenue"))
)
# Drop the group whose category is NULL, then keep the ten categories with
# the highest revenue.
top_categories = (
    revenue_by_category
    .where(col("p.category_en").isNotNull())
    .sort(col("revenue").desc())
    .limit(10)
)
top_categories.coalesce(1).write.mode("overwrite").csv(
    f"{GOLD}/revenue_category", header=True
)

# ======================================================
# GOLD 3 – Average review score by category
# ======================================================
review_stats = fact_sales.groupBy(col("p.category_en")).agg(
    avg(col("r.score")).alias("avg_review_score"),
    countDistinct(col("o.order_id")).alias("total_orders"),
)
# Only categories with at least 50 distinct orders are reported. The result
# is sorted ascending, so the lowest average scores come first.
review_category = (
    review_stats
    .where(col("total_orders") >= 50)
    .sort(col("avg_review_score"))
)
review_category.coalesce(1).write.mode("overwrite").csv(
    f"{GOLD}/review_category", header=True
)

# ======================================================
# GOLD 4 – Revenue by state
# ======================================================
revenue_by_state = (
    fact_sales
    .groupBy(col("c.state"))
    .agg(sum(col("oi.total_item_value")).alias("revenue"))
)
# Drop the group whose state is NULL, then keep the ten states with the
# highest revenue.
top_states = (
    revenue_by_state
    .where(col("c.state").isNotNull())
    .sort(col("revenue").desc())
    .limit(10)
)
top_states.coalesce(1).write.mode("overwrite").csv(
    f"{GOLD}/revenue_state", header=True
)

# ======================================================
# GOLD 5 – Global KPIs
# ======================================================
# avg_ticket is the average order value: total revenue divided by the number
# of distinct orders. Averaging total_item_value directly would give the mean
# value of a single item row instead, which understates the ticket of every
# order that has more than one item. nullif() keeps the result NULL instead
# of dividing by zero when there are no orders.
kpi_global = (
    fact_sales
    .agg(
        countDistinct(col("o.order_id")).alias("total_orders"),
        countDistinct(col("o.customer_id")).alias("total_customers"),
        sum(col("oi.total_item_value")).alias("total_revenue"),
    )
    .withColumn("avg_ticket", expr("total_revenue / nullif(total_orders, 0)"))
)
kpi_global.coalesce(1).write.mode("overwrite").csv(
    f"{GOLD}/kpi_global", header=True
)

print("Pipeline GOLD executado com sucesso")
