#### Introduction
This notebook explores managing historical data changes using Slowly Changing Dimension (SCD) Type 2 within a dimensional model. It combines Delta tables with the Kimball methodology, a data warehousing approach by Ralph Kimball, to effectively handle SCD Type 2 scenarios. 

The notebook first covers the basics of SCD Type 2 and Delta tables, which offer efficient change tracking and versioning. It then walks step by step through implementing SCD Type 2 with Delta tables, including handling new records, updates, deletions, and expired data. Practical examples and code snippets are included so you can apply these concepts in your own data warehousing projects.

##### Slowly Changing Dimensions Type 2

SCD Type 2 addresses the challenge of managing historical changes in dimensional data by creating a new row in the dimension table whenever an attribute value changes. This approach allows for the retention of both current and historical versions of dimension records, ensuring a comprehensive view of data evolution.

Key aspects of SCD Type 2 include:

- **Surrogate Keys**: These artificial keys uniquely identify each version of a dimension record, facilitating efficient tracking of historical changes.
- **Start and End Dates**: Each record includes start and end dates to indicate the record’s validity period. The end date may be set as open-ended (e.g., '9999-12-31') to mark the current version.
- **Slow Changes**: SCD Type 2 handles infrequent modifications by creating new records for each change, such as updates to customer addresses or product descriptions.
- **Historical Integrity**: Rather than updating existing records, SCD Type 2 creates new records to maintain historical accuracy.
- **Dimension Table Structure**: The dimension table includes surrogate keys, natural keys, attribute values, and validity dates, evolving over time to capture historical changes without altering or deleting past data.
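
The aspects above can be illustrated with a minimal plain-Python sketch. It is not the Delta implementation used later in this notebook; the field names (`surrogate_key`, `natural_key`, `attrs`) and the customer example are hypothetical and chosen only for illustration.

```python
OPEN_END = "9999-12-31"  # assumed open-ended marker for the current version


def apply_scd2_change(rows, natural_key, new_attrs, change_date):
    """Close the current version for natural_key and append a new version."""
    next_key = max((r["surrogate_key"] for r in rows), default=0) + 1
    for r in rows:
        if r["natural_key"] == natural_key and r["end_date"] == OPEN_END:
            r["end_date"] = change_date  # only the validity period is closed
    rows.append({
        "surrogate_key": next_key,
        "natural_key": natural_key,
        "attrs": new_attrs,
        "start_date": change_date,
        "end_date": OPEN_END,
    })
    return rows


# Hypothetical customer whose city changes on 2023-06-08
dim = [{"surrogate_key": 1, "natural_key": "C1", "attrs": {"city": "Oslo"},
        "start_date": "2023-01-01", "end_date": OPEN_END}]
dim = apply_scd2_change(dim, "C1", {"city": "Bergen"}, "2023-06-08")
```

After the call, the table holds two versions of `C1`: the original attributes with a closed validity period, and the new attributes marked as current.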


#### Example Case Scenario

Consider a company, ABC, that needs to track changes to its Sales dimension over time. The Analytics Engineering team suggests using SCD Type 2 with Delta tables. They also need to detect records that were deleted in the source dataset and flag them.

Here we track data as it moves from the Bronze layer to the Silver layer.

We'll first set up Delta Tables: one for incoming Bronze New Data and one for the Silver Data Dimension, which will act as the target table.

In [1]:
# Libraries Needed
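from delta.tables import DeltaTable
# Explicit imports avoid shadowing Python built-ins such as sum, max, and round
from pyspark.sql.functions import (
    col, concat_ws, current_timestamp, lit, row_number, sha2
)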
from pyspark.sql.window import Window

# Path Azure Data lake
bronze_path = 'abfss://cfgfs@thorbicfg.dfs.core.windows.net/synapse/workspaces/thorbicfg/warehouse/bronze_gc.db/DimSales' 
silver_path = 'abfss://cfgfs@thorbicfg.dfs.core.windows.net/synapse/workspaces/thorbicfg/warehouse/silver_gc.db/DimSales_Temp'

In [2]:
# Silver data
columns = ["SurrogateKey","DimId", "Col1", "Col2","Col3","Hash", "CurrentFlag","DeletedFlag" , "EffectiveFromDate" , "EffectiveToDate" ]
data_s = [(100,"1", "200" , "500" , "800" , "d43fac69ca61701ab7e73bcd3243d3eee16d17fd9b272e771219748f44e296bc" , "Y","N" ,"2023-05-12","2999-12-31" )
        , (102,"6", "300" , "900" , "250" , "214f2d6d2245dfc8f7e23cb9c45d6a3ceb8d920e6894902f38a0e5c8abd3bf60" , "Y","N","2023-05-12","2999-12-31" ) 
        ,(103,"13", "900" , None , "700" , "3c47b4483777dc59a611f1623c1c459c2ddd0afe8089af993701fda05f561127" , "Y","N","2023-05-12","2999-12-31" )
        ,(104,"43", "340" , "359" , "9032" , "rc9fe69c1076002d1d9e504a8aabb57cf57e8cf6a5ab56220a6ab7010b96b6e8" , "Y","N","2023-05-12","2999-12-31")]

In [3]:
# Bronze data
columns = ["SurrogateKey","DimId", "Col1", "Col2","Col3","Hash", "CurrentFlag","DeletedFlag" , "EffectiveFromDate" , "EffectiveToDate" ]
data_b = [("","1" ,"200" , "500" , "800" , "" , "Y","N" ,"2023-05-12","2999-12-31" )
        , ("","6",  "300" , "900" , "250" , "" , "Y","N","2023-05-12","2999-12-31" ) 
        ,("","13",  "100" , None , "700" , "" , "Y","N","2023-06-08","2999-12-31")
        ,("","59",  "1500" , "2000" , "800" , "" , "Y","N","2023-06-08","2999-12-31")]

The SurrogateKey in the bronze table is initially empty and will be created when loading into the final Silver table. Additionally, the Hash column is empty now and will be generated using data from Col1, Col2, and Col3 to identify differences from the Silver data.

Now let us create both data frames:

In [6]:
silver = spark.createDataFrame(data_s).toDF(*columns)
bronze = spark.createDataFrame(data_b).toDF(*columns)

Create a hash of `Col1`, `Col2`, and `Col3` using the `sha2` function with a bit length of 256:

In [7]:
HashCols = ['Col1', 'Col2', 'Col3']
bronze = bronze.withColumn("Hash", sha2(concat_ws("~", *HashCols), 256))
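
To make the hashing step easier to reason about, here is a plain-Python sketch of what the expression above computes for a single row. It assumes only the documented behavior that `concat_ws` skips null values and that `sha2(..., 256)` returns a hex-encoded SHA-256 digest; the helper name `row_hash` is hypothetical.

```python
import hashlib


def row_hash(values, sep="~"):
    """SHA-256 hex digest of the non-null values joined by sep."""
    joined = sep.join(v for v in values if v is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# A null in the middle is skipped, so these two inputs hash identically
h_with_null = row_hash(["100", None, "700"])
h_two_values = row_hash(["100", "700"])
h_other_row = row_hash(["200", "500", "800"])
```

Because nulls are skipped, a row with a null `Col2` can hash the same as a row whose values merely line up after the gap closes. If that distinction matters for your data, one option is to replace nulls with a sentinel value (for example with `coalesce`) before hashing.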

To create a Temporary Surrogate Key for the Bronze data frame:

1. Purpose: Surrogate keys help track historical changes and ensure unique identification of each dimension record.
2. Current State: The Surrogate Key is empty in the Bronze table.
3. Method: Use the Window and row_number functions, ordered by 'DimId' and 'Hash' from the Bronze data, to generate unique values.
4. Objective: Create a temporary Surrogate Key that, once the current maximum key from the Silver table is added to it, avoids collisions with existing Silver keys and supports incremental loads.

In [8]:
# Keys for DimSales
keys = ['DimId', 'Hash']

# Build the dimension surrogate key
w = Window.orderBy(*keys)
bronze = bronze.withColumn("SurrogateKey", row_number().over(w))
bronze = bronze.withColumn("SurrogateKey",col("SurrogateKey").cast('long'))

<mark>Tip</mark>: Make sure the newly generated Surrogate Key has the same data type as the Surrogate Key column in the target table.

In [9]:
bronze.printSchema()

In [11]:
silver.printSchema()

Let's review the updated Bronze and Silver data frames, which now include the Temporary Surrogate Key and the new Hash for the bronze data.

In [13]:
bronze.show(truncate=False)

In [12]:
silver.show(truncate=False)

We have successfully prepared the data for our Delta Tables. The next step is to create the Delta Tables using the initial `bronze_path` and `silver_path` in our Azure Data Lake folders.

In [14]:
# Write Dataframe as Delta Table (silver)
if not DeltaTable.isDeltaTable(spark, silver_path):
    silver.write.format("delta").mode("overwrite").save(silver_path)

# Write Dataframe as Delta Table (bronze)
if not DeltaTable.isDeltaTable(spark, bronze_path):
    bronze.write.format("delta").mode("overwrite").save(bronze_path)

The objective is to update records in the Silver table based on changes detected from the Bronze table and insert new records. Here's how to achieve it:

1. **Update Existing Records:** Flag records with changes as `CurrentFlag = 'N'`. For example, if `DimId = 13` has a new sale for `Col1`, update the record in Silver to indicate it's no longer current.

2. **Insert New Records:** Bronze includes new records not present in Silver that need to be inserted.

3. **Hashing for Changes:** Use the hash of `Col1`, `Col2`, and `Col3` to detect changes between Bronze and Silver.

4. **Technique:** Use a Left Anti Join to find records in Bronze that don't have matching records in Silver. Because the hash is part of the join condition, this returns both brand-new records and new versions of changed records.

5. **Match Condition:** Join on `DimId` and `Hash`, together with `CurrentFlag` and `DeletedFlag`, so that a Bronze row only matches an identical, current Silver row. Filter Bronze to records where `CurrentFlag = 'Y'` and `DeletedFlag = 'N'` to focus on current, non-deleted records.

This approach ensures you accurately track and update changes while handling new records efficiently.
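
The left anti join logic can be sketched in plain Python on the sample `DimId` values. This is only an illustration of the join semantics, not the Spark code: the hash values (`h1`, `h13_old`, and so on) are hypothetical placeholders for the real SHA-256 strings, and it assumes the hashes for `DimId` 1 and 6 match between Bronze and Silver.

```python
JOIN_COLS = ("DimId", "Hash", "CurrentFlag", "DeletedFlag")


def make_row(dim_id, row_hash):
    """Build a current, non-deleted row with a placeholder hash."""
    return {"DimId": dim_id, "Hash": row_hash,
            "CurrentFlag": "Y", "DeletedFlag": "N"}


def left_anti(left, right, cols):
    """Keep rows from left that have no match in right on cols."""
    right_keys = {tuple(r[c] for c in cols) for r in right}
    return [row for row in left
            if tuple(row[c] for c in cols) not in right_keys]


silver_rows = [make_row("1", "h1"), make_row("6", "h6"),
               make_row("13", "h13_old"), make_row("43", "h43")]
bronze_rows = [make_row("1", "h1"), make_row("6", "h6"),
               make_row("13", "h13_new"), make_row("59", "h59")]

rows_to_update = left_anti(bronze_rows, silver_rows, JOIN_COLS)
update_ids = [row["DimId"] for row in rows_to_update]
```

Here `update_ids` contains `"13"` (same `DimId`, different hash) and `"59"` (a `DimId` that Silver has never seen), which are the two kinds of rows the join is meant to surface.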

In [15]:
condition = ['DimId', 'Hash', 'CurrentFlag', 'DeletedFlag']

RowsToUpdate = bronze \
            .alias("source") \
            .where("CurrentFlag = 'Y' AND DeletedFlag = 'N'") \
            .join(silver.alias("target"),condition,'leftanti') \
            .select(*columns) \
            .orderBy(col('source.DimId')) 

In [16]:
RowsToUpdate.show(truncate=False)

Before proceeding, we will:

1. **Increment Surrogate Key:** Assign new surrogate keys to the new records.
2. **Merge Changes:** Integrate the updated and new records into the Silver Delta table. 

This ensures that all new and modified records are accurately reflected in the Silver data.

In [18]:
# Retrieve maximum surrogate key in silver delta table
maxTableKey = DeltaTable.forPath(spark, silver_path).toDF().agg({"SurrogateKey":"max"}).collect()[0][0]

In [19]:
print(maxTableKey)

In [20]:
# Increment surrogate key in stage table by maxTableKey
RowsToUpdate = RowsToUpdate.withColumn("SurrogateKey", col("SurrogateKey") + maxTableKey)

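To avoid key collisions, each row's final key is its temporary Surrogate Key plus the highest key already in Silver:

- The highest existing key in Silver is 104.
- `DimId = 13` received temporary key 2, so its final key is 104 + 2 = 106.
- `DimId = 59` received temporary key 3, so its final key is 104 + 3 = 107.

`DimId` is a string column, so the window ordering is lexicographic ("1", "13", "59", "6"). Temporary keys 1 and 4 went to the unchanged rows, which are not carried forward, so the final keys are unique but not necessarily consecutive.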
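
The key arithmetic can be checked with a short plain-Python sketch. It assumes the sample data from this notebook, where `DimId` is stored as a string and is therefore ordered lexicographically by the window; the variable names are hypothetical.

```python
max_table_key = 104  # highest SurrogateKey in the sample Silver data

# DimId is a string column, so ordering is lexicographic, not numeric
bronze_dim_ids = ["1", "6", "13", "59"]
temp_keys = {dim_id: n
             for n, dim_id in enumerate(sorted(bronze_dim_ids), start=1)}

# Only the changed row (13) and the new row (59) are carried forward
rows_to_update = ["13", "59"]
final_keys = {dim_id: temp_keys[dim_id] + max_table_key
              for dim_id in rows_to_update}
```

With this data, `temp_keys` assigns 2 to `"13"` and 3 to `"59"`, which is where the offsets in 104 + 2 and 104 + 3 come from. Key 105 is never used, which is acceptable because surrogate keys only need to be unique.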

In [21]:
RowsToUpdate.show(truncate=False)

In [22]:
# Merge statement to expire old records
DeltaTable.forPath(spark, silver_path).alias("original").merge(
    source = RowsToUpdate.alias("updates"),
    condition = 'original.DimId = updates.DimId'
).whenMatchedUpdate(
    condition = "original.CurrentFlag = 'Y' AND original.DeletedFlag = 'N' AND original.Hash <> updates.Hash",
    set = {                                      
        "CurrentFlag": "'N'",
        "EffectiveToDate": lit(current_timestamp())
    }
).execute()

### SCD Type 2 (Inserts)

With the old version of `DimId = 13` expired, the `RowsToUpdate` data frame still holds the rows that need to be inserted: the new version of `DimId = 13` and the brand-new `DimId = 59`.

In [23]:
# Insert all new and updated records
RowsToUpdate.select(*columns).write.mode("Append").format("delta").save(silver_path)

### SCD Type 2 (Deletes)

For handling deletions:

1. **Identify Deleted Records:** Compare the Silver table with the new incoming Bronze data. Any record in Silver not present in Bronze is considered deleted.

2. **Update Status:** Set `DeletedFlag = 'Y'` for these records and update `EffectiveToDate` to the current timestamp. The merge below leaves `CurrentFlag = 'Y'`, so the last version before deletion stays identifiable.

3. **Ensure Completeness:** Ensure Bronze data only includes rows with `CurrentFlag = 'Y'` and `DeletedFlag = 'N'`, which represents the current and active records.

4. **Join Condition:** Use `DimId` to identify matches between Silver and Bronze data. Update records in Silver to reflect deletions based on the absence in Bronze.

This approach keeps the Silver data consistent with the source by accurately recording historical deletions.
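
As a plain-Python illustration of the delete detection, the sketch below flags current Silver rows whose `DimId` is missing from Bronze. It uses the sample `DimId` values from this notebook; the helper name `mark_deleted` and the `as_of` date are hypothetical, and it mirrors the merge in this notebook by leaving `CurrentFlag` untouched.

```python
silver_rows = [
    {"DimId": d, "CurrentFlag": "Y", "DeletedFlag": "N",
     "EffectiveToDate": "2999-12-31"}
    for d in ["1", "6", "13", "43"]
]
bronze_dim_ids = {"1", "6", "13", "59"}


def mark_deleted(rows, source_ids, as_of):
    """Flag active rows whose DimId no longer appears in the source."""
    deleted = []
    for row in rows:
        is_active = row["CurrentFlag"] == "Y" and row["DeletedFlag"] == "N"
        if is_active and row["DimId"] not in source_ids:
            row["DeletedFlag"] = "Y"
            row["EffectiveToDate"] = as_of
            deleted.append(row["DimId"])
    return deleted


deleted_ids = mark_deleted(silver_rows, bronze_dim_ids, "2023-06-08")
```

Only `DimId = 43` is flagged, since it is the one current Silver record with no counterpart in the incoming Bronze data.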

In [24]:
RowsToDelete = silver.alias('silver').where("CurrentFlag = 'Y' AND DeletedFlag = 'N'") \
    .join(bronze.alias('bronze'), col('bronze.DimId') == col('silver.DimId'), "leftanti")

In [25]:
# Merge statement to mark as deleted records
DeltaTable.forPath(spark, silver_path).alias("original").merge(
    source = RowsToDelete.alias("deletes"),
    condition = 'original.DimId = deletes.DimId'
).whenMatchedUpdate(
    condition = "original.CurrentFlag = 'Y' AND original.DeletedFlag = 'N'",
    set = {                                      
        "DeletedFlag": "'Y'",
        "EffectiveToDate": lit(current_timestamp())
    }
).execute()

At this point, updates, inserts, and deletes are all recorded and flagged in the Silver table. To show the last current record before deletion, filter on `CurrentFlag = 'Y'` and `DeletedFlag = 'Y'`. This allows retrieval of the last existing record if historical data is needed.

Different techniques, such as grouping by `DimId` and selecting the maximum `EffectiveToDate`, can be used based on your requirements. Choose the method that best fits your needs.
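
One such technique, selecting the row with the maximum `EffectiveToDate` per `DimId`, can be sketched in plain Python as follows. The history rows and dates are hypothetical, and the comparison assumes `EffectiveToDate` is stored as an ISO-style string (year first), so that string ordering matches chronological ordering.

```python
# Hypothetical history rows after the updates and deletes have been applied
history = [
    {"DimId": "13", "SurrogateKey": 103, "EffectiveToDate": "2023-06-08"},
    {"DimId": "13", "SurrogateKey": 106, "EffectiveToDate": "2999-12-31"},
    {"DimId": "43", "SurrogateKey": 104, "EffectiveToDate": "2023-06-08"},
]


def latest_per_dim(rows):
    """Return, per DimId, the row with the greatest EffectiveToDate."""
    latest = {}
    for row in rows:
        best = latest.get(row["DimId"])
        if best is None or row["EffectiveToDate"] > best["EffectiveToDate"]:
            latest[row["DimId"]] = row
    return latest


latest = latest_per_dim(history)
```

For `DimId = 13` this picks the open-ended current version, and for the deleted `DimId = 43` it picks the last version that existed.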

**Conclusion:**

In this notebook, we've examined how to effectively implement Slowly Changing Dimensions Type 2 (SCD Type 2) using Delta tables, surrogate keys, and PySpark within the Delta Lakehouse architecture. 

By adopting Ralph Kimball’s SCD Type 2 methodology, organizations gain a thorough view of their dimensional data, which enhances trend analysis, historical performance comparison, and change tracking. This approach ensures data integrity and supports informed decision-making.

Start implementing SCD Type 2 with Delta Lakehouse to elevate your data warehousing capabilities.