# Gold Layer Analytics & Reporting

This notebook performs post-Gold exploration and reporting.

Purpose:
- validate dimensional model
- compute business KPIs
- perform exploratory analysis
- create reporting views for BI tools

Inputs:
- gold.dim_customers
- gold.dim_products
- gold.fact_sales

Outputs:
- gold.report_customers
- gold.report_products

Note:
This is analysis + semantic modeling, not data transformation.


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


## Step 1. Load Gold tables

In [0]:
spark.sql("USE CATALOG datawarehouse")

fact_df = spark.table("gold.fact_sales")
cust_df = spark.table("gold.dim_customers")
prod_df = spark.table("gold.dim_products")

fact_df.printSchema()

root
 |-- order_number: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- customer_key: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- due_date: date (nullable = true)
 |-- sales_amount: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)



## Step 2. Database exploration
Basic metadata checks

In [0]:
spark.sql("""
SELECT *
FROM information_schema.tables
WHERE table_schema = 'gold'
""").display()

table_catalog,table_schema,table_name,table_type,is_insertable_into,commit_action,table_owner,comment,created,created_by,last_altered,last_altered_by,data_source_format,storage_sub_directory,storage_path
datawarehouse,gold,report_customers,MANAGED,YES,PRESERVE,eshan.sharma108@gmail.com,,2026-02-08T16:03:16.369Z,eshan.sharma108@gmail.com,2026-02-08T16:13:56.178Z,eshan.sharma108@gmail.com,DELTA,tables/2e20e446-5149-443d-a5e6-2c8f0e948b27,
datawarehouse,gold,fact_sales,MANAGED,YES,PRESERVE,eshan.sharma108@gmail.com,,2026-02-08T08:37:45.792Z,eshan.sharma108@gmail.com,2026-02-08T08:37:46.494Z,eshan.sharma108@gmail.com,DELTA,tables/53cf3f26-d7e2-462b-8435-c699b9f51e9e,
datawarehouse,gold,dim_customers,MANAGED,YES,PRESERVE,eshan.sharma108@gmail.com,,2026-02-08T08:37:04.357Z,eshan.sharma108@gmail.com,2026-02-08T08:37:05.225Z,eshan.sharma108@gmail.com,DELTA,tables/d8b24fa7-d2d5-4edb-8fee-bfd0eb69318b,
datawarehouse,gold,report_products,MANAGED,YES,PRESERVE,eshan.sharma108@gmail.com,,2026-02-08T16:16:03.190Z,eshan.sharma108@gmail.com,2026-02-08T16:16:03.912Z,eshan.sharma108@gmail.com,DELTA,tables/da988393-71f9-4378-8fc2-3a73a5bb5a80,
datawarehouse,gold,dim_products,MANAGED,YES,PRESERVE,eshan.sharma108@gmail.com,,2026-02-08T08:37:24.998Z,eshan.sharma108@gmail.com,2026-02-08T08:37:25.873Z,eshan.sharma108@gmail.com,DELTA,tables/fa0539dd-bd19-4a88-852a-e93ba63a3e34,


In [0]:
spark.sql("""
SELECT * 
FROM information_schema.columns
WHERE table_schema = 'gold'
""").display()

table_catalog,table_schema,table_name,column_name,ordinal_position,column_default,is_nullable,full_data_type,data_type,character_maximum_length,character_octet_length,numeric_precision,numeric_precision_radix,numeric_scale,datetime_precision,interval_type,interval_precision,maximum_cardinality,is_identity,identity_generation,identity_start,identity_increment,identity_maximum,identity_minimum,identity_cycle,is_generated,generation_expression,is_system_time_period_start,is_system_time_period_end,system_time_period_timestamp_generation,is_updatable,partition_index,comment
datawarehouse,gold,dim_customers,customer_key,0,,YES,int,INT,,,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,customer_id,1,,YES,int,INT,,,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,customer_number,2,,YES,varchar(50),STRING,0.0,0.0,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,first_name,3,,YES,string,STRING,0.0,0.0,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,last_name,4,,YES,string,STRING,0.0,0.0,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,gender,5,,YES,string,STRING,0.0,0.0,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,birth_date,6,,YES,date,DATE,,,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,country,7,,YES,varchar(50),STRING,0.0,0.0,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,marital_status,8,,YES,string,STRING,0.0,0.0,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,
datawarehouse,gold,dim_customers,create_date,9,,YES,date,DATE,,,,,,,,,,NO,,,,,,,NO,,NO,NO,,YES,,


## Step 3. Dimension exploration
Understand categorical distributions

In [0]:
cust_df.select("country").distinct().display()
prod_df.select("category").distinct().display()
prod_df.select("sub_category").distinct().display()
prod_df.select('product_name').distinct().display()
prod_df.select("category",'sub_category',"product_name").distinct().orderBy("category","sub_category","product_name").display()

country
Australia
United States
Canada
Germany
United Kingdom
France
""


category
Components
Bikes
Clothing
Accessories
""


sub_category
Road Frames
Mountain Bikes
Road Bikes
Mountain Frames
Socks
Forks
Wheels
Gloves
Headsets
Locks


product_name
HL Road Frame - Black- 58
HL Road Frame - Red- 58
Mountain-100 Black- 38
Mountain-100 Black- 42
Mountain-100 Black- 44
Mountain-100 Black- 48
Mountain-100 Silver- 38
Mountain-100 Silver- 42
Mountain-100 Silver- 44
Mountain-100 Silver- 48


category,sub_category,product_name
,,HL Mountain Pedal
,,HL Road Pedal
,,LL Mountain Pedal
,,LL Road Pedal
,,ML Mountain Pedal
,,ML Road Pedal
,,Touring Pedal
Accessories,Bike Racks,Hitch Rack - 4-Bike
Accessories,Bike Stands,All-Purpose Bike Stand
Accessories,Bottles and Cages,Mountain Bottle Cage


## Step 4. Date exploration
Check data coverage

In [0]:
fact_df.select("order_date").display()

order_date
2010-12-29
2010-12-29
2010-12-29
2010-12-29
2010-12-29
2010-12-30
2010-12-30
2010-12-30
2010-12-30
2010-12-31


Check first and last order date

In [0]:
fact_df.agg(
    F.min("order_date").alias("first_order"),
    F.max("order_date").alias("last_order")
).display()

first_order,last_order
2010-12-29,2014-01-28


How many years of sales are available?

In [0]:

fact_df.agg(
    F.min("order_date").alias("first_order"),
    F.max("order_date").alias("last_order"),
    F.floor(
        F.months_between(F.max("order_date"), F.min("order_date")) / 12
    ).alias("years_of_sales"),
    F.floor(
        F.months_between(F.max("order_date"), F.min("order_date"))
    ).alias("months_of_sales")
).display()

first_order,last_order,years_of_sales,months_of_sales
2010-12-29,2014-01-28,3,36


Finding the youngest and oldest customer birth dates

In [0]:
cust_df.agg(
    F.min("birth_date").alias("oldest_birthdate"),
    F.max("birth_date").alias("youngest_birthdate")
).display()

oldest_birthdate,youngest_birthdate
1926-03-15,1986-06-25


Finding the ages of youngest and oldest customer

In [0]:
cust_df.agg(
    F.floor(F.months_between(F.current_date(), F.min("birth_date")) / 12).alias("oldest_age"),
    F.floor(F.months_between(F.current_date(), F.max("birth_date")) / 12).alias("youngest_age")
).display()


oldest_age,youngest_age
99,39


## Step 5. Core metrics (Measures Exploration)
High-level KPIs

In [0]:

# Fact metrics
fact_metrics = fact_df.agg(
    F.sum("sales_amount").alias("total_sales"),
    F.sum("quantity").alias("total_quantity"),
    F.avg("price").alias("avg_price"),
    F.count("order_number").alias("total_orders"),                 # includes duplicates
    F.countDistinct("order_number").alias("total_orders_unique"),  # unique orders
    F.countDistinct("customer_key").alias("active_customers")      # customers who ordered
)

# Dimension metrics
product_metrics = prod_df.agg(
    F.count("product_key").alias("total_products")
)

customer_metrics = cust_df.agg(
    F.count("customer_key").alias("total_customers"),
    F.countDistinct("customer_key").alias("active_customers")
)

# Combine everything into one row
metrics_df = (
    fact_metrics
    .crossJoin(product_metrics)
    .crossJoin(customer_metrics)
)

metrics_df.display()

total_sales,total_quantity,avg_price,total_orders,total_orders_unique,active_customers,total_products,total_customers,active_customers.1
29356250.0,60423,486.0377827080367,60398,27659,18484,295,18484,18484


## Step 6. Magnitude Analysis

In [0]:

# Customers by country

customers_by_country_df = (
    cust_df
    .groupBy("country")
    .agg(F.count("customer_key").alias("total_customers"))
    .orderBy(F.desc("total_customers"))
)
customers_by_country_df.display()

country,total_customers
United States,7482
Australia,3591
United Kingdom,1913
France,1810
Germany,1780
Canada,1571
,337


In [0]:
# Customers by gender
customers_by_gender_df = (
    cust_df
    .groupBy("gender")
    .agg(F.count("customer_key").alias("total_customers"))
    .orderBy(F.desc("total_customers"))
)
customers_by_gender_df.display()

gender,total_customers
Male,9341
Female,9128
,15


In [0]:
# Products by category
products_by_category_df = (
    prod_df
    .groupBy("category")
    .agg(F.count("product_key").alias("total_products"))
    .orderBy(F.desc("total_products"))
)
products_by_category_df.display()

category,total_products
Components,127
Bikes,97
Clothing,35
Accessories,29
,7


In [0]:
# Average cost per category
avg_cost_by_category_df = (
    prod_df
    .groupBy("category")
    .agg(F.avg("product_cost").alias("avg_costs"))
    .orderBy(F.desc("avg_costs"))
)
avg_cost_by_category_df.display()

category,avg_costs
Bikes,949.4432989690722
Components,264.7165354330709
,28.571428571428573
Clothing,24.8
Accessories,13.172413793103448


## Step 7. Revenue

In [0]:
#Revenue by Category
category_sales = (
    fact_df.join(prod_df, "product_key", "left")
    .groupBy("category")
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy(F.desc("total_sales"))
)

category_sales.display()

category,total_sales
Bikes,28316272.0
Accessories,700262.0
Clothing,339716.0


In [0]:
# Revenue by customer
revenue_by_customer_df = (
    fact_df
    .join(cust_df, "customer_key", "left")
    .groupBy("customer_key")
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy(F.desc("total_sales"))
)
revenue_by_customer_df.display()

customer_key,total_sales
1302,13294.0
1133,13294.0
1309,13268.0
1132,13265.0
1301,13242.0
1322,13215.0
1125,13195.0
1308,13172.0
1297,13164.0
434,12914.0


In [0]:
# Sales distribution by country
sales_by_country_df = (
    fact_df
    .join(cust_df, "customer_key", "left")
    .groupBy("country")
    .agg(F.sum("sales_amount").alias("total_sold_items"))
    .orderBy(F.desc("total_sold_items"))
)
sales_by_country_df.display()

country,total_sold_items
United States,9162327.0
Australia,9060172.0
United Kingdom,3391376.0
Germany,2894066.0
France,2643751.0
Canada,1977738.0
,226820.0


## Step 8. Ranking Analysis

In [0]:
fact_products_df = fact_df.join(prod_df, "product_key", "left")
fact_customers_df = fact_df.join(cust_df, "customer_key", "left")

In [0]:
#Top 5 products by revenue
top_products= (
    fact_products_df
    .groupBy("product_key")
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy(F.desc("total_sales"))
    .limit(5)
)


top_products.display()

product_key,total_sales
122,1373454.0
121,1363128.0
123,1339394.0
125,1301029.0
120,1294854.0


In [0]:
#Bottom 5 products by revenue
bottom_products_df = (
    fact_products_df
    .groupBy("product_key")
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy("total_sales")
    .limit(5)
)
bottom_products_df.display()

product_key,total_sales
279,2430.0
280,2682.0
259,6382.0
168,7272.0
291,7440.0


In [0]:
#Top 5 subcategories by revenue
top_subcategories_df = (
    fact_products_df
    .groupBy("sub_category")
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy(F.desc("total_sales"))
    .limit(5)
)
top_subcategories_df.display()

sub_category,total_sales
Road Bikes,14519438.0
Mountain Bikes,9952254.0
Touring Bikes,3844580.0
Tires and Tubes,244634.0
Helmets,225435.0


In [0]:
#Top 10 customers by revenue
top_customers_df = (
    fact_customers_df
    .groupBy("customer_key")
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy(F.desc("total_sales"))
    .limit(10)
)
top_customers_df.display()

customer_key,total_sales
1133,13294.0
1302,13294.0
1309,13268.0
1132,13265.0
1301,13242.0
1322,13215.0
1125,13195.0
1308,13172.0
1297,13164.0
434,12914.0


In [0]:
#Bottom 3 customers with fewest orders
bottom_customers_orders_df = (
    fact_customers_df
    .groupBy("customer_key")
    .agg(F.countDistinct("order_number").alias("total_orders"))
    .orderBy("total_orders")
    .limit(3)
)
bottom_customers_orders_df.display()

customer_key,total_orders
17390,1
16602,1
16613,1


## Step 9. Change Over Time Analysis

In [0]:
sales_df = (
    fact_df
    .filter(F.col("order_date").isNotNull())
)


In [0]:
#Yearly total sales
yearly_sales_df = (
    sales_df
    .groupBy(F.year("order_date").alias("order_year"))
    .agg(
        F.sum("sales_amount").alias("total_sales")
    )
    .orderBy("order_year")
)
yearly_sales_df.display()

order_year,total_sales
2010,43419.0
2011,7075088.0
2012,5842231.0
2013,16344878.0
2014,45642.0


In [0]:
#Yearly total sales + customers
yearly_sales_customers_df = (
    sales_df
    .groupBy(F.year("order_date").alias("order_year"))
    .agg(
        F.sum("sales_amount").alias("total_sales"),
        F.countDistinct("customer_key").alias("total_customers")
    )
    .orderBy("order_year")
)
yearly_sales_customers_df.display()

order_year,total_sales,total_customers
2010,43419.0,14
2011,7075088.0,2216
2012,5842231.0,3255
2013,16344878.0,17427
2014,45642.0,834


In [0]:
#Yearly sales + customers + quantity
yearly_full_df = (
    sales_df
    .groupBy(F.year("order_date").alias("order_year"))
    .agg(
        F.sum("sales_amount").alias("total_sales"),
        F.countDistinct("customer_key").alias("total_customers"),
        F.sum("quantity").alias("total_quantity")
    )
    .orderBy("order_year")
)
yearly_full_df.display()

order_year,total_sales,total_customers,total_quantity
2010,43419.0,14,14
2011,7075088.0,2216,2216
2012,5842231.0,3255,3397
2013,16344878.0,17427,52807
2014,45642.0,834,1970


In [0]:
#Monthly seasonality
monthly_trends_df = (
    sales_df
    .groupBy(F.month("order_date").alias("order_month"))
    .agg(
        F.sum("sales_amount").alias("total_sales"),
        F.countDistinct("customer_key").alias("total_customers"),
        F.sum("quantity").alias("total_quantity")
    )
    .orderBy("order_month")
)
monthly_trends_df.display()

order_month,total_sales,total_customers,total_quantity
1,1868558.0,1818,4043
2,1744517.0,1765,3858
3,1908375.0,1982,4449
4,1948226.0,1916,4355
5,2204969.0,2074,4781
6,2935883.0,2430,5573
7,2412838.0,2154,5107
8,2684313.0,2312,5335
9,2536520.0,2210,5070
10,2916550.0,2533,5838


In [0]:
# Year + month breakdown
year_month_df = (
    sales_df
    .groupBy(
        F.year("order_date").alias("order_year"),
        F.month("order_date").alias("order_month")
    )
    .agg(
        F.sum("sales_amount").alias("total_sales"),
        F.countDistinct("customer_key").alias("total_customers"),
        F.sum("quantity").alias("total_quantity")
    )
    .orderBy("order_year", "order_month")
)
year_month_df.display()

order_year,order_month,total_sales,total_customers,total_quantity
2010,12,43419.0,14,14
2011,1,469795.0,144,144
2011,2,466307.0,144,144
2011,3,485165.0,150,150
2011,4,502042.0,157,157
2011,5,561647.0,174,174
2011,6,737793.0,230,230
2011,7,596710.0,188,188
2011,8,614516.0,193,193
2011,9,603047.0,185,185


## Step 10. Cumulative Analysis

In [0]:
#Total sales per month (month number only)
monthly_sales_df = (
    sales_df
    .groupBy(F.month("order_date").alias("monthly"))
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy("monthly")
)
monthly_sales_df.display()

monthly,total_sales
1,1868558.0
2,1744517.0
3,1908375.0
4,1948226.0
5,2204969.0
6,2935883.0
7,2412838.0
8,2684313.0
9,2536520.0
10,2916550.0


In [0]:
# Running total of sales over time (monthly number 1–12)
w = Window.orderBy("monthly").rowsBetween(Window.unboundedPreceding, Window.currentRow)

running_monthly_df = (
    monthly_sales_df
    .withColumn("running_total_sales", F.sum("total_sales").over(w))
)
running_monthly_df.display()



monthly,total_sales,running_total_sales
1,1868558.0,1868558.0
2,1744517.0,3613075.0
3,1908375.0,5521450.0
4,1948226.0,7469676.0
5,2204969.0,9674645.0
6,2935883.0,12610528.0
7,2412838.0,15023366.0
8,2684313.0,17707679.0
9,2536520.0,20244199.0
10,2916550.0,23160749.0


In [0]:
#Running total using real calendar months (DATE_TRUNC('month'))
#(this is usually what you actually want for time series, because Jan 2023 ≠ Jan 2024)
monthly_calendar_df = (
    sales_df
    .groupBy(F.date_trunc("month", "order_date").alias("monthly"))
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy("monthly")
)

w_cal = Window.orderBy("monthly").rowsBetween(Window.unboundedPreceding, Window.currentRow)

running_calendar_df = (
    monthly_calendar_df
    .withColumn("running_total_sales", F.sum("total_sales").over(w_cal))
)
running_calendar_df.display()



monthly,total_sales,running_total_sales
2010-12-01T00:00:00.000Z,43419.0,43419.0
2011-01-01T00:00:00.000Z,469795.0,513214.0
2011-02-01T00:00:00.000Z,466307.0,979521.0
2011-03-01T00:00:00.000Z,485165.0,1464686.0
2011-04-01T00:00:00.000Z,502042.0,1966728.0
2011-05-01T00:00:00.000Z,561647.0,2528375.0
2011-06-01T00:00:00.000Z,737793.0,3266168.0
2011-07-01T00:00:00.000Z,596710.0,3862878.0
2011-08-01T00:00:00.000Z,614516.0,4477394.0
2011-09-01T00:00:00.000Z,603047.0,5080441.0


In [0]:
#Running total of yearly sales
yearly_df = (
    sales_df
    .groupBy(F.date_trunc("year", "order_date").alias("yearly"))
    .agg(F.sum("sales_amount").alias("total_sales"))
    .orderBy("yearly")
)

w_year = Window.orderBy("yearly").rowsBetween(Window.unboundedPreceding, Window.currentRow)

running_yearly_df = (
    yearly_df
    .withColumn("running_total_sales", F.sum("total_sales").over(w_year))
)
running_yearly_df.display()



yearly,total_sales,running_total_sales
2010-01-01T00:00:00.000Z,43419.0,43419.0
2011-01-01T00:00:00.000Z,7075088.0,7118507.0
2012-01-01T00:00:00.000Z,5842231.0,12960738.0
2013-01-01T00:00:00.000Z,16344878.0,29305616.0
2014-01-01T00:00:00.000Z,45642.0,29351258.0


In [0]:
#Moving average of yearly sales
yearly_avg_df = (
    sales_df
    .groupBy(F.date_trunc("year", "order_date").alias("yearly"))
    .agg(
        F.sum("sales_amount").alias("total_sales"),
        F.avg("price").alias("avg_price")
    )
    .orderBy("yearly")
)

w_year = Window.orderBy("yearly").rowsBetween(Window.unboundedPreceding, Window.currentRow)

yearly_moving_df = (
    yearly_avg_df
    .withColumn("running_total_sales", F.sum("total_sales").over(w_year))
    .withColumn("moving_average", F.avg("avg_price").over(w_year))
)

yearly_moving_df.display()



yearly,total_sales,avg_price,running_total_sales,moving_average
2010-01-01T00:00:00.000Z,43419.0,3101.3571428571427,43419.0,3101.3571428571427
2011-01-01T00:00:00.000Z,7075088.0,3192.7292418772563,7118507.0,3147.0431923671995
2012-01-01T00:00:00.000Z,5842231.0,1719.8207241683838,12960738.0,2671.302369634261
2013-01-01T00:00:00.000Z,16344878.0,309.65742109052326,29305616.0,2080.8911324983264
2014-01-01T00:00:00.000Z,45642.0,23.168527918781727,29351258.0,1669.3466115824176


## Step 11. Performance Analysis

In [0]:

yearly_product_sales_df = (
    fact_df
    .filter(F.col("order_date").isNotNull())
    .join(prod_df, "product_key", "left")
    .groupBy(
        F.year("order_date").alias("order_year"),
        "product_name"
    )
    .agg(F.sum("sales_amount").alias("current_sales"))
)

In [0]:
w_product = Window.partitionBy("product_name")
w_product_time = Window.partitionBy("product_name").orderBy("order_year")


In [0]:
performance_df = (
    yearly_product_sales_df
    .withColumn("avg_sales", F.avg("current_sales").over(w_product))
    .withColumn("diff_avg", F.col("current_sales") - F.col("avg_sales"))
    .withColumn(
        "sales_status",
        F.when(F.col("diff_avg") > 0, "Above Avg")
         .when(F.col("diff_avg") < 0, "Below Avg")
         .otherwise("Avg")
    )
    .withColumn(
        "previous_year_sale",
        F.lag("current_sales").over(w_product_time)
    )
    .withColumn(
        "diff_previous_year",
        F.col("current_sales") - F.col("previous_year_sale")
    )
    .withColumn(
        "sales_status_previous_year",
        F.when(F.col("diff_previous_year") > 0, "Increased")
         .when(F.col("diff_previous_year") < 0, "Decreased")
         .otherwise("Same")
    )
    .orderBy("product_name", "order_year")
)
performance_df.display()

order_year,product_name,current_sales,avg_sales,diff_avg,sales_status,previous_year_sale,diff_previous_year,sales_status_previous_year
2012,AWC Logo Cap,72.0,6570.0,-6498.0,Below Avg,,,Same
2013,AWC Logo Cap,18891.0,6570.0,12321.0,Above Avg,72.0,18819.0,Increased
2014,AWC Logo Cap,747.0,6570.0,-5823.0,Below Avg,18891.0,-18144.0,Decreased
2012,All-Purpose Bike Stand,159.0,13197.0,-13038.0,Below Avg,,,Same
2013,All-Purpose Bike Stand,37683.0,13197.0,24486.0,Above Avg,159.0,37524.0,Increased
2014,All-Purpose Bike Stand,1749.0,13197.0,-11448.0,Below Avg,37683.0,-35934.0,Decreased
2013,Bike Wash - Dissolver,6960.0,3636.0,3324.0,Above Avg,,,Same
2014,Bike Wash - Dissolver,312.0,3636.0,-3324.0,Below Avg,6960.0,-6648.0,Decreased
2013,Classic Vest- L,11968.0,6240.0,5728.0,Above Avg,,,Same
2014,Classic Vest- L,512.0,6240.0,-5728.0,Below Avg,11968.0,-11456.0,Decreased


## Step 12. Part-to-Whole (Proportion) Analysis

Identify which product categories contribute most to total sales.

In [0]:
#Total sales by category
category_sales_df = (
    fact_df
    .join(prod_df, "product_key", "left")
    .groupBy("category")
    .agg(F.sum("sales_amount").alias("total_sales"))
)

In [0]:
#Overall sales (total across all categories)
w_all = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

category_proportion_df = (
    category_sales_df
    .withColumn("overall_sales", F.sum("total_sales").over(w_all))
    .withColumn(
        "sales_percentage",
        F.round(
            (F.col("total_sales") / F.col("overall_sales")) * 100, 2
        )
    )
)
category_proportion_df.display()



category,total_sales,overall_sales,sales_percentage
Accessories,700262.0,29356250.0,2.39
Clothing,339716.0,29356250.0,1.16
Bikes,28316272.0,29356250.0,96.46


In [0]:
#Formatted and ordered version
final_category_share_df = (
    category_proportion_df
    .withColumn(
        "sales_percentage_label",
        F.concat(F.col("sales_percentage"), F.lit("%"))
    )
    .orderBy(F.desc("total_sales"))
)
final_category_share_df.display()



category,total_sales,overall_sales,sales_percentage,sales_percentage_label
Bikes,28316272.0,29356250.0,96.46,96.46%
Accessories,700262.0,29356250.0,2.39,2.39%
Clothing,339716.0,29356250.0,1.16,1.16%


## Step 13. Data Segmentation

Segment products and customers for deeper insights.

In [0]:
# Segment products into cost ranges
product_segments_df = (
    prod_df
    .withColumn(
        "cost_range",
        F.when(F.col("product_cost") < 100, "Below 100")
         .when((F.col("product_cost") >= 100) & (F.col("product_cost") <= 500), "100-500")
         .when((F.col("product_cost") > 500) & (F.col("product_cost") <= 1000), "500-1000")
         .otherwise("Above 1000")
    )
)
product_segments_df.display()

product_key,product_id,product_number,product_name,category_id,category,sub_category,maintenance,product_cost,product_line,start_date,cost_range
1,210,FR-R92B-58,HL Road Frame - Black- 58,CO_RF,Components,Road Frames,Yes,0,Road,2003-07-01,Below 100
2,211,FR-R92R-58,HL Road Frame - Red- 58,CO_RF,Components,Road Frames,Yes,0,Road,2003-07-01,Below 100
3,348,BK-M82B-38,Mountain-100 Black- 38,BI_MB,Bikes,Mountain Bikes,Yes,1898,Mountain,2011-07-01,Above 1000
4,349,BK-M82B-42,Mountain-100 Black- 42,BI_MB,Bikes,Mountain Bikes,Yes,1898,Mountain,2011-07-01,Above 1000
5,350,BK-M82B-44,Mountain-100 Black- 44,BI_MB,Bikes,Mountain Bikes,Yes,1898,Mountain,2011-07-01,Above 1000
6,351,BK-M82B-48,Mountain-100 Black- 48,BI_MB,Bikes,Mountain Bikes,Yes,1898,Mountain,2011-07-01,Above 1000
7,344,BK-M82S-38,Mountain-100 Silver- 38,BI_MB,Bikes,Mountain Bikes,Yes,1912,Mountain,2011-07-01,Above 1000
8,345,BK-M82S-42,Mountain-100 Silver- 42,BI_MB,Bikes,Mountain Bikes,Yes,1912,Mountain,2011-07-01,Above 1000
9,346,BK-M82S-44,Mountain-100 Silver- 44,BI_MB,Bikes,Mountain Bikes,Yes,1912,Mountain,2011-07-01,Above 1000
10,347,BK-M82S-48,Mountain-100 Silver- 48,BI_MB,Bikes,Mountain Bikes,Yes,1912,Mountain,2011-07-01,Above 1000


In [0]:
# Count how many products fall into each cost segment
product_segment_counts_df = (
    product_segments_df
    .groupBy("cost_range")
    .agg(F.count("product_key").alias("total_products"))
    .orderBy(F.desc("total_products"))
)
product_segment_counts_df.display()

cost_range,total_products
Below 100,110
100-500,101
500-1000,45
Above 1000,39


In [0]:
# Calculate customer lifespan and total spending
customer_spending_df = (
    fact_df
    .join(cust_df, "customer_key", "left")
    .groupBy("customer_key")
    .agg(
        F.sum("sales_amount").alias("total_spending"),
        F.min("order_date").alias("first_order"),
        F.max("order_date").alias("last_order")
    )
    .withColumn(
        "lifespan",
        F.floor(F.months_between("last_order", "first_order"))
    )
)
customer_spending_df.display()

customer_key,total_spending,first_order,last_order,lifespan
17149,3578.0,2011-03-19,2011-03-19,0.0
3139,4443.0,2011-06-05,2013-06-03,23.0
15277,3362.0,2012-01-27,2013-11-18,21.0
4366,3142.0,2012-05-21,2013-08-15,14.0
2737,4479.0,2012-06-09,2013-12-19,18.0
5156,4490.0,2012-12-05,2013-11-18,11.0
16557,2049.0,2012-12-26,2012-12-26,0.0
2936,57.0,2013-03-02,2013-03-02,0.0
4885,40.0,2013-05-07,2013-05-07,0.0
3683,138.0,2013-05-17,2013-12-06,6.0


In [0]:
# Segment customers based on spending and activity
#  Criteria:
#  - VIP: ≥12 months history and spending > $5000
#  - Regular: ≥12 months history and spending ≤ $5000
#  - New: <12 months history
# Assign customer segments based on lifespan and spending
customer_segments_df = (
    customer_spending_df
    .withColumn(
        "customer_segment",
        F.when((F.col("lifespan") >= 12) & (F.col("total_spending") > 5000), "VIP")
         .when((F.col("lifespan") >= 12) & (F.col("total_spending") <= 5000), "Regular")
         .otherwise("New")
    )
)
customer_segments_df.display()

customer_key,total_spending,first_order,last_order,lifespan,customer_segment
17149,3578.0,2011-03-19,2011-03-19,0.0,New
3139,4443.0,2011-06-05,2013-06-03,23.0,Regular
15277,3362.0,2012-01-27,2013-11-18,21.0,Regular
4366,3142.0,2012-05-21,2013-08-15,14.0,Regular
2737,4479.0,2012-06-09,2013-12-19,18.0,Regular
5156,4490.0,2012-12-05,2013-11-18,11.0,New
16557,2049.0,2012-12-26,2012-12-26,0.0,New
2936,57.0,2013-03-02,2013-03-02,0.0,New
4885,40.0,2013-05-07,2013-05-07,0.0,New
3683,138.0,2013-05-17,2013-12-06,6.0,New


In [0]:
# Count customers per segment
customer_segment_counts_df = (
    customer_segments_df
    .groupBy("customer_segment")
    .agg(F.count("customer_key").alias("total_customers"))
    .orderBy(F.desc("total_customers"))
)
customer_segment_counts_df.display()

customer_segment,total_customers
New,14828
Regular,2037
VIP,1619


## Step 14. Customer report view
Aggregate once → create reusable view

### Customer Report (gold.report_customers)

#### Purpose
Consolidated customer analytics table used for reporting, dashboards, and segmentation.

Provides one row per customer with behavioral, monetary, and lifecycle KPIs.

---

#### Grain
**1 row = 1 customer**

---

#### Data Sources
- gold.fact_sales
- gold.dim_customers

---

#### Metrics

##### Core KPIs
| Column | Description |
|-------|-------------|
| total_orders | Distinct orders placed |
| total_sales | Total revenue generated |
| total_quantity | Total items purchased |
| total_products | Distinct products purchased |
| last_order_date | Most recent purchase |
| recency | Months since last order |
| lifespan | Months between first and last order |

---

##### Customer Attributes
| Column | Description |
|--------|-------------|
| customer_age | Age derived from birth date |
| age_group | Age bucket for reporting |

Buckets:
- Under 20
- 20–29
- 30–39
- 40–49
- 50+

---

##### Segmentation
| Segment | Rule |
|---------|------|
| VIP | lifespan ≥ 12 months AND sales > 5000 |
| Regular | lifespan ≥ 12 months AND sales ≤ 5000 |
| New | lifespan < 12 months |

---

##### Derived Ratios
| Column | Formula |
|----------|-----------|
| avg_order_value | total_sales / total_orders |
| avg_monthly_spend | total_sales / lifespan |

---

#### Typical Uses
- Customer lifetime value analysis
- Segmentation dashboards
- RFM-style scoring
- Marketing targeting
- Retention monitoring

---

#### Notes
- Age and recency calculated dynamically using current date
- months_between used for month calculations (Spark equivalent of DATEDIFF(MONTH))
- Can be saved as a physical table for BI tools


In [0]:
today = F.current_date()

customer_report = (
    fact_df.join(cust_df, "customer_key")
    .groupBy(
        "customer_key",
        "customer_number",
        "first_name",
        "last_name",
        "birth_date"
    )
    .agg(
        F.countDistinct("order_number").alias("total_orders"),
        F.sum("sales_amount").alias("total_sales"),
        F.sum("quantity").alias("total_quantity"),
        F.countDistinct("product_key").alias("total_products"),
        F.max("order_date").alias("last_order_date"),
        F.floor(F.months_between(F.max("order_date"), F.min("order_date"))).alias("lifespan")
    )
    .withColumn("customer_name", F.concat_ws(" ", "first_name", "last_name"))
    .withColumn("customer_age", F.floor(F.months_between(today, "birth_date") / 12))
    .withColumn("recency", F.floor(F.months_between(today, "last_order_date")))

    .withColumn(
        "customer_segment",
        F.when((F.col("lifespan") >= 12) & (F.col("total_sales") > 5000), "VIP")
         .when((F.col("lifespan") >= 12), "Regular")
         .otherwise("New")
    )

    .withColumn(
        "avg_order_value",
        F.col("total_sales") / F.col("total_orders")
    )

    .withColumn(
        "avg_monthly_spend",
        F.when(F.col("lifespan") == 0, F.col("total_sales"))
         .otherwise(F.col("total_sales") / F.col("lifespan"))
    )
)
(
    customer_report
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("gold.report_customers")
)

## Step 15. Product report view

In [0]:
product_report = (
    fact_df
    .filter(F.col("order_date").isNotNull())
    .join(prod_df, "product_key", "left")
    .groupBy(
        "product_key",
        "product_name",
        "category",
        "sub_category",
        "product_cost"
    )
    .agg(
        F.countDistinct("order_number").alias("total_orders"),
        F.countDistinct("customer_key").alias("total_customers"),
        F.sum("sales_amount").alias("total_sales"),
        F.sum("quantity").alias("total_quantity"),
        F.max("order_date").alias("last_sale_date"),

        F.floor(
            F.months_between(F.max("order_date"), F.min("order_date"))
        ).alias("lifespan"),

        F.round(
            F.avg(F.col("sales_amount") / F.when(F.col("quantity") == 0, None)
                                           .otherwise(F.col("quantity"))),
            1
        ).alias("avg_selling_price")
    )
    .withColumn(
        "recency_in_months",
        F.floor(F.months_between(today, "last_sale_date"))
    )
    .withColumn(
        "product_segment",
        F.when(F.col("total_sales") > 50000, "High-Performer")
         .when(F.col("total_sales") >= 10000, "Mid-Range")
         .otherwise("Low-Performer")
    )

    .withColumn(
        "avg_order_revenue",
        F.col("total_sales") / F.col("total_orders")
    )
    .withColumn(
        "avg_monthly_revenue",
        F.when(F.col("lifespan") == 0, F.col("total_sales"))
         .otherwise(F.col("total_sales") / F.col("lifespan"))
    )
)

(
    product_report
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("gold.report_products")
)