In [0]:
%pip install openpyxl

In [0]:
import pandas as pd
import io
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from zoneinfo import ZoneInfo
from pyspark.sql.functions import current_timestamp, from_utc_timestamp

In [0]:
# =========================
# 1️⃣ Setup parameters
# =========================
# Which dataset this run ingests. Must be one of the values handled by the
# config cell below: product_master | product_price_history | supplier_costprice_history
# (any other value makes that cell raise ValueError).
dbutils.widgets.text("master_data_type", "product_master")

In [0]:
# -------------------------------------------------
# Runtime config (same pattern as DLT)
# -------------------------------------------------
# NOTE(review): CATALOG and SCHEMA are not referenced in the cells shown here;
# output is written by path (staging_path), not to a catalog table.
CATALOG = spark.conf.get("rsmas.catalog")
SCHEMA = spark.conf.get("rsmas.schema")
SECRET_SCOPE = spark.conf.get("rsmas.secret.scope")

BASE_PATH = dbutils.secrets.get(
    scope=SECRET_SCOPE,
    key="master-data-path"
)
# Every dataset path below is built by plain string concatenation, so the base
# must end with a separator; otherwise "…/master" + "product/…" would silently
# become "…/masterproduct/…".
if not BASE_PATH.endswith("/"):
    BASE_PATH += "/"

dataset_type = dbutils.widgets.get("master_data_type")

# -------------------------------------------------
# Define paths per dataset
# -------------------------------------------------
# dataset_type -> (source sub-directory, staging sub-directory, de-duplication keys)
DATASETS = {
    "product_master": (
        "product/product_excel/",
        "product/product_updates/",
        ["ProductID"],
    ),
    "product_price_history": (
        "product/product_price_history_excel/",
        "product/product_price_updates/",
        ["ProductID", "StoreID"],
    ),
    # TODO(review): confirm ProductID alone is unique for supplier cost prices
    # (e.g. a product with several suppliers would be collapsed to one row).
    "supplier_costprice_history": (
        "product/supplier_costprice_history_excel/",
        "product/supplier_costprice_updates/",
        ["ProductID"],
    ),
}

try:
    source_subdir, staging_subdir, unique_keys = DATASETS[dataset_type]
except KeyError:
    raise ValueError(f"Unknown dataset_type: {dataset_type}") from None

source_path = BASE_PATH + source_subdir
staging_path = BASE_PATH + staging_subdir

# NOTE(review): the checkpoint lives *inside* source_path, so Auto Loader also
# lists its files; the reader filters them out by path. Relocating it would
# reset the stream's progress and re-ingest every file.
checkpoint_path = source_path + "checkpoints/"
schema_location = checkpoint_path + "schema/"

# =================================================
# Read Excel files as binary
# =================================================
# Auto Loader discovers new files under source_path and yields one row per
# file holding its raw bytes; the workbooks are parsed later in process_batch.
raw_df = (
    spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "binaryFile",
        "cloudFiles.schemaLocation": schema_location,
    })
    .load(source_path)
    # checkpoint_path sits under source_path, so drop any path containing "checkpoint".
    .where("path NOT LIKE '%checkpoint%'")
    .select("path", "content")
)

# =================================================
# ForeachBatch logic
# =================================================
def _sanitize_column_names(columns):
    """Return Delta-compatible column names for one Excel sheet header.

    Names are coerced to ``str`` first, because Excel headers can be numbers or
    dates and the ``.str`` accessor rejects (or NaN-s) non-string labels. They
    are then stripped, spaces become ``_``, and the other characters Delta
    disallows in column names (comma, semicolon, braces, parentheses, newline,
    tab, equals) are removed.
    """
    return (
        pd.Index(columns)
        .astype(str)
        .str.strip()
        .str.replace(" ", "_", regex=False)
        .str.replace(r"[,;{}()\n\t=]", "", regex=True)
    )


def _read_workbook(file_path, content, load_date):
    """Read every sheet of one Excel workbook into pandas frames with lineage columns.

    Args:
        file_path: Source path of the workbook (stored in ``source_file``).
        content: Raw workbook bytes.
        load_date: Date stamped into ``load_date`` on every row.

    Returns:
        A list with one DataFrame per sheet, columns sanitised for Delta.

    Raises:
        ValueError: If the bytes cannot be parsed as Excel, or if two headers
            collapse to the same name after sanitising.
    """
    try:
        sheets = pd.read_excel(io.BytesIO(content), sheet_name=None)
    except Exception as exc:
        # Re-raise with the file name so a bad upload is easy to find.
        raise ValueError(f"Could not read Excel workbook: {file_path}") from exc

    frames = []
    for sheet_name, sheet_df in sheets.items():
        columns = _sanitize_column_names(sheet_df.columns)
        if columns.has_duplicates:
            # e.g. "Price" and "Price " both become "Price"; pandas/Spark would
            # fail later with a far less descriptive error.
            dupes = sorted(set(columns[columns.duplicated()]))
            raise ValueError(
                f"Duplicate column names after sanitising in {file_path} "
                f"[sheet {sheet_name!r}]: {dupes}"
            )
        sheet_df.columns = columns
        frames.append(
            sheet_df.assign(
                source_file=file_path,
                sheet_name=sheet_name,
                load_date=load_date,
            )
        )
    return frames


def process_batch(batch_df, batch_id):
    """Parse the Excel workbooks in one micro-batch and append their rows to staging.

    Used as the ``foreachBatch`` callback of the stream below. Each row of
    ``batch_df`` is one discovered file (``path``, ``content``). Every sheet of
    every workbook becomes rows tagged with ``source_file``, ``sheet_name``,
    ``load_date`` and ``ingested_at``. Rows are then de-duplicated on
    ``unique_keys`` and appended as Delta to ``staging_path``, partitioned by
    ``load_date``.

    Args:
        batch_df: Spark DataFrame with ``path`` and binary ``content`` columns.
        batch_id: Micro-batch id supplied by Structured Streaming (unused).
    """
    if batch_df.isEmpty():
        return

    # Computed once so every row written by this batch lands in the same
    # load_date partition, even if parsing straddles midnight.
    load_date = datetime.now(ZoneInfo("Europe/London")).date()

    files_pdf = batch_df.toPandas()
    all_records = []
    for file_path, content in zip(files_pdf["path"], files_pdf["content"]):
        all_records.extend(_read_workbook(file_path, content, load_date))

    if not all_records:
        return

    final_pdf = pd.concat(all_records, ignore_index=True)
    if final_pdf.empty:
        # Header-only / blank sheets: nothing to append, and Spark cannot
        # reliably infer a schema from a zero-row pandas frame.
        return

    spark_df = (
        spark.createDataFrame(final_pdf)
        # NOTE(review): from_utc_timestamp treats its input as UTC wall-clock
        # time; this gives London local time only if the session time zone is
        # UTC — confirm the cluster setting.
        .withColumn(
            "ingested_at",
            from_utc_timestamp(current_timestamp(), "Europe/London"),
        )
        # Which duplicate survives is not specified by Spark.
        .drop_duplicates(subset=unique_keys)
    )

    (
        spark_df.write
        .format("delta")
        .partitionBy("load_date")
        .mode("append")
        .save(staging_path)
    )

# =================================================
# Start stream
# =================================================
# Run one micro-batch over everything that arrived since the last checkpoint,
# hand it to process_batch, then stop. awaitTermination() blocks the cell
# until the query ends and re-raises if it failed.
# NOTE(review): trigger(once=True) is deprecated in Spark 3.4+ in favour of
# availableNow=True. Switching is not a drop-in change: a backlog would be split
# into several batches, and process_batch's de-duplication only spans one batch.
query = (
    raw_df.writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(once=True)
    .foreachBatch(process_batch)
    .start()
)
query.awaitTermination()
