In [0]:
# Make sure both schemas used by this notebook exist before any table is written:
# one holds the source snapshot, the other the SCD target table.
source_schema_ddl = "CREATE SCHEMA IF NOT EXISTS workspace.scd_source"
target_schema_ddl = "CREATE SCHEMA IF NOT EXISTS workspace.scd_target"

spark.sql(source_schema_ddl)
spark.sql(target_schema_ddl)

DataFrame[]

In [0]:
# Copy the sample customer table into the source schema, replacing whatever a
# previous run left there.
customer_data = spark.sql("select * from samples.tpch.customer")

snapshot_writer = customer_data.write.mode("overwrite")
snapshot_writer.saveAsTable("workspace.scd_source.customer_data")

In [0]:
# Load the source snapshot; every later cell rebinds and extends `source`.
source = spark.table("workspace.scd_source.customer_data")

In [0]:
from pyspark.sql.functions import col

# Spot-check a single customer: keep only the row whose 'c_custkey' is 412450
# and render it with .display() for inspection.
# NOTE(review): the key is compared against a string literal; presumably Spark
# casts it to the column's type implicitly - confirm.
source.filter(col("c_custkey") == "412450").display()

c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment
412450,Customer#000412450,fUD6IoGdtF,20,30-293-696-5047,4406.28,BUILDING,refully final dolphins after the carefully bold packages sleep quickly express deposits. fluffily


In [0]:
from pyspark.sql.functions import col, when

# Derive a 'customer_priority' label on `source`:
# rows whose 'c_mktsegment' is 'BUILDING' become 'Priority Customer',
# every other row becomes 'Not Priority'.
is_building_segment = col("c_mktsegment") == "BUILDING"
priority_label = when(is_building_segment, "Priority Customer").otherwise("Not Priority")
source = source.withColumn("customer_priority", priority_label)

# Show only the rows labelled 'Priority Customer' to verify the new column.
is_priority = col("customer_priority") == "Priority Customer"
source.filter(is_priority).display()

In [0]:
# Remove the 'customer_priority' column again so that `source` no longer
# carries it into the cells below.
source = source.drop("customer_priority")

In [0]:
from pyspark.sql import functions as F

# Fingerprint each row as a SHA-256 hash of all its columns, stored in 'RowHash'.
# The hash is what later cells compare to detect changed rows, so it must be
# unambiguous:
#   * every column is cast to string and joined with an explicit delimiter, so
#     ("ab", "c") and ("a", "bc") no longer produce the same input string;
#   * NULLs are replaced with a marker, because concat_ws silently skips NULL
#     inputs and (NULL, "a") would otherwise hash the same as ("a", NULL);
#   * an existing 'RowHash' column is excluded, so re-running this cell does not
#     fold the previous hash into the new one.
hash_inputs = [
    F.coalesce(F.col(column_name).cast("string"), F.lit("<NULL>"))
    for column_name in source.columns
    if column_name != "RowHash"
]
source = source.withColumn("RowHash", F.sha2(F.concat_ws("||", *hash_inputs), 256))

display(source)

In [0]:
# Add the SCD bookkeeping columns: every row starts out flagged as current
# (IndCurrent = 1) with created/modified timestamps taken from the current time.
audit_columns = {
    "IndCurrent": F.lit(1),
    "CreatedDate": F.current_timestamp(),
    "ModifiedDate": F.current_timestamp(),
}
for audit_name, audit_value in audit_columns.items():
    source = source.withColumn(audit_name, audit_value)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number the rows 1..N to create the 'storage_id' surrogate key. The window has
# no partitioning and is ordered by monotonically_increasing_id().
row_order = Window.orderBy(F.monotonically_increasing_id())
row_number_expr = F.row_number().over(row_order)
source = source.withColumn("storage_id", row_number_expr)

# Reorder the columns so that 'storage_id' comes first and the remaining
# columns keep their existing relative order.
leading_columns = ["storage_id"]
trailing_columns = [name for name in source.columns if name not in leading_columns]
source = source.select(leading_columns + trailing_columns)

display(source)

In [0]:
# Append the prepared rows (hash, audit columns and storage_id included) to the
# SCD target table.
target_writer = source.write.mode("append")
target_writer.saveAsTable("workspace.scd_target.customer_data")



In [0]:
# Fully qualified name of the SCD target table.
# This has to be the name (a string), not a DataFrame: later cells pass it to
# spark.read.table(table_name) and interpolate it into a SQL statement.
table_name = "workspace.scd_target.customer_data"

In [0]:
# Drop the surrogate key from the in-memory frame; the rows written to the
# target table above keep theirs.
source = source.drop('storage_id')



In [0]:
# Compare every incoming row with the CURRENT version stored in the target.
# The customer table's business key is 'c_custkey'. The table name is spelled
# out here so this cell works regardless of what `table_name` is bound to.
target = (
    spark.read.table("workspace.scd_target.customer_data")
    .filter(F.col("IndCurrent") == 1)  # ignore already-expired history rows
    .select("c_custkey", F.col("RowHash").alias("TargetHash"))
)

# Flag semantics after the left join:
#   New      - key has no current row in the target (TargetHash is NULL)
#   Update   - key exists but the row hash differs
#   NoChange - key exists and the hash is identical
change_flag = (
    F.when(F.col("TargetHash").isNull(), "New")
    .when(F.col("TargetHash") != F.col("RowHash"), "Update")
    .otherwise("NoChange")
)

source = (
    source.join(target, on=["c_custkey"], how="left")
    .withColumn("Flag", change_flag)
    .drop("TargetHash")
)

# Keep every row that needs a new version in the target: brand-new keys and
# changed rows. Unchanged rows are dropped.
source = source.filter(F.col("Flag").isin("New", "Update"))



In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, lit, col, udf
from pyspark.sql.types import StringType
import uuid

# Configuration
table_name = "workspace.scd_target.customer_data"
key_column = "c_custkey"
hash_column = "RowHash"
is_current_column = "IndCurrent"
surrogate_key_column = "storage_id"
created_column = "CreatedDate"

# Reference Delta table
target_table = DeltaTable.forName(spark, table_name)

# Stamp the incoming versions as freshly created, current rows.
# No surrogate key is assigned here: the initial load created 'storage_id' as an
# integer row number, so a UUID string would not fit the column. The append cell
# that follows assigns the integer key.
source = (
    source
    .withColumn(created_column, current_timestamp())
    .withColumn(is_current_column, lit(1))
)

# Alias both sides so the merge condition can qualify column names.
src = source.alias("src")
tgt = target_table.alias("tgt")

# Expire the current target version of every key whose hash has changed.
# This merge only closes old versions; it deliberately has no
# whenNotMatchedInsertAll clause, because the following cell appends all of
# `source` to the target and inserting here as well would duplicate new rows.
tgt.merge(
    source=src,
    condition=(
        (col(f"tgt.{key_column}") == col(f"src.{key_column}")) &
        (col(f"tgt.{is_current_column}") == lit(1))
    )
).whenMatchedUpdate(
    condition=col(f"tgt.{hash_column}") != col(f"src.{hash_column}"),
    set={
        is_current_column: lit(0),
        # Record when the old version was closed.
        "ModifiedDate": current_timestamp(),
    }
).execute()


In [0]:
# Append the new/changed rows to the target as the current versions.
# Drop the helper columns first; 'storage_id' is re-assigned below.
source = source.drop('storage_id', 'Flag')

# Highest surrogate key already in the target; max() yields NULL (None) when the
# table is empty, in which case numbering starts at 1.
max_storage_id = spark.sql(f"select max(storage_id) as max_id from {table_name}").first()['max_id']
next_storage_id = (max_storage_id or 0) + 1

# Give every appended row its own key: next_storage_id, next_storage_id + 1, ...
# (a single literal here would assign the same key to all rows of the batch).
batch_position = F.row_number().over(Window.orderBy(key_column))
source = source.withColumn('storage_id', batch_position + lit(next_storage_id - 1))
source = source.withColumn('IndCurrent', lit(1))
source.write.format('delta').mode('append').saveAsTable(table_name)

