# SCD Type 2 merge with Spark DataFrames

In [12]:
from pyspark.sql.functions import udf, lit, when, date_sub
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType
import json
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row
from datetime import datetime

# Spark application settings for this notebook.
appName = "Spark SCD Merge Example"
master = "local"
conf = SparkConf().setAppName(appName).setMaster(master)

# Reuse the SparkContext the kernel/shell already provides, or create one from
# `conf` when the notebook is run on a fresh kernel. Without this assignment
# `sc` is undefined in the notebook and the next line raises NameError unless
# the environment happens to inject it.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)


def quiet_logs(sc):
    """Set the "org" and "akka" log4j loggers to ERROR level.

    Args:
        sc: Spark context whose ``_jvm`` gateway exposes ``org.apache.log4j``.
    """
    log4j = sc._jvm.org.apache.log4j
    for logger_name in ("org", "akka"):
        log4j.LogManager.getLogger(logger_name).setLevel(log4j.Level.ERROR)
# Hide info logs: raise the "org" and "akka" loggers to ERROR before any
# DataFrame work so the cell outputs are not buried in log lines.
quiet_logs(sc)


# Target data set: the existing dimension table.
# Each row is (id, attr, is_current, is_deleted, start_date, end_date);
# rows flagged is_current carry the 9999-12-31 end date.


def _to_date(text):
    """Parse a 'YYYY-MM-DD' string into a datetime.date.

    The start_date/end_date columns are DateType, so plain dates are supplied
    instead of datetime objects that carry an unused time component.
    """
    return datetime.strptime(text, '%Y-%m-%d').date()


data_target = [
    Row(1, "Hello!", False, False, _to_date('2018-01-01'), _to_date('2018-12-31')),
    Row(1, "Hello World!", True, False, _to_date('2019-01-01'), _to_date('9999-12-31')),
    Row(2, "Hello Spark!", True, False, _to_date('2019-02-01'), _to_date('9999-12-31')),
    Row(3, "Hello Old World!", True, False, _to_date('2019-02-01'), _to_date('9999-12-31')),
]

schema_target = StructType([
    StructField("id", IntegerType(), True),
    StructField("attr", StringType(), True),
    StructField("is_current", BooleanType(), True),
    StructField("is_deleted", BooleanType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True),
])

df_targetLKP = sqlContext.createDataFrame(sc.parallelize(data_target), schema_target)

# Non-current versions. They take no part in the merge and are appended
# unchanged to the final result.
# Filter on the boolean column itself rather than comparing it to the string
# 'False', which only worked through an implicit string-to-boolean cast.
df_target_false = df_targetLKP.filter(~df_targetLKP.is_current)
df_target_false.show()

+---+------+----------+----------+----------+----------+
| id|  attr|is_current|is_deleted|start_date|  end_date|
+---+------+----------+----------+----------+----------+
|  1|Hello!|     false|     false|2018-01-01|2018-12-31|
+---+------+----------+----------+----------+----------+



In [4]:
# Source data set: the incoming snapshot as (src_id, src_attr) pairs.

source_records = (
    (1, "Hello World!"),
    (2, "Hello PySpark!"),
    (4, "Hello Scala!"),
)
data_source = [Row(*record) for record in source_records]

# Both source columns are nullable.
schema_source = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("src_id", IntegerType()),
        ("src_attr", StringType()),
    )
])

source_rdd = sc.parallelize(data_source)
df_source = sqlContext.createDataFrame(source_rdd, schema_source)

df_source.show()
df_source.printSchema()

+------+--------------+
|src_id|      src_attr|
+------+--------------+
|     1|  Hello World!|
|     2|Hello PySpark!|
|     4|  Hello Scala!|
+------+--------------+

root
 |-- src_id: integer (nullable = true)
 |-- src_attr: string (nullable = true)



In [5]:

# End date given to every incoming source row.
high_date = datetime(9999, 12, 31).date()
print(high_date)

# Load date: used as the start date of every incoming source row.
current_date = datetime.now().date()
print(current_date)

# Prepare for merge: append the validity window (start and end date) to the source rows.
df_source_new = df_source.select(
    '*',
    lit(current_date).alias('src_start_date'),
    lit(high_date).alias('src_end_date'),
)

9999-12-31
2019-07-25


In [6]:
# Only the current version of each id takes part in the merge.
# Filter on the boolean column directly instead of comparing it to the string
# 'true', which relied on an implicit string-to-boolean cast.
df_target = df_targetLKP.filter(df_targetLKP.is_current)
df_target.show()

+---+----------------+----------+----------+----------+----------+
| id|            attr|is_current|is_deleted|start_date|  end_date|
+---+----------------+----------+----------+----------+----------+
|  1|    Hello World!|      true|     false|2019-01-01|9999-12-31|
|  2|    Hello Spark!|      true|     false|2019-02-01|9999-12-31|
|  3|Hello Old World!|      true|     false|2019-02-01|9999-12-31|
+---+----------------+----------+----------+----------+----------+



In [7]:

# Full outer join on the id columns: keeps matched pairs as well as rows that
# exist only in the target or only in the source (the missing side is null).
join_condition = df_source_new.src_id == df_target.id
df_merge = df_target.join(df_source_new, join_condition, how='fullouter')
df_merge.show()

+----+----------------+----------+----------+----------+----------+------+--------------+--------------+------------+
|  id|            attr|is_current|is_deleted|start_date|  end_date|src_id|      src_attr|src_start_date|src_end_date|
+----+----------------+----------+----------+----------+----------+------+--------------+--------------+------------+
|   1|    Hello World!|      true|     false|2019-01-01|9999-12-31|     1|  Hello World!|    2019-07-25|  9999-12-31|
|   3|Hello Old World!|      true|     false|2019-02-01|9999-12-31|  null|          null|          null|        null|
|null|            null|      null|      null|      null|      null|     4|  Hello Scala!|    2019-07-25|  9999-12-31|
|   2|    Hello Spark!|      true|     false|2019-02-01|9999-12-31|     2|Hello PySpark!|    2019-07-25|  9999-12-31|
+----+----------------+----------+----------+----------+----------+------+--------------+--------------+------------+



In [8]:

# Classify every merged row:
#   DELETE   - no source row for this id
#   INSERT   - no target row for this id
#   UPSERT   - both sides present and the attribute differs
#   NOACTION - both sides present and the attribute is unchanged
#
# The missing-side cases are tested first. The attribute comparison is
# null-safe: a plain `attr != src_attr` evaluates to NULL when exactly one side
# is NULL, so an attribute changing to or from NULL was labelled NOACTION.
df_merge = df_merge.withColumn(
    'action',
    when(df_merge.src_id.isNull(), 'DELETE')
    .when(df_merge.id.isNull(), 'INSERT')
    .when(~df_merge.attr.eqNullSafe(df_merge.src_attr), 'UPSERT')
    .otherwise('NOACTION')
)

df_merge.show()

+----+----------------+----------+----------+----------+----------+------+--------------+--------------+------------+--------+
|  id|            attr|is_current|is_deleted|start_date|  end_date|src_id|      src_attr|src_start_date|src_end_date|  action|
+----+----------------+----------+----------+----------+----------+------+--------------+--------------+------------+--------+
|   1|    Hello World!|      true|     false|2019-01-01|9999-12-31|     1|  Hello World!|    2019-07-25|  9999-12-31|NOACTION|
|   3|Hello Old World!|      true|     false|2019-02-01|9999-12-31|  null|          null|          null|        null|  DELETE|
|null|            null|      null|      null|      null|      null|     4|  Hello Scala!|    2019-07-25|  9999-12-31|  INSERT|
|   2|    Hello Spark!|      true|     false|2019-02-01|9999-12-31|     2|Hello PySpark!|    2019-07-25|  9999-12-31|  UPSERT|
+----+----------------+----------+----------+----------+----------+------+--------------+--------------+------------+--------+

In [9]:
# Generate one data frame per action code, each with the target table's columns.

column_names = ['id', 'attr', 'is_current','is_deleted', 'start_date', 'end_date']

# Projection that turns a source row into a new current, non-deleted target row.
# Shared by the INSERT and UPSERT branches.
source_as_current = [
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'),
]

rows_noaction = df_merge.filter(df_merge.action == 'NOACTION')
rows_insert = df_merge.filter(df_merge.action == 'INSERT')
rows_delete = df_merge.filter(df_merge.action == 'DELETE')
rows_upsert = df_merge.filter(df_merge.action == 'UPSERT')

# Records that need no action: keep the target columns as they are.
df_merge_p1 = rows_noaction.select(*column_names)

# Records that need an insert only: take everything from the source side.
df_merge_p2 = rows_insert.select(*source_as_current)

# Records that need to be deleted: keep the target row, flagged as
# not current and deleted (dates are left untouched).
df_merge_p3 = rows_delete.select(
    'id',
    'attr',
    lit(False).alias('is_current'),
    lit(True).alias('is_deleted'),
    'start_date',
    'end_date',
)

# Records that need to be expired and then inserted.
# Part 1: the new current version, taken from the source side.
df_merge_p4_1 = rows_upsert.select(*source_as_current)

# Part 2: the old version, closed the day before the new version starts.
df_merge_p4_2 = rows_upsert.select(
    'id',
    'attr',
    lit(False).alias('is_current'),
    lit(False).alias('is_deleted'),
    'start_date',
    date_sub(df_merge.src_start_date, 1).alias('end_date'),
)


In [14]:
# Union all records together: the per-action frames plus the non-current
# target rows that were set aside before the merge.

merge_parts = [
    df_merge_p1,
    df_merge_p2,
    df_merge_p3,
    df_merge_p4_1,
    df_merge_p4_2,
    df_target_false,
]

df_merge_final = merge_parts[0]
for part in merge_parts[1:]:
    # unionAll matches columns by position; every part uses the same column order.
    df_merge_final = df_merge_final.unionAll(part)

df_merge_final.orderBy(['id', 'start_date']).show()

# At last, you can overwrite the existing data using this new data frame.

# ...

+---+----------------+----------+----------+----------+----------+
| id|            attr|is_current|is_deleted|start_date|  end_date|
+---+----------------+----------+----------+----------+----------+
|  1|          Hello!|     false|     false|2018-01-01|2018-12-31|
|  1|    Hello World!|      true|     false|2019-01-01|9999-12-31|
|  2|    Hello Spark!|     false|     false|2019-02-01|2019-07-24|
|  2|  Hello PySpark!|      true|     false|2019-07-25|9999-12-31|
|  3|Hello Old World!|     false|      true|2019-02-01|9999-12-31|
|  4|    Hello Scala!|      true|     false|2019-07-25|9999-12-31|
+---+----------------+----------+----------+----------+----------+

