In [0]:
%pip install confluent-kafka supabase

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
%sql
-- Provision the Unity Catalog objects this notebook writes to.
-- Every statement uses IF NOT EXISTS, so re-running the cell is a no-op once
-- the objects are in place.
-- Catalog that holds the schema and volume below.
CREATE CATALOG IF NOT EXISTS invistis;
-- Schema inside that catalog.
CREATE SCHEMA IF NOT EXISTS invistis.datalake;
-- Volume backing the /Volumes/invistis/datalake/raw/... paths used for the
-- streaming checkpoints and the enriched Delta output.
CREATE VOLUME IF NOT EXISTS invistis.datalake.raw;

In [0]:
# -----------------------------
# Kafka Config
# -----------------------------
# Confluent Cloud credentials are resolved from a Databricks secret scope at
# run time so that no key material lives in the notebook source or its history.
# NOTE(review): the scope and key names below are placeholders -- create them
# (or point them at an existing scope) before running. The API key/secret that
# was previously hard-coded in this cell must be treated as compromised and
# rotated in Confluent Cloud.
CONFLUENT_SECRET_SCOPE = "confluent"

CONFLUENT_BOOTSTRAP = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
CONFLUENT_API_KEY   = dbutils.secrets.get(scope=CONFLUENT_SECRET_SCOPE, key="api-key")
CONFLUENT_SECRET    = dbutils.secrets.get(scope=CONFLUENT_SECRET_SCOPE, key="api-secret")

# Storage locations inside the Unity Catalog volume.
CHECKPOINT_PATH = "/Volumes/invistis/datalake/raw/checkpoints"
DATA_PATH       = "/Volumes/invistis/datalake/raw/enriched"

# Options shared by every Kafka reader in this notebook (SASL/PLAIN over TLS).
kafka_options = {
    "kafka.bootstrap.servers": CONFLUENT_BOOTSTRAP,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{CONFLUENT_API_KEY}" password="{CONFLUENT_SECRET}";',
    # Read each topic from its beginning and do not fail the query when
    # offsets are no longer available on the broker.
    "startingOffsets": "earliest",
    "failOnDataLoss": "false"
}

print("✅ Config OK")

✅ Config OK


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Trade message schema (JSON payload of the trades topic).
trades_schema = (
    StructType()
    .add("symbol", StringType())
    .add("price", DoubleType())
    .add("quantity", DoubleType())
    .add("timestamp", LongType())
    .add("trade_id", StringType())
)

# News message schema (JSON payload of the news topic).
news_schema = (
    StructType()
    .add("title", StringType())
    .add("description", StringType())
    .add("source", StringType())
    .add("published_at", StringType())
    .add("sentiment", DoubleType())
)

In [0]:
# Trades are opened as a *stream* rather than a batch: the downstream query
# keeps every trade via a LEFT OUTER join against the streaming news feed, and
# Structured Streaming refuses to start a left outer join whose left side is a
# static DataFrame while the right side is a stream.
trades_df = (spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .option("subscribe", "trades_topic")
    .option("startingOffsets", "earliest")
    .load()
    # Kafka hands over the payload as bytes: decode to text, then parse the JSON.
    .select(from_json(col("value").cast("string"), trades_schema).alias("data"))
    .select("data.*")
    # `timestamp` holds epoch milliseconds; divide by 1000 to get seconds
    # before converting to a Spark timestamp.
    .withColumn("event_time", to_timestamp(col("timestamp") / 1000))
    # A watermark on the trade side lets the stream-stream join drop old state.
    .withWatermark("event_time", "5 minutes")
)

print("✅ Trades loaded from Kafka")

✅ Trades loaded from Kafka


In [0]:
# Raw Kafka records from the news topic, opened as a streaming source.
news_raw = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .option("subscribe", "news_topic")
    .load()
)

# Parse the JSON payload and keep the Kafka record timestamp next to it.
news_parsed = news_raw.select(
    from_json(col("value").cast("string"), news_schema).alias("data"),
    "timestamp",
)

news_df = (
    news_parsed
    .select("data.*", col("timestamp").alias("kafka_ts"))
    # Event time comes from the article's own `published_at` field.
    .withColumn("event_time", to_timestamp(col("published_at")))
    # Every news item is tagged with a fixed symbol; make this dynamic if needed.
    .withColumn("symbol", lit("BTCUSDT"))
    .withWatermark("event_time", "5 minutes")
)

print("✅ News stream OK")

✅ News stream OK


In [0]:
# Match each trade with news for the same symbol published within 24 hours
# before or after the trade.
join_condition = expr("""
        t.symbol = n.symbol AND
        t.event_time BETWEEN n.event_time - INTERVAL 24 HOURS
        AND n.event_time + INTERVAL 24 HOURS
    """)

# LEFT join: trades without any matching news are kept with null news columns.
# NOTE(review): news_df is a streaming DataFrame; Structured Streaming only
# supports this left outer join when trades_df (the left side) is a stream too.
trades_with_news = trades_df.alias("t").join(
    news_df.alias("n"),
    on=join_condition,
    how="left",
)

joined_df = trades_with_news.select(
    col("t.symbol"),
    col("t.price"),
    col("t.quantity").alias("quantity"),  # field is named `quantity` (was `qty`)
    col("t.event_time").alias("trade_time"),
    col("n.title").alias("news_title"),
    col("n.sentiment"),
    col("n.event_time").alias("news_time"),
    # 5-minute tumbling window derived from the trade time.
    window(col("t.event_time"), "5 minutes").alias("window_5min"),
)

print("✅ Join OK")

✅ Join OK


In [0]:
# Configure the Delta sink first, then start the query as a separate step.
lake_writer = (
    joined_df.writeStream
    .format("delta")
    .outputMode("append")  # append mode is required for this query
    .option("checkpointLocation", f"{CHECKPOINT_PATH}/lake")
    .option("path", DATA_PATH)
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
)

query_lake = lake_writer.start()

# Block until the availableNow run has finished before reporting success.
query_lake.awaitTermination()
print("✅ Data written to Delta Lake")

In [0]:
# Read the enriched data back from Delta Lake to inspect the result.
df = spark.read.load(DATA_PATH, format="delta")
display(df.limit(50))

symbol,price,quantity,trade_time,news_title,sentiment,news_time,window_5min
BTCUSDT,66546.26,,2026-02-25T14:23:21.130Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.27,,2026-02-25T14:23:21.313Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.27,,2026-02-25T14:23:21.359Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.27,,2026-02-25T14:23:21.423Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:21.534Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:22.296Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:22.635Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:22.696Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:22.936Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:23.342Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"


In [0]:
# Load the enriched Delta table. The location comes from DATA_PATH (the same
# constant the streaming writer uses) instead of a repeated literal, so the
# reader and the writer cannot drift apart.
df = spark.read.format("delta").load(DATA_PATH)

# Print the exact schema to check the name and type of every column.
df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- trade_time: timestamp (nullable = true)
 |-- news_title: string (nullable = true)
 |-- sentiment: double (nullable = true)
 |-- news_time: timestamp (nullable = true)
 |-- window_5min: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)



In [0]:
# Show the first 20 rows to sanity-check the values.
preview_rows = df.limit(20)
display(preview_rows)

symbol,price,quantity,trade_time,news_title,sentiment,news_time,window_5min
BTCUSDT,66546.26,,2026-02-25T14:23:21.130Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.27,,2026-02-25T14:23:21.313Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.27,,2026-02-25T14:23:21.359Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.27,,2026-02-25T14:23:21.423Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:21.534Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:22.296Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:22.635Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:22.696Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:22.936Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"
BTCUSDT,66546.26,,2026-02-25T14:23:23.342Z,,,,"List(2026-02-25T14:20:00.000Z, 2026-02-25T14:25:00.000Z)"


In [0]:
# NOTE(review): this cell re-declares the same two schemas as the schema cell
# earlier in the notebook; consider keeping a single definition.

# Trade message schema, built from (field name, Spark type) pairs.
trades_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("symbol", StringType()),
        ("price", DoubleType()),
        ("quantity", DoubleType()),
        ("timestamp", LongType()),
        ("trade_id", StringType()),
    )
])

# News message schema, built the same way.
news_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("title", StringType()),
        ("description", StringType()),
        ("source", StringType()),
        ("published_at", StringType()),
        ("sentiment", DoubleType()),
    )
])

In [0]:
# Batch (non-streaming) read of the trades topic, used to eyeball the parsed rows.
# NOTE(review): this rebinds `trades_df`, shadowing the DataFrame defined
# earlier in the notebook for any cell executed after this one.
kafka_trades = (
    spark.read
    .format("kafka")
    .options(**kafka_options)
    .option("subscribe", "trades_topic")
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the Kafka value bytes as text and parse the JSON into typed columns.
parsed_trades = kafka_trades.select(
    from_json(col("value").cast("string"), trades_schema).alias("data")
)

# `timestamp` holds epoch milliseconds; divide by 1000 before converting.
trades_df = (
    parsed_trades
    .select("data.*")
    .withColumn("event_time", to_timestamp(col("timestamp") / 1000))
)

print("✅ Trades loaded from Kafka")
trades_df.show(20, truncate=False)

✅ Trades loaded from Kafka
+-------+-------+--------+-------------+--------+-----------------------+
|symbol |price  |quantity|timestamp    |trade_id|event_time             |
+-------+-------+--------+-------------+--------+-----------------------+
|ETHUSDT|1825.7 |0.0023  |1771935088791|NULL    |2026-02-24 12:11:28.791|
|ETHUSDT|1825.7 |0.0029  |1771935088791|NULL    |2026-02-24 12:11:28.791|
|ETHUSDT|1825.7 |0.0854  |1771935088791|NULL    |2026-02-24 12:11:28.791|
|BTCUSDT|63224.1|2.0E-4  |1771935088796|NULL    |2026-02-24 12:11:28.796|
|SOLUSDT|76.78  |0.067   |1771935089396|NULL    |2026-02-24 12:11:29.396|
|BNBUSDT|586.86 |0.009   |1771935089408|NULL    |2026-02-24 12:11:29.408|
|BNBUSDT|586.86 |0.009   |1771935089408|NULL    |2026-02-24 12:11:29.408|
|BNBUSDT|586.86 |0.009   |1771935089408|NULL    |2026-02-24 12:11:29.408|
|BNBUSDT|586.86 |0.009   |1771935089408|NULL    |2026-02-24 12:11:29.408|
|BNBUSDT|586.85 |0.009   |1771935089408|NULL    |2026-02-24 12:11:29.408|
|BNBUSDT|58