In [2]:
from pyspark.sql import SparkSession
import os

# locate interpreter
os.environ['PYSPARK_PYTHON'] = 'C:\\Users\\epicj\\AppData\\Local\\Programs\\Python\\Python311\\python.exe'

# build session and define configuration
spark = SparkSession.builder \
    .appName("AirportAnalysis") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "1") \
    .config("spark.default.parallelism", "2") \
    .getOrCreate()

In [3]:
from pyspark.sql.types import *

# set up the structure for passengerData dataframe
passengerDataSchema = StructType([
    StructField("PassengerID", StringType(), True),
    StructField("FlightID", StringType(), True),
    StructField("DepartureAirportID", StringType(), True),
    StructField("DestinationAirportID", StringType(), True),
    StructField("DepartureTime", IntegerType(), True),
    StructField("FlightDuration", IntegerType(), True)
])

# read "AComp_Passenger_data_no_error.csv" dataset into pyspark dataframe using its schema
passengerData = spark.read.csv('AComp_Passenger_data_no_error.csv',schema=passengerDataSchema, header=False)

# remove duplicate flights
passengerDataNoDupe = passengerData.dropDuplicates(["FlightID"])


In [4]:
# lambda operations are extended and modified code from:
# https://cs.stanford.edu/people/nick/py/python-map-lambda.html
# https://python-course.eu/advanced-python/lambda-filter-reduce-map.php

# map airports from passengerData dataframe and sort for clarity
mapping = passengerDataNoDupe.select("DepartureAirportID").rdd.map(lambda x: (x[0], 1))
sorted_mapping = mapping.sortByKey()

# group mapped data by airport and count the occurrences of each
groupedMapping = sorted_mapping.groupByKey().map(lambda l: (l[0], sum(l[1])))

# set up the structure for flightCounts dataframe
flightCountsSchema = StructType([
    StructField("AirportID", StringType(), True),
    StructField("Number of Flights", IntegerType(), True)
])

# create dataframe for flight counts from groupedMapping
flightCountsDF = spark.createDataFrame(groupedMapping, flightCountsSchema)

In [5]:
# set up the structure for airportsData dataframe
airportsDataSchema = StructType([
    StructField("AirportName", StringType(), True),
    StructField("AirportID", StringType(), True),
    StructField("Latitude", FloatType(), True),
    StructField("Longitude", FloatType(), True)
])

# read 'Top30_airports_LatLong.csv' dataset into pyspark dataframe using its schema
airportsData = spark.read.csv('Top30_airports_LatLong.csv', schema = airportsDataSchema, header=False)
airportsData.show(5)

+-----------+---------+---------+---------+
|AirportName|AirportID| Latitude|Longitude|
+-----------+---------+---------+---------+
|    ATLANTA|      ATL| 33.63672|-84.42807|
|    BEIJING|      PEK|40.080112|116.58456|
|     LONDON|      LHR|  51.4775|-0.461389|
|    CHICAGO|      ORD|41.978603|-87.90484|
|      TOKYO|      HND|35.552258| 139.7797|
+-----------+---------+---------+---------+
only showing top 5 rows



In [14]:
# find used airports: all airports in flightCountsDF
usedAirports = flightCountsDF.rdd.map(lambda x: x[0]).collect()

# find all airports: all airports in airportsData
allAirports = airportsData.select("AirportID").rdd.flatMap(lambda x: x).collect()

# extended and modified code from: https://stackoverflow.com/questions/44192279/find-the-list-values-not-in-pandas-dataframe-data
# find unused airports: the difference between the two lists above
unusedAirports = [x for x in allAirports if x not in usedAirports]

# output information as per brief
flightCountsDF.show()
print("used airports: ", usedAirports)
print("unused airports: ", unusedAirports)

+---------+-----------------+
|AirportID|Number of Flights|
+---------+-----------------+
|      AMS|                1|
|      CAN|                2|
|      CGK|                2|
|      CLT|                1|
|      DEN|                3|
|      DFW|                1|
|      FCO|                1|
|      HND|                1|
|      IAH|                2|
|      JFK|                1|
|      KUL|                2|
|      MIA|                1|
|      MUC|                1|
|      ORD|                2|
|      PEK|                1|
|      PVG|                1|
|      ATL|                2|
|      BKK|                1|
|      CDG|                1|
|      LAS|                1|
+---------+-----------------+
only showing top 20 rows

used airports:  ['AMS', 'CAN', 'CGK', 'CLT', 'DEN', 'DFW', 'FCO', 'HND', 'IAH', 'JFK', 'KUL', 'MIA', 'MUC', 'ORD', 'PEK', 'PVG', 'ATL', 'BKK', 'CDG', 'LAS', 'LHR', 'MAD']
unused airports:  ['LAX', 'FRA', 'HKG', 'DXB', 'SIN', 'SFO', 'PHX', 'IST']


In [15]:
# Extract AirportIDs from used airports
usedAirports = [row['AirportID'] for row in flightCountsDF.select("AirportID").collect()]

# Extract AirportIDs from all airports
allAirports = [row['AirportID'] for row in airportsData.select("AirportID").distinct().collect()]

# extended and modified code from: https://stackoverflow.com/questions/44192279/find-the-list-values-not-in-pandas-dataframe-data
# find unused airports: the difference between the two lists of AirportIDs
unusedAirports = [x for x in allAirports if x not in usedAirports]

# output information as per brief
flightCountsDF.show()
# output information
print("Used airports: ", usedAirports)
print("Unused airports: ", unusedAirports)

+---------+-----------------+
|AirportID|Number of Flights|
+---------+-----------------+
|      AMS|                1|
|      CAN|                2|
|      CGK|                2|
|      CLT|                1|
|      DEN|                3|
|      DFW|                1|
|      FCO|                1|
|      HND|                1|
|      IAH|                2|
|      JFK|                1|
|      KUL|                2|
|      MIA|                1|
|      MUC|                1|
|      ORD|                2|
|      PEK|                1|
|      PVG|                1|
|      ATL|                2|
|      BKK|                1|
|      CDG|                1|
|      LAS|                1|
+---------+-----------------+
only showing top 20 rows

Used airports:  ['AMS', 'CAN', 'CGK', 'CLT', 'DEN', 'DFW', 'FCO', 'HND', 'IAH', 'JFK', 'KUL', 'MIA', 'MUC', 'ORD', 'PEK', 'PVG', 'ATL', 'BKK', 'CDG', 'LAS', 'LHR', 'MAD']
Unused airports:  ['FRA', 'IST', 'SIN', 'DXB', 'PHX', 'SFO', 'HKG', 'LAX']


In [17]:
# map flight IDs from passengerData and sort for clarity
passengerCountMap = passengerData.select("FlightID").rdd.map(lambda x: (x[0], 1))
passengerCountMapSorted = passengerCountMap.sortByKey()

# group mapped data by flight and count occurences for each
groupedPassengerCountMap = passengerCountMapSorted.groupByKey().map(lambda l: (l[0], sum(l[1])))

# collect the passenger count data into a list
passengerCountData = groupedPassengerCountMap.collect()
print(passengerCountData)

[('ATT7791R', 15), ('DKZ3042O', 11), ('FYL5866L', 20), ('GMO5938W', 25), ('MBA8071P', 16), ('MOO1786A', 13), ('QHU1140O', 21), ('RPG3351U', 13), ('RUM0422W', 14), ('SOH3431A', 18), ('TMV7633W', 15), ('ULZ8130D', 27), ('VYU9214I', 15), ('XIL3623J', 13), ('XOY7948U', 16), ('BER7172M', 17), ('DAU2617A', 12), ('EWH6301Y', 10), ('HUR0974O', 7), ('HZT2506M', 14), ('JVY9791G', 20), ('KJR6646J', 23), ('PME8178S', 18), ('SQU6245R', 21), ('VDC9164W', 15), ('VYW5940P', 17), ('WPW9201U', 11), ('WSK1289Z', 21), ('XXQ4064B', 25), ('YZO4444S', 17)]


In [9]:
# create dataframe for passengerCounts from passengerCountData
passengerCountsDF = spark.createDataFrame(passengerCountData, schema=["FlightID", "PassengerCount"])

# create a copy of passengerDataNoDupe, drop PassengerID and sort for clarity
flightsInfoDF = passengerDataNoDupe.select("FlightID", "DepartureAirportID", "DestinationAirportID", "DepartureTime", "FlightDuration")
flightsInfoDF = flightsInfoDF.orderBy("FlightID")

# merge the two dataframes by FlightID
flightsInfoDFMerged = flightsInfoDF.join(passengerCountsDF, "FlightID", "left")

# rearrange columns as per the requirement
flightsInfoDFMerged = flightsInfoDFMerged.select("FlightID", "PassengerCount", "DepartureAirportID", "DestinationAirportID", "DepartureTime", "FlightDuration")

# only show 5 for speed
flightsInfoDFMerged.show(5)

+--------+--------------+------------------+--------------------+-------------+--------------+
|FlightID|PassengerCount|DepartureAirportID|DestinationAirportID|DepartureTime|FlightDuration|
+--------+--------------+------------------+--------------------+-------------+--------------+
|ATT7791R|            15|               AMS|                 DEN|   1420564394|          1001|
|BER7172M|            17|               KUL|                 LAS|   1420565167|          1848|
|DAU2617A|            12|               CGK|                 SFO|   1420564986|          1811|
|DKZ3042O|            11|               MIA|                 SFO|   1420563927|           538|
|EWH6301Y|            10|               CAN|                 DFW|   1420564967|          1683|
+--------+--------------+------------------+--------------------+-------------+--------------+
only showing top 5 rows



In [10]:
from pyspark.sql.functions import col

# calculate arrivalTime by adding departureTime to flightDuration
flightsInfoDFMerged = flightsInfoDFMerged.withColumn("ArrivalTime", col("FlightDuration") + col("DepartureTime"))

# show 5 for speed
flightsInfoDFMerged.show(5)

+--------+--------------+------------------+--------------------+-------------+--------------+-----------+
|FlightID|PassengerCount|DepartureAirportID|DestinationAirportID|DepartureTime|FlightDuration|ArrivalTime|
+--------+--------------+------------------+--------------------+-------------+--------------+-----------+
|ATT7791R|            15|               AMS|                 DEN|   1420564394|          1001| 1420565395|
|BER7172M|            17|               KUL|                 LAS|   1420565167|          1848| 1420567015|
|DAU2617A|            12|               CGK|                 SFO|   1420564986|          1811| 1420566797|
|DKZ3042O|            11|               MIA|                 SFO|   1420563927|           538| 1420564465|
|EWH6301Y|            10|               CAN|                 DFW|   1420564967|          1683| 1420566650|
+--------+--------------+------------------+--------------------+-------------+--------------+-----------+
only showing top 5 rows



In [11]:
from pyspark.sql.functions import from_unixtime

# convert epoch times in departureTime and arrivalTime columns to more readable date/time format
flightsInfoDFMerged = flightsInfoDFMerged.withColumn("DepartureTime", from_unixtime(col("DepartureTime")))
flightsInfoDFMerged = flightsInfoDFMerged.withColumn("ArrivalTime", from_unixtime(col("ArrivalTime")))

# show 5 for speed
flightsInfoDFMerged.show(5)

+--------+--------------+------------------+--------------------+-------------------+--------------+-------------------+
|FlightID|PassengerCount|DepartureAirportID|DestinationAirportID|      DepartureTime|FlightDuration|        ArrivalTime|
+--------+--------------+------------------+--------------------+-------------------+--------------+-------------------+
|ATT7791R|            15|               AMS|                 DEN|2015-01-06 17:13:14|          1001|2015-01-06 17:29:55|
|BER7172M|            17|               KUL|                 LAS|2015-01-06 17:26:07|          1848|2015-01-06 17:56:55|
|DAU2617A|            12|               CGK|                 SFO|2015-01-06 17:23:06|          1811|2015-01-06 17:53:17|
|DKZ3042O|            11|               MIA|                 SFO|2015-01-06 17:05:27|           538|2015-01-06 17:14:25|
|EWH6301Y|            10|               CAN|                 DFW|2015-01-06 17:22:47|          1683|2015-01-06 17:50:50|
+--------+--------------+-------

In [27]:
# collect rows from flightsInfoDFMerged
flightRecords = flightsInfoDFMerged.collect()

# convert the collected rows to a tuple list
finalFlightList = [tuple(row) for row in flightRecords]

print(finalFlightList)

[('ATT7791R', 15, 'AMS', 'DEN', '2015-01-06 17:13:14', 1001, '2015-01-06 17:29:55'), ('BER7172M', 17, 'KUL', 'LAS', '2015-01-06 17:26:07', 1848, '2015-01-06 17:56:55'), ('DAU2617A', 12, 'CGK', 'SFO', '2015-01-06 17:23:06', 1811, '2015-01-06 17:53:17'), ('DKZ3042O', 11, 'MIA', 'SFO', '2015-01-06 17:05:27', 538, '2015-01-06 17:14:25'), ('EWH6301Y', 10, 'CAN', 'DFW', '2015-01-06 17:22:47', 1683, '2015-01-06 17:50:50'), ('FYL5866L', 20, 'ATL', 'HKG', '2015-01-06 17:25:40', 1751, '2015-01-06 17:54:51'), ('GMO5938W', 25, 'LHR', 'PEK', '2015-01-06 17:11:57', 1057, '2015-01-06 17:29:34'), ('HUR0974O', 7, 'DEN', 'PVG', '2015-01-06 17:15:25', 1398, '2015-01-06 17:38:43'), ('HZT2506M', 14, 'IAH', 'AMS', '2015-01-06 17:12:04', 1044, '2015-01-06 17:29:28'), ('JVY9791G', 20, 'PVG', 'FCO', '2015-01-06 17:16:01', 1189, '2015-01-06 17:35:50'), ('KJR6646J', 23, 'IAH', 'BKK', '2015-01-06 17:26:43', 1928, '2015-01-06 17:58:51'), ('MBA8071P', 16, 'KUL', 'PEK', '2015-01-06 17:04:16', 572, '2015-01-06 17:13:

In [28]:
import math

# convert airportsData into a tuple list the same way as before only selecting necessary columns
airportsRecords = airportsData.select("AirportID", "Latitude", "Longitude").collect()
airportsRecordsList = [tuple(row) for row in airportsRecords]

# empty list for distances of flights
distancesData = []

# iterate through each flight
for flight in finalFlightList:

    # assign flightID, depAirport and desAirport for clarity
    flightID = flight[0]
    depAirport = flight[2]
    desAirport = flight[3]

    # iterate through airportsRecordsList to find the departure airport's information
    depAirportInfo = next((airport for airport in airportsRecordsList if airport[0] == depAirport), None)
    # iterate through airportsRecordsList to find the destination airport's information
    desAirportInfo = next((airport for airport in airportsRecordsList if airport[0] == desAirport), None)

    # check to ensure that we have all the information we need
    if depAirportInfo and desAirportInfo:

        # get latitude and longitude for departure and destination airports and simulateneously convert them into radians
        depLat, depLon = math.radians(depAirportInfo[1]), math.radians(depAirportInfo[2])
        desLat, desLon = math.radians(desAirportInfo[1]), math.radians(desAirportInfo[2])

        # calculate distance using Haversine formula
        # haversine formula from https://www.movable-type.co.uk/scripts/latlong.html
        diffLat = desLat - depLat
        diffLon = desLon - depLon
        a = math.sin(diffLat / 2) ** 2 + math.cos(depLat) * math.cos(desLat) * math.sin(diffLon / 2) ** 2
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

        # multiply by Earth's radius to get distances in nautical miles
        distNM = 3440.065 * c

        # append distance to distances list
        distancesData.append([flightID, distNM])
    else:
        print("Info missing for ", depAirport, "or", desAirport)

In [30]:
# create RDD from distances list to utilize cluster computing for speed
rdd = spark.sparkContext.parallelize(distancesData)
rddRows = rdd.map(lambda x: Row(FlightID=x[0], Distance=x[1]))

# create distances dataframe from RDD of rows
distancesDF = spark.createDataFrame(rddRows)

distancesDF.show()

+--------+------------------+
|FlightID|          Distance|
+--------+------------------+
|ATT7791R| 4169.868902347976|
|BER7172M| 7693.953583856841|
|DAU2617A| 7538.467918971757|
|DKZ3042O| 2242.815642331329|
|EWH6301Y|7007.3349391155825|
|FYL5866L|7288.2440255521215|
|GMO5938W| 4402.109680576151|
|HUR0974O| 5820.650229676103|
|HZT2506M| 4346.269663648936|
|JVY9791G| 4950.959340864341|
|KJR6646J| 8025.213556825649|
|MBA8071P|2383.0795801155564|
|MOO1786A|  765.880606529188|
|PME8178S| 5502.890889477019|
|QHU1140O|   4717.5544852904|
|RPG3351U| 1557.699990276518|
|RUM0422W| 807.5904051437628|
|SOH3431A|1042.1373565606136|
|SQU6245R| 4367.076218559275|
|TMV7633W| 3535.134495309536|
+--------+------------------+
only showing top 20 rows



In [31]:
# merge the distances dataframe with main flight info dataframe
flightsInfoDFMergedWithDist = flightsInfoDFMerged.join(distancesDF, on='FlightID', how='left')

flightsInfoDFMergedWithDist.show()

+--------+--------------+------------------+--------------------+-------------------+--------------+-------------------+------------------+
|FlightID|PassengerCount|DepartureAirportID|DestinationAirportID|      DepartureTime|FlightDuration|        ArrivalTime|          Distance|
+--------+--------------+------------------+--------------------+-------------------+--------------+-------------------+------------------+
|ATT7791R|            15|               AMS|                 DEN|2015-01-06 17:13:14|          1001|2015-01-06 17:29:55| 4169.868902347976|
|BER7172M|            17|               KUL|                 LAS|2015-01-06 17:26:07|          1848|2015-01-06 17:56:55| 7693.953583856841|
|DAU2617A|            12|               CGK|                 SFO|2015-01-06 17:23:06|          1811|2015-01-06 17:53:17| 7538.467918971757|
|DKZ3042O|            11|               MIA|                 SFO|2015-01-06 17:05:27|           538|2015-01-06 17:14:25| 2242.815642331329|
|EWH6301Y|          

In [None]:
# // TECHNIQUE a \\

# create dictionary to hold each passengers' flights by their ID
passengerFlightDict = {}

# collect PassengerID and FlightID columns from passengerData into a new dataframe
passengerFlights = passengerData.select("PassengerID", "FlightID").collect()

# iterate through passengerFlights list to populate passengerFlightDict
for row in passengerFlights:
    passengerID = row["PassengerID"]
    flightID = row["FlightID"]
    if passengerID in passengerFlightDict:
        passengerFlightDict[passengerID].append(flightID)
    else:
        passengerFlightDict[passengerID] = [flightID]

# create dictionary to hold each passengers' flight distances by their ID
passengerDistanceDict = {}

# iterate through each passenger in passengerFlightDict
for passenger, flights in passengerFlightDict.items():
    totalDistance = 0
    # iterate through each flight corresponding to current passenger
    for flightID in flights:
        # cross-reference with flightsInfoDFMergedWithDist to find the distance per flight
        distance = flightsInfoDFMergedWithDist.filter(col('FlightID') == flightID).select('Distance').collect()
        # add to total distance for current passenger
        totalDistance += distance[0]['Distance']

    # assign total distance value to passenger in passengerDistanceDict
    passengerDistanceDict[passenger] = totalDistance

# create RDD from passengerDistanceDict to utilize cluster computing for speed
passengerDistanceRDD = spark.sparkContext.parallelize(list(passengerDistanceDict.items()))

# create passengerDistance dataframe from RDD
passengerDistanceDF = passengerDistanceRDD.toDF(["PassengerID", "Total Distance Flown"])
passengerDistanceDF.show()

In [33]:
# // TECHNIQUE b \\

from pyspark.sql.functions import col

# collect PassengerID and FlightID columns from passengerData into a DataFrame
passengerFlights = passengerData.select("PassengerID", "FlightID")

# merge passenger flights with their distances
passengerFlightDistance = passengerFlights.join(flightsInfoDFMergedWithDist, passengerFlights["FlightID"] == flightsInfoDFMergedWithDist["FlightID"], "left_outer")

# calculate total distance flown by each passenger and store in a new dataframe and rename distance column for clarity
passengerDistance = passengerFlightDistance.groupBy("PassengerID").agg({"Distance": "sum"}).withColumnRenamed("sum(Distance)", "Total Distance Flown")

passengerDistance.show()

+-----------+--------------------+
|PassengerID|Total Distance Flown|
+-----------+--------------------+
| KKP5277HZ7|   58555.91646919763|
| HCA3158QA6|   96993.60074894653|
| POP2875LH3|     81031.601650141|
| EDV2089LK5|   70430.77126985819|
| PUD8209OG3|  115812.65362009435|
| PAJ3974RK1|   34229.24298116658|
| JBE2302VO4|   69002.45039215357|
| PIT2755XC1|   36076.50494670427|
| YMH6360YP0|   76257.83823570098|
| HGO4350KK1|   81796.00102017862|
| SPR4484HA6|  122258.09976971563|
| IEG9308EA5|   42015.63813599509|
| WYU2010YH8|    96735.8614136759|
| CDC0302NN5|   63112.87953247823|
| CKZ3132BR4|   92728.59102689348|
| XFG5747ZT9|   66420.44625193973|
| CYJ0225CH1|   54192.18061353429|
| WTC9125IE5|   59610.49391610161|
| LLZ3798PE3|   84096.07280138526|
| VZY2993ME1|   73690.19854471082|
+-----------+--------------------+
only showing top 20 rows



In [35]:
# select PassengerID and Total Distance Flown and then order by distance flown in descending order
sortedPassengers = passengerDistance.select("PassengerID", "Total Distance Flown").orderBy(col("Total Distance Flown").desc())

# select the PassengerID of the first row after ordering to get the passenger with the most miles
passengerWithMostMiles = sortedPassengers.select("PassengerID").first()

# display the passenger with the highest total miles flown
print("Passenger with highest total miles flown:", passengerWithMostMiles["PassengerID"])

Passenger with highest total miles flown: UES9151GS5
