## 2. Create enriched tables for customers and products

In [0]:
# Run the Silver-layer pre-processing notebook before the tables are read below
# (presumably it (re)builds the *_stan_silver tables — confirm). Timeout: 1 hour.
# Whatever the child notebook returns is kept in preprocess_status; it is not
# inspected in this section.
preprocess_status = dbutils.notebook.run(
    "/Workspace/Users/aniamrita1610@gmail.com/PEI_Assessment/Silver_Layer/pre-processing",
    timeout_seconds=60 * 60,
)

In [0]:
# Load the standardized Silver tables that feed the enrichment cells below.
customer_df, product_df, order_df = (
    spark.read.table(silver_table)
    for silver_table in (
        "pei_adb_proj.silver.customer_stan_silver",
        "pei_adb_proj.silver.product_stan_silver",
        "pei_adb_proj.silver.order_stan_silver",
    )
)

In [0]:
# NOTE: importing the bare `sum` shadows Python's builtin sum() for the rest of
# the notebook session; it is kept because other cells may call `sum(...)` unqualified.
from pyspark.sql.functions import count, countDistinct, sum

# Customer Enriched Table
# One row per customer from customer_df, with order metrics aggregated from order_df.
# order_df appears to be line-level (it also carries Product_ID and Quantity), so an
# Order_ID may repeat across rows; countDistinct counts each order once instead of
# counting order lines.
customer_metrics = order_df.groupBy("Customer_ID").agg(
    # NOTE(review): assumes Price is the line total, not a unit price — confirm.
    sum("Price").alias("Total_Spend"),
    countDistinct("Order_ID").alias("Total_Orders"),
)

# Left join keeps customers who never ordered; their null metrics become 0.
customer_enriched = (
    customer_df.join(customer_metrics, "Customer_ID", "left")
    .fillna({"Total_Spend": 0, "Total_Orders": 0})
)
customer_enriched.display()

In [0]:
# Self-contained import: this cell must not depend on the bare `sum` imported in
# another cell (that import also shadows Python's builtin sum()).
from pyspark.sql import functions as F

# Product Enriched Table: one row per product from product_df, with sales totals
# aggregated from order_df.
product_metrics = order_df.groupBy("Product_ID").agg(
    F.sum("Quantity").alias("Total_Units_Sold"),
    # NOTE(review): assumes Price is the line total; if it is a unit price,
    # revenue should be sum(Price * Quantity) — confirm.
    F.sum("Price").alias("Total_Revenue"),
)

# Left join keeps products with no sales; their null totals become 0.
product_enriched = (
    product_df.join(product_metrics, "Product_ID", "left")
    .fillna({"Total_Units_Sold": 0, "Total_Revenue": 0})
)
product_enriched.display()

#### Customer Enrichment Test Cases

In [0]:
# Test Case 1: the left join must keep every customer, so the number of distinct
# Customer_IDs has to be the same before and after enrichment.
customer_count, enriched_count = (
    frame.select("Customer_ID").distinct().count()
    for frame in (customer_df, customer_enriched)
)

assert enriched_count == customer_count, (
    "Test Case 1 Failed: Some customers are missing after join."
)
print("OK Test Case 1 Passed: All customers are present after join.")


In [0]:
# Test Case 2: customers with no orders must have Total_Spend = 0 and
# Total_Orders = 0 (the left join leaves nulls that fillna should replace).
from pyspark.sql.functions import col

# Enriched customers whose Customer_ID never appears in order_df.
customers_without_orders = customer_enriched.join(
    order_df.select("Customer_ID").distinct(), "Customer_ID", "left_anti"
)

# A null or non-zero metric on any of those customers is a failure. The isNull
# checks are needed because `null != 0` evaluates to null and would be filtered out.
mishandled_customers = customers_without_orders.filter(
    col("Total_Spend").isNull()
    | col("Total_Orders").isNull()
    | (col("Total_Spend") != 0)
    | (col("Total_Orders") != 0)
)

assert mishandled_customers.count() == 0, "Test Case 2 Failed: Customers with no orders not correctly handled."
print("OK Test Case 2 Passed: Customers with no orders have Total_Spend = 0 and Total_Orders = 0.")


In [0]:
# Test Case 3: the enriched customer table must expose the key and both metrics.
missing_customer_columns = {"Customer_ID", "Total_Spend", "Total_Orders"} - set(
    customer_enriched.columns
)

assert not missing_customer_columns, "Test Case 3 Failed: Schema is missing expected columns."
print("OK Test Case 3 Passed: Schema is correct.")


#### Product Enrichment Test Cases

In [0]:
# Test Case 1: the left join must keep every product, so the number of distinct
# Product_IDs has to match between product_df and product_enriched.
product_count, enriched_count = (
    frame.select("Product_ID").distinct().count()
    for frame in (product_df, product_enriched)
)

assert enriched_count == product_count, (
    "Test Case 1 Failed: Some products are missing after join."
)
print("OK Test Case 1 Passed: All products are retained after join.")


In [0]:
# Test Case 2: the enriched product table must expose the key and both totals.
product_columns = set(product_enriched.columns)

assert all(
    column_name in product_columns
    for column_name in ("Product_ID", "Total_Units_Sold", "Total_Revenue")
), "Test Case 2 Failed: Schema is missing expected columns."
print("OK Test Case 2 Passed: Schema is correct.")


#### Write to Silver Layer

In [0]:
# Remove any previous versions of the enriched Silver tables before they are rewritten.
# NOTE(review): the write cell already uses Delta mode("overwrite"); dropping first
# discards the table's history and leaves a window with no table. If the drop only
# exists to allow schema changes, consider .option("overwriteSchema", "true") instead.
for drop_statement in (
    "DROP TABLE IF EXISTS pei_adb_proj.silver.customer_enriched",
    "DROP TABLE IF EXISTS pei_adb_proj.silver.product_enriched",
):
    spark.sql(drop_statement)

In [0]:
# Persist both enriched DataFrames as Delta tables in the Silver schema,
# replacing any existing contents.
for enriched_frame, target_table in (
    (customer_enriched, "pei_adb_proj.silver.customer_enriched"),
    (product_enriched, "pei_adb_proj.silver.product_enriched"),
):
    (
        enriched_frame.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(target_table)
    )