### Name: Haridharan M
### Work Location : Pune
### HomeTown : Coimbatore

### Link : [Portfolio](https://haridharan05.github.io/MyCreativeSpace/)

### **Conclusion**

This consolidated notebook runs from start to finish to:

- Load the raw pipe-delimited `.dlm` files into Spark.
- Clean and validate the data.
- Write the dimension and fact tables to a staging schema (Delta).
- Aggregate the transactions into the refined table `mview_weekly_sales`.
- Demonstrate incremental (upsert) updates to the final aggregated table.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, DateType
)

# Number of shuffle partitions for this session; tune it to the cluster size.
shuffle_partitions = "8"
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)

# List the source folder in DBFS so the expected input files can be confirmed.
source_folder_listing = dbutils.fs.ls("/FileStore/tables/GS_Sample_data/")
display(source_folder_listing)


path,name,size,modificationTime
dbfs:/FileStore/tables/GS_Sample_data/fact_averagecosts.dlm,fact_averagecosts.dlm,29142959,1743659829000
dbfs:/FileStore/tables/GS_Sample_data/fact_transactions.dlm,fact_transactions.dlm,380631122,1743660070000
dbfs:/FileStore/tables/GS_Sample_data/hier_clnd.dlm,hier_clnd.dlm,276119,1743659833000
dbfs:/FileStore/tables/GS_Sample_data/hier_hldy.dlm,hier_hldy.dlm,545,1743659834000
dbfs:/FileStore/tables/GS_Sample_data/hier_invloc.dlm,hier_invloc.dlm,2702,1743659846000
dbfs:/FileStore/tables/GS_Sample_data/hier_invstatus.dlm,hier_invstatus.dlm,17004,1743659847000
dbfs:/FileStore/tables/GS_Sample_data/hier_possite.dlm,hier_possite.dlm,5079,1743659847000
dbfs:/FileStore/tables/GS_Sample_data/hier_pricestate.dlm,hier_pricestate.dlm,157,1743659848000
dbfs:/FileStore/tables/GS_Sample_data/hier_prod.dlm,hier_prod.dlm,194621,1743659850000
dbfs:/FileStore/tables/GS_Sample_data/hier_rtlloc.dlm,hier_rtlloc.dlm,3456,1743659851000


In [0]:
# File locations and database names used by the rest of the notebook.

BASE_PATH = "dbfs:/FileStore/tables/GS_Sample_data"
STAGING_DB = "staging_db"
REFINED_DB = "refined_db"

# Create both target databases up front if they do not exist yet.
for database_name in (STAGING_DB, REFINED_DB):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Each source is stored as <BASE_PATH>/<source name>.dlm; `files` maps the
# source name to that full path.
source_names = [
    "fact_averagecosts",
    "fact_transactions",
    "hier_clnd",
    "hier_hldy",
    "hier_invloc",
    "hier_invstatus",
    "hier_possite",
    "hier_pricestate",
    "hier_prod",
    "hier_rtlloc",
]
files = {source_name: f"{BASE_PATH}/{source_name}.dlm" for source_name in source_names}



In [0]:
# Dimension tables.
#
# Every hier_*.dlm file is read the same way: pipe-delimited, with a header row,
# using an explicit schema in which every column is a nullable string. The only
# per-table differences are the column list, the key column used to drop
# incomplete rows, and two extra clean-up steps (trim on site_id, date parsing
# on the calendar).


def _string_schema(*column_names):
    """Return a StructType of nullable StringType fields in the given order."""
    return StructType(
        [StructField(column_name, StringType(), True) for column_name in column_names]
    )


def _read_dlm(path, schema):
    """Read a pipe-delimited file with a header row using the supplied schema."""
    return (
        spark.read
        .option("header", "true")
        .option("sep", "|")
        .schema(schema)
        .csv(path)
    )


# 3.1 hier_possite
possite_schema = _string_schema(
    "site_id", "site_label",
    "subchnl_id", "subchnl_label",
    "chnl_id", "chnl_label",
)
df_possite = _read_dlm(files["hier_possite"], possite_schema)
# Rows without a site_id are dropped, then whitespace is trimmed from the key.
df_possite_clean = (
    df_possite
    .dropna(subset=["site_id"])
    .withColumn("site_id", F.trim(F.col("site_id")))
)

# 3.2 hier_pricestate
pricestate_schema = _string_schema(
    "substate_id", "substate_label",
    "state_id", "state_label",
)
df_pricestate = _read_dlm(files["hier_pricestate"], pricestate_schema)
df_pricestate_clean = df_pricestate.dropna(subset=["substate_id"])

# 3.3 hier_prod
prod_schema = _string_schema(
    "sku_id", "sku_label",
    "stylclr_id", "stylclr_label",
    "styl_id", "styl_label",
    "subcat_id", "subcat_label",
    "cat_id", "cat_label",
    "dept_id", "dept_label",
    "issvc", "isasmbly", "isnfs",
)
df_prod = _read_dlm(files["hier_prod"], prod_schema)
df_prod_clean = df_prod.dropna(subset=["sku_id"])

# 3.4 hier_rtlloc
rtlloc_schema = _string_schema(
    "str", "str_label",
    "dstr", "dstr_label",
    "rgn", "rgn_label",
)
df_rtlloc = _read_dlm(files["hier_rtlloc"], rtlloc_schema)
df_rtlloc_clean = df_rtlloc.dropna(subset=["str"])

# 3.5 hier_invstatus
invstatus_schema = _string_schema(
    "code_id", "code_label",
    "bckt_id", "bckt_label",
    "ownrshp_id", "ownrshp_label",
)
df_invstatus = _read_dlm(files["hier_invstatus"], invstatus_schema)
df_invstatus_clean = df_invstatus.dropna(subset=["code_id"])

# 3.6 hier_hldy
hldy_schema = _string_schema("hldy_id", "hldy_label")
df_hldy = _read_dlm(files["hier_hldy"], hldy_schema)
df_hldy_clean = df_hldy.dropna(subset=["hldy_id"])

# 3.7 hier_invloc
invloc_schema = _string_schema(
    "loc", "loc_label",
    "loctype", "loctype_label",
)
df_invloc = _read_dlm(files["hier_invloc"], invloc_schema)
df_invloc_clean = df_invloc.dropna(subset=["loc"])

# 3.8 hier_clnd
clnd_schema = _string_schema(
    "fscldt_id", "fscldt_label",
    "fsclwk_id", "fsclwk_label",
    "fsclmth_id", "fsclmth_label",
    "fsclqrtr_id", "fsclqrtr_label",
    "fsclyr_id", "fsclyr_label",
    "ssn_id", "ssn_label",
    "ly_fscldt_id", "lly_fscldt_id",
    "fscldow", "fscldom", "fscldoq", "fscldoy",
    "fsclwoy", "fsclmoy", "fsclqoy",
    "date",
)
df_clnd = _read_dlm(files["hier_clnd"], clnd_schema)
# Drop rows without a fiscal-date key, then parse the string `date` column
# into a DateType using the yyyy-MM-dd pattern.
df_clnd_clean = (
    df_clnd
    .dropna(subset=["fscldt_id"])
    .withColumn("date", F.to_date(F.col("date"), "yyyy-MM-dd"))
)

In [0]:
# 4. Fact tables
#
# Both fact files are pipe-delimited with a header row and are read with an
# explicit schema: identifiers as strings, measures as doubles.

# 4.1 fact_averagecosts
avgcosts_schema = StructType([
    StructField("fscldt_id", StringType(), True),
    StructField("sku_id", StringType(), True),
    StructField("average_unit_standardcost", DoubleType(), True),
    StructField("average_unit_landedcost", DoubleType(), True),
])
avgcosts_reader = (
    spark.read
    .option("header", "true")
    .option("sep", "|")
    .schema(avgcosts_schema)
)
df_averagecosts = avgcosts_reader.csv(files["fact_averagecosts"])
# A cost row is only usable when both of its keys are present.
df_averagecosts_clean = df_averagecosts.dropna(subset=["fscldt_id", "sku_id"])

# 4.2 fact_transactions
# `dt` is read as a string here and converted to a timestamp during cleaning.
transactions_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("line_id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("dt", StringType(), True),
    StructField("pos_site_id", StringType(), True),
    StructField("sku_id", StringType(), True),
    StructField("fscldt_id", StringType(), True),
    StructField("price_substate_id", StringType(), True),
    StructField("sales_units", DoubleType(), True),
    StructField("sales_dollars", DoubleType(), True),
    StructField("discount_dollars", DoubleType(), True),
    StructField("original_order_id", StringType(), True),
    StructField("original_line_id", StringType(), True),
])
transactions_reader = (
    spark.read
    .option("header", "true")
    .option("sep", "|")
    .schema(transactions_schema)
)
df_transactions = transactions_reader.csv(files["fact_transactions"])

# Cleaning: drop rows that lack any of the critical keys, then parse `dt`
# into a timestamp.
transaction_key_columns = ["order_id", "line_id", "pos_site_id", "sku_id", "fscldt_id"]
dt_pattern = "yyyy-MM-dd'T'HH:mm:ss"
df_transactions_clean = (
    df_transactions
    .dropna(subset=transaction_key_columns)
    .withColumn("dt", F.to_timestamp(F.col("dt"), dt_pattern))
)


In [0]:

# --- Range and date-boundary filters -----------------------------------------
# Keep only transactions whose units, sales dollars and discount dollars are all
# >= 0; negative values are treated as erroneous or corrupted data.
# NOTE(review): a row with a NULL in any of these three measures is removed too,
# because the comparison does not evaluate to true for NULL - confirm that this
# is intended.
non_negative_measures = (
    (F.col("sales_units") >= 0)
    & (F.col("sales_dollars") >= 0)
    & (F.col("discount_dollars") >= 0)
)

# Transactions dated before 2015-01-01 00:00:00 are assumed to be out of scope.
earliest_allowed_dt = F.to_timestamp(F.lit("2015-01-01T00:00:00"), "yyyy-MM-dd'T'HH:mm:ss")

df_transactions_clean = (
    df_transactions_clean
    .filter(non_negative_measures)
    .filter(F.col("dt") >= earliest_allowed_dt)
)

# --- Primary key check ---------------------------------------------------------
# Count the site_id values that occur more than once in hier_possite and warn if
# there are any. In a production setting this might raise an exception or be
# logged to a monitoring system instead of printed.
duplicated_site_ids = df_possite_clean.groupBy("site_id").count().filter("count > 1")
possite_dup_count = duplicated_site_ids.count()
if possite_dup_count > 0:
    print(f"WARNING: Found {possite_dup_count} duplicate site_id(s) in hier_possite")

# --- Referential check ---------------------------------------------------------
# Compare the distinct fscldt_id values of the transactions with those of the
# calendar dimension. The left_anti join keeps the fact values that have no
# match in the calendar; any such value only produces a warning here. A stricter
# pipeline might drop or flag the affected rows.
fact_fscldt_ids = df_transactions_clean.select("fscldt_id").distinct()
dim_fscldt_ids = df_clnd_clean.select("fscldt_id").distinct()

missing_in_dim = fact_fscldt_ids.join(dim_fscldt_ids, on="fscldt_id", how="left_anti")
missing_count = missing_in_dim.count()
if missing_count > 0:
    print(f"WARNING: {missing_count} fscldt_id from fact_transactions not found in hier_clnd")




In [0]:

# Persist every cleaned DataFrame as a Delta table in the staging database.
spark.sql(f"USE {STAGING_DB}")

# Target table name -> cleaned DataFrame, dimensions first, then facts.
# The tables are written in this order.
staging_tables = {
    "dim_possite": df_possite_clean,
    "dim_pricestate": df_pricestate_clean,
    "dim_prod": df_prod_clean,
    "dim_rtlloc": df_rtlloc_clean,
    "dim_invstatus": df_invstatus_clean,
    "dim_hldy": df_hldy_clean,
    "dim_invloc": df_invloc_clean,
    "dim_clnd": df_clnd_clean,
    "fact_averagecosts": df_averagecosts_clean,
    "fact_transactions": df_transactions_clean,
}

# mode("overwrite") creates the table or replaces its current contents, so this
# cell can be re-run. Once written, each table is queryable through Spark SQL.
for table_name, table_df in staging_tables.items():
    table_df.write.format("delta").mode("overwrite").saveAsTable(f"{STAGING_DB}.{table_name}")


In [0]:

# Build the refined aggregate table from the staged transactions.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {REFINED_DB}")
spark.sql(f"USE {REFINED_DB}")

# Drop any previous version of the table so the cell can be re-run.
spark.sql(f"DROP TABLE IF EXISTS {REFINED_DB}.mview_weekly_sales")

# The table is created with an explicit USING DELTA clause: the incremental
# step later opens it through DeltaTable.forName, which requires a Delta table,
# so the format must not depend on the session's default table format.
#
# NOTE(review): despite the table name, the aggregation key is fscldt_id
# (one row per fiscal date), not a fiscal week. A weekly rollup would need
# fsclwk_id, which is not a column of fact_transactions - confirm which grain
# is intended before changing the grouping, because the merge step uses the
# same key columns.
spark.sql(f'''
  CREATE TABLE {REFINED_DB}.mview_weekly_sales
  USING DELTA
  AS
  SELECT
      pos_site_id,
      sku_id,
      fscldt_id,
      price_substate_id,
      type,
      SUM(sales_units) AS total_sales_units,
      SUM(sales_dollars) AS total_sales_dollars,
      SUM(discount_dollars) AS total_discount_dollars
  FROM {STAGING_DB}.fact_transactions
  GROUP BY
      pos_site_id, sku_id, fscldt_id, price_substate_id, type
''')


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:

from delta.tables import DeltaTable
import pyspark.sql.functions as F

# 8.1. Paths and table references
# In production the incremental file would be a dated extract such as
# fact_transactions_YYYY_MM__DD.dlm (or .csv).
# NOTE(review): this demo points at the same file that was used for the full
# load, and the merge below ADDS the incoming sums to the stored totals. Every
# execution of this cell therefore increases the totals again; it is not
# idempotent.
INCREMENTAL_FILE = "dbfs:/FileStore/tables/GS_Sample_data/fact_transactions.dlm"
REFINED_DB = "refined_db"
refined_table_name = f"{REFINED_DB}.mview_weekly_sales"

# 8.2. Incremental data
# Read with the same reader options and schema (transactions_schema) as the
# full load of fact_transactions.
df_new_increment = (
    spark.read
    .option("header", "true")
    .option("sep", "|")
    .schema(transactions_schema)
    .csv(INCREMENTAL_FILE)
)

# Cleaning / validation: the same rules as the full load.
df_new_increment_clean = (
    df_new_increment
    # Drop rows missing critical columns
    .dropna(subset=["order_id", "line_id", "pos_site_id", "sku_id", "fscldt_id"])

    # Convert dt to timestamp
    .withColumn("dt", F.to_timestamp(F.col("dt"), "yyyy-MM-dd'T'HH:mm:ss"))

    # Numeric range checks
    .filter(
        (F.col("sales_units") >= 0) &
        (F.col("sales_dollars") >= 0) &
        (F.col("discount_dollars") >= 0)
    )

    # Date boundary checks
    .filter(
        F.col("dt") >= F.to_timestamp(F.lit("2015-01-01T00:00:00"), "yyyy-MM-dd'T'HH:mm:ss")
    )
)

# 8.3. Aggregate the new data on the same key columns as mview_weekly_sales
df_new_agg = (
    df_new_increment_clean.groupBy(
        "pos_site_id",
        "sku_id",
        "fscldt_id",
        "price_substate_id",
        "type"
    )
    .agg(
        F.sum("sales_units").alias("sum_sales_units"),
        F.sum("sales_dollars").alias("sum_sales_dollars"),
        F.sum("discount_dollars").alias("sum_discount_dollars")
    )
)

# 8.4. Merge (upsert) into mview_weekly_sales

refined_delta_table = DeltaTable.forName(spark, refined_table_name)

# price_substate_id and type are not part of the dropna() subset above, so
# either can be NULL in both the target and the source. A plain `=` never
# matches NULL to NULL, which would make every run insert a fresh row for such
# groups instead of updating the existing one. The null-safe operator `<=>`
# treats two NULLs as equal. The other three keys are guaranteed non-null in
# the source by the dropna() step, so plain equality is kept for them.
merge_condition = """
          tgt.pos_site_id = src.pos_site_id
          AND tgt.sku_id = src.sku_id
          AND tgt.fscldt_id = src.fscldt_id
          AND tgt.price_substate_id <=> src.price_substate_id
          AND tgt.type <=> src.type
        """

refined_delta_table.alias("tgt") \
    .merge(df_new_agg.alias("src"), merge_condition) \
    .whenMatchedUpdate(set={
        # Add incremental sums to the existing totals
        "total_sales_units": "tgt.total_sales_units + src.sum_sales_units",
        "total_sales_dollars": "tgt.total_sales_dollars + src.sum_sales_dollars",
        "total_discount_dollars": "tgt.total_discount_dollars + src.sum_discount_dollars"
    }) \
    .whenNotMatchedInsert(values={
        # Insert new group keys if they didn't exist before
        "pos_site_id": "src.pos_site_id",
        "sku_id": "src.sku_id",
        "fscldt_id": "src.fscldt_id",
        "price_substate_id": "src.price_substate_id",
        "type": "src.type",
        "total_sales_units": "src.sum_sales_units",
        "total_sales_dollars": "src.sum_sales_dollars",
        "total_discount_dollars": "src.sum_discount_dollars"
    }) \
    .execute()

print("Incremental load/merge into mview_weekly_sales completed.")


Incremental load/merge into mview_weekly_sales completed.


In [0]:
# Read the refined aggregate back so the totals can be inspected.
mview_query = "SELECT * FROM refined_db.mview_weekly_sales"
df_mview = spark.sql(mview_query)

display(df_mview)


pos_site_id,sku_id,fscldt_id,price_substate_id,type,total_sales_units,total_sales_dollars,total_discount_dollars
113,0174410000,20160328,FP,Sale,2.0,57.5,0.0
113,0174410000,20160411,FP,Sale,2.0,57.5,0.0
113,0174410000,20160501,FP,Sale,4.0,86.42,28.58
113,0174410000,20160524,FP,Sale,2.0,57.5,0.0
113,0174410000,20160527,FP,Sale,2.0,57.5,0.0
113,0174410000,20160603,FP,Sale,2.0,57.5,0.0
113,0174410000,20160605,FP,Sale,4.0,103.0,12.0
113,0174410000,20160625,FP,Sale,2.0,43.12,14.38
113,0174410000,20160731,FP,Sale,4.0,109.0,6.0
113,0174410000,20160808,FP,Return,4.0,109.0,6.0
