In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "c840e426-a13c-466b-98fd-d6685dc0986e", 
    "fs.azure.account.oauth2.client.secret": "veF8Q~UHz~IxXvmrGMYCQgQpYcZxgTkuiRPrEbVN",  
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/f3222e0f-e9ce-4895-ae9a-0d461ac48fe8/oauth2/token"    
}

# mounting gold container
dbutils.fs.mount(
    source = "abfss://gold@gsynergyrgstorage.dfs.core.windows.net/",
    mount_point = "/mnt/gold",
    extra_configs = configs
)

True

In [0]:
#define paths
silver_layer_path = "/mnt/silver"
gold_layer_path = "/mnt/gold"

In [0]:
# Example: Read facttable from Silver Layer
facttransactions_df = spark.read.format("delta").load(f"{silver_layer_path}/facttransactions")
factavgcost_df = spark.read.format("delta").load(f"{silver_layer_path}/factavgcost")

# Example: Read hierarchial table from Silver Layer
hierclnd_df = spark.read.format("delta").load(f"{silver_layer_path}/hierclnd")
hierprod_df = spark.read.format("delta").load(f"{silver_layer_path}/hierprod")
hierinvloc_df = spark.read.format("delta").load(f"{silver_layer_path}/hierinvloc")
hierinvstatus_df = spark.read.format("delta").load(f"{silver_layer_path}/hierinvstatus")
hierpossite_df = spark.read.format("delta").load(f"{silver_layer_path}/hierpossite")
hierpricestate_df = spark.read.format("delta").load(f"{silver_layer_path}/hierpricestate")
hierrtlloc_df = spark.read.format("delta").load(f"{silver_layer_path}/hierrtlloc")
hierhldy_df = spark.read.format("delta").load(f"{silver_layer_path}/hierhldy")

#### Creating Views and transformation logic

In [0]:
# Join facttransactions_df with hierclnd_df to get fsclwk_id
joined_df = facttransactions_df.join(
    hierclnd_df,
    facttransactions_df.fscldt_id == hierclnd_df.fscldt_id,
    "inner"
)

# Aggregate the data
mview_weekly_sales_df = joined_df.groupBy(
    "pos_site_id",
    "sku_id",
    "fsclwk_id",
    "price_substate_id",
    "type"
).agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars")
)

# Display the result
mview_weekly_sales_df.display()

pos_site_id,sku_id,fsclwk_id,price_substate_id,type,total_sales_units,total_sales_dollars,total_discount_dollars
129,0787650000,201809,FP,FP,8,72.0,24.0
CATPROSP,0882690000,201817,FP,FP,1,18.95,0.0
CATMAIN,2AR7130901,201852,FP,FP,9,809.55,0.0
INETMAIN,0829590000,201907,FP,FP,9,526.43,13.12
181,0885190000,201920,FP,FP,2,36.0,0.0
CATMAIN,1020807000,201925,FP,FP,105,701.96,27.79
148,0882890000,201929,FP,FP,1,18.95,0.0
164,0574390000,201818,FP,FP,1,44.95,0.0
136,3668931802,201826,FP,FP,1,58.95,0.0
CATPROSP,2AT5120701,201842,FP,FP,3,209.85,0.0


In [0]:
# Write the initial table to Gold Layer
mview_weekly_sales_df.write.format("delta") \
    .mode("overwrite") \
    .option("path", f"{gold_layer_path}/mview_weekly_sales") \
    .saveAsTable("gold_mview_weekly_sales")


In [0]:
# monthly sales aggregation table
monthly_sales_df = joined_df.groupBy(
    "pos_site_id",
    "sku_id",
    "fsclmth_id",
    "price_substate_id",
    "type"
).agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars")
)

monthly_sales_df.display()

pos_site_id,sku_id,fsclmth_id,price_substate_id,type,total_sales_units,total_sales_dollars,total_discount_dollars
CATMAIN,2464540701,201806,FP,FP,211,16743.12,126.33
CATMAIN,0851104000,201806,FP,FP,23,4945.0,0.0
RTLOPCS,2AC1031601,201809,FP,FP,11,809.45,70.0
190,2AC1080701,201810,FP,FP,3,239.85,0.0
133,0808990000,201810,FP,FP,4,64.0,0.0
CATMAIN,2AR7130701,201811,FP,FP,188,16572.17,338.43
128,0553990000,201903,FP,FP,5,159.78,5.22
INETMAIN,2775940801,201906,MD1,MD1,10,666.2,33.7
INETOUT,0574490000,201908,MD1,MD1,3,122.69,12.28
152,0445790000,201802,FP,FP,5,240.0,0.0


In [0]:
# Write to Gold Layer
monthly_sales_df.write.format("delta") \
    .mode("overwrite") \
    .option("path", f"{gold_layer_path}/mview_monthly_sales") \
    .saveAsTable("gold_mview_monthly_sales")

In [0]:
# Enriching product hierarchy with sales metrics
product_performance_df = hierprod_df.join(
    facttransactions_df.groupBy("sku_id").agg(
        sum("sales_units").alias("total_sales_units"),
        sum("sales_dollars").alias("total_sales_dollars")
    ),
    "sku_id",
    "left"
)

In [0]:
# Write to Gold Layer
product_performance_df.write.format("delta") \
    .mode("overwrite") \
    .option("path", f"{gold_layer_path}/dim_product_performance") \
    .saveAsTable("gold_dim_product_performance")

In [0]:
# Creating a view for top-selling products by chnl_label
top_selling_products_df = facttransactions_df.join(
    hierpossite_df,
    facttransactions_df.pos_site_id == hierpossite_df.site_id,
    "inner"
).groupBy(
    "chnl_label",
).agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars")
).orderBy(col("total_sales_dollars").desc())

top_selling_products_df.display()

chnl_label,total_sales_units,total_sales_dollars
Catalog,1982455,118274732.0
Retail POS,2077614,79246977.19
Internet,479053,25633367.48
Retail OPCS,158279,10532462.86


In [0]:
# Write to Gold Layer
top_selling_products_df.write.format("delta") \
    .mode("overwrite") \
    .option("path", f"{gold_layer_path}/view_top_selling_products") \
    .saveAsTable("gold_view_top_selling_products")

In [0]:
# KPI table for sales growth rate
sales_growth_df = facttransactions_df.join(
    hierclnd_df,
    facttransactions_df.fscldt_id == hierclnd_df.fscldt_id,
    "inner"
).groupBy(
    "fsclmth_id"
).agg(
    sum("sales_dollars").alias("total_sales_dollars")
).withColumn(
    "sales_growth_rate",
    round(((col("total_sales_dollars") - lag("total_sales_dollars").over(Window.orderBy("fsclmth_id"))) / lag("total_sales_dollars").over(Window.orderBy("fsclmth_id"))*100), 2)
)
sales_growth_df.display()

fsclmth_id,total_sales_dollars,sales_growth_rate
201801,3770869.6,
201802,5828343.38,54.56
201803,6984686.31,19.84
201804,7167291.42,2.61
201805,6767163.83,-5.58
201806,4739030.89,-29.97
201807,3917228.73,-17.34
201808,5471372.49,39.67
201809,4871474.57,-10.96
201810,6319505.6,29.72


In [0]:
# Write to Gold Layer
sales_growth_df.write.format("delta") \
    .mode("overwrite") \
    .option("path", f"{gold_layer_path}/kpi_sales_growth") \
    .saveAsTable("gold_kpi_sales_growth")

In [0]:
# Bonus Challenge

# Define the path to the mview_weekly_sales table
mview_weekly_sales_path = f"{gold_layer_path}/mview_weekly_sales"

# Read the existing mview_weekly_sales table
mview_weekly_sales_df = spark.read.format("delta").load(mview_weekly_sales_path)

# Assume new data is loaded into a temporary DataFrame (new_facttransactions_df)
# For example, new data could be loaded from the silver layer area
new_facttransactions_df = spark.read.format("delta").load("/mnt/silver/new_facttransactions")

# Join new data with hierclnd_df to get fsclwk_id
new_joined_df = new_facttransactions_df.join(
    hierclnd_df,
    new_facttransactions_df.fscldt_id == hierclnd_df.fscldt_id,
    "inner"
)

# Aggregate the new data
new_mview_weekly_sales_df = new_joined_df.groupBy(
    "pos_site_id",
    "sku_id",
    "fsclwk_id",
    "price_substate_id",
    "type"
).agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars")
)

# Merge the new data into the existing mview_weekly_sales table
from delta.tables import DeltaTable

# Load the existing Delta table
delta_table = DeltaTable.forPath(spark, mview_weekly_sales_path)

# Perform the merge operation
delta_table.alias("tgt").merge(
    new_mview_weekly_sales_df.alias("src"),
    """
    tgt.pos_site_id = src.pos_site_id AND
    tgt.sku_id = src.sku_id AND
    tgt.fsclwk_id = src.fsclwk_id AND
    tgt.price_substate_id = src.price_substate_id AND
    tgt.type = src.type
    """
).whenMatchedUpdate(
    set={
        "total_sales_units": col("tgt.total_sales_units") + col("src.total_sales_units"),
        "total_sales_dollars": col("tgt.total_sales_dollars") + col("src.total_sales_dollars"),
        "total_discount_dollars": col("tgt.total_discount_dollars") + col("src.total_discount_dollars")
    }
).whenNotMatchedInsert(
    values={
        "pos_site_id": col("src.pos_site_id"),
        "sku_id": col("src.sku_id"),
        "fsclwk_id": col("src.fsclwk_id"),
        "price_substate_id": col("src.price_substate_id"),
        "type": col("src.type"),
        "total_sales_units": col("src.total_sales_units"),
        "total_sales_dollars": col("src.total_sales_dollars"),
        "total_discount_dollars": col("src.total_discount_dollars")
    }
).execute()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3232046314958023>, line 11[0m
[1;32m      7[0m mview_weekly_sales_df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mload(mview_weekly_sales_path)
[1;32m      9[0m [38;5;66;03m# Assume new data is loaded into a temporary DataFrame (new_facttransactions_df)[39;00m
[1;32m     10[0m [38;5;66;03m# For example, new data could be loaded from the silver layer area[39;00m
[0;32m---> 11[0m new_facttransactions_df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mload([38;5;124m"[39m[38;5;124m/mnt/silver/new_facttransactions[39m[38;5;124m"[39m)
[1;32m     13[0m [38;5;66;03m# Join new data with hierclnd