In [1]:
import findspark

In [2]:
# Initialise findspark before `import pyspark` (next cell) so the local Spark
# installation can be located and imported.
# NOTE(review): presumably resolved via SPARK_HOME on this machine — confirm.
findspark.init()

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

#### Create Spark Session and Spark Context

In [5]:
# Build the SparkSession for this notebook, or reuse an existing one.
# Hive support is enabled on the builder.
spark = (
    SparkSession.builder
    .appName('Handle_INCR_Data')
    .enableHiveSupport()
    .getOrCreate()
)

In [6]:
# Handle to the SparkContext that backs the session created above.
# NOTE(review): `sc` is not referenced by any cell below; it is a candidate for removal.
sc = spark.sparkContext

In [7]:
# Load the historical snapshot. No schema is supplied, so every column is read
# as a string (see printSchema output further down).
# The data directory is taken from SPARK_PLAYGROUND_DATA_DIR when set, instead
# of relying only on a machine-specific absolute path; the default is the
# original location, so behaviour is unchanged when the variable is absent.
import os

DATA_DIR = os.environ.get(
    'SPARK_PLAYGROUND_DATA_DIR',
    '/home/rameshbabug/Documents/projects/internal/spark-playground/src/data',
)
hist_df = spark.read.csv(os.path.join(DATA_DIR, 'hist.csv'), header=True)
hist_df.show(5)

+---+----+----+-------+-------------+
| id|name| sal|address|modified_date|
+---+----+----+-------+-------------+
|  1| ABC|5000|    Hyd|   2015-01-12|
|  2| DEF|4000|   Bang|   2016-03-15|
|  3| GHI|3000|   Pune|   2014-06-18|
|  4| JKL|4500|    Chn|   2018-01-03|
|  5| MNO|5600|    Chn|   2019-04-17|
+---+----+----+-------+-------------+
only showing top 5 rows



In [8]:
# Load the incremental feed. Same columns as history plus a `flag` column
# (I = insert, U = update, D = delete); all columns are read as strings.
# The data directory is taken from SPARK_PLAYGROUND_DATA_DIR when set, falling
# back to the original absolute location.
import os

DATA_DIR = os.environ.get(
    'SPARK_PLAYGROUND_DATA_DIR',
    '/home/rameshbabug/Documents/projects/internal/spark-playground/src/data',
)
incr_df = spark.read.csv(os.path.join(DATA_DIR, 'incr.csv'), header=True)
incr_df.show()

+---+----+----+-------+-------------+----+
| id|name| sal|address|modified_date|flag|
+---+----+----+-------+-------------+----+
|  2| DEF|4500|   Bang|   2017-05-15|   U|
|  3| GHI|3000|    Hyd|   2019-03-18|   U|
|  4| JKL|4500|    Chn|   2019-01-03|   D|
| 10|ABC1|5300|    Hyd|   2020-01-05|   D|
|  8| XYZ|4800|    Chn|   2020-10-21|   I|
|  9| PQR|5000|   Bang|   2020-12-29|   I|
+---+----+----+-------+-------------+----+



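### Handle Deletes

Records flagged `D` in the incremental feed are removed from the history snapshot; a `D` record whose `id` does not exist in history has nothing to delete and is ignored.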

In [9]:
# Incremental records flagged 'D' are the deletions to apply to history.
# Use the column's real name, 'flag': the previous 'Flag' only resolved because
# spark.sql.caseSensitive defaults to false, and would fail if that is enabled.
delete_df = incr_df.filter(incr_df['flag'] == 'D')
delete_df.show()

+---+----+----+-------+-------------+----+
| id|name| sal|address|modified_date|flag|
+---+----+----+-------+-------------+----+
|  4| JKL|4500|    Chn|   2019-01-03|   D|
| 10|ABC1|5300|    Hyd|   2020-01-05|   D|
+---+----+----+-------+-------------+----+



In [10]:
from pyspark.sql.functions import col

In [11]:
# Rename every history column with a '_1' suffix so its columns remain
# distinguishable from the delete set's '_2' columns after the join.
hist_df.printSchema()
hist_df_updated = hist_df.select(
    [col(name).alias(name + '_1') for name in hist_df.columns]
)
hist_df_updated.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- address: string (nullable = true)
 |-- modified_date: string (nullable = true)

root
 |-- id_1: string (nullable = true)
 |-- name_1: string (nullable = true)
 |-- sal_1: string (nullable = true)
 |-- address_1: string (nullable = true)
 |-- modified_date_1: string (nullable = true)



In [12]:
# Rename every delete-set column (including `flag`) with a '_2' suffix so it
# does not collide with the '_1' history columns in the join.
delete_df.printSchema()
delete_df_updated = delete_df.select(
    [col(name).alias(name + '_2') for name in delete_df.columns]
)
delete_df_updated.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- address: string (nullable = true)
 |-- modified_date: string (nullable = true)
 |-- flag: string (nullable = true)

root
 |-- id_2: string (nullable = true)
 |-- name_2: string (nullable = true)
 |-- sal_2: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- modified_date_2: string (nullable = true)
 |-- flag_2: string (nullable = true)



In [13]:
from pyspark.sql.functions import broadcast

In [14]:
# Attach to each history row its matching delete record, if there is one.
#
# A left join is sufficient. Delete records whose id is absent from history
# (e.g. id 10) have nothing to remove. The downstream cells only use matched
# rows (id_1 == id_2) or unmatched history rows (id_2 is null).
#
# It also lets the broadcast hint take effect. Spark cannot plan a broadcast
# hash join for a FULL OUTER equi-join, so with "fullouter" the hint was
# ignored and a shuffle join was used instead.
snapshot_df = hist_df_updated.join(
    broadcast(delete_df_updated),
    col('id_1') == col('id_2'),
    'left',
)
snapshot_df.printSchema()
snapshot_df.show()

root
 |-- id_1: string (nullable = true)
 |-- name_1: string (nullable = true)
 |-- sal_1: string (nullable = true)
 |-- address_1: string (nullable = true)
 |-- modified_date_1: string (nullable = true)
 |-- id_2: string (nullable = true)
 |-- name_2: string (nullable = true)
 |-- sal_2: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- modified_date_2: string (nullable = true)
 |-- flag_2: string (nullable = true)

+----+------+-----+---------+---------------+----+------+-----+---------+---------------+------+
|id_1|name_1|sal_1|address_1|modified_date_1|id_2|name_2|sal_2|address_2|modified_date_2|flag_2|
+----+------+-----+---------+---------------+----+------+-----+---------+---------------+------+
|   7|   VWX| 3200|      Hyd|     2019-12-24|null|  null| null|     null|           null|  null|
|   3|   GHI| 3000|     Pune|     2014-06-18|null|  null| null|     null|           null|  null|
|   5|   MNO| 5600|      Chn|     2019-04-17|null|  null| null|     null|

In [15]:
# Delete records that actually matched an id in history, i.e. the rows being
# removed. Each '_2' column is selected by its exact name and aliased back to
# the original column name. The previous c.replace("_2", "") would also strip
# "_2" occurring in the middle of a name (e.g. "line_2_text"), not only the
# suffix.
actual_delete_records_df = (
    snapshot_df
    .filter(col('id_1') == col('id_2'))
    .select([col(name + '_2').alias(name) for name in delete_df.columns])
)
actual_delete_records_df.printSchema()
actual_delete_records_df.show()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- address: string (nullable = true)
 |-- modified_date: string (nullable = true)
 |-- flag: string (nullable = true)

+---+----+----+-------+-------------+----+
| id|name| sal|address|modified_date|flag|
+---+----+----+-------+-------------+----+
|  4| JKL|4500|    Chn|   2019-01-03|   D|
+---+----+----+-------+-------------+----+



In [16]:
# History rows that survive the delete step. These are present in history
# (id_1 not null) and have no matching delete record (id_2 null).
# Each '_1' column is selected by its exact name and aliased back to the
# original name. The previous c.replace("_1", "") could also mangle names that
# contain "_1" elsewhere.
snapshot_df_updated = (
    snapshot_df
    .filter(col('id_1').isNotNull() & col('id_2').isNull())
    .select([col(name + '_1').alias(name) for name in hist_df.columns])
)
snapshot_df_updated.printSchema()
snapshot_df_updated.show()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- address: string (nullable = true)
 |-- modified_date: string (nullable = true)

+---+----+----+-------+-------------+
| id|name| sal|address|modified_date|
+---+----+----+-------+-------------+
|  1| ABC|5000|    Hyd|   2015-01-12|
|  2| DEF|4000|   Bang|   2016-03-15|
|  3| GHI|3000|   Pune|   2014-06-18|
|  5| MNO|5600|    Chn|   2019-04-17|
|  6| STU|4200|   Pune|   2020-09-26|
|  7| VWX|3200|    Hyd|   2019-12-24|
+---+----+----+-------+-------------+



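### Handle Inserts/Updates

Records flagged `I` or `U` are appended to the surviving history rows, and for each `id` only the row with the latest `modified_date` is kept.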

In [17]:
# Incremental records that must end up in the snapshot: inserts ('I') and
# updates ('U'). Rows with a null flag are excluded, as with an OR of equalities.
insert_update_df = incr_df.filter(col('flag').isin('I', 'U'))
insert_update_df.show()

+---+----+----+-------+-------------+----+
| id|name| sal|address|modified_date|flag|
+---+----+----+-------+-------------+----+
|  2| DEF|4500|   Bang|   2017-05-15|   U|
|  3| GHI|3000|    Hyd|   2019-03-18|   U|
|  8| XYZ|4800|    Chn|   2020-10-21|   I|
|  9| PQR|5000|   Bang|   2020-12-29|   I|
+---+----+----+-------+-------------+----+



In [25]:
from pyspark.sql.functions import row_number, desc
from pyspark.sql.window import Window

In [33]:
# Stack the incoming inserts/updates under the surviving history rows.
# The incremental frame is projected onto the snapshot's columns (dropping
# `flag`) so both sides share one schema before the union.
snapshot_df_latest = snapshot_df_updated.unionByName(
    insert_update_df.select(*snapshot_df_updated.columns)
)
snapshot_df_latest.show()

+---+----+----+-------+-------------+
| id|name| sal|address|modified_date|
+---+----+----+-------+-------------+
|  1| ABC|5000|    Hyd|   2015-01-12|
|  2| DEF|4000|   Bang|   2016-03-15|
|  3| GHI|3000|   Pune|   2014-06-18|
|  5| MNO|5600|    Chn|   2019-04-17|
|  6| STU|4200|   Pune|   2020-09-26|
|  7| VWX|3200|    Hyd|   2019-12-24|
|  2| DEF|4500|   Bang|   2017-05-15|
|  3| GHI|3000|    Hyd|   2019-03-18|
|  8| XYZ|4800|    Chn|   2020-10-21|
|  9| PQR|5000|   Bang|   2020-12-29|
+---+----+----+-------+-------------+



In [34]:
# Keep only the newest version of each id: rank rows within each id by
# modified_date, newest first, and retain rank 1.
#
# modified_date is a string column (see the schemas above). Converting it with
# to_date orders by calendar date rather than by text, which stays correct even
# when dates are not zero-padded. Unparseable dates become null and sort last;
# they may raise instead if ANSI mode is enabled.
#
# NOTE(review): if a history row and an incremental row share the same
# modified_date, row_number() keeps one of them arbitrarily. Consider a
# tiebreaker that prefers the incremental record.
from pyspark.sql.functions import to_date

snapshot_df_final = (
    snapshot_df_latest
    .withColumn(
        'rownum',
        row_number().over(
            Window.partitionBy('id').orderBy(to_date(col('modified_date')).desc())
        ),
    )
    .filter(col('rownum') == 1)
    .drop('rownum')
)
snapshot_df_final.show()

+---+----+----+-------+-------------+
| id|name| sal|address|modified_date|
+---+----+----+-------+-------------+
|  7| VWX|3200|    Hyd|   2019-12-24|
|  3| GHI|3000|    Hyd|   2019-03-18|
|  8| XYZ|4800|    Chn|   2020-10-21|
|  5| MNO|5600|    Chn|   2019-04-17|
|  6| STU|4200|   Pune|   2020-09-26|
|  9| PQR|5000|   Bang|   2020-12-29|
|  1| ABC|5000|    Hyd|   2015-01-12|
|  2| DEF|4500|   Bang|   2017-05-15|
+---+----+----+-------+-------------+

