In [0]:
from pyspark.sql.functions import (
    col, sum, avg, max, min, count, countDistinct,
    datediff, current_date, when, concat, sha2,
    first, current_timestamp,row_number
)

from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [0]:
# Silver-layer inputs for the Gold customer-360 build.
# Each table is addressed by its three-part catalog.schema.table name.
read_table = spark.read.table

customers = read_table("golden_360.silver.customers")
geolocation = read_table("golden_360.silver.geolocation")
order_items = read_table("golden_360.silver.order_items")
order_payments = read_table("golden_360.silver.order_payments")
order_reviews = read_table("golden_360.silver.order_reviews")
orders = read_table("golden_360.silver.orders")
product_category = read_table("golden_360.silver.product_category")
products = read_table("golden_360.silver.products")
sellers = read_table("golden_360.silver.sellers")


In [0]:
# Surrogate key: SHA-256 (hex) of customer_unique_id concatenated directly
# with customer_id (no separator between them).
# NOTE(review): Spark's concat() yields NULL if either input is NULL, so any
# customer with a NULL customer_unique_id gets a NULL golden_id — confirm the
# Silver layer guarantees non-null values. Because customer_id is part of the
# hash, golden_id is one-per-customer_id rather than one-per-person; the
# row-count check later in this notebook relies on that grain.
golden_id_expr = sha2(concat(col("customer_unique_id"), col("customer_id")), 256)

customers_with_golden_id = customers.withColumn("golden_id", golden_id_expr)

customers_with_golden_id.show()

In [0]:
# Row-level view: customer -> orders -> items / payments / reviews -> product
# -> English category name -> geolocation of the customer's zip prefix.
#
# WARNING: order_items, order_payments and order_reviews are all joined on
# order_id, so each order appears (#items x #payments x #reviews) times here.
# Additive aggregates (sum, avg, count of rows) over this frame are inflated;
# only distinct counts and min/max are safe. Pre-aggregate per order/customer
# for anything additive.
#
# The geolocation table presumably holds several points per zip prefix
# (the Olist dataset does), which would multiply every customer row by that
# count. Keep one row per prefix before joining. Which row survives is not
# deterministic, but latitude and longitude stay paired from the same row.
geolocation_per_zip = geolocation.dropDuplicates(["geolocation_zip_code_prefix"])

joined_df = (
    customers_with_golden_id
    .join(orders, on="customer_id", how="left")
    .join(order_items, on="order_id", how="left")
    .join(order_payments, on="order_id", how="left")
    .join(order_reviews, on="order_id", how="left")
    .join(products, on="product_id", how="left")
    .join(product_category, on="product_category_name", how="left")
    .join(
        geolocation_per_zip,
        customers_with_golden_id.customer_zip_code_prefix
        == geolocation_per_zip.geolocation_zip_code_prefix,
        how="left",
    )
)


In [0]:
# Top product category per customer (golden_id).
#
# Categories are ranked by the number of distinct purchased line items,
# identified by (order_id, order_item_id). Counting raw rows of joined_df
# would also count the duplicates created by joining payments, reviews and
# geolocation on the same order. That weights orders with many payments or
# reviews more heavily and can reorder the ranking.
# Rows without a category are excluded so a NULL "category" (orders with no
# items, uncategorised products) can never be reported as the top category.
# Customers with no categorised items are absent here and get a NULL
# top_category after the left join into the Gold table.
category_window = Window.partitionBy("golden_id").orderBy(
    col("category_count").desc(),
    # Deterministic tie-break: alphabetical category name on equal counts.
    col("product_category_name_english").asc(),
)

category_df = (
    joined_df
    .filter(col("product_category_name_english").isNotNull())
    .groupBy("golden_id", "product_category_name_english")
    .agg(countDistinct("order_id", "order_item_id").alias("category_count"))
    .withColumn("rn", row_number().over(category_window))
    .filter(col("rn") == 1)
    .select(
        "golden_id",
        col("product_category_name_english").alias("top_category"),
    )
)


In [0]:
# Gold customer-360 aggregation.
#
# Output grain: one row per distinct (golden_id, customer_unique_id,
# customer_id, customer_city, customer_state).
#
# Each metric family is aggregated from its own source table, keyed by
# customer_id, before being joined back to the customer rows. Aggregating
# directly over the fully-joined frame is wrong: items, payments and reviews
# are all joined on order_id, so every order is repeated
# (#items x #payments x #reviews) times (and once more per matching
# geolocation row). That inflates total_spend and total_items and mis-weights
# avg_review_score.

# Customer attributes at the output grain.
customer_attributes = customers_with_golden_id.select(
    "golden_id", "customer_unique_id", "customer_id", "customer_city", "customer_state"
).distinct()

# Each order exactly once, tagged with its customer.
customer_order_keys = orders.select("customer_id", "order_id").distinct()

# Behavioral
order_metrics = orders.groupBy("customer_id").agg(
    countDistinct(col("order_id")).alias("order_count"),
    min(col("order_purchase_timestamp")).alias("first_order_date"),
    max(col("order_purchase_timestamp")).alias("last_order_date"),
)

# Financial: every payment record counted once.
# NOTE(review): max_single_order is the largest single payment record, not the
# largest per-order total (an order paid in several records is not summed).
payment_metrics = (
    customer_order_keys
    .join(order_payments, on="order_id", how="inner")
    .groupBy("customer_id")
    .agg(
        sum(col("payment_value")).alias("total_spend"),
        max(col("payment_value")).alias("max_single_order"),
    )
)

# Product: every order line item counted once.
item_metrics = (
    customer_order_keys
    .join(order_items, on="order_id", how="inner")
    .join(products, on="product_id", how="left")
    .join(product_category, on="product_category_name", how="left")
    .groupBy("customer_id")
    .agg(
        count(col("order_item_id")).alias("total_items"),
        countDistinct(col("product_id")).alias("unique_products"),
        countDistinct(col("product_category_name_english")).alias("unique_categories"),
    )
)

# Sentiment: every review record counted once.
review_metrics = (
    customer_order_keys
    .join(order_reviews, on="order_id", how="inner")
    .groupBy("customer_id")
    .agg(
        avg(col("review_score")).alias("avg_review_score"),
        countDistinct(col("review_id")).alias("total_reviews"),
    )
)

# Geography: one geolocation row per zip prefix so the lat/lng pair comes from
# a single row. Which row is kept is not deterministic.
zip_geo_points = geolocation.dropDuplicates(["geolocation_zip_code_prefix"])
geo_metrics = (
    customers_with_golden_id
    .select("customer_id", "customer_zip_code_prefix")
    .distinct()
    .join(
        zip_geo_points,
        col("customer_zip_code_prefix") == col("geolocation_zip_code_prefix"),
        how="inner",
    )
    .groupBy("customer_id")
    .agg(
        first(col("geolocation_lat")).alias("geographic_lat"),
        first(col("geolocation_lng")).alias("geographic_lng"),
    )
)

# Count-style metrics are 0 (not NULL) for customers with no matching rows,
# matching count()/countDistinct() over an all-NULL group. The downstream
# `order_count == 0` guards depend on this.
zero_when_missing = [
    "order_count", "total_items", "unique_products", "unique_categories", "total_reviews",
]

gold_df = (
    customer_attributes
    .join(order_metrics, on="customer_id", how="left")
    .join(payment_metrics, on="customer_id", how="left")
    .join(item_metrics, on="customer_id", how="left")
    .join(review_metrics, on="customer_id", how="left")
    .join(geo_metrics, on="customer_id", how="left")
    .na.fill(0, subset=zero_when_missing)
    # Same column order as the grouped aggregation this replaces, so the
    # persisted Delta/CSV schema keeps its layout.
    .select(
        "golden_id", "customer_unique_id", "customer_id", "customer_city", "customer_state",
        "total_spend", "max_single_order",
        "order_count", "first_order_date", "last_order_date",
        "total_items", "unique_products", "unique_categories",
        "avg_review_score", "total_reviews",
        "geographic_lat", "geographic_lng",
    )
)


In [0]:
# Derived per-customer KPIs built from the aggregated columns:
#   avg_order_value       = total_spend / order_count        (0.0 when order_count == 0)
#   days_since_last_order = datediff(current_date, last_order_date)
#   product_diversity     = unique_categories / order_count  (0.0 when order_count == 0)
no_orders = col("order_count") == 0

gold_df = (
    gold_df
    .withColumn(
        "avg_order_value",
        when(no_orders, 0.0).otherwise(col("total_spend") / col("order_count")),
    )
    .withColumn(
        "days_since_last_order",
        datediff(current_date(), col("last_order_date")),
    )
    .withColumn(
        "product_diversity",
        when(no_orders, 0.0).otherwise(col("unique_categories") / col("order_count")),
    )
)


In [0]:
# Attach each customer's top product category (left join: customers without
# one keep a NULL top_category) and stamp the build time.
gold_df = (
    gold_df
    .join(category_df, on="golden_id", how="left")
    .withColumn("created_at", current_timestamp())
)


In [0]:
# Persist the Gold table as Delta, replacing any previous contents.
gold_df.write.saveAsTable(
    "golden_360.gold.unified_golden_360",
    format="delta",
    mode="overwrite",
)

In [0]:
# Row-count check: the Gold table should hold one row per distinct Silver
# customer_id.
# Count the persisted table rather than the lazy gold_df. This validates what
# was actually written and avoids recomputing the whole join pipeline.
# Both counts are kept: a notebook cell only displays its last expression, so a
# bare `gold_df.count()` line followed by another expression is discarded.
gold_row_count = spark.read.table("golden_360.gold.unified_golden_360").count()
silver_customer_count = customers.select("customer_id").distinct().count()

assert gold_row_count == silver_customer_count, (
    f"Gold has {gold_row_count} rows but Silver has "
    f"{silver_customer_count} distinct customer_ids"
)

(gold_row_count, silver_customer_count)

In [0]:
# Export the Gold table as CSV (single partition -> one part file) into the
# Unity Catalog volume.
# Read back the persisted Delta table instead of re-evaluating gold_df. gold_df
# is lazy, so writing it again would rerun the full join and re-evaluate
# current_timestamp()/current_date(). The CSV's created_at (and possibly
# days_since_last_order) would then differ from the Delta table.
(
    spark.read.table("golden_360.gold.unified_golden_360")
    .coalesce(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("/Volumes/golden_360/gold/golden_data/")
)