 # Instructions:
 - For all cells marked with <font color="red">(CODE Needed)</font> below, replace `#CODE_HERE` with your solution


# 1. Load Spark Context (No CODE Needed)

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, desc, col, when, max
from functools import reduce

spark = SparkSession.builder.getOrCreate()

# 2. Load some printing functions (No CODE Needed)

In [32]:
#These are some printing helper functions we will use to make the output more clear
from pprint import pprint
def title(s):
    pprint("---- %s -----" %s)    
    
def see(s, v):
    pprint("---- %s -----" %s)
    pprint(v)

# 3. Load each file in an DF and add the year to the data (No CODE Needed)

In [33]:
def createOneYearDF(year):
    file = "../../data/flight-data/csv/%s"%year+"*"
    see("year:"+str(year), file)
    fileDF =spark.read.csv(file, header=True)
    arrDF = fileDF\
                .withColumn("year", lit(str(year)))\
                .withColumn("count", fileDF["count"].cast("int"))
    see("DF count for year:"+str(year), arrDF.count())
    title("DF elements for year:"+str(year))
    for arr in (arrDF.take(5)):
        print(arr)
    return arrDF

separateDFs  = list(map(createOneYearDF, range(2010,2016)))
see("arrays of DF", separateDFs)

'---- year:2010 -----'
'../../data/flight-data/csv/2010*'
'---- DF count for year:2010 -----'
255
'---- DF elements for year:2010 -----'
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1, year='2010')
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=264, year='2010')
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=69, year='2010')
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=24, year='2010')
Row(DEST_COUNTRY_NAME='Equatorial Guinea', ORIGIN_COUNTRY_NAME='United States', count=1, year='2010')
'---- year:2011 -----'
'../../data/flight-data/csv/2011*'
'---- DF count for year:2011 -----'
255
'---- DF elements for year:2011 -----'
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Saint Martin', count=2, year='2011')
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Guinea', count=2, year='2011')
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia'

# 4. Merge all DFs into one DF  (No CODE Needed)

In [34]:
allYearsDF = reduce(lambda df1, df2: df1.union(df2),separateDFs)
allYearsDF.createOrReplaceTempView("flightData")
see("allYearsDF count", allYearsDF.count())
see("allYearsDF sample", allYearsDF.rdd.takeSample(False, 10, 17))

title("SQL show(5)")
spark.sql("SELECT * FROM flightData").show(5)

'---- allYearsDF count -----'
1502
'---- allYearsDF sample -----'
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Jamaica', count=714, year='2014'),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=29, year='2011'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Samoa', count=28, year='2010'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=24, year='2011'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Saint Barthelemy', count=58, year='2013'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Afghanistan', count=5, year='2012'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Russia', count=151, year='2014'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Thailand', count=4, year='2015'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Sweden', count=68, year='2011'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenad

# 5. Compute a DF containing all flights which departed from US  <font color="red">(CODE Needed)</font>
- Hint: make 1 `filter` transformation

In [38]:
sqlQuery = 'SELECT * FROM flightData WHERE ORIGIN_COUNTRY_NAME = "United States"'

spark.sql(sqlQuery).createOrReplaceTempView("fromUS")

title("fromUS count")
spark.sql("SELECT count(*) FROM fromUS").show()

title("fromUS show(10)")
spark.sql("SELECT * FROM fromUS").show(10)

'---- fromUS count -----'
+--------+
|count(1)|
+--------+
|     772|
+--------+

'---- fromUS show(10) -----'
+--------------------+-------------------+-----+----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|year|
+--------------------+-------------------+-----+----+
|               Egypt|      United States|   24|2010|
|   Equatorial Guinea|      United States|    1|2010|
|          Costa Rica|      United States|  477|2010|
|             Senegal|      United States|   29|2010|
|              Guyana|      United States|   17|2010|
|               Malta|      United States|    1|2010|
|             Bolivia|      United States|   46|2010|
|            Anguilla|      United States|   21|2010|
|Turks and Caicos ...|      United States|  136|2010|
|Saint Vincent and...|      United States|    1|2010|
+--------------------+-------------------+-----+----+
only showing top 10 rows



### Expectated output for step 5.

<pre>
'---- fromUS count -----'
+--------+
|count(1)|
+--------+
|     772|
+--------+

'---- fromUS show(10) -----'
+--------------------+-------------------+-----+----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|year|
+--------------------+-------------------+-----+----+
|               Egypt|      United States|   24|2010|
|   Equatorial Guinea|      United States|    1|2010|
|          Costa Rica|      United States|  477|2010|
|             Senegal|      United States|   29|2010|
|              Guyana|      United States|   17|2010|
|               Malta|      United States|    1|2010|
|             Bolivia|      United States|   46|2010|
|            Anguilla|      United States|   21|2010|
|Turks and Caicos ...|      United States|  136|2010|
|Saint Vincent and...|      United States|    1|2010|
+--------------------+-------------------+-----+----+
only showing top 10 rows
</pre>

# 6. Sum all the flights for the same destination and the same year <font color="red">(CODE Needed)</font>
- Hint: make 1 `groupBy` and 1 `sum` transformations

In [48]:
sqlQuery = 'SELECT DEST_COUNTRY_NAME, year, SUM(count) AS flight_count FROM fromUS GROUP BY DEST_COUNTRY_NAME, year'

spark.sql(sqlQuery).createOrReplaceTempView("groupedFlightData")

title("groupedFlightData count")
spark.sql("SELECT count(*) FROM groupedFlightData").show()

title("groupedFlightData show(10)")
spark.sql("SELECT * FROM groupedFlightData").show(10)

'---- groupedFlightData count -----'
+--------+
|count(1)|
+--------+
|     772|
+--------+

'---- groupedFlightData show(10) -----'
+-----------------+----+------------+
|DEST_COUNTRY_NAME|year|flight_count|
+-----------------+----+------------+
|      Saint Lucia|2011|         102|
|          Hungary|2014|           2|
|       Guadeloupe|2014|          43|
|          Nigeria|2014|          49|
|             Fiji|2015|          24|
|           Kuwait|2010|          26|
|   Cayman Islands|2011|         251|
|         Paraguay|2011|          85|
|          Austria|2011|          34|
|        Gibraltar|2014|           1|
+-----------------+----+------------+
only showing top 10 rows



### Exepected output for step  6
<pre>
'---- grouped count -----'
+--------+
|count(1)|
+--------+
|     772|
+--------+

'---- grouped show(10) -----'
+-----------------+----+------------+
|dest_country_name|year|flight_count|
+-----------------+----+------------+
|      Saint Lucia|2011|         102|
|          Hungary|2014|           2|
|       Guadeloupe|2014|          43|
|          Nigeria|2014|          49|
|             Fiji|2015|          24|
|           Kuwait|2010|          26|
|   Cayman Islands|2011|         251|
|         Paraguay|2011|          85|
|          Austria|2011|          34|
|        Gibraltar|2014|           1|
+-----------------+----+------------+
only showing top 10 rows
 </pre>

# 7. Sort the summed data by the highest sum <font color="red">(CODE Needed)</font>
- Hint: make 1 `orderBy` transformation


In [50]:
sqlQuery = 'SELECT * FROM groupedFlightData ORDER BY flight_count DESC'

spark.sql(sqlQuery).createOrReplaceTempView("sortedFlightData")

title("sortedFlightData count")
spark.sql("SELECT count(*) FROM sortedFlightData").show()

title("sortedFlightData show(10)")
spark.sql("SELECT * FROM sortedFlightData").show(10)

'---- sortedFlightData count -----'
+--------+
|count(1)|
+--------+
|     772|
+--------+

'---- sortedFlightData show(10) -----'
+-----------------+----+------------+
|DEST_COUNTRY_NAME|year|flight_count|
+-----------------+----+------------+
|    United States|2015|      370002|
|    United States|2014|      358354|
|    United States|2011|      352742|
|    United States|2010|      348113|
|    United States|2012|      347452|
|    United States|2013|      343132|
|           Canada|2011|        8514|
|           Canada|2015|        8399|
|           Canada|2010|        8271|
|           Canada|2012|        8034|
+-----------------+----+------------+
only showing top 10 rows



### Exepected output for step  7
<pre>
'---- sortedFlightData count -----'
+--------+
|count(1)|
+--------+
|     772|
+--------+

'---- sortedFlightData show(10) -----'
+-----------------+----+------------+
|dest_country_name|year|flight_count|
+-----------------+----+------------+
|    United States|2015|      370002|
|    United States|2014|      358354|
|    United States|2011|      352742|
|    United States|2010|      348113|
|    United States|2012|      347452|
|    United States|2013|      343132|
|           Canada|2011|        8514|
|           Canada|2015|        8399|
|           Canada|2010|        8271|
|           Canada|2012|        8034|
+-----------------+----+------------+
only showing top 10 rows
 </pre>

# 8. Compute the Sum of all Trips <font color="red">(CODE Needed)</font>
- Hint: make 1 `sum` transformations


In [51]:
sqlQuery = 'SELECT SUM(flight_count) AS total_count FROM sortedFlightData'

spark.sql(sqlQuery).createOrReplaceTempView("sumAllTrips")

title("sumAllTrips")
spark.sql("SELECT * FROM sumAllTrips").show(10)

'---- sumAllTrips -----'
+-----------+
|total_count|
+-----------+
|    2352430|
+-----------+



### Exepected output for step  8
<pre>
'---- sumAllTrips -----'
+-----------+
|total_count|
+-----------+
|    2352430|
+-----------+
 </pre>

# 9. Compute the Sum of Each Year's Trip <font color="red">(CODE Needed)</font>
- Hint: make 1 `groupBy` , 1 `sum` transformations


In [53]:
sqlQuery = 'SELECT year, SUM(flight_count) AS year_count FROM sortedFlightData GROUP BY year'
spark.sql(sqlQuery).createOrReplaceTempView("TPY")

title("TPY")
spark.sql("SELECT * FROM TPY").show(10)

'---- TPY -----'
+----+----------+
|year|year_count|
+----+----------+
|2012|    385290|
|2014|    398022|
|2013|    381039|
|2011|    390663|
|2015|    411966|
|2010|    385450|
+----+----------+



### Exepected output for step  9
<pre>
'---- TPY -----'
+----+----------+
|year|year_count|
+----+----------+
|2012|    385290|
|2014|    398022|
|2013|    381039|
|2011|    390663|
|2015|    411966|
|2010|    385450|
+----+----------+
 </pre>

# 10. Use the above sums to compute percent Per Year, percent from Total for each country, year pair <font color="red">(CODE Needed)</font>
- Hint: make 1 `join` transformations

In [None]:
totalFlightCount = spark.sql("SELECT * FROM sumAllTrips").first()[0]

sqlQuery = #CODE_HERE

spark.sql(sqlQuery).createOrReplaceTempView("stats")

title("stats count")
spark.sql("SELECT count(*) FROM stats").show()

title("stats show(10)")
spark.sql("SELECT * FROM stats").show(10)

### Exepected output for step  10
<pre>
'---- stats count -----'
+--------+
|count(1)|
+--------+
|     772|
+--------+

'---- stats show(10) -----'
+-----------+----+------------+-------+----------+
|    Country|year|flight_count|  total|year_count|
+-----------+----+------------+-------+----------+
|Afghanistan|2011|           8|2352430|    390663|
|Afghanistan|2012|           5|2352430|    385290|
|Afghanistan|2010|          11|2352430|    385450|
|    Algeria|2015|           4|2352430|    411966|
|    Algeria|2014|           9|2352430|    398022|
|    Algeria|2013|           2|2352430|    381039|
|     Angola|2015|          15|2352430|    411966|
|     Angola|2014|          13|2352430|    398022|
|     Angola|2011|          13|2352430|    390663|
|     Angola|2012|          12|2352430|    385290|
+-----------+----+------------+-------+----------+
only showing top 10 rows
 </pre>

# 11. Transform the above table into the format below <font color="red">(CODE Needed)</font>
- Hint: use the function `percent`


In [None]:
def percent(number, total):
    return 100.0* number/total

spark.udf.register("percent", percent)

sqlQuery = #CODE_HERE

Table = spark.sql(sqlQuery)

Table.createOrReplaceTempView("Table")

title("Table count")
spark.sql("SELECT count(*) FROM Table").show()

title("Table show(10)")
spark.sql("SELECT * FROM Table").show(10)

### Exepected output for step  11
<pre>
'---- Table count -----'
+--------+
|count(1)|
+--------+
|     772|
+--------+

'---- Table show(10) -----'
+-----------+----+------------+-------+----------+--------------------+--------------------+
|    Country|year|flight_count|  total|year_count|           PcntTotal|            PcntYear|
+-----------+----+------------+-------+----------+--------------------+--------------------+
|Afghanistan|2010|          11|2352430|    385450|4.676015864446551E-4|0.002853807238292...|
|Afghanistan|2011|           8|2352430|    390663|3.400738810506582...|0.002047800789939...|
|Afghanistan|2012|           5|2352430|    385290|2.125461756566614E-4|0.001297723792468...|
|    Algeria|2013|           2|2352430|    381039|8.501847026266456E-5|  5.2488065526101E-4|
|    Algeria|2014|           9|2352430|    398022|3.825831161819905...|0.002261181542728794|
|    Algeria|2015|           4|2352430|    411966|1.700369405253291...|9.709539136724876E-4|
|     Angola|2010|          14|2352430|    385450| 5.95129291838652E-4|0.003632118303281...|
|     Angola|2011|          13|2352430|    390663|5.526200567073197E-4|0.003327676283651...|
|     Angola|2012|          12|2352430|    385290|5.101108215759874E-4|0.003114537101923...|
|     Angola|2013|          12|2352430|    381039|5.101108215759874E-4| 0.00314928393156606|
+-----------+----+------------+-------+----------+--------------------+--------------------+
only showing top 10 rows
</pre>

# 12. Final output


In [160]:
Table.toPandas()

Unnamed: 0,Country,year,flight_count,total,year_count,PcntTotal,PcntYear
0,Afghanistan,2010,11,2352430,385450,4.676015864446551E-4,0.0028538072382929044
1,Afghanistan,2011,8,2352430,390663,3.4007388105065825E-4,0.0020478007899391547
2,Afghanistan,2012,5,2352430,385290,2.125461756566614E-4,0.0012977237924680111
3,Algeria,2013,2,2352430,381039,8.501847026266456E-5,5.2488065526101E-4
4,Algeria,2014,9,2352430,398022,3.8258311618199055E-4,0.002261181542728794
5,Algeria,2015,4,2352430,411966,1.7003694052532912E-4,9.709539136724876E-4
6,Angola,2010,14,2352430,385450,5.95129291838652E-4,0.0036321183032818785
7,Angola,2011,13,2352430,390663,5.526200567073197E-4,0.0033276762836511264
8,Angola,2012,12,2352430,385290,5.101108215759874E-4,0.0031145371019232265
9,Angola,2013,12,2352430,381039,5.101108215759874E-4,0.00314928393156606
