# Processing a Slowly Changing Dimension Type 2 Using PySpark in AWS

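We implemented the approach from the following blog post in our ClassicModels case study, which moves customer data from an OLTP database into an OLAP star schema as a Slowly Changing Dimension Type 2 (SCD2).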

https://towardsdatascience.com/processing-a-slowly-changing-dimension-type-2-using-pyspark-in-aws-9f5013a36902

Note: Steps 1 to 3 in the blog cover data preparation. We already load our data from our own database, so we skip those steps and follow the flow below instead, starting with our own equivalent of Step 4.

In [None]:
from pyspark.sql import SparkSession
from sqlalchemy import create_engine
from sqlalchemy.sql import text
import pymysql  # MySQL driver behind the "mysql+pymysql" SQLAlchemy URLs below
import pandas as pd

appName = "PySpark MySQL Example - via SQLAlchemy and PyMySQL"
master = "local"

# Local Spark session used to run the SCD2 logic as Spark SQL
spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

In [None]:
hostname='host.docker.internal:3306'
dbname="classicmodels"
uname="root"
pwd=""

# SQLAlchemy engine for the OLTP source database (classicmodels)
oltp_engine = create_engine("mysql+pymysql://{user}:{pw}@{host}/{db}".format(host=hostname, db=dbname, user=uname, pw=pwd))

In [None]:
hostname='host.docker.internal:3306'
dbname="starschema"
uname="root"
pwd=""

# SQLAlchemy engine for the OLAP target database (starschema)
olap_engine = create_engine("mysql+pymysql://{user}:{pw}@{host}/{db}".format(host=hostname, db=dbname, user=uname, pw=pwd))

## Checking OLTP Dimension Table for Customer

In [None]:
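# LIMIT 10 keeps this demo small. A real run needs the full table, otherwise
# changes and new customers outside these rows are missed.
sql = "SELECT * FROM customers LIMIT 10"
with oltp_engine.connect() as conn:
    # Fetch while the connection is still open; read_sql keeps the column names
    pdf = pd.read_sql(text(sql), conn)
customer_oltp = spark.createDataFrame(pdf)
customer_oltp.show()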




+--------------+--------------------+---------------+----------------+-----------------+--------------------+------------+-------------+--------+----------+---------+----------------------+--------------------+-------------------+-------------------+
|customerNumber|        customerName|contactLastName|contactFirstName|            phone|        addressLine1|addressLine2|         city|   state|postalCode|  country|salesRepEmployeeNumber|         creditLimit|         created_at|         updated_at|
+--------------+--------------------+---------------+----------------+-----------------+--------------------+------------+-------------+--------+----------+---------+----------------------+--------------------+-------------------+-------------------+
|           103|   Atelier graphique|        Schmitt|         Carine |       40.32.2555|             dhoraji|        null|       Nantes|    null|     44000|   France|                1370.0|21000.00000000000...|2023-03-18 15:04:58|2023-03-25 15:35:

# Prerequisite Infrastructure (Step 4 Equivalent)

First, create a customer dimension in the OLAP database with an auto-incrementing surrogate key, an effective date (affected_date), an expiry date (expiry_date) and a current-record flag (iscurrent).

To populate the OLAP dimension, you need a stored procedure (or a script such as the one below) in MySQL that loads the OLTP customer data with every record marked current, the affected date set to an arbitrary old date (1971-12-01) and the expiry date set to a far-future date (2099-12-01).

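```
DROP TABLE IF EXISTS starschema.customers;

CREATE TABLE IF NOT EXISTS `starschema`.`customers` (
  `customerNumber` INT(11) ,
  `customerName` VARCHAR(50) ,
  `contactLastName` VARCHAR(50) ,
  `contactFirstName` VARCHAR(50) ,
  `phone` VARCHAR(50) ,
  `addressLine1` VARCHAR(50) ,
  `addressLine2` VARCHAR(50) ,
  `city` VARCHAR(50) ,
  `state` VARCHAR(50) ,
  `postalCode` VARCHAR(50) ,
  `country` VARCHAR(50) ,
  `salesRepEmployeeNumber` INT(11) ,
  `creditLimit` DECIMAL(10,2)
);

-- SCD2 metadata columns. MySQL requires an AUTO_INCREMENT column to be a key.
ALTER TABLE `starschema`.`customers`
ADD COLUMN `customer_dimension_key` INT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
ADD COLUMN `affected_date` DATETIME,
ADD COLUMN `expiry_date` DATETIME,
ADD COLUMN `iscurrent` INT;

-- Initial load: copy every OLTP customer as a current record.
-- customer_dimension_key is omitted so that AUTO_INCREMENT fills it in.
INSERT INTO starschema.customers (
  customerNumber, customerName, contactLastName, contactFirstName, phone,
  addressLine1, addressLine2, city, state, postalCode, country,
  salesRepEmployeeNumber, creditLimit,
  affected_date, expiry_date, iscurrent
)
SELECT
  customerNumber
  ,customerName
  ,contactLastName
  ,contactFirstName
  ,phone
  ,addressLine1
  ,addressLine2
  ,city
  ,state
  ,postalCode
  ,country
  ,salesRepEmployeeNumber
  ,creditLimit
  ,'1971-12-01'
  ,'2099-12-01'
  ,1
FROM classicmodels.customers;
```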
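What the initial load produces can be pictured in plain Python. This is a minimal sketch, assuming OLTP rows arrive as dicts keyed by column name; the initial_load helper and the constant names are hypothetical and only illustrate the shape of the rows the script above writes.

```python
from datetime import datetime

# Sentinel dates used by the initial load script
INITIAL_AFFECTED = datetime(1971, 12, 1)
INITIAL_EXPIRY = datetime(2099, 12, 1)


def initial_load(oltp_rows: list) -> list:
    """Turn OLTP customer rows into current SCD2 rows with keys 1..n.

    Keys follow the order of oltp_rows, the way AUTO_INCREMENT numbers
    rows in insertion order on an empty table.
    """
    return [
        {
            **row,
            "customer_dimension_key": key,
            "affected_date": INITIAL_AFFECTED,
            "expiry_date": INITIAL_EXPIRY,
            "iscurrent": 1,
        }
        for key, row in enumerate(oltp_rows, start=1)
    ]


dim = initial_load([{"customerNumber": 103}, {"customerNumber": 112}])
print([(r["customerNumber"], r["customer_dimension_key"]) for r in dim])  # [(103, 1), (112, 2)]
```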

## Checking OLAP Dimension Table for Customer

In [None]:
# LIMIT 10 keeps this demo small. A real run needs the full table, because
# Step 11 overwrites the OLAP table with the rows that flow through these steps.
sql = "SELECT * FROM customers LIMIT 10"
with olap_engine.connect() as conn:
    pdf = pd.read_sql(text(sql), conn)

In [None]:
type(pdf)
pdf


| | customerNumber | customerName | contactLastName | contactFirstName | phone | addressLine1 | addressLine2 | city | state | postalCode | country | creditLimit | customer_dimension_key | affected_date | expiry_date | iscurrent |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 103 | Atelier graphique | Schmitt | Carine | 40.32.2555 | 54, rue Royale | | Nantes | | 44000 | France | 21000.0 | 1 | 1971-12-01 | 2099-12-01 | 1 |
| 1 | 112 | Signal Gift Stores | King | Jean | 7025551838 | 8489 Strong St. | | Las Vegas | NV | 83030 | USA | 71800.0 | 2 | 1971-12-01 | 2099-12-01 | 1 |
| 2 | 114 | Australian Collectors, Co. | Ferguson | Peter | 03 9520 4555 | 636 St Kilda Road | Level 3 | Melbourne | Victoria | 3004 | Australia | 117300.0 | 3 | 1971-12-01 | 2099-12-01 | 1 |
| 3 | 119 | La Rochelle Gifts | Labrune | Janine | 40.67.8555 | 67, rue des Cinquante Otages | | Nantes | | 44000 | France | 118200.0 | 4 | 1971-12-01 | 2099-12-01 | 1 |
| 4 | 121 | Baane Mini Imports | Bergulfsen | Jonas | 07-98 9555 | Erling Skakkes gate 78 | | Stavern | | 4110 | Norway | 81700.0 | 5 | 1971-12-01 | 2099-12-01 | 1 |
| 5 | 124 | Mini Gifts Distributors Ltd. | Nelson | Susan | 4155551450 | 5677 Strong St. | | San Rafael | CA | 97562 | USA | 210500.0 | 6 | 1971-12-01 | 2099-12-01 | 1 |
| 6 | 125 | Havel & Zbyszek Co | Piestrzeniewicz | Zbyszek | (26) 642-7555 | ul. Filtrowa 68 | | Warszawa | | 01-012 | Poland | 0.0 | 7 | 1971-12-01 | 2099-12-01 | 1 |
| 7 | 128 | Blauer See Auto, Co. | Keitel | Roland | +49 69 66 90 2555 | Lyonerstr. 34 | | Frankfurt | | 60528 | Germany | 59700.0 | 8 | 1971-12-01 | 2099-12-01 | 1 |
| 8 | 129 | Mini Wheels Co. | Murphy | Julie | 6505555787 | 5557 North Pendale Street | | San Francisco | CA | 94217 | USA | 64600.0 | 9 | 1971-12-01 | 2099-12-01 | 1 |
| 9 | 131 | Land of Toys Inc. | Lee | Kwai | 2125557818 | 897 Long Airport Avenue | | NYC | NY | 10022 | USA | 114900.0 | 10 | 1971-12-01 | 2099-12-01 | 1 |


In [None]:
customer_olap = spark.createDataFrame(pdf)

customer_olap.show()
type(customer_olap)



+--------------+--------------------+---------------+----------------+-----------------+--------------------+------------+-------------+--------+----------+---------+--------------------+----------------------+-------------------+-------------------+---------+
|customerNumber|        customerName|contactLastName|contactFirstName|            phone|        addressLine1|addressLine2|         city|   state|postalCode|  country|         creditLimit|customer_dimension_key|      affected_date|        expiry_date|iscurrent|
+--------------+--------------------+---------------+----------------+-----------------+--------------------+------------+-------------+--------+----------+---------+--------------------+----------------------+-------------------+-------------------+---------+
|           103|   Atelier graphique|        Schmitt|         Carine |       40.32.2555|      54, rue Royale|            |       Nantes|    null|     44000|   France|21000.00000000000...|                     1|1971-12

pyspark.sql.dataframe.DataFrame

# Registering Temporary Views for customer_oltp and customer_olap

In [None]:
customer_oltp.createOrReplaceTempView("customer_data")
customer_olap.createOrReplaceTempView("current_scd2")

# Starting work on Slowly Changing Dimension 2 (SCD2)

## Step 5: Create new current records for existing customers

To capture this change (customer 103's addressLine1 changed from "54, rue Royale" to "dhoraji" in the OLTP table), I need to compare the current SCD2 with the source data and flag the differences. I also need to be mindful of our row metadata fields to ensure that I expire and start records with the appropriate dates.

Luckily, I can do this in one block of SQL. The query will:

* Join the customer_data dataset to the current SCD2 dataset on customerNumber, restricted to the current record (iscurrent = 1)
* Check for differences in customerName, addressLine1 or phone in the WHERE clause
* Select the customer attributes from the source dataset
* Select the customer_dimension_key from the current SCD2 dataset (for use in Step 6)
* Set affected_date to today
* Set expiry_date to the far future (9999-12-31)
* Set iscurrent to 1

In [None]:
hd_new_curr_recs = """
 SELECT   t.customer_dimension_key,
          s.customerNumber,
          s.customerName,
          s.contactLastName,
          s.contactFirstName,
          s.phone,
          s.addressLine1,
          s.addressLine2,
          s.city,
          s.state,
          s.postalCode,
          s.country,
          DATE(CURRENT_TIMESTAMP)
              AS affected_date,
          DATE('9999-12-31') AS expiry_date,
          1 AS iscurrent
 FROM     customer_data s
          INNER JOIN current_scd2 t
              ON t.customerNumber = s.customerNumber
              AND t.iscurrent = 1
 WHERE    NVL(s.customerName, '') <> NVL(t.customerName, '')
          OR NVL(s.addressLine1, '') <> NVL(t.addressLine1, '')
          OR NVL(s.phone, '') <> NVL(t.phone, '')

"""
df_new_curr_recs = spark.sql(hd_new_curr_recs)
df_new_curr_recs.createOrReplaceTempView("new_curr_recs")
# ############## review dataset ############## #
df_new_curr_recs.show(10, False)

+----------------------+--------------+-----------------+---------------+----------------+----------+------------+------------+------+-----+----------+-------+-------------+-----------+---------+
|customer_dimension_key|customerNumber|customerName     |contactLastName|contactFirstName|phone     |addressLine1|addressLine2|city  |state|postalCode|country|affected_date|expiry_date|iscurrent|
+----------------------+--------------+-----------------+---------------+----------------+----------+------------+------------+------+-----+----------+-------+-------------+-----------+---------+
|1                     |103           |Atelier graphique|Schmitt        |Carine          |40.32.2555|dhoraji     |null        |Nantes|null |44000     |France |2023-03-25   |9999-12-31 |1        |
+----------------------+--------------+-----------------+---------------+----------------+----------+------------+------------+------+-----+----------+-------+-------------+-----------+---------+
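The WHERE clause wraps both sides in NVL(..., ''), so a NULL and an empty string compare as equal. A plain-Python sketch of the same change-detection rule, assuming rows are dicts keyed by the column names used above; the helpers nvl and has_changed are hypothetical names for illustration:

```python
from typing import Optional

# Columns compared in the Step 5 WHERE clause
TRACKED_COLUMNS = ("customerName", "addressLine1", "phone")


def nvl(value: Optional[str]) -> str:
    """Mimic Spark SQL NVL(value, ''): replace None with an empty string."""
    return "" if value is None else value


def has_changed(source: dict, target: dict) -> bool:
    """True if any tracked column differs once NULLs are treated as ''."""
    return any(nvl(source.get(c)) != nvl(target.get(c)) for c in TRACKED_COLUMNS)


# Customer 103: addressLine1 changed in the OLTP source
source_row = {"customerName": "Atelier graphique", "addressLine1": "dhoraji", "phone": "40.32.2555"}
target_row = {"customerName": "Atelier graphique", "addressLine1": "54, rue Royale", "phone": "40.32.2555"}
print(has_changed(source_row, target_row))  # True
print(has_changed({"phone": None}, {"phone": ""}))  # False: NULL and '' compare equal
```

One consequence of this design is that a value moving from an empty string to NULL (or back) is not treated as a change, and columns outside the WHERE clause (for example city) never trigger a new record.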



## Step 6: Find previous current records to expire

Now that I have a new current record for a customer that already exists, I need to expire the previous current record. I included the customer_dimension_key of the previous current record in the new_curr_recs dataset, so I isolate it here for later use.

In [None]:
df_modfied_keys = df_new_curr_recs.select("customer_dimension_key")
df_modfied_keys.createOrReplaceTempView("modfied_keys")

## Step 7: Expire previous current records

Now I can expire that prior record, again taking care to set the row metadata fields correctly.

In this block of code, I will:

* Join the current SCD2 dataset to the modfied_keys view on customer_dimension_key
* Ensure that I am expiring the current record by double-checking that iscurrent = 1
* Select all the attributes and affected_date from the current SCD2 dataset
* Set expiry_date to yesterday
* Set iscurrent to 0

In [None]:
hd_new_hist_recs = """

 SELECT   t.customer_dimension_key,
          t.customerNumber,
          t.customerName,
          t.contactLastName,
          t.contactFirstName,
          t.phone,
          t.addressLine1,
          t.addressLine2,
          t.city,
          t.state,
          t.postalCode,
          t.country,
          t.affected_date,
          DATE_SUB(
              DATE(CURRENT_TIMESTAMP), 1
          ) AS expiry_date,
          0 AS iscurrent
 FROM     current_scd2 t
          INNER JOIN modfied_keys k
              ON k.customer_dimension_key = t.customer_dimension_key
 WHERE    t.iscurrent = 1
"""
df_new_hist_recs = spark.sql(hd_new_hist_recs)
df_new_hist_recs.createOrReplaceTempView("new_hist_recs")
# ############## review dataset ############## #
df_new_hist_recs.show(10, False)

+----------------------+--------------+-----------------+---------------+----------------+----------+--------------+------------+------+-----+----------+-------+-------------------+-----------+---------+
|customer_dimension_key|customerNumber|customerName     |contactLastName|contactFirstName|phone     |addressLine1  |addressLine2|city  |state|postalCode|country|affected_date      |expiry_date|iscurrent|
+----------------------+--------------+-----------------+---------------+----------------+----------+--------------+------------+------+-----+----------+-------+-------------------+-----------+---------+
|1                     |103           |Atelier graphique|Schmitt        |Carine          |40.32.2555|54, rue Royale|            |Nantes|null |44000     |France |1971-12-01 00:00:00|2023-03-24 |0        |
+----------------------+--------------+-----------------+---------------+----------------+----------+--------------+------------+------+-----+----------+-------+-------------------+---
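Steps 6 and 7 together amount to: collect the surrogate keys of changed customers, then close each matching current row one day before the run date. A plain-Python sketch of that logic, assuming dict rows; the function names are hypothetical, and unlike the SQL (which reads CURRENT_TIMESTAMP) the sketch takes the run date as a parameter:

```python
from datetime import date, timedelta


def expire_row(row: dict, run_date: date) -> dict:
    """Return a copy of a current SCD2 row closed off the day before run_date.

    Mirrors DATE_SUB(DATE(CURRENT_TIMESTAMP), 1) AS expiry_date, 0 AS iscurrent.
    """
    expired = dict(row)
    expired["expiry_date"] = run_date - timedelta(days=1)
    expired["iscurrent"] = 0
    return expired


def expire_modified(current_rows: list, modified_keys: set, run_date: date) -> list:
    """Expire only current rows whose surrogate key is in modified_keys."""
    return [
        expire_row(r, run_date)
        for r in current_rows
        if r["customer_dimension_key"] in modified_keys and r["iscurrent"] == 1
    ]


rows = [
    {"customer_dimension_key": 1, "customerNumber": 103, "affected_date": date(1971, 12, 1),
     "expiry_date": date(2099, 12, 1), "iscurrent": 1},
    {"customer_dimension_key": 2, "customerNumber": 112, "affected_date": date(1971, 12, 1),
     "expiry_date": date(2099, 12, 1), "iscurrent": 1},
]
hist = expire_modified(rows, {1}, run_date=date(2023, 3, 25))
print(hist[0]["expiry_date"], hist[0]["iscurrent"])  # 2023-03-24 0
```

Passing the run date explicitly makes a rerun for a given day produce the same expiry dates, which is harder to guarantee when the date comes from CURRENT_TIMESTAMP.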

## Step 8: Isolate unaffected records

Most customers had no changes, but they still need to remain in the target dataset, so the next step places them in their own dataset.

This logic will:

* Left join the current SCD2 dataset to the modfied_keys view on customer_dimension_key
* Keep only records that are not in modfied_keys
* Keep all the attributes and row metadata the same

In [None]:
hd_unaffected_recs = """
 SELECT   s.customer_dimension_key,
          s.customerNumber,
          s.customerName,
          s.contactLastName,
          s.contactFirstName,
          s.phone,
          s.addressLine1,
          s.addressLine2,
          s.city,
          s.state,
          s.postalCode,
          s.country,      
          s.affected_date,
          s.expiry_date,
          s.iscurrent
 FROM     current_scd2 s
          LEFT OUTER JOIN modfied_keys k
              ON k.customer_dimension_key = s.customer_dimension_key
 WHERE    k.customer_dimension_key IS NULL
"""
df_unaffected_recs = spark.sql(hd_unaffected_recs)
df_unaffected_recs.createOrReplaceTempView("unaffected_recs")
# ############## review dataset ############## #
df_unaffected_recs.show(10, False)

+----------------------+--------------+----------------------------+---------------+----------------+-----------------+----------------------------+------------+-------------+--------+----------+---------+-------------------+-------------------+---------+
|customer_dimension_key|customerNumber|customerName                |contactLastName|contactFirstName|phone            |addressLine1                |addressLine2|city         |state   |postalCode|country  |affected_date      |expiry_date        |iscurrent|
+----------------------+--------------+----------------------------+---------------+----------------+-----------------+----------------------------+------------+-------------+--------+----------+---------+-------------------+-------------------+---------+
|2                     |112           |Signal Gift Stores          |King           |Jean            |7025551838       |8489 Strong St.             |            |Las Vegas    |NV      |83030     |USA      |1971-12-01 00:00:00|2099-12
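The LEFT OUTER JOIN ... WHERE k.customer_dimension_key IS NULL pattern is an anti-join: it keeps the rows of current_scd2 that have no match in modfied_keys, including historical rows from earlier runs. A plain-Python sketch using a set membership test, assuming dict rows and a hypothetical helper name:

```python
def unaffected_rows(scd2_rows: list, modified_keys: set) -> list:
    """Keep every SCD2 row (current or historical) whose key was not modified."""
    return [r for r in scd2_rows if r["customer_dimension_key"] not in modified_keys]


scd2 = [
    {"customer_dimension_key": 1, "customerNumber": 103, "iscurrent": 1},
    {"customer_dimension_key": 2, "customerNumber": 112, "iscurrent": 1},
    {"customer_dimension_key": 3, "customerNumber": 114, "iscurrent": 1},
]
kept = unaffected_rows(scd2, {1})
print([r["customerNumber"] for r in kept])  # [112, 114]
```

Spark SQL also accepts LEFT ANTI JOIN as a direct spelling of this pattern, which avoids the IS NULL filter.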

## Step 9: Create records for new customers

A customer who appears in the source data but does not yet exist in our SCD2 is a new customer and needs a fresh record. This run has no new customers, so the output below is empty.

The following logic:

* Left joins the customer_data dataset to the current SCD2 dataset on customerNumber
* Keeps records that are not in the current SCD2 dataset
* Uses all the attributes from the source
* Sets affected_date to today
* Sets expiry_date to the far future (9999-12-31)
* Sets iscurrent to 1

In [None]:
hd_new_cust = """
 SELECT   s.customerNumber,
          s.customerName,
          s.contactLastName,
          s.contactFirstName,
          s.phone,
          s.addressLine1,
          s.addressLine2,
          s.city,
          s.state,
          s.postalCode,
          s.country,
          DATE(CURRENT_TIMESTAMP) 
              AS affected_date,
          DATE('9999-12-31') AS expiry_date,
          1 AS iscurrent
 FROM     customer_data s
          LEFT OUTER JOIN current_scd2 t
              ON t.customerNumber = s.customerNumber
 WHERE    t.customerNumber IS NULL
"""
df_new_cust = spark.sql(hd_new_cust)
df_new_cust.createOrReplaceTempView("new_cust")
# ############## review dataset ############## #
df_new_cust.show(10, False)

+--------------+------------+---------------+----------------+-----+------------+------------+----+-----+----------+-------+-------------+-----------+---------+
|customerNumber|customerName|contactLastName|contactFirstName|phone|addressLine1|addressLine2|city|state|postalCode|country|affected_date|expiry_date|iscurrent|
+--------------+------------+---------------+----------------+-----+------------+------------+----+-----+----------+-------+-------------+-----------+---------+
+--------------+------------+---------------+----------------+-----+------------+------------+----+-----+----------+-------+-------------+-----------+---------+
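Detecting new customers is the same anti-join pattern run from the source side. A plain-Python sketch, assuming dict rows and a hypothetical helper name; customer 500 below is made-up test data, not part of ClassicModels:

```python
from datetime import date

FAR_FUTURE = date(9999, 12, 31)  # matches DATE('9999-12-31') in the query


def new_customer_rows(source_rows: list, scd2_rows: list, run_date: date) -> list:
    """Build current SCD2 rows (without a surrogate key yet) for unseen customers."""
    known = {r["customerNumber"] for r in scd2_rows}  # any row, current or historical
    return [
        {**s, "affected_date": run_date, "expiry_date": FAR_FUTURE, "iscurrent": 1}
        for s in source_rows
        if s["customerNumber"] not in known
    ]


source = [
    {"customerNumber": 103, "customerName": "Atelier graphique"},
    {"customerNumber": 500, "customerName": "Example Customer"},  # hypothetical new customer
]
scd2 = [{"customerNumber": 103, "customer_dimension_key": 1, "iscurrent": 1}]
new_rows = new_customer_rows(source, scd2, date(2023, 3, 25))
print([r["customerNumber"] for r in new_rows])  # [500]
```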



## Step 10: Combine the datasets for new SCD2

I’ve created the four datasets needed to create the new iteration of our SCD2:

* Dataset of new current records for existing customers (“new_curr_recs”)
* Dataset of expiring previous current records for existing customers (“new_hist_recs”)
* Dataset of records which aren’t being modified (“unaffected_recs”)
* Dataset of new, previously unseen, customers (“new_cust”)

All that remains is to merge them together for the final product. In this last bit of code, I will:

* Find the max customer_dimension_key value in the current SCD2
* Union the two datasets that still need a new customer_dimension_key: new_cust and new_curr_recs
* Create the new customer_dimension_key with the ROW_NUMBER() function plus the max customer_dimension_key value (to keep the keys consecutive and unique)
* Register the result as the df_new_scd2_a view
* Union df_new_scd2_a with unaffected_recs and new_hist_recs to build df_new_scd2_b, the new SCD2

In [None]:
v_max_key = spark.sql(
    "SELECT STRING(MAX(customer_dimension_key)) FROM current_scd2"
).collect()[0][0]
hd_new_scd2_a = """
 WITH a_cte
 AS   (
        SELECT     s.customerNumber,
          s.customerName,
          s.contactLastName,
          s.contactFirstName,
          s.phone,
          s.addressLine1,
          s.addressLine2,
          s.city,
          s.state,
          s.postalCode,
          s.country,s.affected_date,s.expiry_date, s.iscurrent
        FROM       new_cust s
        UNION ALL
        SELECT     y.customerNumber,
          y.customerName,
          y.contactLastName,
          y.contactFirstName,
          y.phone,
          y.addressLine1,
          y.addressLine2,
          y.city,
          y.state,
          y.postalCode,
          y.country,y.affected_date,y.expiry_date, y.iscurrent
        FROM       new_curr_recs y
      )
        SELECT  ROW_NUMBER() OVER(ORDER BY a.affected_date)
                    + BIGINT('{v_max_key}') AS customer_dimension_key,
          a.customerNumber,
          a.customerName,
          a.contactLastName,
          a.contactFirstName,
          a.phone,
          a.addressLine1,
          a.addressLine2,
          a.city,
          a.state,
          a.postalCode,
          a.country,a.affected_date,a.expiry_date, a.iscurrent
        FROM    a_cte a
     
 """
df_new_scd2_a = spark.sql(hd_new_scd2_a.replace("{v_max_key}", v_max_key))
# ############## review dataset ############## #
df_new_scd2_a.show(10, False)
df_new_scd2_a.createOrReplaceTempView("df_new_scd2_a")

+----------------------+--------------+-----------------+---------------+----------------+----------+------------+------------+------+-----+----------+-------+-------------+-----------+---------+
|customer_dimension_key|customerNumber|customerName     |contactLastName|contactFirstName|phone     |addressLine1|addressLine2|city  |state|postalCode|country|affected_date|expiry_date|iscurrent|
+----------------------+--------------+-----------------+---------------+----------------+----------+------------+------------+------+-----+----------+-------+-------------+-----------+---------+
|11                    |103           |Atelier graphique|Schmitt        |Carine          |40.32.2555|dhoraji     |null        |Nantes|null |44000     |France |2023-03-25   |9999-12-31 |1        |
+----------------------+--------------+-----------------+---------------+----------------+----------+------------+------------+------+-----+----------+-------+-------------+-----------+---------+
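The key assignment is "max existing key plus row number". A plain-Python sketch, assuming dict rows and a hypothetical helper name; note that this sketch adds customerNumber as a tie-breaker, which the query above does not do:

```python
from datetime import date


def assign_surrogate_keys(rows: list, max_key: int) -> list:
    """Give each new row the key max_key + ROW_NUMBER().

    Sorting by (affected_date, customerNumber) is an assumption of this sketch:
    the tie-breaker makes the numbering deterministic when dates are equal.
    """
    ordered = sorted(rows, key=lambda r: (r["affected_date"], r["customerNumber"]))
    return [
        {**r, "customer_dimension_key": max_key + row_number}
        for row_number, r in enumerate(ordered, start=1)
    ]


new_rows = [
    {"customerNumber": 500, "affected_date": date(2023, 3, 25)},  # hypothetical new customer
    {"customerNumber": 103, "affected_date": date(2023, 3, 25)},  # changed customer from Step 5
]
keyed = assign_surrogate_keys(new_rows, max_key=10)
print([(r["customerNumber"], r["customer_dimension_key"]) for r in keyed])  # [(103, 11), (500, 12)]
```

Because every new row in a run shares today's affected_date, ORDER BY a.affected_date alone leaves the numbering among those rows unspecified. The keys are still unique, but which customer gets which key can differ between runs; adding a tie-breaker column to the ORDER BY removes that ambiguity.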



In [None]:
hd_new_scd2_b="""SELECT   customer_dimension_key,customerNumber,
          customerName,
          contactLastName,
          contactFirstName,
          phone,
          addressLine1,
          addressLine2,
          city,
          state,
          postalCode,
          country,affected_date,expiry_date, iscurrent
  FROM    df_new_scd2_a
  UNION ALL
  SELECT  customer_dimension_key,customerNumber,
          customerName,
          contactLastName,
          contactFirstName,
          phone,
          addressLine1,
          addressLine2,
          city,
          state,
          postalCode,
          country,affected_date,expiry_date, iscurrent
  FROM    unaffected_recs
  UNION ALL
  SELECT  customer_dimension_key, customerNumber,
          customerName,
          contactLastName,
          contactFirstName,
          phone,
          addressLine1,
          addressLine2,
          city,
          state,
          postalCode,
          country,affected_date,expiry_date, iscurrent
  FROM    new_hist_recs"""
df_new_scd2_b = spark.sql(hd_new_scd2_b)
# ############## review dataset ############## #
df_new_scd2_b.show(50, False)
df_new_scd2_b.createOrReplaceTempView("df_new_scd2_b")

+----------------------+--------------+----------------------------+---------------+----------------+-----------------+----------------------------+------------+-------------+--------+----------+---------+-------------------+-------------------+---------+
|customer_dimension_key|customerNumber|customerName                |contactLastName|contactFirstName|phone            |addressLine1                |addressLine2|city         |state   |postalCode|country  |affected_date      |expiry_date        |iscurrent|
+----------------------+--------------+----------------------------+---------------+----------------+-----------------+----------------------------+------------+-------------+--------+----------+---------+-------------------+-------------------+---------+
|11                    |103           |Atelier graphique           |Schmitt        |Carine          |40.32.2555       |dhoraji                     |null        |Nantes       |null    |44000     |France   |2023-03-25 00:00:00|9999-12
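Before overwriting the OLAP table, it can be worth checking two SCD2 invariants on the combined result: surrogate keys are unique, and every customer has exactly one current row. A plain-Python sketch of that check, assuming the rows have been collected as dicts (for example via [r.asDict() for r in df_new_scd2_b.collect()]); the function name check_scd2 is hypothetical:

```python
from collections import Counter


def check_scd2(rows: list) -> list:
    """Return a list of problems found in an SCD2 snapshot (empty list means OK)."""
    problems = []
    key_counts = Counter(r["customer_dimension_key"] for r in rows)
    problems += [f"duplicate key {k}" for k, n in key_counts.items() if n > 1]
    current_counts = Counter(r["customerNumber"] for r in rows if r["iscurrent"] == 1)
    customers = {r["customerNumber"] for r in rows}
    problems += [
        f"customer {c} has {current_counts[c]} current rows"
        for c in sorted(customers)
        if current_counts[c] != 1
    ]
    return problems


snapshot = [
    {"customer_dimension_key": 11, "customerNumber": 103, "iscurrent": 1},
    {"customer_dimension_key": 1, "customerNumber": 103, "iscurrent": 0},
    {"customer_dimension_key": 2, "customerNumber": 112, "iscurrent": 1},
]
print(check_scd2(snapshot))  # []
```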

## Step 11: Overwriting the OLAP Table with the New SCD2

Write the new SCD2 back to the OLAP table so that the next pipeline run compares the source against it and captures any further changes.

In [None]:
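# begin() commits on exit; 'replace' drops and recreates the table (PK, AUTO_INCREMENT and unselected columns such as creditLimit are lost)
with olap_engine.begin() as conn:
    df_new_scd2_b.toPandas().to_sql(con=conn, name='customers', if_exists='replace', index=False)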