In [0]:
# Fully qualified (catalog.schema.table) names; both tables share the catalog
# and the table name, only the schema differs.
_catalog = 'workspace'
SourceTable = f'{_catalog}.de_practice_source.sales'
TargetTable = f'{_catalog}.de_practice_target.sales'

In [0]:
# Load the source and the target table as DataFrames (source first, then target).
SourceDf, TargetDf = (spark.read.table(name) for name in (SourceTable, TargetTable))

In [0]:
from pyspark.sql.functions import col

# Look at the untouched source rows for franchiseID 3000001 before simulating a change.
# (In the recorded run their city was 'Tokyo'.)
SourceDf.where(col("franchiseID") == "3000001").display()

In [0]:
from pyspark.sql.functions import col, when

# Simulate an upstream change so the SCD Type 2 steps below have something to historize:
# rows with franchiseID '3000001' get city 'Mumbai'; every other row keeps its city.
SourceDf = SourceDf.withColumn(
    "city",
    when(col("franchiseID") == "3000001", "Mumbai").otherwise(col("city")),
)

# Verify the simulated change: city should now read 'Mumbai' for franchiseID 3000001.
SourceDf.filter(col("franchiseID") == "3000001").display()

In [0]:
# Change-detection hash: 'RowHash' is the SHA-256 (hex) of the business columns
# concatenated as strings. Columns that this notebook adds itself (a RowHash left over
# from a previous run of this cell, the SCD audit columns, join/flag columns) are
# excluded so that re-running the cell produces the same hash instead of hashing
# the old hash. On a first run none of them exist, so the result is unchanged.
from pyspark.sql import functions as F

_non_business_columns = {
    'RowHash', 'TargetHash', 'Flag', 'storage_id',
    'IndCurrent', 'CreatedDate', 'ModifiedDate',
}
hash_input_columns = [c for c in SourceDf.columns if c not in _non_business_columns]

# NOTE: the '' separator must stay identical to how RowHash was computed for rows
# already in the target, otherwise every row would look changed. Be aware that
# concat_ws('') can map different rows to the same string (e.g. 'ab','c' vs 'a','bc')
# and silently skips NULL values.
SourceDf = SourceDf.withColumn('RowHash', F.sha2(F.concat_ws('', *hash_input_columns), 256))


In [0]:
# Stamp every source row with SCD bookkeeping columns:
#   IndCurrent   - literal 1, marking the row as the active version
#   CreatedDate  - current timestamp at load time
#   ModifiedDate - current timestamp at load time (last-change marker)
SourceDf = (
    SourceDf
    .withColumn("IndCurrent", F.lit(1))
    .withColumn("CreatedDate", F.current_timestamp())
    .withColumn("ModifiedDate", F.current_timestamp())
)

In [0]:
# Check franchiseID 3000001 again, now with the RowHash and SCD audit columns attached.
SourceDf.where(col("franchiseID") == "3000001").display()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Compare the source against the *current* version of each key in the target.
# Expired history rows (IndCurrent = 0) are ignored: otherwise a key with history
# matches once per old version, and an unchanged row would be flagged as new
# (its hash differs from the old versions) and re-inserted as a duplicate.
TargetDf = (
    spark.read.table(TargetTable)
    .filter(col('IndCurrent') == 1)
    .select('franchiseID', 'RowHash', 'storage_id')
    .withColumnRenamed('RowHash', 'TargetHash')
)

# Classify every source row:
#   New      - no current target row for the key
#   Update   - current target row exists but the attribute hash differs
#   NoChange - current target row has the same hash (nothing to do)
SourceDf = (
    SourceDf.join(TargetDf, on=['franchiseID'], how='left')
    .withColumn(
        'Flag',
        F.when(col('TargetHash').isNull(), 'New')
         .when(col('TargetHash') != col('RowHash'), 'Update')
         .otherwise('NoChange'),
    )
    .drop('TargetHash')
    # Only new keys and changed keys need a new current version.
    .filter(col('Flag').isin('New', 'Update'))
)
SourceDf.display()


## 📘 What is SCD Type 2?

**SCD (Slowly Changing Dimension) Type 2** is a technique used in data warehousing to handle changes in dimension data **while preserving historical records**.

Whenever an attribute of a dimension changes (e.g., a customer's city), instead of overwriting the existing record, **SCD Type 2 inserts a new row** with the updated value. The old row is retained to maintain the historical context of data.

This method ensures that historical analysis remains accurate based on what was true **at the time** of the event or transaction.

---

### 🧠 Scenario Example (General):

Let’s say you store customer information, and the customer's city changes from **Seattle** to **Redmond**.  
With **SCD Type 2**, instead of overwriting the existing record, you **insert a new row** with the updated city while **retaining the old row**.

This approach helps preserve historical accuracy, allowing analysis based on what was true **at the time** the data was recorded.

---

### 🔹 Scenario Example (Tabular):

- **Business (natural) key:** `franchiseID` — it repeats across versions, so it is not the table's primary key  
- **Changed Attribute:** `city`  
- **Flag Column:** `IndCurrent` (1 = active, 0 = inactive)

Suppose a record with `franchiseID = 3000001` originally had `city = 'Tokyo'`. Later, the city is updated to `'Mumbai'`.

With **SCD Type 2**, the table will look like this:

| surrogate_key | franchiseID | city   | start_date | end_date   | IndCurrent |
|---------------|-------------|--------|------------|------------|------------|
| 1             | 3000001     | Tokyo  | 2022-01-01 | 2023-05-15 | 0          |
| 2             | 3000001     | Mumbai | 2023-05-16 | null       | 1          |

> The original value `'Tokyo'` is preserved as a historical record.  
> The new record `'Mumbai'` is added with `IndCurrent = 1`, and the previous one is marked as inactive (`IndCurrent = 0`).

In this notebook the surrogate key column is `storage_id` and each version's load time is stored in `CreatedDate`; no explicit end date is kept, so `IndCurrent` alone identifies the active row.

---

📖 [Learn more: Slowly Changing Data – Microsoft Azure Data Factory](https://learn.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-overview#slowly-changing-data)


In [0]:
# Before applying the SCD Type 2 merge, inspect the target rows for franchiseID 3000001.
display(spark.sql(f"select * from {TargetTable} where franchiseID='3000001'"))

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, lit, col, udf
from pyspark.sql.types import StringType
import uuid

# Configuration (also used by the append cell that follows)
table_name = "workspace.de_practice_target.sales"
key_column = "franchiseID"
hash_column = "RowHash"
is_current_column = "IndCurrent"
surrogate_key_column = "storage_id"
created_column = "CreatedDate"
modified_column = "ModifiedDate"

# Reference Delta table
target_table = DeltaTable.forName(spark, table_name)

# SCD Type 2, step 1: expire the current target row of every key whose attribute
# hash changed. Rows are NOT inserted here: the append step that follows writes every
# remaining row of SourceDf (new keys and new versions of changed keys) and assigns
# numeric storage_id values. Inserting here as well (whenNotMatchedInsertAll) can
# write brand-new keys twice and would put string UUIDs into storage_id, which the
# append step treats as numeric (max + 1).
src = SourceDf.alias("src")
tgt = target_table.alias("tgt")

# Use column expressions (not strings) in merge condition
tgt.merge(
    source=src,
    condition=(
        (col(f"tgt.{key_column}") == col(f"src.{key_column}")) &
        (col(f"tgt.{is_current_column}") == lit(1))
    )
).whenMatchedUpdate(
    condition=col(f"tgt.{hash_column}") != col(f"src.{hash_column}"),
    set={
        is_current_column: lit(0),
        # Record when this version was retired.
        modified_column: current_timestamp(),
    }
).execute()


In [0]:
# Right after the merge step: show every target row (all versions) for franchiseID 3000001.
display(spark.table("workspace.de_practice_target.sales").filter("franchiseID='3000001'"))

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# SCD Type 2, step 2: append the remaining source rows (new keys and new versions of
# changed keys) as current rows with freshly assigned surrogate keys.
SourceDf = SourceDf.drop('storage_id', 'Flag')

# storage_id continues from the target's current maximum; an empty table starts at 1.
max_storage_id = (
    spark.read.table(TargetTable)
    .agg(F.max('storage_id').alias('max_id'))
    .first()['max_id']
)
next_storage_id = (max_storage_id or 0) + 1

# row_number gives each appended row its own id (a plain lit() would stamp the same id
# on every row). A window without partitioning runs on a single partition, which is
# acceptable for a dimension-sized batch.
id_window = Window.orderBy('franchiseID')
SourceDf = (
    SourceDf
    .withColumn('storage_id', F.lit(next_storage_id) + F.row_number().over(id_window) - 1)
    .withColumn('IndCurrent', F.lit(1))
)
SourceDf.write.format('delta').mode('append').saveAsTable(TargetTable)

In [0]:
# Final check after the new current versions were appended: for franchiseID 3000001 we
# expect the expired version(s) (IndCurrent = 0) plus one row with IndCurrent = 1.
display(spark.sql(f"select * from {TargetTable} where franchiseID='3000001'"))