In [0]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id,col, xxhash64, lit
from pyspark.sql.types import *

### Define entrypoint Spark_App

In [0]:
conf=SparkConf()
conf.setAppName("spark_iceberg_lession")
conf.setMaster("local[6]")
conf.set("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.driver.memory","4G")
conf.set("spark.executors.memory","2G")
conf.set("spark.executors.cores",2)


Out[3]: <pyspark.conf.SparkConf at 0x7f6930cb8640>

In [0]:
spark=SparkSession.builder.config(conf=conf).getOrCreate()
spark

### Initialize a target table

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS acb_test

In [0]:
spark.sql("CREATE TABLE IF NOT EXISTS acb_test.PIT (\
             uuid string not null, \
             key string, \
             current_flag int, \
             sysdate long \
             ) \
             PARTITIONED BY(sysdate)")

Out[16]: DataFrame[]

In [0]:
%sql
USE acb_test

In [0]:
%sql
SHOW TABLES 

database,tableName,isTemporary
acb_test,pit,False


In [0]:
%sql
SELECT * FROM PIT

uuid,key,current_flag,sysdate


#### Insert data into table PIT

In [0]:
%sql
INSERT INTO PIT VALUES ('1', 'A', 1,20230506);
INSERT INTO PIT VALUES ('2', 'B', 1,20230507);

num_affected_rows,num_inserted_rows
1,1


In [0]:
target_df = spark.sql("SELECT * FROM PIT")
display(target_df)

uuid,key,current_flag,sysdate
1,A,1,20230506
2,B,1,20230507


In [0]:
%sql
DESCRIBE EXTENDED PIT

col_name,data_type,comment
uuid,string,
key,string,
current_flag,int,
sysdate,bigint,
# Partition Information,,
# col_name,data_type,comment
sysdate,bigint,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


### Prepare a merge source DataFrame

In [0]:
source_data = [Row(4, "B", 1, 20230508),
               Row(5, "C", 1, 20230516)]
source_df = spark.createDataFrame(source_data,["uuid", "key", "current_flag", "sysdate"])
display(source_df)

uuid,key,current_flag,sysdate
4,B,1,20230508
5,C,1,20230516


### Implement SCD Type 2

##### Step 1: Join source_df and target_df (left join)

In [0]:
join_df = source_df.join(target_df, (source_df.key == target_df.key) & (target_df.current_flag == 1), "leftouter") \
                   .select(target_df.key.alias("tgt_key"),
                           target_df.current_flag.alias("tgt_current_flag"),
                           target_df.sysdate.alias("tgt_sysdate"),
                           source_df["*"])
join_df.show()

+-------+----------------+-----------+----+---+------------+--------+
|tgt_key|tgt_current_flag|tgt_sysdate|uuid|key|current_flag| sysdate|
+-------+----------------+-----------+----+---+------------+--------+
|      B|               1|   20230507|   4|  B|           1|20230508|
|   null|            null|       null|   5|  C|           1|20230516|
+-------+----------------+-----------+----+---+------------+--------+



##### Step 2: Filter the records that mismatch between target_df and source_df from join_df

In [0]:
filter_df = join_df.filter(xxhash64(join_df.tgt_current_flag, join_df.tgt_sysdate) != xxhash64(join_df.current_flag, join_df.sysdate))
filter_df.show()

+-------+----------------+-----------+----+---+------------+--------+
|tgt_key|tgt_current_flag|tgt_sysdate|uuid|key|current_flag| sysdate|
+-------+----------------+-----------+----+---+------------+--------+
|      B|               1|   20230507|   4|  B|           1|20230508|
|   null|            null|       null|   5|  C|           1|20230516|
+-------+----------------+-----------+----+---+------------+--------+



##### Step 3: Generate a merge key for updating and inserting records

In [0]:
merge_df = filter_df.withColumn("mergekey", filter_df.key)
merge_df.show()

+-------+----------------+-----------+----+---+------------+--------+--------+
|tgt_key|tgt_current_flag|tgt_sysdate|uuid|key|current_flag| sysdate|mergekey|
+-------+----------------+-----------+----+---+------------+--------+--------+
|      B|               1|   20230507|   4|  B|           1|20230508|       B|
|   null|            null|       null|   5|  C|           1|20230516|       C|
+-------+----------------+-----------+----+---+------------+--------+--------+



In [0]:
dummy_df = filter_df.filter(filter_df.tgt_key.isNotNull()) \
                    .withColumn("mergekey", lit(None))
dummy_df.show()

+-------+----------------+-----------+----+---+------------+--------+--------+
|tgt_key|tgt_current_flag|tgt_sysdate|uuid|key|current_flag| sysdate|mergekey|
+-------+----------------+-----------+----+---+------------+--------+--------+
|      B|               1|   20230507|   4|  B|           1|20230508|    null|
+-------+----------------+-----------+----+---+------------+--------+--------+



In [0]:
scd_df = merge_df.union(dummy_df)
scd_df.createOrReplaceTempView("src_table")
scd_df.show()

+-------+----------------+-----------+----+---+------------+--------+--------+
|tgt_key|tgt_current_flag|tgt_sysdate|uuid|key|current_flag| sysdate|mergekey|
+-------+----------------+-----------+----+---+------------+--------+--------+
|      B|               1|   20230507|   4|  B|           1|20230508|       B|
|   null|            null|       null|   5|  C|           1|20230516|       C|
|      B|               1|   20230507|   4|  B|           1|20230508|    null|
+-------+----------------+-----------+----+---+------------+--------+--------+



##### Step 4: Merge scd_df into target table

In [0]:
%sql
MERGE INTO PIT as tgt
USING src_table as src
ON tgt.key = src.mergekey
WHEN MATCHED AND tgt.sysdate < src.sysdate THEN 
  UPDATE SET tgt.current_flag = 0
WHEN NOT MATCHED THEN 
  INSERT (uuid, key ,current_flag, sysdate) VALUES (src.uuid, src.key, src.current_flag, src.sysdate)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,1,0,2


In [0]:
%sql
SELECT * FROM PIT

uuid,key,current_flag,sysdate
1,A,1,20230506
2,B,0,20230507
5,C,1,20230516
4,B,1,20230508


# DONE