In [24]:
#Import libraries
import findspark
findspark.init()
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from math import sin, cos, sqrt, atan2, radians
#Begin spark session
spark = SparkSession.builder.appName('Assessment').getOrCreate()

In [25]:
#Import passenger file as dataframe
sc = spark.sparkContext
file_passenger = sc.textFile('AComp_Passenger_data_no_error.csv').filter(lambda x: x)
rdd_passenger = file_passenger.map(lambda a: a.split(','))
#Import airport file as dataframe
file_airport = sc.textFile('Top30_airports_LatLong.csv').filter(lambda x: x)
rdd_airport = file_airport.map(lambda a: a.split(','))

In [3]:
#1.Determine number of flights from each airport, then provide list of any not used airports
#Obtain count of id flight's
#Essential because of duplicates of exact flight that are inside information
map_of_flight = rdd_passenger.map(lambda a: ((a[1], a[2]), 1))
sum_of_map_flight = map_of_flight.reduceByKey(lambda a,b: a+b)
sum_of_map_flight.collect()

[(('SOH3431A', 'ORD'), 18),
 (('MBA8071P', 'KUL'), 16),
 (('RUM0422W', 'MUC'), 14),
 (('ATT7791R', 'AMS'), 15),
 (('DKZ3042O', 'MIA'), 11),
 (('ULZ8130D', 'CAN'), 27),
 (('VYU9214I', 'ORD'), 15),
 (('VYW5940P', 'LAS'), 17),
 (('TMV7633W', 'CGK'), 15),
 (('YZO4444S', 'BKK'), 17),
 (('XIL3623J', 'PEK'), 13),
 (('RPG3351U', 'HND'), 13),
 (('SQU6245R', 'DEN'), 21),
 (('XXQ4064B', 'JFK'), 25),
 (('PME8178S', 'DEN'), 18),
 (('MOO1786A', 'MAD'), 13),
 (('HUR0974O', 'DEN'), 7),
 (('GMO5938W', 'LHR'), 25),
 (('DAU2617A', 'CGK'), 12),
 (('WPW9201U', 'DFW'), 11),
 (('QHU1140O', 'CDG'), 21),
 (('HZT2506M', 'IAH'), 14),
 (('EWH6301Y', 'CAN'), 10),
 (('WSK1289Z', 'CLT'), 21),
 (('FYL5866L', 'ATL'), 20),
 (('BER7172M', 'KUL'), 17),
 (('JVY9791G', 'PVG'), 20),
 (('VDC9164W', 'FCO'), 15),
 (('KJR6646J', 'IAH'), 23),
 (('XOY7948U', 'ATL'), 16)]

In [4]:
#Occurrences counted as airport codes
count_of_codes_for_airport = sum_of_map_flight.map(lambda a: (a[0][1], a[1]))
sum_of_codes_for_airport = count_of_codes_for_airport.reduceByKey(lambda a,b: a+b)
sum_of_codes_for_airport.collect()

[('ORD', 33),
 ('KUL', 33),
 ('MUC', 14),
 ('AMS', 15),
 ('MIA', 11),
 ('CAN', 37),
 ('CGK', 27),
 ('PEK', 13),
 ('HND', 13),
 ('DEN', 46),
 ('JFK', 25),
 ('DFW', 11),
 ('IAH', 37),
 ('CLT', 21),
 ('PVG', 20),
 ('FCO', 15),
 ('LAS', 17),
 ('BKK', 17),
 ('MAD', 13),
 ('LHR', 25),
 ('CDG', 21),
 ('ATL', 36)]

In [5]:
#Generate pandas dataframe
cols = ['Name of Airport', 'Count of Flights']
count_of_flights_for_each_airport = pd.DataFrame(sum_of_codes_for_airport.collect(), columns=cols)
count_of_flights_for_each_airport

Unnamed: 0,Name of Airport,Count of Flights
0,ORD,33
1,KUL,33
2,MUC,14
3,AMS,15
4,MIA,11
5,CAN,37
6,CGK,27
7,PEK,13
8,HND,13
9,DEN,46


In [6]:
#Obtain airport cods and titles
map_of_airport_names = rdd_airport.map(lambda a: (a[1], a[0]))
#Merge rdd's
dirty_airport_for_each_flights = map_of_airport_names.join(sum_of_codes_for_airport)
dirty_airport_for_each_flights.collect()

[('PEK', ('BEIJING', 13)),
 ('ORD', ('CHICAGO', 33)),
 ('DFW', ('DALLAS/FORT WORTH', 11)),
 ('CGK', ('JAKARTA', 27)),
 ('PVG', ('SHANGHAI', 20)),
 ('MIA', ('MIAMI', 11)),
 ('MUC', ('MUNICH', 14)),
 ('ATL', ('ATLANTA', 36)),
 ('CDG', ('PARIS', 21)),
 ('MAD', ('MADRID', 13)),
 ('BKK', ('BANGKOK', 17)),
 ('LAS', ('LAS VEGAS', 17)),
 ('HND', ('TOKYO', 13)),
 ('DEN', ('DENVER', 46)),
 ('AMS', ('AMSTERDAM', 15)),
 ('JFK', ('NEW YORK', 25)),
 ('CAN', ('GUANGZHOU', 37)),
 ('IAH', ('HOUSTON', 37)),
 ('CLT', ('CHARLOTTE', 21)),
 ('KUL', ('KUALA LUMPUR', 33)),
 ('FCO', ('ROME', 15)),
 ('LHR', ('LONDON', 25))]

In [7]:
#These are airports that have been flown from
clean_airport_for_each_flights = dirty_airport_for_each_flights.map(lambda a: (a[1][0], a[1][1]))
formed_clean_airport_for_each_flights = clean_airport_for_each_flights.sortBy(lambda a: a[1], ascending=False)
formed_clean_airport_for_each_flights.collect()

[('DENVER', 46),
 ('GUANGZHOU', 37),
 ('HOUSTON', 37),
 ('ATLANTA', 36),
 ('CHICAGO', 33),
 ('KUALA LUMPUR', 33),
 ('JAKARTA', 27),
 ('NEW YORK', 25),
 ('LONDON', 25),
 ('PARIS', 21),
 ('CHARLOTTE', 21),
 ('SHANGHAI', 20),
 ('BANGKOK', 17),
 ('LAS VEGAS', 17),
 ('AMSTERDAM', 15),
 ('ROME', 15),
 ('MUNICH', 14),
 ('BEIJING', 13),
 ('MADRID', 13),
 ('TOKYO', 13),
 ('DALLAS/FORT WORTH', 11),
 ('MIAMI', 11)]

In [8]:
#Airports that have been flown from 
flights_with_airport_names = clean_airport_for_each_flights.map(lambda a: a[0])
#All airports
every_names_of_airports = rdd_airport.map(lambda a: a[0])
#Subtract airports flown from all airports
count_of_airports_not_used = every_names_of_airports.subtract(flights_with_airport_names)
count_of_airports_not_used_formed = count_of_airports_not_used.sortBy(lambda a: a)

In [10]:
#Generate pandas dataframe
cols = ['Airports not used']
count_of_airports_not_used = pd.DataFrame(count_of_airports_not_used_formed.collect(), columns=cols)
count_of_airports_not_used

Unnamed: 0,Airports not used
0,DUBAI
1,FRANKFURT
2,HONG KONG
3,ISTANBUL
4,LOS ANGELES
5,PHOENIX
6,SAN FRANCISCO
7,SINGAPORE


In [11]:
#2.Create list of flights [ID flight; inc number of passengers, codes, departure & arrival times]
#Transforms unix time to hh:mm
def transform_time(time: int) -> str:
    time_date = datetime.fromtimestamp(time)
    return time_date.strftime("%H:%M")

In [12]:
#Obtains list of flights and number of passengers on flight
mapped_rdd = rdd_passenger.map(lambda x: ((x[1], x[2], x[3], x[4], x[5]), 1))
flights_on_list = mapped_rdd.reduceByKey(lambda a,b: a+b)
flights_on_list.collect()

[(('XXQ4064B', 'JFK', 'FRA', '1420563917', '802'), 25),
 (('SOH3431A', 'ORD', 'MIA', '1420563649', '250'), 18),
 (('PME8178S', 'DEN', 'PEK', '1420564409', '1322'), 18),
 (('MBA8071P', 'KUL', 'PEK', '1420563856', '572'), 16),
 (('MOO1786A', 'MAD', 'FRA', '1420563408', '184'), 13),
 (('HUR0974O', 'DEN', 'PVG', '1420564525', '1398'), 7),
 (('EWH6301Y', 'CAN', 'DFW', '1420564967', '1683'), 10),
 (('VYW5940P', 'LAS', 'SIN', '1420565203', '1843'), 17),
 (('WSK1289Z', 'CLT', 'DEN', '1420563542', '278'), 21),
 (('BER7172M', 'KUL', 'LAS', '1420565167', '1848'), 17),
 (('JVY9791G', 'PVG', 'FCO', '1420564561', '1189'), 20),
 (('VDC9164W', 'FCO', 'LAS', '1420564698', '1276'), 15),
 (('XOY7948U', 'ATL', 'LHR', '1420564038', '877'), 16),
 (('SQU6245R', 'DEN', 'FRA', '1420564460', '1049'), 21),
 (('GMO5938W', 'LHR', 'PEK', '1420564317', '1057'), 25),
 (('DAU2617A', 'CGK', 'SFO', '1420564986', '1811'), 12),
 (('RUM0422W', 'MUC', 'MAD', '1420563539', '194'), 14),
 (('ATT7791R', 'AMS', 'DEN', '142056439

In [13]:
#Flight time obtained in minutes (x60 obtain seconds for unix time)
mapped_flights_on_list = flights_on_list.map(
   lambda a: (
        a[0][0],
        a[0][1],
        a[0][2],
       transform_time(int(a[0][3])),
       transform_time(int(a[0][3]) + int(a[0][4])*60),
       a[1]
   )
)
mapped_flights_on_list.collect()

[('XXQ4064B', 'JFK', 'FRA', '17:05', '06:27', 25),
 ('SOH3431A', 'ORD', 'MIA', '17:00', '21:10', 18),
 ('PME8178S', 'DEN', 'PEK', '17:13', '15:15', 18),
 ('MBA8071P', 'KUL', 'PEK', '17:04', '02:36', 16),
 ('MOO1786A', 'MAD', 'FRA', '16:56', '20:00', 13),
 ('HUR0974O', 'DEN', 'PVG', '17:15', '16:33', 7),
 ('EWH6301Y', 'CAN', 'DFW', '17:22', '21:25', 10),
 ('VYW5940P', 'LAS', 'SIN', '17:26', '00:09', 17),
 ('WSK1289Z', 'CLT', 'DEN', '16:59', '21:37', 21),
 ('BER7172M', 'KUL', 'LAS', '17:26', '00:14', 17),
 ('JVY9791G', 'PVG', 'FCO', '17:16', '13:05', 20),
 ('VDC9164W', 'FCO', 'LAS', '17:18', '14:34', 15),
 ('XOY7948U', 'ATL', 'LHR', '17:07', '07:44', 16),
 ('SQU6245R', 'DEN', 'FRA', '17:14', '10:43', 21),
 ('GMO5938W', 'LHR', 'PEK', '17:11', '10:48', 25),
 ('DAU2617A', 'CGK', 'SFO', '17:23', '23:34', 12),
 ('RUM0422W', 'MUC', 'MAD', '16:58', '20:12', 14),
 ('ATT7791R', 'AMS', 'DEN', '17:13', '09:54', 15),
 ('WPW9201U', 'DFW', 'PEK', '17:21', '17:33', 11),
 ('DKZ3042O', 'MIA', 'SFO', '17:

In [14]:
#Output information in dataframe
#Time of arrival are less than time of departure for day after
cols = ['ID Flight', 'From Airport', 'To Airport', 'Time of Departure', 'Time of Arrival', 'Count of Passengers']
count_of_flights_on_list = pd.DataFrame(mapped_flights_on_list.collect(), columns=cols)
count_of_flights_on_list

Unnamed: 0,ID Flight,From Airport,To Airport,Time of Departure,Time of Arrival,Count of Passengers
0,XXQ4064B,JFK,FRA,17:05,06:27,25
1,SOH3431A,ORD,MIA,17:00,21:10,18
2,PME8178S,DEN,PEK,17:13,15:15,18
3,MBA8071P,KUL,PEK,17:04,02:36,16
4,MOO1786A,MAD,FRA,16:56,20:00,13
5,HUR0974O,DEN,PVG,17:15,16:33,7
6,EWH6301Y,CAN,DFW,17:22,21:25,10
7,VYW5940P,LAS,SIN,17:26,00:09,17
8,WSK1289Z,CLT,DEN,16:59,21:37,21
9,BER7172M,KUL,LAS,17:26,00:14,17


In [15]:
#3.Compute line-of-sight (nautical) miles for each flight and total travelled by each passenger,output passenger earned highest air miles.
#Calculating miles between nautical
def miles_between_nautical(Latitude1, Longitude1, Latitude2, Longitude2):
    Latitude1 = float(Latitude1)
    Longitude1 = float(Longitude1)
    Latitude2 = float(Latitude2)
    Longitude2 = float(Longitude2)
    #Radius of the earth in nautical miles
    Radius = 3440.064794816613
    Latitude1, Longitude1, Latitude2, Longitude2 = map(radians, [Latitude1, Longitude1, Latitude2, Longitude2])
    DistanceLongitude = Longitude2 - Longitude1
    DistanceLatitude = Latitude2 - Latitude1
    B = sin(DistanceLatitude/2)**2 + cos(Latitude1) * cos(Latitude2) * sin(DistanceLongitude/2)**2
    A = 2 * atan2(sqrt(B), sqrt(1-B))
    Distance = Radius * A
    return Distance

In [16]:
#Obtain id flight, from, & to
#Format(origin,id flight)
map_of_from_airport = rdd_passenger.map(lambda a: (a[2], (a[1])))
#Format(destination,id flight)
map_of_to_airport = rdd_passenger.map(lambda a: (a[3], a[1]))
#Format(code,Latitude,Longitude)
map_of_airport = rdd_airport.map(lambda a: (a[1], (a[2], a[3])))

In [18]:
#Format('from', ('id flight', 'Latitude', 'Longitude')))
Latitude_Longitude_of_from_airport = map_of_from_airport.join(map_of_airport)
#Format('id flight', ('from', 'Latitude', 'Longitude')))
map_of_Latitude_Longitude_of_from_airport = Latitude_Longitude_of_from_airport.map(lambda a: (a[1][0], ((a[0], a[1][1][0], a[1][1][1]))))
Latitude_Longitude_of_to_airport = map_of_to_airport.join(map_of_airport)
map_of_Latitude_Longitude_of_to_airport = Latitude_Longitude_of_to_airport.map(lambda a: (a[1][0], ((a[0], a[1][1][0], a[1][1][1]))))
#Combine from & to maps
combined = map_of_Latitude_Longitude_of_from_airport.join(map_of_Latitude_Longitude_of_to_airport).distinct()
combined.collect()

[('ULZ8130D',
  (('CAN', '23.392436', '113.298786'), ('DFW', '32.896828', '-97.037997'))),
 ('RPG3351U',
  (('HND', '35.552258', '139.779694'), ('CAN', '23.392436', '113.298786'))),
 ('VDC9164W',
  (('FCO', '41.804475', '12.250797'), ('LAS', '36.080056', '-115.15225'))),
 ('GMO5938W',
  (('LHR', '51.4775', '-0.461389'), ('PEK', '40.080111', '116.584556'))),
 ('SQU6245R',
  (('DEN', '39.861656', '-104.673178'), ('FRA', '50.026421', '8.543125'))),
 ('PME8178S',
  (('DEN', '39.861656', '-104.673178'), ('PEK', '40.080111', '116.584556'))),
 ('WSK1289Z',
  (('CLT', '35.214', '-80.943139'), ('DEN', '39.861656', '-104.673178'))),
 ('MBA8071P',
  (('KUL', '2.745578', '101.709917'), ('PEK', '40.080111', '116.584556'))),
 ('VYU9214I',
  (('ORD', '41.978603', '-87.904842'), ('DXB', '25.252778', '55.364444'))),
 ('WPW9201U',
  (('DFW', '32.896828', '-97.037997'), ('PEK', '40.080111', '116.584556'))),
 ('VYW5940P',
  (('LAS', '36.080056', '-115.15225'), ('SIN', '1.350189', '103.994433'))),
 ('EWH63

In [19]:
#Compute distance for every flight
flight_for_each_miles = combined.map(lambda a: (a[0], miles_between_nautical(a[1][0][1], a[1][0][2], a[1][1][1], a[1][1][2])))

In [20]:
#Output info in dataframe to display as table
cols = ['ID Flight', 'Miles Nuatical']
count_flight_for_each_miles = pd.DataFrame(flight_for_each_miles.collect(), columns=cols)
count_flight_for_each_miles

Unnamed: 0,ID Flight,Miles Nuatical
0,ULZ8130D,7007.334293
1,RPG3351U,1557.699727
2,VDC9164W,5312.029693
3,GMO5938W,4402.109512
4,SQU6245R,4367.0759
5,PME8178S,5502.890698
6,WSK1289Z,1159.942593
7,MBA8071P,2383.079316
8,VYU9214I,6285.384867
9,WPW9201U,6044.707256


In [21]:
#Require passenger id & id flight
#Merge on id flight & map by id passenger
passenger_for_each_flights = rdd_passenger.map(lambda a: (a[1], (a[0])))
dirty_passenger_for_each_miles = passenger_for_each_flights.join(flight_for_each_miles).distinct()
clean_passenger_for_each_miles = dirty_passenger_for_each_miles.map(lambda a: (a[1][0], a[1][1]))
passenger_for_each_miles = clean_passenger_for_each_miles.reduceByKey(lambda a,b: a+b)

In [22]:
#Output information in dataframe
cols = ['ID Passenger', 'Count Flown Miles Nautical']
count_passsenger_for_each_miles = pd.DataFrame(passenger_for_each_miles.collect(), columns=cols)
count_passsenger_for_each_miles

Unnamed: 0,ID Passenger,Count Flown Miles Nautical
0,UES9151GS5,89833.78211
1,POP2875LH3,52116.505887
2,CKZ3132BR4,73434.544105
3,CYJ0225CH1,48371.527482
4,VZY2993ME1,47857.840098
5,HCA3158QA6,77716.974017
6,DAZ3029XA0,87019.633549
7,CDC0302NN5,51759.271706
8,YMH6360YP0,66668.339667
9,XFG5747ZT9,54765.123033


In [23]:
#Obtain passenger having earned highest value for air miles
sorted_passenger_for_each_miles = passenger_for_each_miles.sortBy(lambda a: a[1], ascending=False)
passenger_highest = sorted_passenger_for_each_miles.collect()[0]
print(passenger_highest)

('UES9151GS5', 89833.78211049647)


In [28]:
#End session
spark.stop()