### SCD (Slowly Changing Dimension) demo

In [0]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


In [0]:
# %sql
# drop table if exists table_tgt

#### Source DataFrame

In [0]:

# Schema of the incoming change records: col1 carries what looks like an operation
# code ('I' / 'D' / 'u' in the sample data); col2..col5 together act as the record key
# (they are the window partition columns below and the MERGE keys in approach 2).
schema = StructType([
    StructField("col1", StringType()),
    StructField("col2", StringType()),
    StructField("col3", StringType()),
    StructField("col4", StringType()),
    StructField("col5", StringType())
])

# Sample change records. Their position in this list is treated as the arrival order
# (earlier entries arrived first) — see the `seq` column below.
# NOTE(review): 'u' is lower-case while the other op codes are upper-case — confirm intent.
data = [
    ('I', '1', '2', '3', '4'),
    ('D', '1', '2', '3', '4'),
    ('D', '4', '5', '6', '7'),
    ('u', '8', '9', '10', '12'),
    ('D', '8', '9', '10', '12'),
    ('I', '11', '12', '13', '14')
]

# Incremental DataFrame exactly as received.
df_incr = spark.createDataFrame(data, schema)
df_incr.display()

# Stand-in for a real watermark (e.g. an ingestion timestamp), for demo purposes only:
# tag every record with its position in `data`. Ordering the window by col1 instead
# would rank records by the alphabetical order of the op code ('D' < 'I' < 'u'),
# not by when they arrived.
seq_schema = StructType(schema.fields + [StructField("seq", IntegerType())])
df_incr_seq = spark.createDataFrame(
    [row + (seq,) for seq, row in enumerate(data)],
    seq_schema,
)

# Newest record first within each key, so row_id = 1 marks the latest change per key.
window_spec = Window.partitionBy("col2", "col3", "col4", "col5").orderBy(df_incr_seq["seq"].desc())

# Drop the helper column again so inc_view exposes exactly col1..col5 + row_id.
df_incr = df_incr_seq.withColumn("row_id", row_number().over(window_spec)).drop("seq")
df_incr.createOrReplaceTempView("inc_view")
df_incr.display()

col1,col2,col3,col4,col5
I,1,2,3,4
D,1,2,3,4
D,4,5,6,7
u,8,9,10,12
D,8,9,10,12
I,11,12,13,14


col1,col2,col3,col4,col5,row_id
D,1,2,3,4,1
I,1,2,3,4,2
I,11,12,13,14,1
D,4,5,6,7,1
D,8,9,10,12,1
u,8,9,10,12,2


#### Approach 1: insert only the records whose row_id is 1 (plain insert — does not upsert against existing target rows)


In [0]:
# Approach 1: append the row_id = 1 record of every key to the target (no upsert).
target_table_name = "table_tgt"
source_table_name = "inc_view"

# Demo only: INSERT INTO appends, so re-running this cell (or Run All) would add the
# same rows again. Reset the target first so every run starts from an empty table;
# set to False to keep appending to an existing target instead.
reset_target = True

if reset_target:
    spark.sql(f"DROP TABLE IF EXISTS {target_table_name}")

spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {target_table_name} (
        col1 CHAR(20),
        col2 CHAR(20),
        col3 CHAR(20),
        col4 CHAR(20),
        col5 CHAR(20)
    )
    """
)

# INSERT ... SELECT matches columns by position, so name them explicitly rather than
# relying on `* EXCEPT(row_id)`: any extra column in the view would break the insert.
spark.sql(
    f"""
    INSERT INTO {target_table_name} (col1, col2, col3, col4, col5)
    SELECT col1, col2, col3, col4, col5
    FROM {source_table_name}
    WHERE row_id = 1
    """
)

print("final Output dataframe")
# ORDER BY keeps the displayed rows in a stable order across runs.
spark.sql(f"SELECT * FROM {target_table_name} ORDER BY col2, col3, col4, col5").display()

final Output dataframe


col1,col2,col3,col4,col5
D,1,2,3,4
I,11,12,13,14
D,4,5,6,7
D,8,9,10,12


#### Approach 2: use row_id to distinguish the order of insertion (keep row_id = 1 per key), then upsert into the target with MERGE

In [0]:
# Approach 2: upsert (MERGE) the row_id = 1 record of every key into the target.
# Re-running with the same source leaves the target unchanged, so no reset is needed.
source_table_name = "inc_view"
merge_target_table_name = "table_tgt_temp"

# The MERGE target is not created anywhere else in this notebook; create it if missing
# so the cell also works on a fresh workspace.
# NOTE(review): the original schema of table_tgt_temp is not visible here; this mirrors
# the approach-1 target (table_tgt) — confirm.
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {merge_target_table_name} (
        col1 CHAR(20),
        col2 CHAR(20),
        col3 CHAR(20),
        col4 CHAR(20),
        col5 CHAR(20)
    )
    """
)

merge_metrics = spark.sql(
    f"""
    MERGE INTO {merge_target_table_name} AS tgt
    USING (
        -- row_id = 1 leaves exactly one source row per key; MERGE fails when several
        -- source rows match the same target row.
        SELECT col1, col2, col3, col4, col5
        FROM {source_table_name}
        WHERE row_id = 1
    ) AS src
    -- <=> (null-safe equality) treats NULL keys as equal, consistent with how the
    -- row_number window groups them; plain = would re-insert NULL-key rows on every run.
    ON  tgt.col2 <=> src.col2
    AND tgt.col3 <=> src.col3
    AND tgt.col4 <=> src.col4
    AND tgt.col5 <=> src.col5
    WHEN MATCHED THEN
        UPDATE SET tgt.col1 = src.col1
    WHEN NOT MATCHED THEN
        INSERT (col1, col2, col3, col4, col5)
        VALUES (src.col1, src.col2, src.col3, src.col4, src.col5)
    """
)
# Show the affected / updated / deleted / inserted row counts rather than the bare repr.
merge_metrics.display()


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
-- Inspect the approach-2 (MERGE) target. Without ORDER BY the row order of the result
-- is not guaranteed, so the displayed output could differ between runs.
SELECT * FROM table_tgt_temp
ORDER BY col2, col3, col4, col5

col1,col2,col3,col4,col5
D,1,2,3,4
I,11,12,13,14
D,4,5,6,7
D,8,9,10,12
