# E-commerce Data Processing Case Study

This repository contains a Databricks notebook for processing e-commerce sales data using PySpark and Delta Lake. The project demonstrates a data lakehouse architecture with bronze, silver, and gold layers for data ingestion, processing, and aggregation.

## Overview

The notebook implements an end-to-end data pipeline for e-commerce data, including:

- **Data Ingestion**: Reading raw data from CSV, Excel, and JSON files
- **Data Quality**: Cleaning, standardizing, and enriching customer, product, and order data
- **Data Transformation**: Joining datasets and creating derived metrics
- **Data Aggregation**: Generating business insights and profit calculations
- **Data Storage**: Saving processed data as Delta tables in a multi-layer architecture

## Architecture

The project follows a medallion architecture:

- **Bronze Layer**: Raw data ingestion with minimal transformations
- **Silver Layer**: Cleaned and enriched data
- **Gold Layer**: Aggregated business metrics
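
As a rough illustration of how the three layers map onto storage, the sketch below pairs each layer with the schema and volume names that the notebook's setup cells create. `LAYERS`, `CATALOG`, and `table_location` are hypothetical helpers introduced only for this example; they are not part of the notebook.

```python
from typing import Tuple

CATALOG = "ecommerce_sales"

# Layer -> (schema, volume), using the names created by the setup cells
LAYERS = {
    "bronze": ("bronze_layer", "ecom_volume_raw"),
    "silver": ("processed_layer", "ecom_volume_processed"),
    "gold": ("gold_layer", "ecom_volume_gold"),
}

def table_location(layer: str, table: str) -> Tuple[str, str]:
    """Return (three-part table name, Delta path) for a table in a layer."""
    schema, volume = LAYERS[layer]
    full_name = f"{CATALOG}.{schema}.{table}"
    path = f"/Volumes/{CATALOG}/{schema}/{volume}/{table}"
    return full_name, path

name, path = table_location("gold", "profit_aggregates")
print(name)
print(path)
```

Keeping the mapping in one place like this would avoid repeating the volume paths in every cell, though the notebook itself simply hard-codes them.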

## Project Structure

The project is kept flat (a single notebook) for now so that it runs directly in Databricks.


## Technologies Used

- Databricks
- PySpark
- Delta Lake
- Pandas (for Excel processing)

## Data Sources

- Products data (CSV)
- Customers data (Excel)
- Orders data (JSON)

## Key Features

- Automated data quality scoring
- Customer and product data enrichment
- Comprehensive order analytics with joins
- Profit aggregation by various dimensions
- Unit tests for data processing functions

## Usage

Run the notebook cells in sequence to process the data pipeline. Ensure the required volumes and schemas are created before execution.

## Tests

The notebook includes unit tests for data enrichment and aggregation functions.


In [0]:
%sql
CREATE CATALOG IF NOT EXISTS ecommerce_sales COMMENT 'Ecommerce sales data processing';

In [0]:
%sql
USE CATALOG ecommerce_sales;
CREATE SCHEMA IF NOT EXISTS source_data_layer COMMENT 'Source data';    
CREATE SCHEMA IF NOT EXISTS bronze_layer COMMENT 'Raw data';
CREATE SCHEMA IF NOT EXISTS processed_layer COMMENT 'Processed data';
CREATE SCHEMA IF NOT EXISTS gold_layer COMMENT 'Final publish layer - Aggregated data';

In [0]:
%sql
USE CATALOG ecommerce_sales;

USE SCHEMA bronze_layer;

CREATE VOLUME IF NOT EXISTS ecom_volume_raw COMMENT 'Managed volume for raw files';

-- For processed/silver layer
USE SCHEMA processed_layer;

CREATE VOLUME IF NOT EXISTS ecom_volume_processed COMMENT 'Managed volume for processed files';

-- For gold layer
USE SCHEMA gold_layer;

CREATE VOLUME IF NOT EXISTS ecom_volume_gold COMMENT 'Managed volume for final publish aggregated files';

In [0]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import logging
import re
from pyspark.sql.window import Window

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Ecommerce Data Processing") \
    .getOrCreate()

print("Spark session initialized successfully")

Spark session initialized successfully


# **Functions to save table as delta & standardize column names**

In [0]:
def save_as_delta_table(df, table_name, table_path, schema_nm, catalog="ecommerce_sales"):
    """
    Save DataFrame as Delta table with error handling
    """
    try:
        # Save as Delta table
        table_path = f"{table_path}/{table_name}"
        print("sql string---",f"CREATE TABLE IF NOT EXISTS {catalog}.{schema_nm}.{table_name} USING DELTA AS select * from delta.'{table_path}'")
        df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .save(table_path)

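        # Create table in metastore.
        # Note: because of IF NOT EXISTS, this is a no-op when the table already exists,
        # so a re-run does not refresh an existing table from the newly written files.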
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {catalog}.{schema_nm}.{table_name}
            USING DELTA AS
            select * from delta.`{table_path}`
        """)

        logger.info(f"Successfully saved {table_name} as Delta table")
        return True
    except Exception as e:
        logger.error(f"Failed to save {table_name} as Delta table: {str(e)}")
        return False

# Rename columns to meet Delta/Parquet naming rules: replace spaces and hyphens with underscores, drop other special characters
def sanitize_column(input_df):
    # Remove or replace invalid characters
    for c in input_df.columns:
        new_name = re.sub(r"[ ]", "_", c)
        new_name = re.sub(r"[-]", "_", new_name)
        new_name = re.sub(r"[,;{}()\n\t=*]", "", new_name)
        input_df = input_df.withColumnRenamed(c, new_name)
    
    return input_df
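
To see what the renaming rule does without a Spark session, the sketch below applies the same three substitutions to plain strings. `sanitize_name` is a hypothetical stand-alone helper, and the sample headers are invented to resemble the products file; neither comes from the notebook.

```python
import re

def sanitize_name(name: str) -> str:
    """Apply the same three substitutions as sanitize_column to one column name."""
    name = re.sub(r"[ ]", "_", name)   # spaces -> underscores
    name = re.sub(r"[-]", "_", name)   # hyphens -> underscores
    # drop the remaining special characters
    return re.sub(r"[,;{}()\n\t=*]", "", name)

# Hypothetical raw headers
raw_headers = ["Product ID", "Sub-Category", "Price per product"]
print([sanitize_name(h) for h in raw_headers])
```

Because spaces and hyphens both become underscores, two distinct headers such as `Sub Category` and `Sub-Category` would collapse to the same name; the source files here do not appear to contain such a pair.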

In [0]:
# No trailing slash here: the file paths below add their own separator
raw_base_path = "/Volumes/ecommerce_sales/bronze_layer/ecom_volume_raw/source_files"
products_path = f"{raw_base_path}/Products.csv"
customers_path = f"{raw_base_path}/Customer.xlsx"
orders_path = f"{raw_base_path}/Orders.json"

bronze_delta_path = "/Volumes/ecommerce_sales/bronze_layer/ecom_volume_raw"

raw_products_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("quote", "\"")
    .option("escape", "\"")
    .csv(products_path)
)

raw_products_df = sanitize_column(raw_products_df)
# display(raw_products_df)
print("raw_products_df col---",raw_products_df.columns)
## Write to raw data delta location & schema-table
save_as_delta_table(raw_products_df, "products_raw",bronze_delta_path,"bronze_layer", "ecommerce_sales")


raw_products_df col--- ['Product_ID', 'Category', 'Sub_Category', 'Product_Name', 'State', 'Price_per_product']
sql string--- CREATE TABLE IF NOT EXISTS ecommerce_sales.bronze_layer.products_raw USING DELTA AS select * from delta.'/Volumes/ecommerce_sales/bronze_layer/ecom_volume_raw/products_raw'


INFO:__main__:Successfully saved products_raw as Delta table


True

# **Enrich raw Products data**

In [0]:

### Clean and enrich products data
## Assumptions:
# 1. Product ID must not be null.
# 2. data_quality_score is the fraction of Product_Name, Category, and price_per_product that are non-null.
# 3. price_range buckets price_per_product; the actual thresholds would follow business logic. The product name could be cleaned of special characters, but characters such as " (inches), "." and "x" carry meaning here, so the name is kept as is.
# 4. Special characters are removed from the state name.
# 5. Price per product is rounded to 2 decimal places.
# 6. is_available flags products that have a positive price.
# 7. Product ID may follow a naming convention, but since the exact standard is not known, only the null check is applied.

def enrich_product_data(raw_products_df):
    enriched_products_df = raw_products_df \
            .withColumn("price_per_product", round(col("price_per_product"), 2)) \
            .withColumn("price_range",
                    when(col("price_per_product") < 10, "Budget")
                    .when(col("price_per_product") < 50, "Standard")
                    .when(col("price_per_product") < 200, "Premium")
                    .otherwise("Luxury")) \
            .withColumn("State",
                    regexp_replace(col("State"), "[^A-Za-z\\s]", "")) \
            .withColumn("is_available",
                    when(col("price_per_product").isNotNull() &
                            (col("price_per_product") > 0), True).otherwise(False)) \
            .withColumn("data_quality_score",
                    (when(col("Product_Name").isNotNull(), 1).otherwise(0) +
                        when(col("Category").isNotNull(), 1).otherwise(0) +
                        when(col("price_per_product").isNotNull(), 1).otherwise(0)) / 3.0)\
            .filter(col("Product_id").isNotNull())

    # enriched_products_df = enriched_products_df.dropDuplicates(["Product_ID"])
    # Dropping duplicated rows based on Product_ID, keeping the entry with higher price
    w = Window.partitionBy("Product_ID").orderBy(col("price_per_product").desc())

    enriched_products_df = (
        enriched_products_df
        .withColumn("rn", row_number().over(w))
        .filter(col("rn") == 1)
        .drop("rn")
    )
    return enriched_products_df
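
The enrichment rules can be checked in plain Python before running them on a cluster. The sketch below mirrors the price buckets, the quality score, and the "keep the highest price per Product_ID" rule; `price_range`, `quality_score`, and `keep_highest_price` are hypothetical helpers written for this illustration, and rows are plain dicts rather than Spark rows.

```python
from typing import Dict, List, Optional

def price_range(price: Optional[float]) -> str:
    """Bucket a price using the same thresholds as enrich_product_data."""
    if price is None:
        # A null price fails every when() comparison in Spark,
        # so it falls through to otherwise("Luxury").
        return "Luxury"
    if price < 10:
        return "Budget"
    if price < 50:
        return "Standard"
    if price < 200:
        return "Premium"
    return "Luxury"

def quality_score(product_name, category, price) -> float:
    """Fraction of the three checked fields that are not null."""
    return sum(v is not None for v in (product_name, category, price)) / 3.0

def _sortable(price: Optional[float]) -> float:
    # Null prices sort last, like a descending sort in Spark
    return price if price is not None else float("-inf")

def keep_highest_price(rows: List[Dict]) -> List[Dict]:
    """Keep one row per Product_ID, preferring the highest price."""
    best: Dict[str, Dict] = {}
    for row in rows:
        pid = row["Product_ID"]
        if pid is None:
            continue  # rows without a Product_ID are dropped
        if pid not in best or _sortable(row["price"]) > _sortable(best[pid]["price"]):
            best[pid] = row
    return list(best.values())

rows = [
    {"Product_ID": "PROD003", "Product_Name": "Printer Paper", "Category": "Office Supplies", "price": 19.998},
    {"Product_ID": "PROD003", "Product_Name": "Printer", "Category": "Admin", "price": 9.99},
    {"Product_ID": None, "Product_Name": "Office Table", "Category": "Furniture", "price": 499.99},
]
for row in keep_highest_price(rows):
    print(row["Product_Name"], price_range(row["price"]),
          quality_score(row["Product_Name"], row["Category"], row["price"]))
```

One consequence worth noting: a product with a null price lands in the "Luxury" bucket, because none of the comparisons evaluates to true. An explicit null branch may be preferable if such rows occur in practice.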

In [0]:
enriched_products_df = enrich_product_data(raw_products_df)
# display(enriched_products_df)

silver_delta_path = "/Volumes/ecommerce_sales/processed_layer/ecom_volume_processed"

## Write to processed layer(silver) & schema-table
save_as_delta_table(enriched_products_df, "products_processed",silver_delta_path,"processed_layer", "ecommerce_sales")



sql string--- CREATE TABLE IF NOT EXISTS ecommerce_sales.processed_layer.products_processed USING DELTA AS select * from delta.'/Volumes/ecommerce_sales/processed_layer/ecom_volume_processed/products_processed'


INFO:__main__:Successfully saved products_processed as Delta table


True

# **Raw Customer Data**

In [0]:
"""
The spark-excel package is not published on PyPI, which is why installing it with %pip failed. The com.crealytics.spark.excel data source ships as a Spark JAR, not a Python package, so on Databricks it must be installed as a cluster library. Serverless compute does not allow custom JARs, so reading Excel files with com.crealytics.spark.excel is not supported in this environment.

Instead, read the Excel file with pandas (using the openpyxl engine) and convert the result to a Spark DataFrame.
"""
%pip install openpyxl pandas

import pandas as pd

raw_customers_df = pd.read_excel(
    customers_path,
    engine="openpyxl"
)

# Ensure 'phone' column is string type
raw_customers_df["phone"] = raw_customers_df["phone"].astype(str)

# Convert to spark dataframe
raw_customers_df = spark.createDataFrame(raw_customers_df)

# Standardize the column names
raw_customers_df = sanitize_column(raw_customers_df)
bronze_delta_path = "/Volumes/ecommerce_sales/bronze_layer/ecom_volume_raw"
## Write to bronze layer(raw) & schema-table
save_as_delta_table(raw_customers_df, "customers_raw",bronze_delta_path,"bronze_layer", "ecommerce_sales")



Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-2.0.0 openpyxl-3.1.5
Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.
sql string--- CREATE TABLE IF NOT EXISTS ecommerce_sales.bronze_layer.customers_raw USING DELTA AS select * from delta.'/Volumes/ecommerce_sales/bronze_layer/ecom_volume_raw/customers_raw'


INFO:__main__:Successfully saved customers_raw as Delta table


True

# **Enrich Customer data**

In [0]:
"""
1. Customer names contain repeated whitespace, special characters, etc. These are removed.
2. A phone number must be at least 10 characters long to be treated as valid; otherwise it is set to null.
3. Postal codes must be 5 characters long. Leading zeros are added to pad shorter values.
"""
from pyspark.sql.functions import udf, col, trim, lpad

def enrich_customer_data(raw_customers_df):
    """
        Enriches the customer data by cleaning the customer name and phone number
        and padding the postal code to 5 characters
    """
    def clean_customer_name(name):
        if not name or not isinstance(name, str):
            return None
        # Remove leading/trailing whitespace and special characters except apostrophe
        name = name.strip()
        name = re.sub(r"^[^A-Za-z']+|[^A-Za-z']+$", '', name)
        # Remove embedded special characters, digits, and multiple spaces except apostrophe
        name = re.sub(r"[^A-Za-z'\s]", '', name)
        name = re.sub(r'\s+', ' ', name)
        # Remove single-letter fragments at start (e.g., "B ecky Martin")
        name = re.sub(r'^[A-Za-z]\s+', '', name)
        name = re.sub(r"^[^\w]+|[^\w]+$", "", name) # Strip leading/trailing non-word characters (e.g. stray apostrophes or spaces)
        # Capitalize each word, preserving apostrophes
        name = ' '.join([w.capitalize() for w in name.split()])
        return name if name else None

    clean_customer_name_udf = udf(clean_customer_name, StringType())

    cleaned_customers_df = raw_customers_df.withColumn(
        "customer_name_clean",
        clean_customer_name_udf(trim(col("Customer_name")))
    ).withColumn(
        "clean_phone",
        when((col("phone").rlike("ERROR!")) | (length("phone") < 10),lit(None)).otherwise(regexp_replace("phone", "[^0-9x()]", ""))
    ).withColumn(
        "postal_code",
        lpad(col("postal_code").cast("string"), 5, "0")
    ).drop("Customer_name", "phone") \
    .withColumnRenamed("customer_name_clean", "Customer_name").withColumnRenamed("clean_phone", "phone")

    cleaned_customers_df = cleaned_customers_df.filter(col("Customer_name").isNotNull()).dropDuplicates(["Customer_ID","customer_name"])  # Filter out records where customer name is null
    return cleaned_customers_df
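
The phone and postal-code rules above can be mirrored in plain Python to check edge cases quickly. This is a minimal sketch, not the notebook's implementation: `clean_phone` and `pad_postal_code` are hypothetical names, and they operate on single values instead of Spark columns.

```python
import re
from typing import Optional

def clean_phone(phone: Optional[str]) -> Optional[str]:
    """Null out invalid phones; otherwise keep only digits, 'x' and parentheses."""
    if phone is None or "ERROR!" in phone or len(phone) < 10:
        return None
    return re.sub(r"[^0-9x()]", "", phone)

def pad_postal_code(code) -> Optional[str]:
    """Left-pad with zeros to 5 characters, as lpad(..., 5, "0") does."""
    if code is None:
        return None
    # Spark's lpad also truncates longer values to the target length,
    # which the slice reproduces.
    return str(code).rjust(5, "0")[:5]

print(clean_phone("-1874"))         # shorter than 10 characters
print(clean_phone("555-123-4567"))  # separators are stripped
print(pad_postal_code("M5V"))
print(pad_postal_code(8701))
```

Note that the length check runs on the raw string, so a value such as `"(91)x876678990"` passes even though it is not a plain 10-digit number. Whether that is acceptable depends on how strict the validation needs to be.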

In [0]:
cleaned_customers_df = enrich_customer_data(raw_customers_df)

silver_delta_path = "/Volumes/ecommerce_sales/processed_layer/ecom_volume_processed"

## Write to processed layer(silver) & schema-table
save_as_delta_table(cleaned_customers_df, "customer_processed",silver_delta_path,"processed_layer", "ecommerce_sales")
## TODO: confirm whether records with a null customer name should be kept



sql string--- CREATE TABLE IF NOT EXISTS ecommerce_sales.processed_layer.customer_processed USING DELTA AS select * from delta.'/Volumes/ecommerce_sales/processed_layer/ecom_volume_processed/customer_processed'


INFO:__main__:Successfully saved customer_processed as Delta table


True

# **Raw Orders Data**

In [0]:
# Load Orders Data
try:
    # Read JSON file
    raw_orders_df = spark.read \
        .format("json") \
        .option("multiline", "true") \
        .load(orders_path)
except Exception as e:
    print(f"Error loading orders data: {str(e)}")
    # Re-raise: without raw_orders_df the steps below cannot run
    raise

# Standardize the column names
raw_orders_df = sanitize_column(raw_orders_df)

bronze_delta_path = "/Volumes/ecommerce_sales/bronze_layer/ecom_volume_raw"

## Write to bronze layer(raw) & schema-table
save_as_delta_table(raw_orders_df, "orders_raw",bronze_delta_path,"bronze_layer", "ecommerce_sales")
# display(spark.read.table("ecommerce_sales.bronze_layer.orders_raw").limit(100))



sql string--- CREATE TABLE IF NOT EXISTS ecommerce_sales.bronze_layer.orders_raw USING DELTA AS select * from delta.'/Volumes/ecommerce_sales/bronze_layer/ecom_volume_raw/orders_raw'


INFO:__main__:Successfully saved orders_raw as Delta table


True

# **Comprehensive Order Data**

In [0]:
def comprehensive_order_data(cleaned_customers_df, raw_orders_df, enriched_products_df):
    """
        Creates a comprehensive order table by left-joining the raw orders data
        with the cleaned customer data and the enriched product data
    """
    # Create comprehensive order table
    cust_df = cleaned_customers_df.alias('cust')
    orders_df = raw_orders_df.alias('order')
    
    prod_df = enriched_products_df.alias('prod')

    print("orders_df cols--",orders_df.columns)
    comprehensive_orders_df = orders_df \
        .join(
            cust_df,
            orders_df["Customer_ID"] == cust_df["Customer_ID"],
            "left"
        ) \
        .join(
            prod_df,
            orders_df["Product_ID"] == prod_df["Product_ID"],
            "left"
        ) \
        .drop(cust_df["Customer_ID"],prod_df["Product_ID"]) \
        .select(
            'order.*',
            "cust.Customer_name",
            "cust.Country",
            "prod.product_name",
            "prod.category",
            "prod.sub_category"
        )
    comprehensive_orders_df = comprehensive_orders_df\
        .withColumn(
            "profit",
            round(col("profit"), 2)
        ) \
        .withColumn(
            "Order_Date",
            to_date(col("Order_Date"), "d/M/yyyy")
        ) \
        .withColumn(
            "year",
            year(col("Order_Date"))
        )
    return comprehensive_orders_df

comprehensive_orders_df = comprehensive_order_data(cleaned_customers_df, raw_orders_df, enriched_products_df)

save_as_delta_table(comprehensive_orders_df, "comprehensive_orders",silver_delta_path,"processed_layer", "ecommerce_sales")

# display(spark.read.table("ecommerce_sales.processed_layer.comprehensive_orders").limit(100))

"""
Some orders reference a customer_id that is not available in the customer data, and some reference a product_id that is not available in the product data.
"""

orders_df cols-- ['Customer_ID', 'Discount', 'Order_Date', 'Order_ID', 'Price', 'Product_ID', 'Profit', 'Quantity', 'Row_ID', 'Ship_Date', 'Ship_Mode']
sql string--- CREATE TABLE IF NOT EXISTS ecommerce_sales.processed_layer.comprehensive_orders USING DELTA AS select * from delta.'/Volumes/ecommerce_sales/processed_layer/ecom_volume_processed/comprehensive_orders'


INFO:__main__:Successfully saved comprehensive_orders as Delta table


True
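
The left joins above keep every order, even when its customer or product is missing from the lookup tables. The sketch below imitates that behaviour with dictionaries and parses the `d/M/yyyy` dates using the standard library. The sample rows and the `CUST999` ID are invented for illustration, and `enrich_order` is a hypothetical helper, not a function from the notebook.

```python
from datetime import datetime

# Lookup tables keyed by ID (stand-ins for the customer and product DataFrames)
customers = {"CUST001": {"Customer_name": "John Doe", "Country": "USA"}}
products = {"PROD001": {"Category": "Technology", "Sub_Category": "Phones"}}

orders = [
    {"Order_ID": "ORD001", "Order_Date": "15/1/2023",
     "Customer_ID": "CUST001", "Product_ID": "PROD001", "Profit": 199.99},
    # CUST999 is deliberately absent from the customer lookup
    {"Order_ID": "ORD009", "Order_Date": "3/2/2023",
     "Customer_ID": "CUST999", "Product_ID": "PROD001", "Profit": 5.0},
]

def enrich_order(order: dict) -> dict:
    """Left-join one order to both lookups; unmatched fields become None."""
    cust = customers.get(order["Customer_ID"], {})
    prod = products.get(order["Product_ID"], {})
    order_date = datetime.strptime(order["Order_Date"], "%d/%m/%Y").date()
    return {
        **order,
        "Order_Date": order_date,
        "Customer_name": cust.get("Customer_name"),
        "Country": cust.get("Country"),
        "Category": prod.get("Category"),
        "Sub_Category": prod.get("Sub_Category"),
        "year": order_date.year,
    }

enriched = [enrich_order(o) for o in orders]
for row in enriched:
    print(row["Order_ID"], row["year"], row["Customer_name"], row["Category"])
```

Rows like the second one are the source of the blank `customer_name` and `Category` values that show up in the aggregated reports further down.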

# **Aggregated Data - Gold Layer**

In [0]:
"""Create comprehensive profit aggregates table"""
def create_profit_aggregates(comprehensive_orders_df):
    try:
        # Create aggregates by all required dimensions
        aggregates_df = comprehensive_orders_df.groupBy(
            "year",
            "Category",
            "Sub_Category",
            "Customer_id","customer_name"
        ).agg(
            round(sum("profit"),2).alias("total_profit"),
        )
        return aggregates_df
    except Exception as e:
        logger.error(f"Error creating profit aggregates: {str(e)}")
        raise e

aggregate_df = create_profit_aggregates(comprehensive_orders_df)

gold_path = "/Volumes/ecommerce_sales/gold_layer/ecom_volume_gold"

save_as_delta_table(aggregate_df, "profit_aggregates",gold_path,"gold_layer", "ecommerce_sales")

display(spark.read.table("ecommerce_sales.gold_layer.profit_aggregates").limit(100))
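
For readers who want to verify the aggregation logic without Spark, the following sketch groups plain dicts by the same five keys and sums the profit. `profit_aggregates` is a hypothetical helper and the sample rows are invented. Python's `round` and Spark's `round` can differ on exact ties, so this is only an approximation of the cell above.

```python
from collections import defaultdict

def profit_aggregates(rows):
    """Sum profit per (year, category, sub-category, customer id, customer name)."""
    totals = defaultdict(float)
    for r in rows:
        key = (r["year"], r["Category"], r["Sub_Category"],
               r["Customer_ID"], r["Customer_name"])
        totals[key] += r["profit"]
    return {key: round(total, 2) for key, total in totals.items()}

# Invented sample rows: the first two share a group and are summed
sample_rows = [
    {"year": 2023, "Category": "Technology", "Sub_Category": "Phones",
     "Customer_ID": "CUST001", "Customer_name": "John Doe", "profit": 10.25},
    {"year": 2023, "Category": "Technology", "Sub_Category": "Phones",
     "Customer_ID": "CUST001", "Customer_name": "John Doe", "profit": 5.5},
    {"year": 2023, "Category": "Furniture", "Sub_Category": "Chairs",
     "Customer_ID": "CUST002", "Customer_name": "Jane Smith", "profit": 89.99},
]
result = profit_aggregates(sample_rows)
for key, total in result.items():
    print(key, total)
```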



sql string--- CREATE TABLE IF NOT EXISTS ecommerce_sales.gold_layer.profit_aggregates USING DELTA AS select * from delta.'/Volumes/ecommerce_sales/gold_layer/ecom_volume_gold/profit_aggregates'


INFO:__main__:Successfully saved profit_aggregates as Delta table


year,Category,Sub_Category,Customer_id,customer_name,total_profit
2017,Office Supplies,Supplies,SO-20335,Sean O'donnell,-1.99
2015,Office Supplies,Storage,MC-17605,Matt Connell,1.61
2015,Office Supplies,Storage,IL-15100,Ivan Liston,239.0
2016,Office Supplies,Fasteners,AB-10255,Alejandro Ballentine,8.18
2017,Office Supplies,Storage,BD-11725,Bruce Degenhardt,87.76
2017,Technology,Phones,MT-17815,Meg Tillman,74.97
2016,Furniture,Chairs,Dl-13600,Dorris Liebe,146.39
2016,Furniture,Tables,ON-18715,Odella Nelson,-199.51
2017,Office Supplies,Binders,JS-15685,Jim Sink,-12.43
2016,Office Supplies,Paper,JH-15985,Joseph Holt,46.12


# Using SQL to output the following **aggregates**

### **Profit by year**

In [0]:
%sql
SELECT
year,
ROUND(SUM(profit), 2) as total_profit
FROM ecommerce_sales.processed_layer.comprehensive_orders
GROUP BY year
ORDER BY year

year,total_profit
2014,41498.52
2015,66289.41
2016,68565.57
2017,127663.74


### **Profit by year & category**

In [0]:
%sql
SELECT
year,
Category,
ROUND(SUM(profit), 2) as total_profit
FROM ecommerce_sales.processed_layer.comprehensive_orders
GROUP BY year, Category
ORDER BY year, Category

year,Category,total_profit
2014,,523.12
2014,Furniture,-5174.65
2014,Office Supplies,22663.76
2014,Technology,23486.29
2015,,583.16
2015,Furniture,3392.12
2015,Office Supplies,25490.34
2015,Technology,36823.79
2016,,404.44
2016,Furniture,7750.15


### **Profit by customer**

In [0]:
%sql
SELECT
customer_id, 
customer_name,
ROUND(SUM(profit), 2) as total_profit
FROM ecommerce_sales.processed_layer.comprehensive_orders
GROUP BY customer_id, customer_name
ORDER BY total_profit DESC

### **Profit by customer & year**

In [0]:
%sql
SELECT
customer_id, 
customer_name,
year,
ROUND(SUM(profit), 2) as total_profit
FROM ecommerce_sales.processed_layer.comprehensive_orders
GROUP BY customer_id, customer_name, year
ORDER BY customer_name, year

customer_id,customer_name,year,total_profit
NP-18325,,2014,314.39
ED-13885,,2014,18.48
SH-19975,,2014,54.47
DE-13255,,2014,9.85
TS-21505,,2014,87.45
SH-19975,,2015,51.87
NP-18325,,2015,852.92
ED-13885,,2015,574.81
EM-13810,,2015,-202.06
DE-13255,,2015,438.48


In [0]:
# SQL-Based Business Reports
try:
    print("=== Profit by Year ===")
    profit_by_year_df = spark.sql(f"""
        SELECT
            year,
            ROUND(SUM(profit), 2) as total_profit
        FROM ecommerce_sales.processed_layer.comprehensive_orders
        GROUP BY year
        ORDER BY year
    """)
    display(profit_by_year_df)

    print("\n=== Profit by Year + Product Category ===")
    profit_by_year_category_df = spark.sql(f"""
        SELECT
            year,
            Category,
            ROUND(SUM(profit), 2) as total_profit
        FROM ecommerce_sales.processed_layer.comprehensive_orders
        GROUP BY year, Category
        ORDER BY year, Category
    """)
    display(profit_by_year_category_df)

    print("\n=== Profit by Customer ===")
    profit_by_customer_df = spark.sql(f"""
        SELECT
          customer_id, 
          customer_name,
          ROUND(SUM(profit), 2) as total_profit
        FROM ecommerce_sales.processed_layer.comprehensive_orders
        GROUP BY customer_id, customer_name
        ORDER BY total_profit DESC
    """)
    display(profit_by_customer_df)

    print("\n=== Profit by Customer + Year ===")
    profit_by_customer_year_df = spark.sql(f"""
        SELECT
            customer_id, 
            customer_name,
            year,
            ROUND(SUM(profit), 2) as total_profit
        FROM ecommerce_sales.processed_layer.comprehensive_orders
        GROUP BY customer_id, customer_name, year
        ORDER BY customer_name, year
    """)
    display(profit_by_customer_year_df)

except Exception as e:
    logger.error(f"Error generating SQL reports: {str(e)}")
    print(f"Error generating SQL reports: {str(e)}")

=== Profit by Year ===


year,total_profit
2014,41498.52
2015,66289.41
2016,68565.57
2017,127663.74



=== Profit by Year + Product Category ===


year,Category,total_profit
2014,,523.12
2014,Furniture,-5174.65
2014,Office Supplies,22663.76
2014,Technology,23486.29
2015,,583.16
2015,Furniture,3392.12
2015,Office Supplies,25490.34
2015,Technology,36823.79
2016,,404.44
2016,Furniture,7750.15



=== Profit by Customer ===


customer_id,customer_name,total_profit
FH-14275,Frank Hawley,25852.32
TC-20980,Tamara Chand,9010.18
RB-19360,Raymond Buch,6976.36
SC-20095,Sanjit Chand,5839.14
HL-15040,Hunter Lopez,5622.43
PR-18880,Patrick Ryan,5596.2
AB-10105,Adrian Barton,5483.75
TA-21385,Tom Ashbrook,4703.72
CM-12385,Christopher Martinez,3912.03
PS-19045,Penelope Sewall,3183.77



=== Profit by Customer + Year ===


customer_id,customer_name,year,total_profit
NP-18325,,2014,314.39
ED-13885,,2014,18.48
SH-19975,,2014,54.47
DE-13255,,2014,9.85
TS-21505,,2014,87.45
SH-19975,,2015,51.87
NP-18325,,2015,852.92
ED-13885,,2015,574.81
EM-13810,,2015,-202.06
DE-13255,,2015,438.48


## Test Script

In [0]:
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import sys
import os

## Create Test Data for all 3 sources
def create_test_data(spark):
    """Create test data for unit testing"""
    # Test customer data
    customer_data = [
        ("CUST001", "John Doe", "john.doe@gmail.com","-1874","new jersey,usa","Consumer", "USA", "New York", "NY", "10001", "East"),
        ("CUST002", "Jane.  ..Smith", "john.doe@gmail.com","9811144367","boston,usa","Corporate", "Canada", "Toronto", "ON", "M5V", "East"),
        ("CUST003", None, "abc@gmail.com","(91)x876678990","101, bob hueston road, Washington","Home Office", "USA", "Chicago", "IL", "60601", "Central")
    ]

    customer_schema = StructType([
        StructField("Customer_ID", StringType(), True),
        StructField("Customer_Name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("phone", StringType(), True),
        StructField("address", StringType(), True),
        StructField("segment", StringType(), True),
        StructField("Country", StringType(), True),
        StructField("City", StringType(), True),
        StructField("State", StringType(), True),
        StructField("Postal_Code", StringType(), True),
        StructField("Region", StringType(), True)
    ])

    customers_df = spark.createDataFrame(customer_data, customer_schema)

    # Test product data
    product_data = [
        ("PROD001", "Technology", "Phones", "iPhone 12", "CA", 999.99),
        ("PROD002", "Furniture", "Chairs", "Office Chair", "NY", 299.99),
        ("PROD003", "Office Supplies", "Paper", "Printer Paper", "TX", 19.998),
        ("PROD003", "Admin", "Account", "Printer", "BO", 9.99),
        (None, "Furniture", "Table", "Office Table", "NJ", 499.99)
    ]

    product_schema = StructType([
        StructField("Product_ID", StringType(), True),
        StructField("Category", StringType(), True),
        StructField("Sub_Category", StringType(), True),
        StructField("Product_Name", StringType(), True),
        StructField("State", StringType(), True),
        StructField("Price_per_product", DoubleType(), True)
    ])

    products_df = spark.createDataFrame(product_data, product_schema)

    # Test order data
    order_data = [
        (1, "ORD001", "15/1/2023", "20/1/2023", "Standard Class", "CUST001", "PROD001", 2, 1999.98, 0.1, 199.99),
        (2, "ORD002", "16/1/2023", "21/1/2023", "First Class", "CUST002", "PROD002", 1, 299.99, 0.0, 89.99),
        (3, "ORD003", "17/1/2023", "22/1/2023", "Second Class", "CUST001", "PROD003", 5, 49.95, 0.2, 9.99)
    ]

    order_schema = StructType([
        StructField("Row_ID", IntegerType(), True),
        StructField("Order_ID", StringType(), True),
        StructField("Order_Date", StringType(), True),
        StructField("Ship_Date", StringType(), True),
        StructField("Ship_Mode", StringType(), True),
        StructField("Customer_ID", StringType(), True),
        StructField("Product_ID", StringType(), True),
        StructField("Quantity", IntegerType(), True),
        StructField("Price", DoubleType(), True),
        StructField("Discount", DoubleType(), True),
        StructField("Profit", DoubleType(), True)
    ])

    orders_df = spark.createDataFrame(order_data, order_schema)

    return customers_df, products_df, orders_df

In [0]:
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import sys
import os

@pytest.fixture(scope="session")
def spark():
    """Create Spark session for testing"""
    return SparkSession.builder \
        .appName("TestDataEnrichment") \
        .master("local[*]") \
        .getOrCreate()

def test_customer_enrichment(spark):
    """Test customer data enrichment"""
    customers_df, _, _ = create_test_data(spark)
    expected_cust_op = [
        ("CUST001", "john.doe@gmail.com","new jersey,usa","Consumer", "USA", "New York", "NY", "10001", "East", "John Doe", None),
        ("CUST002", "john.doe@gmail.com","boston,usa","Corporate", "Canada", "Toronto", "ON", "00M5V", "East", "Jane Smith", "9811144367")
    ]
    col_list = [
    "Customer_ID", "email", "address", "segment",
    "Country", "City", "State", "Postal_Code", "Region", "Customer_Name", "phone"
    ]
    expected_cust_df = spark.createDataFrame(expected_cust_op, col_list)
    # display(customers_df)
    # Add enrichment fields
    enriched_df = enrich_customer_data(customers_df)
    display(enriched_df)
    display(expected_cust_df)
    # Verify Count - Null customer_name record is removed
    assert enriched_df.count() == 2, "Record count didn't match"

    # Compare actual & expected data -
    # 1. Records with a null customer_name are removed
    # 2. Postal code is standardized to 5 characters by adding leading zeros
    # 3. Phone numbers shorter than 10 characters are set to null
    assert expected_cust_df.collect() == enriched_df.collect(), "Record didn't match"


def test_product_enrichment(spark):
    """Test product data enrichment"""
    _, products_df, _ = create_test_data(spark)

    expected_product_op = [
        ("PROD001", "Technology", "Phones", "iPhone 12", "CA", 999.99,"Luxury", True, 1),
        ("PROD002", "Furniture", "Chairs", "Office Chair", "NY", 299.99,"Luxury", True, 1),
        ("PROD003", "Office Supplies", "Paper", "Printer Paper", "TX", 20.0, "Standard", True, 1)
    ]
    col_list = [
    "product_id", "category", "sub_category", "product_name", "state", "price_per_product","price_range", "is_available", "data_quality_score"
    ]
    expected_product_df = spark.createDataFrame(expected_product_op, col_list)
    # Add enrichment fields
    enriched_df = enrich_product_data(products_df)
    display(enriched_df)
    display(expected_product_df)

    # Verify enrichment
    assert "price_range" in enriched_df.columns
    assert "is_available" in enriched_df.columns

    # Check price ranges
    standard_count = enriched_df.filter(col("price_range") == "Standard").count()
    luxury_count = enriched_df.filter(col("price_range") == "Luxury").count()
    assert standard_count == 1  # Printer paper
    assert luxury_count == 2  # iPhone, office chair
    assert enriched_df.count() == 3, "Record count didn't match"

    assert expected_product_df.collect() == enriched_df.collect(), "Record didn't match"

def test_order_enrichment(spark):
    """Test order data enrichment with joins"""
    customers_df, products_df, orders_df = create_test_data(spark)
    enriched_cust_df = enrich_customer_data(customers_df)
    enriched_prod_df = enrich_product_data(products_df)
    # Perform joins
    enriched_orders = comprehensive_order_data(enriched_cust_df, orders_df, enriched_prod_df)
    
    display(enriched_orders)

    # Verify joins and calculations
    assert enriched_orders.count() == 3
    enrich_order_cols = [x.lower() for x in enriched_orders.columns]
    assert "customer_name" in enrich_order_cols
    assert "category" in enrich_order_cols
    assert "sub_category" in enrich_order_cols
    assert "year" in enrich_order_cols

def test_aggregate(spark):
    """Test profit aggregation on the joined order data"""
    customers_df, products_df, orders_df = create_test_data(spark)
    enriched_cust_df = enrich_customer_data(customers_df)
    enriched_prod_df = enrich_product_data(products_df)
    # Perform joins
    enriched_orders = comprehensive_order_data(enriched_cust_df, orders_df, enriched_prod_df)
    
    aggregates_df = create_profit_aggregates(enriched_orders)
    display(aggregates_df)
    assert aggregates_df.count() > 0

    # Check that profits are rounded to 2 decimal places
    profit_values = aggregates_df.select("total_profit").collect()
    for row in profit_values:
        profit_str = str(row["total_profit"])
        if '.' in profit_str:
            decimal_places = len(profit_str.split('.')[1])
            assert decimal_places <= 2, f"Aggregated profit should be rounded to 2 decimal places, got {decimal_places}"
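
The enrichment tests above compare `collect()` results directly, which depends on both DataFrames returning rows in the same order. Spark does not guarantee row order without an explicit sort, so an order-insensitive comparison may be more robust. The sketch below shows one way to do that with the standard library; `same_rows` is a hypothetical helper, demonstrated on plain tuples.

```python
from collections import Counter

def same_rows(expected, actual) -> bool:
    """True when both collections hold the same rows, ignoring order."""
    return Counter(map(tuple, expected)) == Counter(map(tuple, actual))

# Plain tuples stand in for collected rows; None values are fine
expected_rows = [("CUST001", "John Doe", None), ("CUST002", "Jane Smith", "9811144367")]
actual_rows = [("CUST002", "Jane Smith", "9811144367"), ("CUST001", "John Doe", None)]

print(same_rows(expected_rows, actual_rows))
```

Since PySpark `Row` objects are tuple subclasses, a call such as `same_rows(expected_cust_df.collect(), enriched_df.collect())` should work the same way, provided every column value is hashable.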



In [0]:
# spark_sess = spark()
spark = SparkSession.builder \
    .appName("Testing validation") \
    .getOrCreate()
test_customer_enrichment(spark)
test_product_enrichment(spark)
test_order_enrichment(spark)
test_aggregate(spark)

Customer_ID,email,address,segment,Country,City,State,postal_code,Region,Customer_name,phone
CUST001,john.doe@gmail.com,"new jersey,usa",Consumer,USA,New York,NY,10001,East,John Doe,
CUST002,john.doe@gmail.com,"boston,usa",Corporate,Canada,Toronto,ON,00M5V,East,Jane Smith,9811144367.0


Customer_ID,email,address,segment,Country,City,State,Postal_Code,Region,Customer_Name,phone
CUST001,john.doe@gmail.com,"new jersey,usa",Consumer,USA,New York,NY,10001,East,John Doe,
CUST002,john.doe@gmail.com,"boston,usa",Corporate,Canada,Toronto,ON,00M5V,East,Jane Smith,9811144367.0


Product_ID,Category,Sub_Category,Product_Name,State,price_per_product,price_range,is_available,data_quality_score
PROD001,Technology,Phones,iPhone 12,CA,999.99,Luxury,True,1.0
PROD002,Furniture,Chairs,Office Chair,NY,299.99,Luxury,True,1.0
PROD003,Office Supplies,Paper,Printer Paper,TX,20.0,Standard,True,1.0


product_id,category,sub_category,product_name,state,price_per_product,price_range,is_available,data_quality_score
PROD001,Technology,Phones,iPhone 12,CA,999.99,Luxury,True,1
PROD002,Furniture,Chairs,Office Chair,NY,299.99,Luxury,True,1
PROD003,Office Supplies,Paper,Printer Paper,TX,20.0,Standard,True,1


orders_df cols-- ['Row_ID', 'Order_ID', 'Order_Date', 'Ship_Date', 'Ship_Mode', 'Customer_ID', 'Product_ID', 'Quantity', 'Price', 'Discount', 'Profit']


Row_ID,Order_ID,Order_Date,Ship_Date,Ship_Mode,Customer_ID,Product_ID,Quantity,Price,Discount,profit,Customer_name,Country,product_name,category,sub_category,year
1,ORD001,2023-01-15,20/1/2023,Standard Class,CUST001,PROD001,2,1999.98,0.1,199.99,John Doe,USA,iPhone 12,Technology,Phones,2023
2,ORD002,2023-01-16,21/1/2023,First Class,CUST002,PROD002,1,299.99,0.0,89.99,Jane Smith,Canada,Office Chair,Furniture,Chairs,2023
3,ORD003,2023-01-17,22/1/2023,Second Class,CUST001,PROD003,5,49.95,0.2,9.99,John Doe,USA,Printer Paper,Office Supplies,Paper,2023


orders_df cols-- ['Row_ID', 'Order_ID', 'Order_Date', 'Ship_Date', 'Ship_Mode', 'Customer_ID', 'Product_ID', 'Quantity', 'Price', 'Discount', 'Profit']


year,Category,Sub_Category,Customer_id,customer_name,total_profit
2023,Technology,Phones,CUST001,John Doe,199.99
2023,Furniture,Chairs,CUST002,Jane Smith,89.99
2023,Office Supplies,Paper,CUST001,John Doe,9.99
