In [0]:
# Common root directory shared by both layers.
_volume_root = "/Volumes/workspace/default"

# Layer directories keep a trailing slash because table names are appended
# to them directly (e.g. silver_path + "orders").
silver_path = _volume_root + "/silver/"
gold_path = _volume_root + "/gold/"

from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:

# Delivered orders enriched with the customer's state and delivery timing.
# Both inputs are read from the silver layer.
df_orders = spark.read.format("delta").load(silver_path + "orders")
df_customers = spark.read.format("delta").load(silver_path + "customers")

# An order qualifies only if it is marked delivered AND has a delivery date.
delivered_with_date = (
    (col("order_status") == "delivered")
    & col("order_delivered_customer_date").isNotNull()
)

df_orders = (
    df_orders
    .where(delivered_with_date)
    # Inner join: orders without a matching customer row are dropped.
    .join(df_customers, on="customer_id", how="inner")
    .select(
        "order_id",
        "customer_state",
        # Purchase timestamp truncated to its month.
        trunc("order_purchase_timestamp", "MM").alias("order_month"),
        # Days elapsed between purchase and delivery to the customer.
        datediff("order_delivered_customer_date", "order_purchase_timestamp").alias("delivery_days"),
        # Days between the estimated and the actual delivery date
        # (greater than zero when delivery happened after the estimate).
        datediff("order_delivered_customer_date", "order_estimated_delivery_date").alias("delay_days"),
    )
)

# df_orders.display()

# Order items from the silver layer.
df_order_items = spark.read.format("delta").load(silver_path + "orders_items")
# df_order_items.display()

# Roll the items up to one row per order: total freight and total item price.
# NOTE: `sum` is pyspark.sql.functions.sum (via the star import), not the builtin.
freight_total = sum("freight_value").alias("freight_cost")
price_total = sum("price").alias("order_value")

df_cost = df_order_items.groupBy("order_id").agg(freight_total, price_total)

# df_cost.display()

In [0]:
# Logistics KPIs per customer state and purchase month.
# Relies on `df_orders` (delivered orders carrying delivery_days / delay_days)
# and `df_cost` (per-order freight and value) built earlier in the notebook.

# 1 when the order arrived after its estimated date, else 0.
late_flag = when(col("delay_days") > 0, 1).otherwise(0)

# Left join keeps orders that have no cost row; their cost columns are null.
orders_with_cost = df_orders.join(df_cost, on="order_id", how="left")

df_kpis = orders_with_cost.groupBy("customer_state", "order_month").agg(
    count("order_id").alias("total_orders"),
    avg("delivery_days").alias("avg_delivery_days"),
    avg("freight_cost").alias("avg_freight_cost"),
    avg("order_value").alias("avg_order_value"),
    # Despite the name this is a ratio (late orders / all orders); it is not
    # multiplied by 100.
    (sum(late_flag) / count("order_id")).alias("percent_late_deliveries"),
)

# Persist to the gold layer for Power BI reporting: Delta plus a CSV copy with header.
df_kpis.write.format("delta").mode("overwrite").save(gold_path + "logistics_kpis")
(
    df_kpis.write
    .mode("overwrite")
    .option("header", True)
    .csv(gold_path + "logistics_kpiscsv")
)

# df_kpis.display()

In [0]:
# Procurement base table: order items extended with order, product-category
# and seller-state attributes.

# Orders are re-read from silver and tagged with a "yyyy-MM" purchase month,
# which is the monthly grouping key of the procurement summary.
df_orders = (
    spark.read.format("delta").load(silver_path + "orders")
    .withColumn("order_month", date_format("order_purchase_timestamp", "yyyy-MM"))
)

# Remaining silver inputs.
df_order_items = spark.read.format("delta").load(silver_path + "orders_items")
df_sellers = spark.read.format("delta").load(silver_path + "sellers")
df_products = spark.read.format("delta").load(silver_path + "products")

# Only the lookup columns needed from the dimension tables.
product_categories = df_products.select("product_id", "product_category_name")
seller_states = df_sellers.select("seller_id", "seller_state")

df_orders_extended = (
    df_order_items
    # Default (inner) join: items without a matching order are dropped.
    .join(df_orders, "order_id")
    # Left joins: items are kept even when product or seller is missing.
    .join(product_categories, "product_id", "left")
    .join(seller_states, "seller_id", "left")
)
df_orders_extended.display()

# df_orders_extended.write.format("delta").mode("overwrite").save(gold_path + "procurement_orders")

# Procurement summary per purchase month, seller state and product category.

from pyspark.sql.functions import countDistinct, avg, sum

procurement_keys = ["order_month", "seller_state", "product_category_name"]

procurement_metrics = [
    # Distinct counts, because the input has one row per order item.
    countDistinct("order_id").alias("total_orders"),
    countDistinct("seller_id").alias("unique_suppliers"),
    avg("price").alias("avg_product_cost"),
    sum("price").alias("total_procurement_cost"),
    avg("freight_value").alias("avg_freight_cost"),
]

df_procurement = df_orders_extended.groupBy(*procurement_keys).agg(*procurement_metrics)

# Persist to the gold layer: Delta plus a CSV copy with header.
df_procurement.write.format("delta").mode("overwrite").save(gold_path + "procurement_summary")
df_procurement.write.mode("overwrite").option("header", True).csv(gold_path + "procurement_summarycsv")


In [0]:
# Enriched orders for marketing: orders joined with their items, product
# category, customer attributes and review score.

# Every input is loaded in this cell so it does not depend on DataFrames left
# over from earlier cells (df_products was previously only defined in the
# procurement cell).
df_orders = spark.read.format("delta").load(silver_path + "orders")
df_orders = df_orders.withColumn("order_month", date_format("order_purchase_timestamp", "yyyy-MM"))

df_order_items = spark.read.format("delta").load(silver_path + "orders_items")
df_products = spark.read.format("delta").load(silver_path + "products")
df_customers = spark.read.format("delta").load(silver_path + "customers")
df_reviews = spark.read.format("delta").load(silver_path + "orders_reviews")

# All joins are left joins starting from orders, so every order is kept.
# created_at / processed_at are dropped from items and customers before joining;
# presumably so that only the orders' copies remain and the written table has
# no duplicate column names — TODO confirm against the silver schemas.
# NOTE(review): the result has one row per order item, repeated for each
# matching review row. If an order can have several reviews, item rows (and any
# price sums computed downstream) are duplicated — confirm reviews are unique
# per order_id in silver.
df_orders_enriched = (
    df_orders
    .join(df_order_items.drop("created_at", "processed_at"), "order_id", "left")
    # select() already restricts the columns, so no separate drop() is needed.
    .join(df_products.select("product_id", "product_category_name"), "product_id", "left")
    .join(df_customers.drop("created_at", "processed_at"), "customer_id", "left")
    .join(df_reviews.select("order_id", "review_score"), "order_id", "left")
)
#df_orders_enriched.display()

# Persist to the gold layer: Delta plus a CSV copy with header.
df_orders_enriched.write.format("delta").mode("overwrite").save(gold_path + "orders_enriched")
df_orders_enriched.write.mode("overwrite").option("header", True).csv(gold_path + "orders_enrichedcsv")

# Marketing KPIs: monthly customer metrics, one output row per order_month.

monthly_metrics = [
    # Distinct counts, because df_orders_enriched repeats an order once per joined row.
    countDistinct("customer_id").alias("active_customers"),
    countDistinct("order_id").alias("total_orders"),
    # NOTE(review): this is the mean of `price` over the joined rows, not
    # revenue per order — confirm that is the intended definition.
    avg("price").alias("avg_order_value"),
    countDistinct("product_category_name").alias("unique_categories"),
    avg("review_score").alias("avg_review_score"),
]

df_customers_gold = df_orders_enriched.groupBy("order_month").agg(*monthly_metrics)
# df_customers_gold.display()

# Persist to the gold layer: Delta plus a CSV copy with header.
df_customers_gold.write.format("delta").mode("overwrite").save(gold_path + "customer_marketing_summary")
df_customers_gold.write.mode("overwrite").option("header", True).csv(gold_path + "customer_marketing_summarycsv")

# Marketing KPIs: product category metrics, one output row per
# product_category_name, sorted by total sales (highest first).

df_category_gold = (
    df_orders_enriched
    .groupBy("product_category_name")
    .agg(
        # df_orders_enriched repeats an order once per joined item/review row,
        # so orders must be counted distinctly; a plain count() would report
        # rows, not orders. This matches how total_orders is computed in the
        # monthly and procurement summaries.
        countDistinct("order_id").alias("total_orders"),
        avg("price").alias("avg_product_price"),
        sum("price").alias("total_sales"),
        avg("review_score").alias("avg_review_score")
    )
    .orderBy(col("total_sales").desc())
)
#df_category_gold.display()

# Persist to the gold layer: Delta plus a CSV copy with header.
df_category_gold.write.format("delta").mode("overwrite").save(gold_path + "category_marketing_summary")
df_category_gold.write.mode("overwrite").option("header", True).csv(gold_path + "category_marketing_summarycsv")
# Marketing KPIs: regional insights, one output row per customer_state.

df_regional_gold = (
    df_orders_enriched
    .groupBy("customer_state")
    .agg(
        # df_orders_enriched repeats an order once per joined item/review row,
        # so orders must be counted distinctly; a plain count() would report
        # rows, not orders. This matches how total_orders is computed in the
        # monthly and procurement summaries.
        countDistinct("order_id").alias("total_orders"),
        sum("price").alias("total_revenue"),
        avg("review_score").alias("avg_review_score")
    )
)
#df_regional_gold.display()

# Persist to the gold layer: Delta plus a CSV copy with header.
df_regional_gold.write.format("delta").mode("overwrite").save(gold_path + "regional_marketing_summary")
df_regional_gold.write.mode("overwrite").option("header", True).csv(gold_path + "regional_marketing_summarycsv")

# Review summary: number of reviews per review score, ordered by score.
# Built from the raw reviews table (df_reviews), not from the enriched join.

reviews_per_score = df_reviews.groupBy("review_score").agg(
    count("review_id").alias("review_count")
)
df_review_summary = reviews_per_score.orderBy("review_score")

df_review_summary.display()

# Persist to the gold layer: Delta plus a CSV copy with header.
df_review_summary.write.format("delta").mode("overwrite").save(gold_path + "marketing_review_summary")
df_review_summary.write.mode("overwrite").option("header", True).csv(gold_path + "marketing_review_summarycsv")
