# Step 1: Create Session and Load JSON Files

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, to_timestamp
from pyspark import SparkContext, SparkConf
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1")
sc = SparkContext(conf=conf_spark)

# Initialize Spark Session
spark = SparkSession.builder.appName("AirportFlightDataProcessing").getOrCreate()

# Load JSON files
adsb_df = spark.read.json("adsb.json", multiLine=True)
oag_df = spark.read.json("oag.json", multiLine=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/03 19:10:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Step 2: Remove Null Values
Remove rows with any null values across all columns.

In [21]:
# Drop rows with any null values in all columns
adsb_df = adsb_df.dropna(how="any")
oag_df = oag_df.dropna(how="any")

# Step 3: Check for Data Type Consistency in Each Column
Let’s print the schema and inspect a few rows to see the data types in both DataFrames. This will help identify any inconsistencies, especially with nested or array fields.

In [22]:
# Check data types in ADS-B data
print("ADSB Data Schema:")
adsb_df.printSchema()
adsb_df.show(5)

# Check data types in OAG data
print("OAG Data Schema:")
oag_df.printSchema()
oag_df.show(5)

ADSB Data Schema:
root
 |-- AircraftId: string (nullable = true)
 |-- Altitude: long (nullable = true)
 |-- Callsign: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- ETA: long (nullable = true)
 |-- Flight: string (nullable = true)
 |-- LastUpdate: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Onground: long (nullable = true)
 |-- Origin: string (nullable = true)
 |-- RadarId: string (nullable = true)
 |-- Registration: string (nullable = true)
 |-- SourceType: string (nullable = true)
 |-- Speed: float (nullable = true)
 |-- Squawk: long (nullable = true)
 |-- Track: long (nullable = true)
 |-- Type: string (nullable = true)
 |-- Vspeed: long (nullable = true)
 |-- FormattedLastUpdate: string (nullable = true)

+----------+--------+--------+-----------+---+------+-------------------+---------+---------+--------+------+-------+------------+--------------------+-----+------+-----+----+------+---

+--------------------+--------------------+
|                data|              paging|
+--------------------+--------------------+
|[{{773, NULL}, {{...|{10, https://api....|
+--------------------+--------------------+



# Step 4: Convert to a Single Data Type for Each Column
For `adsb_df`
If there are inconsistencies, let’s enforce consistent types. We’ll assume LastUpdate should be a timestamp and convert other columns accordingly:

In [24]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, FloatType, IntegerType, TimestampType

# Convert Speed to FloatType if necessary
adsb_df = adsb_df.withColumn("Speed", col("Speed").cast(FloatType()))

# Convert LastUpdate to TimestampType if necessary
adsb_df = adsb_df.withColumn("LastUpdate", col("LastUpdate").cast(TimestampType()))


from pyspark.sql.types import IntegerType, DoubleType, StringType, TimestampType
from pyspark.sql.functions import from_unixtime, col

# Ensure consistent data types for relevant columns
adsb_df = adsb_df.withColumn("AircraftId", col("AircraftId").cast(StringType())) \
    .withColumn("Latitude", col("Latitude").cast(DoubleType())) \
    .withColumn("Longitude", col("Longitude").cast(DoubleType())) \
    .withColumn("Track", col("Track").cast(IntegerType())) \
    .withColumn("Altitude", col("Altitude").cast(IntegerType())) \
    .withColumn("Speed", col("Speed").cast(IntegerType())) \
    .withColumn("Squawk", col("Squawk").cast(StringType())) \
    .withColumn("LastUpdate", col("LastUpdate").cast(TimestampType()))


For `oag_df`
We’ll flatten the nested structure and ensure each column has a consistent type. Here, OutGateTime and InGateTime are timestamps, and we’ll handle any array types in the flattening process.

In [25]:
from pyspark.sql.functions import explode

# Flatten nested structures and select relevant fields
oag_df_flat = oag_df.select(
    col("data.flightNumber").alias("FlightNumber"),
    col("data.departure.airport.iata").alias("DepartureAirport"),
    col("data.arrival.airport.iata").alias("ArrivalAirport"),
    explode("data.statusDetails").alias("StatusDetail")
)

# Extract and convert fields from the flattened status details
oag_df_flat = oag_df_flat.select(
    col("FlightNumber").cast(StringType()),
    col("DepartureAirport").cast(StringType()),
    col("ArrivalAirport").cast(StringType()),
    col("StatusDetail.state").alias("FlightStatus").cast(StringType()),
    col("StatusDetail.departure.actualTime.outGate.local")[0].alias("OutGateTime").cast(TimestampType()),
    col("StatusDetail.arrival.actualTime.inGate.local")[0].alias("InGateTime").cast(TimestampType())
)

# Step 5: Perform Analysis
Now that data types are consistent and clean, we can calculate the KPIs.

## Example 1: Average Speed per Airport (from `adsb_df`)

In [26]:
from pyspark.sql.functions import avg

# Calculate average speed per airport (by Origin)
avg_speed_df = adsb_df.groupBy("Origin").agg(avg("Speed").alias("AverageSpeed"))
avg_speed_df.show()

+------+------------+
|Origin|AverageSpeed|
+------+------------+
|   GUA|       170.0|
|   DOH|       250.0|
|   SGN|         0.5|
|   IAD|       170.0|
+------+------------+



## Example 2: Count of Delayed Flights (from `oag_df_flat`)
We’ll count delayed flights based on the FlightStatus field, assuming Delayed indicates a delay.

In [27]:
from pyspark.sql.functions import count, when

# Calculate number of delayed flights by airport
delayed_flights_df = oag_df_flat.withColumn(
    "ArrivalDelay", when(col("FlightStatus") == "Delayed", 1).otherwise(0)
).withColumn(
    "DepartureDelay", when(col("FlightStatus") == "Delayed", 1).otherwise(0)
)

# Summarize delays by departure and arrival airport
delayed_summary_df = delayed_flights_df.groupBy("DepartureAirport", "ArrivalAirport").agg(
    count("ArrivalDelay").alias("TotalArrivalDelays"),
    count("DepartureDelay").alias("TotalDepartureDelays")
)
delayed_summary_df.show()

+--------------------+--------------------+------------------+--------------------+
|    DepartureAirport|      ArrivalAirport|TotalArrivalDelays|TotalDepartureDelays|
+--------------------+--------------------+------------------+--------------------+
|[SGN, DOH, SDF, G...|[ICN, SYZ, YHM, M...|                10|                  10|
+--------------------+--------------------+------------------+--------------------+



## Example 3: Get Latest Flight Entry for Each Flight (Window Function)
Using a window function, we filter the latest `OutGateTime` entry for each flight.

In [28]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Define a window by FlightNumber, ordered by OutGateTime
window_spec = Window.partitionBy("FlightNumber").orderBy(col("OutGateTime").desc())

# Filter to get only the latest entry per flight
latest_flights_df = oag_df_flat.withColumn("row_num", row_number().over(window_spec)) \
    .filter(col("row_num") == 1) \
    .select("FlightNumber", "OutGateTime", "InGateTime")

latest_flights_df.show()

+--------------------+-------------------+-------------------+
|        FlightNumber|        OutGateTime|         InGateTime|
+--------------------+-------------------+-------------------+
|[476, 476, 476, 4...|2023-10-03 20:54:00|2023-10-03 23:35:00|
+--------------------+-------------------+-------------------+

