In [1]:
# Import functions
from pyspark import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *
import datetime

StatementMeta(, 9209489c-df0b-4f9c-92c5-14075561863b, 3, Finished, Available, Finished)

In [4]:
# SCD2 function
def fn_SCD2(schemaName, tableName, primaryKey):
    """Maintain a Slowly Changing Dimension (type 2) history table.

    Reads the latest snapshot from ``{schemaName}.clean_{tableName}``,
    compares it with the current rows of ``{schemaName}.hist_{tableName}``
    and overwrites the history table with the merged result:

    * unchanged rows are kept as they are;
    * new keys are inserted with ``current=True``, ``effectiveDate=today``
      and ``endDate=9999-12-31``;
    * keys missing from the snapshot are closed (``current=False``,
      ``endDate=today``);
    * changed rows are closed (``current=False``, ``endDate=today - 1 day``)
      and the new version is inserted as the current row;
    * rows that were already closed are carried over untouched.

    If the history table does not exist yet, it is created from the snapshot.

    Relies on the notebook-provided ``spark`` session and on the module-level
    imports ``F`` (``pyspark.sql.functions``) and ``datetime``.

    Args:
        schemaName: Schema/database holding both the ``clean_`` and ``hist_``
            tables.
        tableName: Table name without the ``clean_`` / ``hist_`` prefix.
        primaryKey: Name of the key column. When empty or ``None`` a SHA-256
            hash over all columns is generated and used as the key.

    Returns:
        None. The result is written to ``{schemaName}.hist_{tableName}``.
    """
    # Local import keeps this function self-contained within the notebook.
    from pyspark.sql.utils import AnalysisException

    source_table = f"{schemaName}.clean_{tableName}"
    history_table = f"{schemaName}.hist_{tableName}"

    # Fetch data from Bronze or intermediate Silver layer; loading_date is a
    # load artefact and must not take part in change detection.
    dataChanged = spark.read.table(source_table).drop('loading_date')

    # Generate hash key if primary key is missing.
    if not primaryKey:
        # NOTE: concat_ws skips NULL values, so rows that differ only in
        # *which* column is NULL hash to the same value. The expression is
        # kept unchanged so keys already stored in existing history tables
        # still match.
        dataChanged = dataChanged.withColumn(
            "hash", F.sha2(F.concat_ws("||", *dataChanged.columns), 256))
        primaryKey = 'hash'

    # Business columns (including the generated hash, if any).
    columnNames = dataChanged.schema.names
    # Business columns plus the SCD2 tracking columns.
    column_names = columnNames + ['current', 'effectiveDate', 'endDate']

    # Named load_date so it does not shadow pyspark.sql.functions.current_date.
    load_date = datetime.date.today()
    open_end_date = datetime.date(9999, 12, 31)

    # Snapshot rows dressed up as "current" history rows.
    incoming = dataChanged.withColumn('current', F.lit(True)) \
        .withColumn('effectiveDate', F.lit(load_date)) \
        .withColumn('endDate', F.lit(open_end_date))

    try:
        # Read original data - this is the SCD type 2 table holding all data.
        dataOriginal = spark.read.table(history_table)
    except AnalysisException:
        # First load: no history table yet, so the snapshot is the history.
        # Only "cannot resolve table" errors end up here; any other failure
        # propagates instead of silently overwriting existing history.
        incoming.write.format("delta").mode("overwrite") \
            .saveAsTable(history_table)
        return

    # Only current rows take part in the comparison. Closed rows are carried
    # over untouched; otherwise a key that was deleted earlier and shows up
    # again would match its closed rows and never be re-inserted.
    is_current = F.coalesce(F.col('current'), F.lit(False))
    closed_rows = dataOriginal.filter(~is_current).select(column_names)
    open_rows = dataOriginal.filter(is_current)

    # Prefix every snapshot column with src_ so both sides can be told apart
    # after the join.
    src_rows = incoming.select(
        [F.col(c).alias("src_" + c) for c in column_names])
    src_primaryKey = 'src_' + primaryKey

    # FULL merge of current history rows with the snapshot on the key column.
    df_merge = open_rows.join(
        src_rows,
        open_rows[primaryKey] == src_rows[src_primaryKey],
        how='fullouter')

    # Null-safe, column-by-column comparison. Comparing concat_ws strings
    # would miss changes to/from NULL and values containing the delimiter.
    unchanged = F.lit(True)
    for name in columnNames:
        unchanged = unchanged & \
            F.col(name).eqNullSafe(F.col("src_" + name))

    # Derive the action. 'current' / 'src_current' are never NULL on real
    # rows, so a NULL there means that side of the full join is absent
    # (this stays correct even when the key itself is NULL).
    df_merge = df_merge.withColumn(
        'action',
        F.when(F.col('current').isNull(), 'INSERT')
        .when(F.col('src_current').isNull(), 'DELETE')
        .when(unchanged, 'NOACTION')
        .otherwise('UPDATE'))

    def _rows_for(action):
        # Joined rows classified with the given action code.
        return df_merge.filter(F.col('action') == action)

    def _source_side(df):
        # Project the snapshot side of the join back to history column names.
        return df.select([F.col("src_" + c).alias(c) for c in column_names])

    # For records that need no action.
    kept_rows = _rows_for('NOACTION').select(column_names)

    # For records that need insert only.
    inserted_rows = _source_side(_rows_for('INSERT'))

    # For records that need to be deleted (closed as of today).
    deleted_rows = _rows_for('DELETE').select(column_names) \
        .withColumn('current', F.lit(False)) \
        .withColumn('endDate', F.lit(load_date))

    # For records that need to be expired and then inserted: the new version...
    updated_new_rows = _source_side(_rows_for('UPDATE'))

    # ...and the previous version, closed the day before the new one starts.
    updated_old_rows = _rows_for('UPDATE') \
        .withColumn('endDate', F.date_sub(F.col('src_effectiveDate'), 1)) \
        .withColumn('current', F.lit(False)) \
        .select(column_names)

    # Union all records together (by name, all parts share column_names).
    df_merge_final = closed_rows \
        .unionByName(kept_rows) \
        .unionByName(inserted_rows) \
        .unionByName(deleted_rows) \
        .unionByName(updated_new_rows) \
        .unionByName(updated_old_rows)

    # At last, overwrite existing data using this new data frame.
    df_merge_final.write.format("delta").mode("overwrite") \
        .saveAsTable(history_table)

StatementMeta(, 9209489c-df0b-4f9c-92c5-14075561863b, 6, Finished, Available, Finished)

In [5]:
# (table, key column) pairs to process, in order. An empty key makes
# fn_SCD2 fall back to a generated hash key.
scd2_targets = [
    ("address", "AddressID"),
    ("customer", "CustomerID"),
    ("customeraddress", ""),
    ("product", "ProductID"),
    ("productcategory", "ProductCategoryID"),
    ("productdescription", "ProductDescriptionID"),
    ("productmodel", "ProductModelID"),
    ("productmodelproductdescription", ""),
    ("salesorderdetail", ""),
    ("salesorderheader", "SalesOrderID"),
]

for table_name, key_column in scd2_targets:
    fn_SCD2("adventureworks", table_name, key_column)

StatementMeta(, 9209489c-df0b-4f9c-92c5-14075561863b, 7, Finished, Available, Finished)