In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col, broadcast
from delta.tables import DeltaTable

In [0]:
full_load_path = "dbfs:/FileStore/gs/full"
incremental_path = "dbfs:/FileStore/gs/incremental"

staging_path = "s3://lh-cadp-dtc-sellout-eu-west-1-dev-bmhz/silver/gs"

In [0]:
transactions_schema = (StructType()
    .add("order_id", StringType(), True)            
    .add("line_id", StringType(), True)             
    .add("type", StringType(), True)
    .add("dt", StringType(), True)                  
    .add("pos_site_id", StringType(), True) 
    .add("sku_id", StringType(), True)   
    .add("fscldt_id", StringType(), True)
    .add("price_substate_id", StringType(), True)
    .add("sales_units", DoubleType(), True)
    .add("sales_dollars", DoubleType(), True)
    .add("discount_dollars", DoubleType(), True)
    .add("original_order_id", StringType(), True)
    .add("original_line_id", StringType(), True)
)

# Schema for fact_averagecosts
avgcosts_schema = (StructType()
    .add("fscldt_id", StringType(), True)
    .add("sku_id", StringType(), True)
    .add("average_unit_standardcost", DoubleType(), True)
    .add("average_unit_landedcost", DoubleType(), True)
)

# Schema for hier_clnd (Calendar)
clnd_schema = (StructType()
    .add("fscldt_id", StringType(), True)
    .add("fscldt_label", StringType(), True)
    .add("fsclwk_id", StringType(), True)
    .add("fsclwk_label", StringType(), True)
    .add("fsclmth_id", StringType(), True)
    .add("fsclmth_label", StringType(), True)
    .add("fsclqrtr_id", StringType(), True)
    .add("fsclqrtr_label", StringType(), True)
    .add("fsclyr_id", StringType(), True)
    .add("fsclyr_label", StringType(), True)
    .add("ssn_id", StringType(), True)
    .add("ssn_label", StringType(), True)
    .add("ly_fscldt_id", StringType(), True)
    .add("lly_fscldt_id", StringType(), True)
    .add("fscldow", StringType(), True)
    .add("fscldom", StringType(), True)
    .add("fscldoq", StringType(), True)
    .add("fscldoy", StringType(), True)
    .add("fsclwoy", StringType(), True)
    .add("fsclmoy", StringType(), True)
    .add("fsclqoy", StringType(), True)
    .add("date", DateType(), True)
)

# Schema for hier_possite
possite_schema = (StructType()
    .add("site_id", StringType(), True)
    .add("site_label", StringType(), True)
    .add("subchnl_id", StringType(), True)
    .add("subchnl_label", StringType(), True)
    .add("chnl_id", StringType(), True)
    .add("chnl_label", StringType(), True)
)

# Schema for hier_pricestate
pricestate_schema = (StructType()
    .add("substate_id", StringType(), True)
    .add("substate_label", StringType(), True)
    .add("state_id", StringType(), True)
    .add("state_label", StringType(), True)
)

# Schema for hier_prod
prod_schema = (StructType()
    .add("sku_id", StringType(), True)
    .add("sku_label", StringType(), True)
    .add("stylclr_id", StringType(), True)
    .add("stylclr_label", StringType(), True)
    .add("styl_id", StringType(), True)
    .add("styl_label", StringType(), True)
    .add("subcat_id", StringType(), True)
    .add("subcat_label", StringType(), True)
    .add("cat_id", StringType(), True)
    .add("cat_label", StringType(), True)
    .add("dept_id", StringType(), True)
    .add("dept_label", StringType(), True)
    .add("issvc", StringType(), True)
    .add("isasmbly", StringType(), True)
    .add("isnfs", StringType(), True)
)

# Schema for hier_rtlloc
rtlloc_schema = (StructType()
    .add("str", StringType(), True)
    .add("str_label", StringType(), True)
    .add("dstr", StringType(), True)
    .add("dstr_label", StringType(), True)
    .add("rgn", StringType(), True)
    .add("rgn_label", StringType(), True)
)

# Schema for hier_invstatus
invstatus_schema = (StructType()
    .add("code_id", StringType(), True)
    .add("code_label", StringType(), True)
    .add("bckt_id", StringType(), True)
    .add("bckt_label", StringType(), True)
    .add("ownrshp_id", StringType(), True)
    .add("ownrshp_label", StringType(), True)
)

# Schema for hier_hldy
hldy_schema = (StructType()
    .add("hldy_id", StringType(), True)
    .add("hldy_label", StringType(), True)
)

# Schema for hier_invloc
invloc_schema = (StructType()
    .add("loc", StringType(), True)
    .add("loc_label", StringType(), True)
    .add("loctype", StringType(), True)
    .add("loctype_label", StringType(), True)
)


full_paths = {
    "fact_averagecosts": f"{full_load_path}/fact_averagecosts_dlm.gz",
    "fact_transactions": f"{full_load_path}/fact_transactions_dlm.gz",
    "hier_clnd": f"{full_load_path}/hier_clnd_dlm.gz",
    "hier_hldy": f"{full_load_path}/hier_hldy_dlm.gz",
    "hier_invloc": f"{full_load_path}/hier_invloc_dlm.gz",
    "hier_invstatus": f"{full_load_path}/hier_invstatus_dlm.gz",
    "hier_possite": f"{full_load_path}/hier_possite_dlm.gz",
    "hier_pricestate": f"{full_load_path}/hier_pricestate_dlm.gz",
    "hier_prod": f"{full_load_path}/hier_prod_dlm.gz",
    "hier_rtlloc": f"{full_load_path}/hier_rtlloc_dlm.gz"
}

incremental_paths = {
    "fact_averagecosts": f"{incremental_path}/fact_averagecosts_2025_04_06_dlm.gz",
    "fact_transactions": f"{incremental_path}/fact_transactions_2025_04_06_dlm.gz",
}


In [0]:
# Function to read gzipped pipe-delimited file
def load_pipe_gz(path, schema):
    return (
        spark.read
        .option("header", True)
        .option("delimiter", "|")
        .schema(schema)
        .csv(path)
    )

In [0]:
# fact_transactions
fact_transactions_path = f"{staging_path}/fact_transactions"
df_full_transactions = load_pipe_gz(full_paths["fact_transactions"], transactions_schema).dropDuplicates()

df_full_transactions_clean = (df_full_transactions
    .dropna(subset=["order_id", "line_id", "pos_site_id", "sku_id", "fscldt_id"])
    .withColumn("dt", F.to_timestamp(F.col("dt"), "yyyy-MM-dd'T'HH:mm:ss"))
    .filter((F.col("sales_units") >= 0) &
            (F.col("sales_dollars") >= 0) &
            (F.col("discount_dollars") >= 0))
    .filter(F.col("dt") >= F.to_timestamp(F.lit("2015-01-01T00:00:00"), "yyyy-MM-dd'T'HH:mm:ss"))
)
df_full_transactions_clean.write.format("delta").mode("overwrite").save(fact_transactions_path)

# fact_averagecosts
fact_avgcosts_path = f"{staging_path}/fact_averagecosts"
df_full_avgcosts = load_pipe_gz(full_paths["fact_averagecosts"], avgcosts_schema).dropDuplicates()
df_full_avgcosts_clean = df_full_avgcosts.dropna(subset=["fscldt_id", "sku_id"])
df_full_avgcosts_clean.write.format("delta").mode("overwrite").save(fact_avgcosts_path)

# hier_clnd
dim_clnd_path = f"{staging_path}/dim_clnd"
df_full_clnd = load_pipe_gz(full_paths["hier_clnd"], clnd_schema).dropDuplicates()

df_full_clnd_clean = df_full_clnd.dropna(subset=["fscldt_id"]).withColumn("date", F.to_date(F.col("date"), "yyyy-MM-dd"))
df_full_clnd_clean.write.format("delta").mode("overwrite").save(dim_clnd_path)

# hier_possite
dim_possite_path = f"{staging_path}/dim_possite"
df_full_possite = load_pipe_gz(full_paths["hier_possite"], possite_schema).dropDuplicates()

df_full_possite_clean = df_full_possite.dropna(subset=["site_id"]).withColumn("site_id", F.trim(F.col("site_id")))
df_full_possite_clean.write.format("delta").mode("overwrite").save(dim_possite_path)

# hier_prod
dim_prod_path = f"{staging_path}/dim_prod"
df_full_prod = load_pipe_gz(full_paths["hier_prod"], prod_schema).dropDuplicates()

df_full_prod_clean = df_full_prod.dropna(subset=["sku_id"])
df_full_prod_clean.write.format("delta").mode("overwrite").save(dim_prod_path)

# hier_pricestate
dim_pricestate_path = f"{staging_path}/dim_pricestate"
df_full_pricestate = load_pipe_gz(full_paths["hier_pricestate"], pricestate_schema).dropDuplicates()

df_full_pricestate_clean = df_full_pricestate.dropna(subset=["substate_id"])
df_full_pricestate_clean.write.format("delta").mode("overwrite").save(dim_pricestate_path)

# hier_rtlloc
dim_rtlloc_path = f"{staging_path}/dim_rtlloc"
df_full_rtlloc = load_pipe_gz(full_paths["hier_rtlloc"], rtlloc_schema).dropDuplicates()

df_full_rtlloc_clean = df_full_rtlloc.dropna(subset=["str"])
df_full_rtlloc_clean.write.format("delta").mode("overwrite").save(dim_rtlloc_path)

# hier_invstatus
dim_invstatus_path = f"{staging_path}/dim_invstatus"
df_full_invstatus = load_pipe_gz(full_paths["hier_invstatus"], invstatus_schema).dropDuplicates()

df_full_invstatus_clean = df_full_invstatus.dropna(subset=["code_id"])
df_full_invstatus_clean.write.format("delta").mode("overwrite").save(dim_invstatus_path)

# hier_hldy
dim_hldy_path = f"{staging_path}/dim_hldy"
df_full_hldy = load_pipe_gz(full_paths["hier_hldy"], hldy_schema).dropDuplicates()

df_full_hldy_clean = df_full_hldy.dropna(subset=["hldy_id"])
df_full_hldy_clean.write.format("delta").mode("overwrite").save(dim_hldy_path)

# hier_invloc (dimension)
dim_invloc_path = f"{staging_path}/dim_invloc"
df_full_invloc = load_pipe_gz(full_paths["hier_invloc"], invloc_schema).dropDuplicates()

df_full_invloc_clean = df_full_invloc.dropna(subset=["loc"])
df_full_invloc_clean.write.format("delta").mode("overwrite").save(dim_invloc_path)

print("Full load data has been processed")

Full load data has been processed


In [0]:
# HELPER FUNCTION

def process_table(final_path, full_df, incr_df, merge_key_cols, update_exprs):
    """
    final_path: Destination Delta table.
    full_df:    df - full load (cleaned).
    incr_df:    df - incremental load (cleaned).
    merge_key_cols: List of column names used as keys for merge.
    update_exprs:   Dictionary mapping - to update expressions for whenMatchedUpdate().
    """
    # Deduplicate incremental data based on merge keys
    incr_df = incr_df.dropDuplicates(merge_key_cols)

    # merge - "t" for target and "s" for source
    condition = " AND ".join([f"t.{col} = s.{col}" for col in merge_key_cols])
    print("Merge Condition:", condition)

    # Check if the Delta table already exists
    if DeltaTable.isDeltaTable(spark, final_path):
        print(f"Delta table exists at {final_path}. Merging incremental data...")
        delta_table = DeltaTable.forPath(spark, final_path)
        delta_table.alias("t").merge(
            incr_df.alias("s"),
            condition
        ).whenMatchedUpdate(set=update_exprs) \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        # If table doesn't exist, create it by union of full + incremental
        print(f"Delta table does not exist at {final_path}. Creating table by union of full and incremental data.")
        df_final = full_df.unionByName(incr_df)
        df_final.write.format("delta").mode("overwrite").save(final_path)


# 1. fact_transactions

# Paths & Keys
log_fact_tx_path = staging_path + "/fact_transactions"
incr_fact_tx_path = incremental_paths["fact_transactions"]
fact_tx_keys = ["order_id", "line_id", "pos_site_id", "sku_id", "fscldt_id", "price_substate_id", "type"]
fact_tx_update = {
    "sales_units":      "t.sales_units + s.sales_units",
    "sales_dollars":    "t.sales_dollars + s.sales_dollars",
    "discount_dollars": "t.discount_dollars + s.discount_dollars"
}

# Read & Clean Incremental Data
df_incr_transactions = load_pipe_gz(incr_fact_tx_path, transactions_schema)

df_incr_transactions_clean = (df_incr_transactions
    .dropna(subset=["order_id", "line_id", "pos_site_id", "sku_id", "fscldt_id"])
    .withColumn("dt", F.to_timestamp(F.col("dt"), "yyyy-MM-dd'T'HH:mm:ss"))
    .filter((F.col("sales_units") >= 0) & (F.col("sales_dollars") >= 0) & (F.col("discount_dollars") >= 0))
    .filter(F.col("dt") >= F.to_timestamp(F.lit("2015-01-01T00:00:00"), "yyyy-MM-dd'T'HH:mm:ss"))
)

# Merge
process_table(log_fact_tx_path, df_full_transactions_clean, df_incr_transactions_clean, fact_tx_keys, fact_tx_update)


# 2. fact_averagecosts
log_avgcosts_path = staging_path + "/fact_averagecosts"
incr_avgcosts_path = incremental_paths["fact_averagecosts"]
avgcosts_keys = ["fscldt_id", "sku_id"]
avgcosts_update = {
    "average_unit_standardcost": "s.average_unit_standardcost",
    "average_unit_landedcost":   "s.average_unit_landedcost"
}

df_incr_avgcosts = load_pipe_gz(incr_avgcosts_path, avgcosts_schema)

df_incr_avgcosts_clean = df_incr_avgcosts.dropna(subset=["fscldt_id", "sku_id"])

process_table(log_avgcosts_path, df_full_avgcosts_clean, df_incr_avgcosts_clean, avgcosts_keys, avgcosts_update)

print("Incremental load merge complete.")


Merge Condition: t.order_id = s.order_id AND t.line_id = s.line_id AND t.pos_site_id = s.pos_site_id AND t.sku_id = s.sku_id AND t.fscldt_id = s.fscldt_id AND t.price_substate_id = s.price_substate_id AND t.type = s.type
Delta table exists at s3://lh-cadp-dtc-sellout-eu-west-1-dev-bmhz/silver/gs/fact_transactions. Merging incremental data...
Merge Condition: t.fscldt_id = s.fscldt_id AND t.sku_id = s.sku_id
Delta table exists at s3://lh-cadp-dtc-sellout-eu-west-1-dev-bmhz/silver/gs/fact_averagecosts. Merging incremental data...
Incremental load merge complete. Final log DB tables are updated.


In [0]:
# Define paths
log_fact_tx_path = staging_path + "/fact_transactions"
log_avgcosts_path = staging_path + "/fact_averagecosts"
log_clnd_path = staging_path + "/dim_clnd"
log_possite_path = staging_path + "/dim_possite"
log_prod_path = staging_path + "/dim_prod"
log_pricestate_path = staging_path + "/dim_pricestate"

# Read Delta tables - log DB folder
df_final_tx = spark.read.format("delta").load(log_fact_tx_path)
df_final_avg = spark.read.format("delta").load(log_avgcosts_path)
df_final_clnd = spark.read.format("delta").load(log_clnd_path)
df_final_possite = spark.read.format("delta").load(log_possite_path)
df_final_prod = spark.read.format("delta").load(log_prod_path)
df_final_pricestate = spark.read.format("delta").load(log_pricestate_path)

# Aggregate logic
df_enriched = df_final_tx.alias("ft") \
    .join(df_final_possite.alias("ps"), df_final_tx.pos_site_id == df_final_possite.site_id, "left") \
    .join(df_final_prod.alias("p"), "sku_id", "left") \
    .join(df_final_clnd.alias("clnd"), "fscldt_id", "left") \
    .join(df_final_pricestate.alias("psate"), df_final_tx.price_substate_id == df_final_pricestate.substate_id, "left") \
    .select(
        "ft.pos_site_id",
        "ps.site_label",
        "ft.sku_id",
        "p.sku_label",
        "ft.fscldt_id",
        "clnd.fscldt_label",
        "clnd.fsclwk_id",
        "clnd.fsclmth_label",
        "ft.price_substate_id",
        "psate.substate_label",
        "ft.type",
        "ft.sales_units",
        "ft.sales_dollars",
        "ft.discount_dollars",
        "ft.order_id",
        "ft.line_id"
    )

# Join with fact_averagecosts for cost details
df_enriched = df_enriched.alias("e") \
    .join(df_final_avg.alias("avg"), 
          (df_enriched.fscldt_id == df_final_avg.fscldt_id) & (df_enriched.sku_id == df_final_avg.sku_id), 
          "left") \
    .selectExpr("e.*", "avg.average_unit_standardcost", "avg.average_unit_landedcost")


# final refined aggregated logic
df_refined = df_enriched.groupBy(
    "pos_site_id", "site_label", "sku_id", "sku_label",
    "fscldt_id", "fscldt_label", "fsclwk_id", "fsclmth_label",
    "price_substate_id", "substate_label", "type"
).agg(
    F.sum("sales_units").alias("total_sales_units"),
    F.sum("sales_dollars").alias("total_sales_dollars"),
    F.sum("discount_dollars").alias("total_discount_dollars"),
    F.avg("average_unit_standardcost").alias("avg_standard_cost"),
    F.avg("average_unit_landedcost").alias("avg_landed_cost")
)

# sample refined data
print("Sample of final refined aggregated data:")
display(df_refined.limit(10))

# output - log DB
mview_weekly_sales_enriched_path = staging_path + "/mview_weekly_sales_enriched"

# log DB folder (overwrite mode) with schema merging.
df_refined.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .option("mergeSchema", "true") \
    .save(mview_weekly_sales_enriched_path)

print("Aggregated table has been written to:", mview_weekly_sales_enriched_path)


Sample of final refined aggregated data:


pos_site_id,site_label,sku_id,sku_label,fscldt_id,fscldt_label,fsclwk_id,fsclmth_label,price_substate_id,substate_label,type,total_sales_units,total_sales_dollars,total_discount_dollars,avg_standard_cost,avg_landed_cost
140,Avalon Alpharetta,0445790000,SMOOTH AFFAIR FCL PRMR & BRGTN N/A 1.7 FL. OZ.,20180205,"Feb 5, 2018",201801,"Feb, 2018",FP,Full Price,Return,2.0,73.6,22.4,16.8,16.8
178,Classen Curve,0808990000,YOU ARE A BADASS N/A ONE SIZE,20180205,"Feb 5, 2018",201801,"Feb, 2018",FP,Full Price,Sale,2.0,22.3,9.7,8.32,8.32
133,The Woodlands,2701640701,MADAME LACE TANK WHITE MEDIUM MISSY,20180207,"Feb 7, 2018",201801,"Feb, 2018",FP,Full Price,Sale,2.0,79.9,0.0,7.5,7.5
137,Greenway Station,0627890000,AYA BLU EAU DE PARFUM N/A ONE SIZE,20180207,"Feb 7, 2018",201801,"Feb, 2018",MD3,Markdown 3,Sale,4.0,44.92,14.96,20.55,20.55
178,Classen Curve,2AK5460801,SCATTERED ROSES SHIRT BOTANICAL GREEN LARGE MISSY,20180207,"Feb 7, 2018",201801,"Feb, 2018",FP,Full Price,Sale,2.0,159.9,0.0,11.6,11.6
143,La Encantada,0851104000,FILLERINA PLUS RPLNSHNG TRTMNT GRADE 4 ONE SIZE,20180208,"Feb 8, 2018",201801,"Feb, 2018",FP,Full Price,Sale,2.0,430.0,0.0,86.0,86.0
146,Bellevue Square,2785120701,SUPER STRETCH CAPRI BLACK MEDIUM MISSY,20180208,"Feb 8, 2018",201801,"Feb, 2018",FP,Full Price,Sale,2.0,139.9,0.0,21.5,21.5
113,The Boulevard,0403080000,WUNDER BROW BLONDE KIT,20180209,"Feb 9, 2018",201801,"Feb, 2018",FP,Full Price,Sale,2.0,39.92,4.08,5.98,5.98
133,The Woodlands,2AG3220701,JUST BEACHY PULLOVER LIGHT GREY HTHR MEDIUM MISSY,20180209,"Feb 9, 2018",201801,"Feb, 2018",FP,Full Price,Sale,2.0,139.9,0.0,14.33,14.33
135,Town & Country Village,0731004000,CHILL PILL BATH FIZZIE ROSE ONE SIZE,20180213,"Feb 13, 2018",201802,"Feb, 2018",FP,Full Price,Sale,2.0,0.0,17.0,3.0,3.0


Aggregated table has been written to: s3://lh-cadp-dtc-sellout-eu-west-1-dev-bmhz/silver/gs/mview_weekly_sales_enriched


In [0]:
df = spark.sql("SELECT * FROM delta.`s3://lh-cadp-dtc-sellout-eu-west-1-dev-bmhz/silver/gs/mview_weekly_sales_enriched`")
df.createOrReplaceTempView("mview_weekly_sales_enriched")

In [0]:
%sql
SELECT 
    type, 
    price_substate_id, 
    COUNT(1) AS Total_trans_count,
    SUM(total_sales_dollars) AS total_sales_dollars, 
    SUM(total_discount_dollars) AS total_discount_dollars, 
    SUM(avg_standard_cost) AS total_standard_cost, 
    SUM(avg_landed_cost) AS total_landed_cost 
FROM mview_weekly_sales_enriched
GROUP BY type, price_substate_id
ORDER BY type ASC

type,price_substate_id,Total_trans_count,total_sales_dollars,total_discount_dollars,total_standard_cost,total_landed_cost
Cancel,MD2,355,30526.94000000001,2568.36,6146.53,6146.53
Cancel,FP,39683,8488593.966600541,462203.75999999983,764045.929999999,764045.929999999
Cancel,MD3,398,31366.5,2647.68,6853.01,6853.01
Cancel,MD1,1655,286858.9599999993,11669.340000000002,31827.679999999986,31827.679999999986
Return,FP,312219,59837498.11623054,2144496.018116043,5832795.659999856,5832795.659999856
Return,MD1,16018,1623226.320000039,155823.56299799975,317805.37999999983,317805.37999999983
Return,MD2,4449,388008.4799999984,38902.92799999997,85474.75000000004,85474.75000000004
Return,MD3,3041,197879.77659999905,18023.03840000001,58370.78000000007,58370.78000000007
Sale,MD2,35190,2702652.879960061,514991.5589580104,521921.29999999946,521921.29999999946
Sale,MD1,113112,11660009.180016655,1989112.3540120304,1592918.1500000274,1592918.1500000274
