In [0]:
import builtins

from pyspark.sql.functions import count, avg, sum, countDistinct, col, round, desc, expr, lit

# Select the catalog and schema once so the table names used below can stay unqualified.
for context_statement in ("USE CATALOG workspace", "USE SCHEMA instacart"):
    spark.sql(context_statement)

# Read the three Silver tables that the Gold tables in this cell are built from.
df_orders, df_items, df_products = (
    spark.read.table(silver_table)
    for silver_table in ("silver_orders", "silver_order_items", "silver_products")
)

# ==========================================
# 1. TABLE: GOLD_GLOBAL_KPIS
# Purpose: The "Big Number Cards" at the top of your dashboard
# Output: a single-row Delta table with the columns listed in kpi_columns
# ==========================================
print("Building gold_global_kpis...")

# Each count() is a separate Spark action that returns a plain Python int.
total_orders = df_orders.count()
total_users = df_orders.select("user_id").distinct().count()
total_items = df_items.count()
reordered_items = df_items.filter(col("reordered") == 1).count()

# Fail with a clear message (and before the existing gold table is overwritten)
# instead of a bare ZeroDivisionError when a silver table is empty.
if total_orders == 0 or total_items == 0:
    raise ValueError(
        "Cannot build gold_global_kpis: silver_orders or silver_order_items is empty "
        f"(total_orders={total_orders}, total_items={total_items})"
    )

# NOTE(review): total_orders counts every row of silver_orders, while total_items
# counts rows of silver_order_items. If silver_orders contains orders that have no
# item rows, avg_basket_size will be lower than the mean of
# gold_basket_size_distribution -- confirm which denominator the dashboard expects.
avg_items_per_order = total_items / total_orders
global_reorder_rate = reordered_items / total_items

# The name `round` in this notebook is pyspark.sql.functions.round, which works on
# Columns, not Python floats. Python's own round() is reached through the `builtins`
# module; `__builtins__` is a CPython implementation detail that is a module only in
# __main__ and a dict elsewhere, so it is not safe to rely on.
kpi_data = [(
    total_orders,
    total_users,
    total_items,
    builtins.round(avg_items_per_order, 1),
    builtins.round(global_reorder_rate, 2),
)]

kpi_columns = ["total_orders", "total_users", "total_items_sold", "avg_basket_size", "global_reorder_rate"]

df_kpis = spark.createDataFrame(kpi_data, kpi_columns)

df_kpis.write.format("delta").mode("overwrite").saveAsTable("gold_global_kpis")
print("Created gold_global_kpis")


# ==========================================
# 2. TABLE: GOLD_CUSTOMER_FEATURES
# Purpose: Input for ML & Customer Segmentation
# Output: one row per user_id that has at least one joined item row
# ==========================================
print("Building gold_customer_features...")

# Per-user order counts and mean gap between orders (avg() skips NULL gaps).
user_order_stats = (
    df_orders
    .groupBy("user_id")
    .agg(
        count("order_id").alias("total_orders"),
        round(avg("days_since_prior_order"), 2).alias("avg_days_between_orders"),
    )
)

# Per-user item totals; the inner join keeps only orders that have item rows.
user_item_stats = (
    df_orders
    .join(df_items, on="order_id")
    .groupBy("user_id")
    .agg(
        count("product_id").alias("total_products_purchased"),
        sum("reordered").alias("total_reordered"),
        countDistinct("product_id").alias("unique_products_bought"),
    )
)

# NOTE(review): total_orders counts all of a user's rows in silver_orders, including
# any order without item rows, so avg_basket_size is items / all orders -- confirm
# this is the intended denominator.
customer_items_per_order = col("total_products_purchased") / col("total_orders")
customer_reordered_share = col("total_reordered") / col("total_products_purchased")

df_gold_customers = (
    user_order_stats
    .join(user_item_stats, on="user_id")
    .withColumn("avg_basket_size", round(customer_items_per_order, 2))
    .withColumn("reorder_ratio", round(customer_reordered_share, 2))
    # total_reordered is only needed to derive reorder_ratio.
    .drop("total_reordered")
)

(
    df_gold_customers.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_customer_features")
)
print("Created gold_customer_features")


# ==========================================
# 3. TABLE: GOLD_PRODUCT_PERFORMANCE
# Purpose: Top selling products list
# Output: one row per product_id present in both silver_order_items and silver_products
# ==========================================
print("Building gold_product_performance...")

# Item rows per product, plus the sum of the `reordered` column for those rows.
product_stats = (
    df_items
    .groupBy("product_id")
    .agg(
        count("order_id").alias("total_sales"),
        sum("reordered").alias("reorder_count"),
    )
)

product_reorder_share = col("reorder_count") / col("total_sales")
product_output_columns = [
    "product_id",
    "product_name",
    "aisle",
    "department",
    "total_sales",
    "reorder_rate",
]

# Attach the descriptive product columns and keep only what the dashboard table needs;
# reorder_count is dropped by the final select.
df_gold_products = (
    product_stats
    .join(df_products, on="product_id")
    .withColumn("reorder_rate", round(product_reorder_share, 2))
    .select(*product_output_columns)
)

(
    df_gold_products.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_product_performance")
)
print("Created gold_product_performance")


# ==========================================
# 4. TABLE: GOLD_DEPARTMENT_ANALYTICS
# Purpose: Pie Chart (Share of Sales by Dept)
# Output: one row per department with item volume and mean of `reordered`
# ==========================================
print("Building gold_department_analytics...")

items_with_department = df_items.join(df_products, on="product_id")

df_dept_stats = (
    items_with_department
    .groupBy("department")
    .agg(
        count("order_id").alias("total_items_sold"),
        round(avg("reordered"), 2).alias("avg_reorder_rate"),
    )
    # Largest departments first.
    .orderBy(col("total_items_sold").desc())
)

(
    df_dept_stats.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_department_analytics")
)
print("Created gold_department_analytics")


# ==========================================
# 5. TABLE: GOLD_AISLE_ANALYTICS
# Purpose: Bar Chart (Top 10 Popular Aisles)
# NOTE(review): the table keeps the 20 best-selling aisles (see limit below), not 10 --
# confirm whether the chart or the limit is the intended number.
# ==========================================
print("Building gold_aisle_analytics...")

items_with_aisle = df_items.join(df_products, on="product_id")

df_aisle_stats = (
    items_with_aisle
    .groupBy("aisle")
    .agg(count("order_id").alias("total_sales"))
    # Rank aisles by volume, then keep only the head of the list.
    .orderBy(col("total_sales").desc())
    .limit(20)
)

(
    df_aisle_stats.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_aisle_analytics")
)
print("Created gold_aisle_analytics")


# ==========================================
# 6. TABLE: GOLD_BASKET_SIZE_DISTRIBUTION
# Purpose: Histogram (How big are orders usually?)
# Output: one row per distinct basket size with the number of orders of that size
# ==========================================
print("Building gold_basket_size_distribution...")

# Step 1: number of item rows in each order (only orders present in silver_order_items).
order_sizes = (
    df_items
    .groupBy("order_id")
    .agg(count(lit(1)).alias("basket_size"))
)

# Step 2: how many orders share each basket size, smallest baskets first.
df_basket_dist = (
    order_sizes
    .groupBy("basket_size")
    .agg(count("order_id").alias("order_count"))
    .orderBy("basket_size")
)

(
    df_basket_dist.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_basket_size_distribution")
)
print("Created gold_basket_size_distribution")


# ==========================================
# 7. TABLE: GOLD_PEAK_TIMES
# Purpose: Heatmap (When do people shop?)
# Output: one row per (order_dow, order_hour_of_day) combination with its order count
# ==========================================
print("Building gold_peak_times...")

# The same two columns define both the heatmap cells and the output ordering.
peak_time_axes = ["order_dow", "order_hour_of_day"]

df_peak_times = (
    df_orders
    .groupBy(*peak_time_axes)
    .agg(count("order_id").alias("total_orders"))
    .orderBy(*peak_time_axes)
)

(
    df_peak_times.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_peak_times")
)
print("Created gold_peak_times")


# ==========================================
# Final Verification
# Read the KPI table back from the metastore to confirm the write succeeded.
# ==========================================
print("\n--- Gold Layer Ready ---")
kpi_check = spark.sql("SELECT * FROM gold_global_kpis")
display(kpi_check)

Building gold_global_kpis...
Created gold_global_kpis
Building gold_customer_features...
Created gold_customer_features
Building gold_product_performance...
Created gold_product_performance
Building gold_department_analytics...
Created gold_department_analytics
Building gold_aisle_analytics...
Created gold_aisle_analytics
Building gold_basket_size_distribution...
Created gold_basket_size_distribution
Building gold_peak_times...
Created gold_peak_times

--- Gold Layer Ready ---


total_orders,total_users,total_items_sold,avg_basket_size,global_reorder_rate
3421083,206209,33819106,9.9,0.59


In [0]:
from pyspark.sql.functions import col, count, desc, rank
from pyspark.sql.window import Window

# ==========================================
# TABLE: GOLD_SEGMENT_ANALYTICS
# Purpose: best-selling products within each customer segment
# ==========================================

# 1. Load the necessary tables
df_segments = spark.read.table("gold_customer_segments_labeled")
df_orders = spark.read.table("silver_orders")
df_items = spark.read.table("silver_order_items")
df_products = spark.read.table("silver_products")

# 2. Join everything to link Segments -> Orders -> Items -> Products.
# All joins are inner joins, so users without a segment label and orders without
# item rows do not contribute to the counts.
print("Building gold_segment_analytics...")

df_full_join = (
    df_segments
    .join(df_orders, "user_id")
    .join(df_items, "order_id")
    .join(df_products, "product_id")
)

# 3. Aggregation: top 10 products for EACH segment.
# rank() gives tied products the same rank, so a segment can hold more than 10 rows
# when several products tie at the cutoff.
window_spec = Window.partitionBy("customer_segment").orderBy(desc("product_count"))

df_segment_products = (
    df_full_join
    .groupBy("customer_segment", "product_name", "department")
    .agg(count("product_id").alias("product_count"))
    .withColumn("rank", rank().over(window_spec))
    .filter(col("rank") <= 10)  # Keep top 10 per segment to keep Power BI fast
)

df_segment_products.write.format("delta").mode("overwrite").saveAsTable("gold_segment_analytics")

print("Created table: gold_segment_analytics")

# Display the persisted table rather than df_segment_products: the DataFrame is lazy,
# so displaying it would re-run the full join, aggregation and window a second time.
# Reading the table back also confirms what was actually written.
display(
    spark.read.table("gold_segment_analytics").orderBy("customer_segment", "rank")
)

Building gold_segment_analytics...
Created table: gold_segment_analytics


customer_segment,product_name,department,product_count,rank
Core Regulars,Banana,produce,210123,1
Core Regulars,Bag of Organic Bananas,produce,163715,2
Core Regulars,Organic Baby Spinach,produce,107075,3
Core Regulars,Organic Strawberries,produce,104573,4
Core Regulars,Organic Hass Avocado,produce,85327,5
Core Regulars,Organic Avocado,produce,81573,6
Core Regulars,Large Lemon,produce,69850,7
Core Regulars,Strawberries,produce,67083,8
Core Regulars,Limes,produce,60782,9
Core Regulars,Organic Whole Milk,dairy eggs,55177,10
