In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, trim
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, DoubleType

# Première requête streaming : afficher le nombre d'avions par région (champs `region` et `count` lus depuis le topic Kafka `opensky-planes`)


In [None]:
# Spark session with the Kafka connector on the classpath.
# getOrCreate() reuses a session that is already running in this kernel, so the
# first cell executed fixes the application name; individual streaming queries
# are told apart in the Spark UI by their queryName instead.
spark = SparkSession.builder \
    .appName("OpenSkyStreaming-Regions") \
    .master("spark://spark:7077") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .getOrCreate()

# How long (seconds) to let the query run before stopping it.
# None = run until the cell is interrupted (original behavior); set e.g. 60 so that
# "Restart & Run All" can move on to the next cell.
REGION_QUERY_TIMEOUT_S = None

# Schema of the JSON carried in each Kafka message value.
# Fields are nullable: a field missing from a message comes back as null.
schema1 = StructType([
    StructField("region", StringType(), True),
    StructField("count", IntegerType(), True)
])

# Raw Kafka stream from the "opensky-planes" topic.
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "opensky-planes") \
    .load()

# Kafka delivers `value` as binary: cast it to a string, then parse it with schema1
# and flatten the struct into top-level `region` / `count` columns.
# NOTE(review): with from_json's default (permissive) mode, messages that do not match
# the schema presumably surface as null-valued rows rather than failing the query.
df = raw_df.selectExpr("CAST(value AS STRING) as json_str")
parsed_df = df.select(from_json(col("json_str"), schema1).alias("data")).select("data.*")

# Print every micro-batch to the driver console.
query = parsed_df.writeStream \
    .queryName("opensky_region_counts") \
    .outputMode("append") \
    .format("console") \
    .start()

# Always stop the query when the cell ends (timeout, kernel interrupt or failure).
# Without this, interrupting the cell leaves the query running in the background,
# still consuming Kafka and printing into the next query's output.
try:
    query.awaitTermination(REGION_QUERY_TIMEOUT_S)
finally:
    query.stop()



# Deuxième requête streaming : suivi en temps réel des vols commerciaux (callsign non vide)

In [None]:
# Spark session with the Kafka connector on the classpath.
# getOrCreate() returns the session already running in this kernel if there is one
# (its original appName is kept); this query is identified by its queryName instead.
spark = SparkSession.builder \
    .appName("OpenSkyStreaming-Callsigns") \
    .master("spark://spark:7077") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .getOrCreate()

# How long (seconds) to let the query run before stopping it.
# None = run until the cell is interrupted (original behavior); set e.g. 60 so that
# "Restart & Run All" can complete.
CALLSIGN_QUERY_TIMEOUT_S = None

# Schema of the JSON carried in each Kafka message value (all fields nullable).
schema = StructType([
    StructField("callsign", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("baro_altitude", DoubleType(), True),
    StructField("velocity", DoubleType(), True)
])

# Raw Kafka stream from the "opensky-callsigns" topic.
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "opensky-callsigns") \
    .load()

# Kafka delivers `value` as binary: cast it to a string, parse it with `schema`,
# and flatten the struct into top-level columns.
df = raw_df.selectExpr("CAST(value AS STRING) as json_str")
parsed_df = df.select(from_json(col("json_str"), schema).alias("data")).select("data.*")

# Keep only aircraft with a real callsign. trim() makes blank, space-only callsigns
# count as empty too (`!= ""` alone would let them through).
# NOTE(review): OpenSky callsigns can be space-padded; confirm whether the producer
# already strips them.
filtered_df = parsed_df.filter(
    col("callsign").isNotNull() & (trim(col("callsign")) != "")
)

# Print every micro-batch to the driver console.
query = filtered_df.writeStream \
    .queryName("opensky_callsign_flights") \
    .outputMode("append") \
    .format("console") \
    .start()

# Always stop the query when the cell ends (timeout, kernel interrupt or failure),
# so an interrupted cell does not leave a background query running on the cluster.
try:
    query.awaitTermination(CALLSIGN_QUERY_TIMEOUT_S)
finally:
    query.stop()


25/03/24 17:31:57 WARN NetworkClient: [AdminClient clientId=adminclient-2] Connection to node 1 (43bd697e26f9/172.18.0.6:9092) could not be established. Broker may not be available.
25/03/24 17:31:57 WARN NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 1 (43bd697e26f9/172.18.0.6:9092) could not be established. Broker may not be available.
25/03/24 17:31:57 WARN NetworkClient: [AdminClient clientId=adminclient-5] Connection to node 1 (43bd697e26f9/172.18.0.6:9092) could not be established. Broker may not be available.
25/03/24 17:31:57 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1 (43bd697e26f9/172.18.0.6:9092) could not be established. Broker may not be available.
25/03/24 17:31:57 WARN NetworkClient: [AdminClient clientId=adminclient-3] Connection to node 1 (43bd697e26f9/172.18.0.6:9092) could not be established. Broker may not be available.
25/03/24 17:31:57 WARN NetworkClient: [AdminClient clientId=adminclient-2] Connection to n