In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import lit,col,expr
from delta.tables import DeltaTable

In [0]:
data_month_1 = [
  ('dgb01', 'luffy', 20, 1,'A'), 
  ('dgb02', 'zoro', 22, 1,'A'),
  ('dgb03', 'sanji', 22, 1,'A')
        ]
schema = StructType([
  StructField('employee_id', StringType(), True),
  StructField('employee_name', StringType(), True),
  StructField('employee_age', IntegerType(), True),
  StructField('month_no', IntegerType(), True),
  StructField('Flag', StringType(), True)
])
df_month_1 = spark.createDataFrame(data_month_1, schema)
df_month_1.show()

+-----------+-------------+------------+--------+----+
|employee_id|employee_name|employee_age|month_no|Flag|
+-----------+-------------+------------+--------+----+
|      dgb01|        luffy|          20|       1|   A|
|      dgb02|         zoro|          22|       1|   A|
|      dgb03|        sanji|          22|       1|   A|
+-----------+-------------+------------+--------+----+



In [0]:
dbutils.fs.rm('dbfs:/FileStore/streaming_data/stream_read/std_dt1.xlsx')

Out[1]: True

In [0]:
df_month_1.write.mode("overwrite").format("delta").option("path",'dbfs:/FileStore/delta_table2').saveAsTable('scd2')

In [0]:
dbutils.fs.rm('dbfs:/FileStore/delta_table2',True)

Out[3]: True

In [0]:
%sql
select * from scd2

employee_id,employee_name,employee_age,month_no,Flag
dgb01,luffy,20,1,A
dgb03,sanji,22,1,A
dgb02,zoro,22,1,A


In [0]:
deltaTable_main = DeltaTable.forPath(spark, 'dbfs:/FileStore/delta_table2')

In [0]:
data_month_2 = [
  ('dgb01', 'luffy', 20, 1), 
  ('dgb02', 'lost zoro', 22, 1),
  ('dgb04', 'Ace', 22, 1)
        ]
schema = StructType([
  StructField('employee_id', StringType(), True),
  StructField('employee_name', StringType(), True),
  StructField('employee_age', IntegerType(), True),
  StructField('month_no', IntegerType(), True)
])
df_month_2 = spark.createDataFrame(data_month_2, schema)
df_month_2.show()

+-----------+-------------+------------+--------+
|employee_id|employee_name|employee_age|month_no|
+-----------+-------------+------------+--------+
|      dgb01|        luffy|          20|       1|
|      dgb02|    lost zoro|          22|       1|
|      dgb04|          Ace|          22|       1|
+-----------+-------------+------------+--------+



In [0]:
newAddressesToInsert = df_month_2.alias("updates").join(deltaTable_main.toDF().alias("main"), "employee_id").where("main.Flag = 'A' AND updates.employee_name <> main.employee_name")

In [0]:
#with where condition
newAddressesToInsert.display()

employee_id,employee_name,employee_age,month_no,employee_name.1,employee_age.1,month_no.1,Flag
dgb02,lost zoro,22,1,zoro,22,1,A


In [0]:
#without where condition
newAddressesToInsert.display()

employee_id,employee_name,employee_age,month_no,employee_name.1,employee_age.1,month_no.1,Flag
dgb01,luffy,20,1,luffy,20,1,A
dgb02,lost zoro,22,1,zoro,22,1,A


In [0]:
stagedUpdates = (newAddressesToInsert.selectExpr("NULL as mergeKey", "updates.*").union(df_month_2.selectExpr("employee_id as mergeKey", "*")))

In [0]:
stagedUpdates.display()

mergeKey,employee_id,employee_name,employee_age,month_no
dgb01,dgb01,luffy,20,1
dgb02,dgb02,lost zoro,22,1
dgb04,dgb04,Ace,22,1


In [0]:
deltaTable_main.alias("target").merge(
    stagedUpdates.alias("source"),
    "target.employee_id = mergeKey"
).whenMatchedUpdate(
    condition="target.Flag = 'A' AND target.employee_name != source.employee_name",
    set={
        "Flag": lit("aa")  # Set current to false and endDate to source's effective date.
    }
).whenNotMatchedInsert(
    values={
        "employee_id": "source.employee_id",
        "employee_name": "source.employee_name",
        "employee_age": "source.employee_age",
        "month_no": "source.month_no",
        "Flag": lit("A")  # Set current to true along with the new address and its effective date.
    }
).execute()

In [0]:
%sql
select * from scd2

employee_id,employee_name,employee_age,month_no,Flag
dgb02,lost zoro,22,1,A
dgb01,luffy,20,1,A
dgb02,zoro,22,1,aa
dgb03,sanji,22,1,A
dgb04,Ace,22,1,A


In [0]:
def mergeAndSaveDf(df):
    # d1 = dict(map(lambda x: (f'target.{x}', f'source.{x}'), df.columns))
    # d2=dict(map(lambda x: (f'target.{x}', f'source.{x}'), df.columns))
    # d1['target.status']=lit('Existing')
    # d2['target.status']=lit('Additional')
    if DeltaTable.isDeltaTable(spark,f"{path}"):
        deltaTable = DeltaTable.forPath(spark, f"{path}")
        # spark.sql(f"UPDATE {database_name}.{target_table_name} SET status='missing'")
        # deltaTable.alias("target").merge(
        #   source=df.alias("source"),
        #   condition=f'target.{mergeCol}=source.{mergeCol}').whenMatchedUpdate(set=d1).whenNotMatchedInsert(values=d2).execute()
        newAddressesToInsert = df_month_2.alias("updates").join(deltaTable_main.toDF().alias("main"), "employee_id").where("main.Flag = 'A' AND updates.employee_name <> main.employee_name")
        stagedUpdates = (newAddressesToInsert.selectExpr("NULL as mergeKey", "updates.*").union(df_month_2.selectExpr("employee_id as mergeKey", "*")))
        deltaTable_main.alias("target").merge(stagedUpdates.alias("source"),"target.employee_id = mergeKey").whenMatchedUpdate(
          condition="target.Flag = 'A' AND target.employee_name != source.employee_name",set={"Flag": lit("INACTIVE")}).whenNotMatchedInsert(
            values={"employee_id": "source.employee_id","employee_name": "source.employee_name","employee_age": "source.employee_age","month_no": "source.month_no","Flag": lit("ACTIVE")}).execute()
    else:
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        df_final = df.withColumn("FLAG",lit("ACTIVE"))
        df_final.write.mode("overwrite").format("delta").option("path",path).saveAsTable(f"{database_name}.{target_table_name}")
