In [0]:
%pip install openpyxl
%pip install pytest

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


1. Read the provided files into dataframes

In [0]:
import pandas as pd
import json
from datetime import datetime

# Source files on the workspace volume, one per dataset.
orders_path = '/Volumes/workspace/default/temp/Orders.json'
products_path = '/Volumes/workspace/default/temp/Products.csv'
customers_path = '/Volumes/workspace/default/temp/Customer.xlsx'

# Orders: parse the JSON document, then wrap the parsed records in a DataFrame.
with open(orders_path, 'r') as orders_file:
    orders_data = json.load(orders_file)
orders_df = pd.DataFrame(orders_data)

# Products: plain CSV.
products_df = pd.read_csv(products_path)

# Customers: Excel workbook (read through the openpyxl engine installed above).
customers_df = pd.read_excel(customers_path)


In [0]:
# The phone column holds bad data and is not used anywhere, so it is dropped.
# errors='ignore' keeps this cell re-runnable: customers_df is reassigned in
# place, so without it a second run raises KeyError because 'phone' is already gone.
customers_df = customers_df.drop(columns='phone', errors='ignore')

2. Data Cleaning and Date Processing

In [0]:
# Parse date function
def parse_date(date_str):
    """Convert a day-first ``DD/MM/YYYY`` value with ``pd.to_datetime``.

    Args:
        date_str: Raw value from a date column, expected as ``'%d/%m/%Y'`` text.

    Returns:
        Whatever ``pd.to_datetime`` produces for the value (a ``Timestamp`` for
        a well-formed date string), or ``pd.NaT`` when the conversion fails.
    """
    try:
        return pd.to_datetime(date_str, format='%d/%m/%Y')
    except (ValueError, TypeError, OverflowError):
        # Only conversion failures mean "unparseable". A bare ``except`` would
        # also swallow KeyboardInterrupt/SystemExit and hide unrelated bugs.
        return pd.NaT

# Run both date columns through parse_date, then derive the order year
# from the parsed order date.
for date_col in ('Order Date', 'Ship Date'):
    orders_df[date_col] = orders_df[date_col].apply(parse_date)
orders_df['Year'] = orders_df['Order Date'].dt.year


In [0]:

%sql
-- Remove the three tables before they are rewritten by the next cell.
-- IF EXISTS lets this cell succeed on a fresh workspace (or after a previous
-- partial run), where a plain DROP TABLE fails because the table is missing.
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS products;
DROP TABLE IF EXISTS customers;



In [0]:
from pyspark.sql.functions import trim, regexp_replace, when, col


# Normalise column labels, convert to Spark, clean, and save the three tables.

def _underscore_spaces(label):
    """Return ``label`` with every space replaced by an underscore."""
    return label.replace(" ", "_")


# Column labels must not contain spaces once the frames become tables.
orders_df = orders_df.rename(columns=_underscore_spaces)
products_df = products_df.rename(columns=_underscore_spaces)
customers_df = customers_df.rename(columns=_underscore_spaces)

# Orders and customers lose every row that has a missing value; products are kept as-is.
spark_orders_df = spark.createDataFrame(orders_df).dropna()
spark_products_df = spark.createDataFrame(products_df)
spark_customers_df = spark.createDataFrame(customers_df).dropna()

# Customer names: strip everything except ASCII letters and spaces, collapse
# runs of spaces into one, then trim the ends.
letters_and_spaces = regexp_replace("Customer_Name", "[^a-zA-Z ]", "")
single_spaced = regexp_replace(letters_and_spaces, " +", " ")
spark_customers_df = spark_customers_df.withColumn("Customer_Name", trim(single_spaced))

# Write each frame to its table, replacing any previous contents.
for spark_frame, table_name in (
    (spark_orders_df, "orders"),
    (spark_products_df, "products"),
    (spark_customers_df, "customers"),
):
    spark_frame.write.mode("overwrite").saveAsTable(table_name)

3. Unit Tests For Data Cleaning
Test the Data Cleaning logic, including column renaming and handling missing values

In [0]:
import pytest

 

@pytest.fixture
def sample_data():
    """Build small in-memory orders, products and customers frames for the tests.

    Returns:
        tuple: ``(orders_df, products_df, customers_df)`` as pandas DataFrames
        holding three orders, three products and two customers.
    """
    # Sample Orders Data
    orders_data = [
        {"Order_ID": 1, "Customer_ID": 101, "Product_ID": 201, "Order_Date": "2023-01-01", "Profit": 100.5},
        {"Order_ID": 2, "Customer_ID": 102, "Product_ID": 202, "Order_Date": "2023-02-01", "Profit": -50.0},
        {"Order_ID": 3, "Customer_ID": 101, "Product_ID": 203, "Order_Date": "2023-03-01", "Profit": 75.0},
    ]
    orders_df = pd.DataFrame(orders_data)

    # Sample Products Data
    products_data = [
        {"Product_ID": 201, "Product_Name": "Product A", "Category": "Category 1", "Sub-Category": "Sub 1"},
        {"Product_ID": 202, "Product_Name": "Product B", "Category": "Category 2", "Sub-Category": "Sub 2"},
        {"Product_ID": 203, "Product_Name": "Product C", "Category": "Category 1", "Sub-Category": "Sub 1"},
    ]
    products_df = pd.DataFrame(products_data)

    # Sample Customers Data
    customers_data = [
        {"Customer_ID": 101, "Customer_Name": "Customer A", "Country": "USA"},
        {"Customer_ID": 102, "Customer_Name": "Customer B", "Country": "Canada"},
    ]
    customers_df = pd.DataFrame(customers_data)

    return orders_df, products_df, customers_df


# NOTE: this test used to be indented inside sample_data, after its return
# statement, so it was never defined at module level and its assertion was
# unreachable. It now lives at top level where a test runner can collect it.
def test_column_renaming(sample_data):
    """Test that column renaming replaces spaces with underscores."""
    orders_df, products_df, customers_df = sample_data

    # The fixture labels already use underscores, so put the spaces back first;
    # otherwise the rename under test is a no-op and the assertion proves nothing.
    spaced_orders_df = orders_df.rename(columns=lambda x: x.replace("_", " "))
    assert "Order ID" in spaced_orders_df.columns, "Test setup failed to create spaced column names."

    renamed_orders_df = spaced_orders_df.rename(columns=lambda x: x.replace(" ", "_"))

    assert "Order_ID" in renamed_orders_df.columns, "Column renaming failed for Orders DataFrame."
    assert not any(" " in name for name in renamed_orders_df.columns), "Column renaming left spaces in Orders DataFrame."

 

def test_dropna(sample_data):
    """
    Test that rows with missing values are dropped correctly.

    Two cases are checked: complete rows survive ``dropna()`` untouched, and a
    row with a missing value is removed.
    """
    orders_df, _, _ = sample_data

    # Case 1: the fixture has no missing values, so nothing may be dropped.
    spark_orders_df = spark.createDataFrame(orders_df)
    cleaned_orders_df = spark_orders_df.dropna()
    assert cleaned_orders_df.count() == spark_orders_df.count(), "Dropna removed rows that have no missing values."

    # Case 2: blank out one value so there is actually something to drop.
    # Without this case the test passes even if dropna() does nothing.
    orders_with_gap_df = orders_df.copy()
    orders_with_gap_df.loc[0, "Order_Date"] = None
    spark_gap_df = spark.createDataFrame(orders_with_gap_df)
    assert spark_gap_df.dropna().count() == len(orders_df) - 1, "Dropna failed to clean missing rows."

4. Create tables with additional details 

In [0]:
from pyspark.sql import functions as F

# Per-customer order summary: distinct order count, summed sales, profit and
# quantity (each rounded to 2 decimals), plus first and last order dates.
orders_by_customer = spark_orders_df.groupby('Customer_ID')
customer_metrics = orders_by_customer.agg(
    F.countDistinct('Order_ID').alias('Total_Orders'),
    F.round(F.sum('Price'), 2).alias('Total_Sales'),
    F.round(F.sum('Profit'), 2).alias('Total_Profit'),
    F.round(F.sum('Quantity'), 2).alias('Total_Quantity'),
    F.min('Order_Date').alias('First_Order_Date'),
    F.max('Order_Date').alias('Last_Order_Date'),
)

# Left join from customers, so customers with no orders are kept
# (their metric columns come back null).
enriched_customers = spark_customers_df.join(customer_metrics, 'Customer_ID', 'left')

display(enriched_customers)



Customer_ID,Customer_Name,email,address,Segment,Country,City,State,Postal_Code,Region,Total_Orders,Total_Sales,Total_Profit,Total_Quantity,First_Order_Date,Last_Order_Date
RD-19585,Rob Dowd,danielleware947@gmail.com,"1055 Leslie Squares Apt. 640 North Jacob, AZ 25423",Consumer,United States,Dubuque,Iowa,52001,Central,10,2912.66,734.52,73,2014-04-02T00:00:00.000Z,2017-07-03T00:00:00.000Z
MM-18055,Michelle Moray,andrewhays420@gmail.com,"1127 Cole Skyway Lake Steven, OK 62340",Consumer,United States,San Francisco,California,94110,West,8,2749.86,-520.46,73,2014-04-11T00:00:00.000Z,2017-10-30T00:00:00.000Z
FO-14305,Frank Olsen,kendraholder796@gmail.com,"0543 Martinez Mount Suite 497 South Johnfort, NM 85655",Consumer,United States,Philadelphia,Pennsylvania,19143,East,10,2678.43,215.58,60,2014-06-09T00:00:00.000Z,2017-11-22T00:00:00.000Z
MP-17470,Mark Packer,nicholasrobinson191@gmail.com,"143 Kyle Throughway Suite 713 North Jacobville, SC 41104",Home Office,United States,New York City,New York,10035,East,7,3205.77,600.3,77,2014-03-30T00:00:00.000Z,2016-01-11T00:00:00.000Z
CM-12115,Chad McGuire,sharonwarner980@gmail.com,"050 Anna Camp South Jonathan, WY 45473",Consumer,United States,New York City,New York,10011,East,4,1661.61,408.58,35,2015-03-29T00:00:00.000Z,2017-12-22T00:00:00.000Z
KN-16705,Kristina Nunn,sabrinahayes269@gmail.com,"04950 Joseph Meadow East Kyleville, FM 82237",Home Office,United States,Sparks,Nevada,89431,West,8,2280.58,329.77,55,2014-03-02T00:00:00.000Z,2017-09-23T00:00:00.000Z
JG-15310,Jason Gross,danielpaul555@gmail.com,"04852 Wise Row Davidfurt, AR 32041",Corporate,United States,Newark,Ohio,43055,East,6,2240.58,3.59,46,2016-03-03T00:00:00.000Z,2017-12-28T00:00:00.000Z
VM-21685,Valerie Mitchum,pamelathompson911@gmail.com,"0592 James Landing Apt. 950 East Steven, NJ 78538",Home Office,United States,Seattle,Washington,98105,West,7,2454.12,513.62,37,2014-04-07T00:00:00.000Z,2017-11-06T00:00:00.000Z
TS-21085,Thais Sissman,jamesjohnson660@gmail.com,"04890 Sweeney Turnpike East Mariaside, DC 09754",Consumer,United States,Ormond Beach,Florida,32174,South,2,4.83,-3.32,4,2015-07-19T00:00:00.000Z,2017-01-07T00:00:00.000Z
FM-14290,Frank Merwin,denisecook866@gmail.com,"143 Danielle Route New Seanburgh, FM 77823",Home Office,United States,Quincy,Massachusetts,2169,East,9,3736.23,197.89,79,2014-08-26T00:00:00.000Z,2017-11-24T00:00:00.000Z


In [0]:
# Per-product order summary: distinct orders, sales and profit (rounded to
# 2 decimals), units sold, and the number of distinct buying customers.
orders_by_product = spark_orders_df.groupby('Product_ID')
product_metrics = orders_by_product.agg(
    F.countDistinct('Order_ID').alias('Total_Orders'),
    F.round(F.sum('Price'), 2).alias('Total_Sales'),
    F.round(F.sum('Profit'), 2).alias('Total_Profit'),
    F.sum('Quantity').alias('Total_Quantity_Sold'),
    F.countDistinct('Customer_ID').alias('Unique_Customers'),
)

# Left join from products, so products that were never ordered are kept.
enriched_products = spark_products_df.join(product_metrics, 'Product_ID', 'left')

display(enriched_products)




Enriched Orders

In [0]:
# Join orders with customer and product details
# Use the namespaced F.round here. `from pyspark.sql.functions import round`
# rebinds the notebook-wide name `round`, so any later plain-Python
# round(x, 2) call would hit the Spark function instead of the builtin.
from pyspark.sql import functions as F

# Profit is rounded to 2 decimals on the order rows themselves.
spark_orders_df = spark_orders_df.withColumn('Profit', F.round(F.col('Profit'), 2))

# Inner joins: an order is kept only when both its customer and its product
# are present in the cleaned customer/product tables.
enriched_orders = spark_orders_df.join(
    enriched_customers.select('Customer_ID', 'Customer_Name', 'Country'),
    on='Customer_ID', how='inner'
).join(
    enriched_products.select('Product_ID', 'Product_Name', 'Category', 'Sub-Category'),
    on='Product_ID', how='inner'
)

display(enriched_orders)



In [0]:
# Save the joined orders as the table "enriched_orders", overwriting any
# existing table of that name; the %sql profit queries further down read from it.
enriched_orders.write.mode("overwrite").saveAsTable("enriched_orders")



5. Unit Tests For Transformations

In [0]:
"""

## Unit Tests for Transformations

Test the transformations applied to the data, including aggregations and joins.

"""

 

def test_customer_metrics(sample_data):
    """
    Test customer metrics aggregation.

    Checks that there is one output row per customer and that the aggregated
    values are right. A row count alone would still pass if the wrong
    aggregate function were used.
    """
    orders_df, _, _ = sample_data

    spark_orders_df = spark.createDataFrame(orders_df)

    customer_metrics = spark_orders_df.groupby('Customer_ID').agg(
        F.countDistinct('Order_ID').alias('Total_Orders'),
        F.sum('Profit').alias('Total_Profit')
    )

    assert customer_metrics.count() == 2, "Customer metrics aggregation failed."

    # Fixture data: customer 101 has orders 1 and 3 (100.5 + 75.0),
    # customer 102 has order 2 (-50.0).
    metrics_by_customer = {
        row['Customer_ID']: (row['Total_Orders'], row['Total_Profit'])
        for row in customer_metrics.collect()
    }
    expected_metrics = {101: (2, 175.5), 102: (1, -50.0)}
    assert metrics_by_customer == expected_metrics, "Customer metrics values are wrong."

 

def test_enriched_customers(sample_data):
    """Check that left-joining customers to their order metrics yields two rows for the sample data."""
    orders_pdf, _, customers_pdf = sample_data

    # Aggregate the sample orders per customer.
    per_customer = (
        spark.createDataFrame(orders_pdf)
        .groupby('Customer_ID')
        .agg(
            F.countDistinct('Order_ID').alias('Total_Orders'),
            F.sum('Profit').alias('Total_Profit'),
        )
    )

    # Attach the metrics to the customer rows.
    joined = spark.createDataFrame(customers_pdf).join(per_customer, on='Customer_ID', how='left')

    row_count = joined.count()
    assert row_count == 2, "Join between customers and metrics failed."

6. Calculate Profit

In [0]:
from pyspark.sql import functions as F

def _total_profit_by(group_col):
    """Sum ``Profit`` over ``enriched_orders`` for each value of ``group_col``."""
    return enriched_orders.groupby(group_col).agg(
        F.sum('Profit').alias('Total_Profit')
    )


# Total profit per order year.
profit_by_year = _total_profit_by('Year')

# Total profit per product category.
profit_by_category = _total_profit_by('Category')

# Total profit per product sub-category.
profit_by_sub_category = _total_profit_by('Sub-Category')

# Total profit per customer name.
profit_by_cust = _total_profit_by('Customer_Name')

for profit_table in (profit_by_year, profit_by_category, profit_by_sub_category, profit_by_cust):
    display(profit_table)




In [0]:
%sql
-- Profit by year
-- ORDER BY makes the row order reproducible; GROUP BY alone gives no ordering guarantee.
SELECT Year, SUM(Profit) AS Total_Profit
FROM enriched_orders
GROUP BY Year
ORDER BY Year;




In [0]:
%sql

-- Profit by year and product category
-- ORDER BY makes the row order reproducible; GROUP BY alone gives no ordering guarantee.
SELECT Year, Category, SUM(Profit) AS Total_Profit
FROM enriched_orders
GROUP BY Year, Category
ORDER BY Year, Category;




In [0]:
%sql

-- Profit by customer
-- Rows are grouped by name, so two customers who share a name are summed together.
-- ORDER BY makes the row order reproducible; GROUP BY alone gives no ordering guarantee.
SELECT Customer_Name, SUM(Profit) AS Total_Profit
FROM enriched_orders
GROUP BY Customer_Name
ORDER BY Customer_Name;




In [0]:
%sql

-- Profit by customer and year
-- Rows are grouped by name, so two customers who share a name are summed together.
-- ORDER BY makes the row order reproducible; GROUP BY alone gives no ordering guarantee.
SELECT Customer_Name, Year, SUM(Profit) AS Total_Profit
FROM enriched_orders
GROUP BY Customer_Name, Year
ORDER BY Customer_Name, Year;



In [0]:
pip install pytest pytest-ipynb

Collecting pytest-ipynb
  Downloading pytest-ipynb-1.1.1.tar.gz (3.7 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting runipy (from pytest-ipynb)
  Downloading runipy-0.1.5.tar.gz (24 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting Jinja2>=2.7.2 (from runipy->pytest-ipynb)
  Downloading jinja2-3.1.6-py3-none-any.whl.metadata (2.9 kB)
Collecting nbconvert>=4.0.0 (from runipy->pytest-ipynb)
  Downloading nbconvert-7.16.6-py3-none-any.whl.metadata (8.5 kB)
Collecting nbformat>=4.0.0 (from runipy->pytest-ipynb)
  Downloading nbformat-5.10.4-py3-none-any.whl.metadata (3.6 kB)
Collecting MarkupSafe>=2.0 (from Jinja2>=2.7.2->runipy->pytest-ipynb)
  Downloading MarkupSafe-3.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (4.0 kB)
Collecting beautifulsoup4 (from nbconvert>=4.0.0->runipy->pytest-ipynb)
  Downloading beautifulsoup4-4.13.