### Exercises
1) Send the persons.json file to a Kafka topic
2) Read the topic as a stream with Spark
3) Store the data in the database, in a raw_data table
4) Batch-read the data from the raw_data table
5) Parse the JSON data
6) Store the cleaned data in a persons_data table
7) Filter the persons older than 30
8) Count the number of persons per city
9) Add an age_group column (young -18, adult 18-64, senior 65+) and display the result
10) Store the cleaned data in a persons table
11) Group the data by city and age_group and display the total for each group
12) Store the aggregated data in a persons_aggregated table

1) Send the persons.json file to a Kafka topic


In [1]:
import json
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer

topic = 'persons-exercice'
kafka_server = 'kafka:9092'

# Create the topic only if it does not exist yet
admin = KafkaAdminClient(bootstrap_servers=kafka_server)
topic_list = admin.list_topics()
if topic not in topic_list:
    new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
    admin.create_topics(new_topics=[new_topic])
admin.close()

# Each record is serialized as UTF-8 encoded JSON
producer = KafkaProducer(bootstrap_servers=[kafka_server], value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# persons.json holds a JSON array; send one message per person.
# The explicit encoding keeps accented names intact on every platform.
with open('data/persons.json', 'r', encoding='utf-8') as file:
    persons = json.load(file)
    for person in persons:
        producer.send(topic, value=person)

# Block until every buffered message has been delivered
producer.flush()
producer.close()
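
To see what the `value_serializer` above puts on the wire, the sketch below applies the same expression to a single record. The sample record is an assumption modeled on the rows shown in this notebook's output; it is not read from persons.json.

```python
import json

def serialize(value):
    # Same expression as the value_serializer passed to KafkaProducer above
    return json.dumps(value).encode('utf-8')

# Hypothetical sample record (modeled on the notebook output, not read from persons.json)
person = {"id": 3, "name": "Céline", "age": 35, "city": "Marseille"}

payload = serialize(person)
print(type(payload).__name__)
print(payload)

# json.dumps escapes non-ASCII characters by default (ensure_ascii=True),
# so the accented character travels as a \u00e9 escape sequence
print(b"\\u00e9" in payload)

# Decoding and parsing restores the original record
print(json.loads(payload.decode('utf-8')) == person)
```

The consumer side therefore receives plain bytes, and any reader has to decode and parse them before it can use the fields.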

2) Read the topic as a stream with Spark


In [2]:
from pyspark.sql import SparkSession
topic = 'persons-exercice'
spark = SparkSession.builder.appName("SparkStreamingDemo").getOrCreate()
df_stream = spark.readStream.format("kafka") \
    .option('kafka.bootstrap.servers', kafka_server) \
    .option('subscribe', topic) \
    .option('startingOffsets', 'earliest') \
    .load()

query = df_stream.writeStream.format("console").start()
query.awaitTermination(10)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/08 07:08:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/10/08 07:08:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/10/08 07:08:19 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-98a523ee-cee3-4646-94f9-e7bf64ce9b90. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/10/08 07:08:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------------------+----------------+---------+------+--------------------+-------------+
| key|               value|           topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------------+---------+------+--------------------+-------------+
|null|[7B 22 69 64 22 3...|persons-exercice|        0|     0|2025-10-08 07:07:...|            0|
|null|[7B 22 69 64 22 3...|persons-exercice|        0|     1|2025-10-08 07:07:...|            0|
|null|[7B 22 69 64 22 3...|persons-exercice|        0|     2|2025-10-08 07:07:...|            0|
|null|[7B 22 69 64 22 3...|persons-exercice|        0|     3|2025-10-08 07:07:...|            0|
|null|[7B 22 69 64 22 3...|persons-exercice|        0|     4|2025-10-08 07:07:...|            0|
|null|[7B 22 69 64 22 3...|persons-exercice|        0|     5|2025-10-08 07:07:...|            0|
|null|[7B 22 69 64 22 3...|per

False

3) Store the data in the database, in a raw_data table

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkExercice").getOrCreate()
df_stream = spark.readStream.format("kafka") \
    .option('kafka.bootstrap.servers', kafka_server) \
    .option('subscribe', topic) \
    .option('startingOffsets', 'earliest') \
    .load()

def process_batch(batch_df, batch_id):
    batch_df.write.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/events") \
        .option("dbtable", "public.raw_data") \
        .option("user", "app") \
        .option("password", "1234") \
        .mode("append") \
        .save()

query = df_stream.writeStream \
    .foreachBatch(process_batch) \
    .start()
query.awaitTermination(20)
query.stop()

25/10/08 07:10:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/10/08 07:10:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-55633f0f-b28f-4a99-939a-9f1d6d7d3ff8. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/10/08 07:10:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

+---+---------+---+-----------+-----------+
|id |name     |age|city       |age_group  |
+---+---------+---+-----------+-----------+
|1  |Alice    |30 |Paris      |adult 18-64|
|2  |Bob      |25 |Lyon       |adult 18-64|
|3  |Céline   |35 |Marseille  |adult 18-64|
|4  |David    |28 |Paris      |adult 18-64|
|5  |Emma     |40 |Bordeaux   |adult 18-64|
|6  |François |22 |Nice       |adult 18-64|
|7  |Gabrielle|31 |Strasbourg |adult 18-64|
|8  |Hugo     |27 |Lille      |adult 18-64|
|9  |Inès     |29 |Nantes     |adult 18-64|
|10 |Julien   |33 |Toulouse   |adult 18-64|
|11 |Karim    |41 |Montpellier|adult 18-64|
|12 |Laura    |36 |Paris      |adult 18-64|
|13 |Mathieu  |24 |Lyon       |adult 18-64|
|14 |Nina     |26 |Marseille  |adult 18-64|
|15 |Olivier  |39 |Bordeaux   |adult 18-64|
|16 |Pauline  |34 |Nice       |adult 18-64|
|17 |Quentin  |23 |Strasbourg |adult 18-64|
|18 |Rania    |32 |Lille      |adult 18-64|
|19 |Sophie   |38 |Nantes     |adult 18-64|
|20 |Thomas   |27 |Toulouse   |a
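
The `ResolveWriteToStream` warning above says the query runs with a temporary checkpoint, and the read uses `startingOffsets = earliest`, so each new run of the cell starts again from the beginning of the topic and appends the same records to raw_data. One possible remedy, sketched below, is to give the query a persistent checkpoint directory so that a restarted query resumes from the offsets it already committed. The helper name `start_checkpointed_sink` and the directory path are hypothetical. `foreachBatch` still provides at-least-once delivery, so this reduces duplicates without ruling them out.

```python
def start_checkpointed_sink(stream_df, batch_fn, checkpoint_dir):
    """Start a foreachBatch query that records its progress in checkpoint_dir.

    stream_df is a streaming DataFrame (such as df_stream above) and batch_fn
    a function taking (batch_df, batch_id), such as process_batch above.
    """
    return (
        stream_df.writeStream
        .foreachBatch(batch_fn)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )

# Usage in the notebook (the path is an assumption; any durable directory works):
# query = start_checkpointed_sink(df_stream, process_batch, "/tmp/checkpoints/raw_data")
# query.awaitTermination(20)
# query.stop()
```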

4) Batch-read the data from the raw_data table

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ReadRawData").getOrCreate()

df_raw = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/events") \
    .option("dbtable", "public.raw_data") \
    .option("user", "app") \
    .option("password", "1234") \
    .load()

df_raw.show(truncate=False)


25/10/08 07:14:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------+------+-----------------------+-------------+
|key |value                                                                                                                                                                                            |topic           |partition|offset|timestamp              |timestampType|
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------+------+-----------------------+-------------+
|null|[7B 22 69 64 22 3A 20 31 2C 20 22 6E 61 6D 65 22 3A 20 22 41 6C 69 63 65 22 2C 20 22 61 67 65 22 3A 20 33 30 2C 20 22 63 69 74 79 22 3A 20 22 50 61 72 69 73 22 7D]            
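
The `value` column is binary, which is why it prints as hexadecimal bytes. As a quick check outside Spark, the sketch below decodes the bytes of the first row shown above back into the JSON document the producer sent. The variable names are illustrative.

```python
import json

# Bytes of the first row's `value` column, copied from the output above
hex_value = (
    "7B 22 69 64 22 3A 20 31 2C 20 22 6E 61 6D 65 22 3A 20 22 41 6C 69 63 65 22 2C 20 "
    "22 61 67 65 22 3A 20 33 30 2C 20 22 63 69 74 79 22 3A 20 22 50 61 72 69 73 22 7D"
)

raw = bytes.fromhex(hex_value)   # fromhex skips the spaces between bytes
text = raw.decode("utf-8")
person = json.loads(text)

print(text)   # {"id": 1, "name": "Alice", "age": 30, "city": "Paris"}
print(person["name"], person["age"])
```

This is the same decoding that `CAST(value AS STRING)` followed by `from_json` performs inside Spark in the parsing step.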

5) Parse the JSON data


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("SparkExercice").getOrCreate()
df_stream = spark.readStream.format("kafka") \
    .option('kafka.bootstrap.servers', kafka_server) \
    .option('subscribe', topic) \
    .option('startingOffsets', 'earliest') \
    .load()

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("city", StringType(), False)
])

lines = df_stream.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), schema).alias("data")) \
    .select("data.*")

25/10/08 07:15:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
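
In its default permissive mode, `from_json` does not raise on a message that is not valid JSON; after `select("data.*")` such a message shows up as a row of nulls. The plain-Python sketch below is a simplified model of that behavior, not Spark's implementation, and the helper name `parse_person` is hypothetical. It also shows the kind of filter that keeps null rows out of the cleaned table.

```python
import json

FIELDS = ("id", "name", "age", "city")   # same field names as the schema above

def parse_person(raw_value):
    """Decode one Kafka value (bytes) into a dict holding the schema's fields.

    Simplified model of the parsing step: invalid JSON yields None for every
    field instead of raising, and fields absent from the message are None.
    """
    try:
        data = json.loads(raw_value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        data = {}
    if not isinstance(data, dict):
        data = {}
    return {field: data.get(field) for field in FIELDS}

good = parse_person(b'{"id": 1, "name": "Alice", "age": 30, "city": "Paris"}')
bad = parse_person(b'not json')

print(good)
print(bad)

# Keep only records whose id was parsed, mirroring a filter on a non-null id
clean = [p for p in (good, bad) if p["id"] is not None]
print(len(clean))
```

In the notebook, the corresponding step would be a filter such as `lines.filter(col("id").isNotNull())` before writing to the database.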


6) Store the cleaned data in a persons_data table

In [7]:
def process_batch(batch_df, batch_id):
    batch_df.write.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/events") \
        .option("dbtable", "public.persons_data") \
        .option("user", "app") \
        .option("password", "1234") \
        .mode("append") \
        .save()

query = lines.writeStream \
    .foreachBatch(process_batch) \
    .start()
query.awaitTermination(10)
query.stop()

25/10/08 07:17:17 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-8779dac5-4ef2-452b-bf00-033aa79961b9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/10/08 07:17:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

7) Filter the persons older than 30

In [43]:
df_persons = spark.read.format('jdbc')\
        .option("url", "jdbc:postgresql://postgres:5432/events") \
        .option("dbtable", "public.persons_data") \
        .option("user", "app") \
        .option("password", "1234") \
        .load()

df_persons.filter(df_persons.age > 30).show()

+---+---------+---+-----------+
| id|     name|age|       city|
+---+---------+---+-----------+
|  3|   Céline| 35|  Marseille|
|  5|     Emma| 40|   Bordeaux|
|  7|Gabrielle| 31| Strasbourg|
| 10|   Julien| 33|   Toulouse|
| 11|    Karim| 41|Montpellier|
| 12|    Laura| 36|      Paris|
| 15|  Olivier| 39|   Bordeaux|
| 16|  Pauline| 34|       Nice|
| 18|    Rania| 32|      Lille|
| 19|   Sophie| 38|     Nantes|
| 23|  William| 37|       Lyon|
| 24|   Xavier| 45|  Marseille|
| 27|   Adrien| 33| Strasbourg|
| 28| Brigitte| 42|      Lille|
| 30| Delphine| 31|   Toulouse|
| 31|     Éric| 40|Montpellier|
| 33|  Georges| 36|       Lyon|
| 36|    Julie| 34|       Nice|
| 37|    Kevin| 39| Strasbourg|
| 40|    Nadia| 37|   Toulouse|
+---+---------+---+-----------+
only showing top 20 rows



8) Count the number of persons per city

In [44]:
from pyspark.sql.functions import count

df_persons.groupBy("city").agg(count("name")).show()

+-----------+-----------+
|       city|count(name)|
+-----------+-----------+
|       Nice|          5|
|Montpellier|          4|
|      Lille|          5|
|     Nantes|          5|
|  Marseille|          5|
|      Paris|          6|
|       Lyon|          5|
|   Bordeaux|          5|
| Strasbourg|          5|
|   Toulouse|          5|
+-----------+-----------+



9) Add an age_group column (young -18, adult 18-64, senior 65+) and display the result


In [45]:
from pyspark.sql.functions import when, lit

# Conditions are evaluated in order, so the second branch only sees ages >= 18
df = df_persons.withColumn(
    "age_group",
    when(df_persons.age < 18, lit("young -18"))
    .when(df_persons.age < 65, lit("adult 18-64"))
    .otherwise(lit("senior 65+"))
)

df.show(truncate=False)
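
The boundary ages are the easiest place to get such a classification wrong. As a quick check, the sketch below restates the thresholds of the `when` chain above as a plain Python function and evaluates it at 17, 18, 64 and 65. The function name `age_group` is illustrative and is not part of the notebook.

```python
def age_group(age):
    """Return the label for an age, using the same thresholds as the when chain above."""
    if age < 18:
        return "young -18"
    if age < 65:
        return "adult 18-64"
    return "senior 65+"

for age in (17, 18, 64, 65):
    print(age, "->", age_group(age))
```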


10) Store the cleaned data in a persons table

In [47]:
# Write the DataFrame with the age_group column to PostgreSQL
df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/events") \
    .option("dbtable", "public.persons") \
    .option("user", "app") \
    .option("password", "1234") \
    .mode("append") \
    .save()


11) Group the data by city and age_group and display the total for each group

In [53]:
# Count the persons in each (age_group, city) pair and display the totals
df_grouped = df.groupBy("age_group", "city").count()
df_grouped.show()

12) Store the aggregated data in a persons_aggregated table

In [54]:
# Write the aggregated counts to PostgreSQL
df_grouped.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/events") \
    .option("dbtable", "public.persons_aggregated") \
    .option("user", "app") \
    .option("password", "1234") \
    .mode("append") \
    .save()
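
The same four JDBC options are repeated in every read and write of this notebook. A small helper, sketched below, could build them in one place and keep the password out of the cells. The function name `jdbc_options` and the `PG_PASSWORD` environment variable are assumptions; the URL, user and fallback password are the values used above.

```python
import os

def jdbc_options(table, password=None):
    """Build the JDBC options shared by the reads and writes in this notebook."""
    return {
        "url": "jdbc:postgresql://postgres:5432/events",
        "dbtable": table,
        "user": "app",
        # PG_PASSWORD is a hypothetical environment variable;
        # the fallback is the value used in the cells above
        "password": password or os.environ.get("PG_PASSWORD", "1234"),
    }

opts = jdbc_options("public.persons_aggregated")
print(opts["dbtable"])

# Usage in the notebook:
# df_grouped.write.format("jdbc").options(**opts).mode("append").save()
# df_persons = spark.read.format("jdbc").options(**jdbc_options("public.persons_data")).load()
```

With `mode("append")`, running a write cell twice stores the same rows twice; `mode("overwrite")` replaces the table contents instead, which may suit the batch tables persons and persons_aggregated better.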