Importing packages

In [None]:
from datetime import datetime
from pyspark.sql.functions import col,regexp_replace, isnan, udf, year, round, col, when, substring_index
from pyspark.sql.types import IntegerType, StringType, StructType, StructField,FloatType,  DateType
import logging

# Setup logger
logging.basicConfig(filename='/dbfs/cluster-logs/error.log', level=logging.ERROR)

Loading data

In [None]:
#defining schema for product table

product_schema = StructType(fields=
                            [
                                StructField("product_id",StringType(),False),
                                StructField("category",StringType(),True),
                                StructField("sub_category",StringType(),True),
                                StructField("product_name",StringType(),True),
                                StructField("state",StringType(),True),
                                StructField("price_per_product",FloatType(),True)
                            ]
                            )

customer_schema = StructType(fields=
                            [
                                StructField("customer_id",StringType(),False),
                                StructField("customer_name",StringType(),True),
                                StructField("email",StringType(),True),
                                StructField("phone",StringType(),True),
                                StructField("address",StringType(),True),
                                StructField("segment",StringType(),True),
                                StructField("country",StringType(),True),
                                StructField("city",StringType(),True),
                                StructField("state",StringType(),True),
                                StructField("postal_code",StringType(),True),
                                StructField("region",StringType(),True)
                            ]
                            )

order_schema = StructType(fields=
                            [
                                StructField("row_id",IntegerType(),False),
                                StructField("order_id",StringType(),True),
                                StructField("order_date",DateType(),True),
                                StructField("ship_date",DateType(),True),
                                StructField("ship_mode",DateType(),True),
                                StructField("customer_id",StringType(),True),
                                StructField("product_id",StringType(),True),
                                StructField("quantity",IntegerType(),True),
                                StructField("price",FloatType(),True),
                                StructField("discount",FloatType(),True),
                                StructField("profit",FloatType(),True)
                            ]
                            )         

order_schema2 = StructType(fields=
                            [
                                StructField("Row ID",IntegerType(),False),
                                StructField("Order ID",StringType(),True),
                                StructField("Order Date",DateType(),True),
                                StructField("Ship Date",DateType(),True),
                                StructField("Ship Mode",DateType(),True),
                                StructField("Customer ID",StringType(),True),
                                StructField("Product ID",StringType(),True),
                                StructField("Quantity",IntegerType(),True),
                                StructField("Price",FloatType(),True),
                                StructField("Discount",FloatType(),True),
                                StructField("Profit",FloatType(),True)
                            ]
                            )                                                     


In [None]:
#order
df_order_raw = spark.read.format("json").option("multiline","true").load("/Volumes/test_databricks_workspace/pei_schema_source/pei_volume/Order.json")

#product
df_product_raw = spark.read.format("csv").schema(product_schema).option("header",True).load("/Volumes/test_databricks_workspace/pei_schema_source/pei_volume/Product.csv")

#customer
df_customer_raw = spark.read.format("com.crealytics.spark.excel").option("header", "true").option("treatEmptyValuesAsNulls", "false").option("inferSchema", "false").load("/Volumes/test_databricks_workspace/pei_schema_source/pei_volume/Customer.xlsx")


In [None]:
def save_as_table(df,save_path, mode, partition_cols=None):
    if(partition_cols==None):
        df.write \
            .mode(mode) \
            .saveAsTable(save_path)
    else:
        df.write \
            .partitionBy(partition_cols) \
            .mode(mode) \
            .saveAsTable(save_path)


#column renaming

In [None]:
df_order_raw_renamed = df_order_raw.select(col("Order ID").alias("order_id"), col("Order Date").alias("order_date"), col("Ship Date").alias('ship_date'), col("Ship Mode").alias("ship_mode"), col("Customer ID").alias("customer_id"), col("Product ID").alias("product_id"), col("Quantity"),col("price"), col("Discount"), col("profit"))

df_order_raw_renamed.show()

+--------------+----------+----------+--------------+-----------+---------------+--------+-------+--------+-------+
|      order_id|order_date| ship_date|     ship_mode|customer_id|     product_id|Quantity|  price|Discount| profit|
+--------------+----------+----------+--------------+-----------+---------------+--------+-------+--------+-------+
|CA-2016-122581| 21/8/2016| 25/8/2016|Standard Class|   JK-15370|FUR-CH-10002961|       7|573.174|     0.3| 63.686|
|CA-2017-117485| 23/9/2017| 29/9/2017|Standard Class|   BD-11320|TEC-AC-10004659|       4| 291.96|     0.0|102.186|
|US-2016-157490| 6/10/2016| 7/10/2016|   First Class|   LB-16795|OFF-BI-10002824|       4|     17|     0.7| -14.92|
|CA-2015-111703|  2/7/2015|  9/7/2015|Standard Class|   KB-16315|OFF-PA-10003349|       3| 15.552|     0.2| 5.6376|
|CA-2014-108903| 3/10/2014| 3/10/2014|      Same Day|   DO-13435|TEC-AC-10003023|       3|142.488|     0.2|   -3.0|
|CA-2016-117583|27/11/2016|30/11/2016|   First Class|   CB-12025|OFF-BI-

In [None]:
df_customer_renamed = df_customer_raw.select(col("customer id").alias("customer_id"), col("customer name").alias("customer_name"), col("email"), col("phone").cast("double"),col("address"), col("segment"), col("country"), col("state"), col("city"), col("postal code").alias("postal_code"), col("region"))


###saving the raw data as tables in bronze schema

In [None]:
#saving the data to bronze schema
df_write_config_raw = [
                    {
                        'dataframe':df_product_raw,
                        'path':'test_databricks_workspace.pei_bronze.product',
                        'mode':"append",
                        'partitionCols':None
                    },
                    {
                        'dataframe':df_order_raw_renamed,
                        'path':'test_databricks_workspace.pei_bronze.order',
                        'mode':"append",
                        'partitionCols':None
                    },
                    {
                        'dataframe':df_customer_renamed,
                        'path':'test_databricks_workspace.pei_bronze.customer',
                        'mode':"append",
                        'partitionCols':None
                    }
                   ]

for config in df_write_config_raw:
    save_as_table(config['dataframe'], config['path'], config['mode'],config['partitionCols'])

#reading data from raw tables

In [None]:
df_customer = spark.read.table("test_databricks_workspace.pei_bronze.customer")
df_product = spark.read.table("test_databricks_workspace.pei_bronze.product")
df_order = spark.read.table("test_databricks_workspace.pei_bronze.order")



##String to date Conversion of Order Df columns

In [None]:
string_date_udf =  udf (lambda x: datetime.strptime(x, '%d/%m/%Y'), DateType())
df_order_date_conversion = df_order.withColumn('order_date',string_date_udf(col('order_date'))).withColumn('ship_date', string_date_udf(col('ship_date')))

df_order_date_conversion.show()

##enriching order data

In [None]:
df_order_enriched = df_order_date_conversion.withColumn('year',year(col('order_date')))
df_order_enriched.show()

+--------------+----------+----------+--------------+-----------+---------------+--------+-------+--------+-------+----+
|      order_id|order_date| ship_date|     ship_mode|customer_id|     product_id|Quantity|  price|Discount| profit|year|
+--------------+----------+----------+--------------+-----------+---------------+--------+-------+--------+-------+----+
|CA-2016-122581|2016-08-21|2016-08-25|Standard Class|   JK-15370|FUR-CH-10002961|       7|573.174|     0.3| 63.686|2016|
|CA-2017-117485|2017-09-23|2017-09-29|Standard Class|   BD-11320|TEC-AC-10004659|       4| 291.96|     0.0|102.186|2017|
|US-2016-157490|2016-10-06|2016-10-07|   First Class|   LB-16795|OFF-BI-10002824|       4|     17|     0.7| -14.92|2016|
|CA-2015-111703|2015-07-02|2015-07-09|Standard Class|   KB-16315|OFF-PA-10003349|       3| 15.552|     0.2| 5.6376|2015|
|CA-2014-108903|2014-10-03|2014-10-03|      Same Day|   DO-13435|TEC-AC-10003023|       3|142.488|     0.2|   -3.0|2014|
|CA-2016-117583|2016-11-27|2016-

#cleaned up customer data

In [None]:
df_customer_modified = df_customer.withColumn("customer_name", 
                                             when(isnan(col("customer_name")), substring_index(col("email"), "@", 1))
                                             .otherwise(col("customer_name"))).withColumn("customer_name", regexp_replace(col("customer_name"), '[^a-zA-Z\s]',''))   

df_customer_modified.show()     

+-----------+-------------------+--------------------+-------------+--------------------+-----------+-------------+--------------+--------------+-----------+-------+
|customer_id|      customer_name|               email|        phone|             address|    segment|      country|         state|          city|postal_code| region|
+-----------+-------------------+--------------------+-------------+--------------------+-----------+-------------+--------------+--------------+-----------+-------+
|   NC-18535|      Nick Crebassa|rebeccamurphy656@...|         NULL|3071 Mitchell Isl...|  Corporate|United States|      Virginia|       Hampton|      23666|  South|
|   JK-15640|           Jim Kriz|kimberlymartin280...|3.713293549E9|30748 Sullivan Is...|Home Office|United States|    California|San Bernardino|      92404|   West|
|   BS-11365|       Bi l Shonely|moniquewright318@...|         NULL|31084 Clark Strea...|  Corporate|United States|      New York| New York City|      10011|   East|
|   

#Validating if the customer name has any regular expressions

In [None]:
#df_customer_modified.filter(col("customer_id")=="FG-14260").show()


#picking only required fields for join to optimize

In [None]:
df_customer_required = df_customer_modified.select(col('customer_id'),col('customer_name'),col('country').alias('customer_country'))

df_product_required = df_product.select(col('product_id'),col('category'),col('sub_category'))

In [None]:
df_joined = df_order_enriched.join(df_product_required, 'product_id').join(df_customer_required,'customer_id')
# df_joined = df_order_enriched.join(df_product_required,on='product_id',how='left')
# df_joined.show()

df_joined = df_joined.withColumn('profit',round(col('profit'),2))
display(df_joined)

customer_id,product_id,order_id,order_date,ship_date,ship_mode,Quantity,price,Discount,profit,year,category,sub_category,customer_name,customer_country
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States
JK-15370,FUR-CH-10002961,CA-2016-122581,2016-08-21,2016-08-25,Standard Class,7,573.174,0.3,63.69,2016,Furniture,Chairs,Jay Kimmel,United States


##saving data as tables in silver schema after enrichment

In [None]:
# saving enriched data to silver schema
df_write_config = [
                    {
                        'dataframe':df_product,
                        'path':'test_databricks_workspace.pei_silver.product',
                        'mode':"overwrite",
                        'partitionCols':None
                    },
                    {
                        'dataframe':df_customer_modified,
                        'path':'test_databricks_workspace.pei_silver.customer',
                        'mode':"overwrite",
                        'partitionCols':None
                    },
                    {
                        'dataframe':df_joined,
                        'path':'test_databricks_workspace.pei_silver.order',
                        'mode':"overwrite",
                        'partitionCols':["year","category"]
                    },
                   ]

for config in df_write_config:
    save_as_table(config['dataframe'], config['path'], config['mode'],config['partitionCols'])

# df_joined.write.format("delta").clusterBy("year", "category").saveAsTable("test_databricks_workspace.pei_schema.table22")

In [None]:
%fs ls dbfs:/

path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,1710652092000
dbfs:/Volume/,Volume/,0,0
dbfs:/Volumes/,Volumes/,0,0
dbfs:/cluster-logs/,cluster-logs/,0,1710653103000
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/local_disk0/,local_disk0/,0,1710729042000
dbfs:/volume/,volume/,0,0
dbfs:/volumes/,volumes/,0,0


#Aggregations on enriched and joined order table

In [None]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

#reading order table from silver schema
df_order = spark.read.table("test_databricks_workspace.pei_silver.order")

# Aggregations
windowSpec1 = Window.orderBy('year')
windowSpec2 = Window.orderBy('customer_id')
windowSpec3 = Window.orderBy('category')
windowSpec4 = Window.orderBy('sub_category')

df_agg1 = df_order.groupby('year').agg({"profit": "sum"}).withColumnRenamed("sum(profit)", "year_profits")
df_agg1 = df_agg1.withColumn("row_num", row_number().over(windowSpec1))

df_agg2 = df_order.groupby('customer_id').agg({"profit": "sum"}).withColumnRenamed("sum(profit)", "customer_profits")
df_agg2 = df_agg2.withColumn("row_num", row_number().over(windowSpec2))

df_agg3 = df_order.groupby('category').agg({"profit": "sum"}).withColumnRenamed("sum(profit)", "category_profits")
df_agg3 = df_agg3.withColumn("row_num", row_number().over(windowSpec3))

df_agg4 = df_order.groupby('sub_category').agg({"profit": "sum"}).withColumnRenamed("sum(profit)", "sub_category_profits")
df_agg4 = df_agg4.withColumn("row_num", row_number().over(windowSpec4))

# Merge DataFrames
merged_df = df_agg1.join(df_agg2, "row_num", "outer") \
                    .join(df_agg3, "row_num", "outer") \
                    .join(df_agg4, "row_num", "outer")

merged_df.write\
    .partitionBy("year","category") \
    .mode("overwrite") \
    .saveAsTable('test_databricks_workspace.pei_gold.order_aggregated_table_with_profits')



In [None]:
merged_df.show()

+-------+----+------------------+-----------+-------------------+---------------+------------------+------------+--------------------+
|row_num|year|      year_profits|customer_id|   customer_profits|       category|  category_profits|sub_category|sub_category_profits|
+-------+----+------------------+-----------+-------------------+---------------+------------------+------------+--------------------+
|      1|2014| 40975.53000000001|   AA-10315|            -273.39|      Furniture| 9329.440000000004| Accessories|   49795.31999999998|
|      2|2015| 65706.45999999988|   AA-10375|              277.4|Office Supplies|129458.78000000028|  Appliances|   23786.92000000003|
|      3|2016|  68161.4800000002|   AA-10480|  445.9699999999999|     Technology|163230.84000000005|         Art|   6441.229999999996|
|      4|2017|127175.59000000007|   AA-10645|             807.83|           NULL|              NULL|     Binders|            30331.69|
|      5|NULL|              NULL|   AB-10015|          

In [None]:
# Convert DataFrame to temp table in Spark
df_order.createOrReplaceTempView("final_table")

# SQL query to fetch sum of profits grouped by year and category
sql_query_year_category = '''
SELECT year, category, SUM(profit) AS sum_profits
FROM final_table
GROUP BY year, category 
ORDER BY year
'''

# Execute the SQL query
spark.sql(sql_query_year_category).show()

+----+---------------+-------------------+
|year|       category|        sum_profits|
+----+---------------+-------------------+
|2014|Office Supplies| 135983.22000000137|
|2014|      Furniture|-31047.960000000014|
|2014|     Technology| 140917.92000000124|
|2015|      Furniture| 20352.840000000153|
|2015|     Technology|  220943.1600000005|
|2015|Office Supplies|   152942.760000001|
|2016|     Technology| 146624.75999999963|
|2016|      Furniture|  46501.25999999992|
|2016|Office Supplies| 215842.86000000118|
|2017|     Technology|  470899.1999999956|
|2017|Office Supplies|   271983.840000003|
|2017|      Furniture| 20170.499999999858|
+----+---------------+-------------------+



In [None]:
# SQL query to fetch sum of profits grouped by year and customer
sql_query_year_customer = '''
SELECT year, customer_id, SUM(profit) AS sum_profits
FROM final_table
GROUP BY year, customer_id 
ORDER BY year
'''

# Execute the SQL query
spark.sql(sql_query_year_customer).show()

+----+-----------+-------------------+
|year|customer_id|        sum_profits|
+----+-----------+-------------------+
|2014|   AT-10435|  5.039999999999999|
|2014|   LC-17140|             -159.6|
|2014|   DB-12970| 231.90000000000003|
|2014|   MS-17710| 244.37999999999994|
|2014|   MF-17665|-2759.9399999999996|
|2014|   EH-13945|              132.9|
|2014|   AB-10105|            2992.38|
|2014|   TZ-21580|              793.8|
|2014|   BS-11365| 1264.1999999999998|
|2014|   NZ-18565|            1488.18|
|2014|   AS-10630| 431.69999999999993|
|2014|   NF-18385|            1120.44|
|2014|   PP-18955|            2834.34|
|2014|   LH-17020| -476.0400000000001|
|2014|   JD-16060|  50.88000000000001|
|2014|   BP-11095|             790.26|
|2014|   JH-15430| 112.73999999999998|
|2014|   ZD-21925| 152.94000000000003|
|2014|   CS-11950|-496.74000000000035|
|2014|   AS-10240| 224.09999999999997|
+----+-----------+-------------------+
only showing top 20 rows

