In [0]:
from datetime import datetime

# Fully-qualified name of the Delta table this notebook loads, supplied via a notebook widget.
# Stripped so stray whitespace in the widget value does not produce an invalid table name.
TARGET_TABLE = dbutils.widgets.get("TARGET_TABLE").strip()
if not TARGET_TABLE:
    # Fail fast here instead of with an obscure error in a later metadata/SQL call.
    raise ValueError("TARGET_TABLE widget must be set to a table name")

# Start-of-run time; written back as the new checkpoint at the end of a run that loaded rows.
# NOTE(review): naive driver-local time — confirm it uses the same timezone as the source
# `inserted_at` column it is later compared against.
curr_timestamp = datetime.now()

In [0]:
%run ./data_utility_modules

In [0]:
%run ./custom_etl_functions

In [0]:
schema_mgr = SchemaManager(spark)

# Pipeline metadata for the target table, read through SchemaManager (defined in a %run module).
checkpoint_time = schema_mgr.get_metadata(TARGET_TABLE, "checkpoint")
source_table = schema_mgr.get_metadata(TARGET_TABLE, "source_table")

# Merge/dedupe key columns, stored as a comma-separated string. Whitespace is stripped so
# "id, date" yields ["id", "date"], and empty entries from stray commas are dropped.
table_keys = [
    key.strip()
    for key in schema_mgr.get_metadata(TARGET_TABLE, "table_keys").split(",")
    if key.strip()
]
if not table_keys:
    # Without keys the MERGE condition would be empty and deduplication meaningless.
    raise ValueError(f"No table_keys metadata configured for {TARGET_TABLE}")

# Registered schema for the target table (used by the table-specific transform step).
ncp_schema = schema_mgr.get_schema(TARGET_TABLE)



In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import current_timestamp, from_utc_timestamp, col, row_number

# Source column used as the incremental high-water mark.
sync_point_column = "inserted_at"
print(f"Last Checkpoint: {checkpoint_time}")

# Rank rows within each key, newest first by the source sync-point column, so exactly one
# row per key survives and it is the latest version. (dropDuplicates keeps an arbitrary
# row per key, which could let an older version overwrite a newer one in the MERGE.)
# Ties on the sync-point value are still broken arbitrarily.
latest_per_key = Window.partitionBy(*table_keys).orderBy(col(sync_point_column).desc())

# NOTE(review): if checkpoint_time is None (e.g. no checkpoint stored yet), the comparison
# evaluates to NULL and filters out every row — confirm get_metadata's first-run value.
source_df = (
    spark.read.table(source_table)
    .where(col(sync_point_column) > checkpoint_time)
    .withColumn("_dedupe_rank", row_number().over(latest_per_key))
    .where(col("_dedupe_rank") == 1)
    .drop("_dedupe_rank")
    # Source lineage/bookkeeping columns are not carried into the target.
    .drop("inserted_at").drop("source_file_path").drop("source_file_name")
    # Stamp every loaded row with this load's time.
    .withColumn("inserted_at", from_utc_timestamp(current_timestamp(), "GMT"))
)

total_rows = source_df.count()

print(f"Run timestamp: {curr_timestamp} - Total rows expected {total_rows}")

In [0]:
TRANSACTIONS_SILVER_TABLE = "ws_ml_databricks_prod_uks.ncp.transactions_silver"

# Table-specific cleanup for the transactions silver table, driven by the schema registered
# in SchemaManager. The live target schema is deliberately not read here: on the first
# load the target table does not exist yet, and spark.table() would raise.
# NOTE: total_rows was counted before this step, so it reflects rows before this filter.
if TARGET_TABLE == TRANSACTIONS_SILVER_TABLE:
    source_df = filter_and_transform_transactions(df=source_df, schema=ncp_schema)
    

In [0]:
from delta import DeltaTable


def _quote_identifier(name: str) -> str:
    """Return ``name`` backtick-quoted for Spark SQL, escaping any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


if spark.catalog.tableExists(TARGET_TABLE):
    # Match source rows to target rows on every configured key column.
    merge_keys_string = " AND ".join(
        f"target.{_quote_identifier(k)} = source.{_quote_identifier(k)}" for k in table_keys
    )

    # Compare lower-cased names: Spark resolves column names case-insensitively by default
    # (assumes spark.sql.caseSensitive is not enabled), so a case-only difference must not
    # be treated as a new column — ADD COLUMNS would then fail on a duplicate name.
    target_fields = {f.name.lower() for f in spark.table(TARGET_TABLE).schema.fields}

    # Source columns missing from the target, kept in source order so the ALTER adds them
    # deterministically (iterating a set gave an arbitrary column order).
    new_columns = [f for f in source_df.schema.fields if f.name.lower() not in target_fields]

    if new_columns:
        add_cols_sql = ", ".join(
            f"{_quote_identifier(field.name)} {field.dataType.simpleString()}"
            for field in new_columns
        )
        spark.sql(f"ALTER TABLE {TARGET_TABLE} ADD COLUMNS ({add_cols_sql})")

    # Upsert: overwrite matched rows with all source columns, insert unmatched rows.
    target = DeltaTable.forName(spark, TARGET_TABLE)
    (
        target.alias("target")
        .merge(source_df.alias("source"), merge_keys_string)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First load: create the Delta table directly from the source DataFrame.
    source_df.write.format("delta").saveAsTable(TARGET_TABLE)


In [0]:
# Persist this run's start time as the new checkpoint, but only when the run picked up
# source rows; otherwise the stored checkpoint is left unchanged.
has_new_rows = total_rows > 0
if has_new_rows:
    checkpoint_value = str(curr_timestamp)
    schema_mgr.update_metadata(TARGET_TABLE, "checkpoint", checkpoint_value)

In [0]:
# Run Delta OPTIMIZE on the target table and display the DataFrame it returns.
optimize_statement = f"OPTIMIZE {TARGET_TABLE}"
result = spark.sql(optimize_statement)
display(result)