In this notebook, I answer several questions concerning airplane data.

In [2]:
import pyspark  

In [3]:
sc = pyspark.SparkContext()

Question: Find the top 3 airlines with the most flights

First import data and split.

In [5]:
flightRdd=sc.textFile("datasets/flights.csv").map(lambda line: line.split(","))
print(flightRdd.take(1))

[['1', '3', '4', '1512', '1802', 'WN', '706', 'N491WN', '110', '88', '112', '117', 'LAS', 'DEN', '629', '8', '14', '0', '', '0']]


Then map the carriers.

In [6]:
carrierRdd = flightRdd.map(lambda line: (line[5],1))
print(carrierRdd.take(1))

[('WN', 1)]


Reduce the RDD to get the number of flights for each airline.

In [7]:
reducedCarrierRdd = carrierRdd.reduceByKey(lambda a,b: a+b)
print(reducedCarrierRdd.map(lambda x: (x[1],x[0])).sortByKey(False).take(5))

[(11807, 'WN'), (5819, 'AA'), (5550, 'OO'), (4670, 'MQ'), (4491, 'US')]


Question: Find the top 5 most common routes, between two cities.

First import data and split.

In [8]:
airportsRdd = sc.textFile("datasets/airports.csv").map(lambda line: line.split(","))
print(airportsRdd.take(3))

[['"iata"', '"airport"', '"city"', '"state"', '"country"', '"lat"', '"long"'], ['"00M"', '"Thigpen "', '"Bay Springs"', '"MS"', '"USA"', '31.95376472', '-89.23450472'], ['"00R"', '"Livingston Municipal"', '"Livingston"', '"TX"', '"USA"', '30.68586111', '-95.01792778']]


Create a new RDD using the smallest amount of required data.

In [9]:
cityRdd = airportsRdd.map(lambda line: (line[0],line[2]))
print(cityRdd.sortByKey().take(5))
flightOrigDestRdd = flightRdd.map(lambda line: (line[12], line[13]))
print(flightOrigDestRdd.sortByKey().take(5))

[('"00M"', '"Bay Springs"'), ('"00R"', '"Livingston"'), ('"00V"', '"Colorado Springs"'), ('"01G"', '"Perry"'), ('"01J"', '"Hilliard"')]
[('ABE', 'DTW'), ('ABE', 'DTW'), ('ABE', 'CLE'), ('ABE', 'CLE'), ('ABE', 'CLE')]


Clean the data in order to join.

In [10]:
cityRdd = cityRdd.map(lambda line: (line[0].strip('"'),line[1].strip('"')))
print(cityRdd.take(5))
flightsRdd = cityRdd.join(flightOrigDestRdd)
print(flightsRdd.take(5))

[('iata', 'city'), ('00M', 'Bay Springs'), ('00R', 'Livingston'), ('00V', 'Colorado Springs'), ('01G', 'Perry')]
[('JNU', ('Juneau', 'SIT')), ('JNU', ('Juneau', 'KTN')), ('JNU', ('Juneau', 'SEA')), ('JNU', ('Juneau', 'SEA')), ('JNU', ('Juneau', 'ANC'))]


Map and reduce.

In [11]:
mappedFlightsRdd = flightsRdd.map(lambda line: (line[1], 1))
print(mappedFlightsRdd.take(5))
reducedRdd = mappedFlightsRdd.reduceByKey(lambda a,b:a+b)
reducedRdd = reducedRdd.map(lambda x: (x[1],x[0])).sortByKey(False)
print(reducedRdd.take(5))

[(('Juneau', 'SIT'), 1), (('Juneau', 'KTN'), 1), (('Juneau', 'SEA'), 1), (('Juneau', 'SEA'), 1), (('Juneau', 'ANC'), 1)]
[(164, ('New York', 'BOS')), (150, ('New York', 'DCA')), (140, ('Los Angeles', 'SAN')), (137, ('Los Angeles', 'SFO')), (137, ('Houston', 'DAL'))]


Question: Find the longest departure delay for each airline if its over 15 minutes

In [12]:
flightRdd=sc.textFile("datasets/flights.csv").map(lambda line: line.split(","))
delayRdd = flightRdd.map(lambda line: (line[5], int(line[11]))).filter(lambda line: line[1] > 15).reduceByKey(lambda a,b: max(a,b)).map(lambda x:(x[1],x[0])).sortByKey(False)
print(delayRdd.take(5))

[(960, 'UA'), (953, 'AA'), (759, 'CO'), (757, 'US'), (733, 'NW')]


Question: Find the most common airplane model for flights over 1500 miles

In [15]:
flightRdd=sc.textFile("datasets/flights.csv").map(lambda line: line.split(","))
planeDataRdd = sc.textFile("datasets/plane-data.csv").map(lambda line: line.split(",")).filter(lambda line: len(line) == 9).map(lambda line: (line[0], line[4]))
longDataRdd = flightRdd.map(lambda line: (line[7], int(line[14]))).filter(lambda line: line[1] > 1500).join(planeDataRdd).map(lambda line: (line[1][1],line[1][0])).map(lambda line: (line[0],1)).reduceByKey(lambda a,b:a+b).map(lambda x:(x[1],x[0])).sortByKey(False)

print(longDataRdd.take(5))

[(950, 'A320-232'), (747, '737-7H4'), (505, '757-222'), (455, '757-232'), (366, '737-824')]
