## Normalize and Load Hierarchy Tables to Silver Layer

In [0]:
from pyspark.sql.functions import *

#### Function to read a Bronze Delta table

In [0]:
def read_bronze_table(table_name):
    """Load a Delta table from the bronze container and return it as a DataFrame.

    Args:
        table_name: Folder name of the table under the bronze container root.

    Returns:
        The DataFrame produced by the Delta reader for that location.
    """
    delta_reader = spark.read.format("delta")
    return delta_reader.load(
        f"abfss://bronze@dlgysnergy.dfs.core.windows.net/{table_name}"
    )

#### Function to write to the Silver layer

In [0]:
def write_to_silver(df, table_name, mode="overwrite"):
    """Persist ``df`` as a Delta table in the silver container.

    The target folder is named ``stg_hier_<table_name>_sl``; a confirmation
    line is printed once the save call returns.

    Args:
        df: DataFrame to write.
        table_name: Short table name inserted into the target folder name.
        mode: Save mode handed to the DataFrame writer (default "overwrite").
    """
    target = f"abfss://silver@dlgysnergy.dfs.core.windows.net/stg_hier_{table_name}_sl"
    delta_writer = df.write.format("delta").mode(mode)
    delta_writer.save(target)
    print(f"✅ stg_hier_{table_name}_sl written to silver")

#### Normalizing hier_prod_bz into Department, Category, Subcategory and SKU tables

In [0]:
# Load the bronze product hierarchy that the normalized tables are derived from.
bronze_prod = read_bronze_table("hier_prod_bz")

# Show a five-row preview of the loaded data.
prod_preview = bronze_prod.limit(5)
prod_preview.display()

#### 1. Department Table

In [0]:
# Keep one row per distinct (dept_id, dept_label) combination.
dept_columns = ["dept_id", "dept_label"]
dept_projection = bronze_prod.select(*dept_columns)
df_dept = dept_projection.dropDuplicates()

# Display the result, then stage it in silver under the short name "dept".
df_dept.display()
write_to_silver(df_dept, table_name="dept")

#### 2. Category Table

In [0]:
# Keep one row per distinct (cat_id, cat_label, dept_id) combination;
# dept_id is carried along as the reference to the parent level.
category_columns = ["cat_id", "cat_label", "dept_id"]
category_projection = bronze_prod.select(*category_columns)
df_category = category_projection.dropDuplicates()

# Display the result, then stage it in silver under the short name "category".
df_category.display()
write_to_silver(df_category, table_name="category")

#### 3. Subcategory Table

In [0]:
# Keep one row per distinct (subcat_id, subcat_label, cat_id) combination;
# cat_id is carried along as the reference to the parent level.
subcat_columns = ["subcat_id", "subcat_label", "cat_id"]
subcat_projection = bronze_prod.select(*subcat_columns)
df_subcat = subcat_projection.dropDuplicates()

# Display the result, then stage it in silver under the short name "subcategory".
df_subcat.display()
write_to_silver(df_subcat, table_name="subcategory")

#### 4. SKU Table

In [0]:
# Keep one row per distinct (sku_id, sku_label, subcat_id) combination;
# subcat_id is carried along as the reference to the parent level.
sku_columns = ["sku_id", "sku_label", "subcat_id"]
sku_projection = bronze_prod.select(*sku_columns)
df_sku = sku_projection.dropDuplicates()

# Display the result, then stage it in silver under the short name "sku".
df_sku.display()
write_to_silver(df_sku, table_name="sku")

#### Deduplication of the other dimension tables and writing them into the silver layer (staging)

#### hier_clnd_bz

In [0]:
# hier_clnd_bz: read from bronze and preview the first five rows.
bronze_clnd = read_bronze_table("hier_clnd_bz")
clnd_preview = bronze_clnd.limit(5)
clnd_preview.display()

# Drop fully duplicated rows and stage the result in silver as "clnd".
clnd_deduped = bronze_clnd.dropDuplicates()
write_to_silver(clnd_deduped, table_name="clnd")

#### hier_hldy_bz

In [0]:
# hier_hldy_bz: read from bronze and preview the first five rows.
bronze_hldy = read_bronze_table("hier_hldy_bz")
hldy_preview = bronze_hldy.limit(5)
hldy_preview.display()

# Drop fully duplicated rows and stage the result in silver as "hldy".
hldy_deduped = bronze_hldy.dropDuplicates()
write_to_silver(hldy_deduped, table_name="hldy")


#### hier_invloc_bz

In [0]:
# hier_invloc_bz: read from bronze and preview the first five rows.
bronze_invloc = read_bronze_table("hier_invloc_bz")
invloc_preview = bronze_invloc.limit(5)
invloc_preview.display()

# Drop fully duplicated rows and stage the result in silver as "invloc".
invloc_deduped = bronze_invloc.dropDuplicates()
write_to_silver(invloc_deduped, table_name="invloc")

#### hier_invstatus_bz

In [0]:

# hier_invstatus_bz: read from bronze and preview the first five rows.
bronze_invstatus = read_bronze_table("hier_invstatus_bz")
invstatus_preview = bronze_invstatus.limit(5)
invstatus_preview.display()

# Drop fully duplicated rows and stage the result in silver as "invstatus".
invstatus_deduped = bronze_invstatus.dropDuplicates()
write_to_silver(invstatus_deduped, table_name="invstatus")

#### hier_possite_bz

In [0]:
# hier_possite_bz: read from bronze and preview the first five rows.
bronze_possite = read_bronze_table("hier_possite_bz")
possite_preview = bronze_possite.limit(5)
possite_preview.display()

# Drop fully duplicated rows and stage the result in silver as "possite".
possite_deduped = bronze_possite.dropDuplicates()
write_to_silver(possite_deduped, table_name="possite")

#### Load hier_pricestate_bz

In [0]:
# hier_pricestate_bz: read from bronze and preview the first five rows.
bronze_pricestate = read_bronze_table("hier_pricestate_bz")
pricestate_preview = bronze_pricestate.limit(5)
pricestate_preview.display()

# Drop fully duplicated rows and stage the result in silver as "pricestate".
pricestate_deduped = bronze_pricestate.dropDuplicates()
write_to_silver(pricestate_deduped, table_name="pricestate")

#### Load hier_rtlloc_bz

In [0]:
# hier_rtlloc_bz: read from bronze and preview the first five rows.
bronze_rtlloc = read_bronze_table("hier_rtlloc_bz")
rtlloc_preview = bronze_rtlloc.limit(5)
rtlloc_preview.display()

# Drop fully duplicated rows and stage the result in silver as "rtlloc".
rtlloc_deduped = bronze_rtlloc.dropDuplicates()
write_to_silver(rtlloc_deduped, table_name="rtlloc")

### Fact tables

In [0]:
# Transactions fact: read the bronze table.
fact_txn = read_bronze_table("fact_transactions_bz")

# Remove fully duplicated rows first, then discard rows missing either key column.
fact_txn_deduped = fact_txn.dropDuplicates()
fact_txn_cleaned = fact_txn_deduped.filter(
    "order_id IS NOT NULL AND line_id IS NOT NULL"
)

# Preview ten cleaned rows before staging.
fact_txn_preview = fact_txn_cleaned.limit(10)
fact_txn_preview.display()

# NOTE(review): write_to_silver always applies the "stg_hier_" prefix, so this
# lands in stg_hier_fact_transactions_sl - confirm that name is intended for a fact table.
write_to_silver(fact_txn_cleaned, table_name="fact_transactions")

In [0]:
# Average-costs fact: read the bronze table and remove fully duplicated rows.
fact_avgcost = read_bronze_table("fact_averagecosts_bz")
fact_avgcost_cleaned = fact_avgcost.dropDuplicates()

# Preview ten cleaned rows before staging.
fact_avgcost_preview = fact_avgcost_cleaned.limit(10)
fact_avgcost_preview.display()

# NOTE(review): write_to_silver always applies the "stg_hier_" prefix, so this
# lands in stg_hier_fact_averagecosts_sl - confirm that name is intended for a fact table.
write_to_silver(fact_avgcost_cleaned, table_name="fact_averagecosts")