# Provision EMR Cluster
```
aws emr create-cluster --auto-scaling-role EMR_AutoScaling_DefaultRole \
--termination-protected --applications Name=Hadoop Name=Hive Name=Spark \
--bootstrap-actions '[{"Path":"s3://chkrd/artifacts/emr-base.sh","Args":["--julia","--ruby","--ds-packages","--ml-packages","--python-packages","ggplot nilearn spark-sklearn"],"Name":"EMR Base"}]' \
--ec2-attributes '{"KeyName":"devenv-key","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-39a22e5e","EmrManagedSlaveSecurityGroup":"sg-86bb82fe","EmrManagedMasterSecurityGroup":"sg-83bb82fb"}' \
--service-role EMR_DefaultRole --enable-debugging --release-label emr-5.1.0 \
--log-uri 's3n://chkrd2/elasticmapreduce/' --name 'SparkaaS' --instance-groups '[{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core - 2"},{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master - 1"}]' \
--scale-down-behavior TERMINATE_AT_INSTANCE_HOUR --region us-west-2
```

# Load `sparkmagic` Jupyter Extension

In [1]:
%load_ext sparkmagic.magics

# Configure EMR Endpoint

In [2]:
%manage_spark

Added endpoint http://54.190.49.183:8998
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1489528374652_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [3]:
%%spark
sc

<pyspark.context.SparkContext object at 0x7fc919b38f50>

In [4]:
%%spark
numbers = sc.parallelize([1, 2, 3, 4])
print('First element of numbers is {} and its description is:\n{}'.format(numbers.first(), numbers.toDebugString()))

First element of numbers is 1 and its description is:
(2) ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 []

In [5]:
%%spark
sqlContext

<pyspark.sql.context.SQLContext object at 0x7fc919846f10>

In [6]:
%%spark
spark

<pyspark.sql.session.SparkSession object at 0x7fc919846c90>

# Test

In [10]:
%%spark
from sklearn.ensemble import RandomForestClassifier
from spark_sklearn import GridSearchCV
import warnings
warnings.filterwarnings('ignore')
from operator import itemgetter
import pandas as pd
from time import time

# Databrix utility function to report top 3 best scores
def report(grid_scores, n_top = 3):
    top_scores = sorted(grid_scores, key = itemgetter(1), reverse = True)[:n_top]
    for i, score in enumerate(top_scores):
        print("Model with rank: {0}".format(i + 1))
        print("Mean validation score: {0:.3f} (std: {1:.3f})".format(
              score.mean_validation_score,
              np.std(score.cv_validation_scores)))
        print("Parameters: {0}".format(score.parameters))
        print("")

# Load the train and test datasets to create two DataFrames
train_url = "http://s3.amazonaws.com/assets.datacamp.com/course/Kaggle/train.csv"
train = pd.read_csv(train_url)
test_url = "http://s3.amazonaws.com/assets.datacamp.com/course/Kaggle/test.csv"
test = pd.read_csv(test_url)

# Creat a list of the features. 
target = train["Survived"].values
forest_features = train[["Pclass", "Age", "Sex", "Fare", "SibSp", "Parch", "Embarked"]].values

# Add grid parameter code based on Databricks
param_grid = {"max_depth": [3, 10, None],
              "min_samples_split": [1.0, 2, 3, 10], 
              "min_samples_leaf": [1.0, 2, 3, 10], 
              "bootstrap": [True, False],
              "criterion": ["gini", "entropy"],
              "n_estimators": [10, 20, 40, 80, 160]}

# Execute the grid search
Spark_gs = GridSearchCV(sc, RandomForestClassifier(), param_grid = param_grid)
start = time()

Spark_gs.fit(forest_features, target)
print("Spark GridSearchCV took {:.2f} seconds for {:d} candidate settings.".format(time() - start, len(Spark_gs.grid_scores_)))
report(Spark_gs.grid_scores_)

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 50, ip-172-31-16-152.us-west-2.compute.internal, executor 6): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1489528374652_0003/container_1489528374652_0003_01_000008/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1489528374652_0003/container_1489528374652_0003_01_000008/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1489528374652_0003/container_1489528374652_0003_01_000008/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.i

In [None]:
%%spark
from sklearn import grid_search, datasets
from sklearn.ensemble import RandomForestClassifier
# Use spark_sklearn’s grid search instead:
from spark_sklearn import GridSearchCV
digits = datasets.load_digits()
X, y = digits.data, digits.target
param_grid = {"max_depth": [3, None],
              "max_features": [1.0, 3, 10],
              "min_samples_split": [1.0, 3, 10],
              "min_samples_leaf": [3, 10],
              "bootstrap": [True, False],
              "criterion": ["gini", "entropy"],
              "n_estimators": [10, 20, 40, 80]}
gs = grid_search.GridSearchCV(RandomForestClassifier(), param_grid=param_grid)
gs.fit(X, y)

# T?E?ST 

In [4]:
%%spark
# Imports and utilities
import numpy as np
from time import time
from operator import itemgetter
from sklearn import svm, grid_search, datasets
from sklearn.ensemble import RandomForestClassifier
from spark_sklearn import GridSearchCV

# Utility function to report best scores
def report(grid_scores, n_top=3):
    top_scores = sorted(grid_scores, key=itemgetter(1), reverse=True)[:n_top]
    for i, score in enumerate(top_scores):
        print("Model with rank: {0}".format(i + 1))
        print("Mean validation score: {0:.3f} (std: {1:.3f})".format(
              score.mean_validation_score,
              np.std(score.cv_validation_scores)))
        print("Parameters: {0}".format(score.parameters))
        print("")

digits = datasets.load_digits()
X, y = digits.data, digits.target
param_grid = {"max_depth": [3, None],
              "max_features": [1, 3, 10],
              "min_samples_split": [1.0, 3, 10],
              "min_samples_leaf": [1, 3, 10],
              "bootstrap": [True, False],
              "criterion": ["gini", "entropy"],
              "n_estimators": [10, 20, 40, 80]}
clf = RandomForestClassifier()

gs = grid_search.GridSearchCV(clf, param_grid=param_grid)
start = time()
gs.fit(X, y)
print("GridSearchCV took {:.2f} seconds for {:d} %%sdpark settings.".format(time() - start, len(gs.grid_scores_)))
report(gs.grid_scores_)

GridSearchCV took 488.55 seconds for 864 %%sdpark settings.
Model with rank: 1
Mean validation score: 0.949 (std: 0.005)
Parameters: {'bootstrap': True, 'min_samples_leaf': 1, 'n_estimators': 80, 'min_samples_split': 3, 'criterion': 'entropy', 'max_features': 3, 'max_depth': None}

Model with rank: 2
Mean validation score: 0.946 (std: 0.006)
Parameters: {'bootstrap': False, 'min_samples_leaf': 1, 'n_estimators': 80, 'min_samples_split': 3, 'criterion': 'gini', 'max_features': 1, 'max_depth': None}

Model with rank: 3
Mean validation score: 0.944 (std: 0.004)
Parameters: {'bootstrap': False, 'min_samples_leaf': 1, 'n_estimators': 80, 'min_samples_split': 3, 'criterion': 'gini', 'max_features': 3, 'max_depth': None}

In [5]:
%%spark
from spark_sklearn import GridSearchCV

gs = GridSearchCV(sc, clf, param_grid)
start = time()
gs.fit(X, y)
print("GridSearchCV took {:.2f} seconds for {:d} candidate settings.".format(time() - start, len(gs.grid_scores_)))
report(gs.grid_scores_)

GridSearchCV took 64.37 seconds for 864 candidate settings.
Model with rank: 1
Mean validation score: 0.949 (std: 0.009)
Parameters: {'bootstrap': False, 'min_samples_leaf': 1, 'n_estimators': 80, 'min_samples_split': 10, 'criterion': 'gini', 'max_features': 3, 'max_depth': None}

Model with rank: 2
Mean validation score: 0.945 (std: 0.004)
Parameters: {'bootstrap': False, 'min_samples_leaf': 1, 'n_estimators': 80, 'min_samples_split': 10, 'criterion': 'entropy', 'max_features': 3, 'max_depth': None}

Model with rank: 3
Mean validation score: 0.945 (std: 0.008)
Parameters: {'bootstrap': False, 'min_samples_leaf': 1, 'n_estimators': 40, 'min_samples_split': 3, 'criterion': 'gini', 'max_features': 3, 'max_depth': None}