<a href="https://colab.research.google.com/github/crystalloide/Big_Data/blob/master/Spark%2BStreaming%2BKafka%2BTwitter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

*Basé sur [direct_kafka_wordcount.py](https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py)*

[`S. Alleaume`](https://github.com/crystalloide/Big_Data), 17 mai 2025

## Utilisation de Kafka dans un notebook Jupyter

In [3]:
# Installation de trois packages essentiels:
# - PySpark: L'interface Python pour Apache Spark, un framework de traitement de données distribuées populaire pour l'analyse de données à grande échelle.
# - PyArrow: Une bibliothèque pour le traitement efficace de données en mémoire et la compatibilité avec le format Apache Arrow, souvent utilisée avec Spark pour des performances optimales.
# - kafka-python: Un client Python pour Apache Kafka, permettant d'interagir avec des systèmes de messagerie en temps réel pour le streaming de données.

!pip3 install pyspark pyarrow kafka-python



In [5]:
# Version du connecteur Kafka: spark-sql-kafka-0-10 est la version plus récente et recommandée pour travailler avec Kafka dans Spark. La version 0-8 est obsolète et n'est plus maintenue.
# Version Scala: 2.12 est compatible avec les versions récentes de Spark.
# Version de Spark: version 3.3.0 compatible avec les environnements Colab actuels.

# Cette configuration permet d'utiliser l'API Structured Streaming de Spark avec Kafka,
# qui offre une meilleure performance et une API plus simple que l'ancien Spark Streaming DStream utilisé précédemment


import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

## Import dependencies

In [8]:

# Remarque : SparkContext et SparkConf sont remplacés par SparkSession, point d'entrée recommandé pour les applications Spark modernes.
# Les classes StreamingContext et KafkaUtils de l'ancien module pyspark.streaming.kafka ne sont plus utilisées dans l'API Structured Streaming.
# Importations pour pyspark.sql.functions et pyspark.sql.types, essentielles pour travailler avec l'API DataFrame et Structured Streaming.

# Avec cette nouvelle configuration, vous pourrez créer un pipeline de streaming de données plus robuste et maintenable,
# avec un meilleur support pour les traitements à base de SQL et une meilleure gestion des garanties de livraison des messages.
# À l'étape suivante, il faudra  créer une SparkSession et configurer une source de streaming Kafka.

# Print to stdout
from __future__ import print_function

# Spark
from pyspark.sql import SparkSession

# json parsing
import json

# Pour utiliser des fonctions SQL et des opérations sur les colonnes
from pyspark.sql.functions import col, from_json, explode
from pyspark.sql.types import StructType, StringType, StructField

## Create Spark context

In [9]:
# Création d'une SparkSession plutôt qu'un SparkContext
spark = SparkSession.builder \
    .appName("PythonStructuredStreamingKafkaWordCount") \
    .getOrCreate()

## Create Streaming context, with a batch duration of 10 seconds

* http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext
* http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext

In [None]:
# Avec l'API Structured Streaming moderne, plus besoin de créer un StreamingContext.
# L'intervalle de traitement sera défini lors de la configuration du streaming

# Optionnel: définir des configurations de log pour réduire la verbosité
spark.sparkContext.setLogLevel("WARN")

# Changements importants:

# Suppression de la création du StreamingContext qui n'est plus utilisé dans l'API Structured Streaming.
# Avec l'API Structured Streaming, le concept d'intervalle de traitement par lots (par exemple : 10 secondes)
# est remplacé par les modes de traitement déclenchés (trigger modes) qui sont définis au moment de démarrer la requête de streaming.

# La nouvelle approche est plus flexible car permet de:
# - Configurer le mode de traitement des données (trigger) au moment de la définition de la requête
# - Faire des mises à jour continues sans microbatches (mode continu)
# - Ou définir un intervalle de traitement par lots similaire à l'ancien modèle

# Plus tard, lorsque vous définirez votre requête de streaming,
# vous pourrez spécifier un mode de déclenchement équivalent à l'ancien intervalle de 10 secondes avec par exemple :

# Utilisation lors de la configuration d'une requête de streaming
query = df.writeStream \
    .trigger(processingTime='10 seconds') \
    .format("...") \
    .start()

## Connect to Kafka

Topic `twitter`, Consumer group `spark-streaming`

* Voici la documentation à jour pour l'intégration Structured Streaming avec Kafka:

Pour Spark 3.x: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

#### L'ancienne API Spark Streaming a été remplacée pour utiliser l'API Structured Streaming moderne avec le nouveau connecteur Kafka.

#### Définition des options Kafka pour la lecture en streaming :

kafka_options = {
    "kafka.bootstrap.servers": "cdh57-01-node-01.moffatt.me:9092",
    "subscribe": "twitter",
    "startingOffsets": "latest",
    "group.id": "spark-streaming"
}

####  Création d'un DataFrame de streaming depuis Kafka :
kafkaDF = spark.readStream \
    .format("kafka") \
    .options(**kafka_options) \
    .load()

#### Convertion de la valeur des messages Kafka (en binaire) en chaîne de caractères :
valueDF = kafkaDF.selectExpr("CAST(value AS STRING)")

### Modifications importantes:

#### Remplacement de l'adresse Zookeeper (port 2181) par l'adresse des brokers Kafkabrokers Kafka (port 9092)
#### Avcec les protocole Kraft, le connecteur Kafka se connecte directement aux brokers Kafka sans passer par Zookeeper

### API:
#### KafkaUtils.createStream() est remplacé par l'API DataFrame avec spark.readStream.format("kafka")
#### L'API moderne est plus facile à utiliser et offre de meilleures garanties de traitement


### Configuration:
#### Utilisation de startingOffsets pour définir à partir de quel offset lire (équivalent du comportement de l'ancien consommateur)
#### Le groupe de consommateurs est maintenant configuré via l'option group.id


### Traitement des données:

#### Les messages Kafka sont lus sous forme de DataFrame avec des colonnes standardisées (key, value, topic, partition, offset, timestamp, etc.)
#### Une étape de conversion est nécessaire pour transformer les valeurs binaires en chaînes de caractères

In [None]:
kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'twitter':1})

## Parse the inbound message as json

In [None]:
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

## Count the number of instance of each tweet text

In [None]:
text_counts = parsed.map(lambda tweet: (tweet['text'],1)).\
  reduceByKey(lambda x,y: x + y)

### Print the text counts (first ten shown)

In [None]:
text_counts.pprint()

## Count the number of tweets per author

In [None]:
author_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'],1)).\
  reduceByKey(lambda x,y: x + y)

### Print the author tweet counts (first ten shown)

In [None]:
author_counts.pprint()

## Start the streaming context

In [None]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2017-01-04 14:28:20
-------------------------------------------
(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\u2019s nominee for\u2026 https://t.co/eONdIe8xvL | #Election2016', 1)
(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\u2019s nominee for\u2026 https://t.co/NC6njITxcE | #Election2016', 1)
(u'A new approach to moving ore from remote mines: #airships! https://t.co/HmuoGQjEzC', 1)
(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement package: Donald Trump\u2019s nominee for\u2026 https://t.co/sk05rwzxMa | #Election2016', 1)
(u'Donald Trump appears to trust Julian Assange more than US intelligence agents https://t.co/ShlmBXoW4Z #election2016 https://t.co/HS95S98fOS', 1)
(u'Ngopo2 kudu dipikir disik. Ojo nganti kowe kagol amargo opo sing tok lakoni kuwi ora di pikir sek.', 1)
(u'Rex Tillerson cuts ties with Exxon Mobil via $180m retirement pack