# Load OpenSky flight states into Kafka

In [1]:
import requests
from requests.auth import HTTPBasicAuth
import time
from kafka import KafkaProducer
import json
from datetime import datetime, timedelta, timezone

# Names given to the values of one OpenSky state entry. The position of a
# name in this list is the index of the value it labels.
correspond = [
    "icao24",
    "callsign",
    "origin_country",
    "time_position",
    "last_contact",
    "longitude",
    "latitude",
    "baro_altitude",
    "on_ground",
    "velocity",
    "true_track",
    "vertical_rate",
    "sensors",
    "geo_altitude",
    "squawk",
    "spi",
    "position_source",
    "category",
]

# OpenSky credentials
# NOTE(review): these values look like placeholders — confirm before
# enabling authenticated requests.
USERNAME = "username"
PASSWORD = "password"

def send_opensky_to_kafka(topic, url, fields=None):
    """Fetch aircraft states from an OpenSky URL and publish them to Kafka.

    Every entry of the response's ``states`` array is converted to a dict
    keyed by the names in ``correspond``, stamped with a UTC ``created_at``
    ISO-8601 string, and sent to ``topic`` as a JSON-encoded message.

    Args:
        topic: Kafka topic the records are produced to.
        url: Fully formatted OpenSky URL to request.
        fields: Not used by this function; kept (now defaulting to ``None``
            instead of a shared mutable ``{}``) so existing calls still work.

    Returns:
        None. Progress and failures are reported with ``print``.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            does not answer within the timeout.
    """
    kafka_config = {
        'bootstrap_servers': 'kafka1:9092',
    }

    producer = KafkaProducer(
        bootstrap_servers=kafka_config['bootstrap_servers'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    # try/finally guarantees the producer is closed even when the request,
    # the JSON decoding or a send raises.
    try:
        # The request is anonymous (USERNAME / PASSWORD are not sent).
        # A timeout keeps a stalled connection from blocking forever.
        response = requests.get(url, timeout=30)

        if response.status_code != 200:
            print(f"Failed to fetch data from OpenSky API. Status code: {response.status_code}")
            print(response.text)  # Print the response for debugging
            return

        data = response.json()
        # "states" can be missing or null; both mean there is nothing to send.
        states = data.get("states") if isinstance(data, dict) else None
        if states is None:
            print("No valid data to send.")
            return

        for state in states:
            # zip() stops at the shorter sequence, so a state vector with
            # fewer values than `correspond` no longer raises IndexError.
            # A falsy name in `correspond` drops the value at that position.
            flight_info = {key: value for key, value in zip(correspond, state) if key}
            flight_info["created_at"] = datetime.now(timezone.utc).isoformat()
            producer.send(topic, value=flight_info)
        producer.flush()
        print(f"Sent {len(states)} records to Kafka.")
    finally:
        producer.close()

# URL templates for the OpenSky "states/all" endpoint. The first placeholder
# fills the `extended` query parameter; `api_url_time` takes a second value
# for the `time` query parameter.
api_url_time = "https://opensky-network.org/api/states/all?extended={}&time={}"
api_url = "https://opensky-network.org/api/states/all?extended={}"

# Unix timestamp (whole seconds) of the instant five minutes before now (UTC).
# Named `query_time` rather than `time` so the `time` module imported at the
# top of this cell is not replaced by an int.
query_time = int((datetime.now(timezone.utc) - timedelta(minutes=5)).timestamp())
# send_opensky_to_kafka("opensky", api_url.format(1), {})


## Initialization

## Imports

## Spark

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import from_json, col, window, avg, count
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType
# Spark configuration: application name, master URL, the extra packages to
# load (hadoop-aws and the Kafka SQL connector) and the shuffle partition count.
conf = (
    SparkConf()
    .setMaster('spark://spark:7077')
    .setAppName('SparkApp')
    .set("spark.sql.shuffle.partitions", "10")
    .set(
        "spark.jars.packages",
        ",".join((
            "org.apache.hadoop:hadoop-aws:3.3.4",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
        )),
    )
)

# Reuse the active SparkContext if there is one, otherwise start one with `conf`.
sc = SparkContext.getOrCreate(conf=conf)

from pyspark.sql import SQLContext
# Create a SQLContext for SQL operations
sql_context = SQLContext(sc)

:: loading settings :: url = jar:file:/opt/conda/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-46f09d49-b4c8-4c8b-800e-d25804fa97d8;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.3 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import time
from IPython.display import clear_output

# Kafka configuration
kafka_broker = "kafka1:9092"
kafka_topic = "opensky"  # Update if necessary

# Schema applied to the JSON payload of each Kafka message. Only the fields
# listed here are extracted. `created_at` is declared as a string and is
# converted to a timestamp later, when the stream is parsed.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("icao24", StringType()),
        ("origin_country", StringType()),
        ("created_at", StringType()),
        ("velocity", FloatType()),
    )
])

# Get (or create) the Spark session used by this cell
spark = SparkSession.builder.appName("KafkaStreamExample").getOrCreate()

# One-off batch read of the whole topic (from the earliest offset); it is
# only used to choose which aircraft the streaming query will follow.
raw_batch_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the JSON payload and keep only the non-null `icao24` values
parsed_batch_df = raw_batch_df.selectExpr("CAST(value AS STRING) AS message") \
    .select(from_json(col("message"), schema).alias("data")) \
    .select(
        col("data.icao24").alias("icao24"),
    ) \
    .filter(col("icao24").isNotNull())

# Keep a single distinct icao24 value. No ordering is applied, so which
# value comes back is whatever Spark returns first.
first_icao24_df = parsed_batch_df.select("icao24").distinct().limit(1)

# `first()` returns None for an empty result, so an empty topic (or one with
# no parsable icao24) is reported with a clear message instead of the bare
# IndexError that `collect()[0]` would raise.
first_row = first_icao24_df.first()
if first_row is None:
    raise RuntimeError(
        f"No message with an icao24 was found in Kafka topic '{kafka_topic}'; "
        "publish some OpenSky states before starting the stream."
    )
first_icao24 = first_row["icao24"]

# Streaming source: subscribe to the topic, starting from the earliest offset
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("subscribe", kafka_topic)
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the message value as JSON, flatten the fields of interest, turn
# `created_at` into a timestamp, and drop rows that lack an icao24 or an
# origin country.
parsed_stream = (
    raw_stream
    .selectExpr("CAST(value AS STRING) AS message")
    .select(from_json(col("message"), schema).alias("data"))
    .select(
        col("data.icao24").alias("icao24"),
        col("data.origin_country").alias("origin_country"),
        col("data.velocity").alias("velocity"),
        to_timestamp(
            col("data.created_at"),
            "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
        ).alias("created_at"),
    )
    .filter(col("icao24").isNotNull())
    .filter(col("origin_country").isNotNull())
)

# Restrict the stream to the aircraft selected by the batch lookup
filtered_stream = parsed_stream.filter(col("icao24") == first_icao24)


# Average velocity of the selected aircraft per 1-minute window on
# `created_at`. No sort is applied to the result.
rolling_average = (
    filtered_stream
    .groupBy(window(col("created_at"), "1 minutes"))
    .agg(avg("velocity").alias("velocity"))
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("velocity"),
    )
)

# Write the aggregate to the console sink in "complete" output mode with
# untruncated values, then block until the query terminates.
query = (
    rolling_average.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .start()
)
query.awaitTermination()


25/03/24 12:55:18 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/03/24 12:55:18 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ad86e3f3-4463-445f-8f95-84c7462c05b3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/24 12:55:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/24 12:55:18 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                             

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-------------------+------------------+
|window_start       |window_end         |velocity          |
+-------------------+-------------------+------------------+
|2025-03-24 10:30:00|2025-03-24 11:00:00|181.28599853515624|
+-------------------+-------------------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+--------------+--------+----------+
|icao24|origin_country|velocity|created_at|
+------+--------------+--------+----------+
+------+--------------+--------+----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+------------------+
|window_start       |window_end         |velocity          |
+-------------------+-------------------+------------------+
|2025-03-24 10:30:00|2025-03-24 11:00:00|18

                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+------+--------------+--------+----------+
|icao24|origin_country|velocity|created_at|
+------+--------------+--------+----------+
+------+--------------+--------+----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------------------+------------------+
|window_start       |window_end         |velocity          |
+-------------------+-------------------+------------------+
|2025-03-24 10:30:00|2025-03-24 11:00:00|181.28599853515624|
+-------------------+-------------------+------------------+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------------+-------------------+------------------+
|window_start       |window_end         |velocity          |
+-------------------+-------------------+------------------+
|2025-03-24 10:30:00|2025-03-24 11:00:00|181.28599853515624|
+-------------------+-------------------+------------------+

