# Config stuff

In [1]:

from pyspark.sql import SparkSession
import ConnectionConfig as cc
from delta import DeltaTable
from pyspark.sql.functions import *
from delta import configure_spark_with_delta_pip
from datetime import datetime

In [2]:

builder = SparkSession.builder \
    .appName("DimDate") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.driver.extraClassPath", ":".join(cc.jars)) \
    .master("local[*]")
builder = configure_spark_with_delta_pip(builder)
spark = builder.getOrCreate()
builder.getOrCreate()

## Incremental load

The SCD2 implementation requires a more complex tranformation to correctly handle changes in the source files. For detailed information consult the comments in the code.

### Read source table

##### 1. READ EXISTING DIMENSION
Read the existing deltaTable (as a deltaTable object, not a Dataframe). Make the table available as a View.
Get the maximum surrogate key value and the current date to use in futher processing

In [3]:

dimSalesRepDelta = DeltaTable.forPath(spark,".\spark-warehouse\dimsalesrep")
dimSalesRepDelta.toDF().createOrReplaceTempView("dimSalesRep")

run_timestamp =datetime.now().strftime("%Y-%m-%d %H:%M:%S") #The job runtime is stored in a variable
maxSK = spark.sql("select max(salesRepSK) salesRepSK from dimSalesRep").first()[0] #The current maximum "surrogate key" (SK) has to be known because we have to start counting from MAX + 1 for new SK.




##### 2 READ SOURCE TABLE
Creating dataframe with source table (from operational system). Transformed to the dimension format.
The surrogate key is the maxSK + the id counter. Otherwise SK will not be unique.

In [4]:
cc.set_connection("mydb")

sourceSalesReps_df = spark.read \
    .format("jdbc") \
    .option("url", cc.create_jdbc()) \
    .option("dbtable", "dbo.salesrep") \
    .option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .option("partitionColumn", "salesRepID") \
    .option("numPartitions", 4) \
    .option("lowerBound", 0) \
    .option("upperBound", 20) \
    .load() \
    .withColumn("salesRepSK", maxSK + monotonically_increasing_id()) \
    .withColumn("scd_start", lit(run_timestamp).cast("timestamp")) \
    .withColumn("scd_end", lit("2100-12-12").cast("timestamp")) \
    .withColumn("md5", md5(concat( col('name'), col('office')))) \
    .withColumn("current", lit(True))


##### 3 DETECT CHANGES
Dataframe to identify SCD2 changed rows.
First a join between SOURCE and DIMENSION is performed
   The md5 hash is used to identify differences.
   The list contains:
       - updated source-rows (the join finds a row)  and
       - new source-rows (the leftouter join does not find a row)

In [5]:
detectedChanges = sourceSalesReps_df.alias("source") \
    .join(dimSalesRepDelta.toDF().alias("dwh"), "salesRepID", "leftouter") \
    .filter("(dwh.current = true AND source.md5 <> dwh.md5 ) OR dwh.salesRepId is null")
    # Filter filters updated records (md5 check) OR new records (null check)

detectedChanges.show()

+----------+---------+------+-----------+-------------------+-------------------+--------------------+-------+---------+--------+----------+-------------------+-------------------+--------------------+-------+
|salesRepID|     name|office| salesRepSK|          scd_start|            scd_end|                 md5|current|     name|  office|salesRepSK|          scd_start|            scd_end|                 md5|current|
+----------+---------+------+-----------+-------------------+-------------------+--------------------+-------+---------+--------+----------+-------------------+-------------------+--------------------+-------+
|         4|R. Geller| Paris|17179869190|2022-09-26 16:02:55|2100-12-12 00:00:00|6f34bd2f09f010ec1...|   true|R. Geller|New York|         3|1990-01-01 00:00:00|2100-12-12 00:00:00|6212c0ce01f144d66...|   true|
|     10015|    Frits|London|42949672963|2022-09-26 16:02:55|2100-12-12 00:00:00|a9986535dd7259e4c...|   true|     null|    null|      null|               null|


##### 4 CREATE RECORDS TO INSERT AND UPDATE
Every changed source-row and new sourced-row requires the insertion of a new record in the dimension.
Changed source-rows require an update of the scd-fields.

Rows without mergeKey will be inserted in the dimension table. Rows with mergekey will be updated in the dimension

In [6]:

upserts = detectedChanges \
    .selectExpr("NULL as mergeKey", "source.*" )\
    .union(detectedChanges.filter("dwh.current is not null").selectExpr("salesRepId as mergeKey","salesRepId as salesRepId", "dwh.*"))
upserts.show()

+--------+----------+---------+--------+-----------+-------------------+-------------------+--------------------+-------+
|mergeKey|salesRepID|     name|  office| salesRepSK|          scd_start|            scd_end|                 md5|current|
+--------+----------+---------+--------+-----------+-------------------+-------------------+--------------------+-------+
|    null|         4|R. Geller|   Paris|17179869190|2022-09-26 16:02:55|2100-12-12 00:00:00|6f34bd2f09f010ec1...|   true|
|    null|     10015|    Frits|  London|42949672963|2022-09-26 16:02:55|2100-12-12 00:00:00|a9986535dd7259e4c...|   true|
|       4|         4|R. Geller|New York|          3|1990-01-01 00:00:00|2100-12-12 00:00:00|6212c0ce01f144d66...|   true|
+--------+----------+---------+--------+-----------+-------------------+-------------------+--------------------+-------+




##### 5 PERFORM MERGE DIMSALESREP AND UPSERTS
merge looks for a matching dwh.salesRepID for mergeKey
   - when a match is found (the dimension table contains a row where its salesRepId corresponds with one of the mergekeys)  -> perform update of row to close the period and set current to "false"
   - when no match is found (there is no salesRepID in the dimension because the mergeKey is null) -> perform an insert with the data from the updserts table (from the source). The scd-start is filled with the run_timestamp)

In [7]:

dimSalesRepDelta.alias("dwh") \
   .merge(upserts.alias("upserts"),"dwh.salesRepID = mergeKey") \
   .whenMatchedUpdate(condition= "dwh.current = true", set= {"current": "false", "scd_end":   lit(run_timestamp).cast("timestamp")}) \
   .whenNotMatchedInsert(values =
     {
     "salesrepSK": "upserts.salesRepSK",
     "salesRepId" : "upserts.salesRepId",
     "name" : "upserts.name",
     "office" : "upserts.office",
     "current" : "true",
     "scd_end" : lit("2100-12-12").cast("timestamp"),
     "scd_start" : lit(run_timestamp).cast("timestamp"),
     "md5" : "upserts.md5"
     }
   ) \
   .execute()

In [8]:

dimSalesRepDelta.toDF().describe()
dimSalesRepDelta.toDF().sort("salesRepID", "scd_start").show(100)
# dimSalesRepDelta.upgradeTableProtocol

DataFrame[summary: string, salesRepID: string, name: string, office: string, salesRepSK: string, md5: string]

+----------+-------------+-------------+-----------+-------------------+-------------------+--------------------+-------+
|salesRepID|         name|       office| salesRepSK|          scd_start|            scd_end|                 md5|current|
+----------+-------------+-------------+-----------+-------------------+-------------------+--------------------+-------+
|         1|      Z. Jane|      Chicago|          0|1990-01-01 00:00:00|2100-12-12 00:00:00|cbf61f481bec12d90...|   true|
|         2|   P. Chapman|       Berlin|          1|1990-01-01 00:00:00|2100-12-12 00:00:00|14b094c31bf9e4149...|   true|
|         3|     T. Crane|     New York|          2|1990-01-01 00:00:00|2100-12-12 00:00:00|6c062f95defda9dc3...|   true|
|         4|    R. Geller|     New York|          3|1990-01-01 00:00:00|2022-09-26 16:02:55|6212c0ce01f144d66...|  false|
|         4|    R. Geller|        Paris|17179869190|2022-09-26 16:02:55|2100-12-12 00:00:00|6f34bd2f09f010ec1...|   true|
|         5|      J.Mosb

In [9]:
spark.stop()