In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import sys
sys.path.append('..')

from src.logging_utils import setup_logger, log_dataframe_stats

# Setup logger
logger = setup_logger(__name__, level="INFO")
logger.info("Starting Gold layer aggregation")

In [None]:
# Initialize Spark session (simplified for local development)
try:
    spark
    logger.info("Using existing Spark session")
except NameError:
    spark = SparkSession.builder \
        .appName("Gold_Aggregates") \
        .config("spark.sql.catalogImplementation", "hive") \
        .getOrCreate()
    logger.info("Created new Spark session")

# Configuration
SILVER_PATH = "Tables/silver"
GOLD_PATH = "Tables/gold"

## Step 1: Load Silver Data

In [None]:
# Read Silver tables (Parquet format for local development)
logger.info("Loading Silver layer data...")

customers_silver = spark.read.format("parquet").load(f"{SILVER_PATH}/customers")
orders_silver = spark.read.format("parquet").load(f"{SILVER_PATH}/orders")

log_dataframe_stats(customers_silver, "customers_silver", logger)
log_dataframe_stats(orders_silver, "orders_silver", logger)

## Step 2: Monthly Revenue (Fixed Version)

Calculate correct monthly revenue from completed orders.

In [None]:
# Monthly revenue - CORRECTED VERSION
# (Assumes you've fixed the bugs in src/silver.py)
logger.info("Computing monthly revenue...")

monthly_revenue = (
    orders_silver
    .filter(F.col("status") == "complete")  # Silver data has normalized status
    .withColumn("month", F.date_format(F.col("order_date"), "yyyy-MM"))
    .groupBy("month")
    .agg(
        F.sum(F.col("line_total")).alias("revenue"),
        F.count("order_id").alias("order_count"),
        F.countDistinct("customer_id").alias("unique_customers")
    )
    .orderBy("month")
)

print("\nâœ“ Monthly Revenue:")
display(monthly_revenue)

# Calculate total
total_revenue = monthly_revenue.select(F.sum("revenue")).collect()[0][0]
print(f"\nTotal Revenue: ${total_revenue:,.2f}")

## Step 3: Top Products by Revenue

Identify best-selling products (if product data exists in orders).

In [None]:
# Note: Original CSV doesn't have product SKU, this is for demonstration
# If your data has product information, uncomment and adapt:

# top_products = (
#     orders_silver
#     .filter(F.col("status") == "complete")
#     .groupBy("product_sku", "product_name")
#     .agg(
#         F.sum("line_total").alias("total_revenue"),
#         F.sum("quantity").alias("total_quantity"),
#         F.count("order_id").alias("order_count")
#     )
#     .orderBy(F.desc("total_revenue"))
#     .limit(10)
# )
# 
# display(top_products)

logger.info("Top products metric skipped - product data not in original CSV")

## Step 4: Customer Segmentation

Analyze customer purchase behavior.

In [None]:
# Customer lifetime value and segmentation
logger.info("Computing customer metrics...")

customer_metrics = (
    orders_silver
    .filter(F.col("status") == "complete")
    .groupBy("customer_id", "name", "email")
    .agg(
        F.sum("line_total").alias("total_spent"),
        F.count("order_id").alias("order_count"),
        F.avg("line_total").alias("avg_order_value"),
        F.min("order_date").alias("first_order_date"),
        F.max("order_date").alias("last_order_date")
    )
    .withColumn(
        "customer_segment",
        F.when(F.col("order_count") >= 5, "VIP")
         .when(F.col("order_count") >= 3, "Regular")
         .when(F.col("order_count") >= 2, "Repeat")
         .otherwise("One-time")
    )
    .orderBy(F.desc("total_spent"))
)

print("\nâœ“ Customer Metrics:")
display(customer_metrics.limit(20))

In [None]:
# Segment distribution
segment_distribution = (
    customer_metrics
    .groupBy("customer_segment")
    .agg(
        F.count("customer_id").alias("customer_count"),
        F.sum("total_spent").alias("segment_revenue")
    )
    .orderBy(F.desc("segment_revenue"))
)

print("\nCustomer Segment Distribution:")
display(segment_distribution)

## ðŸŽ¯ TASK C: Add Repeat Customer Rate Metric

**Objective**: Calculate the percentage of customers who made repeat purchases each month.

**Definition**: 
- Repeat customer = Customer who has made 2+ orders (cumulative) by that month
- Repeat customer rate = (Repeat customers / Total active customers) * 100

**Hints**:
1. Use window functions to track cumulative order count per customer
2. Group by month to count repeat vs. first-time customers
3. Calculate percentage

In [None]:
# TODO: Implement repeat_customer_rate calculation
# Your code here:

logger.info("Computing repeat customer rate by month...")

# Step 1: Add cumulative order count for each customer
window_spec = Window.partitionBy("customer_id").orderBy("order_date").rowsBetween(Window.unboundedPreceding, 0)

orders_with_order_number = (
    orders_silver
    .filter(F.col("status") == "complete")
    .withColumn("order_number", F.row_number().over(window_spec))
    .withColumn("month", F.date_format(F.col("order_date"), "yyyy-MM"))
    .withColumn("is_repeat_customer", F.when(F.col("order_number") > 1, 1).otherwise(0))
)

# Step 2: Aggregate by month
repeat_customer_rate = (
    orders_with_order_number
    .groupBy("month")
    .agg(
        F.countDistinct("customer_id").alias("total_customers"),
        F.sum("is_repeat_customer").alias("repeat_customer_orders"),
        F.countDistinct(
            F.when(F.col("is_repeat_customer") == 1, F.col("customer_id"))
        ).alias("repeat_customers")
    )
    .withColumn(
        "repeat_customer_rate",
        F.round((F.col("repeat_customers") / F.col("total_customers")) * 100, 2)
    )
    .orderBy("month")
)

print("\nâœ“ Repeat Customer Rate by Month:")
display(repeat_customer_rate)

# Validation
avg_repeat_rate = repeat_customer_rate.select(F.avg("repeat_customer_rate")).collect()[0][0]
print(f"\nAverage Repeat Customer Rate: {avg_repeat_rate:.2f}%")

## Step 5: Write Gold Tables

In [None]:
# Write monthly revenue to Gold layer (Parquet format)
logger.info("Writing monthly_revenue to Gold layer...")

monthly_revenue.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(f"{GOLD_PATH}/monthly_revenue")

logger.info("âœ“ Monthly revenue Gold table created")

In [None]:
# Write customer metrics to Gold layer (Parquet format)
logger.info("Writing customer_metrics to Gold layer...")

customer_metrics.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(f"{GOLD_PATH}/customer_metrics")

logger.info("âœ“ Customer metrics Gold table created")

In [None]:
# Write repeat customer rate to Gold layer (Parquet format)
logger.info("Writing repeat_customer_rate to Gold layer...")

repeat_customer_rate.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(f"{GOLD_PATH}/repeat_customer_rate")

logger.info("âœ“ Repeat customer rate Gold table created")

## Summary Dashboard

In [None]:
# Summary statistics
total_revenue = monthly_revenue.select(F.sum("revenue")).collect()[0][0]
total_customers = customer_metrics.count()
avg_order_value = customer_metrics.select(F.avg("avg_order_value")).collect()[0][0]
vip_customers = customer_metrics.filter(F.col("customer_segment") == "VIP").count()

print(f"""\n{'='*50}
Gold Layer Analytics Complete
{'='*50}

ðŸ“Š Business Metrics:
  Total Revenue: ${total_revenue:,.2f}
  Total Customers: {total_customers}
  Avg Order Value: ${avg_order_value:,.2f}
  VIP Customers: {vip_customers}
  Avg Repeat Rate: {avg_repeat_rate:.2f}%

âœ… Gold Tables Created:
  â†’ monthly_revenue
  â†’ customer_metrics
  â†’ repeat_customer_rate

{'='*50}\n""")