A few things you should keep in mind when working on assignments:

1. Make sure you fill in every place that says `YOUR CODE HERE`. Do **not** write your answer anywhere else. Anything you write elsewhere will be removed or overwritten by the autograder.

2. Before you submit your assignment, make sure everything runs as expected. In the menubar, select _Kernel_, then restart the kernel and run all cells (_Restart & Run All_).

3. Do not change the title (i.e. file name) of this notebook.

4. Make sure that you save your work (in the menubar, select _File_ → _Save and Checkpoint_).

5. You are allowed to submit an assignment multiple times, but only the most recent submission will be graded.

# Problem 2. Spark DataFrames

In this problem, we will use Spark DataFrames to perform basic data processing tasks.

In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
import pandas as pd

from nose.tools import assert_equal, assert_is_instance
from numpy.testing import assert_array_almost_equal

We run Spark in [local mode](http://spark.apache.org/docs/latest/programming-guide.html#local-vs-cluster-modes) from within our Docker container.

In [None]:
sc = SparkContext('local[*]')

We create a new RDD by reading in the data as a text file.
We use the same [Iris flower data set](https://en.wikipedia.org/wiki/Iris_flower_data_set) that we used in [Week 12 Problem 2](https://github.com/UI-DataScience/accy571-fa16/blob/master/Week12/assignments/Problem_2_Hadoop_File_System.ipynb) and [Week 13 Problem 1](https://github.com/UI-DataScience/accy571-fa16/blob/master/Week13/assignments/Problem_1_MapReduce.ipynb).
Recall that the `iris.csv` file has 5 columns: `SepalLength`, `SepalWidth`, `PetalLength`, `PetalWidth`, and `Name`. The first 4 columns are floating point values, and the final column is a string column.

In [None]:
!head /home/data_scientist/data/iris.csv

In [None]:
csv_path = "/home/data_scientist/data/iris.csv"
text_file = sc.textFile(csv_path)

In the following code cell, we create a Spark DataFrame from `text_file`. The function `create_iris_df()` drops the header row and the `Name` column, keeps the four numeric columns, and returns a Spark DataFrame.
```python
>>> iris_df = create_iris_df(text_file)
>>> iris_df.show()
```
```
+-----------+----------+-----------+----------+
|SepalLength|SepalWidth|PetalLength|PetalWidth|
+-----------+----------+-----------+----------+
|        5.1|       3.5|        1.4|       0.2|
|        4.9|       3.0|        1.4|       0.2|
|        4.7|       3.2|        1.3|       0.2|
|        4.6|       3.1|        1.5|       0.2|
|        5.0|       3.6|        1.4|       0.2|
|        5.4|       3.9|        1.7|       0.4|
|        4.6|       3.4|        1.4|       0.3|
|        5.0|       3.4|        1.5|       0.2|
|        4.4|       2.9|        1.4|       0.2|
|        4.9|       3.1|        1.5|       0.1|
|        5.4|       3.7|        1.5|       0.2|
|        4.8|       3.4|        1.6|       0.2|
|        4.8|       3.0|        1.4|       0.1|
|        4.3|       3.0|        1.1|       0.1|
|        5.8|       4.0|        1.2|       0.2|
|        5.7|       4.4|        1.5|       0.4|
|        5.4|       3.9|        1.3|       0.4|
|        5.1|       3.5|        1.4|       0.3|
|        5.7|       3.8|        1.7|       0.3|
|        5.1|       3.8|        1.5|       0.3|
+-----------+----------+-----------+----------+
only showing top 20 rows
```
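
The per-line steps that `create_iris_df()` applies with `map()` and `filter()` can be sketched in plain Python, without Spark. This is only an illustration: the function name `parse_lines` and the three sample lines are assumptions made for this example, and the real input comes from `iris.csv`.

```python
# Hypothetical sample lines standing in for the contents of iris.csv.
lines = [
    "SepalLength,SepalWidth,PetalLength,PetalWidth,Name",
    "5.1,3.5,1.4,0.2,Iris-setosa",
    "4.9,3.0,1.4,0.2,Iris-setosa",
]


def parse_lines(lines):
    """Split each line, drop the header and the Name column, convert to floats."""
    split = (line.split(",") for line in lines)
    # The header is the only line whose fields include the string 'SepalLength'.
    no_header = (items for items in split if "SepalLength" not in items)
    # Keep the first four fields (the numeric columns) of every remaining line.
    return [tuple(float(x) for x in items[:4]) for items in no_header]


rows = parse_lines(lines)
print(rows)  # [(5.1, 3.5, 1.4, 0.2), (4.9, 3.0, 1.4, 0.2)]
```

In the Spark version, the same steps run lazily on the RDD, and the resulting tuples are paired with a schema to build the DataFrame.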

In [None]:
def create_iris_df(rdd):
    """
    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance.
    
    Returns
    -------
    A pyspark.sql.dataframe.DataFrame instance.
    """
    
    # Split each line on commas, keep the four numeric columns,
    # drop the header row, and convert the values to floats.
    fields = (
        rdd
        .map(lambda line: line.split(","))
        .map(lambda items: (items[0], items[1], items[2], items[3]))
        .filter(lambda line: 'SepalLength' not in line)
        .map(lambda p: (float(p[0]), float(p[1]), float(p[2]), float(p[3])))
        )
    sqlContext = SQLContext(sc)
    # Only the four numeric columns are kept, so the schema omits `Name`.
    schemaString = "SepalLength,SepalWidth,PetalLength,PetalWidth"
    fieldTypes = [FloatType(), FloatType(), FloatType(), FloatType()]
    # Pair each column name with its type; True marks the field as nullable.
    f_data = [
        StructField(field_name, field_type, True)
        for field_name, field_type
        in zip(schemaString.split(','), fieldTypes)
        ]
    schema = StructType(f_data)
    df = sqlContext.createDataFrame(fields, schema)
    
    return df

In [None]:
iris_df = create_iris_df(text_file)
iris_df.show()

## Filter and select

- In the following code cell, write a function named `filter_and_average` that
  1. Extracts all rows where `SepalLength` is strictly greater than 7.0 and `PetalLength` is strictly greater than 2.0, and
  2. Transforms this output to return a data frame with a single column that is the average of `SepalWidth` and `PetalWidth`.
  
  In other words, calculate the average of `SepalWidth` and `PetalWidth` for all rows with `SepalLength` > 7.0 and `PetalLength` > 2.0.
  
  
Hints:

- When you run
  ```python
  >>> answer = filter_and_average(iris_df)
  >>> answer.show()
  ```
  you should get something like
  ```
  +---------------------------------+
  |((SepalWidth + PetalWidth) * 0.5)|
  +---------------------------------+
  |                2.549999952316284|
  |                2.549999952316284|
  |               2.3499999046325684|
  |                3.049999952316284|
  |                              3.0|
  |               2.4499998092651367|
  |               2.4000000953674316|
  |                              2.5|
  |                2.299999952316284|
  |               2.3499999046325684|
  |               2.9000000953674316|
  |               2.6500000953674316|
  +---------------------------------+
  ```
  (Note that the column name does not have to match this example. It may be `(SepalWidth + PetalWidth) / 2` for instance. Furthermore, the values do not have to match exactly as long as they are very close (e.g., 2.549999952316284 vs 2.55). They are both correct as long as they are approximately equal.)
- There are 12 rows that satisfy the criteria, `SepalLength` > 7.0 and `PetalLength` > 2.0:
  ```python
  >>> df = pd.read_csv(csv_path)
  >>> mask = (df["SepalLength"] > 7.0) & (df["PetalLength"] > 2.0)
  >>> long_iris = df.loc[mask, ["SepalWidth", "PetalWidth"]].reset_index(drop=True)
  >>> print(len(long_iris))
  ```
  ```
  12
  ```
  Thus, the output in the first hint has 12 rows.
  The average of `SepalWidth` and `PetalWidth` in the resulting Pandas DataFrame is
  ```python
  >>> print(long_iris.mean(axis=1))
  ```
  ```
  0     2.55
  1     2.55
  2     2.35
  3     3.05
  4     3.00
  5     2.45
  6     2.40
  7     2.50
  8     2.30
  9     2.35
  10    2.90
  11    2.65
  ```
  (As mentioned previously, the numbers do not have to match exactly. They are both correct as long as they are very close.)
- Use a combination of `filter()` and `select()`.
- The [Introduction to Spark DataFrame notebook](https://github.com/UI-DataScience/accy571-fa16/blob/master/Week14/notebooks/sparkdf.ipynb) has some examples.
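
The logic behind the two steps (filter, then average) can be sketched in plain Python before translating it to Spark. This is not the Spark solution: the function name `filter_and_average_rows` is hypothetical, and the three rows are hand-picked for illustration rather than read from `iris.csv`.

```python
# Illustrative rows in the column order
# SepalLength, SepalWidth, PetalLength, PetalWidth.
rows = [
    (5.1, 3.5, 1.4, 0.2),  # fails both conditions
    (7.0, 3.2, 4.7, 1.4),  # SepalLength equals 7.0, so the strict test fails
    (7.1, 3.0, 5.9, 2.1),  # passes both conditions
]


def filter_and_average_rows(rows):
    """Keep rows with SepalLength > 7.0 and PetalLength > 2.0,
    then return the average of SepalWidth and PetalWidth for each."""
    kept = [r for r in rows if r[0] > 7.0 and r[2] > 2.0]
    return [(r[1] + r[3]) * 0.5 for r in kept]


averages = filter_and_average_rows(rows)
print(averages)
```

Only the last row survives the filter, so the result holds a single value close to 2.55. In Spark, the list comprehension with the condition corresponds to `filter()`, and the expression that builds the average corresponds to `select()`.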

In [None]:
def filter_and_average(df):
    # YOUR CODE HERE
    
    return df

In [None]:
answer = filter_and_average(iris_df)
answer.show()

In [None]:
# we compare Spark dataframe to Pandas dataframe
df_pd = pd.read_csv(csv_path)
mask = (df_pd["SepalLength"] > 7.0) & (df_pd["PetalLength"] > 2.0)
df_widths = df_pd.loc[mask, ["SepalWidth", "PetalWidth"]].reset_index(drop=True)
df_widths["AverageWidth"] = 0.5 * (df_widths["SepalWidth"] + df_widths["PetalWidth"])
df_widths = df_widths.drop(["SepalWidth", "PetalWidth"], axis=1)
assert_array_almost_equal(answer.toPandas().values[:, 0], df_widths.values[:, 0])
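
The comparison uses `assert_array_almost_equal` rather than exact equality because the schema declares the columns as `FloatType`, which is a 32-bit float, while Pandas reads the same values as 64-bit floats. The sketch below mimics this with the standard library only. The helper `to_float32` is a hypothetical name, and the assumption that the sum is carried in single precision before the final scaling is made for illustration.

```python
import struct


def to_float32(x):
    """Round a 64-bit Python float to the nearest 32-bit float."""
    return struct.unpack("f", struct.pack("f", x))[0]


sepal_width = to_float32(3.0)  # exactly representable in 32 bits
petal_width = to_float32(2.1)  # not exactly representable in 32 bits

# Round the sum to 32 bits again (assumed), then scale in double precision.
average = to_float32(sepal_width + petal_width) * 0.5

print(petal_width)                 # close to, but not exactly, 2.1
print(abs(average - 2.55) < 1e-6)  # True
print(average == 2.55)             # False
```

The difference is on the order of 1e-8, well inside the default tolerance of `assert_array_almost_equal` (6 decimal places).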

## Cleanup

We must stop the SparkContext in order to release the Spark resources before exiting this notebook.

In [None]:
sc.stop()