##### We will load data from silver to gold here for one of the source systems, and encourage the trainees to load the data for the other source systems.<br>

In [0]:
# Create the gold schema if it does not already exist (the statement is a no-op when it does).
# MANAGED LOCATION names the storage path for managed tables created in this schema;
# the storage account in the URI below is environment-specific.
spark.sql("""CREATE SCHEMA IF NOT EXISTS psl_salesdev.gold MANAGED LOCATION 'abfss://sales@storageaccountname.dfs.core.windows.net/gold/'""")

DataFrame[]

___________________________________________________________________________________
# Load Dimensions

We will now create our dimension table structure if it doesn't already exist.<br>
<b>This will be a managed [table](https://docs.databricks.com/en/tables/managed.html).</b>

In [0]:
# Create the product dimension table in the gold schema if it does not already exist.
# The DDL declares productKey, productID and productName as NOT NULL; the remaining
# attributes and the createdAt/updatedAt audit timestamps are nullable.
# Running this cell again leaves an existing table untouched (IF NOT EXISTS).
spark.sql("""
CREATE TABLE IF NOT EXISTS psl_salesdev.gold.tb_dim_product
( productKey STRING NOT NULL COMMENT 'Unique Key- Hashed value of the productID and source',
  productID STRING NOT NULL COMMENT 'Unique identifier for each product',
  productName STRING NOT NULL COMMENT 'Name of the product',
  subCategory STRING COMMENT 'Sub-category of the product',
  category STRING COMMENT 'Category of the product',
  source STRING COMMENT 'Source system of the data',
  createdAt TIMESTAMP COMMENT 'Timestamp when the data was ingested',
  updatedAt TIMESTAMP COMMENT 'Timestamp when the data was updated'
)
COMMENT 'This managed table contains product dimension data coming from different sources'
""")


DataFrame[]

###### Let us read the product table data from the silver layer

In [0]:
# Cleaned product records from the silver layer; this is the input for the product dimension.
df = spark.table("psl_salesdev.silver.product_cleaned")

We will now create the key column for our table

In [0]:
from pyspark.sql.functions import sha1, current_timestamp, col, concat_ws

# Step 1: build the readable composite key "productID|source".
# concat_ws joins the given columns with the delimiter passed as its first argument.
df = df.withColumn("productKey", concat_ws("|", col("productID"), col("source")))

# Step 2: replace the composite key with its SHA-1 hash, then keep a single row per key
# so the MERGE into the dimension never sees two source rows for the same target row.
# dropDuplicates is given a *list* of column names: a bare string is rejected with a
# type error on PySpark 3.x, while the list form works on every version.
source_df = (
    df.withColumn("productKey", sha1(col("productKey")))
      .dropDuplicates(["productKey"])
)

# Preview a handful of rows to sanity-check the generated keys.
source_df.limit(5).display()

productID,productName,subCategory,category,source,ingestionTimestamp,productKey
FUR-CH-10003312,Hon 2090 “Pillow Soft” Series Mid Back Swivel/Tilt Chairs,Chairs,Furniture,Retail CSV,2024-09-09T19:36:52.229Z,001a593a4c5ddad23e24fd41821a7da13ddd8aeb
TEC-PH-10000169,ARKON Windshield Dashboard Air Vent Car Mount Holder,Phones,Technology,Retail CSV,2024-09-09T19:36:52.229Z,00518a64a08232452dcb86711d24d55777adfc62
FUR-BO-10002202,"Atlantic Metals Mobile 2-Shelf Bookcases, Custom Colors",Bookcases,Furniture,Retail CSV,2024-09-09T19:36:52.229Z,005b3778efbb1e664732df68e9ad8980aa98144e
FUR-FU-10001940,Staple-based wall hangings,Furnishings,Furniture,Retail CSV,2024-09-09T19:36:52.229Z,006eac7fbcdacffa05392b4976fc38d77cf5e2c2
OFF-BI-10001679,GBC Instant Index System for Binding Systems,Binders,Office Supplies,Retail CSV,2024-09-09T19:36:52.229Z,00a8b42510cccb85e440fa019e2be314199de766


### SCD1 script

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp  # imported here so the cell runs on its own

# Load the target table using catalog.schema.table
target_table = DeltaTable.forName(spark, "psl_salesdev.gold.tb_dim_product")

# SCD Type 1 merge: overwrite changed attributes in place, insert unseen keys.
(
    target_table.alias("target")
    .merge(
        source_df.alias("source"),
        "target.productKey = source.productKey",  # hashed business key (productID + source)
    )
    .whenMatchedUpdate(
        # Only touch rows whose descriptive attributes actually changed.
        # eqNullSafe treats NULL as a comparable value; a plain != evaluates to NULL
        # (i.e. "no update") whenever either side is NULL, silently skipping
        # NULL -> value and value -> NULL changes.
        condition=(
            ~col("source.productName").eqNullSafe(col("target.productName"))
            | ~col("source.subCategory").eqNullSafe(col("target.subCategory"))
            | ~col("source.category").eqNullSafe(col("target.category"))
        ),
        set={
            "productName": "source.productName",
            "subCategory": "source.subCategory",
            "category": "source.category",
            "updatedAt": current_timestamp(),  # stamp `updatedAt` only when something changed
        },
    )
    .whenNotMatchedInsert(
        values={
            "productKey": "source.productKey",
            "productID": "source.productID",
            "productName": "source.productName",
            "subCategory": "source.subCategory",
            "category": "source.category",
            "source": "source.source",
            "createdAt": current_timestamp(),  # stamp `createdAt` on insert; `updatedAt` stays NULL
        }
    )
    .execute()
)


Let us verify that our dimension write was successful

In [0]:
%sql
-- Inspect the rows now present in the product dimension.
SELECT *
FROM psl_salesdev.gold.tb_dim_product

productKey,productID,productName,subCategory,category,source,createdAt,updatedAt
3a684f159a43d668d23c73913bd2c0db8c916b1b,TEC-PH-10000486,Plantronics HL10 Handset Lifter,Phones,Technology,Retail CSV,2024-09-10T04:32:08.281Z,
409cff111f556613a58d3f40d9e3ec8b80b2c91d,TEC-AC-10003832,Logitech P710e Mobile Speakerphone,Accessories,Technology,Retail CSV,2024-09-10T04:32:08.281Z,
8f1c9eaee5668b9eb13253537bdd999480bfb7a4,TEC-AC-10002076,Microsoft Natural Keyboard Elite,Accessories,Technology,Retail CSV,2024-09-10T04:32:08.281Z,
3f04dfa1a4777875cba280bf42bfdfe839f9f99d,TEC-PH-10004667,Cisco 8x8 Inc. 6753i IP Business Phone System,Phones,Technology,Retail CSV,2024-09-10T04:32:08.281Z,
b03ff9c908164b7fa3e4aff42590519b85b4b542,OFF-BI-10000545,GBC Ibimaster 500 Manual ProClick Binding System,Binders,Office Supplies,Retail CSV,2024-09-10T04:32:08.281Z,
0301d35acb2a3e5410e6a1a74767acfb59aa9679,OFF-PA-10003543,Xerox 1985,Paper,Office Supplies,Retail CSV,2024-09-10T04:32:08.281Z,
585278f7f8828b399f4897d75505b108e2fed148,OFF-PA-10000994,Xerox 1915,Paper,Office Supplies,Retail CSV,2024-09-10T04:32:08.281Z,
56c250b5ae5c636c70577f6372241d32f314ab49,FUR-TA-10003008,"Lesro Round Back Collection Coffee Table, End Table",Tables,Furniture,Retail CSV,2024-09-10T04:32:08.281Z,
2974a20b89e1dbc874d0a4030c7d0cd0eaa255ea,OFF-PA-10002464,HP Office Recycled Paper (20Lb. and 87 Bright),Paper,Office Supplies,Retail CSV,2024-09-10T04:32:08.281Z,
d7848474f1c147f7ecc107fbc3abcdebe29b6245,TEC-AC-10001109,Logitech Trackman Marble Mouse,Accessories,Technology,Retail CSV,2024-09-10T04:32:08.281Z,


__________________________________________________________________________
# Load Facts

In [0]:
# Create the sales fact table in the gold schema if it does not already exist.
# Only productKey is declared NOT NULL; the measures (salesValue, totatOrders,
# quantity, profit) and the remaining columns are nullable.
# NOTE(review): `totatOrders` looks like a misspelling of `totalOrders`. It is left as-is
# because the aggregation cell writes a column with this exact name; renaming it would
# require changing the table and the load together.
spark.sql("""
CREATE TABLE IF NOT EXISTS psl_salesdev.gold.tb_fact_sales
( productKey STRING NOT NULL COMMENT 'Unique Key- Hashed value of the productID and source',
  orderDate DATE COMMENT 'Date of the order',
  salesValue DOUBLE COMMENT 'Sales Value of the product',
  totatOrders INT COMMENT 'Total number of orders',
  quantity INT COMMENT 'Quantity of the product',
  profit DOUBLE COMMENT 'Profit of the product',
  source    STRING COMMENT 'Source of the data',
  createdAt TIMESTAMP COMMENT 'Timestamp when the data was ingested'
)
COMMENT 'This managed table contains product sales data coming from different sources'
""")

DataFrame[]

#### Read data from the silver table

In [0]:
# Cleaned sales records from the silver layer; this is the input for the sales fact.
df_sales_silver = spark.table("psl_salesdev.silver.sales_cleaned")

Create the productKey, then aggregate the data at the productKey and order-date level

In [0]:
from pyspark.sql.functions import concat_ws, sha1

# Step 1: readable composite key "productId|source".
df_sales_silver_withkey = df_sales_silver.withColumn(
    'ProductKey',
    concat_ws("|", df_sales_silver["productId"], df_sales_silver["source"]),
)

# Step 2: overwrite the composite key with its SHA-1 hash, the same recipe used for
# productKey in the product dimension, so the two can be joined on it.
df_sales_silver_withkey_withkey = df_sales_silver_withkey.withColumn(
    'ProductKey',
    sha1(df_sales_silver_withkey["ProductKey"]),
)

After Key creation, let us now aggregate the df

In [0]:
from pyspark.sql.functions import *

In [0]:
# Namespaced import: F.sum is unambiguous, whereas a bare `sum` only works while a
# wildcard import of pyspark.sql.functions is shadowing Python's builtin sum().
from pyspark.sql import functions as F

# Aggregate the keyed sales rows to one row per (product key, order date, source)
# and shape the result to the column order of gold.tb_fact_sales.
df_agg = (
    df_sales_silver_withkey_withkey
    .groupBy('ProductKey', 'orderDate', 'source')
    .agg(
        F.sum('quantity').cast('int').alias('quantity'),                # cast to match the INT column
        F.sum('sales').alias('salesValue'),
        F.countDistinct('orderId').cast('int').alias('totatOrders'),    # distinct orders; name matches the table DDL
        F.sum('profit').alias('profit'),
    )
    .withColumn('createdAt', F.current_timestamp())
    .select(
        # The upstream column is spelled 'ProductKey'; rename it explicitly so the
        # output column is exactly 'productKey', independent of case-sensitivity settings.
        F.col('ProductKey').alias('productKey'),
        'orderDate',
        'salesValue',
        'totatOrders',
        'quantity',
        'profit',
        'source',
        'createdAt',
    )
)

In [0]:
# Preview a few aggregated rows before writing them to the fact table.
display(df_agg.limit(5))

productKey,orderDate,salesValue,totatOrders,quantity,profit,source,createdAt
f09a9c1cd9a9bff199e4de17bca75f93435fd5a4,2016-05-21,111.96,1,2,54.8604,Retail CSV,2024-09-12T14:37:14.336Z
d38f55198a15e5d9dd5ad2d51e689511ebb1b134,2017-12-28,10.36,2,4,1.628,Retail CSV,2024-09-12T14:37:14.336Z
b0f5aa9a5d163f8733035934144627a88b648f3e,2017-12-08,27.15,1,5,13.3035,Retail CSV,2024-09-12T14:37:14.336Z
409cff111f556613a58d3f40d9e3ec8b80b2c91d,2017-07-11,1287.45,1,5,244.6155,Retail CSV,2024-09-12T14:37:14.336Z
1f8b776065e1e78f6ffcbf20886ccc5d1d9b85cc,2017-05-14,64.4,1,5,1.932,Retail CSV,2024-09-12T14:37:14.336Z


Save the aggregated data to the sales fact table

In [0]:
from delta.tables import DeltaTable

# Idempotent load: a plain append inserts every fact row again each time the notebook
# is re-run. Merging on the aggregation grain (productKey, orderDate, source) refreshes
# the measures of rows that already exist and inserts only the new ones.
fact_sales = DeltaTable.forName(spark, 'psl_salesdev.gold.tb_fact_sales')

(
    fact_sales.alias('target')
    .merge(
        df_agg.alias('source'),
        # <=> is the null-safe equality operator: orderDate and source are nullable in the
        # table DDL, and a plain = would never match NULL keys (re-inserting them every run).
        "target.productKey = source.productKey "
        "AND target.orderDate <=> source.orderDate "
        "AND target.source <=> source.source",
    )
    .whenMatchedUpdate(
        # Refresh the measures only; createdAt keeps the time of the original insert.
        set={
            'salesValue': 'source.salesValue',
            'totatOrders': 'source.totatOrders',
            'quantity': 'source.quantity',
            'profit': 'source.profit',
        }
    )
    .whenNotMatchedInsertAll()
    .execute()
)

Let us now verify if the data has been written to our sales table

In [0]:
%sql
-- Inspect the rows now present in the sales fact table.
SELECT *
FROM psl_salesdev.gold.tb_fact_sales

productKey,orderDate,salesValue,totatOrders,quantity,profit,source,createdAt
f09a9c1cd9a9bff199e4de17bca75f93435fd5a4,2016-05-21,111.96,1,2.0,54.8604,Retail CSV,2024-09-10T08:13:26.81Z
d38f55198a15e5d9dd5ad2d51e689511ebb1b134,2017-12-28,10.36,2,4.0,1.628,Retail CSV,2024-09-10T08:13:26.81Z
b0f5aa9a5d163f8733035934144627a88b648f3e,2017-12-08,27.15,1,5.0,13.3035,Retail CSV,2024-09-10T08:13:26.81Z
409cff111f556613a58d3f40d9e3ec8b80b2c91d,2017-07-11,1287.45,1,5.0,244.6155,Retail CSV,2024-09-10T08:13:26.81Z
1f8b776065e1e78f6ffcbf20886ccc5d1d9b85cc,2017-05-14,64.4,1,5.0,1.932,Retail CSV,2024-09-10T08:13:26.81Z
e0dec97a288ae365a1f335298cd431aa4e6d2778,2016-09-02,22.911,1,7.0,-17.5651,Retail CSV,2024-09-10T08:13:26.81Z
5fc8aa2d07d9fcc2b084d6c3f73c2c27f0419eba,2016-12-18,18.84,1,3.0,6.0288,Retail CSV,2024-09-10T08:13:26.81Z
81b666a39ac5df97c5ee6e11de6463e68fe6a0a3,2014-09-19,7.16,1,2.0,3.58,Retail CSV,2024-09-10T08:13:26.81Z
7273da8a8500d58003866a64b984d99be79e34bc,2014-04-07,122.97,1,3.0,60.2553,Retail CSV,2024-09-10T08:13:26.81Z
7951d981138d8623926dbe2cd89a6f6b4c228c74,2017-12-08,113.568,1,2.0,-18.4548,Retail CSV,2024-09-10T08:13:26.81Z
