### Flight Radar 24 

Overview: 
- You are developing a system to process and analyze airport and flight data in real time. Your objective is to design and implement a scalable solution for analyzing large datasets generated by various sources, e.g., FlightRadar.

Task:
- Load a dataset simulating airport and flight data: the `adsb.json` and `oag.json` files.
- Use Apache Spark to ingest and process the data (e.g., data cleaning, aggregation, transformation).
- Conduct a simple analysis. Compute some basic airport KPIs, including but not limited to:
  - average speed for each airport
  - the total number of delayed flights (categorized into arrival delays and departure delays)
- Filter and transform a DataFrame by applying a window function (Spark partitioning):
  - Filter the DataFrame to retain only the most recent entry (the one with the smallest LastUpdate) for each FlightId.
  - Return a DataFrame containing only the FlightId and the corresponding latest LastUpdate.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import explode, count, col, isnan, date_format, from_unixtime, avg, row_number, when
from pyspark.sql.window import Window

In [None]:
spark = (SparkSession.builder 
        .appName("Flight Radar 24") 
        .getOrCreate())

1. Load a dataset simulating airport and flight data:
 - `adsb.json`
 - `oag.json`

In [None]:
try:
    adsb_df = spark.read.json("./data/adsb.json", multiLine=True)
    print("ADS-B data loaded successfully.")
except Exception as e:
    print(f"Error loading ADS-B data: {e}")

try:
    oag_df = spark.read.json("./data/oag.json", multiLine=True)
    print("OAG data loaded successfully.")
except Exception as e:
    print(f"Error loading OAG data: {e}")

try:
    flights_df = oag_df.select(explode("data").alias("flight"))
    print("Flight data processed successfully.")
except Exception as e:
    print(f"Error processing flight data: {e}")
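For readers new to nested JSON in Spark: `explode("data")` emits one output row per element of the top-level `data` array, which is what lets the later cells address fields as `flight.statusDetails...`. The plain-Python sketch below mimics that step on a hypothetical, heavily trimmed OAG-like document. Only field names used elsewhere in this notebook are reused; treating `statusDetails` as a list is an assumption inferred from the later `getItem(0)` calls, and all values are made up.

```python
# Hypothetical, trimmed OAG-like document; the real oag.json has many more fields.
oag_doc = {
    "data": [
        {"statusDetails": [{"arrival": {"actualTime": {"inGateTimeliness": "Delayed"}},
                            "departure": {"actualTime": {"outGateTimeliness": "OnTime"}}}]},
        {"statusDetails": [{"arrival": {"actualTime": {"inGateTimeliness": "OnTime"}},
                            "departure": {"actualTime": {"outGateTimeliness": "Delayed"}}}]},
    ]
}


def explode_data(doc):
    """Mimic select(explode("data").alias("flight")): one row per array element."""
    return [{"flight": element} for element in doc.get("data", [])]


rows = explode_data(oag_doc)
print(len(rows))  # 2
```

Like Spark's `explode`, the sketch produces no rows when `data` is missing or empty; Spark's `explode_outer` would instead keep a row with a null value.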

2. Data Processing
- Use Apache Spark to ingest and process the data (e.g., data cleaning, aggregation, transformation)

In [None]:
# count rows and columns
print(f'The dataset "adsb" has {adsb_df.count()} rows and {len(adsb_df.columns)} columns.')

In [None]:
for column in adsb_df.columns:
    unique_count = adsb_df.select(column).distinct().count()
    print(f"The column '{column}' has {unique_count} unique values.")

In [None]:
# Check for both null and NaN values in each column
null_nan_check = adsb_df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in adsb_df.columns])
null_nan_check.show()
# RadarId has 6 missing values; I keep those rows rather than dropping them.
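The check above relies on two Spark behaviors: `when(cond, c)` without `otherwise` yields null for rows where `cond` is false, and `count` skips nulls, so each output cell counts only the null-or-NaN rows. A plain-Python sketch of the same per-column count, on hypothetical rows where `None` stands in for SQL NULL:

```python
import math

# Hypothetical ADS-B-like rows; None plays the role of SQL NULL.
rows = [
    {"Flight": "F1", "Speed": 430.0, "RadarId": None},
    {"Flight": "F2", "Speed": float("nan"), "RadarId": 7},
    {"Flight": "F3", "Speed": 0.0, "RadarId": 9},
]


def is_null_or_nan(value):
    """True for None or a float NaN (mirrors col(c).isNull() | isnan(c))."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def null_nan_counts(records):
    """Count null-or-NaN values per column; a missing key counts as NULL."""
    columns = {key for record in records for key in record}
    return {c: sum(is_null_or_nan(r.get(c)) for r in records) for c in sorted(columns)}


print(null_nan_counts(rows))  # {'Flight': 0, 'RadarId': 1, 'Speed': 1}
```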

In [None]:
# Keep only rows with a non-negative Speed (rows with a negative or null Speed are dropped)
adsb_df = adsb_df.filter(adsb_df["Speed"] >= 0)
adsb_df.count()
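One subtlety of the `Speed >= 0` filter: in SQL's three-valued logic, comparing a NULL `Speed` with 0 yields NULL rather than false, and `filter` keeps only rows where the predicate is true, so rows with a missing `Speed` are dropped along with the negative ones. The sketch below models that with `None` as NULL, on made-up values:

```python
# Hypothetical speeds; None stands in for a NULL Speed.
speeds = [450.0, -1.0, None, 0.0]


def sql_ge_zero(value):
    """SQL three-valued comparison: NULL >= 0 is NULL (None here), not False."""
    return None if value is None else value >= 0


# filter() keeps a row only when the predicate is exactly True, so NULL rows go too.
kept = [s for s in speeds if sql_ge_zero(s) is True]
print(kept)  # [450.0, 0.0]
```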

In [None]:
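# Flight speeds are measured in knots (1 knot = 1.852 km/h).
# Convert Speed to km/h, rounded to 2 decimal places. Spark's round() must be used here:
# Python's built-in round() does not accept a Column.
from pyspark.sql.functions import round as spark_round

try:
    adsb_df = adsb_df.withColumn("SpeedKmH", spark_round(col("Speed") * 1.852, 2))
    print("Speed conversion and rounding successful.")
except Exception as e:
    print(f"Error during speed conversion and rounding: {e}")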
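The conversion is a single multiplication: 1 knot is one nautical mile (1852 m) per hour, so 450 knots * 1.852 = 833.4 km/h. The sketch below reproduces it in plain Python and rounds with `ROUND_HALF_UP`, the mode the PySpark documentation states for `round`; Python's built-in `round` rounds half to even on the binary float value, so the two can disagree at ties. It multiplies exact decimals rather than doubles, so treat it as an approximation of what Spark computes, not a bit-for-bit replica.

```python
from decimal import Decimal, ROUND_HALF_UP

KNOT_TO_KMH = 1.852  # 1 knot = 1 nautical mile per hour = 1.852 km/h


def knots_to_kmh(knots, digits=2):
    """Convert knots to km/h and round HALF_UP to `digits` decimal places."""
    kmh = Decimal(str(knots)) * Decimal(str(KNOT_TO_KMH))
    quantum = Decimal(1).scaleb(-digits)  # 0.01 when digits=2
    return float(kmh.quantize(quantum, rounding=ROUND_HALF_UP))


print(knots_to_kmh(450))   # 833.4
print(knots_to_kmh(1.25))  # 2.32 (exact product 2.315, rounded half up)
```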


In [None]:
# Format LastUpdate (Unix epoch seconds) as a 'yyyy-MM-dd HH:mm' string
try:
    adsb_df = adsb_df.withColumn("LastUpdate", date_format(from_unixtime("LastUpdate"), "yyyy-MM-dd HH:mm"))
    print("LastUpdate column formatted successfully.")

except AnalysisException as ae:
    print(f"AnalysisException occurred: {ae}")
except Exception as e:
    print(f"An error occurred: {e}")
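`from_unixtime` interprets `LastUpdate` as seconds since the Unix epoch, and `date_format` then renders it as a string. The plain-Python equivalent below pins the time zone to UTC; that is an assumption, since Spark uses the session time zone (`spark.sql.session.timeZone`). It also shows that the pattern drops the seconds, so two updates within the same minute become identical strings.

```python
from datetime import datetime, timezone


def format_last_update(epoch_seconds, tz=timezone.utc):
    """Render epoch seconds like date_format(from_unixtime(x), "yyyy-MM-dd HH:mm")."""
    return datetime.fromtimestamp(epoch_seconds, tz=tz).strftime("%Y-%m-%d %H:%M")


print(format_last_update(1700000000))  # 2023-11-14 22:13
print(format_last_update(1700000030))  # 2023-11-14 22:13 (seconds are lost)
```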

In [None]:
# rename the Flight column to FlightId
try:
    adsb_df = adsb_df.withColumnRenamed("Flight", "FlightId")
    print("Column renamed successfully from 'Flight' to 'FlightId'.")

except AnalysisException as ae:
    print(f"AnalysisException occurred: {ae}")
except Exception as e:
    print(f"An error occurred: {e}")

3. Basic airport KPIs:
 - average speed for each airport 
 - the total number of delayed flights (categorized into arrival delays and departure delays)

In [None]:
unique_airport = adsb_df.select("Origin").distinct()
unique_airport_list = [row["Origin"] for row in unique_airport.collect()]

print(f'The dataset "adsb_df" has {len(unique_airport_list)} unique airports.')
print("The unique airports are:")
print(", ".join(str(airport) for airport in unique_airport_list))

In [None]:
filtered_adsb_df = adsb_df.filter(adsb_df["Onground"] == 0)
average_speed = filtered_adsb_df.groupBy("Origin").agg(avg("Speed").alias("AverageSpeed"))
average_speed_results = average_speed.collect()

for row in average_speed_results:
    origin_airport = row["Origin"]
    avg_speed = row["AverageSpeed"]
    print(f"The average speed for flights from {origin_airport} (in the air) is {avg_speed:.2f} knots.")

In [None]:
filtered_adsb_df = adsb_df.filter(adsb_df["Onground"] == 0)
average_speed = filtered_adsb_df.groupBy("Origin").agg(avg("SpeedKmH").alias("AverageSpeedKmH"))
average_speed_results = average_speed.collect()

for row in average_speed_results:
    origin_airport = row["Origin"]
    avg_speed = row["AverageSpeedKmH"]
    print(f"The average speed for flights from {origin_airport} (in the air) is {avg_speed:.2f} km/h.")
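Note that "each airport" here means each departure airport, since both cells group by `Origin`. The plain-Python sketch below spells out what the filter, group and average pipeline computes on hypothetical rows (the airport codes and speeds are made up); like Spark's `avg`, it ignores null speeds.

```python
from collections import defaultdict

# Hypothetical rows: (Origin, Onground, SpeedKmH); None = NULL speed.
rows = [
    ("WAW", 0, 800.0),
    ("WAW", 0, 760.0),
    ("WAW", 1, 20.0),   # on the ground: excluded by the Onground == 0 filter
    ("KRK", 0, 700.0),
    ("KRK", 0, None),   # NULL speeds are ignored, as avg() does in Spark
]


def average_speed_by_origin(records):
    """filter(Onground == 0) -> groupBy("Origin") -> avg(speed), skipping NULL speeds.

    Unlike Spark, an origin whose speeds are all NULL is omitted here
    instead of appearing with a NULL average.
    """
    totals = defaultdict(lambda: [0.0, 0])  # origin -> [sum, non-null count]
    for origin, onground, speed in records:
        if onground == 0 and speed is not None:
            totals[origin][0] += speed
            totals[origin][1] += 1
    return {origin: s / n for origin, (s, n) in totals.items()}


print(average_speed_by_origin(rows))  # {'WAW': 780.0, 'KRK': 700.0}
```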

In [None]:
flights_df = flights_df.select(
    col("flight.statusDetails.arrival.actualTime.inGateTimeliness").alias("arrival_inGateTimeliness"),
    col("flight.statusDetails.departure.actualTime.outGateTimeliness").alias("departure_outGateTimeliness")
)


flights_df = flights_df.withColumn("arrival_inGateTimeliness", col("arrival_inGateTimeliness").getItem(0))
flights_df = flights_df.withColumn("departure_outGateTimeliness", col("departure_outGateTimeliness").getItem(0))


delayed_arrivals = flights_df.filter(col("arrival_inGateTimeliness") == "Delayed").count()
delayed_departures = flights_df.filter(col("departure_outGateTimeliness") == "Delayed").count()

print(f"Total number of delayed arrival flights: {delayed_arrivals}")
print(f"Total number of delayed departure flights: {delayed_departures}")
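Each `filter(...).count()` above launches a separate Spark job. The sketch below, in plain Python on hypothetical status values (only the label `"Delayed"` comes from the notebook; `"OnTime"` and `"Early"` are made up), counts both categories in one pass. In PySpark the analogous single-pass form could be one `agg` with a `count(when(col(...) == "Delayed", True))` per category.

```python
from collections import Counter

# Hypothetical (arrival_inGateTimeliness, departure_outGateTimeliness) pairs after getItem(0).
flights = [
    ("Delayed", "OnTime"),
    ("OnTime", "Delayed"),
    ("Delayed", "Delayed"),
    (None, "Early"),  # missing arrival status: counted in neither category
]


def count_delays(pairs):
    """Count arrival and departure delays in a single pass over the rows."""
    counts = Counter()
    for arrival, departure in pairs:
        counts["arrival_delayed"] += arrival == "Delayed"
        counts["departure_delayed"] += departure == "Delayed"
    return dict(counts)


print(count_delays(flights))  # {'arrival_delayed': 2, 'departure_delayed': 2}
```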


4. Filter and transform a DataFrame by applying a window function (Spark partitioning):
 - Filter the DataFrame to retain only the most recent entry (the one with the smallest LastUpdate) for each FlightId.
 - Return a DataFrame containing only the FlightId and the corresponding latest LastUpdate.

In [None]:
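# Number the rows of each FlightId in ascending LastUpdate order;
# row 1 is the entry with the smallest LastUpdate, as the task specifies.
# LastUpdate is a 'yyyy-MM-dd HH:mm' string at this point, which sorts chronologically.
order_adsb_df = Window.partitionBy("FlightId").orderBy("LastUpdate")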

adsb_latest_df = (adsb_df
                  .withColumn("row_num", row_number().over(order_adsb_df)) 
                  .filter(col("row_num") == 1) 
                  .select("FlightId", "LastUpdate"))

adsb_latest_df.show(truncate=False)
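The window keeps row 1 of each `FlightId` partition ordered by ascending `LastUpdate`. Because `LastUpdate` was formatted to minute precision earlier, several rows can tie; `row_number` then picks one of them arbitrarily, but since only `FlightId` and `LastUpdate` are selected, any tied row yields the same output. The plain-Python sketch below, on hypothetical rows, computes the same result. Ordering by `col("LastUpdate").desc()` instead would keep the largest (latest) timestamp.

```python
# Hypothetical (FlightId, LastUpdate) rows, with LastUpdate already formatted as in the notebook.
rows = [
    ("F1", "2023-11-14 22:15"),
    ("F1", "2023-11-14 22:13"),
    ("F2", "2023-11-14 21:00"),
    ("F2", "2023-11-14 21:05"),
]


def first_per_flight(records):
    """Mimic row_number() over partitionBy(FlightId).orderBy(LastUpdate), keeping row 1."""
    best = {}
    for flight_id, last_update in records:
        # "yyyy-MM-dd HH:mm" strings compare chronologically, so "<" finds the earliest.
        if flight_id not in best or last_update < best[flight_id]:
            best[flight_id] = last_update
    return sorted(best.items())


print(first_per_flight(rows))  # [('F1', '2023-11-14 22:13'), ('F2', '2023-11-14 21:00')]
```

Without the window requirement, `groupBy("FlightId").agg(min("LastUpdate"))` would return the same two columns.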