In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

class DeltaTableIncrementalMerger:
    """
    Incrementally upsert records from a source Delta table into a target Delta table.

    New source rows are read from the source table's Change Data Feed by a
    streaming query with an ``availableNow`` trigger. Each micro-batch is
    reduced to the newest row per key (``partition_by_cols`` ordered by
    ``order_by_col`` descending) and MERGEd into the target. Once the stream
    has drained, target rows whose key no longer occurs in the source are
    deleted. The target table is created from the source table schema if it
    does not exist.

    Relies on a notebook-global ``spark`` session.
    """

    # Metadata columns the Delta Change Data Feed reader appends to each row.
    _CDF_METADATA_COLS = ("_change_type", "_commit_version", "_commit_timestamp")
    # Key columns used when the caller does not supply ``partition_by_cols``.
    _DEFAULT_PARTITION_BY_COLS = ("RewardNumber", "OfferCode")

    def __init__(
        self,
        source_table: str,
        target_table: str,
        checkpoint_location: str,
        order_by_col: str = "DateIngested",
        merge_condition: "str | None" = None,
        partition_by_cols: "list[str] | None" = None,
    ):
        """
        Initialize the DeltaTableIncrementalMerger with the given parameters.

        :param source_table: The source Delta table name (must have Change Data Feed enabled).
        :param target_table: The target Delta table name.
        :param checkpoint_location: The location for storing streaming checkpoints.
        :param order_by_col: Column whose highest value marks the newest row per key.
        :param merge_condition: SQL ON-condition for the MERGE, written against the
            aliases ``target_df`` and ``batch_df``. Defaults to equality on every
            column in ``partition_by_cols``.
        :param partition_by_cols: Key columns identifying a record. Defaults to
            ``["RewardNumber", "OfferCode"]``.
        :raises ValueError: if ``partition_by_cols`` is empty.
        """
        self.source_table = source_table
        self.target_table = target_table
        self.checkpoint_location = checkpoint_location
        self.order_by_col = order_by_col
        # Copy so the instance never shares a list object with the caller.
        self.partition_by_cols = list(
            self._DEFAULT_PARTITION_BY_COLS
            if partition_by_cols is None
            else partition_by_cols
        )
        if not self.partition_by_cols:
            raise ValueError("partition_by_cols must contain at least one column")
        # Derive the default condition from the key columns so dedupe and merge agree.
        self.merge_condition = (
            merge_condition
            if merge_condition is not None
            else self._key_equality("target_df", "batch_df", self.partition_by_cols)
        )

    @staticmethod
    def _key_equality(left_alias: str, right_alias: str, cols) -> str:
        """Return ``left.c = right.c AND ...`` over the given key columns."""
        return " AND ".join(f"{left_alias}.{c} = {right_alias}.{c}" for c in cols)

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Backtick-quote a column name so reserved words / special characters survive DDL."""
        return "`" + name.replace("`", "``") + "`"

    def _latest_per_key(self, df):
        """Keep only the row with the highest ``order_by_col`` for each key in ``partition_by_cols``."""
        window_spec = Window.partitionBy(*self.partition_by_cols).orderBy(
            F.col(self.order_by_col).desc()
        )
        return (
            df.withColumn("row_num", F.row_number().over(window_spec))
            .filter(F.col("row_num") == 1)
            .drop("row_num")
        )

    def create_table_from_template_if_not_exists(
        self, cols_to_remove: "list[str] | None" = None
    ):
        """
        Create the target table from the source table schema if it does not exist.

        The table is created as a Delta table with Change Data Feed enabled and
        automatic liquid clustering.

        :param cols_to_remove: Columns of the source schema to omit from the target.
        """
        if spark.catalog.tableExists(self.target_table):
            return
        excluded = set(cols_to_remove or ())
        schema_str = ", ".join(
            f"{self._quote_identifier(field.name)} {field.dataType.simpleString()}"
            for field in spark.table(self.source_table).schema.fields
            if field.name not in excluded
        )
        spark.sql(
            f"""
            CREATE TABLE {self.target_table} ({schema_str})
            USING DELTA
            TBLPROPERTIES (delta.enableChangeDataFeed = true)
            CLUSTER BY AUTO
            """
        )

    def start_incremental_merge(self):
        """
        Run one incremental source-to-target sync and block until it has finished.

        Creates the target if needed, drains all pending source inserts into the
        target via MERGE, then deletes target rows whose key no longer occurs in
        the source.

        :raises StreamingQueryException: if a streaming micro-batch fails.
        """
        self.create_table_from_template_if_not_exists()
        self._merge_source_changes()
        self._delete_keys_missing_from_source()

    def _merge_source_changes(self):
        """Stream newly inserted source rows into the target with MERGE and wait for completion."""
        # NOTE(review): only 'insert' change rows are consumed; update/delete changes in
        # the source are ignored — presumably the source is append-only; confirm.
        source_cdf_df = (
            spark.readStream.format("delta")
            .option("readChangeData", "true")
            .table(self.source_table)
            .filter("_change_type = 'insert'")
        )
        cdf_metadata_cols = self._CDF_METADATA_COLS

        def merge_batch(batch_df, batch_id):
            """
            Upsert the newest row per key of one micro-batch into the target.

            :param batch_df: The DataFrame for the current micro-batch.
            :param batch_id: The ID of the current micro-batch (unused).
            """
            # Dedupe first (order_by_col may itself be a CDF column), then drop the
            # CDF metadata so it never reaches the target, e.g. under schema auto-merge.
            latest_df = self._latest_per_key(batch_df).drop(*cdf_metadata_cols)
            latest_df.createOrReplaceTempView("batch_df")
            # The temp view is registered in the micro-batch's own session, so the MERGE
            # must run through that session (not the global `spark`), as in the Delta
            # foreachBatch upsert examples.
            # NOTE(review): WHEN MATCHED overwrites unconditionally, even if the target
            # row has a newer order_by_col value — confirm that is intended.
            latest_df.sparkSession.sql(
                f"""
                MERGE INTO {self.target_table} AS target_df
                USING batch_df
                ON {self.merge_condition}
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
                """
            )

        query = (
            source_cdf_df.writeStream.foreachBatch(merge_batch)
            .option("checkpointLocation", self.checkpoint_location)
            .trigger(availableNow=True)
            .start()
        )
        # availableNow stops by itself once caught up; block so the delete pass sees
        # the merged data and so micro-batch failures are raised here, not lost.
        query.awaitTermination()

    def _delete_keys_missing_from_source(self):
        """
        Delete target rows whose key (``partition_by_cols``) does not occur in the source.

        This scans the full source and target tables on every run.
        """
        key_cols = self.partition_by_cols
        source_keys_df = spark.table(self.source_table).select(*key_cols).distinct()
        # Join on the real key columns rather than a concatenated string key, which
        # could make distinct keys collide (e.g. ("a_b", "c") vs ("a", "b_c")).
        orphaned_keys_df = (
            spark.table(self.target_table)
            .select(*key_cols)
            .join(source_keys_df, on=key_cols, how="left_anti")
        )
        orphaned_keys_df.createOrReplaceTempView("records_not_in_most_recent_df")

        delete_condition = self._key_equality(
            "target_df", "records_not_in_most_recent_df", key_cols
        )
        # Plain `=` never matches NULL, so rows with a NULL key column are never deleted.
        spark.sql(
            f"""
            DELETE FROM {self.target_table} AS target_df
            WHERE EXISTS (
                SELECT 1
                FROM records_not_in_most_recent_df
                WHERE {delete_condition}
            )
            """
        )

# Configure the bronze -> silver offers pipeline, then run one incremental sync.
# NOTE(review): the checkpoint path contains a doubled "//". It is kept verbatim
# because editing it could point the stream at a different checkpoint directory;
# confirm before cleaning it up.
merger_settings = dict(
    source_table="clubroyale.offers.bronze_offers",
    target_table="clubroyale.offers.silver_offers",
    checkpoint_location="/Volumes/clubroyale/offers/checkpoints//bronze_to_silver",
    order_by_col="DateIngested",
    partition_by_cols=["RewardNumber", "OfferCode"],
)
merger = DeltaTableIncrementalMerger(**merger_settings)
merger.start_incremental_merge()

In [0]:
%sql select count(*) from clubroyale.offers.bronze_offers

In [0]:
%sql SELECT count(*) FROM `clubroyale`.`offers`.`silver_offers`;