In [1]:
import csv

from pyspark import SparkConf, SparkContext

In [2]:
# Create the local Spark context, or reuse the one already running in this kernel.
# Calling the SparkContext constructor a second time (e.g. when this cell is re-run)
# fails because only one active SparkContext is allowed per process; getOrCreate
# returns the existing context instead (in which case this conf is not re-applied).
sc = SparkContext.getOrCreate(
    conf=SparkConf().setMaster('local[*]').setAppName('BusyMonth')
)

In [3]:
# Load the raw CSV as an RDD of text lines; the header line is included.
passengers_rdd = sc.textFile(name='lax_passengers_header.csv')

In [4]:
# Peek at the first raw lines; the very first one is the CSV header.
passengers_rdd.take(5)

['DataExtractDate,ReportPeriod,Terminal,Arrival_Departure,Domestic_International,Passenger_Count',
 '05/01/2014 12:00:00 AM,01/01/2006 12:00:00 AM,Imperial Terminal,Arrival,Domestic,490',
 '05/01/2014 12:00:00 AM,01/01/2006 12:00:00 AM,Imperial Terminal,Departure,Domestic,498',
 '05/01/2014 12:00:00 AM,01/01/2006 12:00:00 AM,Misc. Terminal,Arrival,Domestic,753',
 '05/01/2014 12:00:00 AM,01/01/2006 12:00:00 AM,Misc. Terminal,Departure,Domestic,688']

In [5]:
# Total number of raw lines in the file, including the header line.
passengers_rdd.count()

5157

In [6]:
# Parse every line into its list of CSV fields and drop the header row.
# csv.reader (rather than a bare str.split(',')) keeps quoted fields that contain
# commas intact. Empty lines, which csv.reader yields as [], are dropped here
# instead of raising an IndexError further down the pipeline.
split_rdd = (
    passengers_rdd
    .mapPartitions(lambda lines: csv.reader(lines))
    .filter(lambda row: row and row[0] != 'DataExtractDate')
)

In [8]:
# Check one parsed row: a list of string fields, header already filtered out.
split_rdd.take(1)

[['05/01/2014 12:00:00 AM',
  '01/01/2006 12:00:00 AM',
  'Imperial Terminal',
  'Arrival',
  'Domestic',
  '490']]

In [9]:
# From each parsed row keep: ReportPeriod split on '/', Terminal, Passenger_Count.
report_date_terminal_rdd = split_rdd.map(
    lambda fields: (fields[1].split('/'), fields[2], fields[5])
)

In [10]:
# Inspect: ([month, day, 'year time'], terminal, passenger count), all strings.
report_date_terminal_rdd.take(5)

[(['01', '01', '2006 12:00:00 AM'], 'Imperial Terminal', '490'),
 (['01', '01', '2006 12:00:00 AM'], 'Imperial Terminal', '498'),
 (['01', '01', '2006 12:00:00 AM'], 'Misc. Terminal', '753'),
 (['01', '01', '2006 12:00:00 AM'], 'Misc. Terminal', '688'),
 (['01', '01', '2006 12:00:00 AM'], 'Terminal 1', '401535')]

In [11]:
# Collapse the split ReportPeriod to (month, year): the year is the text of the
# third date piece before its first space. Terminal and count are carried along.
date_terminal_rdd = report_date_terminal_rdd.map(
    lambda rec: (rec[0][0], rec[0][2].partition(' ')[0], rec[1], rec[2])
)

In [12]:
# Inspect: (month, year, terminal, passenger count), still all strings.
date_terminal_rdd.take(3)

[('01', '2006', 'Imperial Terminal', '490'),
 ('01', '2006', 'Imperial Terminal', '498'),
 ('01', '2006', 'Misc. Terminal', '753')]

In [13]:
# Terminals 1-8 plus the Tom Bradley International Terminal. Any other terminal
# in the data (e.g. 'Imperial Terminal', 'Misc. Terminal') is excluded later.
approved_terminals = [
    *(f'Terminal {number}' for number in range(1, 9)),
    'Tom Bradley International Terminal',
]

In [14]:
# Display the terminal whitelist used by the filter below.
approved_terminals

['Terminal 1',
 'Terminal 2',
 'Terminal 3',
 'Terminal 4',
 'Terminal 5',
 'Terminal 6',
 'Terminal 7',
 'Terminal 8',
 'Tom Bradley International Terminal']

In [38]:
# Keep only rows from whitelisted terminals, then drop the terminal column:
# (month, year, terminal, count) -> (month, year, count).
date_approved_terminals = (
    date_terminal_rdd
    .filter(lambda rec: rec[2] in approved_terminals)
    .map(lambda rec: (rec[0], rec[1], rec[3]))
)

In [40]:
# Inspect: (month, year, passenger count) for approved terminals only.
date_approved_terminals.take(5)

[('01', '2006', '401535'),
 ('01', '2006', '389745'),
 ('01', '2006', '561'),
 ('01', '2006', '98991'),
 ('01', '2006', '163067')]

In [44]:
# Find the "busy" months: (month, year) pairs whose total passenger count over all
# approved terminals exceeds BUSY_MONTH_THRESHOLD, keyed as 'MM/YYYY'.
# The single sortByKey() at the end orders by that string, i.e. by month first and
# then by year (not chronologically). Sorting before the filter/re-key would be
# wasted work (an extra sampling job and shuffle), since the re-key discards it.
BUSY_MONTH_THRESHOLD = 5_000_000

busy_months = (
    date_approved_terminals
    .map(lambda rec: ((rec[0], rec[1]), int(rec[2])))    # ((month, year), passengers)
    .reduceByKey(lambda a, b: a + b)                       # total per (month, year)
    .filter(lambda kv: kv[1] > BUSY_MONTH_THRESHOLD)
    .map(lambda kv: (f'{kv[0][0]}/{kv[0][1]}', kv[1]))     # ('MM/YYYY', total)
    .sortByKey()
)
busy_months.collect()

[('01/2014', 5263473),
 ('01/2015', 5339983),
 ('01/2016', 5869759),
 ('01/2017', 6213892),
 ('02/2016', 5280381),
 ('02/2017', 5387511),
 ('03/2006', 5088556),
 ('03/2007', 5240144),
 ('03/2008', 5232233),
 ('03/2012', 5252277),
 ('03/2013', 5385531),
 ('03/2014', 5622137),
 ('03/2015', 5916774),
 ('03/2016', 6277897),
 ('03/2017', 6525366),
 ('04/2006', 5085946),
 ('04/2007', 5172120),
 ('04/2011', 5028056),
 ('04/2012', 5160973),
 ('04/2013', 5157963),
 ('04/2014', 5614336),
 ('04/2015', 5746373),
 ('04/2016', 6109205),
 ('04/2017', 6640015),
 ('05/2006', 5176330),
 ('05/2007', 5256763),
 ('05/2008', 5247792),
 ('05/2011', 5476704),
 ('05/2012', 5412646),
 ('05/2013', 5557314),
 ('05/2014', 5925513),
 ('05/2015', 6148998),
 ('05/2016', 6571091),
 ('05/2017', 6933026),
 ('06/2006', 5509853),
 ('06/2007', 5626291),
 ('06/2008', 5538956),
 ('06/2009', 5040114),
 ('06/2010', 5311142),
 ('06/2011', 5631602),
 ('06/2012', 5739649),
 ('06/2013', 5947409),
 ('06/2014', 6373135),
 ('06/2015'