# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your https://jupyterhub.ischool.syr.edu/ workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. These placeholders exist mainly to make the `assert` statements fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ tests for your answer. Think about cases your code should handle correctly even if it passes all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).
- Good luck!

In [1]:
# Load the packages needed for this part
# create spark and sparkcontext objects
import math
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

from pyspark.ml import feature
from pyspark.ml import regression
from pyspark.sql import functions as fn
from pyspark.sql import Row
from pyspark import sql

## Warning: Use exclusively Spark. Do not use Pandas at all in this assignment

# Part 2: Dataframes and Spark ML

In this section, you will learn to create dataframes from messy data and then perform simple regression on it.

There is some mysterious process generating data, stored in `/datasets/host_server_requests`, with the following format:

`feature1|feature2|...|featurem => outcome`

`feature1` is either "HOST" or "SERVER", and features $2$ through $m$ are floating-point numbers.
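As a plain-Python illustration of this format (outside Spark, only to show the string handling), the hypothetical helper `parse_request_line` below splits a line on `=>`, then on `|`, and encodes `feature1` as 1.0 for HOST and 0.0 otherwise. It assumes exactly one `=>` per line and works for any number of features $m$.

```python
def parse_request_line(line):
    """Split 'feature1|...|featurem => outcome' into (features, outcome).

    feature1 becomes 1.0 for "HOST" and 0.0 otherwise; the remaining
    features and the outcome are converted to float.
    """
    features_part, outcome_part = line.split("=>")
    fields = [field.strip() for field in features_part.split("|")]
    source = 1.0 if fields[0] == "HOST" else 0.0
    numeric = [float(field) for field in fields[1:]]
    # float() ignores the surrounding spaces around the outcome
    return [source] + numeric, float(outcome_part)


sample = "SERVER|0.00002|-0.23465|0.07345|-0.07217|-0.19256|-0.14377|-0.15183 =>  -162.804738349"
features, outcome = parse_request_line(sample)
print(features)
print(outcome)
```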

In [2]:
requests_rdd = sc.textFile("host_server_requests.txt")

## Question 1:

In this question, you will create a function `process_line` that receives a line from `/datasets/host_server_requests` and returns a `Row` object with the following columns: 

- You will codify the first feature as a column `f1` with a `1` if the source is `HOST` and `0` otherwise
- You will create 7 other features that you assign to columns `f2`, `f3`, ..., through `f8`
- Finally, you will assign the outcome to the column `label`
- Remember to make all features of type `float`.

For the following code:


```python
requests_rdd.map(process_line).take(10)
```

it should generate the following:

```python
[Row(f1=1.0, f2=2e-05, f3=0.80279, f4=-0.09174, f5=0.04041, f6=-0.22504, f7=-0.0504, f8=0.58149, label=163.877101489),
 Row(f1=1.0, f2=5e-05, f3=-0.00454, f4=-0.0211, f5=0.00174, f6=-0.11684, f7=0.19182, f8=-0.23745, label=-105.023368852),
 Row(f1=1.0, f2=0.00015, f3=-0.10437, f4=0.04869, f5=0.18333, f6=-0.21864, f7=0.27638, f8=-0.13441, label=-115.011801582),
 Row(f1=1.0, f2=-0.00015, f3=0.27118, f4=0.14526, f5=0.06101, f6=0.13401, f7=0.06237, f8=-0.74065, label=-122.623452696),
 Row(f1=1.0, f2=-6e-05, f3=0.1413, f4=0.12084, f5=0.05452, f6=0.09272, f7=0.2534, f8=-0.65331, label=-117.130523174),
 Row(f1=1.0, f2=-8e-05, f3=-0.41534, f4=-0.04205, f5=-0.00724, f6=-0.07463, f7=0.13273, f8=0.19112, label=-73.5775668047),
 Row(f1=1.0, f2=-8e-05, f3=-0.45937, f4=-0.23509, f5=-0.05679, f6=0.06077, f7=-0.49597, f8=-0.30668, label=-137.37933148),
 Row(f1=0.0, f2=2e-05, f3=-0.23465, f4=0.07345, f5=-0.07217, f6=-0.19256, f7=-0.14377, f8=-0.15183, label=-162.804738349),
 Row(f1=0.0, f2=-7e-05, f3=-0.10321, f4=0.27467, f5=0.04058, f6=-0.24541, f7=0.08631, f8=-0.2979, label=-212.111291232),
 Row(f1=1.0, f2=-7e-05, f3=-0.01039, f4=-0.00453, f5=-0.01352, f6=-0.05199, f7=-0.3772, f8=-0.19641, label=-91.5022329392)]
```

In [3]:
requests_rdd.take(10)

['HOST|0.00002|0.80279|-0.09174|0.04041|-0.22504|-0.05040|0.58149 =>  163.877101489',
 'HOST|0.00005|-0.00454|-0.02110|0.00174|-0.11684|0.19182|-0.23745 =>  -105.023368852',
 'HOST|0.00015|-0.10437|0.04869|0.18333|-0.21864|0.27638|-0.13441 =>  -115.011801582',
 'HOST|-0.00015|0.27118|0.14526|0.06101|0.13401|0.06237|-0.74065 =>  -122.623452696',
 'HOST|-0.00006|0.14130|0.12084|0.05452|0.09272|0.25340|-0.65331 =>  -117.130523174',
 'HOST|-0.00008|-0.41534|-0.04205|-0.00724|-0.07463|0.13273|0.19112 =>  -73.5775668047',
 'HOST|-0.00008|-0.45937|-0.23509|-0.05679|0.06077|-0.49597|-0.30668 =>  -137.37933148',
 'SERVER|0.00002|-0.23465|0.07345|-0.07217|-0.19256|-0.14377|-0.15183 =>  -162.804738349',
 'SERVER|-0.00007|-0.10321|0.27467|0.04058|-0.24541|0.08631|-0.29790 =>  -212.111291232',
 'HOST|-0.00007|-0.01039|-0.00453|-0.01352|-0.05199|-0.37720|-0.19641 =>  -91.5022329392']

In [4]:
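def process_line(line):
    # Each line looks like "HOST|f2|...|f8 =>  outcome"
    features_part, outcome_part = line.split("=>")
    fields = features_part.strip().split("|")
    # Encode the source as 1.0 for HOST and 0.0 otherwise (e.g., SERVER)
    f1 = 1.0 if fields[0] == "HOST" else 0.0
    # Convert the remaining 7 features to float
    f2, f3, f4, f5, f6, f7, f8 = (float(x) for x in fields[1:8])
    # float() ignores the spaces around the outcome
    return Row(f1=f1, f2=f2, f3=f3, f4=f4, f5=f5, f6=f6, f7=f7, f8=f8,
               label=float(outcome_part))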

In [5]:
requests_rdd.map(process_line).take(10)

[Row(f1=1.0, f2=2e-05, f3=0.80279, f4=-0.09174, f5=0.04041, f6=-0.22504, f7=-0.0504, f8=0.58149, label=163.877101489),
 Row(f1=1.0, f2=5e-05, f3=-0.00454, f4=-0.0211, f5=0.00174, f6=-0.11684, f7=0.19182, f8=-0.23745, label=-105.023368852),
 Row(f1=1.0, f2=0.00015, f3=-0.10437, f4=0.04869, f5=0.18333, f6=-0.21864, f7=0.27638, f8=-0.13441, label=-115.011801582),
 Row(f1=1.0, f2=-0.00015, f3=0.27118, f4=0.14526, f5=0.06101, f6=0.13401, f7=0.06237, f8=-0.74065, label=-122.623452696),
 Row(f1=1.0, f2=-6e-05, f3=0.1413, f4=0.12084, f5=0.05452, f6=0.09272, f7=0.2534, f8=-0.65331, label=-117.130523174),
 Row(f1=1.0, f2=-8e-05, f3=-0.41534, f4=-0.04205, f5=-0.00724, f6=-0.07463, f7=0.13273, f8=0.19112, label=-73.5775668047),
 Row(f1=1.0, f2=-8e-05, f3=-0.45937, f4=-0.23509, f5=-0.05679, f6=0.06077, f7=-0.49597, f8=-0.30668, label=-137.37933148),
 Row(f1=0.0, f2=2e-05, f3=-0.23465, f4=0.07345, f5=-0.07217, f6=-0.19256, f7=-0.14377, f8=-0.15183, label=-162.804738349),
 Row(f1=0.0, f2=-7e-05, f3=-

In [6]:
# 5 pts
np.testing.assert_equal(len(requests_rdd.map(process_line).first()), 9)
np.testing.assert_equal(requests_rdd.map(process_line).count(), 10000)

## Question 2:

Transform the `requests_rdd` RDD into a Spark 2.0 DataFrame and store it in `requests_df`

In [7]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *

In [8]:
# Parse every line into a Row, then let Spark infer the schema from the Rows
rows_rdd = requests_rdd.map(process_line)

requests_df = spark.createDataFrame(rows_rdd)

In [9]:
# 5 pts
np.testing.assert_equal(type(requests_df), sql.dataframe.DataFrame)
np.testing.assert_equal(set(requests_df.columns), {'f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'label'})
np.testing.assert_equal(requests_df.count(), 10000)

In [10]:
requests_df = requests_df.dropna()

## Question 3:

In this question, we will explore the data. We hypothesize that, depending on whether the request came from "HOST" or "SERVER" (`f1` column), there is a significant difference in the outcome (`label` column).

You will test whether this is true by computing three quantities for each group of `f1`: the mean outcome, the count of each group, and the *standard error of the mean* (SE) of the outcome. The equation for the SE of a variable $x$ is:

$$\text{SE}(x) = \frac{\text{std}(x)}{\sqrt{n}}$$

From `requests_df`, create a dataframe `summary_df` that contains, for each value of `f1`, the mean of `label` as a column `mlabel`, the count of `label` as a column `clabel`, and the SE of `label` as a column `semlabel`. For the SE equation, use the *sample standard deviation* computed by `fn.stddev_samp`. **Hint: perform an aggregate operation and use appropriate combinations of functions in the package `fn`. Rename columns appropriately.**
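To check the arithmetic behind `mlabel`, `clabel`, and `semlabel` on a toy example, here is a plain-Python sketch (not a replacement for the Spark aggregation the question asks for). The helper `summarize_by_group` and the toy data are hypothetical; like `fn.stddev_samp`, `statistics.stdev` uses the $n-1$ denominator.

```python
import math
import statistics
from collections import defaultdict


def summarize_by_group(pairs):
    """Return {group: (mean, count, SE)} for an iterable of (group, value) pairs.

    SE = sample standard deviation / sqrt(n); each group needs at least
    two values because statistics.stdev requires them.
    """
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    summary = {}
    for key, values in groups.items():
        n = len(values)
        se = statistics.stdev(values) / math.sqrt(n)
        summary[key] = (statistics.mean(values), n, se)
    return summary


# Toy (f1, label) pairs
data = [(0.0, 1.0), (0.0, 2.0), (0.0, 3.0), (0.0, 4.0), (1.0, 10.0), (1.0, 12.0)]
summary = summarize_by_group(data)
print(summary)
```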

In [11]:
# create the dataframe `summary_df` below
from pyspark.sql import functions as fn
from pyspark.sql.functions import *


# fn.count (not fn.countDistinct) gives the group size n, even when labels repeat
summary_df = (requests_df
              .groupby('f1')
              .agg(fn.count('label').alias('clabel'),
                   fn.mean('label').alias('mlabel'),
                   (fn.stddev_samp('label') / fn.sqrt(fn.count('label'))).alias('semlabel')))


summary_df.printSchema()

root
 |-- f1: double (nullable = true)
 |-- clabel: long (nullable = false)
 |-- mlabel: double (nullable = true)
 |-- semlabel: double (nullable = true)



In [12]:
summary_df.show(10)

+---+------+-------------------+------------------+
| f1|clabel|             mlabel|          semlabel|
+---+------+-------------------+------------------+
|0.0|  5001| -29.61175341232893|1.7554606758419877|
|1.0|  4999|-12.622431936863213| 1.748107734777136|
+---+------+-------------------+------------------+



The schema of `summary_df` should look like:

```python
summary_df.printSchema()
```
```console
root
 |-- f1: double (nullable = true)
 |-- mlabel: double (nullable = true)
 |-- clabel: long (nullable = false)
 |-- semlabel: double (nullable = true)

```
The mean label for each value of `f1` should be:

```python
summary_df.select('f1', 'mlabel').show()
```

```console
+---+------------------+
| f1|            mlabel|
+---+------------------+
|0.0|-29.61175341232892|
|1.0|-12.62243193686321|
+---+------------------+
```

In [13]:
# 5 pts
np.testing.assert_equal(summary_df.count(), 2)
np.testing.assert_equal(set(summary_df.columns), {'f1', 'mlabel', 'clabel', 'semlabel'})
np.testing.assert_approx_equal(summary_df.rdd.map(lambda r: r.mlabel).sum(), -42.23418534919212,
                              significant=3)
np.testing.assert_approx_equal(summary_df.rdd.map(lambda r: r.semlabel).sum(), 3.503568410619124,
                              significant=3)

## Question 4:

Use the transformer `VectorAssembler` to create a dataframe that puts all columns `f1`, `f2`, ..., `f8` from `requests_df` into a single vector column named `features`. Assign the vector assembler object to a variable `var` and the new dataframe to the variable `features_df`.

In [14]:
from pyspark.ml.feature import VectorAssembler

# Combine the eight feature columns into a single vector column `features`
var = VectorAssembler(inputCols=["f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8"],
                      outputCol="features")

features_df = var.transform(requests_df)

The schema of the new dataframe should be like this:

```python
features_df.printSchema()
```

```console
root
 |-- f1: double (nullable = true)
 |-- f2: double (nullable = true)
 |-- f3: double (nullable = true)
 |-- f4: double (nullable = true)
 |-- f5: double (nullable = true)
 |-- f6: double (nullable = true)
 |-- f7: double (nullable = true)
 |-- f8: double (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
```

In [15]:
# try it here
features_df.printSchema()

root
 |-- f1: double (nullable = true)
 |-- f2: double (nullable = true)
 |-- f3: double (nullable = true)
 |-- f4: double (nullable = true)
 |-- f5: double (nullable = true)
 |-- f6: double (nullable = true)
 |-- f7: double (nullable = true)
 |-- f8: double (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [16]:
# 5 pts
np.testing.assert_equal(type(features_df), sql.dataframe.DataFrame)
np.testing.assert_equal(set(features_df.columns), 
                        {'f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'features', 'label'})

## Question 5:

Run a linear regression model on `features_df`, using the `features` column to predict the `label` column. Store the transformer fitted to the data in the `lr_model` variable (the transformer is what the estimator's `fit` function returns). Use the transformer to create a dataframe named `predictions_df` with two columns, `label` and `prediction`, based on the `features_df` dataframe.
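With its default settings (`regParam=0.0`, `fitIntercept=True`), Spark's `LinearRegression` fits ordinary least squares. The NumPy sketch below fits the same kind of model on small synthetic data (not the homework data) with `np.linalg.lstsq`, to show what the fitted intercept, coefficients, and predictions mean; the variable names and the data are illustrative assumptions.

```python
import numpy as np

# Synthetic, noise-free data: y = 2 + 3*x1 - 1*x2 + 0.5*x3
rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3))
true_intercept = 2.0
true_coefs = np.array([3.0, -1.0, 0.5])
y = true_intercept + X @ true_coefs

# Prepend a column of ones so least squares also estimates the intercept
design = np.column_stack([np.ones(len(X)), X])
params, _, _, _ = np.linalg.lstsq(design, y, rcond=None)
intercept, coefs = params[0], params[1:]

# In-sample predictions, the analogue of the `prediction` column
predictions = design @ params
print(intercept, coefs)
```

Because the synthetic data has no noise, the recovered parameters match the true ones up to floating-point error; on real data such as `features_df` they only approximate the underlying relationship.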

In [17]:
# create the linear regression estimator, fit it, and store the fitted model in lr_model.
# use the model to create the dataframe predictions_df with two columns label and prediction
# by transforming the dataframe `features_df`
lr = regression.LinearRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(features_df)

# keep only the observed label and the model's prediction
predictions_df = lr_model.transform(features_df).select('label', 'prediction')
    

The resulting dataframe should be in `predictions_df`. Running `predictions_df.show(5)` should produce something like

```python
predictions_df.show(5)
```

```console
+--------------+-------------------+
|         label|         prediction|
+--------------+-------------------+
| 163.877101489| 159.06994708337518|
|-105.023368852| -99.52598722329135|
|-115.011801582|-109.91382979074436|
|-122.623452696|-118.62864861627764|
|-117.130523174|-116.89245751669506|
+--------------+-------------------+
only showing top 5 rows
```

In [18]:
# 10 pts:
np.testing.assert_equal(set(predictions_df.columns), {'label', 'prediction'})
np.testing.assert_equal(predictions_df.count(), 10000)
np.testing.assert_equal(type(lr_model), regression.LinearRegressionModel)
np.testing.assert_equal(type(var), feature.VectorAssembler)

## Question 6:

Get the R squared and adjusted R squared of the linear model from `lr_model.summary` and assign them to the variables `r2` and `adj_r2`. Also, based on the `predictions_df` dataframe, count how many rows have an absolute difference between prediction and label greater than 5, and assign the count to `diff_5`.
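The adjusted $R^2$ penalizes the number of predictors: $R^2_{adj} = 1 - (1 - R^2)\frac{n-1}{n-p-1}$, where $n$ is the number of rows and $p$ the number of features (excluding the intercept). This minimal plain-Python sketch plugs in this notebook's values ($n = 10000$ rows, $p = 8$ features, and the $R^2$ used in the grading cell) and reproduces the expected adjusted $R^2$; the helper name `adjusted_r2` is hypothetical.

```python
def adjusted_r2(r2, n, p):
    """Adjusted R^2 for n observations and p predictors (intercept not counted)."""
    return 1 - (1 - r2) * (n - 1) / (n - p - 1)


# Values from this notebook: 10000 rows and 8 features (f1, ..., f8)
adj = adjusted_r2(0.9952916629913636, n=10000, p=8)
print(adj)
```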

In [19]:
r2 = lr_model.summary.r2

In [20]:
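# Adjusted R^2 = 1 - (1 - R^2) * (n - 1) / (n - p - 1), with n rows and p features
n = features_df.count()          # 10000 rows
p = len(var.getInputCols())      # 8 features: f1, ..., f8
adj_r2 = 1 - (1 - r2) * (n - 1) / (n - p - 1)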

In [21]:
# count rows where |label - prediction| > 5, using only Spark (no Pandas)
diff_5 = (predictions_df
          .where(fn.abs(fn.col('label') - fn.col('prediction')) > 5)
          .count())


In [22]:
# 5 pts:
np.testing.assert_approx_equal(r2, 0.9952916629913636, significant=3)
np.testing.assert_approx_equal(adj_r2, 0.9952878929287003, significant=3)
assert diff_5 == 5617

## Question 7:

Root Mean Squared Error (RMSE) and Mean Absolute Error (MAE) are common metrics for evaluating a regression model.
The root mean squared error is defined as

$$ \text{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^n (\hat{y}_i - y_i)^2}$$

and the mean absolute error is defined as

$$ \text{MAE} = \frac{1}{n} \sum_{i=1}^n \left\lvert \hat{y}_i - y_i \right\rvert$$

Combine functions in the `fn` package and other functions to create a dataframe called `lr_metrics_df` that contains the root mean squared error in column `rmse` and the mean absolute error in column `mae`, based on the `predictions_df` dataframe.
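Before writing the Spark version, it can help to check the two formulas on a tiny hand-computable example. This plain-Python sketch uses hypothetical helpers `root_mean_squared_error` and `mean_absolute_error` on toy values whose errors are 1, 0, and 2, so RMSE is $\sqrt{5/3}$ and MAE is 1.

```python
import math


def root_mean_squared_error(y_true, y_pred):
    """sqrt(mean((yhat - y)^2))"""
    n = len(y_true)
    return math.sqrt(sum((p - t) ** 2 for t, p in zip(y_true, y_pred)) / n)


def mean_absolute_error(y_true, y_pred):
    """mean(|yhat - y|)"""
    n = len(y_true)
    return sum(abs(p - t) for t, p in zip(y_true, y_pred)) / n


y = [1.0, 2.0, 3.0]
y_hat = [2.0, 2.0, 5.0]
print(root_mean_squared_error(y, y_hat), mean_absolute_error(y, y_hat))
```

RMSE squares each error before averaging, so it weights the single error of 2 more heavily than MAE does; that is why RMSE is at least as large as MAE.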

In [35]:
# error of each prediction: yhat - y
error = fn.col('prediction') - fn.col('label')

# root mean squared error: square root of the average squared error
rmse = fn.sqrt(fn.avg(fn.pow(error, 2))).alias('rmse')

# mean absolute error: average of the absolute error
mae = fn.avg(fn.abs(error)).alias('mae')

In [36]:
# one-row dataframe with the columns `rmse` and `mae`
lr_metrics_df = predictions_df.select(rmse, mae)

In [37]:
# 10 pts
np.testing.assert_array_less(lr_metrics_df.first().rmse, 10)
np.testing.assert_array_less(lr_metrics_df.first().mae, 10)
np.testing.assert_equal(lr_metrics_df.count(), 1)