In [1]:
# Raw customers CSV. The schema is supplied explicitly and declares every
# column as string, so nothing is inferred from the file contents.
customer_schema=(
    'customer_id string,'
    'customer_name string,'
    'is_premium_customer string,'
    'sign_up_date string'
)
customer_df=(
    spark.read
    .format('csv')
    .option('header',True)  # first line of each file is the column header
    .schema(customer_schema)
    .load('abfss://workspace_project4@onelake.dfs.fabric.microsoft.com/lakehouse4.Lakehouse/Files/raw/customers')
)
display(customer_df)


StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 3, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, 18ecf0ee-8c98-4c6d-9f30-f4a23fa0ee1b)

In [10]:
from pyspark.sql.functions import *
# Silver customers: tidy the name, turn the premium flag into a boolean and
# parse the sign-up date from its yyyy-MM-dd text form.
customer_silver_df=(
    customer_df
    .withColumn('customer_name',initcap(trim('customer_name')))
    # 'yes' / 'y' (any letter case, surrounding whitespace ignored) -> True.
    # Every other value, including null, falls through to False.
    .withColumn(
        'is_premium_customer',
        when(lower(trim(col('is_premium_customer'))).isin('yes','y'),True).otherwise(False),
    )
    .withColumn('sign_up_date',to_date('sign_up_date','yyyy-MM-dd'))
)
# The default save mode is errorifexists, which makes this cell fail on every
# run after the first. Overwrite makes the silver refresh re-runnable.
customer_silver_df.write.format('delta').mode('overwrite').save('Files/silver/customers')

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 12, Finished, Available, Finished, False)

In [12]:
# Raw orders CSV. The three date columns are read as plain strings and price
# as a double; the schema is supplied explicitly rather than inferred.
order_schema=(
    'order_id string,'
    'customer_id string,'
    'order_date string,'
    'shipped_date string,'
    'delivered_date string,'
    'price double,'
    'region string,'
    'order_channel string,'
    'referred_by string'
)
order_df=(
    spark.read
    .format('csv')
    .option('header',True)  # first line of each file is the column header
    .schema(order_schema)
    .load('abfss://workspace_project4@onelake.dfs.fabric.microsoft.com/lakehouse4.Lakehouse/Files/raw/orders')
)
display(order_df)

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 14, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, ceec3da5-3cfb-4591-8dd2-8489877a354e)

In [19]:
from pyspark.sql.functions import *
# Silver orders: normalise the date columns, derive the delivery delay and
# tidy the free-text columns.

# Each date arrives as a yyyy/MM/dd string; it is parsed and re-rendered as a
# yyyy-MM-dd string. Always starting from order_df keeps the cell idempotent.
# NOTE(review): the result is still a string column, unlike
# customers.sign_up_date which is kept as a date - confirm whether consumers
# rely on the string type before switching these to real dates.
order_silver_df=order_df
for date_col in ('order_date','shipped_date','delivered_date'):
    order_silver_df=order_silver_df.withColumn(
        date_col,
        date_format(to_date(date_col,'yyyy/MM/dd'),'yyyy-MM-dd'),
    )

# Number of days from order_date to delivered_date.
order_silver_df=order_silver_df.withColumn(
    'delivery_delay',
    datediff(col('delivered_date'),col('order_date')),
)

# Strip surrounding whitespace and capitalise each word.
for text_col in ('region','order_channel','referred_by'):
    order_silver_df=order_silver_df.withColumn(text_col,initcap(trim(text_col)))

# The default save mode is errorifexists, which makes this cell fail on every
# run after the first. Overwrite makes the silver refresh re-runnable.
order_silver_df.write.format('delta').mode('overwrite').save('Files/silver/orders')

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 21, Finished, Available, Finished, False)

In [50]:
# Products: read the raw CSV with an explicit schema, tidy the text columns
# and turn the discount flag into a boolean, then store the silver table.
product_schema=(
    'product_id string,'
    'category string,'
    'carrier string,'
    'shipment_cost double,'
    'is_discounted string'
)
products_df=(
    spark.read
    .format('csv')
    .option('header',True)  # first line of each file is the column header
    .schema(product_schema)
    .load('abfss://workspace_project4@onelake.dfs.fabric.microsoft.com/lakehouse4.Lakehouse/Files/raw/products')
    .withColumn('category',initcap(trim('category')))
    .withColumn('carrier',initcap(trim('carrier')))
    # 'yes' / 'y' (any letter case, surrounding whitespace ignored) -> True.
    # Every other value, including null, falls through to False.
    .withColumn(
        'is_discounted',
        when(lower(trim(col('is_discounted'))).isin('yes','y'),True).otherwise(False),
    )
)
# The default save mode is errorifexists, which makes this cell fail on every
# run after the first. Overwrite makes the silver refresh re-runnable.
products_df.write.format('delta').mode('overwrite').save('Files/silver/products')

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 52, Finished, Available, Finished, False)

In [22]:
# Raw order items CSV. quantity is read as an int, the two ids as strings;
# the schema is supplied explicitly rather than inferred.
order_item_schema=(
    'order_id string,'
    'product_id string,'
    'quantity int'
)
order_item_df=(
    spark.read
    .format('csv')
    .option('header',True)  # first line of each file is the column header
    .schema(order_item_schema)
    .load('abfss://workspace_project4@onelake.dfs.fabric.microsoft.com/lakehouse4.Lakehouse/Files/raw/order_items')
)
display(order_item_df)

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 24, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, 9f812c1f-f73a-4f73-8ba6-5e245ab02f33)

In [25]:
from pyspark.sql.functions import *
# Silver order items: keep only rows with a known, strictly positive quantity.
order_item_silver_df=order_item_df.filter(
    col('quantity').isNotNull()&(col('quantity')>0)
)
# The default save mode is errorifexists, which makes this cell fail on every
# run after the first. Overwrite makes the silver refresh re-runnable.
order_item_silver_df.write.format('delta').mode('overwrite').save('Files/silver/order_item')

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 27, Finished, Available, Finished, False)

In [52]:
# Re-load the four silver Delta tables by absolute OneLake path.
# NOTE(review): the silver writes in this notebook use relative 'Files/silver/...'
# paths while these reads use absolute abfss URIs - presumably the notebook's
# default lakehouse is lakehouse4; confirm.
silver_root='abfss://workspace_project4@onelake.dfs.fabric.microsoft.com/lakehouse4.Lakehouse/Files/silver'
customer_silver_df1=spark.read.format('delta').load(f'{silver_root}/customers')
order_item_silver_df1=spark.read.format('delta').load(f'{silver_root}/order_item')
products_silver_df1=spark.read.format('delta').load(f'{silver_root}/products')
orders_silver_df1=spark.read.format('delta').load(f'{silver_root}/orders')

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 54, Finished, Available, Finished, False)

In [53]:
# Gold "combined" table: orders enriched with customer, order-item and product
# attributes. Every join is a left join starting from orders, so an order is
# kept even when it has no matching customer, item or product. An order that
# matches several order_item rows produces one output row per item.
combined_df=(
    orders_silver_df1
    .join(
        customer_silver_df1,
        orders_silver_df1.customer_id==customer_silver_df1.customer_id,
        how='left',
    )
    .join(
        order_item_silver_df1,
        orders_silver_df1.order_id==order_item_silver_df1.order_id,
        how='left',
    )
    .join(
        products_silver_df1,
        order_item_silver_df1.product_id==products_silver_df1.product_id,
        how='left',
    )
    # Drop the right-hand copy of each join key so every key name appears once.
    .drop(
        customer_silver_df1.customer_id,
        order_item_silver_df1.order_id,
        products_silver_df1.product_id,
    )
)
# The default save mode is errorifexists, which makes this cell fail on every
# run after the first. Overwrite makes the gold refresh re-runnable.
combined_df.write.format('delta').mode('overwrite').save('Files/gold/combined_gold_df')

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 55, Finished, Available, Finished, False)

In [58]:
# Gold metrics: one row per (customer, region, order channel) combination.
#
# sum / round / max below are the Spark column functions brought in by the
# notebook's `from pyspark.sql.functions import *`, not the Python builtins.
#
# NOTE(review): combined_df is built by joining orders to order items, so an
# order with several items appears on several rows. price and delivery_delay
# come from the orders table, which means total_revenue, revenue_from_discount
# and avg_delivery_delay count such an order once per item. Confirm whether
# these should be computed at order grain instead. The distinct-order counts
# are not affected.
gold_metrics_df=(
    combined_df
    .groupBy('customer_id','customer_name','is_premium_customer','region','order_channel')
    .agg(
        countDistinct('order_id').alias('total_orders'),
        sum('quantity').alias('total_quantity'),
        sum('price').alias('total_revenue'),
        avg('delivery_delay').alias('avg_delivery_delay'),
        # Distinct orders whose delivery_delay is more than 3 days; when() without
        # otherwise() yields null for the rest, which countDistinct ignores.
        countDistinct(
            when(col('delivery_delay') > 3, col('order_id'))
        ).alias('delayed_orders_count'),
        round(sum('shipment_cost'),2).alias('total_shipment_cost'),
        # price is counted only on rows whose product is flagged as discounted.
        round(
            sum(when(col('is_discounted')==True,col('price')).otherwise(lit(0))),
            2,
        ).alias('revenue_from_discount'),
        max('delivery_delay').alias('max_delivery_delay'),
    )
    .orderBy(col('customer_id'))
)
# The default save mode is errorifexists, which makes this cell fail on every
# run after the first. Overwrite makes the gold refresh re-runnable.
gold_metrics_df.write.format('delta').mode('overwrite').save('Files/gold/gold_metrics_df')

StatementMeta(, 3df61ace-8b62-45a6-80e4-541561024c16, 60, Finished, Available, Finished, False)