## E-Commerce Data Curated Analytics - Dimensional Modeling & Aggregation

This notebook represents the final stage in the E-commerce ETL pipeline. It:
1. Creates a dimensional model (star schema) from processed data
2. Builds business aggregations for analytics
3. Performs data quality checks
4. Writes curated data to the data lake for consumption

### Configuration and Environment Setup

The following cell initializes the Spark environment and defines storage paths for the ETL process.

In [1]:
# Import PySpark entry points and the functions module
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.getOrCreate()

# Build an ADLS Gen2 (abfss://) path for a container and folder
def get_adls_path(container: str, folder: str) -> str:
    """
    Generate an ADLS path based on the container and folder.
    """
    storage_account = "ecomsalessa"
    return f"abfss://{container}@{storage_account}.dfs.core.windows.net/{folder}/"

# Define source and destination paths
processed_container = "processed"
processed_folder = "ecommerce-dataset-l0"
processed_path = get_adls_path(processed_container, processed_folder)

curated_container = "curated"
curated_folder = "ecommerce-dataset-l1"
curated_path = get_adls_path(curated_container, curated_folder)
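A quick way to check the path layout is to evaluate the helper outside Spark. The sketch below is a plain-Python copy of `get_adls_path`; making the storage account a parameter with `ecomsalessa` as its default is an illustrative assumption, not part of the original helper.

```python
def get_adls_path(container: str, folder: str, storage_account: str = "ecomsalessa") -> str:
    """Build an ABFS URI: abfss://<container>@<account>.dfs.core.windows.net/<folder>/."""
    # storage_account as a parameter is an assumption; the notebook hard-codes it.
    return f"abfss://{container}@{storage_account}.dfs.core.windows.net/{folder}/"

processed_path = get_adls_path("processed", "ecommerce-dataset-l0")
curated_path = get_adls_path("curated", "ecommerce-dataset-l1")

# Dataset paths are formed by appending "<name>/" to a layer root,
# which only works because every root ends with a trailing slash.
customers_path = processed_path + "customers/"
print(customers_path)
```

Keeping the trailing slash inside the helper is what lets the later cells build paths by plain string concatenation.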

### Data Loading from Processed Layer

This section loads the processed datasets from the previous pipeline stage. These datasets have been:
1. Cleaned and standardized
2. Enhanced with derived fields
3. Enriched with English product category translations
4. Optimized for analytical processing

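The verification step prints the schemas of the customers and orders tables and counts products with and without an English category name, so missing translations can be spotted before the dimensional model is built.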

In [2]:
# Load processed datasets
print("Loading processed datasets...")
customers_df = spark.read.parquet(processed_path + "customers/")
orders_df = spark.read.parquet(processed_path + "orders/")
order_items_df = spark.read.parquet(processed_path + "order_items/")
order_payments_df = spark.read.parquet(processed_path + "order_payments/")
order_reviews_df = spark.read.parquet(processed_path + "order_reviews/")
products_df = spark.read.parquet(processed_path + "products/")
sellers_df = spark.read.parquet(processed_path + "sellers/")
category_names_df = spark.read.parquet(processed_path + "category_names/")
geolocation_df = spark.read.parquet(processed_path + "geolocation/")

# Display schemas to verify data loading
print("Verifying loaded datasets...")
customers_df.printSchema()
orders_df.printSchema()

# Verify English category names are available
print("\nVerifying product categories:")
products_df.select("product_category_name", "product_category_name_english").show(5)
print(f"Products with English categories: {products_df.filter(F.col('product_category_name_english').isNotNull()).count()}")
print(f"Products without English categories: {products_df.filter(F.col('product_category_name_english').isNull()).count()}")
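The counts above are only printed, so a missing translation does not stop the run. If the pipeline should fail fast instead, the comparison could be wrapped in a check like the one below. `check_category_coverage` and its 2% threshold are hypothetical choices for illustration; in the notebook the two arguments would be the filter counts computed above.

```python
def check_category_coverage(with_english: int, without_english: int,
                            max_missing_ratio: float = 0.02) -> float:
    """Return the share of products lacking an English category name.

    Raises ValueError when that share exceeds max_missing_ratio (a hypothetical
    threshold), so the run stops instead of only printing the counts.
    """
    total = with_english + without_english
    if total == 0:
        raise ValueError("products dataset is empty")
    missing_ratio = without_english / total
    if missing_ratio > max_missing_ratio:
        raise ValueError(
            f"{missing_ratio:.1%} of products lack an English category "
            f"(limit {max_missing_ratio:.1%})"
        )
    return missing_ratio

# Illustrative counts; in the notebook they would come from
# products_df.filter(F.col("product_category_name_english").isNotNull()).count(), etc.
print(check_category_coverage(with_english=990, without_english=10))  # 0.01
```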

### Dimension Tables Creation

This section implements the dimensional model by creating standardized dimension tables:

1. **Customer Dimension**: Geographic and identification attributes of customers
2. **Product Dimension**: Product attributes including categories and physical dimensions
3. **Seller Dimension**: Seller attributes and location information
4. **Date Dimension**: A complete calendar with date hierarchies and attributes
5. **Geography Dimension**: Location data for spatial analysis

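In the star schema, the fact tables reference these dimensions through `customer_id`, `product_id`, `seller_id`, and `date_id`; the geography dimension is keyed by zip-code prefix.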
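The date dimension relies on Spark's calendar conventions: `dayofweek` returns 1 for Sunday through 7 for Saturday, and `weekofyear` numbers weeks according to ISO 8601 (weeks start on Monday). The following plain-Python sketch produces one row per day with the same fields, which can help when checking expected values for a few dates; `build_date_dimension` is a hypothetical helper, not part of the pipeline.

```python
from datetime import date, timedelta

def build_date_dimension(start: date, end: date) -> list:
    """One row per calendar day from start to end inclusive, mirroring dim_date's columns."""
    rows = []
    day = start
    while day <= end:
        # Spark's dayofweek: 1 = Sunday ... 7 = Saturday.
        # isoweekday(): 1 = Monday ... 7 = Sunday, so shift by one modulo 7.
        spark_dow = day.isoweekday() % 7 + 1
        rows.append({
            "date_id": day.isoformat(),            # 'yyyy-MM-dd', the same key format as fact_sales
            "year": day.year,
            "month": day.month,
            "day": day.day,
            "quarter": (day.month - 1) // 3 + 1,
            "week_of_year": day.isocalendar()[1],  # ISO 8601 week number
            "day_of_week": spark_dow,
            "is_weekend": spark_dow in (1, 7),
        })
        day += timedelta(days=1)
    return rows

dim = build_date_dimension(date(2018, 1, 1), date(2018, 1, 7))
for row in dim:
    print(row["date_id"], row["day_of_week"], row["is_weekend"])
```

One consequence of ISO numbering: dates in the first days of January can belong to week 52 or 53 of the previous year.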

In [3]:
# 1. CREATE DIMENSION TABLES
print("Creating dimension tables...")

# 1.1 Customer Dimension
dim_customer = customers_df.select(
    "customer_id",
    "customer_unique_id",
    "customer_zip_code_prefix",
    "customer_city",
    "customer_state"
)

# 1.2 Product Dimension
dim_product = products_df.join(
    category_names_df,
    products_df.product_category_name == category_names_df.product_category_name,
    "left"
).select(
    products_df.product_id,
    products_df.product_category_name,
    F.coalesce(category_names_df.product_category_name_english, F.lit("uncategorized")).alias("product_category_name_english"),
    products_df.product_weight_g,
    products_df.product_length_cm,
    products_df.product_height_cm,
    products_df.product_width_cm,
    # Calculate volume in cubic cm
    (products_df.product_length_cm * products_df.product_height_cm * products_df.product_width_cm).alias("product_volume_cm3")
)

# 1.3 Seller Dimension
dim_seller = sellers_df.select(
    "seller_id",
    "seller_zip_code_prefix",
    "seller_city",
    "seller_state"
)

# 1.4 Date Dimension
# Get min and max purchase dates from orders in a single pass
date_bounds = orders_df.agg(
    F.min("order_purchase_timestamp").cast("date").alias("min_date"),
    F.max("order_purchase_timestamp").cast("date").alias("max_date")
).first()
min_date, max_date = date_bounds["min_date"], date_bounds["max_date"]

# Create date range
date_range = spark.sql(f"""
SELECT explode(sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)) as date
""")

# Build date dimension
dim_date = date_range.select(
    F.col("date").cast("string").alias("date_id"),
    F.col("date"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.quarter("date").alias("quarter"),
    F.weekofyear("date").alias("week_of_year"),
    F.dayofweek("date").alias("day_of_week"),  # Spark convention: 1 = Sunday ... 7 = Saturday
    F.dayofweek("date").isin(1, 7).alias("is_weekend")
)

# 1.5 Geography Dimension
# Collapse geolocation data to one row per zip-code prefix.
# F.first without an ordering may pick any city/state value within a prefix.
dim_geography = geolocation_df.groupBy("geolocation_zip_code_prefix").agg(
    F.first("geolocation_city").alias("city"),
    F.first("geolocation_state").alias("state"),
    F.avg("geolocation_lat").alias("latitude"),
    F.avg("geolocation_lng").alias("longitude")
)
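Because the raw geolocation data can list the same zip-code prefix many times, the geography dimension collapses it to one row per prefix with averaged coordinates. `F.first` without an ordering may return any of the city values in a group, so the chosen name can vary between runs. The sketch below shows the same reduction in plain Python and swaps in the most frequent city and state as a deterministic alternative; that substitution, the helper name `dedupe_geolocation`, and the toy rows are assumptions for illustration.

```python
from collections import Counter, defaultdict
from statistics import mean

def dedupe_geolocation(rows):
    """Collapse raw geolocation rows to one record per zip-code prefix.

    Coordinates are averaged, as F.avg does; city and state use the most
    common value per prefix (a deterministic stand-in for F.first).
    """
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["zip_prefix"]].append(row)
    result = {}
    for prefix, group in grouped.items():
        result[prefix] = {
            "city": Counter(r["city"] for r in group).most_common(1)[0][0],
            "state": Counter(r["state"] for r in group).most_common(1)[0][0],
            "latitude": mean(r["lat"] for r in group),
            "longitude": mean(r["lng"] for r in group),
        }
    return result

# Made-up rows: prefix "12345" appears three times with one inconsistent city spelling.
raw = [
    {"zip_prefix": "12345", "city": "alpha", "state": "XX", "lat": -23.54, "lng": -46.63},
    {"zip_prefix": "12345", "city": "alpha", "state": "XX", "lat": -23.56, "lng": -46.65},
    {"zip_prefix": "12345", "city": "beta", "state": "XX", "lat": -23.55, "lng": -46.64},
    {"zip_prefix": "67890", "city": "gamma", "state": "YY", "lat": -22.90, "lng": -43.20},
]
geo = dedupe_geolocation(raw)
print(geo["12345"]["city"], round(geo["12345"]["latitude"], 2))
```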

### Fact Tables Creation

This section builds the central fact tables that contain measures and foreign keys to dimensions:

1. **Sales Fact**: One row per order item, with order details and delivery performance metrics
2. **Reviews Fact**: Customer feedback data with response time analysis

These fact tables contain the quantitative metrics that will be analyzed across various dimensions.
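`review_response_days` uses `F.datediff(end, start)`, which casts both values to dates before subtracting, so the time of day is ignored (the cast uses the Spark session time zone). The plain-Python sketch below mirrors that rule with a hypothetical `response_days` helper and shows the edge case: an answer two minutes after the review was created, but across midnight, counts as one day.

```python
from datetime import datetime

def response_days(created: datetime, answered: datetime) -> int:
    """Whole calendar days between two timestamps, mirroring F.datediff(answered, created).

    The time of day is discarded before subtracting, as happens when Spark
    casts the timestamps to dates.
    """
    return (answered.date() - created.date()).days

across_midnight = response_days(datetime(2018, 3, 1, 23, 59), datetime(2018, 3, 2, 0, 1))
same_day = response_days(datetime(2018, 3, 1, 0, 0), datetime(2018, 3, 1, 23, 0))
print(across_midnight, same_day)
```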

In [4]:
# 2. CREATE FACT TABLES
print("Creating fact tables...")

# 2.1 Sales Fact Table (grain: one row per order item)
fact_sales = order_items_df.join(
    orders_df, "order_id"
).join(
    order_payments_df.groupBy("order_id").agg(
        F.sum("payment_value").alias("total_payment"),
        F.collect_list("payment_type").alias("payment_methods")
    ), "order_id", "left"
).select(
    order_items_df.order_id,
    order_items_df.order_item_id,
    orders_df.customer_id,
    order_items_df.product_id,
    order_items_df.seller_id,
    F.date_format(orders_df.order_purchase_timestamp, "yyyy-MM-dd").alias("date_id"),
    orders_df.order_purchase_timestamp,
    orders_df.order_delivered_customer_date,
    orders_df.order_status,
    orders_df.is_approved,
    orders_df.is_delivered,
    order_items_df.price,
    order_items_df.freight_value,
    (order_items_df.price + order_items_df.freight_value).alias("total_item_value"),
    orders_df.shipping_days,
    orders_df.delivery_days,
    orders_df.total_days,
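    orders_df.is_delayed,
    orders_df.delay_days,
    # Order-level payment fields from the join above; they repeat on every item
    # row of an order, so do not sum them across items
    F.col("total_payment"),
    F.col("payment_methods")
)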

# 2.2 Customer Reviews Fact Table
fact_reviews = order_reviews_df.join(
    orders_df.select("order_id", "customer_id"), "order_id"
).select(
    "order_id",
    "review_id",
    "customer_id",
    "review_score",
    "review_comment_message",
    "review_creation_date",
    "review_answer_timestamp",
    F.datediff("review_answer_timestamp", "review_creation_date").alias("review_response_days")
)
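`fact_sales` is at order-item grain, while the payments aggregate joined in 2.1 is at order grain. Any order-level value carried onto the item rows, such as `total_payment`, repeats once per item, so summing it across items overcounts. The toy example below (made-up values, plain Python) shows the effect and the fix of taking each order's value once.

```python
# Toy item-grain rows (made-up): order "o1" has two items, order "o2" has one.
items = [
    {"order_id": "o1", "order_item_id": 1, "price": 10.0},
    {"order_id": "o1", "order_item_id": 2, "price": 15.0},
    {"order_id": "o2", "order_item_id": 1, "price": 20.0},
]
# Order-grain totals, like groupBy("order_id").agg(F.sum("payment_value")).
total_payment = {"o1": 25.0, "o2": 20.0}

# Joining order-grain values onto item rows repeats them once per item.
joined = [dict(row, total_payment=total_payment[row["order_id"]]) for row in items]

naive_sum = sum(r["total_payment"] for r in joined)  # counts o1's payment twice

# Correct: keep one value per order before summing.
per_order = {r["order_id"]: r["total_payment"] for r in joined}
correct_sum = sum(per_order.values())
print(naive_sum, correct_sum)
```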


### Business Aggregations Creation

This section generates pre-calculated aggregations for common business analytics needs:

1. **Sales by Category**: Product category performance metrics
2. **Sales by State**: Geographic sales distribution analysis
3. **Seller Performance**: Seller efficiency and delivery metrics
4. **Monthly Sales Trends**: Time-series analysis of sales patterns
5. **Order Status Analysis**: Order fulfillment and cancellation analysis
6. **Cross-State Analysis**: Performance metrics for orders shipped across state borders
7. **Product Size Analysis**: Impact of physical dimensions on sales and shipping
8. **Payment Method Analysis**: Payment preference patterns

Each aggregation includes relevant counts, sums, and averages to support business decision-making.
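Since `fact_sales` holds one row per order item, `F.count("order_id")` in these aggregations counts items, not orders; `F.countDistinct("order_id")` would count orders. The same distinction affects delay rates, and rows where `is_delayed` is null fall into `otherwise(0)` and are treated as on time. The plain-Python sketch below uses made-up rows to compare the item-level and order-level versions.

```python
# Made-up item rows: order o1 has two items and was delayed; o3's delay is unknown.
items = [
    {"order_id": "o1", "is_delayed": True},
    {"order_id": "o1", "is_delayed": True},
    {"order_id": "o2", "is_delayed": False},
    {"order_id": "o3", "is_delayed": None},
]

item_count = len(items)                            # like F.count("order_id")
order_count = len({r["order_id"] for r in items})  # like F.countDistinct("order_id")

# Item-level rate, as the notebook computes it:
# sum(when(is_delayed == True, 1).otherwise(0)) / count. A null counts as 0.
delayed_items = sum(1 if r["is_delayed"] is True else 0 for r in items)
item_delay_rate = delayed_items / item_count

# Order-level rate: each delayed order counted once.
delayed_orders = {r["order_id"] for r in items if r["is_delayed"] is True}
order_delay_rate = len(delayed_orders) / order_count
print(item_count, order_count, item_delay_rate, round(order_delay_rate, 3))
```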

In [None]:
# 3. CREATE BUSINESS AGGREGATIONS
print("Creating business aggregations...")

# 3.1 Sales by Category
agg_sales_by_category = fact_sales.join(
    dim_product, "product_id"
).groupBy(
    "product_category_name_english"
).agg(
    F.count("order_id").alias("orders_count"),
    F.countDistinct("customer_id").alias("unique_customers"),
    F.sum("price").alias("total_sales"),
    F.avg("price").alias("avg_item_price"),
    F.avg("total_days").alias("avg_delivery_time"),
    F.sum(F.when(F.col("is_delayed") == True, 1).otherwise(0)).alias("delayed_orders")
)

# 3.2 Sales by State
agg_sales_by_state = fact_sales.join(
    dim_customer, "customer_id"
).groupBy(
    "customer_state"
).agg(
    F.count("order_id").alias("orders_count"),
    F.countDistinct("customer_id").alias("unique_customers"),
    F.sum("price").alias("total_sales"),
    F.avg("price").alias("avg_item_price"),
    F.avg("total_days").alias("avg_delivery_time")
)


# 3.3 Seller Performance
agg_seller_performance = fact_sales.join(
    dim_seller, "seller_id"
).groupBy(
    "seller_id",
    "seller_state"
).agg(
    F.count("order_id").alias("orders_count"),
    F.sum("price").alias("total_sales"),
    F.avg("freight_value").alias("avg_shipping_cost"),
    F.avg("total_days").alias("avg_delivery_time"),
    F.sum(F.when(F.col("is_delayed") == True, 1).otherwise(0)).alias("delayed_orders"),
    # Parenthesize the ratio so .alias() names the whole expression, not just the count
    (F.sum(F.when(F.col("is_delayed") == True, 1).otherwise(0)) / F.count("order_id")).alias("delay_rate")
)

# 3.4 Monthly Sales Trends
agg_monthly_sales = fact_sales.join(
    dim_date, "date_id"
).groupBy(
    "year", 
    "month"
).agg(
    F.count("order_id").alias("orders_count"),
    F.countDistinct("customer_id").alias("unique_customers"),
    F.sum("price").alias("total_sales"),
    F.avg("price").alias("avg_item_price")
).orderBy(
    "year", 
    "month"
)

# 3.5 Order Status Analysis (including cancellations)
print("Creating order status analysis...")
agg_order_status = fact_sales.join(
    dim_product, "product_id"
).groupBy(
    "product_category_name_english", "order_status"
).agg(
    F.count("order_id").alias("orders_count"),
    F.sum("price").alias("total_sales"),
    F.countDistinct("customer_id").alias("unique_customers") 
)

# 3.6 Cross-State Order Analysis 
agg_cross_state = fact_sales.join(
    dim_customer, "customer_id"
).join(
    dim_seller, "seller_id"
).join(
    dim_product, "product_id"
).withColumn(
    "is_cross_state", F.when(F.col("customer_state") != F.col("seller_state"), True).otherwise(False)
).groupBy(
    "product_category_name_english", "is_cross_state"
).agg(
    F.count("order_id").alias("orders_count"),
    F.sum("price").alias("total_sales"),
    F.avg("total_days").alias("avg_delivery_time"),
    # Parenthesize the ratio so .alias() names the whole expression, not just the count
    (F.sum(F.when(F.col("is_delayed") == True, 1).otherwise(0)) / F.count("order_id")).alias("delay_rate")
)

# 3.7 Product Size Analysis
print("Creating product size analysis...")
dim_product_with_size = dim_product.withColumn(
    "size_category",
    # Volume thresholds in cubic cm: below 1,000 is Small, below 8,000 is Medium
    F.when(F.col("product_volume_cm3").isNull(), "Unknown")
    .when(F.col("product_volume_cm3") < 1000, "Small")
    .when(F.col("product_volume_cm3") < 8000, "Medium")
    .otherwise("Large")
)

agg_size_analysis = fact_sales.join(
    dim_product_with_size, "product_id"
).groupBy(
    "product_category_name_english", "size_category"
).agg(
    F.count("order_id").alias("orders_count"),
    F.sum("price").alias("total_sales"),
    F.avg("freight_value").alias("avg_shipping_cost")
)

# 3.8 Payment Method Analysis
print("Creating payment method analysis...")
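agg_payment_methods = fact_sales.join(
    # One row per (order, payment type): without distinct(), an order with several
    # payment rows of the same type would repeat its items and inflate total_sales
    order_payments_df.select("order_id", "payment_type").distinct(),
    "order_id"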
).groupBy(
    "payment_type"
).agg(
    F.count("order_id").alias("orders_count"),
    F.sum("price").alias("total_sales"),
    F.avg("price").alias("avg_order_value"),
    F.countDistinct("customer_id").alias("unique_customers")
)

agg_payment_methods.write.mode("overwrite").parquet(curated_path + "aggregates/payment_methods/")

print("\nSample data from agg_payment_methods:")
agg_payment_methods.show(5)

# Data quality verification for key analyses
print("\nData quality checks:")
print(f"Order status analysis record count: {agg_order_status.count()}")
print(f"Cross-state analysis record count: {agg_cross_state.count()}")
print(f"Size analysis record count: {agg_size_analysis.count()}")

# Check for null values in key metrics
print("\nNull values in key metrics:")
print("Order status analysis nulls in total_sales:", 
      agg_order_status.filter(F.col("total_sales").isNull()).count())

print("\nColumn names in agg_cross_state:")
print(agg_cross_state.columns)

print("Cross-state analysis nulls in avg_delivery_time:", 
      agg_cross_state.filter(F.col("avg_delivery_time").isNull()).count())
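The payment method analysis joins item-level rows to payment rows. When an order has more than one payment row, every item pairs with each of them, so `F.sum("price")` counts the same item several times. The made-up example below reproduces that fan-out in plain Python and shows how de-duplicating `(order_id, payment_type)` pairs, which is what `.distinct()` does in Spark, removes it for repeated rows of the same type. An order paid with two different types is still attributed to both.

```python
# Made-up data: one order with two items, paid with two voucher rows.
items = [
    {"order_id": "o1", "price": 10.0},
    {"order_id": "o1", "price": 15.0},
]
payments = [
    {"order_id": "o1", "payment_type": "voucher"},
    {"order_id": "o1", "payment_type": "voucher"},
]

# Inner join on order_id: every item pairs with every payment row of its order.
joined = [
    {**item, "payment_type": pay["payment_type"]}
    for item in items
    for pay in payments
    if pay["order_id"] == item["order_id"]
]
inflated_sales = sum(r["price"] for r in joined)  # each item counted twice

# De-duplicate (order_id, payment_type) pairs first, like .distinct() in Spark.
distinct_payments = {(p["order_id"], p["payment_type"]) for p in payments}
joined_dedup = [
    {**item, "payment_type": ptype}
    for item in items
    for (oid, ptype) in distinct_payments
    if oid == item["order_id"]
]
dedup_sales = sum(r["price"] for r in joined_dedup)
print(inflated_sales, dedup_sales)
```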

### Data Persistence and Verification

This section:
1. Writes all dimension and fact tables to the curated layer
2. Persists aggregated datasets for quick access by reporting tools
3. Displays sample data to verify the output quality
4. Provides summary statistics on the processed data volume

After this cell, the curated layer contains the star-schema tables and pre-computed aggregates as Parquet files that business intelligence tools and dashboards can read directly.
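The write cell repeats the same `.write.mode("overwrite").parquet(...)` chain for every table. One way to keep the output layout in a single place is a mapping from relative folder to DataFrame, written in a loop. The sketch assumes a hypothetical `write_tables` helper and uses small stub classes in place of Spark DataFrames so it runs without a cluster; with Spark, the mapping values would be `dim_customer`, `fact_sales`, and so on.

```python
class _StubWriter:
    """Stand-in for DataFrameWriter: records (mode, path) instead of writing files."""
    def __init__(self, log):
        self.log = log
        self.save_mode = None

    def mode(self, save_mode):
        self.save_mode = save_mode
        return self

    def parquet(self, path):
        self.log.append((self.save_mode, path))

class _StubFrame:
    """Stand-in for a DataFrame exposing a .write property."""
    def __init__(self, log):
        self._log = log

    @property
    def write(self):
        return _StubWriter(self._log)

def write_tables(tables: dict, base_path: str, mode: str = "overwrite") -> list:
    """Hypothetical helper: write each table to base_path + '<folder>/' and return the paths."""
    written = []
    for relative_folder, frame in tables.items():
        path = f"{base_path}{relative_folder}/"
        frame.write.mode(mode).parquet(path)
        written.append(path)
    return written

log = []
curated_path = "abfss://curated@ecomsalessa.dfs.core.windows.net/ecommerce-dataset-l1/"
paths = write_tables(
    {"dimensions/dim_customer": _StubFrame(log), "facts/fact_sales": _StubFrame(log)},
    curated_path,
)
print(paths)
```

Adding a table then means adding one dictionary entry, and the same dictionary can drive the summary counts.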

In [None]:
# 4. WRITE TO CURATED CONTAINER
print("Writing dimension tables to curated container...")

# Write dimensions
dim_customer.write.mode("overwrite").parquet(curated_path + "dimensions/dim_customer/")
dim_product.write.mode("overwrite").parquet(curated_path + "dimensions/dim_product/")
dim_seller.write.mode("overwrite").parquet(curated_path + "dimensions/dim_seller/")
dim_date.write.mode("overwrite").parquet(curated_path + "dimensions/dim_date/")
dim_geography.write.mode("overwrite").parquet(curated_path + "dimensions/dim_geography/")

print("Writing fact tables to curated container...")
# Write facts
fact_sales.write.mode("overwrite").parquet(curated_path + "facts/fact_sales/")
fact_reviews.write.mode("overwrite").parquet(curated_path + "facts/fact_reviews/")

print("Writing aggregation tables to curated container...")
# Write aggregations (agg_payment_methods is already written in the previous cell)
agg_sales_by_category.write.mode("overwrite").parquet(curated_path + "aggregates/sales_by_category/")
agg_sales_by_state.write.mode("overwrite").parquet(curated_path + "aggregates/sales_by_state/")
agg_seller_performance.write.mode("overwrite").parquet(curated_path + "aggregates/seller_performance/")
agg_monthly_sales.write.mode("overwrite").parquet(curated_path + "aggregates/monthly_sales/")
agg_order_status.write.mode("overwrite").parquet(curated_path + "aggregates/order_status/")
agg_cross_state.write.mode("overwrite").parquet(curated_path + "aggregates/cross_state_analysis/")
agg_size_analysis.write.mode("overwrite").parquet(curated_path + "aggregates/size_analysis/")

# Show sample data from key tables
print("\nSample data from fact_sales:")
fact_sales.select("order_id", "customer_id", "price", "total_days", "is_delayed").show(5)

print("\nSample data from agg_sales_by_category:")
agg_sales_by_category.show(5)

print("\nSample data from agg_monthly_sales:")
agg_monthly_sales.show(5)

print("\nSample data from agg_order_status:")
agg_order_status.show(5)

print("\nSample data from agg_cross_state:")
agg_cross_state.show(5)

print("\nSample data from agg_size_analysis:")
agg_size_analysis.show(5)

print("\nData warehousing process completed successfully!")

# Final pipeline summary
print("\n=== ETL Pipeline Summary ===")
print(f"Processed {fact_sales.count()} sales records")
print(f"Created {dim_product.count()} product dimension records")
print(f"Created {dim_customer.count()} customer dimension records")
print(f"Created {dim_seller.count()} seller dimension records")
aggregate_tables = [
    agg_sales_by_category, agg_sales_by_state, agg_seller_performance,
    agg_monthly_sales, agg_order_status, agg_cross_state,
    agg_size_analysis, agg_payment_methods
]
total_aggregate_rows = sum(df.count() for df in aggregate_tables)
print(f"Generated {total_aggregate_rows} rows across {len(aggregate_tables)} aggregation tables")
print("Pipeline execution complete!")
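Row counts confirm that tables were written but not that their values agree. A simple consistency check is to reconcile an aggregate with the fact table it came from: `total_sales` summed over `agg_sales_by_category` should equal the sum of `price` in `fact_sales`, provided every `product_id` in the fact matches a row in `dim_product` (the inner join drops unmatched rows). In Spark each side could be obtained with `.agg(F.sum(...)).first()[0]`; the comparison itself is sketched below with a hypothetical `reconcile` helper that uses a relative tolerance, because floating-point sums depend on the order of addition.

```python
import math

def reconcile(fact_total: float, aggregate_total: float, rel_tol: float = 1e-9) -> None:
    """Raise ValueError if an aggregate's total drifts from the fact table it was built from."""
    if not math.isclose(fact_total, aggregate_total, rel_tol=rel_tol):
        raise ValueError(f"aggregate total {aggregate_total} != fact total {fact_total}")

# Made-up totals: the same values added in a different order can differ in the last bits.
fact_total = 0.1 + 0.2 + 0.3
aggregate_total = 0.3 + 0.2 + 0.1
reconcile(fact_total, aggregate_total)  # passes: within tolerance
```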