## Processing a Slowly Changing Dimension Type 2 Using PySpark

In [1]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session for this notebook, registered under the
# application name "scd2_demo".
spark = (
    SparkSession.builder
    .appName("scd2_demo")
    .getOrCreate()
)
# Root folder beneath which every dataset in this notebook is written as parquet.
v_s3_path = "/mnt/data/data/scd2"

## Create SCD2 dataset

In [2]:
# ------------------------------------------------------------------ #
# Seed the existing SCD2 customer dimension.
# The query below hand-builds four typed rows: customers 289374 and 31568
# each have one current row, while customer 862447 has an expired row
# (key 2) followed by its current row (key 3).
# ------------------------------------------------------------------ #
hd_current_scd2 = """
 SELECT   BIGINT(1) AS customer_dim_key,
          STRING('John') AS first_name,
          STRING('Smith') AS last_name,
          STRING('G') AS middle_initial,
          STRING('123 Main Street') AS address,
          STRING('Springville') AS city,
          STRING('VT') AS state,
          STRING('01234-5678') AS zip_code,
          BIGINT(289374) AS customer_number,
          DATE('2014-01-01') AS eff_start_date,
          DATE('9999-12-31') AS eff_end_date,
          BOOLEAN(1) AS is_current
 UNION
 SELECT   BIGINT(2) AS customer_dim_key,
          STRING('Susan') AS first_name,
          STRING('Jones') AS last_name,
          STRING('L') AS middle_initial,
          STRING('987 Central Avenue') AS address,
          STRING('Central City') AS city,
          STRING('MO') AS state,
          STRING('49257-2657') AS zip_code,
          BIGINT(862447) AS customer_number,
          DATE('2015-03-23') AS eff_start_date,
          DATE('2018-11-17') AS eff_end_date,
          BOOLEAN(0) AS is_current
 UNION
 SELECT   BIGINT(3) AS customer_dim_key,
          STRING('Susan') AS first_name,
          STRING('Harris') AS last_name,
          STRING('L') AS middle_initial,
          STRING('987 Central Avenue') AS address,
          STRING('Central City') AS city,
          STRING('MO') AS state,
          STRING('49257-2657') AS zip_code,
          BIGINT(862447) AS customer_number,
          DATE('2018-11-18') AS eff_start_date,
          DATE('9999-12-31') AS eff_end_date,
          BOOLEAN(1) AS is_current
 UNION
 SELECT   BIGINT(4) AS customer_dim_key,
          STRING('William') AS first_name,
          STRING('Chase') AS last_name,
          STRING('X') AS middle_initial,
          STRING('57895 Sharp Way') AS address,
          STRING('Oldtown') AS city,
          STRING('CA') AS state,
          STRING('98554-1285') AS zip_code,
          BIGINT(31568) AS customer_number,
          DATE('2018-12-07') AS eff_start_date,
          DATE('9999-12-31') AS eff_end_date,
          BOOLEAN(1) AS is_current
"""

# Run the seed query, persist the result as a single parquet part file, and
# expose it to later SQL cells under the view name "current_scd2".
df_current_scd2 = spark.sql(hd_current_scd2)
(
    df_current_scd2
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(v_s3_path + "/current_scd2/")
)
df_current_scd2.createOrReplaceTempView("current_scd2")

# Read the persisted copy back, sorted by surrogate key, and display it
# without truncating column values.
df_current_scd2 = (
    spark.read
    .parquet(v_s3_path + "/current_scd2/*")
    .orderBy("customer_dim_key")
)
df_current_scd2.show(n=10, truncate=False)

+----------------+----------+---------+--------------+------------------+------------+-----+----------+---------------+--------------+------------+----------+
|customer_dim_key|first_name|last_name|middle_initial|address           |city        |state|zip_code  |customer_number|eff_start_date|eff_end_date|is_current|
+----------------+----------+---------+--------------+------------------+------------+-----+----------+---------------+--------------+------------+----------+
|1               |John      |Smith    |G             |123 Main Street   |Springville |VT   |01234-5678|289374         |2014-01-01    |9999-12-31  |true      |
|2               |Susan     |Jones    |L             |987 Central Avenue|Central City|MO   |49257-2657|862447         |2015-03-23    |2018-11-17  |false     |
|3               |Susan     |Harris   |L             |987 Central Avenue|Central City|MO   |49257-2657|862447         |2018-11-18    |9999-12-31  |true      |
|4               |William   |Chase    |X      

## Create customer dataset from source system 

- The following script generates the source data, which I will use to modify our SCD2:

In [5]:
# ------------------------------------------------------------------ #
# Build the source-system customer extract.
# Four hand-written rows keyed by customer_number; there are no SCD2
# metadata columns (surrogate key, effective dates, is_current) here.
# ------------------------------------------------------------------ #
hd_customer_data = """
 SELECT   BIGINT(289374) AS customer_number,
          STRING('John') AS first_name,
          STRING('Smith') AS last_name,
          STRING('G') AS middle_initial,
          STRING('456 Derry Court') AS address,
          STRING('Springville') AS city,
          STRING('VT') AS state,
          STRING('01234-5678') AS zip_code
 UNION
 SELECT   BIGINT(932574) AS customer_number,
          STRING('Lisa') AS first_name,
          STRING('Cohen') AS last_name,
          STRING('S') AS middle_initial,
          STRING('69846 Mason Road') AS address,
          STRING('Atlanta') AS city,
          STRING('GA') AS state,
          STRING('26584-3591') AS zip_code
 UNION
 SELECT   BIGINT(862447) AS customer_number,
          STRING('Susan') AS first_name,
          STRING('Harris') AS last_name,
          STRING('L') AS middle_initial,
          STRING('987 Central Avenue') AS address,
          STRING('Central City') AS city,
          STRING('MO') AS state,
          STRING('49257-2657') AS zip_code
 UNION
 SELECT   BIGINT(31568) AS customer_number,
          STRING('William') AS first_name,
          STRING('Chase') AS last_name,
          STRING('X') AS middle_initial,
          STRING('57895 Sharp Way') AS address,
          STRING('Oldtown') AS city,
          STRING('CA') AS state,
          STRING('98554-1285') AS zip_code
"""

# Run the query, persist the result as a single parquet part file, and
# expose it to later SQL cells under the view name "customer_data".
df_customer_data = spark.sql(hd_customer_data)
(
    df_customer_data
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(v_s3_path + "/customer_data/")
)
df_customer_data.createOrReplaceTempView("customer_data")

# Read the persisted copy back, sorted by the business key, and display it
# without truncating column values.
df_customer_data = (
    spark.read
    .parquet(v_s3_path + "/customer_data/*")
    .orderBy("customer_number")
)
df_customer_data.show(n=10, truncate=False)

+---------------+----------+---------+--------------+------------------+------------+-----+----------+
|customer_number|first_name|last_name|middle_initial|address           |city        |state|zip_code  |
+---------------+----------+---------+--------------+------------------+------------+-----+----------+
|31568          |William   |Chase    |X             |57895 Sharp Way   |Oldtown     |CA   |98554-1285|
|289374         |John      |Smith    |G             |456 Derry Court   |Springville |VT   |01234-5678|
|862447         |Susan     |Harris   |L             |987 Central Avenue|Central City|MO   |49257-2657|
|932574         |Lisa      |Cohen    |S             |69846 Mason Road  |Atlanta     |GA   |26584-3591|
+---------------+----------+---------+--------------+------------------+------------+-----+----------+



In [6]:
# Re-display the current SCD2 rows (untruncated) so they can be compared
# by eye with the source extract shown above.
df_current_scd2.show(n=10, truncate=False)

+----------------+----------+---------+--------------+------------------+------------+-----+----------+---------------+--------------+------------+----------+
|customer_dim_key|first_name|last_name|middle_initial|address           |city        |state|zip_code  |customer_number|eff_start_date|eff_end_date|is_current|
+----------------+----------+---------+--------------+------------------+------------+-----+----------+---------------+--------------+------------+----------+
|1               |John      |Smith    |G             |123 Main Street   |Springville |VT   |01234-5678|289374         |2014-01-01    |9999-12-31  |true      |
|2               |Susan     |Jones    |L             |987 Central Avenue|Central City|MO   |49257-2657|862447         |2015-03-23    |2018-11-17  |false     |
|3               |Susan     |Harris   |L             |987 Central Avenue|Central City|MO   |49257-2657|862447         |2018-11-18    |9999-12-31  |true      |
|4               |William   |Chase    |X      

## Manually find changes

- Remember that the data from the source system feeds into our SCD2, so I need to compare the two datasets to determine whether there are any differences.

## Create new current records for existing customers

- In order to logically capture this address change, I need to compare the current SCD2 and the source data (as I did manually above) and flag changes
- I also need to be mindful of our row metadata fields to ensure that I am expiring and starting records using the appropriate dates.

In [8]:
# ############## create new current recs dataset ############## #
# For every source customer that already has a current SCD2 row, emit a
# replacement "current" row whenever at least one tracked attribute differs.
#   * t.customer_dim_key is the key of the row being superseded; it is carried
#     along so later cells can identify which existing row to expire.
#   * NVL(x, '') on both sides means NULL and the empty string compare as equal.
#   * eff_start_date is today's date in India Standard Time. The zone is spelled
#     as the region ID 'Asia/Kolkata' rather than the short name 'IST': Spark's
#     documentation discourages short zone names because they can be ambiguous.
hd_new_curr_recs = """
 SELECT   t.customer_dim_key,
          s.customer_number,
          s.first_name,
          s.last_name,
          s.middle_initial,
          s.address,
          s.city,
          s.state,
          s.zip_code,
          DATE(FROM_UTC_TIMESTAMP(CURRENT_TIMESTAMP, 'Asia/Kolkata'))
              AS eff_start_date,
          DATE('9999-12-31') AS eff_end_date,
          BOOLEAN(1) AS is_current
 FROM     customer_data s
          INNER JOIN current_scd2 t
              ON t.customer_number = s.customer_number
              AND t.is_current = True
 WHERE    NVL(s.first_name, '') <> NVL(t.first_name, '')
          OR NVL(s.last_name, '') <> NVL(t.last_name, '')
          OR NVL(s.middle_initial, '') <> NVL(t.middle_initial, '')
          OR NVL(s.address, '') <> NVL(t.address, '')
          OR NVL(s.city, '') <> NVL(t.city, '')
          OR NVL(s.state, '') <> NVL(t.state, '')
          OR NVL(s.zip_code, '') <> NVL(t.zip_code, '')
"""
df_new_curr_recs = spark.sql(hd_new_curr_recs)
df_new_curr_recs.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/new_curr_recs/")
# NOTE(review): the view is registered on the query result, not on the parquet
# copy, so it presumably re-evaluates CURRENT_TIMESTAMP each time it is queried.
df_new_curr_recs.createOrReplaceTempView("new_curr_recs")
# ############## review dataset ############## #
df_new_curr_recs = spark.read.parquet(v_s3_path + "/new_curr_recs/*").orderBy("customer_number")
df_new_curr_recs.show(10, False)

+----------------+---------------+----------+---------+--------------+---------------+-----------+-----+----------+--------------+------------+----------+
|customer_dim_key|customer_number|first_name|last_name|middle_initial|address        |city       |state|zip_code  |eff_start_date|eff_end_date|is_current|
+----------------+---------------+----------+---------+--------------+---------------+-----------+-----+----------+--------------+------------+----------+
|1               |289374         |John      |Smith    |G             |456 Derry Court|Springville|VT   |01234-5678|2022-05-21    |9999-12-31  |true      |
+----------------+---------------+----------+---------+--------------+---------------+-----------+-----+----------+--------------+------------+----------+



- The above logic runs through all the records and finds the one change

## Find previous current records to expire

- Now that I have a new current record for a customer that already exists, I need to expire the previous current record

In [9]:
# ------------------------------------------------------------------ #
# Isolate the surrogate keys of the existing rows that are being replaced.
# The "modfied" spelling is kept as-is: later SQL cells join against the
# view by that exact name.
# ------------------------------------------------------------------ #
df_modfied_keys = df_new_curr_recs.select("customer_dim_key")
(
    df_modfied_keys
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(v_s3_path + "/modfied_keys/")
)
df_modfied_keys.createOrReplaceTempView("modfied_keys")

## Expire previous current records

- Now I can go about expiring that prior record, while again being mindful of our row metadata fields and modifying them correctly
- Recall that I cannot update the record, so I have to create a new instance of it.

In [11]:
# ############## create new hist recs dataset ############## #
# Re-emit each superseded current row as a historical row: every attribute and
# the original eff_start_date are kept, is_current becomes false, and
# eff_end_date is set to the day before today's date in India Standard Time.
# The zone is spelled as the region ID 'Asia/Kolkata' rather than the short
# name 'IST': Spark's documentation discourages short zone names because they
# can be ambiguous.
hd_new_hist_recs = """
 SELECT   t.customer_dim_key,
          t.customer_number,
          t.first_name,
          t.last_name,
          t.middle_initial,
          t.address,
          t.city,
          t.state,
          t.zip_code,
          t.eff_start_date,
          DATE_SUB(
              DATE(FROM_UTC_TIMESTAMP(CURRENT_TIMESTAMP, 'Asia/Kolkata')), 1
          ) AS eff_end_date,
          BOOLEAN(0) AS is_current
 FROM     current_scd2 t
          INNER JOIN modfied_keys k
              ON k.customer_dim_key = t.customer_dim_key
 WHERE    t.is_current = True
"""
df_new_hist_recs = spark.sql(hd_new_hist_recs)
df_new_hist_recs.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/new_hist_recs/")
# NOTE(review): the view is registered on the query result, not on the parquet
# copy, so it presumably re-evaluates CURRENT_TIMESTAMP each time it is queried.
df_new_hist_recs.createOrReplaceTempView("new_hist_recs")
# ############## review dataset ############## #
df_new_hist_recs = spark.read.parquet(v_s3_path + "/new_hist_recs/*").orderBy("customer_number")
df_new_hist_recs.show(10, False)

+----------------+---------------+----------+---------+--------------+---------------+-----------+-----+----------+--------------+------------+----------+
|customer_dim_key|customer_number|first_name|last_name|middle_initial|address        |city       |state|zip_code  |eff_start_date|eff_end_date|is_current|
+----------------+---------------+----------+---------+--------------+---------------+-----------+-----+----------+--------------+------------+----------+
|1               |289374         |John      |Smith    |G             |123 Main Street|Springville|VT   |01234-5678|2014-01-01    |2022-05-20  |false     |
+----------------+---------------+----------+---------+--------------+---------------+-----------+-----+----------+--------------+------------+----------+



- The above logic expires the record properly and writes to its own dataset:

## Isolate unaffected records

In [12]:
# ------------------------------------------------------------------ #
# Collect the SCD2 rows that pass through untouched.
# Anti-join: keep every current_scd2 row whose surrogate key has no match
# in modfied_keys (both historical and current rows can qualify).
# ------------------------------------------------------------------ #
hd_unaffected_recs = """
 SELECT   s.customer_dim_key,
          s.customer_number,
          s.first_name,
          s.last_name,
          s.middle_initial,
          s.address,
          s.city,
          s.state,
          s.zip_code,
          s.eff_start_date,
          s.eff_end_date,
          s.is_current
 FROM     current_scd2 s
          LEFT OUTER JOIN modfied_keys k
              ON k.customer_dim_key = s.customer_dim_key
 WHERE    k.customer_dim_key IS NULL
"""

# Run the query, persist the result as a single parquet part file, and
# expose it to later SQL cells under the view name "unaffected_recs".
df_unaffected_recs = spark.sql(hd_unaffected_recs)
(
    df_unaffected_recs
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(v_s3_path + "/unaffected_recs/")
)
df_unaffected_recs.createOrReplaceTempView("unaffected_recs")

# Read the persisted copy back, sorted by the business key, and display it
# without truncating column values.
df_unaffected_recs = (
    spark.read
    .parquet(v_s3_path + "/unaffected_recs/*")
    .orderBy("customer_number")
)
df_unaffected_recs.show(n=10, truncate=False)

+----------------+---------------+----------+---------+--------------+------------------+------------+-----+----------+--------------+------------+----------+
|customer_dim_key|customer_number|first_name|last_name|middle_initial|address           |city        |state|zip_code  |eff_start_date|eff_end_date|is_current|
+----------------+---------------+----------+---------+--------------+------------------+------------+-----+----------+--------------+------------+----------+
|4               |31568          |William   |Chase    |X             |57895 Sharp Way   |Oldtown     |CA   |98554-1285|2018-12-07    |9999-12-31  |true      |
|2               |862447         |Susan     |Jones    |L             |987 Central Avenue|Central City|MO   |49257-2657|2015-03-23    |2018-11-17  |false     |
|3               |862447         |Susan     |Harris   |L             |987 Central Avenue|Central City|MO   |49257-2657|2018-11-18    |9999-12-31  |true      |
+----------------+---------------+----------+-

## Create records for new customers

In [13]:
# ############## create new recs dataset ############## #
# Build the first SCD2 row for customers that appear in the source extract but
# have no row at all (current or historical) in current_scd2. This is an
# anti-join on customer_number. No customer_dim_key is produced here; the
# surrogate key is assigned when the datasets are combined.
# eff_start_date is today's date in India Standard Time. The zone is spelled
# as the region ID 'Asia/Kolkata' rather than the short name 'IST': Spark's
# documentation discourages short zone names because they can be ambiguous.
hd_new_cust = """
 SELECT   s.customer_number,
          s.first_name,
          s.last_name,
          s.middle_initial,
          s.address,
          s.city,
          s.state,
          s.zip_code,
          DATE(FROM_UTC_TIMESTAMP(CURRENT_TIMESTAMP, 'Asia/Kolkata'))
              AS eff_start_date,
          DATE('9999-12-31') AS eff_end_date,
          BOOLEAN(1) AS is_current
 FROM     customer_data s
          LEFT OUTER JOIN current_scd2 t
              ON t.customer_number = s.customer_number
 WHERE    t.customer_number IS NULL
"""
df_new_cust = spark.sql(hd_new_cust)
df_new_cust.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/new_cust/")
# NOTE(review): the view is registered on the query result, not on the parquet
# copy, so it presumably re-evaluates CURRENT_TIMESTAMP each time it is queried.
df_new_cust.createOrReplaceTempView("new_cust")
# ############## review dataset ############## #
df_new_cust = spark.read.parquet(v_s3_path + "/new_cust/*").orderBy("customer_number")
df_new_cust.show(10, False)

+---------------+----------+---------+--------------+----------------+-------+-----+----------+--------------+------------+----------+
|customer_number|first_name|last_name|middle_initial|address         |city   |state|zip_code  |eff_start_date|eff_end_date|is_current|
+---------------+----------+---------+--------------+----------------+-------+-----+----------+--------------+------------+----------+
|932574         |Lisa      |Cohen    |S             |69846 Mason Road|Atlanta|GA   |26584-3591|2022-05-21    |9999-12-31  |true      |
+---------------+----------+---------+--------------+----------------+-------+-----+----------+--------------+------------+----------+



## Combine the datasets for new SCD2

In [14]:
# ############## create new scd2 dataset ############## #
# Highest surrogate key already in use, as a string so it can be spliced into
# the SQL text below. COALESCE(..., 0) covers an empty current_scd2: MAX would
# otherwise return NULL, v_max_key would be None, and str.replace() further
# down would raise TypeError.
v_max_key = spark.sql(
    "SELECT STRING(COALESCE(MAX(customer_dim_key), 0)) FROM current_scd2"
).collect()[0][0]

# The new dimension is the union of three disjoint sets:
#   b_cte           - rows that need a fresh surrogate key (brand-new customers
#                     plus the new current rows of changed customers); keys
#                     continue from v_max_key + 1
#   unaffected_recs - existing rows carried over unchanged
#   new_hist_recs   - previously current rows, now expired
# ROW_NUMBER is ordered by eff_start_date and then customer_number. Rows created
# in one run share the same eff_start_date, so without the tiebreaker the key
# each row receives would be arbitrary and could differ between runs. The
# tiebreaker is unique provided the source holds one row per customer_number.
hd_new_scd2 = """
 WITH a_cte
 AS   (
        SELECT     x.first_name, x.last_name,
                   x.middle_initial, x.address,
                   x.city, x.state, x.zip_code,
                   x.customer_number, x.eff_start_date,
                   x.eff_end_date, x.is_current
        FROM       new_cust x
        UNION ALL
        SELECT     y.first_name, y.last_name,
                   y.middle_initial, y.address,
                   y.city, y.state, y.zip_code,
                   y.customer_number, y.eff_start_date,
                   y.eff_end_date, y.is_current
        FROM       new_curr_recs y
      )
  ,   b_cte
  AS  (
        SELECT  ROW_NUMBER() OVER(
                    ORDER BY a.eff_start_date, a.customer_number
                ) + BIGINT('{v_max_key}') AS customer_dim_key,
                a.first_name, a.last_name,
                a.middle_initial, a.address,
                a.city, a.state, a.zip_code,
                a.customer_number, a.eff_start_date,
                a.eff_end_date, a.is_current
        FROM    a_cte a
      )
  SELECT  customer_dim_key, first_name, last_name,
          middle_initial, address,
          city, state, zip_code,
          customer_number, eff_start_date,
          eff_end_date, is_current
  FROM    b_cte
  UNION ALL
  SELECT  customer_dim_key, first_name,  last_name,
          middle_initial, address,
          city, state, zip_code,
          customer_number, eff_start_date,
          eff_end_date, is_current
  FROM    unaffected_recs
  UNION ALL
  SELECT  customer_dim_key, first_name,  last_name,
          middle_initial, address,
          city, state, zip_code,
          customer_number, eff_start_date,
          eff_end_date, is_current
  FROM    new_hist_recs
"""
df_new_scd2 = spark.sql(hd_new_scd2.replace("{v_max_key}", v_max_key))
df_new_scd2.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/new_scd2/")
# ############## review dataset ############## #
df_new_scd2 = spark.read.parquet(v_s3_path + "/new_scd2/*").orderBy("customer_dim_key")
df_new_scd2.show(10, False)

+----------------+----------+---------+--------------+------------------+------------+-----+----------+---------------+--------------+------------+----------+
|customer_dim_key|first_name|last_name|middle_initial|address           |city        |state|zip_code  |customer_number|eff_start_date|eff_end_date|is_current|
+----------------+----------+---------+--------------+------------------+------------+-----+----------+---------------+--------------+------------+----------+
|1               |John      |Smith    |G             |123 Main Street   |Springville |VT   |01234-5678|289374         |2014-01-01    |2022-05-20  |false     |
|2               |Susan     |Jones    |L             |987 Central Avenue|Central City|MO   |49257-2657|862447         |2015-03-23    |2018-11-17  |false     |
|3               |Susan     |Harris   |L             |987 Central Avenue|Central City|MO   |49257-2657|862447         |2018-11-18    |9999-12-31  |true      |
|4               |William   |Chase    |X      