In [1]:
# https://mungingdata.com/delta-lake/type-2-scd-upserts/

In [2]:
import pandas as pd

In [3]:
import findspark
findspark.init("/opt/manual/spark")

In [4]:
from pyspark.sql import SparkSession, functions as F

In [5]:
# If you are running Spark 2.4, you have to use Delta Lake 0.6.0.
spark = (SparkSession.builder
         .appName("Delta Lake SCD Type2")
         .master("yarn")
         .config("spark.sql.shuffle.partition", 4)
         .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

In [6]:
# You must import delta libs after SparkSession created

from delta.tables import *

## Create a spark dataframe

In [7]:
customers = spark.createDataFrame([(10001504, "Hasan Şahintepesi", "Çankırı", "Eldivan", True, "1992-09-01", None),
                                    (10001505, "Tuncay Çadırcı", "Ankara", "Keçiören", True, "1992-10-12", None),
                                  (10001506, "Melahat Bakır", "İstanbul", "Beykoz", True, "1992-08-26", None),
                                   (10001526, "Sultan Balcı", "Muğla", "Bodrum", True, "1992-09-26", None),
                                  (10001530, "Dudu Karagölet", "Yozgat", "Sorgun", False, "1992-08-11", "1993-08-25"),
                                  (10001518, "Burcu Vataneri", "Kırşehir", "Mucur", False, "1992-08-22", "1993-06-21")],
                                   ["Id", "personName", "state", "province", "still_here", "join_date", "leave_date"])

In [8]:
customers.show()

+--------+-----------------+--------+--------+----------+----------+----------+
|      Id|       personName|   state|province|still_here| join_date|leave_date|
+--------+-----------------+--------+--------+----------+----------+----------+
|10001504|Hasan Şahintepesi| Çankırı| Eldivan|      true|1992-09-01|      null|
|10001505|   Tuncay Çadırcı|  Ankara|Keçiören|      true|1992-10-12|      null|
|10001506|    Melahat Bakır|İstanbul|  Beykoz|      true|1992-08-26|      null|
|10001526|     Sultan Balcı|   Muğla|  Bodrum|      true|1992-09-26|      null|
|10001530|   Dudu Karagölet|  Yozgat|  Sorgun|     false|1992-08-11|1993-08-25|
|10001518|   Burcu Vataneri|Kırşehir|   Mucur|     false|1992-08-22|1993-06-21|
+--------+-----------------+--------+--------+----------+----------+----------+



## Write to deltalake

In [9]:
deltaPath = "hdfs://localhost:9000/user/train/sc2_delta"

In [10]:
customers.write \
.mode("overwrite") \
.format("delta") \
.save(deltaPath)

In [11]:
! hdfs dfs -ls /user/train/sc2_delta

Found 3 items
drwxr-xr-x   - train supergroup          0 2022-01-08 14:09 /user/train/sc2_delta/_delta_log
-rw-r--r--   1 train supergroup       1877 2022-01-08 14:09 /user/train/sc2_delta/part-00000-0718c752-27f5-483d-b87f-24847019d815-c000.snappy.parquet
-rw-r--r--   1 train supergroup       1911 2022-01-08 14:09 /user/train/sc2_delta/part-00001-6b510b26-bf26-4219-90e1-2ba2975ed5e4-c000.snappy.parquet


In [12]:
! hdfs dfs -ls /user/train/sc2_delta/_delta_log

Found 1 items
-rw-r--r--   1 train supergroup       1394 2022-01-08 14:09 /user/train/sc2_delta/_delta_log/00000000000000000000.json


## Read From Deltalake as DeltaTable

In [13]:
customers_delta = DeltaTable.forPath(spark, deltaPath)

In [14]:
type(customers_delta)

delta.tables.DeltaTable

In [15]:
customers_delta.toDF().show()

+--------+-----------------+--------+--------+----------+----------+----------+
|      Id|       personName|   state|province|still_here| join_date|leave_date|
+--------+-----------------+--------+--------+----------+----------+----------+
|10001526|     Sultan Balcı|   Muğla|  Bodrum|      true|1992-09-26|      null|
|10001530|   Dudu Karagölet|  Yozgat|  Sorgun|     false|1992-08-11|1993-08-25|
|10001518|   Burcu Vataneri|Kırşehir|   Mucur|     false|1992-08-22|1993-06-21|
|10001504|Hasan Şahintepesi| Çankırı| Eldivan|      true|1992-09-01|      null|
|10001505|   Tuncay Çadırcı|  Ankara|Keçiören|      true|1992-10-12|      null|
|10001506|    Melahat Bakır|İstanbul|  Beykoz|      true|1992-08-26|      null|
+--------+-----------------+--------+--------+----------+----------+----------+



## New Customers

In [16]:
customers_new = spark.createDataFrame([(10001417, "Tuncay Kavcı", "Kütahya", "Merkez", "1994-09-01"),
                                       (10001418, "Tülay İçtiyar", "Ankara", "Sincan", "1994-09-01"),
                                       (10004055, "Arzu Taksici", "Çankırı", "Merkez", "1994-11-07"),
                                       (10001505, "Tuncay Çadırcı", "İstanbul", "Küçükyalı", "1995-01-09"),
                                      (10001526, "Sultan Balcı", "İstanbul", "Tuzla", "1995-01-09")],
                                       ["Id", "personName", "state", "province", "join_date"])

In [17]:
customers_new.show()

+--------+--------------+--------+---------+----------+
|      Id|    personName|   state| province| join_date|
+--------+--------------+--------+---------+----------+
|10001417|  Tuncay Kavcı| Kütahya|   Merkez|1994-09-01|
|10001418| Tülay İçtiyar|  Ankara|   Sincan|1994-09-01|
|10004055|  Arzu Taksici| Çankırı|   Merkez|1994-11-07|
|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
+--------+--------------+--------+---------+----------+



# Adım-1: Yeni Kayıtlarla Boyut Tablosundaki Mevcut Kayıtları Karşılaştırma

In [18]:
stagedPart1 = customers_new.alias("updates") \
  .join(customers_delta.toDF().alias("customers"), "Id") \
  .where("customers.still_here = true AND (updates.state <> customers.state OR updates.province <> customers.province)") \
  .selectExpr("NULL as mergeKey", "updates.*")

In [19]:
stagedPart1.show()

+--------+--------+--------------+--------+---------+----------+
|mergeKey|      Id|    personName|   state| province| join_date|
+--------+--------+--------------+--------+---------+----------+
|    null|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
|    null|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
+--------+--------+--------------+--------+---------+----------+



# Adım-2: Mevcut Kayıtlara MergeKey Ekleme

In [20]:
stagedPart2 = customers_new.selectExpr("Id as merge_key", *customers_new.columns)

In [21]:
stagedPart2.show()

+---------+--------+--------------+--------+---------+----------+
|merge_key|      Id|    personName|   state| province| join_date|
+---------+--------+--------------+--------+---------+----------+
| 10001417|10001417|  Tuncay Kavcı| Kütahya|   Merkez|1994-09-01|
| 10001418|10001418| Tülay İçtiyar|  Ankara|   Sincan|1994-09-01|
| 10004055|10004055|  Arzu Taksici| Çankırı|   Merkez|1994-11-07|
| 10001505|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
| 10001526|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
+---------+--------+--------------+--------+---------+----------+



# Adım-3: Adım-1 ve Adım-2’deki Kayıtların Birbirine Eklenmesi (union)

In [22]:
stagedUpdates = stagedPart1.union(stagedPart2)
stagedUpdates.show()

+--------+--------+--------------+--------+---------+----------+
|mergeKey|      Id|    personName|   state| province| join_date|
+--------+--------+--------------+--------+---------+----------+
|    null|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
|    null|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
|10001417|10001417|  Tuncay Kavcı| Kütahya|   Merkez|1994-09-01|
|10001418|10001418| Tülay İçtiyar|  Ankara|   Sincan|1994-09-01|
|10004055|10004055|  Arzu Taksici| Çankırı|   Merkez|1994-11-07|
|10001505|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
|10001526|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
+--------+--------+--------------+--------+---------+----------+



In [23]:
type(stagedUpdates)

pyspark.sql.dataframe.DataFrame

# Adım-4: Boyut Tablosu (DeltaTable) Üzerinde Değişikliklerin İşlenmesim

In [24]:
customers_delta \
  .alias("customers") \
  .merge(stagedUpdates.alias("staged_updates"), "customers.Id = mergeKey") \
  .whenMatchedUpdate(condition="customers.still_here = true AND (staged_updates.state <> customers.state OR staged_updates.province <> customers.province)", \
  set={"still_here": "false", "leave_date": "staged_updates.join_date"}) \
  .whenNotMatchedInsert(values={
    "Id": "staged_updates.Id",
    "personName": "staged_updates.personName",
    "state": "staged_updates.state",
    "province": "staged_updates.province",
    "still_here": "true",
    "join_date": "staged_updates.join_date",
    "leave_date": "null"}) \
  .execute()

In [25]:
customers_delta.toDF().orderBy(F.desc("personName")).show()

+--------+-----------------+--------+---------+----------+----------+----------+
|      Id|       personName|   state| province|still_here| join_date|leave_date|
+--------+-----------------+--------+---------+----------+----------+----------+
|10001418|    Tülay İçtiyar|  Ankara|   Sincan|      true|1994-09-01|      null|
|10001505|   Tuncay Çadırcı|  Ankara| Keçiören|     false|1992-10-12|1995-01-09|
|10001505|   Tuncay Çadırcı|İstanbul|Küçükyalı|      true|1995-01-09|      null|
|10001417|     Tuncay Kavcı| Kütahya|   Merkez|      true|1994-09-01|      null|
|10001526|     Sultan Balcı|İstanbul|    Tuzla|      true|1995-01-09|      null|
|10001526|     Sultan Balcı|   Muğla|   Bodrum|     false|1992-09-26|1995-01-09|
|10001506|    Melahat Bakır|İstanbul|   Beykoz|      true|1992-08-26|      null|
|10001504|Hasan Şahintepesi| Çankırı|  Eldivan|      true|1992-09-01|      null|
|10001530|   Dudu Karagölet|  Yozgat|   Sorgun|     false|1992-08-11|1993-08-25|
|10001518|   Burcu Vataneri|

In [26]:
spark.stop()