**Drop source and target tables**

In [None]:
%%sql

drop table if exists sparkdb.TESTsource;
drop table if exists sparkdb.TESTtarget

****

**Declare source and target tables**

In [2]:
%%sql

create table if not exists sparkdb.TESTtarget (recid INT, name STRING, segment STRING, SCDcurrent STRING, SCDeffectiveDate STRING, SCDendDate STRING) using delta;
create table if not exists sparkdb.TESTsource (recid INT, name STRING, segment STRING) using delta

StatementMeta(, 115, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

**Fill in data into source-table**

In [3]:
%%sql

insert into sparkdb.TESTsource values(1,"FaetterBR","legetøjsbutik"), (2,"ToysRUs","legetøjsbutik")

StatementMeta(DAXSparkPoolv3, 115, 4, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

**Check source-table**

In [4]:
spark.sql("SELECT * FROM `sparkdb`.`testsource`").show(10)

StatementMeta(DAXSparkPoolv3, 115, 6, Finished, Available)

+-----+---------+-------------+
|recid|     name|      segment|
+-----+---------+-------------+
|    1|FaetterBR|legetøjsbutik|
|    2|  ToysRUs|legetøjsbutik|
+-----+---------+-------------+

**Check target-table**

In [5]:
%%pyspark
spark.sql("SELECT * FROM `sparkdb`.`testtarget`").show(10)

StatementMeta(DAXSparkPoolv3, 115, 7, Finished, Available)

+-----+----+-------+----------+----------------+----------+
|recid|name|segment|SCDcurrent|SCDeffectiveDate|SCDendDate|
+-----+----+-------+----------+----------------+----------+
+-----+----+-------+----------+----------------+----------+

**Create SCD-function**

In [6]:
## Best way to parameterize target-table in below function is to declare this as an outer variable
targetTableName = "sparkdb.testtarget"

StatementMeta(DAXSparkPoolv3, 115, 8, Finished, Available)

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

def SCDdeltaOnStream(microdf,batchid):

  # Declare target-table and source-object. Source-object is added SCD control-columns
  targetTable = DeltaTable.forName(spark, targetTableName)
  microdf = (microdf
      .withColumn("SCDcurrent",lit("true"))
      .withColumn("SCDeffectiveDate",current_timestamp())
      .withColumn("SCDendDate",lit("NULL"))
      )
  
  # key-column for matching is declared along with the SCD-columns triggering new records when updated.
  matchColumn = "recid"
  SCDcolumns = ['name','segment']

  # Identify columns not included in the ListOfSCDcolumns but still to be updated by matching records ensuring new and existing data is aligned
  NoneSCDcolumns = [col for col in DeltaTable.forName(spark,targetTableName).toDF().schema.names if col not in SCDcolumns + list(matchColumn) + ['SCDcurrent','SCDeffectiveDate','SCDendDate']]
 
  # Rows with new values in SCD-columns for existing records in the target is set aside
  newLabelsToInsert = (microdf
    .alias("updates")
    .join(targetTable.toDF().alias("target"), f"{matchColumn}")
    .where("target.SCDcurrent = true AND ({input})".format(input=" OR ".join(['updates.'+i+ ' <> ' + 'target.' + i for i in SCDcolumns])))
  )

  # Stage the update by unioning two sets of rows
  # 1. The rows that has just been set aside in above step (these are to be inserted as "current" versions of existing records)
  # 2. Rows that will either update the current attribute-labels of existing records or insert the new labels of new records
  stagedUpdates = (
    newLabelsToInsert
    .selectExpr("NULL as mergeKey", "updates.*")   # Rows for 1
    .union(microdf.selectExpr(f"{matchColumn} as mergeKey", "*"))  # Rows for 2.
  )

  # Apply SCD Type 2 operation using merge.
  # None-matching records can be grouped in following two categories: 1) rows reflecting new SCD-values for existing records, and 2) entirely new records.
  #M atching records can be grouped in two categories: 1) existing records with old SCD-values needing to be marked as obsolete and provided and SCDendDate, and 2) existing records where there might/might not be updates to none-SCD-columns 
  targetTable.alias("target").merge(
    stagedUpdates.alias("staged_updates"),f"target.{matchColumn} = mergeKey"
  ).whenMatchedUpdate(
    condition = "target.SCDcurrent = true AND ({input})".format(input=" OR ".join(['target.'+i+ ' <> ' + 'staged_updates.' + i for i in SCDcolumns])),
    set = {                                      # Set current to false and endDate to source's effective date.
      "SCDcurrent": "false",
      "SCDendDate": current_timestamp()
    }
  ).whenMatchedUpdate(
    condition = "target.SCDcurrent = true AND ({input})".format(input=" AND ".join(['target.'+i+ ' = ' + 'staged_updates.' + i for i in SCDcolumns])),
    set = {'target.'+col : 'staged_updates.'+col for col in NoneSCDcolumns}
  ).whenNotMatchedInsertAll().execute()


StatementMeta(DAXSparkPoolv3, 115, 9, Finished, Available)

**Set up streamRead and Write between source and target**

In [8]:
df = (spark.readStream
    .format("delta")
    .table("sparkdb.testsource")
)

StatementMeta(DAXSparkPoolv3, 115, 10, Finished, Available)

In [9]:
streamQuery = (df.writeStream
    .format("delta")
    ##.outputMode("append")
    .foreachBatch(SCDdeltaOnStream)
    .option("checkpointLocation","abfss://<your own container@daxdatalakestorage.dfs.core.windows.net/<folderstorage>")
    .option("mergeSchema",True)
    ##.trigger(once=True)
    .start()
    )

StatementMeta(DAXSparkPoolv3, 115, 11, Finished, Available)

**Check content of target-cell to see it has been updated with source-data (if no result remember to delete checkpoint-dir)**

In [24]:
%%pyspark
spark.sql("SELECT * FROM `sparkdb`.`testtarget`").show(10)

StatementMeta(DAXSparkPoolv3, 115, 26, Finished, Available)

+-----+---------+-------------+----------+--------------------+----------+
|recid|     name|      segment|SCDcurrent|    SCDeffectiveDate|SCDendDate|
+-----+---------+-------------+----------+--------------------+----------+
|    1|FaetterBR|legetøjsbutik|      true|2022-01-17 17:12:...|      NULL|
|    2|  ToysRUs|legetøjsbutik|      true|2022-01-17 17:12:...|      NULL|
+-----+---------+-------------+----------+--------------------+----------+

In [25]:
streamQuery.status

StatementMeta(DAXSparkPoolv3, 115, 27, Finished, Available)

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

**Add new row to source-table**

In [23]:
%%sql
insert into sparkdb.TESTsource values(3,"HammerFedt","grovsmed")

StatementMeta(DAXSparkPoolv3, 115, 25, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [26]:
%%pyspark
spark.sql("SELECT * FROM `sparkdb`.`testsource`").show(10)

StatementMeta(DAXSparkPoolv3, 115, 28, Finished, Available)

+-----+----------+-------------+
|recid|      name|      segment|
+-----+----------+-------------+
|    1| FaetterBR|legetøjsbutik|
|    2|   ToysRUs|legetøjsbutik|
|    3|HammerFedt|     grovsmed|
+-----+----------+-------------+

**Check target-table to see new row**

In [27]:
%%pyspark
spark.sql("SELECT * FROM `sparkdb`.`testtarget`").show(10)

StatementMeta(DAXSparkPoolv3, 115, 29, Finished, Available)

+-----+----------+-------------+----------+--------------------+----------+
|recid|      name|      segment|SCDcurrent|    SCDeffectiveDate|SCDendDate|
+-----+----------+-------------+----------+--------------------+----------+
|    1| FaetterBR|legetøjsbutik|      true|2022-01-17 17:12:...|      NULL|
|    2|   ToysRUs|legetøjsbutik|      true|2022-01-17 17:12:...|      NULL|
|    3|HammerFedt|     grovsmed|      true|2022-01-17 17:12:...|      NULL|
+-----+----------+-------------+----------+--------------------+----------+

In [28]:
streamQuery.status

StatementMeta(DAXSparkPoolv3, 115, 30, Finished, Available)

{'message': 'Getting offsets from DeltaSource[abfss://cosmo@daxdatalakestorage.dfs.core.windows.net/synapse/workspaces/daxprojectcosmo/warehouse/sparkdb.db/testsource]',
 'isDataAvailable': False,
 'isTriggerActive': True}

**Change row already in sourcetable**

In [29]:
DeltaTable.forName(spark, 'sparkdb.testsource').update(
  condition = col('name') == 'FaetterBR',
  set = { 'segment': lit('våbenproducent') }
)

StatementMeta(DAXSparkPoolv3, 115, 31, Finished, Available)

In [30]:
%%pyspark
spark.sql("SELECT * FROM `sparkdb`.`testsource`").show(10)

StatementMeta(DAXSparkPoolv3, 115, 32, Finished, Available)

+-----+----------+--------------+
|recid|      name|       segment|
+-----+----------+--------------+
|    1| FaetterBR|våbenproducent|
|    2|   ToysRUs| legetøjsbutik|
|    3|HammerFedt|      grovsmed|
+-----+----------+--------------+

**Check target-table to see updated row**

In [31]:
%%pyspark
spark.sql("SELECT * FROM `sparkdb`.`testtarget`").show(10)

StatementMeta(DAXSparkPoolv3, 115, 33, Finished, Available)

+-----+----------+-------------+----------+--------------------+----------+
|recid|      name|      segment|SCDcurrent|    SCDeffectiveDate|SCDendDate|
+-----+----------+-------------+----------+--------------------+----------+
|    1| FaetterBR|legetøjsbutik|      true|2022-01-17 17:12:...|      NULL|
|    2|   ToysRUs|legetøjsbutik|      true|2022-01-17 17:12:...|      NULL|
|    3|HammerFedt|     grovsmed|      true|2022-01-17 17:12:...|      NULL|
+-----+----------+-------------+----------+--------------------+----------+

In [32]:
streamQuery.status

StatementMeta(DAXSparkPoolv3, 115, 34, Finished, Available)

{'message': "Terminated with exception: Detected a data update (for example part-00000-859ad800-9c19-459b-bd76-ec6688000785-c000.snappy.parquet) in the source table at version 3. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.",
 'isDataAvailable': False,
 'isTriggerActive': False}