In [0]:
from datetime import datetime
from functools import reduce

import pyspark.sql.functions as sf
from pyspark.sql.types import DateType, StringType

In [0]:
# Job parameters supplied through Databricks widgets.
# primary_keys: comma-separated list of the key columns used to match rows of the
# target table against the staging table (e.g. "id,country"). Whitespace around
# each name is stripped and empty entries are dropped, so "id, country" or a
# trailing comma do not turn into non-existent column names.
primary_keys = [key.strip() for key in dbutils.widgets.get("primary_keys").split(",") if key.strip()]
if not primary_keys:
    raise ValueError("The 'primary_keys' widget must list at least one column name")
target_database = dbutils.widgets.get("target_database")
target_table_name = dbutils.widgets.get("target_table_name")

In [0]:
# Load the current target table and its staging counterpart ("stage_" prefix,
# same database). spark.table() takes a table identifier rather than a SQL
# string, so widget values are never spliced into SQL text.
df = spark.table(f"{target_database}.{target_table_name}")
stage_df = spark.table(f"{target_database}.stage_{target_table_name}")

In [0]:
# Join condition matching a target row to a staging row: every primary-key
# column must be equal on both sides (AND of the per-key equalities).
# Note: `==` follows SQL semantics, so a NULL key never matches anything.
if not primary_keys:
    # Without keys the joins below would run with no condition at all
    # (cartesian / all-or-nothing), silently corrupting the result.
    raise ValueError("At least one primary key column is required to match target and staging rows")

join_rule = reduce(
    lambda condition, key_condition: condition & key_condition,
    [df[key] == stage_df[key] for key in primary_keys],
)

In [0]:
# Target rows whose key no longer appears in the staging snapshot are closed by
# stamping today's date into date_end_validation.
# The left-anti join also returns rows that were already closed in an earlier
# run; those keep their original end date. Overwriting it on every run would
# rewrite history.
close_date = sf.lit(datetime.now().strftime("%Y-%m-%d")).cast(DateType())
invalid_df = (
    df.join(stage_df, join_rule, how='leftanti')
      .withColumn('date_end_validation', sf.coalesce(sf.col('date_end_validation'), close_date))
)
display(invalid_df)

In [0]:
# Target rows whose key is still present in the staging snapshot are carried over
# as-is. A left-semi join returns only df's columns and emits each target row at
# most once. An inner join would duplicate the row once per matching staging row
# whenever the staging table contains duplicate keys.
# NOTE(review): only key presence is compared. Changes to non-key columns in
# staging are not picked up, because the old target values are kept.
# NOTE(review): rows closed in an earlier run are also carried over here. If such
# a key reappears in staging, no new open row is created for it, since new rows
# are anti-joined against the whole table. Confirm this is intended.
unchanged_df = df.join(stage_df, join_rule, how='leftsemi')

In [0]:
# Staging rows whose key does not exist anywhere in the target table become new
# open versions: a fresh UUID, today's begin date and no end date.
new_df = stage_df.join(df, join_rule, how='leftanti')

# Shared output column order for every part of the final union:
# UUID first, then the staging columns, then the two validity dates.
new_columns = ['UUID', *new_df.columns, 'date_begin_validation', 'date_end_validation']

begin_date = sf.lit(datetime.now().strftime("%Y-%m-%d")).cast(DateType())
open_end_date = sf.lit(None).cast(DateType())
new_df = (
    new_df
    .withColumn('date_begin_validation', begin_date)
    .withColumn('date_end_validation', open_end_date)
    .withColumn('UUID', sf.uuid().cast(StringType()))
)

In [0]:
# Project the closed, carried-over and new rows onto the same column order,
# then stack them into the rebuilt version of the table.
invalid_df, unchanged_df, new_df = (
    part.select(*new_columns) for part in (invalid_df, unchanged_df, new_df)
)

updated_df = invalid_df.unionByName(unchanged_df).unionByName(new_df)
display(updated_df)

In [0]:
# Replace the target table with the rebuilt contents.
# NOTE(review): `df` still reads from this same table. Overwriting a table that is
# also being read only works for formats that support it (e.g. Delta). Confirm
# the target table is Delta.
(
    updated_df.write
    .mode("overwrite")
    .saveAsTable(f"{target_database}.{target_table_name}")
)