In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark
!pip install openap
!pip install requests
!pip install pymongo[srv]
from google.colab import drive
drive.mount('/content/drive')

import os
from pyspark.sql import SparkSession

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

spark = SparkSession.builder \
    .appName("RealTimeAviationPipeline") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

print("Spark Session Created.")

csv_path = "/content/drive/MyDrive/aircraft-database-complete-2025-08.csv"

try:
    aircraft_df = spark.read.option("header", "true").csv(csv_path)

    new_columns = [c.replace("'", "") for c in aircraft_df.columns]
    aircraft_df = aircraft_df.toDF(*new_columns)

    valid_aircraft = aircraft_df.filter("typecode IS NOT NULL").select("icao24", "typecode")
    aircraft_map = {row['icao24']: row['typecode'] for row in valid_aircraft.collect()}
    bc_aircraft_db = spark.sparkContext.broadcast(aircraft_map)

    print(f"Database Loaded & Broadcasted. Count: {len(aircraft_map)}")

except Exception as e:
    print(e)

os.makedirs("/content/flight_data", exist_ok=True)
print("Setup Complete.")

Mounted at /content/drive
Spark Session Created.
Database Loaded & Broadcasted. Count: 616628
Setup Complete.


In [None]:
import time
import json
import requests
import threading
import os
import glob
from datetime import datetime

stop_streaming = False
folder_path = "/content/flight_data"

def cleanup_old_files():
    files = sorted(glob.glob(f"{folder_path}/*.json"))

    if len(files) > 2:
        for f in files[:-2]:
            try:
                os.remove(f)
            except:
                pass

def fetch_live_data():
    batch_id = 0
    url = "https://opensky-network.org/api/states/all"

    while not stop_streaming:
        try:
            response = requests.get(url, timeout=10)

            if response.status_code == 200:
                data = response.json()
                states = data.get("states", [])

                if states:
                    timestamp = int(time.time())
                    filename = f"{folder_path}/batch_{timestamp}.json"

                    with open(filename, "w") as f:
                        json.dump(states, f)

                    print(f"Batch {batch_id} Ingested: {len(states)} flights")
                    batch_id += 1

                    cleanup_old_files()
            else:
                print(f"API Error: {response.status_code}")

        except Exception as e:
            print(f"Connection Error: {e}")

        time.sleep(100)

stream_thread = threading.Thread(target=fetch_live_data)
stream_thread.daemon = True
stream_thread.start()

print("Live Streaming Started...")

Live Streaming Started...


In [None]:
stop_streaming = True

In [None]:
from openap import prop


unique_types_rows = aircraft_df.select("typecode").distinct().collect()
unique_types = [row.typecode for row in unique_types_rows if row.typecode]

print(f"Building physics database for {len(unique_types)} types...")


openap_data = {}

for tc in unique_types:
    try:

        aircraft = prop.aircraft(tc)
        if aircraft and 'limits' in aircraft and 'MTOW' in aircraft['limits']:
             openap_data[tc] = {
                 'mtow': aircraft['limits']['MTOW']
             }
    except:
        pass
bc_openap_db = spark.sparkContext.broadcast(openap_data)

print(f"Success! Physics Database Broadcasted. Valid Models: {len(openap_data)}")
if "B744" in openap_data:
    print(f"Verification Passed: B744 MTOW is {openap_data['B744']['mtow']} kg")
else:
    print("Warning: B744 still missing.")

Building physics database for 3323 types...
Success! Physics Database Broadcasted. Valid Models: 37
Verification Passed: B744 MTOW is 396800 kg


In [16]:
import pymongo
from datetime import datetime, timezone
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from openap import FuelFlow, Emission

MONGO_URI = "mongodb+srv://<username>:<password>@cluster.mongodb.net/"
DB_NAME = "AviationIntelligence"
COLLECTION_NAME = "FlightStream"

DEFAULT_AC_TYPE = "E190"
DEFAULT_MTOW_KG = 28000.0


def lookup_typecode(icao24):
    if not icao24: return None
    return bc_aircraft_db.value.get(icao24)


def determine_phase(altitude, vertical_rate, speed):
    if altitude is None or vertical_rate is None or speed is None:
        return "CRUISE"

    if altitude < 200 and vertical_rate < -500:
        return "LANDING"
    elif altitude < 200 and speed < 92.6:
        return "TAXI"
    elif vertical_rate > 500 and altitude < 2000:
        return "TAKEOFF"
    elif vertical_rate > 500:
        return "ASCENDING"
    elif vertical_rate < -500:
        return "DESCENDING"
    else:
        return "CRUISE"

def calculate_fuel_flow(typecode, altitude_ft, speed_kmh, vertical_rate_fpm, phase):
    try:
        if altitude_ft is None or speed_kmh is None: return None

        ac_props = bc_openap_db.value.get(typecode, {})
        model_type = typecode if (typecode and ac_props.get('mtow')) else DEFAULT_AC_TYPE
        mass_kg = 0.80 * ac_props.get('mtow', DEFAULT_MTOW_KG)

        try:
            fuelflow = FuelFlow(ac=model_type)
        except:
            fuelflow = FuelFlow(ac=DEFAULT_AC_TYPE)
            mass_kg = DEFAULT_MTOW_KG

        alt_m = altitude_ft * 0.3048
        speed_ms = speed_kmh / 3.6
        vs_ms = (vertical_rate_fpm or 0) * 0.00508

        if phase == "TAXI":
            return fuelflow.at_actuator_model(0, alt=0)
        else:
            active_vs = vs_ms if phase in ["ASCENDING", "DESCENDING", "TAKEOFF", "LANDING"] else 0
            return fuelflow.enroute(mass=mass_kg, tas=speed_ms, alt=alt_m, vs=active_vs)
    except:
        return None

def calculate_nox(fuel_flow, typecode, altitude_ft, speed_kmh):
    try:
        if fuel_flow is None or altitude_ft is None or speed_kmh is None: return None

        ac_props = bc_openap_db.value.get(typecode, {})
        model_type = typecode if (typecode and ac_props.get('mtow')) else DEFAULT_AC_TYPE

        try:
            emission = Emission(ac=model_type)
        except:
            emission = Emission(ac=DEFAULT_AC_TYPE)

        alt_m = altitude_ft * 0.3048
        speed_ms = speed_kmh / 3.6

        return emission.nox(fuel_flow, tas=speed_ms, alt=alt_m)
    except:
        return None

def calculate_co2(fuel_flow):
    if fuel_flow is None: return None
    return fuel_flow * 3.16

def calculate_cost(fuel_flow):
    if fuel_flow is None: return None
    return fuel_flow * 0.77

def save_to_mongodb(spark_df, batch_filename):
    try:
        data = spark_df.toPandas().to_dict("records")

        if not data:
            return

        ingest_time = datetime.utcnow()

        formatted_docs = []
        for row in data:
            doc = {
                "timestamp": ingest_time,
                "batch_source": batch_filename,
                "icao24": row['icao24'],
                "callsign": row.get('callsign', '').strip(),
                "country": row.get('origin_country', '').strip(),
                "position": {
                    "latitude": row['latitude'],
                    "longitude": row['longitude'],
                    "heading": row['heading']
                },
                "telemetry": {
                    "altitude_ft": row['geo_altitude_ft'],
                    "speed_kmh": row['velocity_kmh'],
                    "vertical_rate_fpm": row['vertical_rate_fpm']
                },
                "aircraft": {
                    "type_code": row['typecode']
                },
                "analytics": {
                    "phase": row['phase'],
                    "fuel_rate_kg_s": row['fuel_rate_kg_s'],
                    "co2_rate_kg_s": row['co2_rate_kg_s'],
                    "nox_rate_kg_s": row['nox_rate_kg_s'],
                    "cost_rate_usd_s": row['cost_rate_usd_s']
                }
            }
            formatted_docs.append(doc)

        with pymongo.MongoClient(MONGO_URI) as client:
            db = client[DB_NAME]
            coll = db[COLLECTION_NAME]
            coll.insert_many(formatted_docs)

        print(f"MongoDB: Successfully inserted {len(formatted_docs)} documents.")

    except Exception as e:
        print(f"MongoDB Write Error: {e}")

In [18]:
from pyspark.sql.functions import col, element_at, from_json, explode, lit, udf
from pyspark.sql.types import ArrayType, StringType, DoubleType
import glob

files = sorted(glob.glob(f"{folder_path}/*.json"))

if files:
    latest_file = files[-1]
    print(f"Processing Batch: {latest_file}")

    try:
        text_df = spark.read.text(latest_file)
        json_schema = ArrayType(ArrayType(StringType()))
        raw_df = text_df.select(explode(from_json(col("value"), json_schema)).alias("value"))

        flights_df = raw_df.select(
            element_at(col("value"), 1).alias("icao24"),
            element_at(col("value"), 2).alias("callsign"),
            element_at(col("value"), 3).alias("origin_country"),
            element_at(col("value"), 6).cast("double").alias("longitude"),
            element_at(col("value"), 7).cast("double").alias("latitude"),
            element_at(col("value"), 11).cast("double").alias("heading"),
            (element_at(col("value"), 14).cast("double") * 3.28084).alias("geo_altitude_ft"),
            (element_at(col("value"), 10).cast("double") * 3.6).alias("velocity_kmh"),
            (element_at(col("value"), 12).cast("double") * 196.85).alias("vertical_rate_fpm")
        ).filter(
            col("icao24").isNotNull() &
            col("velocity_kmh").isNotNull() &
            col("geo_altitude_ft").isNotNull()
        )

        lookup_udf = udf(lookup_typecode, StringType())
        phase_udf = udf(determine_phase, StringType())
        fuel_udf = udf(calculate_fuel_flow, DoubleType())
        co2_udf = udf(calculate_co2, DoubleType())
        nox_udf = udf(calculate_nox, DoubleType())

        flights_df = flights_df.withColumn("typecode", lookup_udf(col("icao24")))

        flights_df = flights_df.withColumn("phase",
            phase_udf(col("geo_altitude_ft"), col("vertical_rate_fpm"), col("velocity_kmh")))

        flights_df = flights_df.withColumn("fuel_rate_kg_s",
            fuel_udf(col("typecode"), col("geo_altitude_ft"), col("velocity_kmh"),
                     col("vertical_rate_fpm"), col("phase")))

        flights_df = flights_df.withColumn("co2_rate_kg_s", co2_udf(col("fuel_rate_kg_s")))
        flights_df = flights_df.withColumn("nox_rate_kg_s",
            nox_udf(col("fuel_rate_kg_s"), col("typecode"), col("geo_altitude_ft"), col("velocity_kmh")))
        flights_df = flights_df.withColumn("cost_rate_usd_s", col("fuel_rate_kg_s") * 0.77)

        final_df = flights_df.filter(col("fuel_rate_kg_s").isNotNull())

        count = final_df.count()
        print(f"Computed Analytics for {count} flights.")

        if count > 0:
            save_to_mongodb(final_df, latest_file)

            final_df.select("icao24", "typecode", "fuel_rate_kg_s", "co2_rate_kg_s", "nox_rate_kg_s").show(5)

    except Exception as e:
        print(f"Pipeline Error: {str(e)}")

else:
    print("No data files found.")

Processing Batch: /content/flight_data/batch_1763705274.json
Batch 22 Ingested: 5334 flights
Batch 23 Ingested: 5362 flights
Computed Analytics for 4605 flights.
MongoDB Write Error: An error occurred while calling o1139.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 22) (4283400bb344 executor driver): org.apache.spark.SparkFileNotFoundException: File file:/content/flight_data/batch_1763705274.json does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:780)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$read