# 02 - Unified Streaming Pipeline

**Single SparkSession** orchestrating all data layers:

- **Bronze**: Raw Kafka → Delta (append-only log)
- **Silver**: Cleaning, normalization, type coercion
- **Silver_ML**: Feature engineering with rolling windows
- **Gold**: Aggregations, KPIs, metrics

Performance optimizations:
- Micro-batch streaming (120 s trigger, set by `PROCESSING_TIME`)
- Few, small shuffles (6 shuffle partitions, broadcast join for the airport lookup)
- Type hints and logging throughout

In [1]:
import os
import sys
import logging
from typing import Optional
from datetime import datetime, timezone

from dotenv import load_dotenv
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, from_json, from_unixtime, to_timestamp, timestamp_seconds, round as spark_round,
    lag, avg, stddev, row_number, when, sqrt, pow, lit,
    window, sum as spark_sum, max as spark_max, min as spark_min,
    count, broadcast
)
from pyspark.sql.types import (
    StructType, StructField, StringType, FloatType, IntegerType, BooleanType, LongType
)
from pyspark.sql.window import Window

sys.path.insert(0, '/home/jovyan/work')

from config import get_s3_path, create_spark_session

# --- Logging -----------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Populate os.environ from a local .env file so the os.getenv lookups below see it.
load_dotenv()

# --- Delta table locations (one per layer) ------------------------------------
BRONZE_PATH: str = get_s3_path("bronze", "flights")
SILVER_PATH: str = get_s3_path("silver", "flights")
SILVER_ML_PATH: str = get_s3_path("silver", "flights_ml")
GOLD_TRAFFIC_PATH: str = get_s3_path("gold", "traffic_by_country")
GOLD_METRICS_PATH: str = get_s3_path("gold", "metrics_by_category")

# --- Streaming checkpoints (one per query; never share between queries) -------
CHECKPOINT_BRONZE: str = get_s3_path("checkpoints", "bronze")
CHECKPOINT_SILVER: str = get_s3_path("checkpoints", "silver")
CHECKPOINT_SILVER_ML: str = get_s3_path("checkpoints", "silver_ml")
CHECKPOINT_GOLD_TRAFFIC: str = get_s3_path("checkpoints", "gold_traffic")
CHECKPOINT_GOLD_METRICS: str = get_s3_path("checkpoints", "gold_metrics")

# --- Sources ------------------------------------------------------------------
# Kafka settings can be overridden from the environment; defaults target the compose stack.
KAFKA_BOOTSTRAP: str = os.getenv("KAFKA_BOOTSTRAP", "kafka1:9092")
TOPIC_NAME: str = os.getenv("TOPIC_NAME", "opensky-data")
# Relative path: resolved against the notebook's working directory.
AIRPORTS_CSV: str = "./data/airports.csv"

# Trigger interval shared by every writeStream in this notebook.
PROCESSING_TIME: str = "120 seconds"

logger.info("Bronze: %s | Silver: %s | Gold: %s", BRONZE_PATH, SILVER_PATH, GOLD_TRAFFIC_PATH)

INFO:__main__:Bronze: s3a://datalake/bronze/flights | Silver: s3a://datalake/silver/flights | Gold: s3a://datalake/gold/traffic_by_country


✅ Configuration chargée depuis .env


In [15]:
# One SparkSession shared by every stream below. The Kafka connector package is
# needed by the Bronze source; shuffle_partitions is kept small (6).
spark: SparkSession = create_spark_session(
    "UnifiedPipeline",
    shuffle_partitions=6,
    extra_packages=["org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3"],
)
logger.info("Spark session initialized")

INFO:__main__:Spark session initialized


✅ Spark Session 'UnifiedPipeline' configurée


## Bronze Stream - Kafka to Delta

In [None]:
# JSON schema of one Kafka message value. Every field is nullable, so a message
# that fails to parse yields a row of NULLs instead of failing the stream.
schema_bronze: StructType = (
    StructType()
    .add("time", LongType(), True)
    .add("icao24", StringType(), True)
    .add("callsign", StringType(), True)
    .add("origin_country", StringType(), True)
    .add("time_position", LongType(), True)
    .add("last_contact", LongType(), True)
    .add("longitude", FloatType(), True)
    .add("latitude", FloatType(), True)
    .add("baro_altitude", FloatType(), True)
    .add("on_ground", BooleanType(), True)
    .add("velocity", FloatType(), True)
    .add("true_track", FloatType(), True)
    .add("vertical_rate", FloatType(), True)
    .add("geo_altitude", FloatType(), True)
    .add("squawk", StringType(), True)
    .add("spi", BooleanType(), True)
    .add("position_source", IntegerType(), True)
    .add("category", IntegerType(), True)
)

# Kafka source. "latest" only applies when the checkpoint is new; afterwards the
# checkpointed offsets win. failOnDataLoss=false tolerates expired offsets.
kafka_stream: DataFrame = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
    .option("subscribe", TOPIC_NAME)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
)

# Decode the binary value as a JSON string and flatten the parsed struct.
parsed_value = from_json(col("value").cast("string"), schema_bronze).alias("data")
df_bronze: DataFrame = kafka_stream.select(parsed_value).select("data.*")
del parsed_value

# Append-only raw log in Delta.
query_bronze = (
    df_bronze.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_BRONZE)
    .option("mergeSchema", "true")
    .trigger(processingTime=PROCESSING_TIME)
    .start(BRONZE_PATH)
)

logger.info("Bronze stream started")

26/01/23 20:24:04 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/23 20:24:04 WARN StreamingQueryManager: Stopping existing streaming query [id=5de058b5-78d5-4071-b99e-c51668b4ed50, runId=3e4c3d84-fb09-49e5-bd7a-ed5a12cda30a], as a new run is being started.
INFO:__main__:Bronze stream started


26/01/23 20:24:04 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
26/01/23 20:24:05 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


In [17]:
# Silver: cleaned, typed projection of the Bronze table, appended continuously.
df_silver: DataFrame = (
    spark.readStream.format("delta").load(BRONZE_PATH)
    # Rows without an aircraft id or a position are unusable downstream.
    .filter(col("icao24").isNotNull())
    .filter(col("latitude").isNotNull() & col("longitude").isNotNull())
    # Convert epoch seconds straight to a timestamp. The previous
    # from_unixtime -> to_timestamp round-trip went through a local-time string,
    # which is ambiguous during a DST fall-back hour when the session time zone
    # is not UTC and could shift those rows by one hour.
    .withColumn("event_timestamp", timestamp_seconds(col("time")))
    .withColumn("velocity_kmh", spark_round(col("velocity") * 3.6, 2))  # m/s -> km/h
    .withColumn("altitude_meters", col("baro_altitude"))
    .select(
        "event_timestamp", "icao24", "callsign", "origin_country",
        "longitude", "latitude", "velocity_kmh", "altitude_meters",
        "on_ground", "category"
    )
)

query_silver = (
    df_silver.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(processingTime=PROCESSING_TIME)
    .option("checkpointLocation", CHECKPOINT_SILVER)
    .option("mergeSchema", "true")
    .start(SILVER_PATH)
)

logger.info("Silver stream started")

26/01/23 20:24:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/23 20:24:08 WARN StreamingQueryManager: Stopping existing streaming query [id=fb8e3cba-f4b7-401f-bbcf-060035ff7f36, runId=c6c67ff9-ce00-41ed-ae1e-6aac57cafd3a], as a new run is being started.
INFO:__main__:Silver stream started


## Airports Reference Data

In [18]:
# Reference table of large and medium airports. process_ml_batch broadcasts it
# to tag on-ground aircraft with their nearest airport.
df_airports: DataFrame = (
    spark.read
    .option("header", "true")
    .csv(AIRPORTS_CSV)
    # Filter on `type` while it is still in scope. Filtering after the select
    # below, which does not keep `type`, only worked because the analyzer
    # silently re-added the dropped column.
    .filter(col("type").isin("large_airport", "medium_airport"))
    .select(
        col("ident").alias("airport_icao"),
        col("name").alias("airport_name"),
        col("iso_country").alias("airport_country"),
        col("latitude_deg").cast("double").alias("airport_lat"),
        col("longitude_deg").cast("double").alias("airport_lon")
    )
    # Cached because every Silver_ML micro-batch joins against it. Without the
    # cache, each batch would re-read and re-parse the CSV.
    .cache()
)

# count() is also the action that materialises the cache.
logger.info(f"Loaded {df_airports.count()} airports")

INFO:__main__:Loaded 5211 airports


## Silver_ML Stream - Feature Engineering

In [19]:
def process_ml_batch(batch_df: DataFrame, batch_id: int) -> None:
    """Engineer ML features for one Bronze micro-batch and append them to Silver_ML.

    Used as the ``foreachBatch`` callback of the Silver_ML stream.

    Steps:

    1. Clean. Drop rows without ``icao24`` or coordinates, derive
       ``event_timestamp`` / ``velocity_kmh`` / ``altitude_meters``, and keep
       altitudes in [-500, 15000] m and speeds in [0, 1200] km/h.
    2. Per-aircraft temporal features (``lag`` / ``row_number``). These are
       computed over this micro-batch only, so the first row of each aircraft
       in a batch has NULL ``prev_*`` / ``*_change`` values.
    3. Nearest large or medium airport for on-ground rows. Other rows get NULL
       airport columns.
    4. Rolling mean/stddev over the current row and up to 5 preceding rows of
       the same aircraft.
    5. Rule-based ``flight_phase`` label.

    Args:
        batch_df: Bronze rows delivered for this micro-batch.
        batch_id: Micro-batch id from Structured Streaming (unused).
    """
    # 1. Cleaning. An empty batch necessarily gives an empty df_clean, so one
    # emptiness check covers both cases.
    df_clean = (
        batch_df
        .filter(col("icao24").isNotNull())
        .filter(col("latitude").isNotNull() & col("longitude").isNotNull())
        # Direct epoch-seconds conversion. A from_unixtime -> to_timestamp
        # string round-trip is ambiguous during a DST fall-back hour when the
        # session time zone is not UTC.
        .withColumn("event_timestamp", timestamp_seconds(col("time")))
        .withColumn("velocity_kmh", spark_round(col("velocity") * 3.6, 2))
        .withColumn("altitude_meters", col("baro_altitude"))
        .filter(col("altitude_meters").between(-500, 15000))
        .filter(col("velocity_kmh").between(0, 1200))
    )

    if df_clean.isEmpty():
        return

    # 2. Temporal features, ordered by time within each aircraft.
    w_aircraft = Window.partitionBy("icao24").orderBy("event_timestamp")

    df_temporal = (
        df_clean
        .withColumn("prev_altitude", lag("altitude_meters", 1).over(w_aircraft))
        .withColumn("prev_velocity", lag("velocity_kmh", 1).over(w_aircraft))
        .withColumn("altitude_change", col("altitude_meters") - col("prev_altitude"))
        .withColumn("velocity_change", col("velocity_kmh") - col("prev_velocity"))
        .withColumn("observation_rank", row_number().over(w_aircraft))
    )

    # 3. Airport enrichment.
    # eqNullSafe(True) evaluates to False, not NULL, for a NULL flag, so the two
    # filters below partition every row. The former `== True` / `== False` pair
    # silently dropped rows whose on_ground was NULL.
    is_on_ground = col("on_ground").eqNullSafe(True)

    # Nearest airport by straight-line distance in degrees. This is a cheap
    # proxy, not a geodesic distance. Equidistant airports keep one row per tie.
    df_ground = (
        df_temporal.filter(is_on_ground)
        .crossJoin(broadcast(df_airports))
        .withColumn(
            "dist",
            sqrt(pow(col("latitude") - col("airport_lat"), 2) +
                 pow(col("longitude") - col("airport_lon"), 2))
        )
        .withColumn(
            "min_dist",
            spark_min("dist").over(Window.partitionBy("icao24", "event_timestamp"))
        )
        .filter(col("dist") == col("min_dist"))
        .drop("dist", "min_dist", "airport_lat", "airport_lon")
    )

    # Typed NULLs keep the airport columns StringType even in batches with no
    # on-ground aircraft. An untyped lit(None) would be NullType.
    df_airborne = (
        df_temporal.filter(~is_on_ground)
        .withColumn("airport_icao", lit(None).cast("string"))
        .withColumn("airport_name", lit(None).cast("string"))
        .withColumn("airport_country", lit(None).cast("string"))
    )

    # Union unconditionally. With no ground rows the cross join is simply empty,
    # so the former count() action and branch were unnecessary.
    df_enriched = df_ground.unionByName(df_airborne, allowMissingColumns=True)

    # 4. Rolling statistics: current row plus up to 5 preceding rows per aircraft.
    w_rolling = Window.partitionBy("icao24").orderBy("event_timestamp").rowsBetween(-5, 0)

    df_rolling = (
        df_enriched
        .withColumn("rolling_avg_altitude", avg("altitude_meters").over(w_rolling))
        .withColumn("rolling_std_altitude", stddev("altitude_meters").over(w_rolling))
        .withColumn("rolling_avg_velocity", avg("velocity_kmh").over(w_rolling))
    )

    # 5. Heuristic flight phase. The first matching rule wins; a NULL
    # altitude_change (first row of an aircraft in the batch) falls through to
    # TRANSITION.
    df_ml = df_rolling.withColumn(
        "flight_phase",
        when(col("on_ground") == True, "GROUND")
            .when((col("altitude_change") > 50) & (col("altitude_meters") < 3000), "TAKEOFF")
            .when(col("altitude_change") > 20, "CLIMB")
            .when(col("altitude_change").between(-20, 20) & (col("altitude_meters") > 8000), "CRUISE")
            .when(col("altitude_change") < -20, "DESCENT")
            .otherwise("TRANSITION")
    )

    df_final = df_ml.select(
        "event_timestamp", "icao24", "callsign", "origin_country",
        "longitude", "latitude", "velocity_kmh", "altitude_meters",
        "on_ground", "category",
        "prev_altitude", "prev_velocity", "altitude_change", "velocity_change",
        "observation_rank",
        "airport_icao", "airport_name", "airport_country",
        "rolling_avg_altitude", "rolling_std_altitude", "rolling_avg_velocity",
        "flight_phase"
    )

    # NOTE(review): this plain append is not idempotent. If a micro-batch is
    # replayed after a failure, its rows are appended again. Delta's
    # txnAppId/txnVersion writer options could prevent that, but they need a
    # stable app id that changes whenever the checkpoint is reset. Confirm
    # before adopting.
    df_final.write.format("delta").mode("append").save(SILVER_ML_PATH)


# A second, independent reader of the Bronze table. Each micro-batch is handed
# to process_ml_batch, which performs its own Delta append to SILVER_ML_PATH,
# so no sink format or output path is configured here.
df_bronze_ml = spark.readStream.format("delta").load(BRONZE_PATH)

query_silver_ml = (
    df_bronze_ml.writeStream
    .option("checkpointLocation", CHECKPOINT_SILVER_ML)
    .trigger(processingTime=PROCESSING_TIME)
    .foreachBatch(process_ml_batch)
    .start()
)

logger.info("Silver_ML stream started")

26/01/23 20:24:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/23 20:24:12 WARN StreamingQueryManager: Stopping existing streaming query [id=f29f4bbb-96a5-4979-9c67-8e2cbd8eeec8, runId=ec693f22-1502-44c0-b262-24c85667063d], as a new run is being started.
INFO:__main__:Silver_ML stream started


## Gold Streams - Aggregations

In [20]:
# Gold: per-country traffic KPIs over 5-minute tumbling windows, read from Silver.
# `aircraft_count` is count("icao24"). Silver already drops NULL icao24, so this
# is the number of position reports in the window, not distinct aircraft.
# NOTE(review): outputMode("complete") rewrites the whole table each trigger and,
# with no watermark, keeps every (window, country) aggregate in state forever.
# Confirm this is acceptable for long runs.
df_gold_traffic = (
    spark.readStream.format("delta").load(SILVER_PATH)
    .where(col("origin_country").isNotNull())
    .withColumn("window", window(col("event_timestamp"), "5 minutes"))
    .groupBy("window", "origin_country")
    .agg(
        count("icao24").alias("aircraft_count"),
        spark_round(avg("velocity_kmh"), 2).alias("avg_velocity_kmh"),
        spark_round(avg("altitude_meters"), 0).alias("avg_altitude_m")
    )
    .withColumnRenamed("window", "time_window")
    .select(
        "time_window", "origin_country", "aircraft_count",
        "avg_velocity_kmh", "avg_altitude_m"
    )
)

query_gold_traffic = (
    df_gold_traffic.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", CHECKPOINT_GOLD_TRAFFIC)
    .option("mergeSchema", "true")
    .trigger(processingTime=PROCESSING_TIME)
    .start(GOLD_TRAFFIC_PATH)
)

logger.info("Gold (Traffic) stream started")

26/01/23 20:24:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/23 20:24:15 WARN StreamingQueryManager: Stopping existing streaming query [id=49f34648-087e-4710-a6a8-d5e90d5f62dd, runId=68ceddd5-1678-48eb-abc5-f893d6be5cc5], as a new run is being started.
INFO:__main__:Gold (Traffic) stream started


26/01/23 20:24:16 WARN HDFSBackedStateStoreProvider: The state for version 49 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:16 WARN HDFSBackedStateStoreProvider: The state for version 49 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.


In [None]:
# Gold: metrics per aircraft category and flight phase over 5-minute tumbling
# windows, read from the Silver_ML table.
# NOTE(review): the Silver_ML table is only created by the first non-empty
# foreachBatch write, so this reader needs it to exist already. The window
# column keeps the name "window" here, unlike "time_window" in the traffic
# table; confirm whether that difference is intended.
# NOTE(review): complete mode with no watermark keeps all window state forever.
df_gold_metrics = (
    spark.readStream.format("delta").load(SILVER_ML_PATH)
    .where(col("category").isNotNull())
    .withColumn("window", window(col("event_timestamp"), "5 minutes"))
    .groupBy("window", "category", "flight_phase")
    .agg(
        count("icao24").alias("aircraft_count"),
        spark_round(avg("velocity_kmh"), 2).alias("avg_velocity_kmh"),
        spark_round(avg("altitude_meters"), 0).alias("avg_altitude_m"),
        spark_round(avg("rolling_avg_altitude"), 0).alias("rolling_altitude_m")
    )
    .select(
        "window", "category", "flight_phase", "aircraft_count",
        "avg_velocity_kmh", "avg_altitude_m", "rolling_altitude_m"
    )
)

query_gold_metrics = (
    df_gold_metrics.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", CHECKPOINT_GOLD_METRICS)
    .option("mergeSchema", "true")
    .trigger(processingTime=PROCESSING_TIME)
    .start(GOLD_METRICS_PATH)
)

logger.info("Gold (Metrics) stream started")

26/01/23 20:24:16 WARN HDFSBackedStateStoreProvider: The state for version 49 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:16 WARN HDFSBackedStateStoreProvider: The state for version 49 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:16 WARN HDFSBackedStateStoreProvider: The state for version 49 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:16 WARN HDFSBackedStateStoreProvider: The state for version 49 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Dat

26/01/23 20:24:18 WARN HDFSBackedStateStoreProvider: The state for version 24 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:18 WARN HDFSBackedStateStoreProvider: The state for version 24 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:18 WARN HDFSBackedStateStoreProvider: The state for version 24 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:18 WARN HDFSBackedStateStoreProvider: The state for version 24 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
26/01/23 20:24:18 WARN HDFSBackedStateStoreProvider: The state for version 24 doesn't exist in loadedMaps. Reading s

In [None]:
import time

# Startup banner for the running pipeline.
logger.info("Unified Pipeline Started")
# Use an aware UTC time so the "UTC" suffix is true. Plain datetime.now()
# returns naive local time.
logger.info(f"Time: {datetime.now(timezone.utc).strftime('%H:%M:%S UTC')}")
logger.info("Active Streams: Bronze, Silver, Silver_ML, Gold_Traffic, Gold_Metrics")
# Report the trigger actually passed to every writeStream. The old message was
# hard-coded to "30 seconds" while PROCESSING_TIME is configured separately.
logger.info(f"Processing Time: {PROCESSING_TIME} | Spark UI: http://localhost:4040")
print()

# Monitoring loop

## Shutdown Streams

In [None]:
def shutdown_all() -> None:
    """Stop every streaming query started by this notebook, then the SparkSession.

    Queries are stopped one at a time. A failure on one is logged with its
    traceback and does not prevent the remaining queries, or the session,
    from being stopped.
    """
    logger.info("Shutting down pipeline")

    streams = [
        ("Bronze", query_bronze),
        ("Silver", query_silver),
        ("Silver_ML", query_silver_ml),
        ("Gold_Traffic", query_gold_traffic),
        ("Gold_Metrics", query_gold_metrics)
    ]

    for name, stream in streams:
        try:
            stream.stop()
        except Exception as e:
            # Best-effort shutdown: keep going, but log the full traceback.
            # Logging only str(e) would lose it.
            logger.exception("Error stopping %s: %s", name, e)
        else:
            logger.info("%s stopped", name)

    spark.stop()
    logger.info("Spark session stopped")

shutdown_all()