In [0]:
%run "/Workspace/gsynergy/1.Mount_storage"

/mnt/raw-data has been unmounted.
Storage Mounted Successfully


In [0]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("GSynergy Data Pipeline") \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()

# Define Data Path from Mounted Storage
RAW_DATA_PATH = f"{mount_point}"

# Load all hierarchy (dimension) data
hierarchy_files = [
    "hier.clnd.dlm.gz", "hier.hldy.dlm.gz", "hier.invloc.dlm.gz",
    "hier.invstatus.dlm.gz", "hier.possite.dlm.gz", "hier.pricestate.dlm.gz",
    "hier.prod.dlm.gz", "hier.rtlloc.dlm.gz"
]

hierarchy_dfs = {file: spark.read.option("header", True).option("delimiter", "|").csv(f"{RAW_DATA_PATH}/{file}") for file in hierarchy_files}

# Load all fact data
fact_files = ["fact.averagecosts.dlm.gz", "fact.transactions.dlm.gz"]
fact_dfs = {file: spark.read.option("header", True).option("delimiter", "|").csv(f"{RAW_DATA_PATH}/{file}") for file in fact_files}

# Data Quality Checks - Remove NULL values in key columns
fact_dfs["fact.transactions.dlm.gz"] = fact_dfs["fact.transactions.dlm.gz"].dropna(
    subset=["pos_site_id", "sku_id", "fscldt_id", "price_substate_id", "type"]  # Ensure 'type' column is included
)

# Ensure primary key uniqueness for hierarchy tables
for key, df in hierarchy_dfs.items():
    if "id" in df.columns:
        hierarchy_dfs[key] = df.dropDuplicates(["id"])

# Staging Tables - Normalize hierarchy tables
level_1_df = hierarchy_dfs["hier.prod.dlm.gz"].select("sku_id", "dept_label").dropDuplicates()
level_2_df = hierarchy_dfs["hier.prod.dlm.gz"].select("sku_id", "cat_label").dropDuplicates()

# Write Staging Tables to Delta
level_1_df.write.format("delta").mode("overwrite").saveAsTable("gsynergy_db.level_1")
level_2_df.write.format("delta").mode("overwrite").saveAsTable("gsynergy_db.level_2")

# Create Fact Table with Foreign Key Relationships
fact_staging_df = fact_dfs["fact.transactions.dlm.gz"] \
    .join(
        level_1_df.withColumnRenamed("sku_id", "level1_sku_id"),
        fact_dfs["fact.transactions.dlm.gz"].sku_id == col("level1_sku_id"),
        "left"
    ) \
    .drop("level1_sku_id")

# Aggregation Weekly Sales
mview_weekly_sales_df = fact_staging_df \
    .groupBy("pos_site_id", "sku_id", "fscldt_id", "price_substate_id", "type") \
    .agg(
        sum("sales_units").alias("total_sales_units"),
        sum("sales_dollars").alias("total_sales_dollars"),
        sum("discount_dollars").alias("total_discount_dollars")
    )

# Rename fscldt_id to fsclwk_id
mview_weekly_sales_df = mview_weekly_sales_df.withColumnRenamed("fscldt_id", "fsclwk_id")

mview_weekly_sales_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gsynergy_db.mview_weekly_sales")

print("Completed successfully.")


Completed successfully.


In [0]:
display(mview_weekly_sales_df)

pos_site_id,sku_id,fsclwk_id,price_substate_id,type,total_sales_units,total_sales_dollars,total_discount_dollars
129,0174450000,20160201,FP,Sale,1.0,28.75,0.0
CATMAIN,0403020000,20160201,FP,Sale,8.0,183.6,0.0
CATMAIN,1085190075,20160202,FP,Sale,1.0,75.0,0.0
132,0693790000,20160202,MD3,Sale,3.0,38.91,0.0
CATMAIN,2785131701,20160203,FP,Sale,1.0,69.95,0.0
CATMAIN,0403081000,20160203,FP,Sale,32.0,734.4000000000001,0.0
113,0444790000,20160203,FP,Sale,1.0,16.0,0.0
CATMAIN,2230940208,20160204,FP,Sale,4.0,174.61,17.19
RTLOPCS,3668931602,20160204,FP,Return,1.0,58.95,0.0
CATMAIN,2812520701,20160204,FP,Return,2.0,99.9,0.0
