In [0]:
%pip install ucimlrepo --quiet
dbutils.library.restartPython()

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Import required libraries
from ucimlrepo import fetch_ucirepo 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import datetime

# Initialize Spark session
spark = SparkSession.builder.appName("OnlineRetailAnalytics").getOrCreate()

In [0]:
# ==========================================
# BRONZE LAYER - DATA INGESTION
# ==========================================

print("=== BRONZE LAYER: DATA INGESTION ===")

# Fetch the Online Retail dataset from UCI repository
online_retail = fetch_ucirepo(id=352)

# Define explicit schema for data consistency
schema = StructType([
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", DoubleType(), True),
    StructField("Country", StringType(), True)
])

# Convert pandas DataFrame to Spark DataFrame with schema
retail_spark_df = spark.createDataFrame(online_retail.data.original, schema=schema)

print(f"Total records ingested: {retail_spark_df.count():,}")
retail_spark_df.printSchema()

=== BRONZE LAYER: DATA INGESTION ===
Total records ingested: 541,909
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
# ==========================================
# BRONZE LAYER - BASIC DATA CLEANING
# ==========================================

print("=== BRONZE LAYER: BASIC CLEANING ===")

# Apply basic transformations and filters
retail_clean_df = retail_spark_df \
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm")) \
    .withColumn("CustomerID", col("CustomerID").cast("integer")) \
    .withColumn("TotalPrice", col("Quantity") * col("UnitPrice")) \
    .filter(col("Quantity") > 0) \
    .filter(col("UnitPrice") > 0) \
    .filter(col("CustomerID").isNotNull())

# Display data quality metrics
print(f"Records after cleaning: {retail_clean_df.count():,}")
print(f"Unique customers: {retail_clean_df.select('CustomerID').distinct().count():,}")

# Show date range of the dataset
date_range = retail_clean_df.agg(min('InvoiceDate'), max('InvoiceDate')).collect()[0]
print(f"Date range: {date_range[0]} to {date_range[1]}")

# Save Bronze layer to Delta table
retail_clean_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("online_retail_bronze")

print(" Bronze layer saved: online_retail_bronze")

=== BRONZE LAYER: BASIC CLEANING ===
Records after cleaning: 397,884
Unique customers: 4,338
Date range: 2010-12-01 08:26:00 to 2011-12-09 12:50:00
 Bronze layer saved: online_retail_bronze


In [0]:
# ==========================================
# SILVER LAYER - DATA ENRICHMENT
# ==========================================

print("=== SILVER LAYER: DATA ENRICHMENT ===")

# Read from Bronze layer
bronze_df = spark.table("online_retail_bronze")

# Add time-based features and business categorizations
silver_df = bronze_df \
    .withColumn("Year", year(col("InvoiceDate"))) \
    .withColumn("Month", month(col("InvoiceDate"))) \
    .withColumn("Quarter", quarter(col("InvoiceDate"))) \
    .withColumn("DayOfWeek", dayofweek(col("InvoiceDate"))) \
    .withColumn("Hour", hour(col("InvoiceDate"))) \
    .withColumn("TotalPrice", round(col("Quantity") * col("UnitPrice"), 2))

# Add day name for better readability
silver_df = silver_df.withColumn("DayName", 
    when(col("DayOfWeek") == 1, "Sunday")
    .when(col("DayOfWeek") == 2, "Monday")
    .when(col("DayOfWeek") == 3, "Tuesday")
    .when(col("DayOfWeek") == 4, "Wednesday")
    .when(col("DayOfWeek") == 5, "Thursday")
    .when(col("DayOfWeek") == 6, "Friday")
    .when(col("DayOfWeek") == 7, "Saturday")
)

# Add weekend indicator and product categorization
silver_df = silver_df \
    .withColumn("IsWeekend", when(col("DayOfWeek").isin([1, 7]), 1).otherwise(0)) \
    .withColumn("ProductCategory", 
        when(col("StockCode").startswith("POST"), "Postage")
        .when(col("StockCode").startswith("D"), "Discount")
        .when(col("StockCode").startswith("C"), "Cancelled")
        .when(col("StockCode").startswith("M"), "Manual")
        .when(col("StockCode").contains("BANK"), "Bank_Charges")
        .otherwise("Product")
    ) \
    .withColumn("created_at", current_timestamp())

print(f"Silver layer records: {silver_df.count():,}")

=== SILVER LAYER: DATA ENRICHMENT ===
Silver layer records: 397,884


In [0]:
# Remove duplicate records based on business logic
print("Removing duplicate records...")

# Define window for deduplication
dedup_window = Window.partitionBy("InvoiceNo", "StockCode", "CustomerID").orderBy(col("InvoiceDate").desc())

# Keep only the latest record for each unique combination
silver_df = silver_df \
    .withColumn("row_num", row_number().over(dedup_window)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

# Save Silver layer with partitioning for better performance
silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .partitionBy("Year", "Month") \
    .saveAsTable("online_retail_silver")

print(f" Silver layer saved with {silver_df.count():,} records")

Removing duplicate records...
 Silver layer saved with 387,841 records


In [0]:
# ==========================================
# GOLD LAYER - CUSTOMER ANALYTICS (RFM)
# ==========================================

print("=== GOLD LAYER: CUSTOMER RFM ANALYSIS ===")

# Read from Silver layer
silver_df = spark.table("online_retail_silver")

# Get the latest date in the dataset for recency calculations
current_date = silver_df.agg(max("InvoiceDate")).collect()[0][0]
print(f"Analysis reference date: {current_date}")

# Calculate base RFM metrics for each customer
customer_base_metrics = silver_df \
    .filter(col("CustomerID").isNotNull()) \
    .filter(col("TotalPrice") > 0) \
    .groupBy("CustomerID", "Country") \
    .agg(
        # Recency: Days since last purchase
        datediff(lit(current_date), max("InvoiceDate")).alias("Recency"),
        # Frequency: Number of unique invoices (transactions)
        countDistinct("InvoiceNo").alias("Frequency"),
        # Monetary: Total amount spent
        round(sum("TotalPrice"), 2).alias("Monetary"),
        # Additional customer insights
        count("*").alias("TotalItems"),
        countDistinct("StockCode").alias("UniqueProducts"),
        min("InvoiceDate").alias("FirstPurchase"),
        max("InvoiceDate").alias("LastPurchase"),
        round(avg("TotalPrice"), 2).alias("AvgOrderValue"),
        round(sum("TotalPrice") / countDistinct("InvoiceNo"), 2).alias("AvgBasketValue")
    )

print(f"Total customers analyzed: {customer_base_metrics.count():,}")

=== GOLD LAYER: CUSTOMER RFM ANALYSIS ===
Analysis reference date: 2011-12-09 12:50:00
Total customers analyzed: 4,346


In [0]:
# Perform data quality validation before RFM scoring
print("=== RFM DATA QUALITY VALIDATION ===")

# Check for invalid data that could skew RFM analysis
invalid_monetary = customer_base_metrics.filter(col("Monetary") <= 0).count()
invalid_frequency = customer_base_metrics.filter(col("Frequency") <= 0).count()
invalid_recency = customer_base_metrics.filter(col("Recency") < 0).count()

print(f"Customers with invalid monetary values: {invalid_monetary}")
print(f"Customers with invalid frequency values: {invalid_frequency}")
print(f"Customers with invalid recency values: {invalid_recency}")

# Filter to clean dataset for accurate RFM scoring
clean_customers = customer_base_metrics.filter(
    (col("Monetary") > 0) & 
    (col("Frequency") > 0) & 
    (col("Recency") >= 0)
)

print(f"Clean customers for RFM scoring: {clean_customers.count():,}")

=== RFM DATA QUALITY VALIDATION ===
Customers with invalid monetary values: 0
Customers with invalid frequency values: 0
Customers with invalid recency values: 0
Clean customers for RFM scoring: 4,346


In [0]:
# Calculate RFM scores using quintiles
print("Calculating RFM scores...")

# Apply quintile scoring for each RFM component
# Note: For Recency, lower values (more recent) get higher scores, so we reverse the ranking

# Define window specifications with explicit partitioning
# Using lit(1) to create a single partition for global quintiles across all customers
recency_window = Window.partitionBy(lit(1)).orderBy("Recency")
frequency_window = Window.partitionBy(lit(1)).orderBy(col("Frequency").desc())
monetary_window = Window.partitionBy(lit(1)).orderBy(col("Monetary").desc())

# Apply quintile scoring for each RFM component
# Note: For Recency, lower values (more recent) get higher scores, so we reverse the ranking
rfm_scored = clean_customers \
    .withColumn("R_Score", 6 - ntile(5).over(recency_window)) \
    .withColumn("F_Score", ntile(5).over(frequency_window)) \
    .withColumn("M_Score", ntile(5).over(monetary_window)) \
    .withColumn("RFM_Score", concat(col("R_Score"), col("F_Score"), col("M_Score")))

# Display RFM score distribution to validate proper quintile creation
print("RFM Score Distribution:")
print("R_Score (Recency):")
rfm_scored.groupBy("R_Score").count().orderBy("R_Score").show()
print("F_Score (Frequency):")
rfm_scored.groupBy("F_Score").count().orderBy("F_Score").show()
print("M_Score (Monetary):")
rfm_scored.groupBy("M_Score").count().orderBy("M_Score").show()

Calculating RFM scores...
RFM Score Distribution:
R_Score (Recency):
+-------+-----+
|R_Score|count|
+-------+-----+
|      1|  869|
|      2|  869|
|      3|  869|
|      4|  869|
|      5|  870|
+-------+-----+

F_Score (Frequency):
+-------+-----+
|F_Score|count|
+-------+-----+
|      1|  870|
|      2|  869|
|      3|  869|
|      4|  869|
|      5|  869|
+-------+-----+

M_Score (Monetary):
+-------+-----+
|M_Score|count|
+-------+-----+
|      1|  870|
|      2|  869|
|      3|  869|
|      4|  869|
|      5|  869|
+-------+-----+



In [0]:
# Apply customer segmentation based on RFM scores
print("Applying customer segmentation...")

customer_analytics_final = rfm_scored.withColumn("CustomerSegment", 
    # Champions: High value customers who buy frequently and recently
    when(col("RFM_Score").isin([
        "555", "554", "544", "545", "454", "455", "445", "553", "552"
    ]), "Champions")
    
    # Loyal Customers: Regular customers with good purchase history
    .when(col("RFM_Score").isin([
        "543", "444", "435", "355", "354", "345", "344", "335", "434", "343"
    ]), "Loyal_Customers")
    
    # Potential Loyalists: Recent customers with growth potential
    .when(col("RFM_Score").isin([
        "551", "541", "542", "533", "532", "531", "452", "451", "453", "342", "351", "352", "353"
    ]), "Potential_Loyalists")
    
    # New Customers: Recent first-time or low-frequency buyers
    .when(col("RFM_Score").isin([
        "512", "511", "421", "422", "412", "411", "311", "312", "313", "314", "321", "322"
    ]), "New_Customers")
    
    # At Risk: Good customers who haven't purchased recently
    .when(col("RFM_Score").isin([
        "155", "154", "144", "214", "215", "115", "114", "113", "145", "125", "124"
    ]), "At_Risk")
    
    # Cannot Lose: High-value customers at high risk of churning
    .when(col("RFM_Score").isin([
        "123", "122", "121", "223", "222", "221", "213", "231", "141", "142", "143", "112", "111"
    ]), "Cannot_Lose")
    
    # Promising: New customers with decent purchase potential
    .when(col("RFM_Score").isin([
        "244", "245", "254", "255", "334", "325", "324", "323"
    ]), "Promising")
    
    # Need Attention: Below average customers who need activation
    .when(col("RFM_Score").isin([
        "253", "252", "251", "243", "242", "241", "235", "234", "233", "232"
    ]), "Need_Attention")
    
    # All other combinations
    .otherwise("Others")
) \
.withColumn("created_at", current_timestamp())

# Display customer segment distribution
print("Customer Segment Distribution:")
customer_analytics_final.groupBy("CustomerSegment").count().orderBy(col("count").desc()).show()

Applying customer segmentation...
Customer Segment Distribution:
+-------------------+-----+
|    CustomerSegment|count|
+-------------------+-----+
|      New_Customers| 1092|
|             Others|  919|
|            At_Risk|  629|
|    Loyal_Customers|  443|
|          Promising|  411|
|     Need_Attention|  385|
|        Cannot_Lose|  239|
|          Champions|  147|
|Potential_Loyalists|   81|
+-------------------+-----+



In [0]:
# Save customer analytics to Gold layer
customer_analytics_final.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_customer_analytics")

print(" Customer Analytics saved to: gold_customer_analytics")

# Display sample results for verification
print("Sample Customer Analytics Results:")
customer_analytics_final.select(
    "CustomerID", "Country", "Recency", "Frequency", "Monetary", 
    "R_Score", "F_Score", "M_Score", "RFM_Score", "CustomerSegment"
).show(10)

 Customer Analytics saved to: gold_customer_analytics
Sample Customer Analytics Results:
+----------+--------------+-------+---------+---------+-------+-------+-------+---------+-------------------+
|CustomerID|       Country|Recency|Frequency| Monetary|R_Score|F_Score|M_Score|RFM_Score|    CustomerSegment|
+----------+--------------+-------+---------+---------+-------+-------+-------+---------+-------------------+
|     14646|   Netherlands|      1|       73|280206.02|      5|      1|      1|      511|      New_Customers|
|     18102|United Kingdom|      0|       60| 259657.3|      5|      1|      1|      511|      New_Customers|
|     17450|United Kingdom|      8|       46|194390.79|      5|      1|      1|      511|      New_Customers|
|     16446|United Kingdom|      0|        2| 168472.5|      5|      3|      1|      531|Potential_Loyalists|
|     14911|          EIRE|      1|      201|143711.17|      5|      1|      1|      511|      New_Customers|
|     12415|     Australia|    

In [0]:
# ==========================================
# GOLD LAYER - PRODUCT PERFORMANCE ANALYTICS
# ==========================================

print("=== GOLD LAYER: PRODUCT PERFORMANCE ===")

# Analyze product performance metrics focusing on actual products
product_performance = silver_df \
    .filter(col("ProductCategory") == "Product") \
    .groupBy("StockCode", "Description", "ProductCategory") \
    .agg(
        # Sales volume metrics
        sum("Quantity").alias("TotalQuantitySold"),
        round(sum("TotalPrice"), 2).alias("TotalRevenue"),
        
        # Customer engagement metrics
        countDistinct("CustomerID").alias("UniqueCustomers"),
        countDistinct("InvoiceNo").alias("UniqueOrders"),
        count("*").alias("TotalTransactions"),
        
        # Average performance metrics
        round(avg("UnitPrice"), 2).alias("AvgUnitPrice"),
        round(avg("Quantity"), 2).alias("AvgQuantityPerOrder"),
        round(sum("TotalPrice") / countDistinct("InvoiceNo"), 2).alias("AvgRevenuePerOrder")
    )

# Define window specifications for global rankings
revenue_window = Window.partitionBy(lit(1)).orderBy(col("TotalRevenue").desc())
quantity_window = Window.partitionBy(lit(1)).orderBy(col("TotalQuantitySold").desc())
popularity_window = Window.partitionBy(lit(1)).orderBy(col("UniqueCustomers").desc())

# Add ranking metrics for easy identification of top performers
product_performance = product_performance \
    .withColumn("RevenueRank", dense_rank().over(revenue_window)) \
    .withColumn("QuantityRank", dense_rank().over(quantity_window)) \
    .withColumn("PopularityRank", dense_rank().over(popularity_window)) \
    .withColumn("created_at", current_timestamp())

# Save product performance analytics
product_performance.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_product_performance")

print(" Product Performance saved to: gold_product_performance")
print(f"Products analyzed: {product_performance.count():,}")

# Show top performing products
print("Top 10 Products by Revenue:")
product_performance.select(
    "StockCode", "Description", "TotalRevenue", "TotalQuantitySold", "UniqueCustomers"
).orderBy(col("TotalRevenue").desc()).show(10, truncate=False)

=== GOLD LAYER: PRODUCT PERFORMANCE ===
 Product Performance saved to: gold_product_performance
Products analyzed: 3,892
Top 10 Products by Revenue:
+---------+----------------------------------+------------+-----------------+---------------+
|StockCode|Description                       |TotalRevenue|TotalQuantitySold|UniqueCustomers|
+---------+----------------------------------+------------+-----------------+---------------+
|23843    |PAPER CRAFT , LITTLE BIRDIE       |168469.6    |80995            |1              |
|22423    |REGENCY CAKESTAND 3 TIER          |141946.0    |12349            |881            |
|85123A   |WHITE HANGING HEART T-LIGHT HOLDER|100046.95   |36589            |856            |
|85099B   |JUMBO BAG RED RETROSPOT           |84962.28    |46040            |635            |
|23166    |MEDIUM CERAMIC TOP STORAGE JAR    |81405.48    |77907            |138            |
|47566    |PARTY BUNTING                     |68614.98    |15244            |708            |
|8487

In [0]:
# ==========================================
# GOLD LAYER - TIME SERIES SALES ANALYTICS
# ==========================================

print("=== GOLD LAYER: SALES TIME SERIES ===")

# Create comprehensive time-based sales analytics
sales_timeseries = silver_df.groupBy("Year", "Month", "Quarter", "DayName", "IsWeekend", "Hour") \
    .agg(
        # Revenue and volume metrics
        round(sum("TotalPrice"), 2).alias("TotalRevenue"),
        sum("Quantity").alias("TotalQuantity"),
        
        # Transaction metrics
        countDistinct("InvoiceNo").alias("TotalOrders"),
        countDistinct("CustomerID").alias("UniqueCustomers"),
        countDistinct("StockCode").alias("UniqueProducts"),
        
        # Average performance metrics
        round(avg("TotalPrice"), 2).alias("AvgOrderValue"),
        round(sum("TotalPrice") / countDistinct("InvoiceNo"), 2).alias("AvgBasketValue")
    ) \
    .withColumn("created_at", current_timestamp())

growth_window = Window.partitionBy(lit(1)).orderBy("Year", "Month", "Quarter", "Hour")
sales_timeseries = sales_timeseries.withColumn(
    "RevenueGrowth", 
    col("TotalRevenue") - lag("TotalRevenue", 1).over(growth_window)
)

# Save time series analytics with partitioning for better query performance
sales_timeseries.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("Year") \
    .saveAsTable("gold_sales_timeseries")

print(" Sales Time Series saved to: gold_sales_timeseries")

# Show monthly trends
print("Monthly Sales Trends:")
monthly_summary = sales_timeseries.groupBy("Year", "Month") \
    .agg(
        sum("TotalRevenue").alias("MonthlyRevenue"),
        sum("TotalOrders").alias("MonthlyOrders")
    ).orderBy("Year", "Month")

monthly_summary.show(12)

=== GOLD LAYER: SALES TIME SERIES ===
 Sales Time Series saved to: gold_sales_timeseries
Monthly Sales Trends:
+----+-----+------------------+-------------+
|Year|Month|    MonthlyRevenue|MonthlyOrders|
+----+-----+------------------+-------------+
|2010|   12| 567142.5900000004|         1400|
|2011|    1| 566102.3400000001|          987|
|2011|    2|444664.31000000006|          997|
|2011|    3|         592155.25|         1321|
|2011|    4|463513.69999999984|         1150|
|2011|    5| 675938.6000000001|         1555|
|2011|    6|         656922.54|         1393|
|2011|    7| 597697.6299999999|         1331|
|2011|    8|          642425.9|         1280|
|2011|    9|         947854.95|         1755|
|2011|   10|1028184.5200000001|         1929|
|2011|   11|1147053.6399999997|         2657|
+----+-----+------------------+-------------+
only showing top 12 rows


In [0]:
# ==========================================
# GOLD LAYER - GEOGRAPHIC PERFORMANCE
# ==========================================

print("=== GOLD LAYER: GEOGRAPHIC PERFORMANCE ===")

# Analyze performance by country and time period
geographic_performance = silver_df.groupBy("Country", "Year", "Month") \
    .agg(
        # Financial metrics
        round(sum("TotalPrice"), 2).alias("TotalRevenue"),
        sum("Quantity").alias("TotalQuantity"),
        
        # Customer metrics
        countDistinct("CustomerID").alias("UniqueCustomers"),
        countDistinct("InvoiceNo").alias("TotalOrders"),
        countDistinct("StockCode").alias("UniqueProducts"),
        round(avg("TotalPrice"), 2).alias("AvgOrderValue")
    )

# Add country rankings within each time period
geographic_performance = geographic_performance \
    .withColumn("RevenueRank", 
               dense_rank().over(Window.partitionBy("Year", "Month").orderBy(col("TotalRevenue").desc()))) \
    .withColumn("CustomerRank", 
               dense_rank().over(Window.partitionBy("Year", "Month").orderBy(col("UniqueCustomers").desc()))) \
    .withColumn("created_at", current_timestamp())

# Save geographic performance with partitioning
geographic_performance.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("Year") \
    .saveAsTable("gold_geographic_performance")

print(" Geographic Performance saved to: gold_geographic_performance")

# Show top countries by revenue
print("Top Countries by Total Revenue:")
country_totals = geographic_performance.groupBy("Country") \
    .agg(
        round(sum("TotalRevenue"), 2).alias("TotalRevenue"),
        sum("UniqueCustomers").alias("TotalCustomers")
    ).orderBy(col("TotalRevenue").desc())

country_totals.show(10)

=== GOLD LAYER: GEOGRAPHIC PERFORMANCE ===
 Geographic Performance saved to: gold_geographic_performance
Top Countries by Total Revenue:
+--------------+------------+--------------+
|       Country|TotalRevenue|TotalCustomers|
+--------------+------------+--------------+
|United Kingdom|  7250877.27|         11790|
|   Netherlands|   285446.34|            33|
|          EIRE|   265245.96|            29|
|       Germany|   227618.61|           334|
|        France|    208820.1|           299|
|     Australia|   138420.61|            38|
|         Spain|     61452.2|            77|
|   Switzerland|    56443.95|            42|
|       Belgium|    41196.34|            86|
|        Sweden|    38367.83|            30|
+--------------+------------+--------------+
only showing top 10 rows


In [0]:
# ==========================================
# GOLD LAYER - DAILY BUSINESS METRICS
# ==========================================

print("=== GOLD LAYER: DAILY BUSINESS METRICS ===")

# Create daily aggregated metrics for operational dashboards
daily_metrics = silver_df.groupBy(to_date("InvoiceDate").alias("Date")) \
    .agg(
        # Daily performance metrics
        round(sum("TotalPrice"), 2).alias("DailyRevenue"),
        sum("Quantity").alias("DailyQuantity"),
        countDistinct("InvoiceNo").alias("DailyOrders"),
        countDistinct("CustomerID").alias("DailyCustomers"),
        countDistinct("StockCode").alias("DailyProducts"),
        
        # Daily averages
        round(avg("TotalPrice"), 2).alias("AvgOrderValue"),
        round(sum("TotalPrice") / countDistinct("CustomerID"), 2).alias("RevenuePerCustomer")
    )

# Add time dimensions for analysis
daily_metrics = daily_metrics \
    .withColumn("Year", year("Date")) \
    .withColumn("Month", month("Date")) \
    .withColumn("DayOfWeek", dayofweek("Date")) \
    .withColumn("IsWeekend", when(col("DayOfWeek").isin([1, 7]), 1).otherwise(0))

# Calculate moving averages for trend analysis
daily_window_7 = Window.partitionBy(lit(1)).orderBy("Date").rowsBetween(-6, 0)
daily_window_30 = Window.partitionBy(lit(1)).orderBy("Date").rowsBetween(-29, 0)

daily_metrics = daily_metrics \
    .withColumn("MovingAvg7Days", round(avg("DailyRevenue").over(daily_window_7), 2)) \
    .withColumn("MovingAvg30Days", round(avg("DailyRevenue").over(daily_window_30), 2)) \
    .withColumn("created_at", current_timestamp())

# Save daily metrics with partitioning for efficient queries
daily_metrics.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("Year", "Month") \
    .saveAsTable("gold_daily_metrics")

print(" Daily Metrics saved to: gold_daily_metrics")
print(f"Daily records created: {daily_metrics.count():,}")

# Show recent daily performance
print("Recent Daily Performance (Last 10 Days):")
daily_metrics.select(
    "Date", "DailyRevenue", "DailyOrders", "DailyCustomers", "MovingAvg7Days"
).orderBy(col("Date").desc()).show(10)

=== GOLD LAYER: DAILY BUSINESS METRICS ===
 Daily Metrics saved to: gold_daily_metrics
Daily records created: 305
Recent Daily Performance (Last 10 Days):
+----------+------------+-----------+--------------+--------------+
|      Date|DailyRevenue|DailyOrders|DailyCustomers|MovingAvg7Days|
+----------+------------+-----------+--------------+--------------+
|2011-12-09|   184287.47|         41|            35|      67301.59|
|2011-12-08|    50140.12|        113|           105|      47260.44|
|2011-12-07|    68946.77|        104|            94|      46000.03|
|2011-12-06|    45694.26|        110|           103|      43076.88|
|2011-12-05|     57811.1|        116|           105|      43840.96|
|2011-12-04|    19901.67|         62|            58|      37957.23|
|2011-12-02|    44329.71|        114|            95|      38866.63|
|2011-12-01|    43999.43|        118|           111|      37978.94|
|2011-11-30|    41317.24|         99|            92|      41912.79|
|2011-11-29|    48484.72|    

In [0]:
# ==========================================
# BUSINESS INSIGHTS SUMMARY
# ==========================================

print("=== COMPREHENSIVE BUSINESS INSIGHTS ===")

# Executive summary metrics
exec_summary = spark.sql("""
    SELECT 
        ROUND(SUM(Monetary), 2) as TotalRevenue,
        COUNT(*) as TotalCustomers,
        ROUND(AVG(Monetary), 2) as AvgCustomerValue,
        ROUND(AVG(Frequency), 1) as AvgPurchaseFrequency,
        ROUND(AVG(Recency), 1) as AvgDaysSinceLastPurchase
    FROM gold_customer_analytics
""")

print("Executive Summary:")
exec_summary.show()

# Customer segmentation business insights
segment_insights = spark.sql("""
    SELECT 
        CustomerSegment,
        COUNT(*) as CustomerCount,
        ROUND(AVG(Monetary), 2) as AvgSpending,
        ROUND(AVG(Frequency), 1) as AvgFrequency,
        ROUND(AVG(Recency), 1) as AvgRecency,
        ROUND(SUM(Monetary), 2) as TotalRevenue,
        ROUND(SUM(Monetary) * 100.0 / (SELECT SUM(Monetary) FROM gold_customer_analytics), 2) as RevenueShare
    FROM gold_customer_analytics
    GROUP BY CustomerSegment
    ORDER BY TotalRevenue DESC
""")

print("Customer Segmentation Analysis:")
segment_insights.show()

=== COMPREHENSIVE BUSINESS INSIGHTS ===
Executive Summary:
+------------+--------------+----------------+--------------------+------------------------+
|TotalRevenue|TotalCustomers|AvgCustomerValue|AvgPurchaseFrequency|AvgDaysSinceLastPurchase|
+------------+--------------+----------------+--------------------+------------------------+
|   8844766.5|          4346|         2035.15|                 4.3|                    92.3|
+------------+--------------+----------------+--------------------+------------------------+

Customer Segmentation Analysis:
+-------------------+-------------+-----------+------------+----------+------------+------------+
|    CustomerSegment|CustomerCount|AvgSpending|AvgFrequency|AvgRecency|TotalRevenue|RevenueShare|
+-------------------+-------------+-----------+------------+----------+------------+------------+
|      New_Customers|         1091|    5726.64|        10.7|      21.1|  6247758.81|       70.64|
|             Others|          919|    1318.63|    

In [0]:
print("=== DATA PIPELINE COMPLETED SUCCESSFULLY ===")
print("Gold Tables Created:")
print("   • gold_customer_analytics - RFM analysis and customer segmentation")
print("   • gold_product_performance - Product sales and popularity metrics") 
print("   • gold_sales_timeseries - Time-based sales analysis")
print("   • gold_geographic_performance - Country-wise performance")
print("   • gold_daily_metrics - Daily operational KPIs")

=== DATA PIPELINE COMPLETED SUCCESSFULLY ===
Gold Tables Created:
   • gold_customer_analytics - RFM analysis and customer segmentation
   • gold_product_performance - Product sales and popularity metrics
   • gold_sales_timeseries - Time-based sales analysis
   • gold_geographic_performance - Country-wise performance
   • gold_daily_metrics - Daily operational KPIs


<h2 style="text-align: center;">Visualizations</h2>

In [0]:
# Customer segment distribution
customer_segments_viz = spark.sql("""
    SELECT 
        CustomerSegment, 
        COUNT(*) as CustomerCount,
        ROUND(SUM(Monetary), 2) as TotalRevenue
    FROM gold_customer_analytics
    GROUP BY CustomerSegment
    ORDER BY CustomerCount DESC
""").toPandas()

print("Customer Segment Distribution:")
display(customer_segments_viz)

Customer Segment Distribution:


CustomerSegment,CustomerCount,TotalRevenue
New_Customers,1091,6247758.81
Others,919,1211822.19
At_Risk,630,146386.76
Loyal_Customers,444,137798.78
Promising,412,128315.75
Need_Attention,387,317767.97
Cannot_Lose,238,379761.93
Champions,144,25603.32
Potential_Loyalists,81,249550.99


Databricks visualization. Run in Databricks to view.

In [0]:
# Monthly revenue and order trends
monthly_trends_viz = spark.sql("""
    SELECT 
        CONCAT(Year, '-', LPAD(Month, 2, '0')) as YearMonth,
        SUM(TotalRevenue) as MonthlyRevenue,
        SUM(TotalOrders) as MonthlyOrders
    FROM gold_sales_timeseries
    GROUP BY Year, Month
    ORDER BY Year, Month
""").toPandas()

print("Monthly Revenue Trends:")
display(monthly_trends_viz)

Monthly Revenue Trends:


YearMonth,MonthlyRevenue,MonthlyOrders
2010-12,567142.5900000001,1400
2011-01,566102.3399999999,987
2011-02,444664.31,997
2011-03,592155.2499999999,1321
2011-04,463513.7,1150
2011-05,675938.6000000002,1555
2011-06,656922.5400000003,1393
2011-07,597697.63,1331
2011-08,642425.9,1280
2011-09,947854.95,1755


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Top/Bottom performing RFM combinations
print("Top 10 RFM Combinations by Customer Count")
top_rfm_combos = spark.sql("""
    SELECT 
        CONCAT('R:', R_Score, ' F:', F_Score, ' M:', M_Score) as RFM_Combination,
        R_Score, F_Score, M_Score,
        CustomerCount,
        AvgSpending,
        ROUND(CustomerCount * AvgSpending, 2) as Revenue_Potential
    FROM (
        SELECT 
            R_Score, F_Score, M_Score,
            COUNT(*) as CustomerCount,
            ROUND(AVG(Monetary), 2) as AvgSpending
        FROM gold_customer_analytics
        GROUP BY R_Score, F_Score, M_Score
    ) rfm_data
    ORDER BY CustomerCount DESC
    LIMIT 10
""")

display(top_rfm_combos)

Top 10 RFM Combinations by Customer Count


RFM_Combination,R_Score,F_Score,M_Score,CustomerCount,AvgSpending,Revenue_Potential
R:5 F:1 M:1,5,1,1,346,11224.67,3883735.82
R:1 F:5 M:5,1,5,5,332,144.2,47874.4
R:1 F:5 M:4,1,5,4,209,344.59,72019.31
R:4 F:1 M:1,4,1,1,177,6031.77,1067623.29
R:4 F:2 M:2,4,2,2,137,1368.19,187442.03
R:3 F:4 M:5,3,4,5,132,156.39,20643.48
R:2 F:5 M:5,2,5,5,114,142.9,16290.6
R:5 F:2 M:2,5,2,2,106,1372.97,145534.82
R:3 F:4 M:4,3,4,4,104,360.48,37489.92
R:3 F:3 M:3,3,3,3,102,689.11,70289.22


Databricks visualization. Run in Databricks to view.

In [0]:
# Customer Count vs Average Spending (by Recency Score)

scatter_by_recency = spark.sql("""
    SELECT 
        R_Score,
        F_Score,
        M_Score,
        COUNT(*) as CustomerCount,
        ROUND(AVG(Monetary), 2) as AvgSpending,
        CONCAT('R:', R_Score, ' F:', F_Score, ' M:', M_Score) as RFM_Label
    FROM gold_customer_analytics
    GROUP BY R_Score, F_Score, M_Score
    ORDER BY R_Score, F_Score, M_Score
""")
print("Customer Count vs Average Spending by R_Score")
display(scatter_by_recency)

Customer Count vs Average Spending by R_Score


R_Score,F_Score,M_Score,CustomerCount,AvgSpending,RFM_Label
1,1,1,5,4421.33,R:1 F:1 M:1
1,1,2,2,1303.67,R:1 F:1 M:2
1,1,3,3,677.93,R:1 F:1 M:3
1,1,4,1,319.5,R:1 F:1 M:4
1,2,1,1,3619.22,R:1 F:2 M:1
1,2,2,10,1363.53,R:1 F:2 M:2
1,2,3,11,587.68,R:1 F:2 M:3
1,2,4,2,332.08,R:1 F:2 M:4
1,3,1,6,16117.95,R:1 F:3 M:1
1,3,2,12,1216.1,R:1 F:3 M:2


Databricks visualization. Run in Databricks to view.

In [0]:
# Top products by revenue
top_products_viz = spark.sql("""
    SELECT 
        CASE 
            WHEN LENGTH(Description) > 30 THEN CONCAT(SUBSTRING(Description, 1, 30), '...')
            ELSE Description 
        END as ProductName,
        TotalRevenue,
        TotalQuantitySold,
        UniqueCustomers
    FROM gold_product_performance
    ORDER BY TotalRevenue DESC
    LIMIT 15
""").toPandas()
print("Top 15 Products by Revenue")
display(top_products_viz)

Top 15 Products by Revenue


ProductName,TotalRevenue,TotalQuantitySold,UniqueCustomers
"PAPER CRAFT , LITTLE BIRDIE",168469.6,80995,1
REGENCY CAKESTAND 3 TIER,141946.0,12349,881
WHITE HANGING HEART T-LIGHT HO...,100046.95,36589,856
JUMBO BAG RED RETROSPOT,84962.28,46040,635
MEDIUM CERAMIC TOP STORAGE JAR,81405.48,77907,138
PARTY BUNTING,68614.98,15244,708
ASSORTED COLOUR BIRD ORNAMENT,56191.64,35132,678
RABBIT NIGHT LIGHT,51070.28,27066,450
CHILLI LIGHTS,46235.41,9640,205
PAPER CHAIN KIT 50'S CHRISTMAS...,42268.48,15484,613


Databricks visualization. Run in Databricks to view.