In [1]:
import pyspark as ps    # for the pyspark suite
import warnings         # for displaying warning
import os               # for environ variables in Part 3

## Part 1: RDD and Spark Basics

Here we will get familiar with the basics of Spark via its Python API, the
`pyspark` module. For now, we will work on a single node that
parallelizes processes across local cores (rather than distributing them
across worker nodes).

1\. Initiate a `SparkSession`. A `SparkSession` embeds both a `SparkContext` and a `SQLContext` to use RDD-based and DataFrame-based functionalities of Spark. Specify your `SparkSession` as follows.

```python
import pyspark as ps

spark = ps.sql.SparkSession.builder \
        .master("local[4]") \
        .appName("df lecture") \
        .getOrCreate()
```

Create a variable `sc` using the following line. It lets you use `sc` as a `SparkContext`, for compatibility with pre-2.0 RDD-based Spark commands.

```
sc = spark.sparkContext
```

In [2]:
import pyspark as ps

spark = ps.sql.SparkSession.builder \
        .master("local[4]") \
        .appName("df lecture") \
        .getOrCreate()

sc = spark.sparkContext

2\. Spark operates on **[Resilient Distributed Datasets](http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds) (RDDs)**. **An RDD is
a collection of data partitioned across machines**. The partitions allow the
processing of data to be parallelized. RDDs can be created from
a SparkContext in two ways: by loading an external dataset, or by parallelizing
an existing collection of objects in your currently running program (in our
Python programs, this is often a list).

* Create an RDD from a Python list.

```python
lst_rdd = sc.parallelize([1, 2, 3])
```

* Read an RDD in from a text file. **By default, the RDD will treat each line
as an item and read it in as a string.**

```python
file_rdd = sc.textFile('data/cookie_data.txt')
```

In [3]:
file_rdd = sc.textFile('data/cookie_data.txt')

3\. Now that we have an RDD, we need to see what is inside. An RDD's data is
  split into partitions spread across the machines of your cluster. This means that
  you can quickly check out the first few entries of a potentially enormous RDD
  without accessing all of the partitions and loading all of the data into memory.

```python
file_rdd.first() # Returns the first entry in the RDD
file_rdd.take(2) # Returns the first two entries in the RDD as a list
```

In [4]:
file_rdd.take(5)

['{"Jane": "2"}',
 '{"Jane": "1"}',
 '{"Pete": "20"}',
 '{"Tyler": "3"}',
 '{"Duncan": "4"}']

4\. To retrieve all the items in your RDD, every partition in the RDD has to be
  accessed, and this could take a long time. In general, before you execute
  commands (like the following) to retrieve all the items in your RDD, you
  should be aware of how many entries you are pulling. Keep in mind that to
  execute the `.collect()` method on the RDD object (like we do below), your entire
  dataset must fit in memory in your driver program (we in general don't want
  to call `.collect()` on very large datasets).

  The standard workflow when working with RDDs is to perform all the big data
  operations/transformations **before** you pool/retrieve the results. If the
  results can't be collected onto your driver program, it's common to write
  data out to a distributed storage system, like HDFS or S3.

  With that said, we can retrieve all the items from our RDD as follows:

```python
file_rdd.collect()
lst_rdd.collect()
```

In [5]:
file_rdd.collect()

['{"Jane": "2"}',
 '{"Jane": "1"}',
 '{"Pete": "20"}',
 '{"Tyler": "3"}',
 '{"Duncan": "4"}',
 '{"Yuki": "5"}',
 '{"Duncan": "6"}',
 '{"Duncan": "4"}',
 '{"Duncan": "5"}']

## Part 2: Intro to Functional Programming

Spark operations fit within the [functional programming paradigm](https://en.wikipedia.org/wiki/Functional_programming).
In terms of our RDD objects, this means that our RDD objects are immutable and that
anytime we apply a **transformation** to an RDD (such as `.map()`, `.reduceByKey()`,
or `.filter()`) it returns another RDD.

Transformations in Spark are lazy: performing a transformation does
not cause computations to be performed. Instead, an RDD remembers the chain of
transformations that you define and computes them all only when an action requires
a result to be returned.
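
Python's built-in `map` is lazy in a similar spirit, which makes for a quick local analogy. This is only an analogy, not how Spark is implemented, and `noisy_double` and `calls` are illustrative names. Nothing runs when the "transformation" is declared; the work happens only when the result is consumed.

```python
calls = []  # records every value the function is actually applied to

def noisy_double(x):
    calls.append(x)
    return 2 * x

# Declaring the "transformation" does no work yet.
lazy_result = map(noisy_double, [1, 2, 3])
calls_before = list(calls)   # snapshot taken before anything is consumed

# Consuming the iterator plays the role of an action such as .collect().
collected = list(lazy_result)
calls_after = list(calls)    # snapshot taken after consumption
```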

**Spark notes**:

  * A lot of Spark's functionality assumes the items in an RDD to be tuples
  of `(key, value)` pairs, so it is often useful to structure your
  RDDs this way.
  * Beware of [lazy evaluation](https://en.wikipedia.org/wiki/Lazy_evaluation), where transformations
  on the RDD are not executed until an **action** is executed on the RDD
  to retrieve items from it (such as `.collect()`, `.first()`, `.take()`, or
  `.count()`). So if you are doing a lot of transformations in a row, it can
  be helpful to call `.first()` in between to ensure your transformations are
  running properly.
  * If you are not sure what RDD transformations/actions there are, you can
  check out the [docs](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD).

**Steps**:

1\. Turn the items in `file_rdd` into `(key, value)` pairs using `.map()`. To do that, you'll find a template function `parse_json_first_key_pair` in the `spark_intro.py` file. Implement this function so that it takes a JSON-formatted string (use `json.loads()`) and returns the `(key, value)` pair you need. Test it with the string `u'{"Jane": "2"}'`; your function should return `(u'Jane', 2)`. **Remember to cast the value to type** `int`.
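
One possible implementation is sketched here, assuming each line holds a JSON object with a single entry. The version in the solutions file is not shown in this notebook and may differ.

```python
import json

def parse_json_first_key_pair(json_string):
    """Return the first (key, int(value)) pair of a JSON object string."""
    record = json.loads(json_string)
    # assumption: the object has at least one entry; only the first is used
    key, value = next(iter(record.items()))
    return key, int(value)
```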


In [6]:
from spark_intro_solns import parse_json_first_key_pair

# apply the map function
pairs_rdd = file_rdd.map(parse_json_first_key_pair)

# take 2 to check result
pairs_rdd.take(2)

[('Jane', 2), ('Jane', 1)]

2\. Now use `.filter()` to look for entries with `5` or more chocolate chip cookies.

In [7]:
# use a lambda function to apply filter (should return True/False)
filtered_rdd = pairs_rdd.filter(lambda row: row[1] >= 5)

# take 2 to check result
filtered_rdd.take(2)

[('Pete', 20), ('Yuki', 5)]

3\. For each name, return the entry with the max number of cookies.

**Hint**:
* Use `.reduceByKey()` instead of `.groupByKey()`. See why [here](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html).
* You may get a warning saying that you should install `psutil`. You can with
`pip install psutil`.

In [8]:
# reduceByKey() combines the values that share a key, two at a time,
# so the lambda only ever receives values (cookie counts), never the (key, value) tuples.
# Using max() therefore keeps the largest count for each name.
reduced_rdd = pairs_rdd.reduceByKey(lambda a,b: max(a,b))

# let's apply filter
output_rdd = reduced_rdd.filter(lambda x: x[1] >= 5)
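
To make the behaviour of `.reduceByKey()` concrete, here is a plain-Python stand-in run on the cookie data shown earlier. `reduce_by_key` is a hypothetical helper written for this illustration; it is not part of Spark and ignores partitioning entirely.

```python
def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: fold the values of each key with func."""
    merged = {}
    for key, value in pairs:
        # first value for a key is kept as-is, later ones are combined pairwise
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

pairs = [("Jane", 2), ("Jane", 1), ("Pete", 20), ("Tyler", 3), ("Duncan", 4),
         ("Yuki", 5), ("Duncan", 6), ("Duncan", 4), ("Duncan", 5)]

max_per_name = reduce_by_key(pairs, max)
at_least_five = {name: n for name, n in max_per_name.items() if n >= 5}
```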

4\. Let's show the first results using `.sortBy()` and `.take()`. `.sortBy()` requires a function that outputs the value/quantity on which we want to sort our rows. Because each row is a `(key, value)` tuple and we sort on the value, use **`lambda x: x[1]`**. (The tuple-unpacking form `lambda (k, v): v` is equivalent, but it is only valid in Python 2.)

In [9]:
output_rdd.sortBy(lambda kvtuple: kvtuple[1], ascending=False).take(10)

[('Pete', 20), ('Duncan', 6), ('Yuki', 5)]

5\. Calculate the total revenue from people buying cookies (we're assuming that
each cookie only costs $1).

**Hint**:
* `rdd.values()` returns another RDD of all the values.
* Use `.reduce()` to return the sum of all the values.

In [10]:
# reduce() calls the lambda repeatedly and fills in a and b for us.
# On the first call, a and b are the first two values of the sequence.
# After that, a holds the running result
# and b the next value in the sequence.
pairs_rdd.values().reduce(lambda a,b: a+b)

50
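
The same fold can be replayed locally with `functools.reduce` on the nine cookie counts shown earlier. This is a single-machine sketch: Spark reduces within each partition and then merges the partial results, which is why the function passed to `.reduce()` should be commutative and associative.

```python
from functools import reduce

# cookie counts, in the order they appear in the file_rdd.collect() output
values = [2, 1, 20, 3, 4, 5, 6, 4, 5]

# a starts as the first value and then carries the running sum
total = reduce(lambda a, b: a + b, values)
```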

## Part 3: Spark for Data Processing

  We will now explore airline data. The data are stored on S3 so you will need your AWS access key and secret access key.

### Side Note About Personal Credentials

  It's good practice to keep personal credentials stored in environment variables set in
  your bash profile so that you don't have to hard code their values into your solutions.
  This is particularly important when the code that uses your keys is stored on GitHub,
  since you don't want to be sharing your access keys with the world. To do this,
  add the lines below to your bash profile.


```bash
export AWS_ACCESS_KEY_ID="YOUR ACCESS KEY"
export AWS_SECRET_ACCESS_KEY="YOUR SECRET ACCESS KEY"
```

  Keep in mind that if you ever have to change your keys you'll need to make sure that you
  update your bash profile.
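
On the Python side, the keys can then be read from the environment instead of being typed into the notebook. A minimal sketch, where `read_aws_credentials` is a hypothetical helper name and the `env` parameter exists only to make the function easy to test:

```python
import os

def read_aws_credentials(env=os.environ):
    """Return (access_key, secret_key), or raise if either variable is unset."""
    try:
        return env["AWS_ACCESS_KEY_ID"], env["AWS_SECRET_ACCESS_KEY"]
    except KeyError as missing:
        # fail early with a readable message rather than later inside Spark
        raise RuntimeError("missing environment variable {}".format(missing)) from None
```

Calling `read_aws_credentials()` with no argument reads from the real environment, so the variables must be exported in the shell that launches the notebook.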

  Now you're ready to load up and explore the data all while becoming more familiar with
  Spark.

  ### 3.1: Loading Data from an S3 bucket

  1\. Load the data from S3 as follows.

```python

link = 's3a://mortar-example-data/airline-data'
airline_rdd = sc.textFile(link)
```

**Note**: If you ever encounter an issue using your AWS credentials, and if you want to skip that at this point to save time on the assignment, you'll find an extract of that dataset (100 lines) in `data/airline-data-extract.csv`. You can use this extract to develop your complete pipeline and solve your issue later on. Use `airline_rdd = sc.textFile("data/airline-data-extract.csv")` to transform that extract into an RDD.

---

**NOTE**: To load data from S3, we need to launch our Spark session with the `--packages` option so that the AWS and Hadoop connectors are available. For example:

```bash
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.open_browser=True --NotebookApp.ip='localhost' --NotebookApp.port=8888"

${SPARK_HOME}/bin/pyspark \
    --master local[4] \
    --executor-memory 1G \
    --driver-memory 1G \
    --conf spark.sql.warehouse.dir="file:///tmp/spark-warehouse" \
    --packages com.databricks:spark-csv_2.11:1.5.0,com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.3
```

---

In [11]:
link = 's3a://mortar-example-data/airline-data'
airline_rdd = sc.textFile(link)
# use line below instead to run on a local extract
#airline_rdd = sc.textFile("data/airline-data-extract.csv")

2\. Print the first 2 entries with `.take(2)` on `airline_rdd`. The first entry holds the column names; the data starts with the second entry.

In [12]:
airline_rdd.take(2)

['"YEAR","MONTH","UNIQUE_CARRIER","ORIGIN_AIRPORT_ID","DEST_AIRPORT_ID","DEP_DELAY","DEP_DELAY_NEW","ARR_DELAY","ARR_DELAY_NEW","CANCELLED",',
 '2012,4,"AA",12478,12892,-4.00,0.00,-21.00,0.00,0.00,']

3\. Now run `.count()` on the RDD. **This will take a while**, as the data set is a few million rows and it all must be downloaded from S3.

In [13]:
airline_rdd.count()

5113194

### 3.2: Create a pipeline on a sub-sample dataset

Now we can move on to looking at the data and transforming it. In this section we will operate only on a limited data set, develop a full pipeline and later on execute that on the full scale data.

We want to identify airports with the worst / least delays. Consider the following about delays:

* **2 types of delays:** Arrival delays, `ARR_DELAY`, and departure delays, `DEP_DELAY`.
* All delays are in terms of **minutes**.
* Arrival delays are associated with the destination airport, `DEST_AIRPORT_ID`.
* Departure delays are associated with the origin airport, `ORIGIN_AIRPORT_ID`.


1\. As you just saw, the `.count()` action takes a long time to run. More involved commands can take even longer. To avoid wasting time when writing/testing your code, it's common practice to work with a sub-sample of your data until your code is finalized and ready to run on the full dataset. Use `.take(100)` to pull the first 100 rows and turn them into a new RDD with `sc.parallelize()`.

In [14]:
# use the line below to test your pipeline
airline_small_rdd = sc.parallelize(airline_rdd.take(100))

In [15]:
airline_small_rdd.take(10)

['"YEAR","MONTH","UNIQUE_CARRIER","ORIGIN_AIRPORT_ID","DEST_AIRPORT_ID","DEP_DELAY","DEP_DELAY_NEW","ARR_DELAY","ARR_DELAY_NEW","CANCELLED",',
 '2012,4,"AA",12478,12892,-4.00,0.00,-21.00,0.00,0.00,',
 '2012,4,"AA",12478,12892,-7.00,0.00,-65.00,0.00,0.00,',
 '2012,4,"AA",12478,12892,-6.00,0.00,-63.00,0.00,0.00,',
 '2012,4,"AA",12478,12892,-6.00,0.00,5.00,5.00,0.00,',
 '2012,4,"AA",12478,12892,-2.00,0.00,-39.00,0.00,0.00,',
 '2012,4,"AA",12478,12892,-6.00,0.00,-34.00,0.00,0.00,',
 '2012,4,"AA",12478,12892,-8.00,0.00,-16.00,0.00,0.00,',
 '2012,4,"AA",12478,12892,-7.00,0.00,-19.00,0.00,0.00,',
 '2012,4,"AA",12478,12892,-9.00,0.00,-2.00,0.00,0.00,']

2\. Let's do some preprocessing and parsing. You may have noticed that those rows are in fact csv lines. We are going to parse those lines one by one and output a list of the values we can split from those lines.

To do that, you'll find a template function `split_csvstring` in the `spark_intro.py` file. Implement this function so that it takes a string containing a CSV line and returns the list of values contained in the line. You can combine `csv.reader()` from the `csv` module with `StringIO` (found in the `io` module in Python 3).

Test it with the string `'a,b,0.7,"Oct 7, 2016",42,'`; your function should return `['a', 'b', '0.7', 'Oct 7, 2016', '42', '']`.

Once your function works, use `.map()` to apply it to your RDD. Print the first 2 lines, with `take(2)`, to confirm you've cleaned the rows correctly. The first 2 lines should look like the following.

```
[['YEAR', 'MONTH', 'UNIQUE_CARRIER', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID', 'DEP_DELAY', 'DEP_DELAY_NEW', 'ARR_DELAY', 'ARR_DELAY_NEW', 'CANCELLED', ''],
['2012', '4', 'AA', '12478', '12892', '-4.00', '0.00', '-21.00', '0.00', '0.00', '']]
```
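
A minimal sketch of such a function, assuming Python 3 and exactly one CSV line per input string. The solution file's version is not shown here and may differ.

```python
import csv
from io import StringIO

def split_csvstring(input_string):
    """Split one CSV line into a list of string fields, honoring quoted commas."""
    # csv.reader expects an iterable of lines, so wrap the string in a file-like object
    return next(csv.reader(StringIO(input_string)))
```

Using `csv.reader` rather than `str.split(',')` matters for fields such as `"Oct 7, 2016"`, where the comma sits inside quotes.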

In [16]:
from spark_intro_solns import split_csvstring

# applying the split_csvstring() function to all rows in RDD
clean_row_rdd = airline_small_rdd.map(split_csvstring)

# take 2 for checking
clean_row_rdd.take(2)

[['YEAR',
  'MONTH',
  'UNIQUE_CARRIER',
  'ORIGIN_AIRPORT_ID',
  'DEST_AIRPORT_ID',
  'DEP_DELAY',
  'DEP_DELAY_NEW',
  'ARR_DELAY',
  'ARR_DELAY_NEW',
  'CANCELLED',
  ''],
 ['2012',
  '4',
  'AA',
  '12478',
  '12892',
  '-4.00',
  '0.00',
  '-21.00',
  '0.00',
  '0.00',
  '']]

3\. Use `filter()` with a `lambda` function to filter out the line containing the column names. Keep that line in a variable so that you can use it in the next question.

In [17]:
# get first line of RDD
first_row = clean_row_rdd.first()

# filter out lines that are identical to the first line (the header)
only_data_rdd = clean_row_rdd.filter(lambda row: row != first_row)

# obtain column names (first row)
column_names = first_row

4\. Write a function `make_row_dict()`, that takes a row (list of values) as an argument and returns a dictionary where the keys are column names and the values are the values for the column. Follow the specifications below to make your dictionary.

The dictionary will only keep track of the following columns:

`['DEST_AIRPORT_ID', 'ORIGIN_AIRPORT_ID', 'DEP_DELAY', 'ARR_DELAY']`
* Cast the values for `DEP_DELAY` and `ARR_DELAY` as floats. These values
correspond with delay lengths in minutes.
* Subtract `DEP_DELAY` from `ARR_DELAY` to get the actual `ARR_DELAY`.
* If a flight is `CANCELLED`, add 5 hours, 300 minutes, to `DEP_DELAY`.
* There are missing values in `DEP_DELAY` and `ARR_DELAY` (i.e. `''`) and
 you would want to replace those with `0.0`.

You'll find a template function `make_row_dict` in the `spark_intro.py` file with a `doctest` you can try to make it work, using `python -m doctest -v spark_intro.py`.

Now use `.map()` with your function  `make_row_dict()` over your RDD to make a new RDD made of dictionaries.
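
A sketch of one way to meet these specifications, using the three-argument signature `make_row_dict(row, column_names, dict_keys)` seen in the solution cell. Two points are assumptions rather than something the text states: the subtraction uses the departure delay before the cancellation penalty is added, and a flight counts as cancelled when `CANCELLED` parses to a value greater than zero.

```python
def make_row_dict(row, column_names, dict_keys):
    """Map one parsed CSV row to a dict restricted to dict_keys."""
    full_row = dict(zip(column_names, row))

    def to_float(text):
        # missing values arrive as empty strings and count as 0.0
        return float(text) if text != '' else 0.0

    dep_delay = to_float(full_row['DEP_DELAY'])
    # assumption: subtract the raw departure delay, before any penalty
    arr_delay = to_float(full_row['ARR_DELAY']) - dep_delay
    # assumption: any positive CANCELLED value marks a cancelled flight
    if to_float(full_row['CANCELLED']) > 0:
        dep_delay += 300.0  # 5 hours, in minutes

    full_row['DEP_DELAY'] = dep_delay
    full_row['ARR_DELAY'] = arr_delay
    return {key: full_row[key] for key in dict_keys}

columns = ['YEAR', 'MONTH', 'UNIQUE_CARRIER', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID',
           'DEP_DELAY', 'DEP_DELAY_NEW', 'ARR_DELAY', 'ARR_DELAY_NEW', 'CANCELLED', '']
keys = {'DEST_AIRPORT_ID', 'ORIGIN_AIRPORT_ID', 'DEP_DELAY', 'ARR_DELAY'}
sample_row = ['2012', '4', 'AA', '12478', '12892', '-4.00', '0.00', '-21.00', '0.00', '0.00', '']
sample_dict = make_row_dict(sample_row, columns, keys)
```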

In [18]:
from spark_intro_solns import make_row_dict

dict_keys = {'DEST_AIRPORT_ID', 'ORIGIN_AIRPORT_ID', 'DEP_DELAY', 'ARR_DELAY'}

dict_rdd = only_data_rdd.map(lambda row: make_row_dict(row, column_names, dict_keys))
dict_rdd.take(2)

[{'ARR_DELAY': -17.0,
  'DEP_DELAY': -4.0,
  'DEST_AIRPORT_ID': '12892',
  'ORIGIN_AIRPORT_ID': '12478'},
 {'ARR_DELAY': -58.0,
  'DEP_DELAY': -7.0,
  'DEST_AIRPORT_ID': '12892',
  'ORIGIN_AIRPORT_ID': '12478'}]

**Note on Solution**: What if we wanted to do this generically, so that we could apply any kind of transformation (type, values) to the columns? Find out in the `airline_pipeline.py` file, under the function `make_row_dict_generic`.

5\. Now we will use these dictionaries to create 2 RDDs, where the items are tuples. Remember, much of Spark's functionality assumes RDDs to be storing (key, value) tuples. You can `.map()` to create those RDDs using `lambda` functions applied to the RDD generated in 4.

The first RDD will contain tuples `(DEST_AIRPORT_ID, ARR_DELAY)`. The other RDD will contain `(ORIGIN_AIRPORT_ID, DEP_DELAY)`. Run a `.first()` or `.take()` to confirm your results.

In [19]:
arrival_rdd = dict_rdd.map(lambda row: (row['DEST_AIRPORT_ID'], row['ARR_DELAY']))
arrival_rdd.take(2)

[('12892', -17.0), ('12892', -58.0)]

In [20]:
departure_rdd = dict_rdd.map(lambda row: (row['ORIGIN_AIRPORT_ID'], row['DEP_DELAY']))
departure_rdd.take(2)

[('12478', -4.0), ('12478', -7.0)]

6\. Using the two RDDs you just created, make 2 RDDs with the mean delay time for each origin airport and each destination airport. You will need to `.groupByKey()` and then take the mean of the delay times for each airport. Use `.mapValues()` to calculate the mean of each group's values.

This is where having our RDDs be composed of `(key, value)` pairs is relevant.
It allows us to use the `.groupByKey()` method on our RDD.

In [21]:
# we define a mean function to apply, via mapValues(), to each group produced by groupByKey()
def rdd_group_mean(value_group):
    # transform groups produced by groupByKey into lists of values
    values = list(value_group)
    # so that we can compute their mean
    return sum(values) / len(values)

average_departure_delay = departure_rdd.groupByKey().mapValues(rdd_group_mean)
average_arrival_delay = arrival_rdd.groupByKey().mapValues(rdd_group_mean)

In [22]:
%time print("average departure delays: {}".format( average_departure_delay.collect() ))

%time print("average arrival delays: {}".format( average_arrival_delay.collect() ))

average departure delays: [('12478', -5.966666666666667), ('12892', 3.717948717948718)]
CPU times: user 12 ms, sys: 4 ms, total: 16 ms
Wall time: 351 ms
average arrival delays: [('12892', -12.116666666666667), ('12478', -16.17948717948718)]
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 159 ms


**Note:** There is a slightly more performant way of calculating the mean, which uses
`.aggregateByKey()` rather than `.groupByKey()`. This transformation follows the combiner
model that we saw in Hadoop: values are first aggregated within each partition, and the
partial aggregates are then merged across partitions. Unfortunately, the documentation for
`.aggregateByKey()` is quite poor. Check out [this](http://stackoverflow.com/a/29930162)
Stack Overflow post for a good description of how to use it.

In [23]:
# this lambda function aggregates values within a partition
# in the output tuple, component 1 is the sum of all values,
# component 2 counts how many values have been summed
within_partition_agg = lambda agg, value: (agg[0] + value, agg[1] + 1)

# this lambda function aggregates two aggregates between partitions
# in the output tuple, component 1 sums the values, component 2 sums the counts
between_partition_agg = lambda agg1, agg2: (agg1[0] + agg2[0], agg1[1] + agg2[1])

# run the aggregateByKey routine on departure_rdd
departure_sum_count = departure_rdd.aggregateByKey((0, 0), within_partition_agg,
                                                           between_partition_agg)

# run the aggregateByKey routine on arrival_rdd
arrival_sum_count = arrival_rdd.aggregateByKey((0, 0), within_partition_agg,
                                                       between_partition_agg)

In [24]:
%time print("average departure delays: {}".format( departure_sum_count.mapValues(lambda _tuple: _tuple[0] / _tuple[1]).collect()) )

%time print("average arrival delays: {}".format( arrival_sum_count.mapValues(lambda _tuple: _tuple[0] / _tuple[1]).collect()) )


average departure delays: [('12478', -5.966666666666667), ('12892', 3.717948717948718)]
CPU times: user 12 ms, sys: 8 ms, total: 20 ms
Wall time: 279 ms
average arrival delays: [('12892', -12.116666666666667), ('12478', -16.17948717948718)]
CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 177 ms
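
To see what the two lambdas do, the aggregation can be replayed by hand in plain Python. The sketch below is only a local stand-in: `aggregate_by_key` and the two made-up partitions are illustrative and not part of Spark.

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Local stand-in for RDD.aggregateByKey over a list of partitions."""
    per_partition = []
    for partition in partitions:
        acc = {}
        for key, value in partition:
            # fold each value into the running aggregate of its key, starting from zero
            acc[key] = seq_op(acc.get(key, zero), value)
        per_partition.append(acc)

    merged = {}
    for acc in per_partition:
        for key, agg in acc.items():
            # merge the per-partition aggregates that share a key
            merged[key] = comb_op(merged[key], agg) if key in merged else agg
    return merged

within_partition_agg = lambda agg, value: (agg[0] + value, agg[1] + 1)
between_partition_agg = lambda agg1, agg2: (agg1[0] + agg2[0], agg1[1] + agg2[1])

partitions = [[("a", 1.0), ("b", 4.0)], [("a", 3.0)]]  # made-up data
sum_count = aggregate_by_key(partitions, (0, 0), within_partition_agg, between_partition_agg)
means = {key: total / count for key, (total, count) in sum_count.items()}
```

Only one small `(sum, count)` tuple per key leaves each partition, whereas `.groupByKey()` has to move every individual value.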


7\. Run `.cache()` on the RDDs you just made. Remember to set the name of the RDD using `.setName()` before running `.cache()` (e.g. `rdd.setName('airline_rdd').cache()`). Setting the name will allow you to identify the RDD in the Spark web UI (see extra credit).

When you cache the RDDs, you make sure that computations which produced them don't
need to be performed every time they are called upon. It is good practice to use `cache()`
for RDDs that you are going to repeatedly use.

In [25]:
average_departure_delay = average_departure_delay.setName('avg_dep_delay').cache()
average_arrival_delay = average_arrival_delay.setName('avg_arr_delay').cache()

8\. Perform appropriate actions on your RDDs to answer the following questions:

* Q1: What are the top 10 departing airports that have the lowest average delay?
* Q2: What are the top 10 departing airports that have the highest average delay?
* Q3: What are the top 10 arriving airports that have the lowest average delay?
* Q4: What are the top 10 arriving airports that have the highest average delay?

There are a couple of ways that you can do this. One is by using `sortBy()` and then
`take(10)`. However, this is not the most efficient way. Why not?

The other, more efficient way to answer these questions is with `takeOrdered()`.
You'll have to be a little clever to get the highest delays. Check out the
[docs](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.takeOrdered)
for a hint.
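
The idea behind `takeOrdered()` can be sketched locally with `heapq.nsmallest`: selecting the 10 smallest items does not require sorting everything, and negating the sort key turns "smallest" into "largest". The sketch uses the sub-sample averages computed above and is a plain-Python analogy, not Spark code. In Spark, the same key function would be passed as the `key` argument of `takeOrdered()`.

```python
import heapq

# average departure delays from the 100-row sub-sample
average_delay = [("12478", -5.966666666666667), ("12892", 3.717948717948718)]

# lowest average delays: the n smallest items by value
lowest = heapq.nsmallest(10, average_delay, key=lambda kv: kv[1])

# highest average delays: same call with the key negated
highest = heapq.nsmallest(10, average_delay, key=lambda kv: -kv[1])
```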

You'll need to run all the transformations that you tested on the smaller dataset
on the full data set to answer these questions.

**Note on solution**: in the following lines, we first test `sortBy()` on the small local dataset (`airline_small_rdd`, 100 rows) to make sure we get it right. Later on we will run the same steps on the full RDD, `airline_rdd`.

In [26]:
# Q1: What are the top 10 departing airports that have the lowest average delay?
average_departure_delay.sortBy(lambda kvtuple: kvtuple[1], ascending=True).take(10)

[('12478', -5.966666666666667), ('12892', 3.717948717948718)]

In [27]:
# Q2: What are the top 10 departing airports that have the highest average delay?
average_departure_delay.sortBy(lambda kvtuple: kvtuple[1], ascending=False).take(10)

[('12892', 3.717948717948718), ('12478', -5.966666666666667)]

In [28]:
# Q3: What are the top 10 arriving airports that have the lowest average delay?
average_arrival_delay.sortBy(lambda kvtuple: kvtuple[1], ascending=True).take(10)

[('12478', -16.17948717948718), ('12892', -12.116666666666667)]

In [29]:
# Q4: What are the top 10 arriving airports that have the highest average delay?
average_arrival_delay.sortBy(lambda kvtuple: kvtuple[1], ascending=False).take(10)

[('12892', -12.116666666666667), ('12478', -16.17948717948718)]

### 3.3: Assemble your pipeline and run it on the full scale dataset

1\. In `spark_intro.py` you'll find a function `transformation_pipeline` you will implement by embedding all the transformations we've done so far, starting from question 3.2.2 (creating a clean rdd) to question 3.2.8 (finding answers to Q1, Q2, Q3, Q4). The function should return the 4 result lists to questions Q1, Q2, Q3, Q4 in a tuple.

Then, run this function from the jupyter notebook or from the main section in `spark_intro.py` to test it on your sub-sample rdd. You should obtain the same answers you had previously obtained on a step by step basis.

In [30]:
airline_small_rdd

ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:175

In [31]:
from spark_intro_solns import transformation_pipeline
print(transformation_pipeline(airline_small_rdd))

([('12478', -5.966666666666667), ('12892', 3.717948717948718)], [('12892', 3.717948717948718), ('12478', -5.966666666666667)], [('12478', -16.17948717948718), ('12892', -12.116666666666667)], [('12892', -12.116666666666667), ('12478', -16.17948717948718)])


2\. Now run this pipeline on the full dataset, relax while the processing is done, and enjoy. You rock.

In [32]:
dataq1, dataq2, dataq3, dataq4 = transformation_pipeline(airline_rdd)

In [33]:
print("Top 10 departing airports that have the lowest average delay:{}".format(dataq1))

print("Top 10 departing airports that have the highest average delay:{}".format(dataq2))

print("Top 10 arriving airports that have the lowest average delay:{}".format(dataq3))

print("Top 10 arriving airports that have the highest average delay:{}".format(dataq4))

Top 10 departing airports that have the lowest average delay:[('13127', -2.950089126559715), ('14113', -2.813027744270205), ('11336', -1.7261904761904763), ('10739', -1.7254901960784315), ('10466', -1.625), ('14633', -0.6433649289099526), ('15389', -0.569078947368421), ('11648', -0.48034934497816595), ('11898', -0.32980132450331123), ('12402', -0.30011664722546244)]
Top 10 departing airports that have the highest average delay:[('10930', 60.73560517038778), ('13388', 60.03344481605351), ('13964', 52.38934426229508), ('13424', 49.313011828935394), ('10157', 45.66734211415063), ('14487', 39.08197989172467), ('11002', 34.00552995391705), ('13541', 33.845454545454544), ('10170', 33.21785714285714), ('10165', 31.931818181818183)]
Top 10 arriving airports that have the lowest average delay:[('14955', 13.0), ('14794', 3.7736625514403292), ('12177', 3.576540755467197), ('14802', 2.8), ('10918', 2.4816176470588234), ('14254', 1.7567185289957568), ('15295', 1.6835443037974684), ('12402', 1.40209