# Exercise 18 - Advanced Spark Streaming

## Objectives
- Understand watermarks and late-data handling
- Master the different trigger modes
- Implement stream-static joins
- Manage checkpoints for fault tolerance

---

## 1. Advanced streaming concepts

```
+------------------------------------------------------------------+
|                  ADVANCED SPARK STREAMING                        |
+------------------------------------------------------------------+
|                                                                  |
|  WATERMARK: handling late-arriving data                          |
|  +---------------------------------------------------------+    |
|  |                                                         |    |
|  |  Latest event  Watermark         Accepted data          |    |
|  |      |            |                    |                |    |
|  |   12:05        12:00               >= 12:00             |    |
|  |      |<-- 5min -->|                                     |    |
|  |                                                         |    |
|  |  With a 5 min delay, events timestamped before 12:00    |    |
|  |  may be dropped (they arrived too late)                 |    |
|  +---------------------------------------------------------+    |
|                                                                  |
|  TRIGGERS: processing frequency                                  |
|  +---------------------------------------------------------+    |
|  |  - processingTime("10 seconds") : every 10 seconds      |    |
|  |  - once()                       : a single batch        |    |
|  |  - continuous("1 second")       : minimal latency       |    |
|  |  - availableNow()               : all available data    |    |
|  +---------------------------------------------------------+    |
|                                                                  |
|  CHECKPOINTS: fault tolerance                                    |
|  +---------------------------------------------------------+    |
|  |  Persisted:                                             |    |
|  |  - Processed Kafka offsets                              |    |
|  |  - Aggregation state                                    |    |
|  |  - Stream metadata                                      |    |
|  +---------------------------------------------------------+    |
|                                                                  |
+------------------------------------------------------------------+
```

## 2. Configuration

In [1]:
import os
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, from_json, to_timestamp, window, expr,
    count, sum as spark_sum, avg, max as spark_max, min as spark_min,
    current_timestamp, lit, struct, to_json
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    DoubleType, ArrayType, TimestampType
)

# --- FULL CONFIGURATION (KAFKA + S3) ---
SPARK_VER = os.environ.get("SPARK_VER", "3.5.0")
# Must match the Scala version your Spark build was compiled with
SCALA_SUFFIX = os.environ.get("SCALA_SUFFIX", "2.13")

# 1. Kafka connector (coordinates derived from the variables above)
kafka_package = f"org.apache.spark:spark-sql-kafka-0-10_{SCALA_SUFFIX}:{SPARK_VER}"

# 2. S3/MinIO packages (required for s3a:// paths)
# hadoop-aws should match the Hadoop version on Spark's classpath
s3_packages = "org.apache.hadoop:hadoop-aws:3.4.1,com.amazonaws:aws-java-sdk-bundle:1.12.262"

# 3. Combine everything
all_packages = f"{kafka_package},{s3_packages}"

print(f"Loading libraries: {all_packages}")

# The checkpointLocation set on the session is only a default root, used by
# queries that do not pass their own checkpointLocation option.
spark = SparkSession.builder \
    .appName("StreamingAvance") \
    .config("spark.jars.packages", all_packages) \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

KAFKA_BROKER = "broker:29092"
CHECKPOINT_PATH = "s3a://bronze/checkpoints"

print("Session ready (Kafka + S3 supported)")

Loading libraries: org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.0,org.apache.hadoop:hadoop-aws:3.4.1,com.amazonaws:aws-java-sdk-bundle:1.12.262
Session ready (Kafka + S3 supported)


## 3. Watermarks - Handling late data

In [2]:
# Order schema
commande_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("total", DoubleType(), True),
    StructField("status", StringType(), True)
])

# Read the Kafka stream
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", "commandes-json") \
    .option("startingOffsets", "earliest") \
    .load()

print("Stream configured")

Stream configured


In [3]:
# Parse the payload and add a watermark
df_parsed = df_stream \
    .select(
        from_json(col("value").cast("string"), commande_schema).alias("data"),
        col("timestamp").alias("kafka_time")
    ) \
    .select(
        "data.order_id",
        "data.customer_id",
        "data.total",
        to_timestamp("data.timestamp").alias("event_time"),
        "kafka_time"
    ) \
    .withWatermark("event_time", "5 minutes")  # Tolerate up to 5 min of lateness

print("Watermark set: 5 minutes")

Watermark set: 5 minutes
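
To make the 5-minute tolerance concrete, the sketch below simulates the watermark rule in plain Python, without Spark. It assumes a simplified model: the watermark is the latest event time seen in earlier micro-batches minus the delay, and an event is dropped when its timestamp is older than that watermark. Spark applies the check to the end of the window an event belongs to, and only guarantees that events within the delay are kept, so this is an illustration rather than Spark's actual logic. The helpers `at` and `simulate_watermark` are hypothetical names, not PySpark functions.

```python
from datetime import datetime, timedelta


def at(hhmm):
    """Build a datetime on an arbitrary fixed day from an 'HH:MM' string."""
    hour, minute = map(int, hhmm.split(":"))
    return datetime(2024, 1, 1, hour, minute)


def simulate_watermark(batches, delay):
    """Simplified watermark model (illustration only, not Spark's code).

    The watermark applied to a batch is the latest event time seen in
    earlier batches minus `delay`; it is refreshed after each batch.
    """
    latest_event = None
    watermark = None
    accepted, dropped = [], []
    for batch in batches:
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)   # older than the watermark
            else:
                accepted.append(event_time)
        if batch:
            batch_max = max(batch)
            if latest_event is None or batch_max > latest_event:
                latest_event = batch_max
            watermark = latest_event - delay
    return accepted, dropped, watermark


batches = [
    [at("12:01"), at("12:05")],  # after this batch the watermark is 12:00
    [at("11:58"), at("12:02")],  # 11:58 is older than 12:00, 12:02 is not
]
accepted, dropped, watermark = simulate_watermark(batches, timedelta(minutes=5))
print("watermark:", watermark.time())
print("dropped:", [d.time() for d in dropped])
```

Note that the 12:02 event in the second batch is kept even though it is older than the latest event (12:05): only events behind the watermark are considered too late.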


In [4]:
# Sliding-window aggregation with a watermark
df_windowed = df_parsed \
    .groupBy(
        window(col("event_time"), "2 minutes", "1 minute"),  # 2 min window, sliding every 1 min
        "customer_id"
    ) \
    .agg(
        count("*").alias("nb_commandes"),
        spark_sum("total").alias("total_ventes"),
        spark_max("total").alias("max_commande")
    )

print("Sliding-window aggregation configured")

Sliding-window aggregation configured
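
With a 2-minute window sliding every minute, each event falls into two overlapping windows and is therefore counted twice across the result rows. The plain-Python sketch below computes those windows for one timestamp. It assumes windows are half-open `[start, end)` and aligned to the Unix epoch, and it uses naive datetimes; `sliding_windows` is a hypothetical helper, not a PySpark function.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(event_time, size, slide):
    """Return every [start, end) window of length `size`, started every
    `slide` since the epoch, that contains `event_time`."""
    # Start of the most recent window that begins at or before the event.
    last_start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    start = last_start
    # Walk backwards while the window still covers the event.
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


event = datetime(2024, 1, 1, 12, 3, 30)
for start, end in sliding_windows(event, timedelta(minutes=2), timedelta(minutes=1)):
    print(start.time(), "->", end.time())
```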


In [5]:
# Start the stream in update mode
query_watermark = df_windowed \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="10 seconds") \
    .start()

print("Stream with watermark started")

Stream with watermark started


In [6]:
# Stop after 30 seconds
time.sleep(30)
query_watermark.stop()
print("Stream stopped")

Stream stopped


## 4. Trigger modes

In [7]:
# once trigger - process the available data in a single micro-batch, then stop
# Useful for incremental batch processing
# (deprecated since Spark 3.4 in favor of availableNow)

df_once = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", "commandes-json") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(
        from_json(col("value").cast("string"), commande_schema).alias("data")
    ) \
    .select("data.*")

query_once = df_once \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(once=True) \
    .start()

query_once.awaitTermination()
print("once() processing finished")

once() processing finished


In [8]:
# availableNow trigger - process everything available, then stop
# Like once, but it may split the backlog into several micro-batches
# and honors source rate limits such as maxOffsetsPerTrigger

df_available = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", "commandes-json") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(
        from_json(col("value").cast("string"), commande_schema).alias("data")
    ) \
    .select("data.order_id", "data.customer_id", "data.total")

query_available = df_available \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(availableNow=True) \
    .start()

query_available.awaitTermination()
print("availableNow() processing finished")

availableNow() processing finished
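
The practical difference between the two one-shot triggers shows up when a rate limit such as the Kafka source option `maxOffsetsPerTrigger` is set: `once` ignores it and reads the whole backlog in one micro-batch, whereas `availableNow` honors it and drains the backlog over several micro-batches before stopping. The plain-Python sketch below models only that batching behavior. `plan_batches` is a hypothetical helper and the record counts are made up for illustration.

```python
def plan_batches(backlog, trigger, max_per_batch=None):
    """Return the micro-batch sizes used to drain `backlog` records.

    Illustrative model only: "once" always yields a single batch, while
    "availableNow" caps each batch at `max_per_batch` when it is set.
    """
    if trigger not in ("once", "availableNow"):
        raise ValueError(f"unsupported trigger: {trigger}")
    if backlog <= 0:
        return []
    if trigger == "once" or max_per_batch is None:
        return [backlog]
    full, rest = divmod(backlog, max_per_batch)
    return [max_per_batch] * full + ([rest] if rest else [])


print(plan_batches(10_000, "once", max_per_batch=4_000))
print(plan_batches(10_000, "availableNow", max_per_batch=4_000))
```

Smaller micro-batches keep memory use predictable when the first run has to catch up on a large backlog, which is the main reason to prefer `availableNow` for incremental batch jobs.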


## 5. Checkpoints - Fault tolerance

In [9]:
# Stream with a checkpoint
df_checkpoint = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", "commandes-json") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(
        from_json(col("value").cast("string"), commande_schema).alias("data"),
        col("timestamp").alias("kafka_time")
    ) \
    .select(
        "data.order_id",
        "data.customer_id",
        "data.total",
        "kafka_time"
    )

print("Stream with checkpoint prepared")

Stream with checkpoint prepared


In [10]:
# Write to MinIO with a checkpoint
query_ckpt = df_checkpoint \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3a://bronze/streaming/commandes") \
    .option("checkpointLocation", f"{CHECKPOINT_PATH}/commandes") \
    .trigger(processingTime="10 seconds") \
    .start()

print("Stream with checkpoint started")
print(f"Checkpoint: {CHECKPOINT_PATH}/commandes")

Stream with checkpoint started
Checkpoint: s3a://bronze/checkpoints/commandes


In [11]:
# Check the status
print("Stream status:")
print(f"  ID: {query_ckpt.id}")
print(f"  Run ID: {query_ckpt.runId}")
print(f"  Active: {query_ckpt.isActive}")
print(f"  Status: {query_ckpt.status}")

Stream status:
  ID: 24681f96-cd02-4ba4-855e-e663c2ab6b57
  Run ID: 1e0e3b27-5d9f-4165-bb57-48d4675811bb
  Active: True
  Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [12]:
# Wait, then stop
time.sleep(20)
query_ckpt.stop()
print("Stream stopped - checkpoint saved")

Stream stopped - checkpoint saved
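
Why a checkpoint makes a restart safe can be shown with a toy model: the query records its position in durable storage and, on restart, resumes from there instead of applying `startingOffsets` again (that option only matters the first time a query starts). The sketch below is plain Python, with a JSON file standing in for the checkpoint directory. `run_query` and the file layout are hypothetical and far simpler than Spark's real checkpoint format.

```python
import json
import os
import tempfile


def run_query(records, checkpoint_file, max_records):
    """Process up to `max_records` records, resuming from the checkpoint.

    Returns the records handled during this run.
    """
    # First run: no checkpoint yet, so start from the beginning ("earliest").
    offset = 0
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            offset = json.load(f)["next_offset"]

    processed = records[offset:offset + max_records]

    # Commit the new position once the records have been handled.
    with open(checkpoint_file, "w") as f:
        json.dump({"next_offset": offset + len(processed)}, f)
    return processed


records = [f"order-{i}" for i in range(5)]
with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "offsets.json")
    first_run = run_query(records, ckpt, max_records=3)   # query "stopped" here
    second_run = run_query(records, ckpt, max_records=3)  # query restarted

print(first_run)
print(second_run)
```

Deleting the checkpoint file in this model would make the next run start again from offset 0, which mirrors what happens when a Spark checkpoint directory is removed or changed.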


## 6. Stream-static join

In [13]:
# Create a static reference DataFrame
# Simulates a customer table

clients_data = [
    ("CUST-001", "Alice Martin", "Paris", "VIP"),
    ("CUST-002", "Bob Dupont", "Lyon", "Standard"),
    ("CUST-003", "Claire Leroy", "Marseille", "Premium"),
    ("CUST-004", "David Moreau", "Toulouse", "Standard"),
    ("CUST-005", "Emma Petit", "Nice", "VIP"),
]

df_clients = spark.createDataFrame(
    clients_data,
    ["customer_id", "nom", "ville", "segment"]
)

df_clients.show()

+-----------+------------+---------+--------+
|customer_id|         nom|    ville| segment|
+-----------+------------+---------+--------+
|   CUST-001|Alice Martin|    Paris|     VIP|
|   CUST-002|  Bob Dupont|     Lyon|Standard|
|   CUST-003|Claire Leroy|Marseille| Premium|
|   CUST-004|David Moreau| Toulouse|Standard|
|   CUST-005|  Emma Petit|     Nice|     VIP|
+-----------+------------+---------+--------+



In [14]:
# Order stream
df_commandes_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", "commandes-json") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(
        from_json(col("value").cast("string"), commande_schema).alias("data")
    ) \
    .select(
        "data.order_id",
        "data.customer_id",
        "data.total"
    )

print("Order stream ready")

Order stream ready


In [15]:
# Stream-static join
df_enrichi = df_commandes_stream.join(
    df_clients,
    on="customer_id",
    how="left"
).select(
    "order_id",
    "customer_id",
    "nom",
    "ville",
    "segment",
    "total"
)

print("Stream-static join configured")

Stream-static join configured
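
Because the join uses `how="left"`, every order from the stream is kept, and an order whose `customer_id` is absent from the reference table comes out with null `nom`, `ville` and `segment`. The plain-Python sketch below shows that behavior on two sample orders. The order values and the `CUST-999` identifier are made up, and `left_join` is a hypothetical helper.

```python
# Static reference data, keyed by customer_id (subset of the table above).
clients = {
    "CUST-001": {"nom": "Alice Martin", "ville": "Paris", "segment": "VIP"},
    "CUST-002": {"nom": "Bob Dupont", "ville": "Lyon", "segment": "Standard"},
}

# Sample orders; CUST-999 does not exist in the reference data.
orders = [
    {"order_id": 1, "customer_id": "CUST-001", "total": 120.0},
    {"order_id": 2, "customer_id": "CUST-999", "total": 35.5},
]


def left_join(orders, clients):
    """Keep every order; fill customer fields with None when unmatched."""
    missing = {"nom": None, "ville": None, "segment": None}
    return [
        {**order, **clients.get(order["customer_id"], missing)}
        for order in orders
    ]


enriched = left_join(orders, clients)
for row in enriched:
    print(row)
```

An inner join would silently discard the second order, so a left join is the safer default when the reference table may lag behind the stream.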


In [16]:
# Run the join
query_join = df_enrichi \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="5 seconds") \
    .start()

print("Stream with join started")

Stream with join started


In [17]:
time.sleep(20)
query_join.stop()
print("Stream stopped")

Stream stopped


## 7. Writing to Kafka (Kafka to Kafka)

In [18]:
# Read, transform, and write to another topic
df_transform = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", "commandes-json") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(
        from_json(col("value").cast("string"), commande_schema).alias("data")
    ) \
    .select(
        col("data.customer_id").alias("key"),
        struct(
            col("data.order_id"),
            col("data.customer_id"),
            col("data.total"),
            (col("data.total") * 0.2).alias("tva"),
            current_timestamp().alias("processed_at")
        ).alias("value")
    ) \
    .select(
        col("key"),
        to_json(col("value")).alias("value")
    )

print("Transformation prepared")

Transformation prepared
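
The Kafka sink expects a `value` column (and an optional `key`) of string or binary type, which is why the struct is serialized with `to_json` before writing. The plain-Python sketch below builds the message that one order would produce, to show the resulting shape. It assumes the 20% rate implied by `* 0.2` in the cell above. The sample order is made up, and `to_kafka_record` is a hypothetical helper.

```python
import json
from datetime import datetime, timezone


def to_kafka_record(order, vat_rate=0.2):
    """Build the key and JSON value for one order (illustration only)."""
    value = {
        "order_id": order["order_id"],
        "customer_id": order["customer_id"],
        "total": order["total"],
        "tva": order["total"] * vat_rate,
        "processed_at": datetime.now(timezone.utc).isoformat(),
    }
    # Both key and value are plain strings, as the Kafka sink requires.
    return {"key": order["customer_id"], "value": json.dumps(value)}


record = to_kafka_record({"order_id": 42, "customer_id": "CUST-001", "total": 100.0})
print(record["key"])
print(record["value"])
```

Using `customer_id` as the key means all orders of a given customer land in the same partition of the output topic, which preserves their relative order for downstream consumers.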


In [19]:
# Write to a new Kafka topic
query_kafka = df_transform \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("topic", "commandes-enrichies") \
    .option("checkpointLocation", f"{CHECKPOINT_PATH}/kafka-to-kafka") \
    .trigger(processingTime="5 seconds") \
    .start()

print("Kafka -> Kafka stream started")

Kafka -> Kafka stream started


In [20]:
time.sleep(20)
query_kafka.stop()
print("Stream stopped")

Stream stopped


## 8. Managing multiple streams

In [21]:
# List all active streams
streams = spark.streams.active

print(f"Number of active streams: {len(streams)}")
for stream in streams:
    print(f"  - {stream.name}: {stream.id}")

Number of active streams: 0


In [22]:
# Stop all streams
for stream in spark.streams.active:
    stream.stop()
    print(f"Stream {stream.id} stopped")

print("All streams stopped")

All streams stopped


---

## Exercise

**Goal**: Build a complete streaming pipeline

**Instructions**:
1. Read the "logs-application" topic as a stream
2. Add a 2-minute watermark
3. Count the logs per level (INFO, WARNING, ERROR) per 1-minute window
4. Write the results to the console

Your turn:

In [23]:
# Define the log schema
from pyspark.sql.types import StructType, StructField, StringType

schema_logs = StructType([
    StructField("timestamp", StringType(), True),
    StructField("level", StringType(), True),
    StructField("service", StringType(), True),
    StructField("message", StringType(), True)
])

print("Log schema defined.")

Log schema defined.


In [24]:
# Read the stream and add a watermark

# 1. Raw read
df_logs_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", "logs-application") \
    .option("startingOffsets", "latest") \
    .load()

# 2. Parsing and watermark
df_logs = df_logs_raw.select(
    from_json(col("value").cast("string"), schema_logs).alias("data")
).select(
    "data.level",
    "data.service",
    "data.message",
    to_timestamp("data.timestamp").alias("event_time")
).withWatermark("event_time", "2 minutes")

print("Log stream initialized with a watermark.")

Log stream initialized with a watermark.


In [25]:
# Aggregate by level and window

# Aggregation
# Note: sorting a streaming aggregation is only supported in complete mode,
# and in complete mode Spark keeps every window in state, so the watermark
# does not evict old windows here.
df_logs_count = df_logs \
    .groupBy(
        window("event_time", "1 minute"),
        "level"
    ) \
    .count() \
    .orderBy("window")

# Console output
query_logs = df_logs_count.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .queryName("logs_monitoring") \
    .start()

# Let it run for a few seconds to see the output of the log generator
time.sleep(15)
query_logs.stop()
print("Streaming finished.")

Streaming finished.


---

## Summary

In this notebook, you learned:
- How to use **watermarks** to handle late data
- The different **trigger modes** (processingTime, once, availableNow)
- How to configure **checkpoints** for fault tolerance
- How to perform **stream-static joins**
- How to **write to Kafka** from a stream

### Next step
In the next notebook, we will build a complete Bronze/Silver/Gold pipeline.