In [0]:
from delta.tables import DeltaTable
from pyspark.sql import Window, functions as F, types as T

In [0]:
%run /Workspace/consolidated_pipeline/1_setup/utilities

In [0]:
# Echo the schema names used below to build the bronze/silver/gold table
# identifiers. They are not defined in this notebook — presumably they come
# from the utilities notebook run above (TODO confirm).
print(bronze_schema, silver_schema, gold_schema)

In [0]:
# Notebook parameters: declare each widget as (name, default, label), then
# read the current values back.
for widget_name, widget_default, widget_label in (
    ("catalog_name", "fmcg", "Catalog"),
    ("data_source", "orders", "Data Source"),
):
    dbutils.widgets.text(widget_name, widget_default, widget_label)

catalog_name = dbutils.widgets.get("catalog_name")
data_source = dbutils.widgets.get("data_source")
print(catalog_name, data_source)

# Storage layout for this data source: new files arrive under landing/ and
# are relocated to processed/ by a later cell.
base_path = "s3://ag-sportsbar/" + data_source
landing_path = base_path + "/landing/"
processed_path = base_path + "/processed"
for path_label, path_value in (
    ("Base Path:", base_path),
    ("Landing Path:", landing_path),
    ("Processed Path:", processed_path),
):
    print(path_label, path_value)

In [0]:
# Fully qualified table names (catalog.schema.table) for each layer.
# The gold fact table carries an "sb_fact_" prefix in front of the source name.
bronze_table = ".".join([catalog_name, bronze_schema, data_source])
silver_table = ".".join([catalog_name, silver_schema, data_source])
gold_table = ".".join([catalog_name, gold_schema, f"sb_fact_{data_source}"])
print(bronze_table, silver_table, gold_table)

### Bronze

In [0]:
# Read every top-level CSV file in the landing folder into one DataFrame.
#
# landing_path already ends with "/", so appending "/*.csv" directly produced
# a doubled slash ("landing//*.csv") and depended on the filesystem layer
# normalising it. Strip the trailing slash first so the glob is well-formed
# whether or not landing_path carries one.
landing_glob = f"{landing_path.rstrip('/')}/*.csv"

df = (
    spark.read.option("header", "true")
    # Column types are inferred from the files present in this run.
    .option("inferSchema", "true")
    .csv(landing_glob)
    # Stamp each row with the load time of this batch.
    .withColumn("_ingested_at", F.current_timestamp())
    # Keep all data columns plus source-file lineage from the _metadata column.
    .select("*", "_metadata.file_name", "_metadata.file_size", "_metadata.file_path")
)
display(df.limit(10))
print(f"Total rows: {df.count()}")

In [0]:
# Inspect the inferred column types of the landing data before it is written
# to the bronze table.
df.printSchema()

In [0]:
# Force order_qty to double before writing, regardless of the type that was
# inferred from the CSV files.
df = df.withColumn("order_qty", F.col("order_qty").cast("double"))

# Append this batch to the bronze Delta table; mergeSchema allows new columns
# to be added to the table schema on write.
bronze_writer = (
    df.write.format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("append")
)
bronze_writer.saveAsTable(bronze_table)

In [0]:
# Relocate ingested files from landing/ to processed/.
#
# The bronze read only matches top-level "*.csv" files, so only those are
# moved. Previously every entry in the landing folder (other file types and
# sub-folders, via recurse=True) was moved to processed/ even though it had
# never been ingested.
#
# NOTE(review): a CSV that arrives between the bronze read and this cell would
# still be moved without being loaded — consider driving the move from the
# list of files actually read.
files = dbutils.fs.ls(landing_path)
csv_files = [file_info for file_info in files if file_info.name.endswith(".csv")]

for file_info in csv_files:
    dbutils.fs.mv(file_info.path, f"{processed_path}/{file_info.name}")

### Silver

In [0]:
# Load the complete bronze table (all batches appended so far) and preview it.
bronze_query = f"SELECT * FROM {bronze_table}"
df_bronze = spark.sql(bronze_query)
display(df_bronze.limit(10))

In [0]:
# Clean bronze orders into df_orders.
#
# Steps:
#   1. keep only rows where order_qty is present
#   2. customer_id: keep purely numeric ids, replace anything else with "999999"
#   3. order_placement_date: strip a leading weekday name
#      ("Tuesday, July 01, 2025" -> "July 01, 2025") and parse with the first
#      matching format; values that match no format become NULL
#   4. product_id: cast to string
#   5. keep exactly one row per order line
#
# Step 5 used to be dropDuplicates() over the order-line columns *plus*
# order_qty. That key is wider than the one the silver MERGE matches on, so an
# order line re-delivered with a corrected quantity produced two source rows
# for a single target row, which makes the Delta MERGE fail. The order line is
# now de-duplicated on its identifying columns only, keeping the most recently
# ingested row (by the _ingested_at stamp written to bronze).

# Columns that identify a single order line.
order_line_key = ["order_id", "order_placement_date", "customer_id", "product_id"]

# Accepted date formats, tried in order.
order_date_formats = ["yyyy/MM/dd", "dd/MM/yyyy", "yyyy-MM-dd", "dd-MM-yyyy", "MMMM dd, yyyy"]

# Date text with any leading "<Weekday>, " prefix removed.
date_text = F.regexp_replace(F.col("order_placement_date"), r"^[A-Za-z]+,\s*", "")

# Newest ingested row first within each order line.
latest_first = Window.partitionBy(*order_line_key).orderBy(
    F.col("_ingested_at").desc_nulls_last()
)

df_orders = (
    df_bronze.filter(F.col("order_qty").isNotNull())
    .withColumn(
        "customer_id",
        F.when(F.col("customer_id").rlike("^[0-9]+$"), F.col("customer_id")).otherwise(
            F.lit(999999).cast("string")
        ),
    )
    .withColumn(
        "order_placement_date",
        F.coalesce(*[F.try_to_date(date_text, fmt) for fmt in order_date_formats]),
    )
    .withColumn("product_id", F.col("product_id").cast("string"))
    # Rank rows per order line and keep only the latest one.
    .withColumn("_row_rank", F.row_number().over(latest_first))
    .filter(F.col("_row_rank") == 1)
    .drop("_row_rank")
)

In [0]:
# Preview a few cleaned order rows.
display(df_orders.limit(5))

In [0]:
# Sanity check: earliest and latest parsed order date in the cleaned data.
date_bounds = [
    F.min("order_placement_date").alias("min_date"),
    F.max("order_placement_date").alias("max_date"),
]
df_orders.agg(*date_bounds).show()

In [0]:
# Row count after cleaning; compare with the count after the product join below.
df_orders.count()

In [0]:
# Load the silver products table, which supplies product_code for each product_id.
products_table = f"{catalog_name}.{silver_schema}.products"
df_products = spark.table(products_table)
display(df_products.limit(10))

In [0]:
# Attach product_code to every order. The inner join drops orders whose
# product_id has no match in the products table.
orders_with_products = df_orders.join(df_products, on="product_id", how="inner")
df_joined = orders_with_products.select(
    df_orders["*"],
    df_products["product_code"],
)
display(df_joined.limit(10))

In [0]:
# The count drops here because of the inner join: orders without a matching
# product are removed, which is intended so that product_code is never NULL.
df_joined.count()

In [0]:
# Display the target silver table name before writing to it.
silver_table

In [0]:
# Upsert the cleaned, product-enriched orders into the silver table.
#
# First run: create the table from df_joined.
# Later runs: MERGE on the order-line key, updating matched rows and inserting
# new ones.
#
# The merge keys are compared with the null-safe operator `<=>`. With plain
# `=`, a row whose order_placement_date is NULL (a date that matched none of
# the accepted formats) never matches its existing copy in the target, so it
# was inserted again on every run.
if not spark.catalog.tableExists(silver_table):
    (
        df_joined.write.format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(silver_table)
    )
else:
    silver_merge_keys = ["order_placement_date", "order_id", "product_code", "customer_id"]
    silver_merge_condition = " AND ".join(
        f"target.{key} <=> source.{key}" for key in silver_merge_keys
    )

    silver_delta = DeltaTable.forName(spark, silver_table)
    (
        silver_delta.alias("target")
        .merge(df_joined.alias("source"), silver_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

### Gold

In [0]:
# Display the silver table name that the gold step reads from.
silver_table

In [0]:
# Project the silver table into the gold fact layout: order_placement_date,
# customer_id and order_qty are renamed to date, customer_code and
# sold_quantity.
gold_query = (
    "select order_id, order_placement_date as date, customer_id as customer_code, "
    "product_code, product_id, order_qty as sold_quantity "
    f"from {silver_table}"
)
df_gold = spark.sql(gold_query)
display(df_gold.limit(5))

In [0]:
# Display the target gold table name before writing to it.
gold_table

In [0]:
# Upsert the gold projection into the gold fact table.
#
# First run: create the table from df_gold.
# Later runs: MERGE on (date, order_id, product_code, customer_code), updating
# matched rows and inserting new ones.
#
# The keys are compared with the null-safe operator `<=>`: with plain `=`, a
# row whose date is NULL never matches its existing copy in the target and
# was inserted again on every run.
if not spark.catalog.tableExists(gold_table):
    print("Creating new table")
    (
        df_gold.write.format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(gold_table)
    )
else:
    gold_merge_keys = ["date", "order_id", "product_code", "customer_code"]
    gold_merge_condition = " AND ".join(
        f"target.{key} <=> source.{key}" for key in gold_merge_keys
    )

    gold_delta = DeltaTable.forName(spark, gold_table)
    (
        gold_delta.alias("target")
        .merge(df_gold.alias("source"), gold_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

### Merge with Parent Company

In [0]:
# Pull only the columns needed for the monthly roll-up from the gold table.
child_query = f"SELECT date, product_code, customer_code, sold_quantity FROM {gold_table}"
df_child = spark.sql(child_query)
display(df_child.limit(5))

In [0]:
# Row count of the gold rows before monthly aggregation.
df_child.count()


In [0]:
# Roll the gold rows up to one row per (month, product, customer).
# Each date is truncated to the first day of its month, quantities are summed,
# and the month column is renamed back to "date".
month_start = F.date_trunc("month", "date").cast("date")

monthly_totals = (
    df_child.withColumn("month_start", month_start)
    .groupBy("month_start", "product_code", "customer_code")
    .agg(F.sum("sold_quantity").alias("sold_quantity"))
)
df_monthly = monthly_totals.withColumnRenamed("month_start", "date")
display(df_monthly.limit(5))

In [0]:
# Row count after aggregating to monthly grain.
df_monthly.count()

In [0]:
# Upsert the monthly totals into the parent company's fact_orders table.
# Matched (date, product_code, customer_code) rows are overwritten with the
# child values; unmatched rows are inserted.
#
# The keys are compared with the null-safe operator `<=>`: with plain `=`, a
# monthly row whose date is NULL never matches its existing copy in the
# parent table and was inserted again on every run.
#
# NOTE(review): this assumes fact_orders already exists — DeltaTable.forName
# is called without an existence check, unlike the silver and gold writes.
parent_merge_keys = ["date", "product_code", "customer_code"]
parent_merge_condition = " AND ".join(
    f"parent_gold.{key} <=> child_gold.{key}" for key in parent_merge_keys
)

gold_parent_delta = DeltaTable.forName(
    spark, f"{catalog_name}.{gold_schema}.fact_orders"
)
(
    gold_parent_delta.alias("parent_gold")
    .merge(df_monthly.alias("child_gold"), parent_merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)