 # Instructions:
 - For all cells marked with <font color="red">(CODE Needed)</font> below, replace `#CODE_HERE` with your solution


# 1. Load Spark Context (No CODE Needed)

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, desc, col, when, max
from functools import reduce

# Reuse the notebook's active SparkSession if there is one, otherwise create it.
spark = (SparkSession
         .builder
         .getOrCreate())

# 2. Load some printing functions (No CODE Needed)

In [5]:
# These are some printing helper functions we will use to make the output more clear
from pprint import pprint


def title(s):
    """Pretty-print ``s`` as a section banner: ``'---- <s> -----'``.

    ``s`` is wrapped in a 1-tuple before %-formatting so that a tuple argument
    is rendered as text instead of raising ``TypeError`` (plain ``"%s" % s``
    would treat a tuple as the whole argument list).
    """
    pprint("---- %s -----" % (s,))


def see(s, v):
    """Print a ``title`` banner for the label ``s``, then pretty-print ``v``."""
    title(s)
    pprint(v)

# 3. Load each file into a DF and add the year to the data (No CODE Needed)

In [6]:
def createOneYearDF(year, base_dir="../../data/flight-data/csv/"):
    """Load every CSV file for one year into a DataFrame tagged with that year.

    Reads all files matching ``<base_dir><year>*`` (with a header row), adds a
    string ``year`` column and casts ``count`` to int. Prints the row count and
    the first 5 rows as a quick sanity check.

    Args:
        year: the year to load; used both in the file glob and as the value of
            the new ``year`` column (stored as a string).
        base_dir: directory prefix, ending with a path separator, under which
            the per-year files live. Defaults to the notebook's original path.

    Returns:
        The DataFrame for that year.
    """
    year_str = str(year)
    file = "%s%s*" % (base_dir, year_str)
    see("year:" + year_str, file)
    fileDF = spark.read.csv(file, header=True)
    arrDF = (fileDF
             .withColumn("year", lit(year_str))
             # inferSchema is not set, so the reader yields string columns;
             # make the counts numeric so they can be summed later.
             .withColumn("count", fileDF["count"].cast("int")))
    see("DF count for year:" + year_str, arrDF.count())
    title("DF elements for year:" + year_str)
    for arr in arrDF.take(5):
        print(arr)
    return arrDF

# Build one DataFrame per year for 2010-2015 (the range end is exclusive).
separateDFs = [createOneYearDF(year) for year in range(2010, 2016)]
see("arrays of DF", separateDFs)

'---- year:2010 -----'
'../../data/flight-data/csv/2010*'
'---- DF count for year:2010 -----'
255
'---- DF elements for year:2010 -----'
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1, year='2010')
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=264, year='2010')
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=69, year='2010')
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=24, year='2010')
Row(DEST_COUNTRY_NAME='Equatorial Guinea', ORIGIN_COUNTRY_NAME='United States', count=1, year='2010')
'---- year:2011 -----'
'../../data/flight-data/csv/2011*'
'---- DF count for year:2011 -----'
255
'---- DF elements for year:2011 -----'
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Saint Martin', count=2, year='2011')
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Guinea', count=2, year='2011')
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia'

# 4. Merge all DFs into one DF  (No CODE Needed)

In [7]:
# Stack the per-year DataFrames into one. unionByName matches columns by name
# rather than by position, so a CSV file whose header lists the columns in a
# different order cannot silently shift values into the wrong column.
allYearsDF = reduce(lambda df1, df2: df1.unionByName(df2), separateDFs)

see("allYearsDF count", allYearsDF.count())
# 10 rows sampled without replacement; the fixed seed (17) makes the sample
# repeatable for the same data.
see("allYearsDF sample", allYearsDF.rdd.takeSample(False, 10, 17))

'---- allYearsDF count -----'
1502
'---- allYearsDF sample -----'
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Jamaica', count=714, year='2014'),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=29, year='2011'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Samoa', count=28, year='2010'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=24, year='2011'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Saint Barthelemy', count=58, year='2013'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Afghanistan', count=5, year='2012'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Russia', count=151, year='2014'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Thailand', count=4, year='2015'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Sweden', count=68, year='2011'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenad

# 5. Compute a DF containing all flights that departed from the US <font color="red">(CODE Needed)</font>
- Hint: make 1 `filter` transformation

In [8]:
# Keep only the flights whose origin is the United States.
# (Despite the "Rdd" suffix this is a DataFrame; the name is kept because
# later cells refer to it.)
fromUSRdd = allYearsDF.filter(col("ORIGIN_COUNTRY_NAME") == "United States")
see("fromUSRdd count", fromUSRdd.count())

title("fromUSRdd show(10)")
fromUSRdd.show(10)

'---- fromUSRdd count -----'
772
'---- fromUSRdd show(10) -----'
+--------------------+-------------------+-----+----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|year|
+--------------------+-------------------+-----+----+
|               Egypt|      United States|   24|2010|
|   Equatorial Guinea|      United States|    1|2010|
|          Costa Rica|      United States|  477|2010|
|             Senegal|      United States|   29|2010|
|              Guyana|      United States|   17|2010|
|               Malta|      United States|    1|2010|
|             Bolivia|      United States|   46|2010|
|            Anguilla|      United States|   21|2010|
|Turks and Caicos ...|      United States|  136|2010|
|Saint Vincent and...|      United States|    1|2010|
+--------------------+-------------------+-----+----+
only showing top 10 rows



### Expected output for step 5

<pre>
'---- fromUSRdd count -----'
772
'---- fromUSRdd show(10) -----'
+--------------------+-------------------+-----+----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|year|
+--------------------+-------------------+-----+----+
|               Egypt|      United States|   24|2010|
|   Equatorial Guinea|      United States|    1|2010|
|          Costa Rica|      United States|  477|2010|
|             Senegal|      United States|   29|2010|
|              Guyana|      United States|   17|2010|
|               Malta|      United States|    1|2010|
|             Bolivia|      United States|   46|2010|
|            Anguilla|      United States|   21|2010|
|Turks and Caicos ...|      United States|  136|2010|
|Saint Vincent and...|      United States|    1|2010|
+--------------------+-------------------+-----+----+
only showing top 10 rows
</pre>

# 6. Sum all the flights for the same destination and the same year <font color="red">(CODE Needed)</font>
- Hint: make 1 `groupBy` and 1 `sum` transformations

In [15]:
# Total flights per (destination, year). Spark names the summed column
# "sum(count)"; rename it to something readable.
groupedRdd = (fromUSRdd
              .groupBy("DEST_COUNTRY_NAME", "year")
              .sum("count")
              .withColumnRenamed("sum(count)", "flight_count"))

see("groupedRdd count", groupedRdd.count())

title("groupedRdd show(10)")
groupedRdd.show(10)

'---- groupedRdd count -----'
772
'---- groupedRdd show(10) -----'
+-----------------+----+------------+
|DEST_COUNTRY_NAME|year|flight_count|
+-----------------+----+------------+
|      Saint Lucia|2011|         102|
|          Hungary|2014|           2|
|       Guadeloupe|2014|          43|
|          Nigeria|2014|          49|
|             Fiji|2015|          24|
|           Kuwait|2010|          26|
|   Cayman Islands|2011|         251|
|         Paraguay|2011|          85|
|          Austria|2011|          34|
|        Gibraltar|2014|           1|
+-----------------+----+------------+
only showing top 10 rows



### Expected output for step 6
<pre>
'---- groupedRdd count -----'
772
'---- groupedRdd show(10) -----'
+-----------------+----+------------+
|DEST_COUNTRY_NAME|year|flight_count|
+-----------------+----+------------+
|      Saint Lucia|2011|         102|
|          Hungary|2014|           2|
|       Guadeloupe|2014|          43|
|          Nigeria|2014|          49|
|             Fiji|2015|          24|
|           Kuwait|2010|          26|
|   Cayman Islands|2011|         251|
|         Paraguay|2011|          85|
|          Austria|2011|          34|
|        Gibraltar|2014|           1|
+-----------------+----+------------+
only showing top 10 rows
 </pre>

# 7. Sort the summed data by the highest sum <font color="red">(CODE Needed)</font>
- Hint: make 1 `orderBy` transformation


In [16]:
# Highest totals first. Ties on flight_count are broken by destination and
# then year, so the row order (and therefore show()) is deterministic across
# runs instead of depending on partition layout.
sortedByCountRdd = groupedRdd.orderBy(desc("flight_count"), "DEST_COUNTRY_NAME", "year")

see("sortedByCountRdd count", sortedByCountRdd.count())

title("sortedByCountRdd show(10)")
sortedByCountRdd.show(10)

'---- sortedByCountRdd count -----'
772
'---- sortedByCountRdd show(10) -----'
+-----------------+----+------------+
|DEST_COUNTRY_NAME|year|flight_count|
+-----------------+----+------------+
|    United States|2015|      370002|
|    United States|2014|      358354|
|    United States|2011|      352742|
|    United States|2010|      348113|
|    United States|2012|      347452|
|    United States|2013|      343132|
|           Canada|2011|        8514|
|           Canada|2015|        8399|
|           Canada|2010|        8271|
|           Canada|2012|        8034|
+-----------------+----+------------+
only showing top 10 rows



### Expected output for step 7
<pre>
'---- sortedByCountRdd count -----'
772
'---- sortedByCountRdd show(10) -----'
+-----------------+----+------------+
|DEST_COUNTRY_NAME|year|flight_count|
+-----------------+----+------------+
|    United States|2015|      370002|
|    United States|2014|      358354|
|    United States|2011|      352742|
|    United States|2010|      348113|
|    United States|2012|      347452|
|    United States|2013|      343132|
|           Canada|2011|        8514|
|           Canada|2015|        8399|
|           Canada|2010|        8271|
|           Canada|2012|        8034|
+-----------------+----+------------+
only showing top 10 rows
 </pre>

# 8. Compute the Sum of all Trips <font color="red">(CODE Needed)</font>
- Hint: make 1 `agg` or 1 `groupBy` and 1 `sum` transformations


In [39]:
# Grand total over every (destination, year) row: an ungrouped sum, with
# Spark's default "sum(flight_count)" column renamed to "total_count".
sumAllTrips = (sortedByCountRdd
               .groupBy()
               .sum("flight_count")
               .withColumnRenamed("sum(flight_count)", "total_count"))

title("sumAllTrips")
sumAllTrips.show()

'---- sumAllTrips -----'
+-----------+
|total_count|
+-----------+
|    2352430|
+-----------+



### Expected output for step 8
<pre>
'---- sumAllTrips -----'
+-----------+
|total_count|
+-----------+
|    2352430|
+-----------+
 </pre>

# 9. Compute the Sum of Each Year's Trip <font color="red">(CODE Needed)</font>
- Hint: make 1 `groupBy` and 1 `sum` transformation, then 1 `collectAsMap` action


In [95]:
# sortedByCountRdd is reused by later cells (e.g. the pivot in step 10), so
# mark it for caching *before* this cell's action. Calling persist() after the
# collect meant this action did not populate the cache.
sortedByCountRdd.persist()

# Per-year totals as a plain Python dict {year: total_flights}; used later as
# a lookup table.
TPY = (sortedByCountRdd
       .groupBy("year")
       .sum("flight_count")
       .rdd
       .collectAsMap())

see("TPY", TPY)

'---- TPY -----'
{'2010': 385450,
 '2011': 390663,
 '2012': 385290,
 '2013': 381039,
 '2014': 398022,
 '2015': 411966}


### Expected output for step 9
<pre>
'---- TPY -----'
{'2010': 385450,
 '2011': 390663,
 '2012': 385290,
 '2013': 381039,
 '2014': 398022,
 '2015': 411966}
 </pre>

# 10. Use the above sums to compute the percent per year and the percent of total for each (country, year) pair <font color="red">(CODE Needed)</font>
- Hint: make 1 `groupBy` and 1 `pivot` transformation

In [105]:
# One row per destination, one column per year, with missing (country, year)
# pairs filled with 0. The pivot values are passed explicitly (the years from
# TPY, sorted) so Spark does not run an extra job to discover the distinct
# years. Sorted order gives the same 2010..2015 column order.
stats = (sortedByCountRdd
         .groupBy("DEST_COUNTRY_NAME")
         .pivot("year", sorted(TPY))
         .sum("flight_count")
         .orderBy("DEST_COUNTRY_NAME")
         .fillna(0)
         .withColumnRenamed("DEST_COUNTRY_NAME", "Country"))
# Mark for caching before the first action so the count below materializes it
# for the later cells that reuse stats.
stats.persist()
see("stats count", stats.count())

title("stats show(10)")
stats.show(10)

'---- stats count -----'
167
'---- stats show(10) -----'
+-------------------+----+----+----+----+----+----+
|            Country|2010|2011|2012|2013|2014|2015|
+-------------------+----+----+----+----+----+----+
|        Afghanistan|  11|   8|   5|   0|   0|   0|
|            Algeria|   0|   0|   0|   2|   9|   4|
|             Angola|  14|  13|  12|  12|  13|  15|
|           Anguilla|  21|  21|  19|  22|  34|  41|
|Antigua and Barbuda| 123| 146| 145| 123| 115| 126|
|          Argentina| 184| 183| 208| 176| 195| 180|
|              Aruba| 359| 347| 349| 352| 351| 346|
|          Australia| 290| 280| 277| 295| 293| 329|
|            Austria|  36|  34|  34|  35|  47|  62|
|         Azerbaijan|   1|   0|   0|   3|   7|  21|
+-------------------+----+----+----+----+----+----+
only showing top 10 rows



### Expected output for step 10
<pre>
'---- stats count -----'
167
'---- stats show(10) -----'
+-------------------+----+----+----+----+----+----+
|            Country|2010|2011|2012|2013|2014|2015|
+-------------------+----+----+----+----+----+----+
|        Afghanistan|  11|   8|   5|   0|   0|   0|
|            Algeria|   0|   0|   0|   2|   9|   4|
|             Angola|  14|  13|  12|  12|  13|  15|
|           Anguilla|  21|  21|  19|  22|  34|  41|
|Antigua and Barbuda| 123| 146| 145| 123| 115| 126|
|          Argentina| 184| 183| 208| 176| 195| 180|
|              Aruba| 359| 347| 349| 352| 351| 346|
|          Australia| 290| 280| 277| 295| 293| 329|
|            Austria|  36|  34|  34|  35|  47|  62|
|         Azerbaijan|   1|   0|   0|   3|   7|  21|
+-------------------+----+----+----+----+----+----+
only showing top 10 rows
 </pre>

# 11. Transform the above table into the format below <font color="red">(CODE Needed)</font>
- Hint: use `withColumn`
- use the function `percent`
- use `TPY` as a lookup table


In [132]:
def percent(number, total):
    """Return ``number`` expressed as a percentage of ``total``.

    ``number`` may be a plain number or a Spark ``Column`` expression; in the
    latter case the arithmetic builds a new Column. ``total`` is converted with
    ``float()`` so the division is always floating-point.
    """
    scaled = 100.0 * number
    return scaled / float(total)

# Grand total of all US-departing flights (the single value in sumAllTrips).
totalFlightCount = sumAllTrips.first()[0]

# Years present in the data, ascending. TPY already has one key per year, so
# sorting its keys avoids launching another Spark job.
years = sorted(TPY)

# For every year, add two columns, in year order:
#   PcntTotal-<year>: that year's flights as a percent of the grand total
#   PcntYear-<year>:  that year's flights as a percent of that year's total
# Each column is derived from the same `year` key, so the data column and the
# TPY lookup can never drift apart. Starting from `stats` keeps the cell
# idempotent on re-run.
Table = stats
for year in years:
    Table = (Table
             .withColumn("PcntTotal-%s" % year, percent(col(year), totalFlightCount))
             .withColumn("PcntYear-%s" % year, percent(col(year), int(TPY[year]))))

see("Table count", Table.count())

title("Table show(10)")
Table.show(10)

'---- Table count -----'
167
'---- Table show(10) -----'
+-------------------+----+----+----+----+----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            Country|2010|2011|2012|2013|2014|2015|      PcntTotal-2010|       PcntYear-2010|      PcntTotal-2011|       PcntYear-2011|      PcntTotal-2012|       PcntYear-2012|      PcntTotal-2013|       PcntYear-2013|      PcntTotal-2014|       PcntYear-2014|      PcntTotal-2015|       PcntYear-2015|
+-------------------+----+----+----+----+----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        Afghanistan|  11|   8|

### Expected output for step 11
<pre>
'---- Table count -----'
167
'---- Table show(10) -----'
+-------------------+----+----+----+----+----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            Country|2010|2011|2012|2013|2014|2015|      PcntTotal-2010|       PcntYear-2010|      PcntTotal-2011|       PcntYear-2011|      PcntTotal-2012|       PcntYear-2012|      PcntTotal-2013|       PcntYear-2013|      PcntTotal-2014|       PcntYear-2014|      PcntTotal-2015|       PcntYear-2015|
+-------------------+----+----+----+----+----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        Afghanistan|  11|   8|   5|   0|   0|   0|4.676015864446551E-4|0.002853807238292...|3.400738810506582...|0.002047800789939...|2.125461756566614E-4|0.001297723792468...|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|
|            Algeria|   0|   0|   0|   2|   9|   4|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|8.501847026266456E-5|  5.2488065526101E-4|3.825831161819905...|0.002261181542728794|1.700369405253291...|9.709539136724876E-4|
|             Angola|  14|  13|  12|  12|  13|  15| 5.95129291838652E-4|0.003632118303281...|5.526200567073197E-4|0.003327676283651...|5.101108215759874E-4|0.003114537101923...|5.101108215759874E-4| 0.00314928393156606|5.526200567073197E-4|0.003266151117274...|6.376385269699842E-4|0.003641077176271...|
|           Anguilla|  21|  21|  19|  22|  34|  41|8.926939377579779E-4|0.005448177454922...|8.926939377579779E-4|0.005375477073590...|8.076754674953134E-4|0.004931350411378442|9.352031728893102E-4|0.005773687207871...|0.001445313994465...| 0.00854224138364211|0.001742878640384...|0.009952277615142998|
|Antigua and Barbuda| 123| 146| 145| 123| 115| 126|0.005228635921153871| 0.03191075366454793|0.006206348329174514|0.037372364416389574|0.006163839094043181| 0.03763398998157232|0.005228635921153871|0.032280160298552116|0.004888562040103212|0.028892875268201255|0.005356163626547867| 0.03058504828068336|
|          Argentina| 184| 183| 208| 176| 195| 180|0.007821699264165139|  0.0477364119859904|0.007779190029033...|0.046843443069858166|0.008841920907317115|0.053985309766669265|0.007481625383114...|0.046189497662968884|0.008289300850609796| 0.04899226675912387|0.007651662323639811| 0.04369292611526194|
|              Aruba| 359| 347| 349| 352| 351| 346| 0.01526081541214829| 0.09313789077701388|0.014750704590572301| 0.08882335926361083|0.014835723060834966| 0.09058112071426717|0.014963250766228963| 0.09237899532593777| 0.01492074153109763| 0.08818608016642296| 0.01470819535544097| 0.08398751353267017|
|          Australia| 290| 280| 277| 295| 293| 329|0.012327678188086362| 0.07523673628226749|0.011902585836773038| 0.07167302764787041|0.011775058131379043| 0.07189389810272781|0.012540224363743023| 0.07741989665099898|0.012455205893480358| 0.07361402133550407|0.013985538358208321|  0.0798609593995621|
|            Austria|  36|  34|  34|  35|  47|  62|0.001530332464727...|0.009339732779867688|0.001445313994465...|0.008703153357241408|0.001445313994465...|0.008824521788782475|0.001487823229596...|0.009185411467067675|0.001997934051172617|0.011808392500917034|0.002635572578142...|0.015049785661923557|
|         Azerbaijan|   1|   0|   0|   3|   7|  21|4.250923513133228E-5|2.594370216629913E-4|                 0.0|                 0.0|                 0.0|                 0.0|1.275277053939968...| 7.87320982891515E-4| 2.97564645919326E-4|0.001758696755455...|8.926939377579779E-4|0.005097508046780559|
+-------------------+----+----+----+----+----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 10 rows
</pre>

# 12. Final output


In [133]:
# Collect the final table to the driver as a pandas DataFrame so Jupyter
# renders it as a rich table. It has one row per destination country, so it
# is small enough to collect.
pandasTable = Table.toPandas()
pandasTable

Unnamed: 0,Country,2010,2011,2012,2013,2014,2015,PcntTotal-2010,PcntYear-2010,PcntTotal-2011,PcntYear-2011,PcntTotal-2012,PcntYear-2012,PcntTotal-2013,PcntYear-2013,PcntTotal-2014,PcntYear-2014,PcntTotal-2015,PcntYear-2015
0,Afghanistan,11,8,5,0,0,0,0.000468,0.002854,0.000340,0.002048,0.000213,0.001298,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000
1,Algeria,0,0,0,2,9,4,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000085,0.000525,0.000383,0.002261,0.000170,0.000971
2,Angola,14,13,12,12,13,15,0.000595,0.003632,0.000553,0.003328,0.000510,0.003115,0.000510,0.003149,0.000553,0.003266,0.000638,0.003641
3,Anguilla,21,21,19,22,34,41,0.000893,0.005448,0.000893,0.005375,0.000808,0.004931,0.000935,0.005774,0.001445,0.008542,0.001743,0.009952
4,Antigua and Barbuda,123,146,145,123,115,126,0.005229,0.031911,0.006206,0.037372,0.006164,0.037634,0.005229,0.032280,0.004889,0.028893,0.005356,0.030585
5,Argentina,184,183,208,176,195,180,0.007822,0.047736,0.007779,0.046843,0.008842,0.053985,0.007482,0.046189,0.008289,0.048992,0.007652,0.043693
6,Aruba,359,347,349,352,351,346,0.015261,0.093138,0.014751,0.088823,0.014836,0.090581,0.014963,0.092379,0.014921,0.088186,0.014708,0.083988
7,Australia,290,280,277,295,293,329,0.012328,0.075237,0.011903,0.071673,0.011775,0.071894,0.012540,0.077420,0.012455,0.073614,0.013986,0.079861
8,Austria,36,34,34,35,47,62,0.001530,0.009340,0.001445,0.008703,0.001445,0.008825,0.001488,0.009185,0.001998,0.011808,0.002636,0.015050
9,Azerbaijan,1,0,0,3,7,21,0.000043,0.000259,0.000000,0.000000,0.000000,0.000000,0.000128,0.000787,0.000298,0.001759,0.000893,0.005098
