<div style="color:red;font-weight:bold;background:yellow;text-align:center;padding:10px;border:solid">
    <h1>RUN IN EMR CLUSTER ONLY</h1>
    If the URL of the current page does not begin with "ec2", then do **NOT** proceed!
</div>

# Spark Exercise
In this exercise, you will use the tools you learned in the readings and labs to perform some computation, some dataset querying, and some machine learning.

## Connecting to PySpark

In [13]:
name = !hostname
if "dsa" in name[0]:
    raise RuntimeError("Only run this notebook in the EMR Cluster!")
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("pyspark-excercise")
# Reuse the running SparkContext if one already exists, so re-running this cell does not raise
# "ValueError: Cannot run multiple SparkContexts at once"
sc = SparkContext.getOrCreate(conf=conf)

# 1
For this exercise, you will be using a dataset that describes workers that have missed hours at work for some reason. 
This dataset is located in HDFS at `/datasets/work-absentees.csv`. Ingest that dataset below:

In [14]:
from pyspark.sql import SQLContext

# To use Spark SQL we create a SQLContext from SparkContext
sqlContext = SQLContext(sc)

# Location of the dataset on HDFS
DATASET = '/datasets/work-absentees.csv'

# Load a table with a CSV format reader
dataset = sqlContext.read.format('com.databricks.spark.csv').options(
    header='true', inferschema='true').load(DATASET)

# 2
To understand what we are looking at, show the columns and their data types in the cell below:

In [15]:
dataset.head()

Row(ID=11, reason=26, month=7, day=3, season=1, transportation_expense=289, distance=36, service_time=13, age=33, work_load_per_day=239.554, target=97, disciplined=0, education_level=1, number_children=2, social_drinker=1, social_smoker=0, number_pets=1, weight=90, height=172, bmi=30, hours_absent=4)

In [4]:
dataset.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- reason: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- season: integer (nullable = true)
 |-- transportation_expense: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- service_time: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- work_load_per_day: double (nullable = true)
 |-- target: integer (nullable = true)
 |-- disciplined: integer (nullable = true)
 |-- education_level: integer (nullable = true)
 |-- number_children: integer (nullable = true)
 |-- social_drinker: integer (nullable = true)
 |-- social_smoker: integer (nullable = true)
 |-- number_pets: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- height: integer (nullable = true)
 |-- bmi: integer (nullable = true)
 |-- hours_absent: integer (nullable = true)



# 3
#### Using SQL Statements, answer the following:

Say the company for which these employees work is trying to understand why these employees are missing work. 
In the cells below, display the top 5 reasons that workers missed work. For this one, you may use `LIMIT`.

In [16]:
dataset.createOrReplaceTempView("dataset")

In [24]:
query = "SELECT reason,count(*) as count FROM dataset WHERE hours_absent>0 GROUP BY reason ORDER BY count DESC LIMIT 5"
sqlContext.sql(query).show()

+------+-----+
|reason|count|
+------+-----+
|    23|  149|
|    28|  112|
|    27|   68|
|    13|   55|
|    19|   40|
+------+-----+



Which month(s) have the most absent hours? DO NOT USE `LIMIT`!


In [8]:
query = """
SELECT * 
FROM (SELECT month,SUM(hours_absent) as total FROM dataset GROUP BY month) as table
WHERE total = (SELECT MAX(total) FROM (SELECT month,SUM(hours_absent) as total FROM dataset GROUP BY month))
"""

sqlContext.sql(query).show()


+-----+-----+
|month|total|
+-----+-----+
|    3|  765|
+-----+-----+
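
The "no `LIMIT`" pattern can be tried outside the cluster as well. The following is a minimal sketch, assuming a toy table built with Python's standard `sqlite3` module rather than Spark; the rows are hypothetical and chosen so that two months tie. It uses a common table expression so the monthly totals are written only once, and comparing against `MAX(total)` returns every tied month, which a `LIMIT 1` would not.

```python
import sqlite3

# Hypothetical toy rows (month, hours_absent); months 3 and 7 tie on purpose.
rows = [(3, 4), (3, 6), (7, 10), (1, 2), (1, 5)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataset (month INTEGER, hours_absent INTEGER)")
conn.executemany("INSERT INTO dataset VALUES (?, ?)", rows)

query = """
WITH monthly AS (
    SELECT month, SUM(hours_absent) AS total
    FROM dataset
    GROUP BY month
)
SELECT month, total
FROM monthly
WHERE total = (SELECT MAX(total) FROM monthly)
ORDER BY month
"""

# Every month whose total equals the maximum is returned, ties included.
top_months = conn.execute(query).fetchall()
print(top_months)
conn.close()
```

Spark SQL also accepts `WITH ... AS` clauses, so the same query text should work with `sqlContext.sql(query)` against the temp view, though that has not been run here.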



What is the difference in average workload per day between workers with a PhD and workers without one? (education_level = 3 for workers with PhD) 

In [20]:
# The query computes (average for PhD workers) - (average for non-PhD workers);
# a negative result means PhD workers have the lower average workload per day


query = """
SELECT (sum(case when education_level =3 then work_load_per_day else 0 end)/sum(case when education_level =3 then 1 else 0 end)) 
- (sum(case when education_level !=3 then work_load_per_day else 0 end)/sum(case when education_level !=3 then 1 else 0 end)) as different
FROM dataset 

"""

sqlContext.sql(query).show()

+-----------------+
|        different|
+-----------------+
|-8.59885378885133|
+-----------------+
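
The `sum(...)/sum(...)` pairs compute conditional averages by hand. A shorter form relies on `AVG` skipping `NULL`s: a `CASE` with no `ELSE` yields `NULL` for non-matching rows. The sketch below assumes a hypothetical toy table in `sqlite3`, not the real dataset, and keeps the same sign convention (PhD minus non-PhD).

```python
import sqlite3

# Hypothetical toy rows (education_level, work_load_per_day).
rows = [(3, 200.0), (3, 220.0), (1, 250.0), (2, 230.0)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataset (education_level INTEGER, work_load_per_day REAL)")
conn.executemany("INSERT INTO dataset VALUES (?, ?)", rows)

query = """
SELECT AVG(CASE WHEN education_level = 3 THEN work_load_per_day END)
     - AVG(CASE WHEN education_level != 3 THEN work_load_per_day END) AS difference
FROM dataset
"""

# PhD average is 210.0, non-PhD average is 240.0 for the toy rows above.
difference = conn.execute(query).fetchone()[0]
print(difference)
conn.close()
```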



How many workers have 0 hours absent?

In [25]:
query = """
SELECT count(*)
FROM (SELECT DISTINCT ID FROM dataset WHERE hours_absent =0) as table

"""

sqlContext.sql(query).show()

+--------+
|count(1)|
+--------+
|      24|
+--------+



# 4
Use PySpark to parallelize the following: what is the correlation between social drinking and social smoking according to this dataset? You must parallelize the calculation of the cross covariance, but you may use libraries for the mean and standard deviation.

In [5]:
df=dataset.toPandas()
social_drinker =  df['social_drinker'].values.tolist()
social_smoker =  df['social_smoker'].values.tolist()

In [6]:
#find mean
social_drinker_mean = sc.parallelize(social_drinker).mean()
social_smoker_mean = sc.parallelize(social_smoker).mean()

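# find standard deviation
# note: RDD.stdev() is the population standard deviation (divides by n);
# RDD.sampleStdev() divides by n - 1, which matches the n - 1 used for the covariance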
social_drinker_stdev = sc.parallelize(social_drinker).stdev()
social_smoker_stdev = sc.parallelize(social_smoker).stdev()



In [7]:
############
# Covariance between social_drinker and social_smoker
############
# Create RDD
rdd = sc.parallelize(social_drinker)
rdd1 = sc.parallelize(social_smoker)


# create broadcast variables
broadcast_social_drinker_mean = sc.broadcast(social_drinker_mean)
broadcast_social_smoker_mean = sc.broadcast(social_smoker_mean)

# do x - mu
temp_residual_drinker = rdd.map(lambda x: (x - broadcast_social_drinker_mean.value))
temp_residual_smoker = rdd1.map(lambda x: (x - broadcast_social_smoker_mean.value))

#get the results
residual_drinker = temp_residual_drinker.collect()
residual_smoker = temp_residual_smoker.collect()

# create list of products between the social_drinker and social_smoker residuals
product_drinker_smoker = []
for a,b in zip(residual_drinker,residual_smoker):
    product_drinker_smoker.append(a*b)


# sum the lists
rdd2 = sc.parallelize(product_drinker_smoker)
acc2 = sc.accumulator(0)
# add them
rdd2.foreach(lambda x: acc2.add(x))
# get back values
total = acc2.value

# divide by n -1
covariance_drinker_smoker = total/(len(product_drinker_smoker)-1)
print("This is covariance: " + str(covariance_drinker_smoker))


##############
# Calculate correlation
##############
correlation_drinker_smoker = covariance_drinker_smoker/(social_drinker_stdev*social_smoker_stdev)
print("This is correlation: " + str(correlation_drinker_smoker))



This is covariance: -0.014409538090187619
This is correlation: -0.11182912497430188
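
The cell above collects both residual lists to the driver and multiplies them in a Python loop. The sketch below shows the same calculation as one map step over paired values followed by one reduce step, using plain Python and hypothetical 0/1 data. In PySpark the pairing would presumably be `rdd.zip(rdd1)` followed by `.map(...)` and `.sum()`, but that variant has not been run here. The sketch also compares the normalisation conventions: a covariance divided by `n - 1` should be paired with sample standard deviations.

```python
from functools import reduce
from statistics import mean, pstdev, stdev

# Hypothetical 0/1 indicator columns standing in for social_drinker / social_smoker.
drinker = [1, 0, 1, 1, 0, 0, 1, 0]
smoker = [0, 0, 1, 0, 0, 1, 1, 0]

n = len(drinker)
mu_x, mu_y = mean(drinker), mean(smoker)

# Map step: one product of residuals per (x, y) pair.
products = map(lambda xy: (xy[0] - mu_x) * (xy[1] - mu_y), zip(drinker, smoker))
# Reduce step: sum the products.
total = reduce(lambda a, b: a + b, products, 0.0)

# Sample convention: n - 1 in the covariance and in both standard deviations.
r_sample = (total / (n - 1)) / (stdev(drinker) * stdev(smoker))
# Population convention: n everywhere; gives the same correlation.
r_population = (total / n) / (pstdev(drinker) * pstdev(smoker))
# Mixed convention: n - 1 covariance over population standard deviations.
r_mixed = (total / (n - 1)) / (pstdev(drinker) * pstdev(smoker))

print(r_sample, r_population, r_mixed)
```

With consistent conventions the two correlations agree. The mixed form is larger in magnitude by a factor of `n / (n - 1)`, which is small for a dataset with hundreds of rows but not zero.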


# 5 
Use PySpark to parallelize the following:
    Find the median of the number of hours absent. 
    
To do this, you will be using the Median of Medians algorithm.
1. Split the data into 5 equal partitions
2. Hand off each partition and find the median of that partition 
3. Collect the 5 medians
4. Find the median of the 5 medians

The result of step 4 will be approximately the median.
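
The four steps above can be sketched in plain Python to see why the result is only approximate. The helper `median_of_medians` and the toy values are hypothetical and not part of the exercise. Each chunk median is the kind of work that could be handed to a Spark task.

```python
from statistics import median

def median_of_medians(values, n_parts=5):
    """Approximate the median as the median of per-chunk medians."""
    size, extra = divmod(len(values), n_parts)
    chunks, start = [], 0
    for i in range(n_parts):
        # Spread any remainder over the first `extra` chunks.
        end = start + size + (1 if i < extra else 0)
        chunks.append(values[start:end])
        start = end
    chunk_medians = [median(chunk) for chunk in chunks if chunk]
    return median(chunk_medians), chunk_medians

# Hypothetical toy data: 15 values, so 5 chunks of 3.
values = [1, 1, 9, 1, 1, 9, 1, 1, 9, 5, 5, 5, 9, 9, 9]

approx, chunk_medians = median_of_medians(values)
exact = median(values)
print(chunk_medians, approx, exact)
```

For this toy input the approximation and the exact median differ, because each chunk median discards information about how the values are spread across chunks.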

In [8]:
hours_absent =  df['hours_absent'].values.tolist()

In [9]:
# 1. Split the data into 5 equal partitions


rdd = sc.parallelize(hours_absent, 5)
partitions = rdd.glom().collect()



In [19]:
"""
ANOTHER WAY TO PARTITION

number_of_partition = 5
partition_size = len(hours_absent)/number_of_partition


split_points = [int(i*partition_size) for i in range(0, number_of_partition+1)]

partitions = []
for i in range(len(split_points)-1):
    partition = hours_absent[split_points[i]:split_points[i+1]]
    partitions.append(partition)
    
"""


In [11]:
# 2. Hand off each partition and find the median of that partition

import numpy as np

medians = []
rdd = sc.parallelize(partitions)
temp_result = rdd.map(lambda x: np.median(x))

# 3. Collect 5 medians
medians.append(temp_result.collect())

print("List of medians: ",medians)

# 4. Find the median of the 5 medians
median = np.median(medians)
print("The final median: ",median)

List of medians:  [[3.0, 8.0, 3.0, 3.0, 3.0]]
The final median:  3.0


# 6
So now we know the top 5 reasons workers missed work and the approximate median of the number of hours missed.
Let's now try some machine learning to see if we can use certain elements to predict the number of hours someone will miss.

In the cells below, try 2 different methods to predict the number of hours a worker will miss. The two methods can have differing input columns for the prediction or a differing type of machine learning model or both. 

The only requirements are:
1. Split the data into an 80/20 random split for training/testing, respectively
2. Generate a percent accuracy on the testing set for two differing ways of predicting using the Spark ML library.

You will need to create a column named "label" inside the Spark Dataset for the previous code to work.

In [27]:
## break hours_absent into categories (assuming 8-hour work days)
## 0        #0  0 hours absent
## 1-8      #1  up to 1 day
## 9-24     #2  1-3 days
## 25-56    #3  3-7 days
## 57-112   #4  7-14 days
## 113-125  #5  >14 days

bins = [1,8,24,56,112,125]
labels=[1,2,3,4,5]

import pandas as pd


df['label'] = pd.cut(df['hours_absent'], bins=bins, labels=labels, include_lowest=True)
df['label'] = pd.to_numeric(df['label']).fillna(0)

#convert back to spark dataframe
spark_df = SQLContext(sc).createDataFrame(df)
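
To check the bin edges by hand, the same mapping can be sketched without pandas. The helper `hours_to_label` is hypothetical. It assumes whole-number hours and right-closed bins, as in the table of categories above. One difference from the `pd.cut` cell: here anything above 112 maps to label 5, whereas the cell above turns values above 125 into `NaN` and then into 0.

```python
from bisect import bisect_left

# Upper edge of each category 0..4; anything above the last edge is label 5.
UPPER_EDGES = [0, 8, 24, 56, 112]

def hours_to_label(hours):
    """Return the category index for a number of absent hours."""
    return bisect_left(UPPER_EDGES, hours)

for h in (0, 1, 8, 9, 24, 25, 56, 57, 112, 113, 120):
    print(h, hours_to_label(h))
```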

In [37]:
from pyspark.ml.feature import VectorAssembler

# create a vector assembler - this will create a new column that includes columns that are considered 
# features and assembles them into a vector
features = VectorAssembler(
    inputCols = ["reason","season","transportation_expense","distance","service_time","age","work_load_per_day","education_level","number_children","social_drinker","social_smoker"],
    outputCol = "features")

# split into Train and Test
train_data, test_data = features.transform(spark_df).randomSplit([0.8,0.2])
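
`randomSplit` is called without a seed above, so each run produces a different split and slightly different accuracies. It accepts an optional `seed` argument, for example `randomSplit([0.8, 0.2], seed=42)`. The plain-Python sketch below illustrates the idea with a hypothetical `random_split` helper: each row is assigned by an independent random draw, so the 80/20 proportion is only approximate, and fixing the seed makes the split repeatable.

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row to train or test with an independent seeded draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # hypothetical row ids
train_a, test_a = random_split(rows, seed=42)
train_b, test_b = random_split(rows, seed=42)

# Same seed, same split; the sizes are close to 800/200 but not exact.
print(len(train_a), len(test_a), train_a == train_b)
```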


In [45]:
# MultilayerPerceptronClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
#bring in evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

layers = [11,5,4,6]
trainer = MultilayerPerceptronClassifier(layers=layers)
model = trainer.fit(train_data)

# predict on the test data, which the model has not seen
result = model.transform(test_data)


#create evaluator
predictionAndLabels = result.select("label", "prediction")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# get the accuracy
accuracy = evaluator.evaluate(predictionAndLabels)


print("Test set accuracy = {:.2f}%".format(accuracy*100))

Test set accuracy = 84.21%
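
Accuracy here is the fraction of test rows whose predicted label equals the true label. When one category dominates, it helps to compare the model against a baseline that always predicts the most common label. The sketch below uses hypothetical labels and predictions, not the notebook's results, to show both numbers.

```python
from collections import Counter

def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    matches = sum(1 for y, p in zip(labels, predictions) if y == p)
    return matches / len(labels)

# Hypothetical true labels and model predictions.
labels = [1, 1, 1, 1, 2, 1, 0, 1, 1, 2]
predictions = [1, 1, 1, 1, 1, 1, 1, 1, 1, 2]

model_accuracy = accuracy(labels, predictions)

# Baseline: always predict the most frequent label.
majority_label, _ = Counter(labels).most_common(1)[0]
baseline_accuracy = accuracy(labels, [majority_label] * len(labels))

print("model: {:.2f}%, baseline: {:.2f}%".format(model_accuracy * 100, baseline_accuracy * 100))
```

If the two numbers are close, a high accuracy mostly reflects the class imbalance and not what the model has learned.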


In [43]:
from pyspark.ml.classification import NaiveBayes


# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train_data)

# predictions
predictions = model.transform(test_data)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

print("Test set accuracy = {:.2f}%".format(accuracy*100))



Test set accuracy = 63.16%
