In [0]:
from pyspark.sql.functions import col, coalesce, concat_ws, xxhash64,expr,current_timestamp,lit
from delta.tables import DeltaTable

In [0]:
# Notebook parameters, supplied through Databricks widgets (all default to "").
dbutils.widgets.text("p_target_table_path", "")
dbutils.widgets.text("p_src_view", "")
dbutils.widgets.text("p_merge_key_cols", "")

v_target_table_path = dbutils.widgets.get("p_target_table_path")
v_src_view = dbutils.widgets.get("p_src_view")
v_merge_key_cols_str = dbutils.widgets.get("p_merge_key_cols")

# "".split(",") returns [""], and "a, b" would keep the leading space in " b";
# strip every name and drop empty entries so later column lookups use real names.
v_merge_key_cols = [
    key_name.strip()
    for key_name in v_merge_key_cols_str.split(",")
    if key_name.strip()
]
if not v_merge_key_cols:
    raise ValueError(
        "p_merge_key_cols must contain at least one column name (comma-separated)"
    )
print(v_merge_key_cols)


In [0]:
# Load the source view by name. spark.table resolves the identifier directly, so
# the widget text is never interpolated into a SQL statement.
src_df = spark.table(v_src_view)

In [0]:
# Read the target Delta table without its audit columns so that only business
# columns take part in change detection. The audit column written by the final
# merge is "creation_timestamp"; "current_timestamp" is kept in the list only for
# backward compatibility (drop() ignores names that do not exist).
# NOTE(review): closed versions (ending_timestamp not NULL) are still read here;
# confirm whether change detection should only look at active rows.
tgt_df = (
    spark.read.format("delta")
    .table(v_target_table_path)
    .drop("current_timestamp", "creation_timestamp", "ending_timestamp")
)

In [0]:
# Prefix every target column with "tgt_" so source and target columns can sit
# side by side in one joined frame without name clashes.
tgt_prefixed_cols = [
    col(tgt_name).alias(f"tgt_{tgt_name}") for tgt_name in tgt_df.columns
]
tgt_renamed_df = tgt_df.select(tgt_prefixed_cols)

### Updates, Insertions and Deletions

In [0]:
# Left join source to the prefixed target on EVERY merge key column (a list of
# Column conditions is AND-ed by join), so composite keys are honoured instead of
# matching on the first key column only.
l_joint_df = src_df.join(
    tgt_renamed_df,
    [src_df[key] == tgt_renamed_df[f"tgt_{key}"] for key in v_merge_key_cols],
    "left",
)

In [0]:
# Inspection only: render the source-to-target left join.
l_joint_df.display()

In [0]:
# Target rows whose key no longer exists in the source (anti join). All merge key
# columns take part in the match so composite keys are compared in full.
deletion_df = tgt_df.join(
    src_df,
    [tgt_df[key] == src_df[key] for key in v_merge_key_cols],
    "anti",
)
deletion_df.display()

In [0]:
# Debug output: the merge key list and the column names on each side of the join.
print(v_merge_key_cols)
print(src_df.columns)
print(tgt_renamed_df.columns)


In [0]:
# Columns compared for change detection: those present on both sides, taken in
# source order so both hashes are built from corresponding values.
v_compare_cols = [c for c in src_df.columns if f"tgt_{c}" in tgt_renamed_df.columns]

# concat_ws silently skips NULLs, which would make ('a', NULL, 'b') and
# ('a', 'b', NULL) hash identically; replace NULL with '' so positions are kept.
src_cols_expr = [coalesce(col(c).cast("string"), lit("")) for c in v_compare_cols]
tgt_cols_expr = [
    coalesce(col(f"tgt_{c}").cast("string"), lit("")) for c in v_compare_cols
]
# Same coalesce(cast(... as string), '') form as the target side of the merge
# condition, so keys with NULL parts compare equal on both sides.
merge_key_cols_expr = [
    coalesce(col(c).cast("string"), lit("")) for c in v_merge_key_cols
]


# Build hashes from the concatenated null-safe strings
src_hash_col = xxhash64(concat_ws('|', *src_cols_expr))
tgt_hash_col = xxhash64(concat_ws('|', *tgt_cols_expr))
merge_key = concat_ws('|', *merge_key_cols_expr)


In [0]:
# Debug output: merge_key is a Column expression, so this prints the expression
# itself rather than any row values.
print(merge_key)

In [0]:
# Keep joined rows whose source hash differs from the target hash; among those,
# rows that found a target match (first target key column populated) are updates.
rows_differ = src_hash_col != tgt_hash_col
incremental_joint_df = l_joint_df.where(rows_differ)

first_tgt_key = f"tgt_{v_merge_key_cols[0]}"
updated_vals_df = incremental_joint_df.where(
    incremental_joint_df[first_tgt_key].isNotNull()
)


In [0]:
# Inspection only: changed rows that already have a matching target row.
updated_vals_df.display()

In [0]:
# Inspection only: every joined row whose source hash differs from its target hash.
incremental_joint_df.display()


In [0]:
# Attach the concatenated merge key to both the changed/new rows and the rows
# that disappeared from the source.
merged_incremental_joint_df, merged_deletion_df = (
    frame.withColumn("merge_key", merge_key)
    for frame in (incremental_joint_df, deletion_df)
)



In [0]:
# Inspection only: target rows missing from the source, with their merge key.
merged_deletion_df.display()

In [0]:
# The keyed incremental frame is the schema reference: its column names, in order,
# define the layout every frame is aligned to before the union.
reference_df = merged_incremental_joint_df
final_columns = list(reference_df.schema.names)



def align_schema(df, final_columns):
    """Return ``df`` projected onto ``final_columns``, in that order.

    Columns named in ``final_columns`` that ``df`` does not have are added as
    NULL literals; columns of ``df`` not listed in ``final_columns`` are dropped.
    """
    present = set(df.columns)
    projection = [
        name if name in present else lit(None).alias(name)
        for name in final_columns
    ]
    return df.select(projection)



In [0]:
# Report the columns that updated_vals_df lacks relative to final_columns.
# The loop variable must not be named `col`: at cell level that would rebind the
# imported pyspark.sql.functions.col to a plain string for the rest of the session.
for column_name in final_columns:
    if column_name not in updated_vals_df.columns:
        print(column_name)

In [0]:
# Bring all three frames onto the same column list and order before the union.
merged_incremental_joint_df, merged_deletion_df, updated_vals_df = (
    align_schema(frame, final_columns)
    for frame in (merged_incremental_joint_df, merged_deletion_df, updated_vals_df)
)


In [0]:
    # Sanity check: prints True only when all three aligned frames share one schema.
    incremental_schema = merged_incremental_joint_df.schema
    deletion_schema = merged_deletion_df.schema
    updated_schema = updated_vals_df.schema
    print(incremental_schema == deletion_schema and deletion_schema == updated_schema)


In [0]:
# Inspection only: the schema-aligned incremental rows.
merged_incremental_joint_df.display()


In [0]:
# Debug output: count() is an action, so this line runs a Spark job.
print("updated_vals_df count: ", updated_vals_df.count())


In [0]:
# Staging set for the merge: changed/new rows and deleted rows (both carrying a
# merge key) plus the updated rows again (merge_key added as NULL by align_schema).
# creation_timestamp is stamped with the load time so rows inserted by the merge
# are not stored with a NULL creation time; ending_timestamp stays NULL.
final_updation_df = (
    merged_incremental_joint_df
    .unionByName(merged_deletion_df, allowMissingColumns=False)
    .unionByName(updated_vals_df, allowMissingColumns=False)
    .withColumn("creation_timestamp", current_timestamp())
    .withColumn("ending_timestamp", lit(None).cast("timestamp"))
)

In [0]:
# Inspection only: show the merge key columns next to the derived merge_key.
# Uses the configured key columns rather than a table-specific column name so the
# cell works for any source/target pair this notebook is pointed at.
final_updation_df.select(*v_merge_key_cols, "merge_key").display()

In [0]:
# Inspection only: render the full staging set that feeds the merge.
# NOTE(review): `truncate` is an option of DataFrame.show(); confirm that
# display() accepts it in this runtime.
final_updation_df.display(truncate=False)

In [0]:
delta_tgt = DeltaTable.forName(spark, v_target_table_path)

# Rebuild the merge key on the target side as a SQL expression: every key column
# cast to string, NULL replaced by '', joined with '|'.
tgt_key_parts = ", ".join(
    f"coalesce(cast(tgt.{c} as string), '')" for c in v_merge_key_cols
)
tgt_merge_key_expr = f"concat_ws('|', {tgt_key_parts})"
merge_condition = f"{tgt_merge_key_expr} = src.merge_key"

# Matched rows are closed by stamping ending_timestamp; staged rows with no
# matching target row (including those with a NULL merge_key) are inserted.
# The matched clause is limited to rows that are still open so that versions
# closed by an earlier run keep their original ending_timestamp.
(
    delta_tgt.alias("tgt")
    .merge(source=final_updation_df.alias("src"), condition=merge_condition)
    .whenMatchedUpdate(
        condition="tgt.ending_timestamp IS NULL",
        set={"ending_timestamp": current_timestamp()},
    )
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
# DataFrame view over the target Delta table handle used by the merge.
delta_tgt_df = delta_tgt.toDF()

In [0]:
# The merge's execute() call has already committed its changes to the target
# table. Overwriting the table with a DataFrame read from that same table would
# rewrite every data file without changing a single row, so the result is only
# displayed here.
delta_tgt_df.display()


In [0]:
# %sql 
# SELECT * FROM vsarthicat.formula1_gold.race_results;