In [0]:
%run ./variables


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType
from pyspark.sql import Window

# Fully qualified table names (<catalog>.<schema>.<table>) used throughout the pipeline.
# NOTE(review): catalog_name and the *_schema variables are not defined in this
# notebook; presumably they come from the `%run ./variables` cell above - confirm.
def _qualified(schema, table):
    """Return `table` prefixed with the pipeline catalog and the given schema."""
    return f"{catalog_name}.{schema}.{table}"


BRONZE_EVENTS_TABLE = _qualified(bronze_schema, "bronze_user_events")
BRONZE_CUSTOMERS_TABLE = _qualified(bronze_schema, "bronze_customer_profiles")
BRONZE_PRODUCTS_TABLE = _qualified(bronze_schema, "bronze_product_details")
SILVER_TABLE = _qualified(silver_schema, "silver_sessionized_activity")
GOLD_DAILY_PRODUCT_TABLE = _qualified(gold_schema, "gold_daily_product_performance")
GOLD_CUSTOMER_SUMMARY_TABLE = _qualified(gold_schema, "customer_purchase_summary")

In [0]:

# Schema enforcement: declare the customer column types explicitly so the
# reader does not have to infer them from the file contents.
customer_schema = (
    StructType()
    .add("customer_id", IntegerType(), nullable=True)
    .add("signup_date", DateType(), nullable=True)
    .add("location", StringType(), nullable=True)
)

In [0]:
# Batch-load the customer CSV (first row is a header) with the explicit schema,
# then overwrite the bronze customer table with the result.
customers_df = (
    spark.read
    .schema(customer_schema)
    .option("header", True)
    .csv(raw_batch_customer_path)
)
(
    customers_df.write
    .mode("overwrite")
    .saveAsTable(BRONZE_CUSTOMERS_TABLE)
)


In [0]:
# Explicit column types for the product catalogue.
products_schema = (
    StructType()
    .add("product_id", IntegerType(), nullable=True)
    .add("product_name", StringType(), nullable=True)
    .add("category", StringType(), nullable=True)
    .add("price", IntegerType(), nullable=True)
)

In [0]:
# Batch-load the product Parquet files and overwrite the bronze product table.
#
# The schema has to be set on the reader itself: DataFrameReader.parquet() only
# honours a fixed set of keyword options and silently ignores anything else, so
# the previous `schema=` keyword never enforced products_schema. `header` is a
# CSV-only option and has no meaning for Parquet, so it is dropped.
# NOTE: with the schema applied, a file whose physical column types are
# incompatible with products_schema now fails at read time instead of being
# loaded with whatever types the file happens to contain.
products_df = (
    spark.read
    .schema(products_schema)
    .parquet(raw_batch_product_path)
)
(
    products_df.write
    .mode("overwrite")
    .saveAsTable(BRONZE_PRODUCTS_TABLE)
)

In [0]:
# Explicit column types for the raw JSON user events.
# event_timestamp is kept as a string here; it is parsed into a timestamp
# column when the stream is read.
event_schema = (
    StructType()
    .add("event_id", StringType(), nullable=True)
    .add("event_timestamp", StringType(), nullable=True)
    .add("user_id", IntegerType(), nullable=True)
    .add("event_type", StringType(), nullable=True)
    .add("product_id", IntegerType(), nullable=True)
    .add("session_id", StringType(), nullable=True)
)

In [0]:
# Incrementally ingest the raw JSON event files with Auto Loader ("cloudFiles"),
# using the explicit event schema, and add two derived columns:
#   ingestion_timestamp - time at which the row was processed
#   event_dt            - event_timestamp parsed from string to timestamp
events_df = (
    spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": f"{schema_location}/bronze_events",
    })
    .schema(event_schema)
    .load(raw_streaming_path)
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("event_dt", F.to_timestamp(F.col("event_timestamp")))
)

In [0]:
# Append the parsed event stream to the bronze Delta table. The availableNow
# trigger processes everything currently available and then stops the query.
bronze_events_query = (
    events_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{checkpoint_path}/bronze_events")
    .trigger(availableNow=True)
    .toTable(BRONZE_EVENTS_TABLE)
)

# toTable() only starts the query and returns immediately. Block until the
# availableNow run has finished so that the following cells, which read the
# bronze table, never see a missing or partially loaded table on "Run All".
bronze_events_query.awaitTermination()

In [0]:
# Inputs for the silver layer: events are read as a stream, while the customer
# and product tables are read as static batch DataFrames.
bronze_events_df = spark.readStream.table(BRONZE_EVENTS_TABLE)
customer_df = spark.read.table(BRONZE_CUSTOMERS_TABLE)
products_df = spark.read.table(BRONZE_PRODUCTS_TABLE)

# Derive a coarse sales region from the customer's city. Any location that is
# not listed (including a NULL location) falls through to "Central".
region_expr = (
    F.when(F.col("location").isin("New York", "Philadelphia"), "East Coast")
    .when(F.col("location").isin("Los Angeles", "San Diego"), "West Coast")
    .otherwise("Central")
)
customer_df = customer_df.withColumn("region", region_expr)

                                

In [0]:
# Remove replayed events from the bronze stream.
#
# The watermark column must be part of the deduplication key: Spark can only
# evict deduplication state for keys that contain the event-time column. With
# ["event_id"] alone the 3-minute watermark never expired any state, so the
# state store grew without bound across runs. A replayed event carries the same
# event_id and the same event_dt, so it is still dropped.
# NOTE: this changes the state key of the stateful operator, so the existing
# silver checkpoint must be reset before running with this definition.
deduplicated_event_df = (
    bronze_events_df
    .withWatermark("event_dt", "3 minutes")
    .dropDuplicates(["event_id", "event_dt"])
)

In [0]:
# Silver layer: enrich each deduplicated event with product and customer data.
#   - inner join on product_id: events without a matching product are dropped
#   - left join on user_id == customer_id: events of unknown customers are kept,
#     with a NULL region
# Both lookup tables are broadcast to every executor to avoid shuffling the stream.
enriched_df = (
    deduplicated_event_df
    .join(F.broadcast(products_df), "product_id", "inner")
    .join(
        F.broadcast(customer_df),
        deduplicated_event_df["user_id"] == customer_df["customer_id"],
        "left",
    )
    # From here on event_dt is a calendar date, not a timestamp.
    .withColumn("event_dt", F.to_date("event_dt"))
    .withColumnRenamed("event_type", "action")
    .select(
        "event_id", "event_timestamp", "event_dt", "action",
        deduplicated_event_df["user_id"], deduplicated_event_df["product_id"],
        "product_name", "category", "price", "region",
    )
)

silver_activity_query = (
    enriched_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{checkpoint_path}/silver_activity")
    .trigger(availableNow=True)
    .toTable(SILVER_TABLE)
)

# toTable() returns as soon as the query has started. Wait for the availableNow
# run to complete so the gold cells below, which batch-read the silver table,
# operate on the fully written data.
silver_activity_query.awaitTermination()

In [0]:
# Gold layer: daily performance metrics per product, ranked within each category.
silver_df = spark.read.table(SILVER_TABLE)

# Skewed data: two-stage aggregation with salting.
# Every row is assigned to one of `salting_factor` integer buckets so that a hot
# product is spread over several partial groups. The salt has to be floored to
# an integer: a raw rand() * n value is a (practically unique) double, which
# would put every row in its own group and make the first stage pointless.
salting_factor = 5
salted_df = silver_df.withColumn("salt", F.floor(F.rand() * salting_factor))

# Stage 1: partial counts and revenue per (day, product, salt bucket).
# count() skips NULLs, so count(when(cond, 1)) counts only the matching rows.
salted_agg_df = (
    salted_df
    .groupBy("event_dt", "product_id", "product_name", "category", "price", "salt")
    .agg(
        F.count(F.when(F.col("action") == "view_product", 1)).alias("views"),
        F.count(F.when(F.col("action") == "add_to_cart", 1)).alias("adds_to_cart"),
        F.count(F.when(F.col("action") == "purchase", 1)).alias("purchases"),
        F.sum(
            F.when(F.col("action") == "purchase", F.col("price")).otherwise(0)
        ).alias("revenue"),
    )
)

# Stage 2: collapse the salt buckets into one row per (day, product). "salt" is
# not a grouping key here, so it is already absent from the result.
daily_product_performance = (
    salted_agg_df
    .groupBy("event_dt", "product_id", "product_name", "category", "price")
    .agg(
        F.sum("views").alias("total_views"),
        F.sum("adds_to_cart").alias("total_adds_to_cart"),
        F.sum("purchases").alias("total_purchases"),
        F.sum("revenue").alias("total_revenue"),
    )
)

# Rank products inside each (day, category) by revenue, highest revenue first,
# so rank 1 is the best seller. (An ascending order would give rank 1 to the
# product with the lowest revenue.)
Window_spec = (
    Window
    .partitionBy("event_dt", "category")
    .orderBy(F.col("total_revenue").desc())
)
daily_performance = daily_product_performance.withColumn(
    "rank", F.dense_rank().over(Window_spec)
)

(
    daily_performance.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_dt")
    .saveAsTable(GOLD_DAILY_PRODUCT_TABLE)
)


In [0]:
# Gold layer: purchase summary per customer.
silver_df = spark.read.table(SILVER_TABLE)

# Number of silver rows whose region is NULL. Calling count() runs a Spark job.
# The value is not used again in this cell.
unknown_location_count = silver_df.where(F.col("region").isNull()).count()

# Metrics computed over purchase events only, one row per (user_id, region).
# NOTE: event_dt is a date in the silver table, so last_purchase_timestamp
# holds a date despite its name; distinct_products_purchased is an
# approximate distinct count.
purchase_metrics = [
    F.sum("price").alias("total_purchase_value"),
    F.count("event_id").alias("total_purchases"),
    F.approx_count_distinct("product_id").alias("distinct_products_purchased"),
    F.max("event_dt").alias("last_purchase_timestamp"),
]

customer_summary_df = (
    silver_df
    .where(F.col("action") == "purchase")
    .groupBy("user_id", "region")
    .agg(*purchase_metrics)
    .orderBy(F.desc("total_purchase_value"))
)

customer_summary_df.write.format("delta").mode("overwrite").saveAsTable(
    GOLD_CUSTOMER_SUMMARY_TABLE
)
