In [None]:
import requests
import json
import time
from pyspark.sql import SparkSession, functions as F, types as T

# --- Step 1: obtain an OAuth2 access token (client-credentials grant) -------
start = time.time()

# The client id/secret are read from a local JSON file that must provide the
# keys "clientId" and "clientSecret".
with open('/home/gesser/air-traffic-data-pipeline/credentials.json') as f:
    creds = json.load(f)

client_id = creds.get("clientId")
client_secret = creds.get("clientSecret")

if not client_id or not client_secret:
    # The credentials come from the JSON file above, not from the environment,
    # so the message points the user at the file.
    raise ValueError(
        "credentials.json must contain non-empty 'clientId' and 'clientSecret' values."
    )

token_url = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"

payload = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret,
}

headers = {
    "Content-Type": "application/x-www-form-urlencoded",
}

# requests waits forever by default; a timeout turns a stalled connection into
# an exception instead of a hung script.
response = requests.post(token_url, data=payload, headers=headers, timeout=30)
response.raise_for_status()

access_token = response.json().get("access_token")
if not access_token:
    # Fail here rather than sending "Bearer None" to the data API later.
    raise RuntimeError("Token endpoint response did not contain an 'access_token'.")

tok = time.time()

print(f"time taken to get token: {tok-start:.2f}")

#print(f"Access token: {access_token}")

# --- Step 2: fetch the current state vectors inside a bounding box ----------
url = "https://opensky-network.org/api/states/all"

# Bounding box sent to the API: minimum/maximum latitude (lamin/lamax) and
# minimum/maximum longitude (lomin/lomax), in decimal degrees.
params = {
    "lamin": 47.001917,
    "lomin": -1.919083,
    "lamax": 47.340556,
    "lomax": -1.181750,
}
headers = {
    "Authorization": f"Bearer {access_token}",
}

# requests waits forever by default; a timeout turns a stalled connection into
# an exception instead of a hung script.
response = requests.get(url, headers=headers, params=params, timeout=30)
response.raise_for_status()  # raise error if request failed

data = response.json()

loaddata = time.time()

print(f"time taken to load data: {loaddata-tok:.2f}")

# --- Step 3: load the response into Spark and type the state vectors --------
if response.status_code != 200:
    # raise_for_status() has already raised for 4xx/5xx responses, so this
    # branch is only reached for an unexpected non-200 status below 400.
    print(f"Error: {response.status_code} - {response.text}")
elif not (data or {}).get("states"):
    # OpenSky documents "states" as null when no state vectors were received.
    # With a null/empty "states" Spark cannot infer an array-of-arrays column,
    # so explode() / positional indexing below would fail. Report zero rows
    # without starting Spark.
    print(0)
else:
    start = time.time()

    data_json = response.text

    spark = SparkSession.builder.appName("flight-data-pipeline").getOrCreate()

    # try/finally guarantees the session is stopped even when a Spark step
    # raises; otherwise the session would be left running.
    try:
        sparkinit = time.time()
        print(f"time taken to initialize spark: {sparkinit-start:.2f}")

        # The whole response body is a single JSON document, so the RDD holds
        # exactly one string element.
        rdd = spark.sparkContext.parallelize([data_json])

        # Read JSON from RDD
        df = spark.read.json(rdd)

        # One output row per state vector.
        df_states = df.withColumn("state", F.explode("states"))

        # (position inside a state vector, output column name, target type).
        # A Spark array has a single element type, so every position is cast
        # explicitly to the type it should have.
        schema_def = [
            (0,  "icao24",          T.StringType()),
            (1,  "callsign",        T.StringType()),
            (2,  "origin_country",  T.StringType()),
            (3,  "time_position",   T.LongType()),
            (4,  "last_contact",    T.LongType()),
            (5,  "longitude",       T.DoubleType()),
            (6,  "latitude",        T.DoubleType()),
            (7,  "baro_altitude",   T.DoubleType()),
            (8,  "on_ground",       T.BooleanType()),
            (9,  "velocity",        T.DoubleType()),
            (10, "true_track",      T.DoubleType()),
            (11, "vertical_rate",   T.DoubleType()),
            (12, "sensors",         T.ArrayType(T.IntegerType())),
            (13, "geo_altitude",    T.DoubleType()),
            (14, "squawk",          T.StringType()),
            (15, "spi",             T.BooleanType()),
            (16, "position_source", T.IntegerType()),
        ]

        cols = []

        for idx, name, dtype in schema_def:
            c = F.col("state")[idx]

            # sensors comes in as a JSON-style string like "[1,2,3]":
            # strip brackets/whitespace, split on commas, then cast the
            # resulting string array to the target array type.
            if name == "sensors":
                c = F.when(
                    c.isNull(), None
                ).otherwise(
                    F.split(
                        F.regexp_replace(c, r'[\[\]\s]', ''),
                        ','
                    ).cast(dtype)
                )
            else:
                c = c.cast(dtype)

            cols.append(c.alias(name))

        df_typed = df_states.select(*cols)
        print(df_typed.count())

        print(f"time taken to treat dataset: {time.time()-sparkinit:.2f}")
    finally:
        spark.stop()


time taken to get token: 0.22
time taken to load data: 0.17
time taken to initialize spark: 0.08


                                                                                

4
time taken to treat dataset: 1.63
