In [0]:
# --- Kafka (Confluent) connection settings ---------------------------------
# The empty strings below are placeholders to be filled in before running.
confluentClusterName = "cluster_0"
confluentBootstrapServers = ""
confluentTopicName = "binance-trades"
confluentApiKey = ""
confluentSecret = ""

# --- Schema Registry settings ----------------------------------------------
schemaRegistryUrl = ""
confluentRegistryApiKey = ""
confluentRegistrySecret = ""

# --- Storage locations -------------------------------------------------------
deltaTablePath = "/mnt/databricks/predictions_output"
checkpointPath = "/mnt/databricks/checkpoints/my_model_streaming"

In [0]:
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
def _schema_id_from_bytes(raw):
    """Return the big-endian integer encoded in *raw* as a decimal string.

    Returns None when *raw* is None: a Kafka record with a null value makes
    the ``substring(value, ...)`` expression NULL, and calling
    ``int.from_bytes(None, ...)`` would raise and fail the streaming query.
    """
    if raw is None:
        return None
    return str(int.from_bytes(raw, byteorder='big'))


# UDF that turns a binary column into the decimal string of its big-endian value.
binary_to_string = fn.udf(_schema_id_from_bytes, StringType())

# Streaming read of the Kafka topic over SASL_SSL/PLAIN, starting from the
# earliest offsets. Each record value is split into:
#   - valueSchemaId: bytes 2-5 decoded as a big-endian integer
#   - fixedValue:    everything from byte 6 onwards (the payload)
# NOTE(review): this layout matches the Confluent Schema Registry wire format
# (1 magic byte + 4-byte schema id + payload) - confirm for this topic.
streamTestDf = (
  spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", confluentBootstrapServers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret))
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("subscribe", confluentTopicName)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .withColumn('key', fn.col("key").cast(StringType()))
  .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
  .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
  .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', 'valueSchemaId','fixedValue')
)

from pyspark.sql.functions import from_json, col, expr, concat, lit, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType

# Schema applied by from_json to the cleaned trade messages. All fields are
# nullable (the default). "t_trade" and "M_flag" are the names given to the
# original "t" and "M" keys by the regexp_replace step before parsing.
# NOTE(review): the rename presumably avoids a clash with "T"/"m" under
# Spark's case-insensitive column resolution - confirm.
schema = (
    StructType()
    .add("E", LongType())
    .add("s", StringType())
    .add("t_trade", LongType())
    .add("p", StringType())
    .add("q", StringType())
    .add("T", LongType())
    .add("m", BooleanType())
    .add("M_flag", BooleanType())  # renamed here
)

# Step 1: reinterpret the payload bytes as a string column.
decoded_df = streamTestDf.selectExpr("CAST(fixedValue AS STRING) as json_str")

# Step 2: rename the "M" and "t" keys so they match the field names in `schema`.
_m_key_renamed = regexp_replace(col("json_str"), r'("M":)', '"M_flag":')
_t_key_renamed = regexp_replace(_m_key_renamed, r'("t":)', '"t_trade":')
json_fixed_df = decoded_df.select(_t_key_renamed.alias("fixed_json_str"))

# Step 3: keep the text from the first occurrence of "E" onwards and put an
# opening brace back in front of it, dropping whatever preceded that key.
_tail_from_event_key = expr("substring(fixed_json_str, instr(fixed_json_str, '\"E\"'))")
_rebuilt_json = concat(lit("{"), _tail_from_event_key)
json_clean_df = json_fixed_df.select(_rebuilt_json.alias("cleaned_json"))

# Step 4: parse the JSON text with `schema` and flatten the struct into columns.
_parsed_struct = from_json(col("cleaned_json"), schema).alias("data")
parsed_df = json_clean_df.select(_parsed_struct).select("data.*")

# 1. Start a streaming write into an in-memory temporary table.
# NOTE: the memory sink keeps its whole output in driver memory, so this
# capture pattern is only suitable for small volumes.
query = (
    parsed_df.writeStream
    .format("memory")
    .queryName("parsed_kafka")
    .outputMode("append")
    .start()
)

import time
waiting_time = 200  # seconds to let the stream accumulate data

# The stream must be stopped even if the read or the Delta write fails;
# otherwise the query keeps running on the cluster.
try:
    # 2. Let the stream run for the configured duration.
    time.sleep(waiting_time)

    # 3. Read what has been collected in memory as a static (batch) DataFrame.
    batch_df = spark.sql("SELECT * FROM parsed_kafka")

    # 4. Write the snapshot to the Delta table, replacing its previous content.
    batch_df.write.format("delta").mode("overwrite").save(deltaTablePath)
finally:
    # 5. Stop the stream.
    query.stop()
