# Test de consommation Kafka avec Spark Streaming

Ce notebook teste la consommation de données de fréquence cardiaque depuis Kafka via Spark Streaming.

In [1]:
# Cell 1 : Imports
# Import des modules nécessaires pour Spark et Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, FloatType

# Vérification plus complète des imports
import importlib

libraries = [
    "pyspark",
    "pyspark.sql",
    "pyspark.sql.functions",
    "pyspark.sql.types",
]

failed_imports = []
for lib in libraries:
    try:
        importlib.import_module(lib)
    except ImportError:
        failed_imports.append(lib)

if not failed_imports:
    print("✅ Tous les imports nécessaires ont réussi :")
    for lib in libraries:
        print(f"   - {lib}")
else:
    print("❌ Problème d'import sur :")
    for lib in failed_imports:
        print(f"   - {lib}")
    raise ImportError(f"Echec d'import : {failed_imports}")

✅ Tous les imports nécessaires ont réussi :
   - pyspark
   - pyspark.sql
   - pyspark.sql.functions
   - pyspark.sql.types


In [2]:
# Cell 2 : Créer une SparkSession avec robustesse et logging
import logging
import sys
import traceback
from datetime import datetime

# Configuration du logger pour le notebook
logger = logging.getLogger("spark_session_logger")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s', datefmt='%H:%M:%S')

# Ajouter un StreamHandler pour affichage direct dans le notebook
if not logger.hasHandlers():
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(formatter)
    logger.addHandler(ch)

try:
    logger.info("Démarrage de la création de SparkSession...")
    spark = SparkSession.builder \
        .appName("KafkaHeartRateConsumer") \
        .master("spark://spark-master:7077") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
        .getOrCreate()
    logger.info("SparkSession créée avec succès.")

    # Réduire le niveau de logs Spark (ne réduit pas ceux du logger Python)
    spark.sparkContext.setLogLevel("WARN")
    logger.info(f"Version de Spark détectée : {spark.version}")

except Exception as e:
    logger.error("Erreur lors de la création de la SparkSession.")
    logger.error(f"Détails: {e}")
    traceback_lines = traceback.format_exc().splitlines()
    for line in traceback_lines:
        logger.error(line)
    raise  # On arrête l'exécution du notebook ici pour que l'utilisateur voie l'erreur.

[17:28:32][INFO] Démarrage de la création de SparkSession...
[17:28:43][INFO] SparkSession créée avec succès.
[17:28:43][INFO] Version de Spark détectée : 3.5.0


In [3]:
# Cell 3 : Configurer les paramètres Kafka
# Configuration des paramètres de connexion Kafka
import os

kafka_bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka1:19092")
kafka_topic = "fake-heart-data-test"

print(f"Broker Kafka : {kafka_bootstrap_servers}")
print(f"Topic : {kafka_topic}")
print("Configuration Kafka terminée")

Broker Kafka : kafka1:19092
Topic : fake-heart-data-test
Configuration Kafka terminée


In [4]:
# Cell 4 : Créer le schéma JSON
# Définition du schéma pour parser les messages JSON Kafka
schema = StructType([
    StructField("timestamp", FloatType()),
    StructField("bpm", FloatType()),
    StructField("rr_interval_ms", FloatType())
])

print("Schéma JSON défini :")
print(schema)

Schéma JSON défini :
StructType([StructField('timestamp', FloatType(), True), StructField('bpm', FloatType(), True), StructField('rr_interval_ms', FloatType(), True)])


In [5]:
# Cell 5 : Lire depuis Kafka en streaming
# Essayons d'établir le DataFrame de streaming depuis Kafka avec gestion d'erreur claire
try:
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic) \
        .option("startingOffsets", "latest") \
        .load()
    print("✅ Le connecteur Kafka est bien chargé.")
except Exception as e:
    print("❌ Erreur lors de la connexion au connecteur Kafka :")
    print(e)
    import traceback
    traceback.print_exc()
    raise

print("DataFrame Kafka créé")
print("Schéma brut du DataFrame :")
kafka_df.printSchema()

✅ Le connecteur Kafka est bien chargé.
DataFrame Kafka créé
Schéma brut du DataFrame :
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Cell 6 : Parser les messages JSON
# Conversion et parsing des messages JSON
parsed_df = kafka_df \
    .selectExpr("CAST(value AS STRING) as json_string") \
    .select(from_json("json_string", schema).alias("data")) \
    .select("data.*")

print("Messages JSON parsés")
print("Schéma du DataFrame parsé :")
parsed_df.printSchema()

Messages JSON parsés
Schéma du DataFrame parsé :
root
 |-- timestamp: float (nullable = true)
 |-- bpm: float (nullable = true)
 |-- rr_interval_ms: float (nullable = true)



In [None]:
# Cell 7 : Afficher les données avec logs détaillés
import sys
import traceback

print("Tentative de configuration du stream et d'affichage en console...")

try:
    print("➡️ Lancement de writeStream.mem: création de la requête de streaming...")
    query = (
        parsed_df.writeStream
        .format("console")
        # .queryName("heart_rate_stream")
        .outputMode("append")
        .start()
    )
    print("✅ Stream démarré - En attente de données Kafka...")
    print("ℹ️ Appuyez sur Stop (carré rouge) pour arrêter le streaming")
except Exception as e:
    print("❌ Erreur lors de la configuration ou du démarrage du stream Spark :")
    print(e)
    traceback.print_exc(file=sys.stdout)
    query = None

# Attendre la terminaison (interruption manuelle ou erreur)
if query is not None:
    try:
        print("⏳ Attente de la terminaison du stream (Ctrl+C pour interrompre ou bouton Stop)...")
        query.awaitTermination()
        print("✅ Stream terminé proprement.")
    except Exception as e:
        print("❌ Erreur lors de awaitTermination du stream :")
        print(e)
        traceback.print_exc(file=sys.stdout)
else:
    print("⚠️ Impossible d'attendre la terminaison : la requête de streaming n'a pas pu être lancée.")


Tentative de configuration du stream et d'affichage en console...
➡️ Lancement de writeStream.mem: création de la requête de streaming...
✅ Stream démarré - En attente de données Kafka...
ℹ️ Appuyez sur Stop (carré rouge) pour arrêter le streaming
⏳ Attente de la terminaison du stream (Ctrl+C pour interrompre ou bouton Stop)...


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 