In [0]:
%pip install openpyxl

In [0]:
import pandas as pd
import io
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from zoneinfo import ZoneInfo
from pyspark.sql.functions import current_timestamp, from_utc_timestamp

In [0]:
# =========================
# 1️⃣ Set up parameters
# =========================
# master_data_type selects which Excel drop folder is ingested.
# Supported values (anything else raises ValueError when paths are resolved):
#   product_master, product_price_history, supplier_costprice_history
dbutils.widgets.text("master_data_type", "product_master")
dataset_type = dbutils.widgets.get("master_data_type")

In [0]:
# Echo the selected dataset type so it is visible in the cell output.
dataset_type

In [0]:
# Define paths and dedup keys per dataset.
# All folders below are relative to the `master-data-path` secret.
# NOTE(review): this is the dev scope name; confirm how other environments
# are meant to select theirs.
SECRET_SCOPE = "rsmas-dev-scope"

# dataset_type -> (source Excel folder, Delta staging folder, dedup keys)
DATASET_CONFIG = {
    "product_master": (
        "product/product_excel/",
        "product/product_updates/",
        ["ProductID"],
    ),
    "product_price_history": (
        "product/product_price_history_excel/",
        "product/product_price_updates/",
        ["ProductID", "StoreID"],
    ),
    # NOTE(review): keyed on ProductID only, so several rows for one product
    # (e.g. per supplier, if the sheets contain such rows) collapse to one per
    # batch — confirm this is intended.
    "supplier_costprice_history": (
        "product/supplier_costprice_history_excel/",
        "product/supplier_costprice_updates/",
        ["ProductID"],
    ),
}

if dataset_type not in DATASET_CONFIG:
    raise ValueError(
        f"Unknown dataset_type: {dataset_type}. "
        f"Expected one of: {', '.join(DATASET_CONFIG)}"
    )

# Look the secret up once instead of once per path.
master_data_path = dbutils.secrets.get(scope=SECRET_SCOPE, key='master-data-path')
source_subdir, staging_subdir, unique_keys = DATASET_CONFIG[dataset_type]
source_path = master_data_path + source_subdir
staging_path = master_data_path + staging_subdir

# The checkpoint lives inside the watched source folder, which is why the
# stream below filters checkpoint paths out. Moving it would start Auto Loader
# from a fresh state (all existing files re-ingested), so change it deliberately.
checkpoint_path = source_path + "checkpoints/"
schema_location = checkpoint_path + "schema/"

# =========================
# 2️⃣ Read new Excel files as binary
# =========================
# One row per newly discovered file: its path and raw bytes.
# Note the LIKE filter also drops any workbook whose path contains "checkpoint".
raw_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("cloudFiles.schemaLocation", schema_location)
    .load(source_path)
    .filter("path NOT LIKE '%checkpoint%'")
    .select("path", "content")
)

# =========================
# 3️⃣ ForeachBatch function to parse Excel and write as Delta
# =========================
def _clean_column_names(columns):
    """Return Delta-safe column names for a sheet's header labels.

    Labels are cast to ``str`` first: numeric or date headers (e.g. a ``2024``
    column) would otherwise make the ``.str`` accessor raise or turn into NaN.
    Surrounding whitespace is stripped, inner spaces become ``_`` and the other
    characters Delta rejects in column names (comma, semicolon, braces,
    parentheses, newline, tab, ``=``) are removed.
    """
    return (
        pd.Index(columns)
        .astype(str)
        .str.strip()
        .str.replace(" ", "_", regex=False)
        .str.replace(r"[,;{}()\n\t=]", "", regex=True)
    )


def process_batch(batch_df, batch_id):
    """Parse the Excel files in one micro-batch and append their rows to Delta.

    Each row of ``batch_df`` is one file (``path`` plus binary ``content``).
    Every sheet of every workbook is flattened into one DataFrame tagged with
    ``source_file``, ``sheet_name`` and ``load_date``, stamped with
    ``ingested_at``, de-duplicated on the module-level ``unique_keys`` and
    appended to ``staging_path`` partitioned by ``load_date``.

    Args:
        batch_df: Spark DataFrame with ``path`` and ``content`` columns.
        batch_id: Micro-batch id passed by ``foreachBatch``; unused.

    Raises:
        Whatever ``pd.read_excel`` raises for an unreadable file. The error is
        not caught, so it fails the micro-batch instead of skipping the file.
    """
    if batch_df.isEmpty():
        return

    # Pulls the raw file bytes onto the driver; fine for a few workbooks, but a
    # very large batch could exhaust driver memory.
    files_pdf = batch_df.toPandas()

    # One load_date for the whole batch, so a run straddling midnight (London
    # time) cannot split one batch across two partitions.
    load_date = datetime.now(ZoneInfo("Europe/London")).date()

    sheet_frames = []
    for file_row in files_pdf.itertuples(index=False):
        # sheet_name=None loads every sheet as {sheet name: DataFrame}.
        workbook = pd.read_excel(io.BytesIO(file_row.content), sheet_name=None)
        for sheet_name, sheet_df in workbook.items():
            sheet_df.columns = _clean_column_names(sheet_df.columns)
            sheet_df["source_file"] = file_row.path
            sheet_df["sheet_name"] = sheet_name
            sheet_df["load_date"] = load_date
            sheet_frames.append(sheet_df)

    if not sheet_frames:
        return

    # Sheets with different headers are unioned; missing cells become NaN.
    final_pdf = pd.concat(sheet_frames, ignore_index=True)
    if final_pdf.empty:
        # Only blank / header-only sheets: nothing to append.
        return

    # NOTE(review): shifting current_timestamp() "from UTC" gives London
    # wall-clock time only if the Spark session time zone is UTC — confirm.
    ingested_at = from_utc_timestamp(current_timestamp(), "Europe/London")
    spark_df = spark.createDataFrame(final_pdf).withColumn("ingested_at", ingested_at)

    # Duplicates are removed within this micro-batch only (not against rows
    # already in staging_path), and which duplicate survives is arbitrary.
    spark_df = spark_df.drop_duplicates(subset=unique_keys)

    (
        spark_df.write
        .format("delta")
        .partitionBy("load_date")
        .mode("append")
        .save(staging_path)
    )

# =========================
# 4️⃣ Attach streaming sink
# =========================
# availableNow processes every file present at start-up and then stops, like
# the deprecated trigger(once=True). Unlike once, it splits the backlog into
# micro-batches that respect Auto Loader rate limits
# (cloudFiles.maxFilesPerTrigger / maxBytesPerTrigger). This matters because
# process_batch collects each batch to the driver with toPandas().
# Dedup in process_batch is per micro-batch, so duplicates landing in
# different batches of one run are no longer collapsed together.
query = (
    raw_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)  # process current files then stop
    .start()
)

# Block until the backlog is drained; raises if the query failed.
query.awaitTermination()
