<a href="https://colab.research.google.com/github/capt-blackdron/pyspark-examples/blob/main/SCD_Type_2_Implementation_in_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Setting up Hadoop and PySpark**

In [None]:
# SCD Type 2 example. Strategy: keep a history of record versions when a match is found in the new dataset, and insert new records when they do not already exist.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar -xvzf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [None]:
# Mount Google Drive at /content/drive; the employee CSV files used by this
# notebook are read from /content/drive/MyDrive/Colab Notebooks/.
from google.colab import drive
drive.mount('/content/drive')
# NOTE(review): this install is unpinned, so the pyspark version may differ
# from the Spark 3.1.2 distribution that SPARK_HOME points to -- confirm the
# two are compatible, or pin the version here.
!pip install pyspark


**First Create a Target Table** 

`create external table dev_db.employee(emp_id int, emp_name string, email_id string, state string, eff_date date, end_date date, is_current varchar(2)) stored as ORC location 'hdfs_path';`


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Start a Spark session, or reuse the one that is already running.
# NOTE(review): the app name says "SCD1" although this notebook demonstrates
# SCD Type 2 -- kept as is.
spark = (
    SparkSession.builder
    .appName("SCD1_DEMO")
    .getOrCreate()
)

# Load parameters.
# On the very first run load_date is '2021-07-01'. In a real Spark job it
# would arrive on the command line, e.g. load_date = sys.argv[1]; it is
# hardcoded here for testing.
load_date = '2021-07-01'
target_table = 'employee'
max_date = '9999-12-31'

# Hive is not available in Colab, so instead of running a 'create table'
# statement the target table is derived from a dataframe: the first day's
# file plus the three SCD bookkeeping columns.
df = (
    spark.read.csv(
        "/content/drive/MyDrive/Colab Notebooks/emp_data_{}.csv".format(load_date),
        header=True,
    )
    .withColumn('eff_date', lit(load_date).cast('date'))
    .withColumn('end_date', lit(max_date).cast('date'))
    .withColumn('is_current', lit('Y'))
)

# "create table ... like" copies only the definition of the view, so the
# target table starts out empty.
df.createOrReplaceTempView('target_table_view')
spark.sql(
    "create table if not exists {} like target_table_view".format(target_table)
)
spark.read.table(target_table).printSchema()
print("Empty target table created...")


root
 |-- emp_id: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- email_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- eff_date: date (nullable = true)
 |-- end_date: date (nullable = true)
 |-- is_current: string (nullable = true)

Empty target table created...


In [24]:
# Test-only reset: on a re-run the data from the previous run still exists in
# Colab, and "insert overwrite" into the same table is not supported there.
# So rebuild `employee` as an empty table: clone its definition into a scratch
# table, drop the original, then rename the scratch table back.
for _reset_stmt in (
  "drop table if exists employee_temp",
  "create table employee_temp like employee",
  "drop table if exists employee",
  "alter table employee_temp rename to employee",
):
  spark.sql(_reset_stmt)

def load_data(load_date):
  """Apply one day's employee file to the target table as an SCD Type 2 load.

  Reads ``emp_data_<load_date>.csv``, full-outer-joins it on ``emp_id`` with
  the active rows (is_current = 'Y') of the ``employee`` table and rebuilds
  the table from these pieces:

  * insert    -- emp_id only in the file: added as a new active row.
  * no_change -- emp_id only in the table: active row carried over as is.
  * update    -- emp_id on both sides: if emp_name, email_id or state received
                 a new value, the old row is closed (end_date = load_date - 1
                 day, is_current = 'N') and a new active row effective from
                 load_date is added; otherwise the existing row is kept as is.

  Rows that were already inactive are carried over unchanged.

  A NULL incoming value is treated as "not supplied": it does not trigger a
  new version and the stored value is kept. A non-NULL incoming value that
  replaces a stored NULL does count as a change.

  Args:
    load_date: Load date as a 'YYYY-MM-DD' string. It selects the input file
      and becomes eff_date of every row created by this load.

  Returns:
    None. The target table is rewritten and up to 100 of its rows are printed.
  """
  print("Performing data load for '{}'".format(load_date))

  max_date = '9999-12-31'
  target_table = 'employee'

  # Columns whose values are versioned, and the full target-table layout.
  tracked_cols = ['emp_name', 'email_id', 'state']
  target_cols = ['emp_id'] + tracked_cols + ['eff_date', 'end_date', 'is_current']
  incoming_cols = [name + '_right' for name in target_cols]

  # step 1 -- read the file for this load date and stamp it as an open-ended,
  # active version
  current_df = (
      spark.read.csv("/content/drive/MyDrive/Colab Notebooks/emp_data_{}.csv".format(load_date), header=True)
      .withColumn('eff_date', lit(load_date).cast('date'))
      .withColumn('end_date', lit(max_date).cast('date'))
      .withColumn('is_current', lit('Y'))
  )

  # Suffix every incoming column with '_right' so both sides of the join stay
  # addressable by name.
  current_df = current_df.select([col(name).alias(name + '_right') for name in target_cols])

  # step 2 -- read only the active rows of the target table
  # (on the very first load this is empty)
  target_df = spark.read.table(target_table).filter(col('is_current') == 'Y')

  # step 3 -- derive action_flag from which side of the full join is present
  merged_df = (
      target_df.join(current_df, target_df.emp_id == current_df.emp_id_right, 'full')
      .withColumn('action_flag',
                  when(target_df.emp_id.isNull(), 'insert')
                  .when(current_df.emp_id_right.isNull(), 'no_change')
                  .otherwise('update'))
  )

  # present in the file only: brand-new records
  insert_df = merged_df.filter(col('action_flag') == 'insert').select(incoming_cols)

  # present in the target only: keep the active row untouched
  no_change_df = merged_df.filter(col('action_flag') == 'no_change').select(target_cols)

  # present on both sides: candidates for a new version
  update_df = merged_df.filter(col('action_flag') == 'update')

  def _has_new_value(name):
    # A plain `!=` evaluates to NULL when the stored value is NULL, which
    # silently classified "NULL -> value" updates as unchanged. eqNullSafe
    # never returns NULL. An incoming NULL is deliberately not a change, to
    # match the "keep the stored value" rule applied to updated_records below.
    incoming = col(name + '_right')
    return incoming.isNotNull() & ~col(name).eqNullSafe(incoming)

  changed = _has_new_value(tracked_cols[0])
  for name in tracked_cols[1:]:
    changed = changed | _has_new_value(name)

  update_df = update_df.withColumn('is_changed', when(changed, lit('Y')).otherwise(lit('N')))

  change_detected_df = update_df.filter(col('is_changed') == 'Y')

  # matched in the file but no tracked column changed: keep the row as is
  no_change_detected_df = update_df.filter(col('is_changed') == 'N').select(target_cols)

  # close the old version: is_current = 'N', end_date = day before this load
  update_df_history = change_detected_df.select(
      'emp_id', 'emp_name', 'email_id', 'state', 'eff_date',
      date_add(lit(load_date), -1).alias('end_date'),
      lit('N').alias('is_current'))

  # open the new version: incoming values win, the stored value fills in
  # wherever the incoming one is NULL
  updated_records = change_detected_df.select(
      'emp_id',
      *[coalesce(col(name + '_right'), col(name)).alias(name) for name in tracked_cols],
      'eff_date_right', 'end_date_right', 'is_current_right')

  # union is positional; every piece above uses the target column order
  final_df = insert_df.union(no_change_df).union(no_change_detected_df).union(update_df_history).union(updated_records)

  # carry over the inactive rows, which step 2 filtered out; putting them
  # first makes the result take the target table's column names
  inactive_records = spark.read.table(target_table).filter(col('is_current') == 'N')

  final_df = inactive_records.union(final_df)

  # step 4 -- write final_df to target table (not supported in colab)
  # final_df.write.mode('overwrite').saveAsTable(target_table)

  # Testing workaround: overwriting the same table is not supported in Colab,
  # so write to a scratch table first and copy it over the target.
  temp_table = 'employee_temp'
  final_df.write.mode('overwrite').saveAsTable(temp_table)
  spark.sql("insert overwrite table {} select * from {}".format(target_table, temp_table))

  print("Data loaded for '{}'".format(load_date))
  print("Target Table")
  spark.read.table(target_table).orderBy(col('emp_id').cast('int'), col('eff_date')).show(100)
  
# Replay three consecutive daily files, oldest first.
for _run_date in ('2021-07-01', '2021-07-02', '2021-07-03'):
  load_data(_run_date)

Performing data load for '2021-07-01'
Data loaded for '2021-07-01'
Target Table
+------+--------------------+--------------------+-----+----------+----------+----------+
|emp_id|            emp_name|            email_id|state|  eff_date|  end_date|is_current|
+------+--------------------+--------------------+-----+----------+----------+----------+
|     1|       Denis Hagenes|leroy83@runolfsdo...|   AS|2021-07-01|9999-12-31|         Y|
|     2|Shiela Altenwerth...|ialtenwerth@rolfs...|   GA|2021-07-01|9999-12-31|         Y|
|     3| Elois Marquardt PhD|cornel12@hotmail.com|   MN|2021-07-01|9999-12-31|         Y|
|     4|       Leila Simonis|lidie39@satterfie...|   NC|2021-07-01|9999-12-31|         Y|
|     5|       Jerri Spencer|wolffkatarina@hot...|   LA|2021-07-01|9999-12-31|         Y|
|     6|Dr. Drusilla Olso...|concepcion18@hotm...|   NE|2021-07-01|9999-12-31|         Y|
|     7| Dr. Cade Shields MD|clevie31@hotmail.com|   CT|2021-07-01|9999-12-31|         Y|
|     8|Mr. Maximo B