Import Packages

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession
from pyspark.sql.window import Window

# Following packages are used for testing purposes
import pytest
import ipytest
ipytest.autoconfig()

Call Spark Session

In [2]:
# Build (or reuse) the SparkSession shared by every cell below.
spark = (
    SparkSession.builder
    # spark-excel connector; needed for the "com.crealytics.spark.excel" reader format.
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")
    # Use the legacy datetime parser for date pattern handling.
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.sql.warehouse.dir", "D:/Programs/Real time project/real_time_streamer_pyspark/hive/warehouse")
    .config("spark.sql.catalogImplementation", "hive")
    .enableHiveSupport()
    .getOrCreate()
)

Function to read data using Spark. The raw data format, the schema and the path to the data (e.g. a DBFS path) are parameters.

In [3]:
def read_data(spark, format, schema, path):
    """Load a raw dataset into a Spark DataFrame.

    Args:
        spark: SparkSession whose ``read`` interface is used.
        format: Data source name handed to ``spark.read.format`` (e.g. "json", "csv").
        schema: Schema applied when ``format`` is "json". May be ``None``, in which
            case no schema is set on the reader. Ignored for every other format.
        path: Location of the data to load.

    Returns:
        The DataFrame produced by the reader's ``load(path)``.
    """
    reader = spark.read.format(format)

    if format == "json":
        # DataFrameReader.schema() rejects None, so only set a schema when the
        # caller actually supplied one.
        if schema is not None:
            reader = reader.schema(schema)
        return reader.option("multiline", "true").load(path)

    # Non-JSON sources (csv, xlsx): first row is the header, column types are inferred.
    return reader.option("header", True).option("inferSchema", True).load(path)

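Spark write function. The write mode, data format and output location are parameters. Note that, despite the parameter name `table_name`, the value is passed to `DataFrameWriter.save`, so it is an output path rather than a Hive table name.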

In [12]:
def write_function(df, mode, format, table_name):
    """Save ``df`` through its DataFrameWriter and report the outcome on stdout.

    Args:
        df: DataFrame to persist.
        mode: Save mode forwarded to ``DataFrameWriter.mode``.
        format: Output format forwarded to ``DataFrameWriter.format``.
        table_name: Target forwarded to ``DataFrameWriter.save`` (i.e. a path).

    Returns:
        None. Any exception raised while writing is caught and printed rather
        than propagated.
    """
    try:
        writer = df.write.mode(mode).format(format)
        writer.save(table_name)
        print(f"DataFrame is written to {table_name}.")
    except Exception as write_error:
        print(f"An error occurred while writing the DataFrame: {write_error}")

Order enrichment function which is a joined dataset of Orders, Products and customers. 

In [5]:
# Profit column is rounded to 2 decimal points and Order Year column is created.
def order_enrichment(df_orders, df_customers, df_products):
    """Join orders with customer and product attributes.

    Args:
        df_orders: Orders DataFrame; must contain "Order Date" (day/month/year
            text), "Profit", "Customer ID" and "Product ID".
        df_customers: Customers DataFrame; "Customer ID", "Customer Name" and
            "Country" are used.
        df_products: Products DataFrame; "Product ID", "Category" and
            "Sub-Category" are used.

    Returns:
        Orders with "Profit" rounded to 2 decimals and an added "Order Year"
        column, inner-joined to customers on "Customer ID" and left-joined to
        the de-duplicated product attributes on "Product ID".
    """
    # Pattern letters are case-sensitive: "M" is month-of-year while "m" is
    # minute-of-hour. Single-letter "d"/"M" accept both one- and two-digit
    # values such as "21/8/2016".
    order_year = year(to_timestamp(col("Order Date"), "d/M/yyyy"))

    df_orders_sel = (
        df_orders
        .withColumn("Order Year", order_year)
        .withColumn("Profit", round(col("Profit"), 2))
    )
    df_cust_sel = df_customers.select(col("Customer ID"), col("Customer Name"), col("Country"))
    # distinct() collapses product rows that differ only in the columns not selected here.
    df_prod_sel = df_products.select(col("Product ID"), col("Category"), col("Sub-Category")).distinct()

    # Inner join keeps only orders with a matching customer; the left join keeps
    # orders whose product is absent from the product table.
    df_orders_enrich = (
        df_orders_sel
        .join(df_cust_sel, ["Customer ID"], "inner")
        .join(df_prod_sel, ["Product ID"], "left")
    )

    return df_orders_enrich
    
# Known issues with the Product dataset:
# - Duplicate Product IDs: the same Product ID appears in multiple records with a different name and location.
# - A few Product IDs that appear in Orders are missing from the Product table.

Function to aggregate the order_enriched table to calculate `Total Profit` based on different group by fields.
Final result is ordered by `Total Profit` in descending order

In [6]:
def profit(df_order_enriched, field):
    """Total the "Profit" column per distinct value of ``field``.

    Args:
        df_order_enriched: DataFrame containing a "Profit" column and ``field``.
        field: Column name to group by.

    Returns:
        DataFrame with ``field`` and "Total Profit" (summed, then rounded to
        2 decimals), sorted by "Total Profit" in descending order.
    """
    total_col = "Total Profit"
    # `sum` and `round` are the Spark column functions pulled in by the star import.
    total_profit = round(sum("Profit"), 2).alias(total_col)
    totals = df_order_enriched.groupBy(field).agg(total_profit)
    return totals.orderBy(col(total_col).desc())

I used a TDD approach to develop the above functions based on the requirements provided. The two main functions, `order_enrichment` and `profit`, are tested by the test functions below.

In [None]:
%%ipytest
# pytest is the package used for testing purpose and ipytest is the package used to enable testing features in the databricks notebook.


# Testing function to test order_enrichment function

def test_order_enrichment(spark=spark):
    """Check order_enrichment against small hand-built orders, customers and products frames.

    The ``spark`` default binds the notebook's session at definition time.
    Every fixture value is a string; the final comparison relies on Spark
    aligning column types between the result and the expected frame.
    """
    # Dummy input frames: two orders for two customers, three products (one of them
    # never ordered) and Profit values with more than 2 decimals to exercise rounding.
    df_orders_raw_list = [( "1","order_id1","21/8/2016","25/8/2016","Standard Class","cust_id1","prod_id1","7","573.174","0.3","63.686"),
                          ( "2","order_id2","21/8/2017","25/8/2017","First Class","cust_id2","prod_id2","3","573.174","0.3","50.112")]
    
    df_customers_raw_list = [("cust_id1","cust_name1","email1","00000000","address1","Consumer","country1","city1","state1","80027.0","West"),
                           ("cust_id2","cust_name2","email1","00000000","address2","Consumer","country2","city2","state2","80027.0","West")]
    
    df_products_raw_list = [("prod_id1","Furniture","Chairs","prod_name1","New York","81.882"),
                            ("prod_id2","Technology","Accessories","prod_name2","New York","81.882"),
                           ("prod_id3","Technology","Accessories","prod_name3","New York","81.882")]
    
    df_orders_raw = spark.createDataFrame(df_orders_raw_list, ["Row ID","Order ID","Order Date","Ship Date","Ship Mode","Customer ID","Product ID","Quantity","Price","Discount","Profit"])
    df_customers_raw = spark.createDataFrame(df_customers_raw_list, ["Customer ID","Customer Name","email","phone","address","Segment","Country","City","State","Postal Code","Region"])
    df_products_raw = spark.createDataFrame(df_products_raw_list, ["Product ID","Category","Sub-Category","Product Name","State","Price per product"])

    
    # Expected output for the inputs above: join keys first, Profit rounded to
    # 2 decimals, Order Year derived from Order Date.
    df_enrichment_final_raw_list = [("prod_id1","cust_id1","1","order_id1","21/8/2016","25/8/2016","Standard Class","7","573.174","0.3","63.69","2016","cust_name1","country1","Furniture","Chairs"),
                                   ("prod_id2","cust_id2","2","order_id2","21/8/2017","25/8/2017","First Class","3","573.174","0.3","50.11","2017","cust_name2","country2","Technology","Accessories")]
    
    df_enrichment_final_expected = spark.createDataFrame(df_enrichment_final_raw_list,["Product ID","Customer ID","Row ID","Order ID","Order Date","Ship Date","Ship Mode","Quantity","Price","Discount","Profit","Order Year","Customer Name","Country","Category","Sub-Category"])

    # Run the function under test on the dummy inputs.
    df_enrichment_final_result = order_enrichment(df_orders_raw, df_customers_raw, df_products_raw)

    # The joins must neither drop nor duplicate orders.
    assert df_orders_raw.count() == df_enrichment_final_result.count()

    # The row for cust_name1 must carry the Profit of cust_id1's order,
    # rounded to 2 decimal points.
    assert df_enrichment_final_result.select(col("Customer Name"),col("Profit")).where(col("Customer Name")=="cust_name1").collect()[0][1] == \
           df_orders_raw.select(col("Customer ID"),round(col("Profit"),2)).where(col("Customer ID")=="cust_id1").collect()[0][1]

    # No result row may be absent from the expected frame; together with the
    # count check above this means the two frames hold the same rows.
    assert df_enrichment_final_result.orderBy(col("Row ID").asc()).exceptAll(df_enrichment_final_expected.orderBy(col("Row ID").asc())).count() == 0

In [None]:
%%ipytest

def test_profit(spark=spark):
    """Check profit() for each grouping field against hand-computed totals.

    The ``spark`` default binds the notebook's session at definition time.
    For every grouping field the test verifies that:
      * the grand total is preserved (compared with a float tolerance),
      * result and expected frames contain exactly the same rows (both
        directions of exceptAll are checked),
      * rows come back ordered by "Total Profit" descending.
    """
    enriched_columns = ["Product ID","Customer ID","Row ID","Order ID","Order Date","Ship Date","Ship Mode","Quantity","Price","Discount","Profit","Order Year","Customer Name","Country","Category","Sub-Category"]

    # Dummy order-enriched rows: one 2016 order and two 2017 orders for the same customer.
    df_order_enriched_raw_list = [
        ("prod_id1","cust_id1","1","order_id1","21/8/2016","25/8/2016","Standard Class","7","573.174","0.3","63.69","2016","cust_name1","country1","Furniture","Chairs"),
        ("prod_id2","cust_id2","2","order_id2","21/8/2017","25/8/2017","First Class","3","573.174","0.3","50.11","2017","cust_name2","country2","Technology","Accessories"),
        ("prod_id3","cust_id2","3","order_id3","21/8/2017","25/8/2017","First Class","3","573.174","0.3","50.11","2017","cust_name2","country2","Technology","Accessories"),
    ]
    df_order_enriched_raw = spark.createDataFrame(df_order_enriched_raw_list, enriched_columns)

    # Expected (group value, total profit) rows for each grouping field.
    expected_rows_by_field = {
        "Order Year": [("2016", "63.69"), ("2017", "100.22")],
        "Category": [("Furniture", "63.69"), ("Technology", "100.22")],
        "Sub-Category": [("Chairs", "63.69"), ("Accessories", "100.22")],
        "Customer Name": [("cust_name1", "63.69"), ("cust_name2", "100.22")],
    }

    raw_total = df_order_enriched_raw.select(sum("Profit")).collect()[0][0]

    for field, expected_rows in expected_rows_by_field.items():
        df_result = profit(df_order_enriched=df_order_enriched_raw, field=field)
        df_expected = spark.createDataFrame(expected_rows, [field, "Total Profit"])

        # Floating-point sums depend on summation order, so compare with a tolerance.
        aggregated_total = df_result.select(sum("Total Profit")).collect()[0][0]
        assert aggregated_total == pytest.approx(raw_total), field

        # exceptAll is one-sided; check both directions so neither extra nor
        # missing rows slip through.
        assert df_result.exceptAll(df_expected).count() == 0, field
        assert df_expected.exceptAll(df_result).count() == 0, field

        # profit() promises descending order by "Total Profit".
        totals = [row["Total Profit"] for row in df_result.collect()]
        assert totals == sorted(totals, reverse=True), field

Schema used to read the JSON data. Spark's default `inferSchema` option is used to read the other data formats (xlsx and csv).

In [7]:
# Explicit schema for the orders JSON; every field is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Row ID", IntegerType()),
        ("Order ID", StringType()),
        ("Order Date", StringType()),
        ("Ship Date", StringType()),
        ("Ship Mode", StringType()),
        ("Customer ID", StringType()),
        ("Product ID", StringType()),
        ("Quantity", IntegerType()),
        ("Price", FloatType()),
        ("Discount", FloatType()),
        ("Profit", FloatType()),
    )
])

Read raw data using read_data function. Spark Session, data format, schema, path to file are parameters.

In [8]:
# Customers come from an Excel workbook (spark-excel reader); types are inferred.
df_customers = read_data(
    spark,
    format="com.crealytics.spark.excel",
    schema=None,
    path="D:/Programs/Real time project/real_time_streamer_pyspark/order_data/Customer.xlsx",
)
# Products come from a CSV file; types are inferred.
df_products = read_data(
    spark,
    format="csv",
    schema=None,
    path="D:/Programs/Real time project/real_time_streamer_pyspark/order_data/Product.csv",
)
# Orders come from JSON and are read with the explicit `schema` defined above.
df_orders = read_data(
    spark,
    format="json",
    schema=schema,
    path="D:/Programs/Real time project/real_time_streamer_pyspark/order_data/Order.json",
)

Data transformations

In [9]:
# Build the enriched orders frame from the raw orders, customers and products frames.
df_order_enriched = order_enrichment(
    df_orders=df_orders,
    df_customers=df_customers,
    df_products=df_products,
)

In [10]:
# Calculate total profit per grouping field with `profit`, one DataFrame per field.
# The grouping fields are, in order: Year, Category, Sub-Category, Customer Name.
(
    df_profit_by_year,
    df_profit_by_product_category,
    df_profit_by_product_sub_category,
    df_profit_by_customer,
) = (
    profit(df_order_enriched=df_order_enriched, field=group_field)
    for group_field in ("Order Year", "Category", "Sub-Category", "Customer Name")
)

Write data to tables

In [21]:
# Persist every DataFrame with `write_function`.
# Parquet is used as the output format for better compressibility, and each
# output location is overwritten on every run.
# Order: raw frames, the enriched orders frame, then the profit aggregations.
outputs_to_write = [
    (df_orders, "Sink/Orders"),
    (df_products, "Sink/Products"),
    (df_customers, "Sink/Customers"),
    (df_order_enriched, "Sink/Orders_enriched"),
    (df_profit_by_year, "Sink/Profit_by_Year"),
    (df_profit_by_product_category, "Sink/Profit_by_Product_Category"),
    (df_profit_by_product_sub_category, "Sink/Profit_by_Product_Sub_Category"),
    (df_profit_by_customer, "Sink/Profit_by_Customer"),
]

for df_to_write, sink_path in outputs_to_write:
    write_function(df=df_to_write, mode="overwrite", format="parquet", table_name=sink_path)

DataFrame is written to Sink/Orders.
DataFrame is written to Sink/Products.
DataFrame is written to Sink/Customers.
DataFrame is written to Sink/Orders_enriched.
DataFrame is written to Sink/Profit_by_Year.
DataFrame is written to Sink/Profit_by_Product_Category.
DataFrame is written to Sink/Profit_by_Product_Sub_Category.
DataFrame is written to Sink/Profit_by_Customer.


SQL statement to fetch respective tables. 

In [30]:
# Load each parquet output back and expose it to Spark SQL as a temp view.
Profit_by_Year_output = spark.read.format("parquet").load(
    "D:/Programs/Real time project/real_time_streamer_pyspark/Sink/Profit_by_Year"
)
Profit_by_Year_output.createOrReplaceTempView("Profit_by_Year")

Profit_by_Customer_output = spark.read.format("parquet").load(
    "D:/Programs/Real time project/real_time_streamer_pyspark/Sink/Profit_by_Customer"
)
Profit_by_Customer_output.createOrReplaceTempView("Profit_by_Customer")

Orders_enriched_output = spark.read.format("parquet").load(
    "D:/Programs/Real time project/real_time_streamer_pyspark/Sink/Orders_enriched"
)
Orders_enriched_output.createOrReplaceTempView("Orders_enriched")

In [31]:
# The queries are run through spark.sql here; in Databricks the cell could
# instead be switched to SQL mode and the query written directly.

# Profit by Year
profit_by_year_query = """select * from Profit_by_Year;"""
spark.sql(profit_by_year_query).show()

+----------+------------+
|Order Year|Total Profit|
+----------+------------+
|      2017|   111084.87|
|      2016|    65073.28|
|      2015|    63073.09|
|      2014|    39185.71|
+----------+------------+



In [34]:
# Profit by Customer, highest total first
profit_by_customer_query = """select * from Profit_by_Customer order by `Total Profit` desc;"""
spark.sql(profit_by_customer_query).show()

+--------------------+------------+
|       Customer Name|Total Profit|
+--------------------+------------+
|        Frank Hawley|    13400.31|
|        Tamara Chand|     8981.32|
|        Raymond Buch|     6976.36|
|        Sanjit Chand|      5757.3|
|        Hunter Lopez|     5622.43|
|        Patrick Ryan|      5596.2|
|       Adrian Barton|     5444.97|
|        Tom Ashbrook|     4703.72|
|Christopher Martinez|     3900.04|
|     Penelope Sewall|     3183.77|
|                NULL|      3126.8|
|       Keith Dawkins|     3038.92|
|         Andy Reiter|     2884.61|
|       Daniel Raglin|     2869.08|
|    Tom Boeckenhauer|     2798.06|
|        Nathan Mautz|      2751.7|
|        Sanjit Engle|     2650.68|
|    Bi 8761l Shonely|     2616.41|
|         Harry Marie|     2438.07|
|        Todd Sumrall|     2371.72|
+--------------------+------------+
only showing top 20 rows



In [44]:
# Profit by Year and Category, most recent year first.
# NOTE: sum(Profit) is not rounded in this query, so the totals are shown at full precision.
profit_by_year_category_query = """select `Order Year`, `Category`, sum(Profit) 
            from Orders_enriched 
            group by `Order Year`,`Category`
            order by `Order Year` desc;"""
spark.sql(profit_by_year_category_query).show()

+----------+---------------+-------------------+
|Order Year|       Category|        sum(Profit)|
+----------+---------------+-------------------+
|      2017|Office Supplies|  44273.04976409674|
|      2017|     Technology|  63281.79999738932|
|      2017|      Furniture| 3041.5200760364532|
|      2017|           NULL|  488.5000163912773|
|      2016|     Technology| 23223.780251443386|
|      2016|           NULL|  404.4500068426132|
|      2016|      Furniture|  6889.499873638153|
|      2016|Office Supplies|  34555.55016118288|
|      2015|           NULL|  583.1600017547607|
|      2015|Office Supplies|  24519.38986013457|
|      2015|      Furniture|  3027.169886946678|
|      2015|     Technology|  34943.36974078417|
|      2014|     Technology|  21493.33014243841|
|      2014|           NULL|  523.1099948883057|
|      2014|      Furniture|-5331.0600063204765|
|      2014|Office Supplies| 22500.330029863864|
+----------+---------------+-------------------+



In [45]:
# Profit by Customer and Year, most recent year first.
# NOTE: sum(Profit) is not rounded in this query, so the totals are shown at full precision.
profit_by_customer_year_query = """select `Customer Name`, `Order Year`, sum(Profit) 
             from Orders_enriched 
             group by `Customer Name`, `Order Year`
             order by `Order Year` desc;"""
spark.sql(profit_by_customer_year_query).show()

+-------------------+----------+-------------------+
|      Customer Name|Order Year|        sum(Profit)|
+-------------------+----------+-------------------+
|    Laura Armstrong|      2017| 16.070000380277634|
|      Filia McAdams|      2017|-24.750000476837158|
| Zuschuss Donatelli|      2017|  16.59000015258789|
|       Jeremy Farry|      2017| 36.979999363422394|
|       Greg Guthrie|      2017|-10.070001244544983|
|          Janet Lee|      2017|  88.18999862670898|
|           Amy Hunt|      2017| 147.86999988555908|
|    Neola Schneider|      2017| -31.86000108718872|
|       Sonia Cooley|      2017|  70.27000188827515|
|    Erica Hernandez|      2017|  144.2900037765503|
|     Lena Hernandez|      2017|  65.21999931335449|
|       Lena Radford|      2017| -272.5799865722656|
|     Pamela Coakley|      2017|-0.9900000095367432|
|     Vicky Freymann|      2017|  62.47999882698059|
|     Parhena Norris|      2017| 48.350001096725464|
|Rick Wi 4567@#$lson|      2017| 1660.85002088