# Data Processing with Apache Spark


In [None]:
import os
import pyspark
from pyspark.sql import SparkSession

In [None]:
# Point PySpark at the local Java and Spark installations. HADOOP_HOME is set
# to the same directory as SPARK_HOME.
_spark_home = "C:/Users/janad/Downloads/spark-3.5.3-bin-hadoop3"
os.environ.update(
    {
        "JAVA_HOME": "C:/Program Files/Java/jdk-23",
        "SPARK_HOME": _spark_home,
        "HADOOP_HOME": _spark_home,
    }
)

In [None]:
# Build (or reuse) a local Spark session using every available core.
# NOTE(review): the java.security.manager=allow flag is passed to both the
# driver and executor JVMs -- presumably required by the JDK 23 install
# configured above; confirm before removing.
_security_manager_flag = "-Djava.security.manager=allow"
spark = (
    SparkSession.builder
    .appName("Case study")
    .master("local[*]")
    .config("spark.driver.extraJavaOptions", _security_manager_flag)
    .config("spark.executor.extraJavaOptions", _security_manager_flag)
    .getOrCreate()
)

In [None]:
# Bare expression: as the last statement of a notebook cell this displays the
# SparkSession object created above.
spark

In [None]:
# Load both source files. multiLine is enabled so each file is parsed as a
# JSON document that may span several lines rather than one record per line.
df_adsb = spark.read.option("multiLine", True).json("adsb.json")
df_oag = spark.read.option("multiLine", True).json("oag.json")

In [None]:
# Print the first three ADS-B rows with long cell values truncated.
df_adsb.show(n=3, truncate=True)

In [None]:
# Print the schema Spark inferred for the ADS-B JSON.
df_adsb.printSchema()

In [None]:
# Print the first three OAG rows. truncate=True clips long cells to 20
# characters; the previous truncate=2 cut every cell down to two characters,
# which made the preview unreadable and differed from the ADS-B preview.
df_oag.show(3, truncate=True)

In [None]:
# Print the schema Spark inferred for the OAG JSON (the nested `data` column
# is flattened in the cells below).
df_oag.printSchema()

In [None]:
from pyspark.sql.functions import explode_outer

# One output row per element of the `data` array, exposed as `data_flat`.
# explode_outer (unlike explode) keeps rows whose array is null or empty,
# producing a null `data_flat` for them.
_exploded_record = explode_outer(df_oag["data"])
df_flat = df_oag.withColumn("data_flat", _exploded_record)

In [None]:
# Preview the exploded frame (show() prints the first 20 rows by default).
df_flat.show()

In [None]:
#df_flatten=df_flat.select("data_flat.*")

In [None]:
#df_flatten.show(1,truncate=True)

In [None]:
from pyspark.sql.functions import col

# Flatten the nested `data_flat` struct into one column per leaf field.
#
# Each table below maps a dotted path (relative to the prefix noted) to the
# output column name. Output names are unique and contain no dots:
#   * the statusDetails airport FAA codes get a `_status` suffix, matching the
#     sibling `_iata_status` / `_icao_status` columns, so they no longer
#     collide with the scheduled `arrival_airport_faa` / `departure_airport_faa`;
#   * the carrier codes are named `carrier_iata` / `carrier_icao`, because a
#     dot in a column name is parsed by Spark as struct-field access unless
#     the name is back-quoted on every use.

# Paths relative to `data_flat`.
_scheduled_fields = [
    ("aircraftType.iata", "aircraftType_iata"),
    ("aircraftType.icao", "aircraftType_icao"),
    ("arrival.airport.faa", "arrival_airport_faa"),
    ("arrival.airport.iata", "arrival_airport_iata"),
    ("arrival.airport.icao", "arrival_airport_icao"),
    ("arrival.date.local", "arrival_date_local"),
    ("arrival.date.utc", "arrival_date_utc"),
    ("arrival.terminal", "arrival_airport_terminal"),
    ("arrival.time.local", "arrival_airport_time_local"),
    ("arrival.time.utc", "arrival_airport_time_utc"),
    ("carrier.iata", "carrier_iata"),
    ("carrier.icao", "carrier_icao"),
    ("codeshare.aircraftOwner.name", "airlineowner"),
    ("departure.airport.faa", "departure_airport_faa"),
    ("departure.airport.iata", "departure_airport_iata"),
    ("departure.airport.icao", "departure_airport_icao"),
    ("departure.date.local", "departure_date_local"),
    ("departure.date.utc", "departure_date_utc"),
    ("departure.terminal", "departure_airport_terminal"),
    ("departure.time.local", "departure_airport_time_local"),
    ("departure.time.utc", "departure_airport_time_utc"),
    ("elapsedTime", "elapsedTime"),
    ("flightNumber", "flightNumber"),
    ("flightType", "flightType"),
    ("segmentInfo.numberOfStops", "segmentInfo_numberOfStops"),
]

# Paths relative to `data_flat.statusDetails`; only element 0 of each
# extracted value is kept (see getItem(0) below).
_status_fields = [
    ("arrival.actualTerminal", "arrival_actualTerminal"),
    ("arrival.actualTime.inGate.local", "arrival_inGate_local"),
    ("arrival.actualTime.inGate.utc", "arrival_inGate_utc"),
    ("arrival.actualTime.inGateTimeliness", "arrival_inGateTimeliness"),
    ("arrival.actualTime.inGateVariation", "arrival_inGateVariation"),
    ("arrival.actualTime.onGround.local", "arrival_onGround_local"),
    ("arrival.actualTime.onGround.utc", "arrival_onGround_utc"),
    ("arrival.airport.faa", "arrival_airport_faa_status"),
    ("arrival.airport.iata", "arrival_airport_iata_status"),
    ("arrival.airport.icao", "arrival_airport_icao_status"),
    ("arrival.baggage", "arrival_baggage"),
    ("arrival.estimatedTime.inGate.local", "arrival_estimatedTime_inGate_local"),
    ("arrival.estimatedTime.inGate.utc", "arrival_estimatedTime_inGate_utc"),
    ("arrival.estimatedTime.inGateTimeliness", "arrival_estimatedTime_inGateTimeliness"),
    ("arrival.estimatedTime.inGateVariation", "arrival_estimatedTime_inGateVariation"),
    ("arrival.estimatedTime.onGround.local", "arrival_estimatedTime_onGround_local"),
    ("arrival.estimatedTime.onGround.utc", "arrival_estimatedTime_onGround_utc"),
    ("arrival.gate", "arrival_gate"),
    ("departure.actualTerminal", "departure_actualTerminal"),
    ("departure.actualTime.offGround.local", "departure_offGround_local"),
    ("departure.actualTime.offGround.utc", "departure_offGround_utc"),
    ("departure.actualTime.outGate.local", "departure_outGate_local"),
    ("departure.actualTime.outGate.utc", "departure_outGate_utc"),
    ("departure.actualTime.outGateTimeliness", "departure_outGateTimeliness"),
    ("departure.actualTime.outGateVariation", "departure_outGateVariation"),
    ("departure.airport.faa", "departure_airport_faa_status"),
    ("departure.airport.iata", "departure_airport_iata_status"),
    ("departure.airport.icao", "departure_airport_icao_status"),
    ("departure.checkInCounter", "departure_checkInCounter"),
    ("departure.estimatedTime.offGround.local", "departure_estimatedTime_offGround_local"),
    ("departure.estimatedTime.offGround.utc", "departure_estimatedTime_offGround_utc"),
    ("departure.estimatedTime.outGate.local", "departure_estimatedTime_outGate_local"),
    ("departure.estimatedTime.outGate.utc", "departure_estimatedTime_outGate_utc"),
    ("departure.estimatedTime.outGateTimeliness", "departure_estimatedTime_outGateTimeliness"),
    ("departure.estimatedTime.outGateVariation", "departure_estimatedTime_outGateVariation"),
    ("departure.gate", "departure_gate"),
    ("equipment.actualAircraftType.iata", "equipment_actualAircraftType_iata"),
    ("equipment.actualAircraftType.icao", "equipment_actualAircraftType_icao"),
    ("equipment.aircraftRegistrationNumber", "equipment_aircraftRegistrationNumber"),
    ("state", "state"),
    ("updatedAt", "updatedAt"),
]

# Column order is unchanged: scheduled fields first, then status fields.
df_flatten = df_flat.select(
    *[
        col(f"data_flat.{path}").alias(name)
        for path, name in _scheduled_fields
    ],
    *[
        col(f"data_flat.statusDetails.{path}").getItem(0).alias(name)
        for path, name in _status_fields
    ],
)

In [None]:
import pandas as pd

# Show every column when pandas renders a frame, then pull a five-row sample
# of the flattened OAG data to the driver for inspection.
pd.options.display.max_columns = None
_flatten_sample = df_flatten.limit(5)
pd_df = _flatten_sample.toPandas()
pd_df.head()

In [None]:
# Five-row pandas sample of the raw ADS-B data for inspection.
_adsb_sample = df_adsb.limit(5)
pd_df_adsb = _adsb_sample.toPandas()
pd_df_adsb.head()

In [None]:
# `aircraft` is the working name for the ADS-B frame in the cleaning and
# partitioning cells below.
aircraft=df_adsb

In [None]:
# `airport` is the working name for the flattened OAG frame in the cleaning
# and KPI cells below.
airport=df_flatten

In [None]:
# Baseline row counts before any cleaning (each count() runs a Spark job).
aircraft_count = aircraft.count()
airport_count = airport.count()

print(f"Aircraft instances: {aircraft_count}")
print(f"Airport instances: {airport_count}")

# Data cleaning

### Duplicates 

In [None]:
# Remove rows that are exact duplicates across all columns.
aircraft = aircraft.dropDuplicates()
airport = airport.dropDuplicates()

In [None]:
# Row counts after duplicate removal.
aircraft_count_d = aircraft.count()
airport_count_d = airport.count()

print(f"Cleaning duplicates - Aircraft instances: {aircraft_count_d}")
print(f"Cleaning duplicates -Airport instances: {airport_count_d}")

### Filtering - flightType - Unscheduled

In [None]:
# Keep only flights whose type is not "Unscheduled".
# NOTE(review): a null flightType makes the comparison evaluate to null, so
# those rows are dropped as well -- confirm that is intended.
_is_not_unscheduled = airport["flightType"] != "Unscheduled"
airport = airport.where(_is_not_unscheduled)

In [None]:
# Row count after removing the "Unscheduled" flights.
airport_count_f = airport.count()
print(f"Filltering -Airport instances: {airport_count_f}")

### Missing values

In [None]:
# Discard ADS-B rows that have no Callsign value.
aircraft = aircraft.na.drop(subset=["Callsign"])

In [None]:
# Row count after dropping rows without a Callsign. The label now names this
# step; it previously repeated the "Cleaning duplicates" text from the
# de-duplication cell.
aircraft_count_missing = aircraft.count()
print(f"Missing values - Aircraft instances: {aircraft_count_missing}")

### Formatting

In [None]:
from pyspark.sql.functions import col, from_unixtime, to_timestamp

# Render and parse timestamps in UTC for the rest of the session.
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Derive a proper timestamp column from the epoch value in LastUpdate:
# epoch -> formatted string -> timestamp.
# NOTE(review): from_unixtime interprets its input as seconds since the
# epoch -- confirm LastUpdate is not in milliseconds.
_timestamp_pattern = "yyyy-MM-dd HH:mm:ss"
_last_update_text = from_unixtime("LastUpdate", _timestamp_pattern)
aircraft = aircraft.withColumn(
    "LastUpdateUTC",
    to_timestamp(_last_update_text, _timestamp_pattern),
)
#aircraft.show()


# KPIs


## a. average speed for each airport - Avg arrivals per day

In [None]:
# Average number of arrivals per day for each arrival airport.
#   arrival_airport_icao - the arrival airport
#   arrival_date_local   - local date of the landing
# An "arrival" is one record in the dataset, so arrivals per day is the number
# of records for that airport on that date.

from pyspark.sql.functions import count, to_date, avg

# Step 1: number of records per (airport, local arrival date).
_arrival_day = to_date("arrival_date_local")
perday_count = (
    airport
    .groupBy("arrival_airport_icao", _arrival_day)
    .agg(count("*").alias("perday_arrival"))
)

# Step 2: mean of those daily counts per airport.
avg_arrivals = (
    perday_count
    .groupBy("arrival_airport_icao")
    .agg(avg("perday_arrival").alias("avg_perday_arrival"))
)
avg_arrivals.show()

In [None]:
# Collect the per-airport averages to the driver as a pandas frame (it is
# written to the report in the final cell) and preview the first rows.
pd_avg_arrivals = avg_arrivals.toPandas()
pd_avg_arrivals.head()

## b. the total number of delayed flights (categorized into arrival delays and departure delays)


In [None]:
from pyspark.sql.functions import col, count, when

# Compute all four counts in one aggregation pass instead of four separate
# Spark jobs. count() skips nulls, and when() without otherwise() yields null
# for non-matching rows, so:
#   count(when(x == "Delayed", True)) -> rows where x is "Delayed"
#   count(x)                          -> rows where x is not null
_delay_counts = airport.agg(
    count(when(col("arrival_inGateTimeliness") == "Delayed", True)).alias("arrival_delays"),
    count(when(col("departure_outGateTimeliness") == "Delayed", True)).alias("departure_delays"),
    count("arrival_inGateTimeliness").alias("total_arrivals"),
    count("departure_outGateTimeliness").alias("total_departures"),
).first()

# Delayed
arrival_delays = _delay_counts["arrival_delays"]
departure_delays = _delay_counts["departure_delays"]
# All flights with a known timeliness status
total_arrivals = _delay_counts["total_arrivals"]
total_departures = _delay_counts["total_departures"]

# Proportions; NaN (rather than a ZeroDivisionError) when no flight has a
# timeliness status at all.
arrival_delay_proportion = (
    arrival_delays / total_arrivals if total_arrivals else float("nan")
)
departure_delay_proportion = (
    departure_delays / total_departures if total_departures else float("nan")
)

print(f"Arrival's delays: {arrival_delays}; Out of all arrivals it makes: {arrival_delay_proportion}")
print(f"Departure's delays: {departure_delays}; Out of all departures it makes: {departure_delay_proportion}")

# Spark partitioning

## a. Filter the DataFrame to retain only the most recent entry (the one with the largest LastUpdate) for each FlightId.

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc

# Rank the rows of each flight from newest to oldest LastUpdateUTC.
# NOTE(review): the partition column is "Flight" while the task heading says
# FlightId -- confirm this is the intended identifier.
window_entry = Window.partitionBy("Flight").orderBy(desc("LastUpdateUTC"))
_entry_rank = row_number().over(window_entry)

# Rank 1 is the most recent row of each flight; the helper column `row_entry`
# stays in the result.
_ranked_entries = aircraft.withColumn("row_entry", _entry_rank)
aircraft_recent_entry = _ranked_entries.where("row_entry = 1")

aircraft_recent_entry.show()


## b. Return a DataFrame containing only the FlightId and the corresponding latest LastUpdate 
 

In [None]:
# Reduce the latest-entry frame to the flight identifier and its timestamp,
# newest first. This rebinds aircraft_recent_entry to the narrower frame.
_latest_updates = aircraft_recent_entry.select("Flight", "LastUpdateUTC")
aircraft_recent_entry = _latest_updates.sort(desc("LastUpdateUTC"))

In [None]:
# Collect the five most recent flight updates to the driver (this sample is
# what the report cell writes out) and preview them.
_recent_sample = aircraft_recent_entry.limit(5)
pd_aircraft_recent_entry = _recent_sample.toPandas()
pd_aircraft_recent_entry.head()

In [None]:
from io import StringIO

# Assemble the whole report in memory, then write it to disk in one go.
# The file holds three titled sections, so it is a text report with CSV
# fragments rather than a single well-formed CSV table.
output = StringIO()

# Task 1.a
output.write("Task 1.a KPIs \n")
pd_avg_arrivals.to_csv(output, index=False)
output.write("\n")
# Task 1.b
output.write("Task 1.b KPIs \n")
output.write(f"Arrival's delays: {arrival_delays}; Out of all arrivals it makes: {arrival_delay_proportion}\n")
output.write(f"Departure's delays: {departure_delays}; Out of all departures it makes: {departure_delay_proportion}\n")
output.write("\n")
# Task 2
output.write("Task 2. Spark partitioning\n")
pd_aircraft_recent_entry.to_csv(output, index=False)

# pandas ends each CSV row with os.linesep ("\r\n" on Windows). Writing that
# through a text-mode file would translate the "\n" again and produce
# "\r\r\n", i.e. a blank line after every row. Normalise to "\n" first and let
# the text-mode file apply the platform line ending exactly once.
report_text = output.getvalue().replace("\r\n", "\n")

# Explicit encoding so the output does not depend on the platform default.
with open("report.csv", "w", encoding="utf-8") as f:
    f.write(report_text)
