In [1]:
# https://mungingdata.com/delta-lake/type-2-scd-upserts/

In [1]:
import pandas as pd

In [2]:
from pyspark.sql import SparkSession, functions as F

In [3]:

spark = (SparkSession.builder
         .appName("Delta Lake SCD Type2")
         .master("local[2]")
         .config("spark.sql.shuffle.partition", 4)
         .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d53970a1-ae04-4830-a5f8-726ef06400a7;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 576ms :: artifacts dl 35ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |  

In [4]:
# You must import delta libs after SparkSession created

from delta.tables import *

## Create a spark dataframe

In [5]:
customers = spark.createDataFrame([(10001504, "Hasan Şahintepesi", "Çankırı", "Eldivan", True, "1992-09-01", None),
                                    (10001505, "Tuncay Çadırcı", "Ankara", "Keçiören", True, "1992-10-12", None),
                                  (10001506, "Melahat Bakır", "İstanbul", "Beykoz", True, "1992-08-26", None),
                                   (10001526, "Sultan Balcı", "Muğla", "Bodrum", True, "1992-09-26", None),
                                  (10001530, "Dudu Karagölet", "Yozgat", "Sorgun", False, "1992-08-11", "1993-08-25"),
                                  (10001518, "Burcu Vataneri", "Kırşehir", "Mucur", False, "1992-08-22", "1993-06-21")],
                                   ["Id", "personName", "state", "province", "still_here", "join_date", "leave_date"])

In [6]:
customers.show()

                                                                                

+--------+-----------------+--------+--------+----------+----------+----------+
|      Id|       personName|   state|province|still_here| join_date|leave_date|
+--------+-----------------+--------+--------+----------+----------+----------+
|10001504|Hasan Şahintepesi| Çankırı| Eldivan|      true|1992-09-01|      NULL|
|10001505|   Tuncay Çadırcı|  Ankara|Keçiören|      true|1992-10-12|      NULL|
|10001506|    Melahat Bakır|İstanbul|  Beykoz|      true|1992-08-26|      NULL|
|10001526|     Sultan Balcı|   Muğla|  Bodrum|      true|1992-09-26|      NULL|
|10001530|   Dudu Karagölet|  Yozgat|  Sorgun|     false|1992-08-11|1993-08-25|
|10001518|   Burcu Vataneri|Kırşehir|   Mucur|     false|1992-08-22|1993-06-21|
+--------+-----------------+--------+--------+----------+----------+----------+



## Write to deltalake

In [7]:
deltaPath = "file:///opt/examples/datasets/sc2_delta"

In [8]:
customers.write \
.mode("overwrite") \
.format("delta") \
.save(deltaPath)

                                                                                

In [9]:
! ls -l /opt/examples/datasets/sc2_delta

total 8
drwxr-xr-x. 3 root root   93 Nov  5 05:36 _delta_log
-rw-r--r--. 1 root root 2118 Nov  5 05:36 part-00000-518f7b27-7b87-4e80-a7cb-3dcf8d6ebe41-c000.snappy.parquet
-rw-r--r--. 1 root root 2139 Nov  5 05:36 part-00001-8eefa1b1-485c-453e-9ade-fe68e3e2bf7a-c000.snappy.parquet


In [10]:
! ls -l /opt/examples/datasets/sc2_delta/_delta_log

total 4
-rw-r--r--. 1 root root 2457 Nov  5 05:36 00000000000000000000.json
drwxr-xr-x. 2 root root    6 Nov  5 05:36 _commits


## Read From Deltalake as DeltaTable

In [11]:
customers_delta = DeltaTable.forPath(spark, deltaPath)

In [12]:
type(customers_delta)

delta.tables.DeltaTable

In [13]:
customers_delta.toDF().show()

24/11/05 05:36:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------+-----------------+--------+--------+----------+----------+----------+
|      Id|       personName|   state|province|still_here| join_date|leave_date|
+--------+-----------------+--------+--------+----------+----------+----------+
|10001526|     Sultan Balcı|   Muğla|  Bodrum|      true|1992-09-26|      NULL|
|10001530|   Dudu Karagölet|  Yozgat|  Sorgun|     false|1992-08-11|1993-08-25|
|10001518|   Burcu Vataneri|Kırşehir|   Mucur|     false|1992-08-22|1993-06-21|
|10001504|Hasan Şahintepesi| Çankırı| Eldivan|      true|1992-09-01|      NULL|
|10001505|   Tuncay Çadırcı|  Ankara|Keçiören|      true|1992-10-12|      NULL|
|10001506|    Melahat Bakır|İstanbul|  Beykoz|      true|1992-08-26|      NULL|
+--------+-----------------+--------+--------+----------+----------+----------+



## New Customers

In [14]:
customers_new = spark.createDataFrame([(10001417, "Tuncay Kavcı", "Kütahya", "Merkez", "1994-09-01"),
                                       (10001418, "Tülay İçtiyar", "Ankara", "Sincan", "1994-09-01"),
                                       (10004055, "Arzu Taksici", "Çankırı", "Merkez", "1994-11-07"),
                                       (10001505, "Tuncay Çadırcı", "İstanbul", "Küçükyalı", "1995-01-09"),
                                      (10001526, "Sultan Balcı", "İstanbul", "Tuzla", "1995-01-09")],
                                       ["Id", "personName", "state", "province", "join_date"])

In [15]:
customers_new.show()

[Stage 15:>                                                         (0 + 1) / 1]

+--------+--------------+--------+---------+----------+
|      Id|    personName|   state| province| join_date|
+--------+--------------+--------+---------+----------+
|10001417|  Tuncay Kavcı| Kütahya|   Merkez|1994-09-01|
|10001418| Tülay İçtiyar|  Ankara|   Sincan|1994-09-01|
|10004055|  Arzu Taksici| Çankırı|   Merkez|1994-11-07|
|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
+--------+--------------+--------+---------+----------+



                                                                                

# Adım-1: Yeni Kayıtlarla Boyut Tablosundaki Mevcut Kayıtları Karşılaştırma

In [16]:
stagedPart1 = customers_new.alias("updates") \
  .join(customers_delta.toDF().alias("customers"), "Id") \
  .where("customers.still_here = true AND (updates.state <> customers.state OR updates.province <> customers.province)") \
  .selectExpr("NULL as mergeKey", "updates.*")

In [17]:
stagedPart1.show()

                                                                                

+--------+--------+--------------+--------+---------+----------+
|mergeKey|      Id|    personName|   state| province| join_date|
+--------+--------+--------------+--------+---------+----------+
|    NULL|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
|    NULL|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
+--------+--------+--------------+--------+---------+----------+



                                                                                

# Adım-2: Mevcut Kayıtlara MergeKey Ekleme

In [18]:
stagedPart2 = customers_new.selectExpr("Id as merge_key", *customers_new.columns)

In [19]:
stagedPart2.show()

                                                                                

+---------+--------+--------------+--------+---------+----------+
|merge_key|      Id|    personName|   state| province| join_date|
+---------+--------+--------------+--------+---------+----------+
| 10001417|10001417|  Tuncay Kavcı| Kütahya|   Merkez|1994-09-01|
| 10001418|10001418| Tülay İçtiyar|  Ankara|   Sincan|1994-09-01|
| 10004055|10004055|  Arzu Taksici| Çankırı|   Merkez|1994-11-07|
| 10001505|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
| 10001526|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
+---------+--------+--------------+--------+---------+----------+



# Adım-3: Adım-1 ve Adım-2’deki Kayıtların Birbirine Eklenmesi (union)

In [20]:
stagedUpdates = stagedPart1.union(stagedPart2)
stagedUpdates.show()



+--------+--------+--------------+--------+---------+----------+
|mergeKey|      Id|    personName|   state| province| join_date|
+--------+--------+--------------+--------+---------+----------+
|    NULL|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
|    NULL|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
|10001417|10001417|  Tuncay Kavcı| Kütahya|   Merkez|1994-09-01|
|10001418|10001418| Tülay İçtiyar|  Ankara|   Sincan|1994-09-01|
|10004055|10004055|  Arzu Taksici| Çankırı|   Merkez|1994-11-07|
|10001505|10001505|Tuncay Çadırcı|İstanbul|Küçükyalı|1995-01-09|
|10001526|10001526|  Sultan Balcı|İstanbul|    Tuzla|1995-01-09|
+--------+--------+--------------+--------+---------+----------+



                                                                                

In [21]:
type(stagedUpdates)

pyspark.sql.dataframe.DataFrame

# Adım-4: Boyut Tablosu (DeltaTable) Üzerinde Değişikliklerin İşlenmesim

In [22]:
customers_delta \
  .alias("customers") \
  .merge(stagedUpdates.alias("staged_updates"), "customers.Id = mergeKey") \
  .whenMatchedUpdate(condition="customers.still_here = true AND (staged_updates.state <> customers.state OR staged_updates.province <> customers.province)", \
  set={"still_here": "false", "leave_date": "staged_updates.join_date"}) \
  .whenNotMatchedInsert(values={
    "Id": "staged_updates.Id",
    "personName": "staged_updates.personName",
    "state": "staged_updates.state",
    "province": "staged_updates.province",
    "still_here": "true",
    "join_date": "staged_updates.join_date",
    "leave_date": "null"}) \
  .execute()

                                                                                

In [23]:
customers_delta.toDF().orderBy(F.desc("personName")).show()

                                                                                

+--------+-----------------+--------+---------+----------+----------+----------+
|      Id|       personName|   state| province|still_here| join_date|leave_date|
+--------+-----------------+--------+---------+----------+----------+----------+
|10001418|    Tülay İçtiyar|  Ankara|   Sincan|      true|1994-09-01|      NULL|
|10001505|   Tuncay Çadırcı|İstanbul|Küçükyalı|      true|1995-01-09|      NULL|
|10001505|   Tuncay Çadırcı|  Ankara| Keçiören|     false|1992-10-12|1995-01-09|
|10001417|     Tuncay Kavcı| Kütahya|   Merkez|      true|1994-09-01|      NULL|
|10001526|     Sultan Balcı|İstanbul|    Tuzla|      true|1995-01-09|      NULL|
|10001526|     Sultan Balcı|   Muğla|   Bodrum|     false|1992-09-26|1995-01-09|
|10001506|    Melahat Bakır|İstanbul|   Beykoz|      true|1992-08-26|      NULL|
|10001504|Hasan Şahintepesi| Çankırı|  Eldivan|      true|1992-09-01|      NULL|
|10001530|   Dudu Karagölet|  Yozgat|   Sorgun|     false|1992-08-11|1993-08-25|
|10001518|   Burcu Vataneri|

In [24]:
spark.stop()