In [0]:
# Put the parent folder on sys.path so shared modules there (e.g. `functions`) can be imported from this notebook
import os
import sys
sys.path.append(f"{os.getcwd()}/..")

from pyspark.sql.functions import to_date, from_utc_timestamp, current_timestamp, lit, sha2, to_json, struct, col
from functions import create_table_if_not_exists
from delta.tables import DeltaTable

# Pipeline configuration for the gold powerPlay table.
# NOTE(review): "rescuedDataColumn" is documented as an Auto Loader option and
# "ignoreChanges" has a newer Delta replacement ("skipChangeCommits") -- confirm
# both are still intended. Neither option dict is used in this part of the notebook.
settings = {
    "pipeline_function": "gold.powerPlay",
    "src_table_name": "edsm.silver.powerPlay",
    "dst_table_name": "edsm.gold.powerPlay",
    "merge_condition": "t.name = s.name and t.id = s.id and t.id64 = s.id64 and t.power = s.power",
    "readStreamOptions": {
        "rescuedDataColumn": "_rescued_data",
        "ignoreChanges": "true"
    },
    "writeStreamOptions": {
        "checkpointLocation": "/Volumes/edsm/gold/utility/powerPlay/_checkpoints/"
    }
}

# Expose the settings used below as module-level names.
(
    src_table_name,
    dst_table_name,
    readStreamOptions,
    writeStreamOptions,
    merge_condition,
) = (
    settings[key]
    for key in (
        "src_table_name",
        "dst_table_name",
        "readStreamOptions",
        "writeStreamOptions",
        "merge_condition",
    )
)

def upsert_to_gold(microBatchDF, batchId):
    """Apply one batch of silver rows to the SCD type-2 gold table.

    The signature matches a Structured Streaming ``foreachBatch`` callback; the
    history replay in this notebook also calls it directly. ``batchId`` is
    accepted for that signature but not used.

    Steps:
      1. Stamp every row with the SCD2 bookkeeping columns (``created_on``,
         ``deleted_on``, ``current_flag``, ``valid_from``, ``valid_to``) and a
         ``row_hash`` over the tracked attributes.
      2. Keep one row per business key (id, id64, name, power): the one with
         the latest ``ingest_time``. If a batch carried several versions of a
         key, MERGE would see several source rows for one target row and the
         INSERT would create several current rows for that key.
      3. MERGE: close every current gold row whose key is in the batch and whose
         ``row_hash`` changed (``current_flag='No'``; ``valid_to`` and
         ``deleted_on`` set to the incoming row's ``ingest_time``).
      4. INSERT the batch rows whose key has no current gold row left, i.e.
         brand-new keys plus the keys closed in step 3.

    Args:
        microBatchDF: Silver rows; must contain ``ingest_time`` and every
            column listed in ``fields_to_hash``.
        batchId: Streaming batch id (unused).
    """
    # Local imports keep this change self-contained within the cell.
    from pyspark.sql.functions import row_number
    from pyspark.sql.window import Window

    # Run SQL on the session that owns the batch: inside foreachBatch it can
    # differ from the notebook-global `spark`, and the temp view registered
    # below is only visible in that session.
    session = microBatchDF.sparkSession

    # Business key; must match the columns compared in `merge_condition`.
    key_columns = ["id", "id64", "name", "power"]
    # Attributes whose change opens a new SCD2 version.
    fields_to_hash = ["id", "id64", "name", "power", "powerState", "state"]

    microBatchDF = (
        microBatchDF
        .withColumn("created_on", col("ingest_time"))
        .withColumn("deleted_on", lit(None).cast("timestamp"))
        .withColumn("current_flag", lit("Yes"))
        .withColumn("valid_from", col("ingest_time"))
        .withColumn("valid_to", lit("9999-12-31 23:59:59").cast("timestamp"))
        .withColumn(
            "row_hash",
            sha2(to_json(struct(*[col(c) for c in fields_to_hash])), 256),
        )
    )

    # One row per key: the latest ingest_time wins; row_hash breaks ties so the
    # same row is chosen each time the view is evaluated (MERGE, then INSERT).
    latest_first = Window.partitionBy(*key_columns).orderBy(
        col("ingest_time").desc(), col("row_hash").desc()
    )
    microBatchDF = (
        microBatchDF
        .withColumn("_gold_row_rank", row_number().over(latest_first))
        .filter(col("_gold_row_rank") == 1)
        .drop("_gold_row_rank")
    )

    # Create the gold table from this batch's schema if it does not exist yet
    # (helper from the shared `functions` module).
    create_table_if_not_exists(session, microBatchDF, dst_table_name)

    microBatchDF.createOrReplaceTempView("updates")

    # Close the current version of every key whose tracked attributes changed.
    session.sql(f"""
        MERGE INTO {dst_table_name} t
        USING updates s
        ON {merge_condition} AND t.current_flag='Yes'
        WHEN MATCHED AND t.row_hash<>s.row_hash THEN
            UPDATE SET
                t.deleted_on=s.ingest_time,
                t.current_flag='No',
                t.valid_to=s.ingest_time
    """)

    # Insert by column name, not position, so every value lands in its own
    # column whatever the table's column order (the batch has row_hash last,
    # after the SCD2 columns). The SCD2 values were already set above.
    quoted = ["`" + c.replace("`", "``") + "`" for c in microBatchDF.columns]
    insert_columns = ", ".join(quoted)
    select_columns = ", ".join(f"s.{q}" for q in quoted)
    session.sql(f"""
        INSERT INTO {dst_table_name} ({insert_columns})
        SELECT {select_columns}
        FROM updates s
        LEFT ANTI JOIN {dst_table_name} t
            ON {merge_condition} AND t.current_flag='Yes'
    """)

# Rebuild the gold table from scratch by replaying the silver table's change
# data feed one commit at a time, oldest first.
# NOTE(review): every replayed version must have the change data feed enabled
# on the source table, otherwise the read below raises -- confirm.
history_df = (
    spark.sql(f"DESCRIBE HISTORY {src_table_name}")
    .select("version")
    .orderBy("version")
    .collect()
)
spark.sql(f"TRUNCATE TABLE {dst_table_name}")
for row in history_df:
    version = row["version"]
    # startingVersion and endingVersion are both inclusive, so read exactly one
    # commit; overlapping [v_i, v_i+1] windows would replay each commit twice.
    batchDF = (
        spark.read
        .format("delta")
        .option("readChangeData", "true")
        .option("startingVersion", version)
        .option("endingVersion", version)
        .table(src_table_name)
        # Update pre-images and delete rows carry old values; passing them to
        # upsert_to_gold would treat them as current data.
        # TODO(review): as a result, silver deletes are not propagated to gold.
        .filter(col("_change_type").isin("insert", "update_postimage"))
    )
    # isEmpty() stops at the first row instead of counting the whole batch.
    if not batchDF.isEmpty():
        upsert_to_gold(batchDF, version)