#######Runtime Parameters

In [0]:
# Declare the `env` text widget (default value "dev") and read its current value.
# `env` selects which configuration / Unity Catalog catalog the rest of the notebook uses.
ENV_WIDGET_NAME = "env"
ENV_DEFAULT = "dev"

dbutils.widgets.text(ENV_WIDGET_NAME, ENV_DEFAULT)
env = dbutils.widgets.get(ENV_WIDGET_NAME)

#######Load Configuration

In [0]:
%run "/Workspace/Repos/azuredataengineer44@gmail.com/databricks-traffic/Databricks Retail Notebooks/common/config_loader"

#######Read Bronze Products

In [0]:
# Resolve the environment-specific settings (load_config comes from the
# config_loader notebook pulled in by %run) and open the raw bronze products table.
config = load_config(env)
unity_catalog_cfg = config["unity_catalog"]
catalog = unity_catalog_cfg["catalog"]

bronze_table = f"{catalog}.bronze.products_raw"
df_bronze = spark.table(bronze_table)

#######Prepare Incoming Records

In [0]:
from pyspark.sql.functions import (col ,current_date,lit,to_timestamp,coalesce)

# Parse the raw created_at / updated_at values into timestamps and drop every row
# whose _rescued_data column is populated.
# NOTE(review): rows dropped here never reach the quarantine table; consider routing
# them through the DQ checks instead (cf. the commented-out "Invalid JSON" rule).
df_incoming1 = (
    df_bronze
    .withColumn("created_ts", to_timestamp("created_at"))
    .withColumn("updated_ts", to_timestamp("updated_at"))
    .where(col("_rescued_data").isNull())
)

# Columns carried forward from bronze into the SCD2 pipeline.
incoming_columns = [
    "product_id",
    "product_name",
    "category",
    "brand",
    "price",
    "currency",
    "is_active",
    "ingestion_ts",
    "source_file_path",
    "created_ts",
    "updated_ts",
]

# SCD2 bookkeeping: a version starts at its last update, falling back to its creation
# time. effective_to / is_current are placeholders that later cells recompute.
df_incoming = (
    df_incoming1
    .select(*incoming_columns)
    .withColumn("effective_from", coalesce(col("updated_ts"), col("created_ts")))
    .withColumn("effective_to", lit(None).cast("timestamp"))
    .withColumn("is_current", lit(True))
)

In [0]:
# df_incoming.display()

#######Apply DQ Checks

In [0]:
# Alias the prepared incoming rows under the name used by the DQ-check and record-count cells.
df_products = df_incoming



In [0]:
from pyspark.sql.functions import col, when, lit

# Ordered DQ rules: the first rule a row fails supplies its `dqerror` message;
# rows passing every rule get NULL. A NULL price passes the "negative" rule
# because NULL < 0 is not true.
dq_rules = [
    (col("product_id").isNull(), "Product ID is null"),
    (col("product_name").isNull(), "Product Name is null"),
    (col("price") < 0, "Price is negative"),
    (col("currency").isNull(), "Currency is null"),
    # (col("res_data").isNotNull(), "Invalid JSON"),
]

# Fold the rules into a single CASE WHEN chain, preserving their order.
dq_error_expr = None
for rule_condition, rule_message in dq_rules:
    if dq_error_expr is None:
        dq_error_expr = when(rule_condition, lit(rule_message))
    else:
        dq_error_expr = dq_error_expr.when(rule_condition, lit(rule_message))

df_validated = df_products.withColumn("dqerror", dq_error_expr.otherwise(lit(None)))


In [0]:
# df_validated.display()


#######Split Valid vs Invalid Records

In [0]:
# Route rows by DQ outcome: a non-NULL dqerror means at least one rule failed.
has_dq_error = col("dqerror").isNotNull()

df_valid = df_validated.filter(~has_dq_error)
df_invalid = df_validated.filter(has_dq_error)

#######Create Quarantine Table

In [0]:
# catalog = config["unity_catalog"]["catalog"]

# Quarantine table for rows that fail the DQ rules (mostly STRING-typed columns
# plus the failing rule's message in dqerror). Created only if missing.
quarantine_table = f"{catalog}.silver.products_quarantine"

quarantine_ddl = f"""
CREATE TABLE IF NOT EXISTS {quarantine_table} (
  product_id STRING,
  product_name STRING,
  category STRING,
  brand STRING,
  price STRING,
  currency STRING,
  is_active STRING,
  ingestion_ts TIMESTAMP,
  source_file STRING,
  dqerror STRING
)
USING DELTA
"""
spark.sql(quarantine_ddl)


In [0]:
# The quarantine table names the file column `source_file` rather than `source_file_path`.
quarantine_renames = {"source_file_path": "source_file"}

df_invalid1 = df_invalid
for old_name, new_name in quarantine_renames.items():
    df_invalid1 = df_invalid1.withColumnRenamed(old_name, new_name)

#######Load Invalid Records into Quarantine Table

In [0]:
# Conform the invalid rows to the quarantine table's schema and append them.
# The quarantine DDL declares product/price/is_active columns as STRING, so every
# column is cast explicitly: Delta may reject an append whose column types do not
# match the table.
# NOTE(review): appends are not deduplicated, so re-running the notebook on the
# same bronze data quarantines the same rows again.
df_quarantine = df_invalid1.select(
    col("product_id").cast("string").alias("product_id"),
    col("product_name").cast("string").alias("product_name"),
    col("category").cast("string").alias("category"),
    col("brand").cast("string").alias("brand"),
    col("price").cast("string").alias("price"),
    col("currency").cast("string").alias("currency"),
    col("is_active").cast("string").alias("is_active"),
    col("ingestion_ts").cast("timestamp").alias("ingestion_ts"),
    col("source_file").cast("string").alias("source_file"),
    col("dqerror").cast("string").alias("dqerror"),
)

# Previously the frame was built but never written, so the quarantine table stayed empty.
df_quarantine.write.format("delta").mode("append").saveAsTable(quarantine_table)


#######Deduplicate Valid Data

In [0]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Keep one row per (product_id, effective_from): the most recently ingested copy.
window_spec = Window.partitionBy("product_id", "effective_from").orderBy(col("ingestion_ts").desc())

ranked_valid = df_valid.withColumn("rn", row_number().over(window_spec))
df_incoming_deduped = ranked_valid.where(col("rn") == 1).drop("rn")


In [0]:
from pyspark.sql.functions import max as spark_max

# Latest effective_from per product in this batch; used to flag the current version.
df_latest_event = (
    df_incoming_deduped
    .select("product_id", "effective_from")
    .groupBy("product_id")
    .agg(spark_max("effective_from").alias("max_effective_from"))
)
# df_latest_event.display()

In [0]:
# Mark each deduplicated version as current when it carries its product's latest
# effective_from. For every other version the left join finds no `m` row, so
# m.max_effective_from is NULL and is_current becomes False.
is_latest_version = (
    (col("s.product_id") == col("m.product_id"))
    & (col("s.effective_from") == col("m.max_effective_from"))
)

carried_columns = (
    "product_name",
    "category",
    "brand",
    "price",
    "currency",
    "is_active",
    "effective_from",
    "ingestion_ts",
    "source_file_path",
)

df_incoming_final = (
    df_incoming_deduped.alias("s")
    .join(df_latest_event.alias("m"), on=is_latest_version, how="left")
    .select(
        col("s.product_id").alias("product_id"),
        *[col(f"s.{name}") for name in carried_columns],
        col("m.max_effective_from").isNotNull().alias("is_current"),
    )
)

#######Derive Field: effective_to

In [0]:
from pyspark.sql.functions import lead, col
from pyspark.sql.window import Window

# effective_to of a version is the effective_from of the next version of the same
# product in this batch; lead() yields NULL for the latest version.
window_spec = Window.partitionBy("product_id").orderBy(col("effective_from"))

next_effective_from = lead(col("effective_from"), 1).over(window_spec)
df_incoming_timed = df_incoming_final.withColumn("effective_to", next_effective_from)

In [0]:
# from pyspark.sql.functions import col

# display(
#   df_incoming_timed.filter(
#     col("product_id") == "P001"
#   )
# )

#######Create Silver Table

In [0]:
silver_table = f"{catalog}.silver.products_scd2"

# SCD2 history table for products, intended to hold one row per
# (product_id, effective_from) version.
#
# effective_from / effective_to are TIMESTAMP because the pipeline derives them with
# to_timestamp(...) and dedupes / orders versions on them. A DATE column truncates
# them: same-day versions collapse onto one key, and the merge condition ends up
# comparing an incoming timestamp against midnight of the stored date.
#
# NOTE: CREATE TABLE IF NOT EXISTS leaves an existing table untouched, so a table
# already created with DATE columns must be migrated separately.
# NOTE(review): price is INT here but STRING in the quarantine table; confirm incoming
# prices are whole numbers, otherwise they may be truncated or rejected on insert.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {silver_table} (
        product_id STRING,
        product_name STRING,
        category STRING,
        brand STRING,
        price INT,
        currency STRING,
        is_active BOOLEAN,
        effective_from TIMESTAMP,
        effective_to TIMESTAMP,
        is_current BOOLEAN,
        ingestion_ts TIMESTAMP,
        source_file STRING
    )
    USING DELTA
""")

#######Define Merge Conditions

In [0]:
from delta.tables  import DeltaTable
from pyspark.sql.functions import current_date, lit

silver_dt = DeltaTable.forName(spark, silver_table)

# Target row an incoming version (alias `s`) may supersede: the current silver row
# (alias `t`) of the same product, provided the incoming version is strictly newer.
merge_condition = """
    t.product_id = s.product_id
    AND t.is_current = true
    AND s.effective_from > t.effective_from
"""

# Attributes whose change opens a new SCD2 version.
scd2_tracked_columns = ["product_name", "category", "brand", "price", "currency", "is_active"]

# A version counts as changed when any tracked attribute differs. `<=>` is Spark SQL's
# null-safe equality, so NULL -> value and value -> NULL transitions are detected too.
# A plain `<>` evaluates to NULL for those transitions, and the change would be
# silently ignored.
change_condition = " OR ".join(
    f"NOT (t.{column} <=> s.{column})" for column in scd2_tracked_columns
)

In [0]:
# %python
# display(
#     silver_dt.toDF()
# )

# df_incoming1.display()

#######Merge Incoming Products into the Silver SCD2 Table

In [0]:

from pyspark.sql.functions import col, expr, lit, row_number
from pyspark.sql.window import Window

# SCD2 merge using Delta's "staged updates" pattern.
#
# In a MERGE, a source row that matches a target row can only drive the matched
# clause; it never reaches whenNotMatchedInsert. Merging df_incoming_timed directly
# therefore had three problems:
#   * it closed the old version of a changed product without inserting the new one;
#   * it failed when a batch held several newer versions of one product, because
#     several source rows then match the same target row;
#   * it re-inserted every version not newer than the current one on each run.
#
# The staged source contains two kinds of rows:
#   * closing rows (merge_key = product_id): per product, the earliest incoming
#     version that is newer than, and differs from, the current silver row. There is
#     exactly one per product, so no target row is matched by more than one source row.
#   * insert rows (merge_key = NULL, so they never match): every such changed version,
#     plus all versions of products that have no current silver row yet.
# Incoming versions that are older than, or identical to, the current silver row are
# not staged, so re-running a batch inserts nothing new.
#
# NOTE(review): changes are detected against the current silver row only, not against
# the previous incoming version. Suppose a later version in a batch reverts to the
# current silver values. That version is then left out, while its predecessor keeps
# the batch-derived effective_to / is_current. Confirm batches cannot do this.

silver_df = silver_dt.toDF()

# Incoming versions newer than, and different from, the current silver row.
# These are the predicates the MERGE uses, evaluated up front so the new versions
# can also be staged for insertion.
changed_rows = (
    df_incoming_timed.alias("s")
    .join(silver_df.alias("t"), expr(merge_condition) & expr(change_condition))
    .select("s.*")
    # a product with more than one current silver row would otherwise duplicate versions
    .dropDuplicates(["product_id", "effective_from"])
)

# The earliest changed version per product closes the current silver row.
first_change = Window.partitionBy("product_id").orderBy("effective_from")
closing_rows = (
    changed_rows
    .withColumn("_rn", row_number().over(first_change))
    .filter(col("_rn") == 1)
    .drop("_rn")
    .withColumn("merge_key", col("product_id"))
)

# Products with no current silver row: every incoming version is new.
new_product_rows = df_incoming_timed.join(
    silver_df.filter(col("is_current")).select("product_id"),
    on="product_id",
    how="left_anti",
)

product_id_type = df_incoming_timed.schema["product_id"].dataType
insert_rows = (
    new_product_rows
    .unionByName(changed_rows)
    .withColumn("merge_key", lit(None).cast(product_id_type))
)

staged_updates = closing_rows.unionByName(insert_rows)

merge_builder = (
    silver_dt.alias("t")
    .merge(
        staged_updates.alias("s"),
        # Same as merge_condition, but keyed on merge_key so insert rows never match.
        """
        t.product_id = s.merge_key
        AND t.is_current = true
        AND s.effective_from > t.effective_from
        """,
    )
    .whenMatchedUpdate(
        condition=change_condition,
        set={
            "effective_to": col("s.effective_from"),
            "is_current": lit(False),
        },
    )
    .whenNotMatchedInsert(
        # Only the NULL-keyed copies insert; a closing row is inserted via its copy.
        condition="s.merge_key IS NULL",
        values={
            "product_id": "s.product_id",
            "product_name": "s.product_name",
            "category": "s.category",
            "brand": "s.brand",
            "price": "s.price",
            "currency": "s.currency",
            "is_active": "s.is_active",
            "ingestion_ts": "s.ingestion_ts",
            "source_file": "s.source_file_path",
            "effective_from": "s.effective_from",
            "effective_to": "s.effective_to",
            "is_current": "s.is_current",
        },
    )
)

merge_builder.execute()

In [0]:
# Batch summary: all prepared incoming rows vs. those that failed a DQ rule.
total_count = df_products.count()
invalid_count = df_invalid.count()

for label, value in (("Total records", total_count), ("Invalid records", invalid_count)):
    print(f"{label}: {value}")

In [0]:
%skip
spark.sql("""select * from dev_catalog.silver.products_scd2 where product_id ='P001'""").display()

In [0]:
%skip
%sql
-- Ad-hoc check: list every silver row that is not the most recently ingested copy
-- of its (product_id, effective_from) key, i.e. the duplicates (rn > 1).
-- NOTE(review): hardcodes dev_catalog; adjust when checking other environments.
SELECT *
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY product_id, effective_from
      ORDER BY ingestion_ts DESC
    ) AS rn
  FROM dev_catalog.silver.products_scd2
) AS ranked
WHERE rn > 1;


In [0]:
%skip
%sql

-- Remove duplicate (product_id, effective_from) versions, keeping the most recently
-- ingested copy of each key.
-- Each row is compared against a strictly newer copy of the same key, so the newest
-- copy itself never qualifies for deletion. Matching on the key alone would delete
-- every copy of a duplicated key, including the one that should be kept.
-- NOTE(review): copies that tie on ingestion_ts (or have a NULL ingestion_ts) are all
-- kept; resolve those manually. Also hardcodes dev_catalog.
DELETE FROM dev_catalog.silver.products_scd2 t
WHERE EXISTS (
  SELECT 1
  FROM dev_catalog.silver.products_scd2 newer
  WHERE newer.product_id = t.product_id
    AND newer.effective_from = t.effective_from
    AND newer.ingestion_ts > t.ingestion_ts
);
