# Imports

In [0]:
from pyspark.sql.functions import col, split, explode, expr, monotonically_increasing_id

# Generate bronze table

In [0]:
# Ingest every file under the trading volume, one text line per row, into the
# bronze Delta table using Auto Loader ("cloudFiles").
# NOTE(review): the checkpoint directory below lives underneath the directory
# being loaded, so checkpoint files may themselves be picked up as input rows —
# consider moving the checkpoint outside the source path (TODO confirm).
bronze_query = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/Volumes/workspace/dev/trading/")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/Volumes/workspace/dev/trading/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("workspace.default.bronze_fix_messages")
)

# Starting the query is asynchronous: the call above returns immediately.
# Block until the availableNow run has processed all pending files so that the
# cells below (the preview query and the silver stream) see a complete table.
bronze_query.awaitTermination()

In [0]:
%sql
-- Show the contents of the bronze table.
SELECT
  *
FROM
  workspace.default.bronze_fix_messages

# Generate silver table

In [0]:
# Read from the bronze table, keeping only the lines that start with "8="
# (the BeginString tag listed in `tags` below).
raw_df = (spark.readStream
          .table("workspace.default.bronze_fix_messages")
          .filter(col("value").startswith("8=")))

# Output column name -> FIX tag number to extract.
tags = {
    "BeginString": "8",
    "MsgType": "35",
    "SenderCompID": "49",
    "TargetCompID": "56",
    "MsgSeqNum": "34",
    "SendingTime": "52",
    "ClOrdID": "11",
    "Symbol": "55",
    "Side": "54",
    "OrderQty": "38",
    "Price": "44",
    "CheckSum": "10"
}

# Temporary array column holding the "tag=value" fields of each message,
# obtained by splitting the raw line on the "|" delimiter.
kv_df = raw_df.withColumn("fields", split(col("value"), "\\|"))


def _tag_value(tag):
    """Return a column expression yielding the value of FIX tag `tag`, or NULL.

    The first field of `fields` that starts with "<tag>=" is selected and
    everything after its first "=" is returned.

    * `try_element_at(..., 1)` is used instead of `[0]` so that a message
      lacking the tag yields NULL even when ANSI SQL mode is enabled (where an
      out-of-range array index raises an error).
    * `split(..., '=', 2)` limits the split to two parts so that a value which
      itself contains "=" is not truncated.
    """
    return expr(
        f"split(try_element_at(filter(fields, x -> x like '{tag}=%'), 1), '=', 2)[1]"
    )


# Add one column per tag in a single projection rather than through a chain of
# withColumn calls, which keeps the query plan shallow.
kv_df = kv_df.select(
    "*",
    *[_tag_value(tag).alias(col_name) for col_name, tag in tags.items()]
)

# Keep the raw message alongside the extracted tag columns.
parsed_df = kv_df.select("value", *tags.keys())

def upsert_to_silver(microBatchDF, batchId):
    """Upsert one micro-batch of parsed messages into the silver table.

    Rows are de-duplicated on (ClOrdID, SendingTime, SenderCompID) and then
    merged into ``workspace.default.silver_fix_messages``: rows whose key
    already exists are updated, the others are inserted. The target table is
    created (empty, with the batch's schema) if it does not exist yet.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Identifier of the micro-batch passed by ``foreachBatch``
            (not used).
    """
    target_table = "workspace.default.silver_fix_messages"

    # The temp view below is registered in the session that owns the
    # micro-batch DataFrame, so the MERGE must run in that same session rather
    # than in the notebook-global `spark`.
    session = microBatchDF.sparkSession

    # Remove duplicates within the micro-batch so that each key appears at most
    # once in the MERGE source.
    dedup_df = microBatchDF.dropDuplicates(["ClOrdID", "SendingTime", "SenderCompID"])

    # MERGE INTO requires an existing target; bootstrap an empty table on the
    # very first run.
    if not session.catalog.tableExists(target_table):
        dedup_df.limit(0).write.format("delta").saveAsTable(target_table)

    # Expose the micro-batch to SQL as a temporary view.
    dedup_df.createOrReplaceTempView("updates")

    # Perform the MERGE (upsert).
    # NOTE(review): `=` never matches NULL, so rows lacking one of the key tags
    # are always inserted rather than updated — confirm this is intended.
    session.sql(f"""
        MERGE INTO {target_table} AS target
        USING updates AS source
        ON target.ClOrdID = source.ClOrdID
           AND target.SendingTime = source.SendingTime
           AND target.SenderCompID = source.SenderCompID
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

# Start the stream, handing every micro-batch to upsert_to_silver via foreachBatch.
silver_query = (parsed_df.writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", "/Volumes/workspace/dev/trading/checkpoint/silver_merge")
    .trigger(availableNow=True)
    .start())

# start() returns as soon as the query is launched. Wait for the availableNow
# run to finish so that the following cell reads the silver table only after
# all pending micro-batches have been merged.
silver_query.awaitTermination()


In [0]:
%sql
-- Show the contents of the silver table.
SELECT
  *
FROM
  workspace.default.silver_fix_messages