In [None]:
import os
import shutil

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lower, to_date, trim, when
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

In [None]:
# Remove the streaming checkpoint left by a previous run so the query below starts
# from a clean state instead of resuming its recorded progress.
checkpoint_dir = "/tmp/checkpoints/planets"
previous_checkpoint_found = os.path.exists(checkpoint_dir)
if previous_checkpoint_found:
    shutil.rmtree(checkpoint_dir)

In [None]:
# Create (or reuse) the Spark session for this notebook.
#  - spark.sql.streaming.checkpointLocation: default checkpoint root for streaming queries
#    that do not set their own checkpointLocation option.
#  - spark.jars.packages: pulls the Kafka connector so format("kafka") is available.
# NOTE(review): the connector coordinates (Scala 2.12, version 3.0.0) must match the
# installed Spark/Scala version — confirm against the runtime.
spark_configs = {
    "spark.sql.streaming.checkpointLocation": "/tmp/checkpoints",
    "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0",
}
session_builder = SparkSession.builder.appName("AppPlanetDiscovery")
for config_key, config_value in spark_configs.items():
    session_builder = session_builder.config(config_key, config_value)
spark = session_builder.getOrCreate()

In [None]:
# Schema of the JSON planet messages. Every field is parsed as a nullable string;
# the numeric, date and boolean fields are converted to proper types afterwards.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "id",
        "nom",
        "decouvreur",
        "date_de_decouverte",
        "masse",
        "rayon",
        "distance",
        "type",
        "statut",
        "atmosphere",
        "temperature_moyenne",
        "periode_orbitale",
        "nombre_de_satellites",
        "presence_deau",
    )
])


In [None]:
# Subscribe to the "planet-discoveries" Kafka topic as a streaming source.
# NOTE(review): no startingOffsets option is given, so Spark's streaming default
# ("latest") applies whenever the checkpoint is fresh — messages published before the
# query starts are not read. Confirm this is intended.
kafka_options = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "planet-discoveries",
}
kafka_df = spark.readStream.format("kafka").options(**kafka_options).load()

In [None]:
# Decode the Kafka payload (`value`, cast to string) as JSON matching `schema`, then
# flatten the parsed struct into top-level columns.
# NOTE(review): from_json does not fail on malformed JSON; such messages yield a row
# whose fields are all null.
messages_df = kafka_df.selectExpr("CAST(value AS STRING)")
json_df = messages_df.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Every field arrives as a string: convert each one to its real type.
# withColumn on an existing name replaces it in place, so the column order is unchanged.
double_columns = ["masse", "rayon", "distance", "temperature_moyenne", "periode_orbitale"]

typed_df = json_df.withColumn(
    "date_de_decouverte", to_date(col("date_de_decouverte"), "yyyy-MM-dd")
)
for column_name in double_columns:
    typed_df = typed_df.withColumn(column_name, col(column_name).cast(DoubleType()))

# "oui" -> True, "non" -> False, anything else -> null. The comparison ignores case and
# surrounding whitespace so values such as "Oui" or " non " are not silently nulled.
presence_normalized = lower(trim(col("presence_deau")))
final_df = typed_df \
    .withColumn("nombre_de_satellites", col("nombre_de_satellites").cast(IntegerType())) \
    .withColumn("presence_deau", when(presence_normalized == "oui", True)
                                 .when(presence_normalized == "non", False)
                                 .otherwise(None))




In [None]:
# Per-micro-batch handler passed to writeStream.foreachBatch.
def process_batch(batch_df, batch_id):
    """Display one streaming micro-batch and save it to a local CSV file.

    Args:
        batch_df: the micro-batch DataFrame supplied by ``foreachBatch``.
        batch_id: the micro-batch identifier supplied by ``foreachBatch``; used in the
            printed header and in the CSV file name.

    Side effects:
        Prints the batch to stdout and writes ``planet_discoveries_batch_<batch_id>.csv``
        in the current working directory (pandas ``to_csv`` overwrites an existing file).
    """
    # The batch is consumed by two actions (show and toPandas). Without caching, each
    # action can recompute the batch, including re-reading it from the Kafka source.
    batch_df.persist()
    try:
        print(f"\n=== Batch {batch_id} ===")
        batch_df.show(truncate=False)

        # Save as CSV (optional, used for plotting afterwards).
        csv_path = f"planet_discoveries_batch_{batch_id}.csv"
        # toPandas collects the whole batch onto the driver.
        batch_pd = batch_df.toPandas()
        batch_pd.to_csv(csv_path, index=False)
        print(f"Batch {batch_id} sauvegardé dans {csv_path}")
    finally:
        # Release the cached batch even when show/toPandas/to_csv raises.
        batch_df.unpersist()



In [None]:
# Start the streaming query: each micro-batch of final_df is handed to process_batch
# (console display + local CSV), with query progress stored under checkpoint_dir.
query = (
    final_df.writeStream
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("checkpointLocation", checkpoint_dir)
    .start()
)

# Blocks this cell until the query terminates; cells below run only after the query
# stops or the kernel is interrupted.
query.awaitTermination()

In [7]:
# Display the contents of the "planets" table/view, if one is registered in this session.
# NOTE(review): no cell in this notebook registers "planets" — the streaming query writes
# through foreachBatch, not a memory sink with queryName("planets") — so an unconditional
# spark.sql("SELECT * FROM planets") raises AnalysisException on a fresh kernel.
registered_tables = {table.name for table in spark.catalog.listTables()}
if "planets" in registered_tables:
    spark.sql("SELECT * FROM planets").show()
else:
    print("Aucune table ou vue 'planets' n'est enregistrée dans cette session Spark.")


+---+---+----------+------------------+-----+-----+--------+----+------+----------+-------------------+----------------+--------------------+-------------+
| id|nom|decouvreur|date_de_decouverte|masse|rayon|distance|type|statut|atmosphere|temperature_moyenne|periode_orbitale|nombre_de_satellites|presence_deau|
+---+---+----------+------------------+-----+-----+--------+----+------+----------+-------------------+----------------+--------------------+-------------+
+---+---+----------+------------------+-----+-----+--------+----+------+----------+-------------------+----------------+--------------------+-------------+

