Όνομα: Βασίλειος Ραφαήλ Αβραμίδης

Αριθμός Μητρώου: ics23033

Μάθημα: Ανάλυση Δεδομένων Μεγάλου Όγκου (Big Data)

Εργασία: Flight Delay Analytics με Apache Spark


In [1]:
# Install PySpark (quietly) and the OpenJDK 11 JDK, since PySpark needs a Java runtime.
# These lines are IPython shell escapes (`!`), so this cell only runs inside a notebook.
!pip -q install pyspark
!apt-get install openjdk-11-jdk -y

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  at-spi2-core fonts-dejavu-core fonts-dejavu-extra gsettings-desktop-schemas
  libatk-bridge2.0-0 libatk-wrapper-java libatk-wrapper-java-jni libatk1.0-0
  libatk1.0-data libatspi2.0-0 libxcomposite1 libxt-dev libxtst6 libxxf86dga1
  openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
  session-migration x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm libnss-mdns
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  | fonts-wqy-zenhei fonts-indic mesa-utils
The following NEW packages will be installed:
  at-spi2-core fonts-dejavu-core fonts-dejavu-extra gsettings-desktop-schemas
  libatk-bridge2.0-0 libatk-wrapper-java libatk-wrapper-java-jni libatk1.0-0
  libatk1.0-data libatspi2.0-0 libxcomposite1 libxt-dev libxtst6 libxxf86dga1
  openjdk-11-jdk openjdk-11-jdk-headless openjdk-

In [2]:
import urllib.request

# Local file name of the dataset and its raw-GitHub source URL.
csv_filename = "flights_2000.csv"
csv_url = f"https://raw.githubusercontent.com/billavramidis/Big-Data-2025/refs/heads/main/{csv_filename}"

# Download the dataset next to the notebook.
# urlretrieve raises HTTPError/URLError on failure. The previous `!wget -q -O ...`
# failed silently: IPython ignores the exit status and `-O` leaves an empty file
# behind. That empty file only caused a confusing error much later, in Spark.
urllib.request.urlretrieve(csv_url, csv_filename)

In [3]:
#Importing Spark and Colab.
from pyspark.sql import SparkSession
from google.colab import files

#Libraries necessary for saving the result as .csv file.
import glob
import shutil
import os

from time import perf_counter

In [4]:
# Create the Spark session, or reuse an existing one, named "Flights".
spark = (
    SparkSession.builder
    .appName("Flights")
    .getOrCreate()
)

# Lower-level entry point used for the RDD API in the next cell.
sc = spark.sparkContext

# One RDD element per text line of the downloaded CSV file.
flight_rdd = sc.textFile(csv_filename)

In [5]:
def _is_valid_row(cols):
    """Return True if a split CSV row should contribute to the delay averages.

    A row is kept only when all of these hold:
    - it has at least 11 fields;
    - field 10 equals "0" (presumably the CANCELLED flag; confirm against the header);
    - field 8 consists only of digits;
    - field 6 (DEP_DELAY) is a strictly positive integer.

    The header row, NULL/empty delays and non-positive delays ("0", or values
    with a "-" sign) are therefore dropped.
    """
    return (
        # Short or blank lines used to raise IndexError and abort the whole job.
        len(cols) > 10
        and cols[10] == "0"
        # NOTE(review): field 8 is validated here but never used; confirm intent.
        and cols[8].isdigit()
        # isdecimal() accepts exactly what int() can parse. isdigit() would let
        # characters such as "²" through, and int() would then crash on them.
        and cols[6].isdecimal()
        and int(cols[6]) > 0
    )


total_runtime_start = perf_counter()

# Split every line exactly once, then keep the usable rows.
# Elements remain lists of column strings, as before.
filtered_rdd = flight_rdd.map(lambda line: line.split(",")).filter(_is_valid_row)

# Average departure delay per origin airport (field 3):
# (origin, (delay, 1)) -> per-origin (sum, count) -> mean rounded to 2 decimals.
average_rdd = (
    filtered_rdd
    .map(lambda cols: (cols[3], (int(cols[6]), 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda acc: round(acc[0] / acc[1], 2))
)

# All transformations above are lazy, so the Spark job actually runs inside takeOrdered.
first_action_start = perf_counter()

# Top 10 airports by average departure delay, highest first.
# Ties are broken by airport code so the output order is deterministic.
top10 = average_rdd.takeOrdered(10, key=lambda kv: (-kv[1], kv[0]))

first_action_stop = perf_counter()
total_runtime_stop = first_action_stop

In [6]:
# Build a two-column DataFrame from the driver-side list of (airport, average) tuples.
headers = ["ORIGIN_AIRPORT", "AVG_DEP_DELAY"]
top10_DF = spark.createDataFrame(top10, headers)

temp_folder = "csv_folder"
final_file = "top-10-airport-delay.csv"

# Remove the result of a previous run, if any. isfile() already implies existence.
if os.path.isfile(final_file):
    os.remove(final_file)

# Spark writes a directory of part files; coalesce(1) ensures a single part file.
top10_DF.coalesce(1).write.mode("overwrite").csv(temp_folder, header=True)

# Locate that single part file, failing with a clear message if Spark wrote none.
part_files = glob.glob(os.path.join(temp_folder, "*.csv"))
if not part_files:
    raise FileNotFoundError(f"Spark wrote no CSV part file into '{temp_folder}'")
temp_file = part_files[0]

# Move the part file out under its final name, then drop the temporary directory.
shutil.move(temp_file, final_file)
shutil.rmtree(temp_folder)

# Trigger a browser download of the result (Colab only).
files.download(final_file)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [7]:
# Stop the Spark session and its underlying SparkContext now that the result has been exported.
# Re-running the RDD cells after this requires re-running the session setup cell first.
spark.stop()

In [8]:
# Elapsed perf_counter() seconds for the whole pipeline and for the first action alone.
total_runtime = total_runtime_stop - total_runtime_start
first_action = first_action_stop - first_action_start

timings = (
    ("Total Runtime", total_runtime),
    ("First Action Runtime", first_action),
)
for label, seconds in timings:
    print(f"{label}: {round(seconds, 3)}")

Total Runtime: 4.105
First Action Runtime: 3.842
