In [0]:
#Setup and read
from pyspark.sql.functions import (
    col, upper, trim, current_timestamp, lit, when,
    regexp_replace, length, coalesce, concat_ws, 
    year, datediff, to_date, count, countDistinct, sum, max, round, current_date,dense_rank,rank
)
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.window import Window

# Orientation pass over the silver layer: list its tables, then print the
# schema of every silver table this notebook reads.
spark.sql("SHOW TABLES IN Silver").show()

for silver_table in ("customers", "orders", "order_items", "products"):
    spark.table(f"silver.{silver_table}").printSchema()

# Preview the first rows of the two tables that are joined in the later cells.
for banner, silver_table in (
    ("====Orders====", "orders"),
    ("====Order Items====", "order_items"),
):
    print(banner)
    spark.table(f"silver.{silver_table}").show(3)



In [0]:
print("====Join====")
# Inner join on order_id: only orders that have matching order_items rows survive.
orders_src = spark.table("silver.orders")
order_items_src = spark.table("silver.order_items")
join_tables = orders_src.join(order_items_src, on="order_id", how="inner")

# Revenue of a joined row = item price plus its freight charge.
item_revenue_expr = col("price") + col("freight_value")
join_with_revenue = join_tables.withColumn("item_revenue", item_revenue_expr)

print("===Revenue select====")
revenue_preview_columns = [
    "order_id",
    "order_status",
    "order_purchase_timestamp",
    "price",
    "freight_value",
    "item_revenue",
]
join_with_revenue.select(*revenue_preview_columns).show(3)

# Derive a calendar date from the purchase timestamp so rows can be grouped by day.
purchase_date_expr = to_date(col("order_purchase_timestamp"))
with_date_revenue = join_with_revenue.withColumn("purchase_date", purchase_date_expr)

print("====Date Conversion====")
with_date_revenue.select("order_purchase_timestamp", "purchase_date").show(3)

print("===Group by Day====")
# One output row per purchase date, newest date first.
# "total_orders" counts joined rows (every order_id occurrence), whereas
# "total_unique_orders" counts distinct order_id values.
# sum/count/countDistinct are the pyspark.sql.functions imported at the top.
daily_aggregates = [
    sum("item_revenue").alias("total_revenue"),
    count("order_id").alias("total_orders"),
    countDistinct("order_id").alias("total_unique_orders"),
]
revenue_grouped_by_day = (
    with_date_revenue
    .groupBy("purchase_date")
    .agg(*daily_aggregates)
    .orderBy(col("purchase_date").desc())
)

revenue_grouped_by_day.show(10)

print("===Save====")
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

# Overwrite the gold table with the daily aggregates computed above.
# The chain is wrapped in parentheses rather than continued with backslashes,
# so the statement ends at the closing parenthesis no matter what lines follow.
(
    revenue_grouped_by_day.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.revenue_by_day")
)

print("Table gold.revenue_by_day created sucessfully")

# Read the table back as a quick check that the write landed.
spark.table("gold.revenue_by_day").show(10)

In [0]:
print("====Join====")
# Inner join on order_id: only orders that have matching order_items rows survive.
rfm_orders = spark.table("silver.orders")
rfm_order_items = spark.table("silver.order_items")
join_tables = rfm_orders.join(rfm_order_items, on="order_id", how="inner")

# Create the revenue column: item price plus its freight charge.
rfm_revenue_expr = col("price") + col("freight_value")
join_with_revenue = join_tables.withColumn("item_revenue", rfm_revenue_expr)

print("===Sample data select====")
rfm_preview_columns = [
    "customer_id",
    "order_id",
    "order_purchase_timestamp",
    "item_revenue",
]
join_with_revenue.select(*rfm_preview_columns).show(3)


# Reference date for recency: the latest purchase timestamp found in silver.orders.
# A global aggregate yields a single row, so the first row/first field is the value.
# max/sum/round here are the pyspark.sql.functions imported at the top.
max_date = (
    spark.table("silver.orders")
    .agg(max("order_purchase_timestamp"))
    .first()[0]
)


print("===Group by Customer====")
# NOTE(review): if this is the public Olist data, customer_id may be issued per
# order (with customer_unique_id identifying the person), which would make
# total_orders 1 for every row — confirm the grouping key against silver.customers.

# RECENCY: days between the reference date and the customer's last purchase.
rfm_recency = datediff(
    lit(max_date),
    max("order_purchase_timestamp"),
).alias("days_since_last_order")

# FREQUENCY: number of distinct orders.
rfm_frequency = countDistinct("order_id").alias("total_orders")

# MONETARY: total amount spent, rounded to 2 decimals.
rfm_monetary = round(sum("item_revenue"), 2).alias("total_spent")

customer_metrics = (
    join_with_revenue
    .groupBy("customer_id")
    .agg(rfm_recency, rfm_frequency, rfm_monetary)
)

print("===Customer Metrics R,F,M Raw====")
customer_metrics.orderBy(col("total_spent").desc()).show(10)

print("Creating Scores")

print("===Creating Scores====")


def _tiered_score(metric, tiers, lower_is_better):
    """Build a score column from an ordered list of (threshold, score) tiers.

    Tiers are tested in the order given and the first match wins; a row that
    matches no tier scores 1.

    Args:
        metric: name of the column being scored.
        tiers: sequence of (threshold, score) pairs, best tier first.
        lower_is_better: True to match with ``metric <= threshold``,
            False to match with ``metric >= threshold``.

    Returns:
        A Column holding the when/otherwise ladder.
    """
    value = col(metric)

    def matches(limit):
        return value <= limit if lower_is_better else value >= limit

    (first_limit, first_score), *remaining = tiers
    ladder = when(matches(first_limit), first_score)
    for limit, score in remaining:
        ladder = ladder.when(matches(limit), score)
    return ladder.otherwise(1)


# Recency: fewer days since the last order is better.
recency_tiers = [(90, 5), (180, 4), (365, 3), (730, 2)]
# Frequency: more orders is better.
frequency_tiers = [(10, 5), (5, 4), (3, 3), (2, 2)]
# Monetary: more spent is better.
monetary_tiers = [(1000, 5), (500, 4), (200, 3), (100, 2)]

customer_scored = (
    customer_metrics
    .withColumn(
        "recency_score",
        _tiered_score("days_since_last_order", recency_tiers, lower_is_better=True),
    )
    .withColumn(
        "frequency_score",
        _tiered_score("total_orders", frequency_tiers, lower_is_better=False),
    )
    .withColumn(
        "monetary_score",
        _tiered_score("total_spent", monetary_tiers, lower_is_better=False),
    )
)

print("===Customer with Scores====")
scored_preview_columns = [
    "customer_id",
    "days_since_last_order", "recency_score",
    "total_orders", "frequency_score",
    "total_spent", "monetary_score",
]
customer_scored.select(*scored_preview_columns).show(10)

print("Creating Segments")
r_score = col("recency_score")
f_score = col("frequency_score")
m_score = col("monetary_score")

# Shared recency conditions used by several rules below.
is_recent = r_score >= 4
is_lapsed = r_score <= 2

# Rules are evaluated top to bottom and the first match wins, so the order
# matters: e.g. any frequency_score >= 4 that is not a Champion is labelled
# "Loyal Customers" before the recency-based rules are consulted.
segment_rules = (
    when(is_recent & (f_score >= 4) & (m_score >= 4), "Champions")
    .when(f_score >= 4, "Loyal Customers")
    .when(is_recent & (f_score >= 2), "Potential Loyalist")
    .when(is_lapsed & (f_score >= 3), "At Risk")
    .when(is_lapsed & (f_score <= 2), "Hibernating")
    .when(is_recent & (f_score == 1), "New Customers")
    .otherwise("Need Attention")
)
customer_rfm_segments = customer_scored.withColumn("segment", segment_rules)

print("===Segment Distribution====")
# Customers and total spend per segment, largest-revenue segment first.
segment_distribution = (
    customer_rfm_segments
    .groupBy("segment")
    .agg(
        count("customer_id").alias("customer_count"),
        round(sum("total_spent"), 2).alias("segment_revenue"),
    )
    .orderBy(col("segment_revenue").desc())
)
segment_distribution.show()


print("===Save====")
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

# Drop any previous version first so the write below recreates the table
# from scratch.
spark.sql("DROP TABLE IF EXISTS gold.customer_rfm_segments")

# Write the segmented customers as a new Delta table.
(
    customer_rfm_segments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.customer_rfm_segments")
)

# The message names the table that was actually written.
print("Table gold.customer_rfm_segments created sucessfully")

# Read the table back as a quick check that the write landed.
spark.table("gold.customer_rfm_segments").show(10)

In [0]:
df_orders = spark.table("silver.orders")
df_order_items = spark.table("silver.order_items")
df_products = spark.table("silver.products")

# Show the product schema. printSchema is a method: it has to be invoked,
# a bare attribute reference prints nothing.
df_products.printSchema()

# Revenue of one order-item row = item price plus its freight charge.
row_revenue = col("price") + col("freight_value")

# Per-product revenue figures.
# - "total_orders" counts order-item rows (order_id occurrences), not distinct orders.
# - "avg_order_value" is therefore revenue per order-item row, rounded to 2 decimals.
# NOTE(review): order_status is joined in from silver.orders but never used
# afterwards — either a status filter was intended here or the join can go; confirm.
df_product_revenue = (
    df_order_items
    .join(df_products, on="product_id", how="left")
    .join(df_orders.select("order_id", "order_status"), on="order_id", how="left")
    .groupBy("product_id", "product_category_name")
    .agg(
        sum(row_revenue).alias("item_revenue"),
        count("order_id").alias("total_orders"),
        round(sum(row_revenue) / count("order_id"), 2).alias("avg_order_value"),
    )
)


# Ranking window over the whole frame (no partitionBy), highest revenue first.
window_spec = Window.orderBy(col("item_revenue").desc())

# Rank every product by revenue, then append lineage/audit columns.
df_ranked = df_product_revenue.withColumn("revenue_rank", dense_rank().over(window_spec))
for audit_column, audit_value in (
    ("data_source", lit("olist")),
    ("data_layer", lit("gold")),
    ("processed_at", current_timestamp()),
):
    df_ranked = df_ranked.withColumn(audit_column, audit_value)
df_ranked = df_ranked.orderBy("revenue_rank")

ranked_product_count = df_ranked.count()
print(f"Analysis complete: {ranked_product_count} products ranked")

# Roll the per-product figures up to category level, highest revenue first.
category_aggregates = [
    sum(col("item_revenue")).alias("category_total_revenue"),
    sum(col("total_orders")).alias("category_total_orders"),
    count("product_id").alias("products_in_category"),
]
df_category_summary = (
    df_product_revenue
    .groupBy("product_category_name")
    .agg(*category_aggregates)
    .orderBy(col("category_total_revenue").desc())
)

print("Product Perfomance complete by Category")

# Both frames are sorted best-first, so the first row is the top entry.
top_product_revenue = df_ranked.select("item_revenue").first()[0]
print(f"Top Product Revenue: R${top_product_revenue:,.2f}")

top_category_name = df_category_summary.select("product_category_name").first()[0]
print(f"Top category: {top_category_name}")


print("===Save====")
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

# Overwrite the gold table with the category roll-up computed above.
# The chain is wrapped in parentheses rather than continued with backslashes,
# so the statement ends at the closing parenthesis no matter what lines follow.
(
    df_category_summary.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.product_category_summary")
)

print("Table gold.product_category_summary created sucessfully")
