# SCD Demo for Students
For the student dimension, we keep one row per student/academy combination per academic year.

Changes made during an academic year overwrite that row in place (as in a Type 1 SCD), while each new academic year adds a new row (as in a Type 2 SCD), so the design sits somewhere between the two.
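Here is a plain-Python illustration of that rule. It does not use Spark, and the `dimension` dictionary and the `upsert` helper are hypothetical. The dimension behaves like a mapping keyed on student, academy and academic year:

```python
# Hypothetical in-memory stand-in for the student dimension:
# one row per (upn, academy, academic_year).
dimension = {}

def upsert(row, academic_year):
    """Overwrite within a year (Type 1 behaviour); add a new row for a new year (Type 2 behaviour)."""
    key = (row["upn"], row["academy"], academic_year)
    dimension[key] = {**row, "academic_year": academic_year}

upsert({"upn": "X001", "academy": "BAA", "fsm": "FSM"}, "2022/2023")
upsert({"upn": "X001", "academy": "BAA", "fsm": "Not FSM"}, "2022/2023")  # same year: row overwritten
upsert({"upn": "X001", "academy": "BAA", "fsm": "Not FSM"}, "2023/2024")  # new year: new row

print(len(dimension))  # 2
print(dimension[("X001", "BAA", "2022/2023")]["fsm"])  # Not FSM
```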

Each week, a dated snapshot is then taken into a "long/thin" fact table. It holds enrollment data and core demographic data, so changes can be tracked over time when finer-grained detail is needed.
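The snapshot step is not implemented in this notebook. The minimal plain-Python sketch below shows how week-on-week changes become visible. It assumes a long/thin layout of (snapshot_date, upn, academy, attribute, value) rows, and the `snapshot` helper and `TRACKED` attribute list are hypothetical:

```python
from datetime import date

# Hypothetical attributes tracked in the long/thin fact table.
TRACKED = ["nc_year", "fsm", "pp"]

def snapshot(rows, snapshot_date):
    """Unpivot each student row into one fact row per tracked attribute."""
    return [
        (snapshot_date, r["upn"], r["academy"], attr, str(r[attr]))
        for r in rows
        for attr in TRACKED
    ]

rows = [{"upn": "X001", "academy": "BAA", "nc_year": 7, "fsm": "FSM", "pp": "PP"}]
week1 = snapshot(rows, date(2023, 3, 13))

rows[0]["fsm"] = "Not FSM"  # a change made during the following week
week2 = snapshot(rows, date(2023, 3, 20))

# Attributes whose value differs between the two weekly snapshots.
changed = {old[3] for old, new in zip(week1, week2) if old[4] != new[4]}
print(changed)  # {'fsm'}
```

Storing one value per attribute per week keeps the table narrow at the cost of more rows. The dimension itself only has to hold the latest state for each academic year.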

Doing it this way greatly simplifies the modeling in Power BI and speeds up response times.

What follows is a core update function, followed by a simulation of the types of change it handles and how each is captured. This is only a proof of concept; in practice it would need to be fed from a clean base/silver table.
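Each run of the function compares the incoming rows with the dimension on two things: the student/academy/year key and a hash of the row's values. Unknown keys are new, known keys with a different hash are changed, and the rest are left alone. The plain-Python sketch below shows that classification. The `row_hash` and `classify` helpers are hypothetical and stand in for the Spark joins:

```python
import hashlib

def row_hash(row):
    """SHA-256 over the row's values joined with "|" (the same idea as hash_key)."""
    return hashlib.sha256("|".join(str(v) for v in row.values()).encode("utf-8")).hexdigest()

def classify(incoming, dimension):
    """Split incoming rows into new, changed and unchanged keys.

    `dimension` maps (upn, academy, academic_year) to the stored hash for that key.
    """
    result = {"new": [], "changed": [], "unchanged": []}
    for row in incoming:
        key = (row["upn"], row["academy"], row["academic_year"])
        if key not in dimension:
            result["new"].append(key)
        elif dimension[key] != row_hash(row):
            result["changed"].append(key)
        else:
            result["unchanged"].append(key)
    return result

x001 = {"upn": "X001", "academy": "BAA", "academic_year": "2022/2023", "fsm": "FSM"}
x002 = {"upn": "X002", "academy": "BPA", "academic_year": "2022/2023", "fsm": "Not FSM"}
dimension = {("X001", "BAA", "2022/2023"): row_hash(x001)}

x001_updated = {**x001, "fsm": "Not FSM"}
print(classify([x001_updated, x002], dimension))
```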

This code is based on the SCD Type 2 merge example in the Delta Lake documentation: https://docs.delta.io/latest/delta-update.html#slowly-changing-data-scd-type-2-operation-into-delta-tables&language-python

The notebook finishes by dropping all the tables it creates.

## Setup

In [1]:
from datetime import date, datetime, timezone
from delta.tables import DeltaTable
from pyspark.sql.types import StringType, DateType, TimestampType
from pyspark.sql.functions import col, lit, concat_ws, sha2, row_number
from pyspark.sql.window import Window


In [2]:
## You need to create two lakehouses in a Fabric workspace; the tables below are created in them.
## You also need to attach both lakehouses to this notebook in the Explorer pane.
## In this example the two lakehouses are "LH_Demo_Base" and "LH_Demo_Curated" (you could name them Silver and Gold respectively).

base_table = "LH_Demo_Base.demo3_students"
curated_table = "LH_Demo_Curated.demo3_dim_students"
#date_table = "LH_Demo_Curated.con_dim_dates" ## In this example we hard code dates but you could use a date table to lookup academic year details.


In [3]:
## In a production environment you'd get the current date and then look up details of the academic year from a date table along the lines of:

## Get today's date
#today = date.today()

## Print today's date
#print("Reference date:", today)

#dfDates = spark.read.table(date_table)
#currentAcYear = dfDates.where(col("date") == today).select("academic_year").collect()[0][0]
#AcYearStartDate = dfDates.where(col("academic_year") == currentAcYear).agg({"date": "min"}).collect()[0][0]
#AcYearEndDate = dfDates.where(col("academic_year") == currentAcYear).agg({"date": "max"}).collect()[0][0]

#print(currentAcYear)
#print(AcYearStartDate)
#print(AcYearEndDate)

## Instead we'll set static variables to simulate the environment:
today = datetime.strptime("2023-03-15", "%Y-%m-%d").date()  ## A date inside the 2022/2023 academic year.
currentAcYear = "2022/2023"
AcYearStartDate = datetime.strptime("2022-09-01", "%Y-%m-%d").date()
AcYearEndDate = datetime.strptime("2023-08-31", "%Y-%m-%d").date()

print("Date:", today)
print("Academic Year:", currentAcYear)
print("Academic Year Start Date:", AcYearStartDate)
print("Academic Year End Date:", AcYearEndDate)


Date: 2023-03-15
Academic Year: 2022/2023
Academic Year Start Date: 2022-09-01
Academic Year End Date: 2023-08-31
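If no date table is available, the academic year can also be derived directly from a date. This sketch assumes that the academic year runs from 1 September to 31 August, as the static values above do. The `academic_year_for` helper is hypothetical:

```python
from datetime import date

def academic_year_for(d):
    """Return (label, start, end) for the academic year containing date d.

    Assumes academic years run from 1 September to 31 August.
    """
    start_year = d.year if d.month >= 9 else d.year - 1
    label = f"{start_year}/{start_year + 1}"
    return label, date(start_year, 9, 1), date(start_year + 1, 8, 31)

print(academic_year_for(date(2023, 3, 15)))
```

A date in March belongs to the academic year that started the previous September. A date table remains the more flexible option if term dates vary between years.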


## Initial Source Table

In [4]:
## Create empty source table.
(DeltaTable.createIfNotExists(spark)
    .tableName(base_table)
    .addColumn("upn", "STRING")
    .addColumn("academy", "STRING")
    .addColumn("nc_year", "INT")
    .addColumn("forename", "STRING")
    .addColumn("surname", "STRING")
    .addColumn("dob", "DATE")
    .addColumn("gender", "STRING")
    .addColumn("fsm", "STRING")
    .addColumn("pp", "STRING")
    .addColumn("ks2_pa", "STRING")
    .addColumn("joined_date", "DATE")
    .addColumn("leaving_date", "DATE")
    .execute())


<delta.tables.DeltaTable at 0x750d393ece80>

In [5]:
## Add 2 new records into the source table.

spark.sql(f"""
    INSERT INTO {base_table}
        (upn, academy, nc_year, forename, surname, dob, gender, fsm, pp, ks2_pa, joined_date, leaving_date)
    VALUES 
        ('X001', 'BAA', 7, 'Pedro', 'Dobbs', '1980-01-25', 'M', 'FSM', 'PP', 'HPA', '2021-09-01', NULL),
        ('X002', 'BPA', 8, 'John', 'Smith', '1990-04-12', 'M', 'Not FSM', 'Not PP', 'LPA', '2020-09-01', NULL)
    """)


DataFrame[]

## Empty Dimension Table

In [6]:
## Create empty student dimension table.
(DeltaTable.createIfNotExists(spark)
    .tableName(curated_table)
    .addColumn("surrogate_key", "LONG") ## In a later release of Delta this will be created as an identity column which will simplify a lot of code.
    .addColumn("hash_key", "STRING")
    .addColumn("upn", "STRING")
    .addColumn("academy", "STRING")
    .addColumn("academic_year", "STRING")
    .addColumn("nc_year", "INT")
    .addColumn("student_academy_year_key", "STRING")
    .addColumn("student_academy_key", "STRING")
    .addColumn("forename", "STRING")
    .addColumn("surname", "STRING")
    .addColumn("dob", "DATE")
    .addColumn("gender", "STRING")
    .addColumn("fsm", "STRING")
    .addColumn("pp", "STRING")
    .addColumn("ks2_pa", "STRING")
    .addColumn("joined_date", "DATE")
    .addColumn("leaving_date", "DATE")
    .addColumn("academic_year_from_date", "DATE")
    .addColumn("academic_year_to_date", "DATE")
    .addColumn("current", "BOOLEAN")
    .addColumn("updated_at", "TIMESTAMP")
    .addColumn("deleted_from_source_in_year", "BOOLEAN")
    .execute())


<delta.tables.DeltaTable at 0x750d393eebc0>

## Main SCD2 Function
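The function detects changes by hashing each incoming row with `sha2` over a `concat_ws` of all its columns. Two properties of `concat_ws` are worth knowing:

* It skips NULL values.
* With an empty separator, adjacent values can run together.

The plain-Python sketch below shows both effects. It uses `hashlib` in place of Spark's `sha2`, and the `concat_ws_like` helper is hypothetical:

```python
import hashlib

def concat_ws_like(sep, *values):
    """Mimic Spark's concat_ws: join values with sep, skipping None (NULL)."""
    return sep.join(str(v) for v in values if v is not None)

def sha256_hex(s):
    return hashlib.sha256(s.encode("utf-8")).hexdigest()

# With an empty separator, different rows can produce the same string and hash.
a = sha256_hex(concat_ws_like("", "AB", "C"))
b = sha256_hex(concat_ws_like("", "A", "BC"))
print(a == b)  # True

# A separator keeps the column boundaries distinct.
print(sha256_hex(concat_ws_like("|", "AB", "C")) == sha256_hex(concat_ws_like("|", "A", "BC")))  # False

# NULLs are skipped, so a NULL can move between columns without changing the string.
print(concat_ws_like("|", "X", None, "Y") == concat_ws_like("|", None, "X", "Y"))  # True
```

One common mitigation is to coalesce each column to a sentinel value before hashing. Whether that is needed depends on how sparse the source columns are.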

In [7]:
def merge_scd2(updates_df, dimension_table, academic_year, academic_year_start, academic_year_end):
    ## Create a hash key from all the columns in the updates df to identify changes.
    ## The "|" separator stops adjacent values from running together.
    updates_df = (updates_df
                    .withColumn("hash_key", sha2(concat_ws("|", *updates_df.columns), 256))
                    .withColumn("academic_year", lit(academic_year))
                    .withColumn("student_academy_key", concat_ws("-", col("upn"), col("academy")))
                    .withColumn("student_academy_year_key", concat_ws("-", col("upn"), col("academy"), lit(academic_year)))
                )

    ## Retrieve maximum surrogate key in curated delta table
    maxTableKey = dimension_table.toDF().agg({"surrogate_key":"max"}).collect()[0][0]
    if maxTableKey is None:
        maxTableKey = 0

    ## Identify records in the source that are new versions of existing records for that academic year.
    changed_records = (updates_df.alias("updates")
                        .join(dimension_table.toDF().alias("scd2"), "student_academy_year_key")
                        .where("updates.hash_key <> scd2.hash_key")
                        .selectExpr("'record to update' as merge_type", "updates.*", "scd2.surrogate_key")
                    )

    print("changed records")
    display(changed_records)

    
    ## Identify records in the source that have no existing version in the dimension for that academic year.
    new_records = (updates_df.alias("updates")
                    .join(dimension_table.toDF().alias("scd2"), "student_academy_year_key", "leftanti")
                    .selectExpr("'new record' as merge_type", "updates.*")
                )

    ## Assign new surrogate keys to these records, continuing from the current maximum key.
    w = Window.orderBy("upn")
    new_records = new_records.withColumn("surrogate_key", row_number().over(w).cast("long") + maxTableKey)

    print("new records")
    display(new_records)

    ## Stage Updates

    ## We create a staging data frame with both the new versions and the new records.
    staged_updates = (changed_records
                        .unionByName(new_records)  ## Match columns by name rather than position.
                    )

    print("staged_updates")
    display(staged_updates)

    ## Add an updated_at column as a timestamp literal, matching the TIMESTAMP column type.
    now = datetime.now(timezone.utc)
    staged_updates = staged_updates.withColumn("updated_at", lit(now))

    ## Apply SCD Type 2 operation using merge
    (dimension_table.alias("scd2")
        .merge(staged_updates.alias("staged_updates"),"scd2.student_academy_year_key = staged_updates.student_academy_year_key")
            .whenMatchedUpdate(
                set = {    
                    "hash_key": "staged_updates.hash_key",
                    "forename": "staged_updates.forename",
                    "surname": "staged_updates.surname",
                    "nc_year": "staged_updates.nc_year",
                    "dob": "staged_updates.dob",
                    "gender": "staged_updates.gender",
                    "fsm": "staged_updates.fsm",
                    "pp": "staged_updates.pp",
                    "ks2_pa": "staged_updates.ks2_pa",
                    "joined_date": "staged_updates.joined_date",
                    "leaving_date": "staged_updates.leaving_date",
                    "current": "true",
                    "academic_year_from_date": lit(academic_year_start),
                    "academic_year_to_date": lit(academic_year_end),
                    "updated_at": "staged_updates.updated_at",
                    "deleted_from_source_in_year": "false"
                })          
            .whenNotMatchedInsert(
                values = {
                    "surrogate_key": "staged_updates.surrogate_key",
                    "hash_key": "staged_updates.hash_key",
                    "upn": "staged_updates.upn",
                    "academy": "staged_updates.academy",
                    "academic_year": "staged_updates.academic_year",
                    "student_academy_year_key": "staged_updates.student_academy_year_key",
                    "student_academy_key": "staged_updates.student_academy_key",
                    "forename": "staged_updates.forename",
                    "surname": "staged_updates.surname",
                    "nc_year": "staged_updates.nc_year",
                    "dob": "staged_updates.dob",
                    "gender": "staged_updates.gender",
                    "fsm": "staged_updates.fsm",
                    "pp": "staged_updates.pp",
                    "ks2_pa": "staged_updates.ks2_pa",
                    "joined_date": "staged_updates.joined_date",
                    "leaving_date": "staged_updates.leaving_date",
                    "current": "true",
                    "academic_year_from_date": lit(academic_year_start),
                    "academic_year_to_date": lit(academic_year_end),
                    "updated_at": "staged_updates.updated_at",
                    "deleted_from_source_in_year": "false"
                })
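            ).execute()

    ## Rows from earlier academic years are no longer the current version (assumes runs are for the latest academic year).
    dimension_table.update(
        condition = (col("academic_year") != lit(academic_year)) & col("current"),
        set = {"current": lit(False)})
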
    ## Finally, identify rows for this academic year that no longer exist in the source.
    deleted_records = (dimension_table.toDF().alias("scd2")
                            .join(updates_df.alias("updates"), "student_academy_year_key", "leftanti")
                            .where(f"scd2.academic_year = '{academic_year}' AND scd2.deleted_from_source_in_year = false")
                            .selectExpr("scd2.*")
                        )

    print("deleted records")
    display(deleted_records)

    ## Flag deleted records rather than removing them.
    (dimension_table.alias("scd2")
        .merge(deleted_records.alias("deleted"),"scd2.student_academy_year_key = deleted.student_academy_year_key") # Only rows for this academic year that are not already flagged are affected.
            .whenMatchedUpdate(
                set = {
                    "deleted_from_source_in_year": "true",
                    "leaving_date": "current_date()",  ## The record is no longer in the source, so the run date is the best available estimate of the leaving date.
                    "current": "false"
                })
    ).execute()


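New rows get surrogate keys in two steps. First they are numbered with `row_number()` over a window ordered by `upn`. Then the numbers are offset by the current maximum key in the dimension. A plain-Python equivalent of that step, using a hypothetical `assign_surrogate_keys` helper, looks like this:

```python
def assign_surrogate_keys(new_rows, max_existing_key):
    """Number new rows 1..n in upn order, then offset by the largest existing key.

    Mirrors row_number() over a window ordered by upn, plus maxTableKey;
    max_existing_key is None when the dimension is still empty.
    """
    offset = max_existing_key or 0
    ordered = sorted(new_rows, key=lambda r: r["upn"])
    return [{**r, "surrogate_key": offset + i} for i, r in enumerate(ordered, start=1)]

keys = assign_surrogate_keys([{"upn": "X004"}, {"upn": "X003"}], max_existing_key=2)
print([(r["upn"], r["surrogate_key"]) for r in keys])  # [('X003', 3), ('X004', 4)]
```

The Spark window has no `partitionBy`, so all new rows are moved to a single partition to be numbered. That is fine for a dimension of this size. The keys are also only unique if nothing else writes to the table between reading the maximum key and running the merge.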

## Example Usage

In [8]:
## Reference student dimension table
dimension_table = DeltaTable.forName(spark, curated_table)


### Initial Insert

In [9]:
updates_df = spark.read.table(base_table)
display(updates_df)


In [10]:
## Merge updates into the SCD2 table
merge_scd2(updates_df, dimension_table, currentAcYear, AcYearStartDate, AcYearEndDate)

changed records

new records

staged_updates

deleted records

In [11]:
## Display sink table
dimension_table = DeltaTable.forName(spark, curated_table)
display(dimension_table.toDF())


### Modify source by updating some records and inserting new ones.

In [12]:
## Load latest state of source table.
source_table = DeltaTable.forName(spark, base_table)
print ("Original data...")
display(source_table.toDF())

## Change one record.

source_table.update(
  condition = col("upn") == "X001",
  set = { "fsm": lit("Not FSM"),
          "pp": lit("Not PP") })

## Load latest state of source table
source_table = DeltaTable.forName(spark, base_table)
print ("Updated data...")
display(source_table.toDF())

Original data...

Updated data...

In [13]:
## Add 2 new records

spark.sql(f"""
    INSERT INTO {base_table}
        (upn, academy, nc_year, forename, surname, dob, gender, fsm, pp, ks2_pa, joined_date, leaving_date)
    VALUES 
        ('X003', 'BAA', 9, 'Alice', 'Higgs', '1983-09-25', 'F', 'Not FSM', 'PP', 'HPA', '2019-09-01', NULL),
        ('X004', 'BTA', 10, 'Natalie', 'Anderson', '1992-04-10', 'F', 'Not FSM', 'Not PP', 'LPA', '2018-09-01', NULL)
    """)




DataFrame[]

In [14]:
updates_df = spark.read.table(base_table)
display(updates_df)


In [15]:
## Merge updates into the SCD2 table
merge_scd2(updates_df, dimension_table, currentAcYear, AcYearStartDate, AcYearEndDate)

changed records

new records

staged_updates

deleted records

In [16]:
## Display dimension table
dimension_table = DeltaTable.forName(spark, curated_table)
display(dimension_table.toDF())


### Change Academic Year

In [17]:
## We'll now simulate a new academic year, again with static variables:
today = datetime.strptime("2024-03-15", "%Y-%m-%d").date()
currentAcYear = '2023/2024'
AcYearStartDate = datetime.strptime("2023-09-01", "%Y-%m-%d").date()
AcYearEndDate = datetime.strptime("2024-08-31", "%Y-%m-%d").date()

print("Date:", today)
print("Academic Year:", currentAcYear)
print("Academic Year Start Date:", AcYearStartDate)
print("Academic Year End Date:", AcYearEndDate)


Date: 2024-03-15
Academic Year: 2023/2024
Academic Year Start Date: 2023-09-01
Academic Year End Date: 2024-08-31


In [18]:
## Increment NC Year

## Load latest state of source table
source_table = DeltaTable.forName(spark, base_table)
print ("Original data...")
display(source_table.toDF())

source_table.update(
  set = { "nc_year": "nc_year + 1"})

## Load latest state of source table to show updates
source_table = DeltaTable.forName(spark, base_table)
print ("Updated data...")
display(source_table.toDF())

Original data...

Updated data...

In [19]:
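## Re-read the source so the merge sees the incremented nc_year values.
updates_df = spark.read.table(base_table)

## Merge updates into the SCD2 table
merge_scd2(updates_df, dimension_table, currentAcYear, AcYearStartDate, AcYearEndDate)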

changed records

new records

staged_updates

deleted records

In [20]:
## Display sink table
dimension_table = DeltaTable.forName(spark, curated_table)
display(dimension_table.toDF())


### Delete a record in source

In [21]:
## Delete a record.
spark.sql(f"""
    DELETE FROM {base_table}
    WHERE
        UPN = 'X003' 
    """)


DataFrame[num_affected_rows: bigint]
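Rows that disappear from the source are not removed from the dimension; `merge_scd2` flags them instead. The plain-Python sketch below shows which rows that rule touches: only rows for the academic year being processed, and only those not already flagged. The `flag_deleted` helper and the row layout are hypothetical:

```python
from datetime import date

def flag_deleted(dimension_rows, source_keys, academic_year, leaving_date):
    """Soft-delete rows for academic_year whose key is missing from the source.

    Rows from other academic years, and rows already flagged, are left unchanged.
    """
    for row in dimension_rows:
        if (row["academic_year"] == academic_year
                and not row["deleted_from_source_in_year"]
                and row["student_academy_year_key"] not in source_keys):
            row["deleted_from_source_in_year"] = True
            row["current"] = False
            row["leaving_date"] = leaving_date
    return dimension_rows

rows = [
    {"student_academy_year_key": "X003-BAA-2022/2023", "academic_year": "2022/2023",
     "deleted_from_source_in_year": False, "current": True, "leaving_date": None},
    {"student_academy_year_key": "X003-BAA-2023/2024", "academic_year": "2023/2024",
     "deleted_from_source_in_year": False, "current": True, "leaving_date": None},
]
flag_deleted(rows, source_keys={"X001-BAA-2023/2024"}, academic_year="2023/2024",
             leaving_date=date(2024, 3, 15))
print([r["deleted_from_source_in_year"] for r in rows])  # [False, True]
```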

In [22]:
## Get source table.
updates_df = spark.read.table(base_table)
display(updates_df)


In [23]:
## Merge updates into the SCD2 table
merge_scd2(updates_df, dimension_table, currentAcYear, AcYearStartDate, AcYearEndDate)

changed records

new records

staged_updates

deleted records

In [24]:
## Display sink table
dimension_table = DeltaTable.forName(spark, curated_table)
display(dimension_table.toDF())


### Clean Up

In [25]:
spark.sql(f"DROP TABLE IF EXISTS {base_table}")
spark.sql(f"DROP TABLE IF EXISTS {curated_table}")


DataFrame[]