In [0]:
# Delta table whose change data feed is streamed by this notebook.
source_table = "bridge.bridge_connect.referrals"

# Per-client routing: the table to merge into and the `source_db` values
# whose rows belong to that client.
clients_config = {
    "agt": {
        "target_table": "deltasharing.agt.references_bridge_connect",
        "db_values": ["db0", "db2"],
    },
    # Disabled client, kept for reference:
    # "telio": {
    #     "target_table": "deltasharing_dev.telio.references_bridge_connect",
    #     "db_values": ["db1"],
    # },
}

# Sorted, de-duplicated union of every client's db_values.
all_db_values = sorted(
    {
        db_value
        for client_cfg in clients_config.values()
        for db_value in client_cfg["db_values"]
    }
)

# Checkpoint location handed to the streaming writer.
checkpoint_path = "/Volumes/bridge/bridge_connect/checkpoint_bc_pipeline"


In [0]:
from pyspark.sql.functions import col

# Stream the source table's change data feed, keeping only the rows that carry
# the latest state of a record (inserts and post-update images) for the
# configured source databases.
# NOTE(review): "delete" changes are filtered out here, so deletions in the
# source are never propagated to the targets — confirm this is intended.
changes_df = (
    spark.readStream
        .format("delta")
        # "readChangeFeed" is the documented option name; the previously used
        # "readChangeData" is only a legacy alias for it.
        .option("readChangeFeed", "true")
        .table(source_table)
        .where(col("_change_type").isin("insert", "update_postimage"))
        .where(col("source_db").isin(all_db_values))
)



In [0]:
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window
def upsert_to_targets(batch_df, batch_id):
    """Merge one micro-batch of change rows into every client's target table.

    Written to be used as a ``foreachBatch`` sink. For each client declared in
    ``clients_config`` the batch is narrowed to that client's ``source_db``
    values, reduced to a single row per ``(id, source_db)`` and merged into the
    client's ``target_table`` (update when matched, insert otherwise).

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Micro-batch identifier passed by the streaming engine (unused).

    Returns:
        None.
    """
    if not batch_df.limit(1).collect():  # empty micro-batch: nothing to merge
        return

    # The temp views created below are registered on the session that owns the
    # micro-batch DataFrame. The MERGE must run on that same session: the
    # notebook-level `spark` can be a different session that cannot resolve
    # the view.
    batch_session = batch_df.sparkSession

    # Rare case: the same (id, source_db) appears several times in one batch.
    # Keep the most recent row, ranking by commit version, else by commit
    # timestamp, else by id. Filtering on source_db does not change the schema,
    # so the ranking column is the same for every client.
    order_col = next(
        (name for name in ("_commit_version", "_commit_timestamp") if name in batch_df.columns),
        "id",
    )
    latest_first = Window.partitionBy("id", "source_db").orderBy(desc(order_col))

    for client_name, cfg in clients_config.items():
        target_table = cfg["target_table"]
        db_vals = cfg["db_values"]

        df_client = batch_df.filter(col("source_db").isin(db_vals))

        if not df_client.limit(1).collect():  # no rows for this client
            print(f"Client {client_name}: aucune ligne pour db in {db_vals} dans ce batch.")
            continue

        df_client = (
            df_client
            .withColumn("rn", row_number().over(latest_first))
            .filter(col("rn") == 1)
            .drop("rn")
        )

        # count() runs an extra Spark job; it is only used for this log line.
        print(f"Client {client_name}: traitement de {df_client.count()} lignes ")

        view_name = f"changes_{client_name}"
        df_client.createOrReplaceTempView(view_name)

        # NOTE(review): the source view still carries the change-feed metadata
        # columns while the MERGE uses `SET *` / `INSERT *` — confirm this
        # matches the target table's schema.
        merge_sql = f"""
            MERGE INTO {target_table} AS t
            USING {view_name} AS s
              ON  t.id = s.id         
              AND t.source_db = s.source_db        
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """

        batch_session.sql(merge_sql)



In [0]:
# Start the stream: every micro-batch is handed to upsert_to_targets, and the
# availableNow trigger stops the query once the available data is processed.
query = (
    changes_df.writeStream
        .foreachBatch(upsert_to_targets)
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
)

try:
    # awaitTermination() re-raises the failure when the query terminated with
    # an error, so the summary lives in `finally`: it is printed in both cases
    # and the failure still propagates to the caller afterwards.
    query.awaitTermination()
finally:
    # ------------------- Execution summary -------------------
    print("\n===== STREAM SUMMARY =====")
    print("Status:", query.status)

    stream_error = query.exception()
    if stream_error is not None:
        print("\n❌ ERROR OCCURRED:")
        print(stream_error)
    else:
        print("\n✅ STREAM COMPLETED SUCCESSFULLY")

    print("\nLast progress event:")
    print(query.lastProgress)
