In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_date, concat_ws, last_day, expr, explode, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable
import sys

In [0]:
# Notebook parameters: the target environment (restricted to a fixed list) and
# the source system / business domain (comboboxes, so free text is accepted).
# Arguments are given positionally as (name, defaultValue, choices).
dbutils.widgets.dropdown(
    "environment",
    "fq_dev",
    ["fq_dev", "fq_test", "fq_prod"],
    label="Select environment",
)
dbutils.widgets.combobox(
    "source",
    "excel_sheet",
    ["posist", "netsuite", "other", "excel_sheet"],
    label="Source",
)
dbutils.widgets.combobox(
    "domain",
    "budget",
    ["discount", "sales", "cost", "wastage", "budget"],
    label="Domain",
)

# Pull the current widget selections into module-level names used by later cells.
environment, source, domain = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("environment", "source", "domain")
)



In [0]:
def get_external_location(name: str) -> str:
    """Return the ``url`` of the external location called ``name``.

    Runs ``DESCRIBE EXTERNAL LOCATION`` through the notebook's ``spark``
    session and returns the ``url`` column of the first row of the result.
    """
    description = spark.sql(f"DESCRIBE EXTERNAL LOCATION `{name}`")
    url_rows = description.select("url").collect()
    return url_rows[0][0]

# Look up the URL of each "<environment>_extloc_<layer>" external location,
# in the same order as the names on the left-hand side.
(
    bronze_path,
    silver_path,
    gold_path,
    checkpoint_path,
    staging_path,
) = (
    get_external_location(f"{environment}_extloc_{layer}")
    for layer in ("bronze", "silver", "gold", "checkpoint", "staging")
)


In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, max as _max

# Incremental-load bookkeeping.
# Compares the newest Delta version of the bronze budget table with the version
# this consumer last recorded in the version-control table, and loads the bronze
# data only when there is something newer. `df_bronze` is None when up to date.
#
# The catalog is derived from the `environment` widget so that a test/prod run
# reads its own bronze layer, consistent with the silver tables used later in
# this notebook (previously hard-coded to the dev catalog).
SOURCE_TABLE = f"{environment}_catalog.bronze.budget"
VERSION_TABLE = f"{environment}_catalog.bronze.version_control"
CONSUMER_NAME = "dim_budget_clone"

delta_table = DeltaTable.forName(spark, SOURCE_TABLE)

# Highest version number present in the source table's Delta history.
latest_version = (
    delta_table.history()
    .select(_max("version").alias("latest_version"))
    .collect()[0]["latest_version"]
)

# Version last recorded for this (source table, consumer) pair. The aggregate
# yields NULL (None) when no matching row exists; a missing table is treated
# the same way.
if spark.catalog.tableExists(VERSION_TABLE):
    last_processed_version = (
        spark.table(VERSION_TABLE)
        .filter(col("source_table") == SOURCE_TABLE)
        .filter(col("consumer_name") == CONSUMER_NAME)
        .agg(_max("last_processed_version").alias("last_version"))
        .collect()[0]["last_version"]
    )
else:
    last_processed_version = None

# -1 means "nothing processed yet", so the first run starts at version 0.
if last_processed_version is None:
    last_processed_version = -1

if last_processed_version >= latest_version:
    print(
        f"[INFO] No new data to process | "
        f"Latest version: {latest_version}, "
        f"Last processed: {last_processed_version}"
    )
    df_bronze = None
else:
    start_version = last_processed_version + 1
    print(
        f"[INFO] Processing versions from {start_version} to {latest_version}"
    )

    # NOTE(review): `startingVersion` is documented for streaming reads and for
    # change-data-feed reads. Confirm that a plain batch read honours it;
    # if it does not, this returns the full current snapshot of the table
    # rather than only the rows added since `start_version`.
    df_bronze = (
        spark.read
        .format("delta")
        .option("startingVersion", start_version)
        .table(SOURCE_TABLE)
    )


In [0]:
# Stop the notebook early when there is nothing to process: either no new
# source version was detected (df_bronze is None) or the read returned no rows.
# The message previously referred to "wastage" data and contained a
# mis-encoded dash; this notebook processes budget data.
if df_bronze is None or df_bronze.isEmpty():
    print("[INFO] No new budget data to process - exiting job")
    dbutils.notebook.exit("NO_NEW_DATA")

In [0]:
# df_bronze.display()

In [0]:
# Keep only the four bronze columns the budget transformation consumes.
df_bronze = df_bronze.select("store_name", "budget", "remarks", "dateorg")


In [0]:
from pyspark.sql.functions import (
    lit, expr, coalesce, try_to_date, col,
    trunc, year, date_format
)

def transform_budget_df(df):
    """
    Rename, tag and date-parse the raw budget rows.

    Steps, in order:
      * rename ``budget`` -> ``budget_amount`` and ``store_name`` -> ``deployment_name``
      * add ``source_system`` (constant "EXCEL_SHEET") and ``sys_id`` (``uuid()``)
      * add ``effective_date``: the first non-null result of parsing ``dateorg``
        with each accepted pattern; null when none of them parses
      * drop the original ``dateorg`` column

    Returns the transformed DataFrame.
    """
    # Patterns are tried in this order; coalesce keeps the first that parses.
    accepted_formats = ("MM-dd-yyyy", "M-d-yyyy", "MM/dd/yyyy", "M/d/yyyy")
    parsed_date = coalesce(
        *(try_to_date(col("dateorg"), fmt) for fmt in accepted_formats)
    )

    renamed = (
        df.withColumnRenamed("budget", "budget_amount")
        .withColumnRenamed("store_name", "deployment_name")
    )
    tagged = (
        renamed
        .withColumn("source_system", lit("EXCEL_SHEET"))
        .withColumn("sys_id", expr("uuid()"))
    )
    return tagged.withColumn("effective_date", parsed_date).drop("dateorg")


budget_df = transform_budget_df(df_bronze)
   



In [0]:
# budget_df.display()

In [0]:
# from pyspark.sql.functions import sum
# budget_df.agg(sum("budget_amount")).display()


In [0]:
def read_store_dimension():
    """
    Load the store dimension for the selected environment.

    Reads ``<environment>_catalog.silver.dim_store`` and returns a DataFrame
    containing only its ``deployment_name`` column.
    """
    dim_store_table = f"{environment}_catalog.silver.dim_store"
    return spark.read.table(dim_store_table).select("deployment_name")


store_df = read_store_dimension()


In [0]:
# store_df.display()

In [0]:
# %sql
# select * from fq_dev_catalog.silver.dim_store

In [0]:
from pyspark.sql.functions import lit, current_timestamp, col

# Budget rows whose deployment_name has no match in the store dimension.
# A left-anti join returns exactly the unmatched left-side rows with only the
# budget columns; each is then stamped with a rejection reason and timestamp.
unmatched_budget_rows = budget_df.alias("b").join(
    store_df.alias("s"),
    on=col("b.deployment_name") == col("s.deployment_name"),
    how="left_anti",
)

budget_rejected_stores = (
    unmatched_budget_rows
    .withColumn("rejection_reason", lit("STORE_NOT_FOUND_IN_MASTER_TABLE_dim_store"))
    .withColumn("rejection_ts", current_timestamp())
)



In [0]:
# budget_rejected_stores.display()

In [0]:
# budget_rejected_stores.agg(sum("budget_amount")).display()

In [0]:
# %sql
# drop table fq_dev_catalog.silver.budget_rejected_stores

In [0]:
# spark.sql(f"""
# CREATE TABLE IF NOT EXISTS fq_dev_catalog.silver.budget_rejected_stores
# USING DELTA
# TBLPROPERTIES (delta.enableChangeDataFeed = true, delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true, delta.columnMapping.mode = 'name')
# """)

In [0]:
# %sql
# select * from fq_dev_catalog.silver.budget

In [0]:
# Rejected rows go to the silver layer of the selected environment, consistent
# with the other `{environment}_catalog` tables used in this notebook
# (previously hard-coded to the dev catalog).
target_table = f"{environment}_catalog.silver.budget_rejected_stores"


def write_budget_rejected_to_silver(df):
    """
    Append the rejected budget rows in ``df`` to ``target_table``.

    The write uses Delta format, append mode and ``mergeSchema=true``, so new
    columns in ``df`` are added to the table schema.

    Fix: the function previously ignored ``df`` and always wrote the global
    ``budget_rejected_stores``; it now writes the DataFrame it is given.

    Returns None (``saveAsTable`` has no return value).
    """
    (
        df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(target_table)
    )
    return None


write_budget_rejected_to_silver(budget_rejected_stores)

In [0]:
# %sql
# select distinct(deployment_name)from fq_dev_catalog.silver.budget_rejected_stores

In [0]:
# %sql
# select sum(budget_amount) as total_rejected_budget_amount
# from fq_dev_catalog.silver.budget_rejected_stores

In [0]:
# Rename the store key so the joined frame has no duplicate column name.
store_df = store_df.withColumnRenamed("deployment_name", "store_deployment_name")


def create_final_budget(budget_df, store_df):
    """
    Keep the budget rows that match a known store and shape the output.

    Inner-joins ``budget_df.deployment_name`` to ``store_df.store_deployment_name``
    and returns the columns sys_id, deployment_name, effective_date,
    budget_amount, budget_remarks (renamed from ``remarks``) and source_system.
    """
    same_store = budget_df["deployment_name"] == store_df["store_deployment_name"]
    matched = budget_df.join(store_df, on=same_store, how="inner")

    return matched.select(
        "sys_id",
        "deployment_name",
        "effective_date",
        "budget_amount",
        col("remarks").alias("budget_remarks"),
        "source_system",
    )


budget_base = create_final_budget(budget_df, store_df)
   



In [0]:
# budget_base.display()

In [0]:
# budget_base.display()

In [0]:
# budget_base.agg(count("sys_id")).display()

In [0]:
# One row per (deployment_name, effective_date). dropDuplicates gives no
# ordering guarantee, so which of several duplicate rows survives is arbitrary.
budget_unique = budget_base.dropDuplicates(["deployment_name", "effective_date"])


In [0]:
# budget_unique.agg(count("sys_id")).display()

In [0]:
# Discard rows missing either business key; effective_date is null when
# none of the accepted date patterns parsed the source value.
budget_final = budget_unique.na.drop(subset=["deployment_name", "effective_date"])

In [0]:
# from pyspark.sql.functions import col
# budget_final.agg(count("sys_id")).display()


In [0]:
# from pyspark.sql.functions import sum
# budget_final.agg(sum("budget_amount")).display()


In [0]:

# %sql
# CREATE TABLE IF NOT EXISTS fq_dev_catalog.silver.budget (
#     sys_id STRING NOT NULL,
#     deployment_name STRING NOT NULL,
#     effective_date DATE NOT NULL,
#     budget_amount DOUBLE,
#     budget_remarks STRING,
#     source_system STRING NOT NULL
# )
# USING DELTA
# TBLPROPERTIES (
#     -- Enable Change Data Feed
#     delta.enableChangeDataFeed = true,
#     delta.columnMapping.mode = 'name',


#     -- Optimization properties
#     delta.autoOptimize.optimizeWrite = true,
#     delta.autoOptimize.autoCompact = true
# );


In [0]:
from delta.tables import DeltaTable
import sys

def upsert_to_silver(
    df,
    table_name,
    business_keys
):
    """
    Merge ``df`` into the Delta table ``table_name`` on ``business_keys``.

    Args:
        df: DataFrame of budget rows to upsert; returns early when it is empty.
        table_name: Name of the target Delta table.
        business_keys: Column names AND-ed together as equality predicates to
            form the merge condition.

    Matched rows have sys_id, budget_amount, budget_remarks and source_system
    overwritten from the source; unmatched rows are inserted with all six
    columns. Any exception is printed and the interpreter is asked to exit
    with status 1.

    NOTE(review): sys_id is overwritten on match, so an existing row receives
    whatever sys_id the incoming row carries - confirm this is intended.
    NOTE(review): confirm that sys.exit(1) actually halts the notebook/job run
    in this environment rather than only the current cell.
    """
    # Column mappings for the two merge branches ("t" = target, "s" = source).
    update_columns = ("sys_id", "budget_amount", "budget_remarks", "source_system")
    insert_columns = (
        "sys_id",
        "deployment_name",
        "effective_date",
        "budget_amount",
        "budget_remarks",
        "source_system",
    )

    try:
        # Nothing to merge
        if df.isEmpty():
            print("No data to upsert")
            return

        target = DeltaTable.forName(spark, table_name)

        # e.g. "t.deployment_name = s.deployment_name AND t.effective_date = s.effective_date"
        on_clause = " AND ".join(f"t.{key} = s.{key}" for key in business_keys)

        merge_builder = target.alias("t").merge(df.alias("s"), on_clause)
        merge_builder = merge_builder.whenMatchedUpdate(
            set={name: f"s.{name}" for name in update_columns}
        )
        merge_builder = merge_builder.whenNotMatchedInsert(
            values={name: f"s.{name}" for name in insert_columns}
        )
        merge_builder.execute()

        print("Batch upsert to silver completed successfully")

    except Exception as e:
        print(f" Upsert to Silver failed: {e}")
        sys.exit(1)

In [0]:
# Target silver table for the selected environment.
silver_table_name = f"{environment}_catalog.silver.budget"

# A budget row is identified by its store and effective date.
business_keys = ["deployment_name", "effective_date"]

# budget_final is a batch DataFrame, already de-duplicated on the business
# keys and free of null keys.
upsert_to_silver(
    df=budget_final,
    table_name=silver_table_name,
    business_keys=business_keys,
)


In [0]:
%sql
-- select * from fq_dev_catalog.silver.budget

In [0]:
# %sql
# select count("*") from fq_dev_catalog.silver.budget

In [0]:
# %sql
# select sum(budget_amount) as total_budget_amount
# from fq_dev_catalog.silver.budget




In [0]:
# cdf_df = (
#     spark.read
#          .format("delta")
#          .option("readChangeFeed", "true")
#          .option("startingVersion", 0)
#          .table("fq_dev_catalog.silver.budget")
# )

# cdf_df.display()

In [0]:
# spark.sql(f"""
# INSERT INTO {VERSION_TABLE} (source_table, consumer_name, last_processed_version, updated_at)
# SELECT
#   '{SOURCE_TABLE}',
#   '{CONSUMER_NAME}',
#   {latest_version},
#   current_timestamp()
# WHERE NOT EXISTS (
#   SELECT 1
#   FROM {VERSION_TABLE}
#   WHERE
#     source_table = '{SOURCE_TABLE}'
#     AND consumer_name = '{CONSUMER_NAME}'
# )
# """)

In [0]:
# Record the version just processed for this (source table, consumer) pair.
#
# A bare UPDATE changes nothing when no tracking row exists yet (first run),
# so the version would never be persisted and every run would start from
# scratch. It also fails if the tracking table is missing, which the version
# lookup earlier in the notebook explicitly tolerates. Create the table if
# needed, then MERGE so the row is inserted on first run and updated after.
#
# Column names follow the statements already used against this table; the
# column types are an assumption - confirm against the existing DDL. The
# CREATE is a no-op when the table already exists.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {VERSION_TABLE} (
  source_table STRING,
  consumer_name STRING,
  last_processed_version BIGINT,
  updated_at TIMESTAMP
)
USING DELTA
""")

spark.sql(f"""
MERGE INTO {VERSION_TABLE} AS t
USING (
  SELECT
    '{SOURCE_TABLE}' AS source_table,
    '{CONSUMER_NAME}' AS consumer_name,
    CAST({latest_version} AS BIGINT) AS last_processed_version
) AS s
ON t.source_table = s.source_table
   AND t.consumer_name = s.consumer_name
WHEN MATCHED THEN UPDATE SET
  t.last_processed_version = s.last_processed_version,
  t.updated_at = current_timestamp()
WHEN NOT MATCHED THEN INSERT
  (source_table, consumer_name, last_processed_version, updated_at)
  VALUES
  (s.source_table, s.consumer_name, s.last_processed_version, current_timestamp())
""")

In [0]:
#  %sql
# select * from fq_dev_catalog.bronze.version_control