In [0]:
from pyspark.sql.functions import col

# Seed the gold product dimension from silver.
#
# This cell runs on every execution of the notebook, so it must be idempotent:
# appending the whole silver table each time would add another
# `is_current = true` row for every product on every run. Only business keys
# the dimension does not hold yet are appended here; attribute changes to
# existing products are handled by the SCD2 cells further down.
silver_df = spark.table("config_catalog.silver.products")

# Cast once here; the change-detection join below reuses `silver_cast_df`.
silver_cast_df = silver_df.withColumn("price", col("price").cast("double"))

if spark.catalog.tableExists("config_catalog.gold.dim_products"):
    # Later runs: keep only products whose key is absent from the dimension.
    known_keys_df = (
        spark.table("config_catalog.gold.dim_products")
        .select("product_id")
        .distinct()
    )
    new_products_df = silver_cast_df.join(known_keys_df, on="product_id", how="left_anti")
else:
    # First run: the table does not exist yet, so every product is new and
    # saveAsTable below creates it.
    new_products_df = silver_cast_df

gold_df = new_products_df.selectExpr(
    "uuid() as product_sk",                    # surrogate key, one per version row
    "product_id",
    "product_name",
    "price",
    "current_timestamp() as valid_from",
    "CAST(NULL AS TIMESTAMP) as valid_to",     # open-ended: no end date yet
    "true as is_current"
)

gold_df.write.format("delta") \
    .mode("append") \
    .saveAsTable("config_catalog.gold.dim_products")

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Re-read the silver source, and open the gold dimension both as a DeltaTable
# handle (used for the MERGE later) and as a DataFrame (used for the join).
silver_df = spark.table("config_catalog.silver.products")

gold_delta = DeltaTable.forName(spark, "config_catalog.gold.dim_products")
gold_df = gold_delta.toDF()

# Only rows flagged as the current version take part in change detection.
gold_current = gold_df.where("is_current = true")

In [0]:
# Left-join every source product to its current gold row (if one exists).
# Both sides are aliased so later cells can address columns as "src.*"/"tgt.*".
source_side = silver_cast_df.alias("src")
target_side = gold_current.alias("tgt")

joined_df = source_side.join(target_side, on="product_id", how="left")

In [0]:
# Keep products that already have a current gold row and whose tracked
# attributes (name, price) differ from it.
#
# A plain `!=` evaluates to NULL when either operand is NULL, and filter()
# drops NULL predicates, so a value changing from NULL to something (or back)
# would go unnoticed. eqNullSafe treats NULL as a comparable value and always
# returns true/false, so negating it flags those transitions as changes too.
name_changed = ~col("src.product_name").eqNullSafe(col("tgt.product_name"))
price_changed = ~col("src.price").eqNullSafe(col("tgt.price"))

changed_df = joined_df.filter(
    col("tgt.product_id").isNotNull() & (name_changed | price_changed)
)

In [0]:
# Reduce the changed rows to the source-side attributes, exposing each one
# under its plain (unqualified) column name.
source_columns = ["product_id", "product_name", "price", "last_updated"]

changed_clean_df = changed_df.select(
    *[col(f"src.{name}").alias(name) for name in source_columns]
)

In [0]:
# Number each product's rows newest-first by last_updated and keep only the
# first one, so at most one row per product_id remains.
window_spec = Window.partitionBy("product_id").orderBy(col("last_updated").desc())

ranked_df = changed_clean_df.withColumn("rn", row_number().over(window_spec))
dedup_df = ranked_df.where(col("rn") == 1).drop("rn")

In [0]:
# SCD Type 2 apply step: close the current version of every changed product
# and open its replacement version in ONE Delta MERGE.
#
# Why a single statement: `dedup_df` is lazy and is derived from the gold
# table's current rows. If the expiry MERGE runs first and the new versions are
# appended in a separate write, that write re-evaluates `dedup_df` against the
# already-updated table, where the changed products no longer have a current
# row, so nothing is appended. Staging both actions into one MERGE evaluates
# the change set once and commits expiry + insert atomically.

# Number of new version rows written by this run; read by the pipeline-log
# cell. Counted before the MERGE, because afterwards `dedup_df` is empty.
rows_copied = dedup_df.count()

key_type = dedup_df.schema["product_id"].dataType

# Every changed product is staged twice:
#   merge_key = product_id -> matches the current gold row, which is expired
#   merge_key = NULL       -> matches nothing, so it is inserted as the new version
staged_df = dedup_df.withColumn("merge_key", col("product_id")).unionByName(
    dedup_df.withColumn("merge_key", lit(None).cast(key_type))
)

gold_delta.alias("tgt").merge(
    staged_df.alias("src"),
    "tgt.product_id = src.merge_key AND tgt.is_current = true"
).whenMatchedUpdate(set={
    "valid_to": current_timestamp(),
    "is_current": lit(False)
}).whenNotMatchedInsert(
    # Restrict inserts to the NULL-keyed staging rows so an "expire" row that
    # finds no current target is never inserted by accident.
    condition="src.merge_key IS NULL",
    values={
        "product_sk": "uuid()",
        "product_id": "src.product_id",
        "product_name": "src.product_name",
        "price": "src.price",
        "valid_from": "current_timestamp()",
        "valid_to": "CAST(NULL AS TIMESTAMP)",
        "is_current": "true"
    }
).execute()

In [0]:
# Declare the two notebook parameters with empty-string defaults, then read
# the values supplied for this run.
dbutils.widgets.text("run_id", "")
dbutils.widgets.text("pipeline_name", "")

run_id = dbutils.widgets.get("run_id")
pipeline_name = dbutils.widgets.get("pipeline_name")



# JDBC endpoint of the SQL Server database that receives the pipeline log row.
jdbc_url = "jdbc:sqlserver://configserver18.database.windows.net:1433;database=config_db"

# Credentials come from a Databricks secret scope rather than being embedded
# in the notebook, where anyone with read access (or access to the repo
# history / exported notebook) could see them.
# NOTE(review): the scope and key names below are placeholders -- create them
# (or point these calls at the existing scope) before running, and rotate the
# password that was previously stored in plain text.
connection_props = {
    "user": dbutils.secrets.get(scope="config-db", key="sql-user"),
    "password": dbutils.secrets.get(scope="config-db", key="sql-password"),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

from pyspark.sql.types import *
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql.functions import col

# Layout of one pipeline-log record, as (column name, Spark type) pairs.
# Every column is declared nullable.
log_fields = [
    ("pipeline_name", StringType()),
    ("run_id", StringType()),
    ("dataset_name", StringType()),
    ("status", StringType()),
    ("rows_copied", IntegerType()),
    ("error_message", StringType()),
    ("end_time", TimestampType()),
]

schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in log_fields
])



# Build the single SUCCESS record for this run and append it to the
# `pipeline_log` table over JDBC.
# NOTE(review): `rows_copied` is not assigned in this cell; it has to be set
# by an earlier cell, otherwise this raises NameError -- confirm where it is
# meant to come from.
success_row = [Row(
    pipeline_name=pipeline_name,
    run_id=run_id,
    dataset_name="DIM_PRODUCTS",
    status="SUCCESS",
    rows_copied=int(rows_copied),
    error_message=None,          # no error text on the success path
    end_time=datetime.now()
)]

# Top-level statements: they must start at column 0 for the cell to parse.
success_df = spark.createDataFrame(success_row, schema)
success_df.write.jdbc(url=jdbc_url, table="pipeline_log", mode="append", properties=connection_props)