## Configuration

In [1]:
%%configure -f 
{
"conf":{
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
    
        "spark.executor.heartbeatInterval":"10800s",
        "spark.network.timeout":"24h",
    
        "spark.driver.memory": "10G",
        "spark.executor.memory": "10G",
        "spark.executor.cores":"4",
    
        "livy.server.session.timeout":"24h",
    
        "spark.dynamicAllocation.enabled":"false",
        "spark.ext.h2o.fail.on.unsupported.spark.param":"false",    
        
        "spark.app.name":"694"
      }
}

In [2]:
sc

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
30,application_1615332382624_0031,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<SparkContext master=yarn appName=694>

In [3]:
sc.install_pypi_package("h2o-pysparkling-2.4")
sc.install_pypi_package("h2o")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting h2o-pysparkling-2.4
  Using cached h2o_pysparkling_2.4-3.32.0.4.post1-py2.py3-none-any.whl
Collecting tabulate
  Using cached tabulate-0.8.9-py3-none-any.whl (25 kB)
Collecting requests
  Using cached requests-2.25.1-py2.py3-none-any.whl (61 kB)
Collecting future
  Using cached future-0.18.2-py3-none-any.whl
Collecting colorama>=0.3.8
  Using cached colorama-0.4.4-py2.py3-none-any.whl (16 kB)
Collecting urllib3<1.27,>=1.21.1
  Using cached urllib3-1.26.3-py2.py3-none-any.whl (137 kB)
Collecting chardet<5,>=3.0.2
  Using cached chardet-4.0.0-py2.py3-none-any.whl (178 kB)
Collecting idna<3,>=2.5
  Using cached idna-2.10-py2.py3-none-any.whl (58 kB)
Collecting certifi>=2017.4.17
  Using cached certifi-2020.12.5-py2.py3-none-any.whl (147 kB)
Installing collected packages: urllib3, idna, chardet, certifi, tabulate, requests, future, colorama, h2o-pysparkling-2.4
Successfully installed certifi-2020.12.5 chardet-4.0.0 colorama-0.4.4 future-0.18.2 h2o-pysparkling-2.4-3.32.0.4.post1 

## Data Processing for H2O

In [4]:
import h2o
from pyspark.sql import SparkSession
from pysparkling import *

ss = SparkSession.builder.getOrCreate()
hc = H2OContext.getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Connecting to H2O server at http://ip-172-31-8-193.ec2.internal:54321 ... successful.
--------------------------  -------------------------------------------------------------------------------------------------------
H2O_cluster_uptime:         06 secs
H2O_cluster_timezone:       UTC
H2O_data_parsing_timezone:  UTC
H2O_cluster_version:        3.32.0.4
H2O_cluster_version_age:    1 month and 10 days
H2O_cluster_name:           sparkling-water-livy_application_1615332382624_0031
H2O_cluster_total_nodes:    1
H2O_cluster_free_memory:    9.97 Gb
H2O_cluster_total_cores:    4
H2O_cluster_allowed_cores:  4
H2O_cluster_status:         locked, healthy
H2O_connection_url:         http://ip-172-31-8-193.ec2.internal:54321
H2O_connection_proxy:       null
H2O_internal_security:      False
H2O_API_Extensions:         XGBoost, Algos, Amazon S3, Sparkling Water REST API Extensions, AutoML, Core V3, TargetEncoder, Core V4
Python_version:             3.6.8 final
--------------------------  ----------

In [5]:
processed_df = ss.read.parquet("s3://msds694-finalprojects/processed_df")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
processed_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------------+----------------+-------------+--------------+---------------+-------------+-------------+-------------+----------------+---------------+-------------+
|is_arrested|contraband_found|    location_raw|driver_gender|driver_age_raw|driver_race_raw|  driver_race|violation_raw|    violation|search_conducted|search_type_raw|  search_type|
+-----------+----------------+----------------+-------------+--------------+---------------+-------------+-------------+-------------+----------------+---------------+-------------+
|      FALSE|             0.0|(145,[10],[1.0])|(2,[0],[1.0])| (6,[4],[1.0])| (18,[4],[1.0])|(5,[4],[1.0])|(8,[1],[1.0])|(4,[2],[1.0])|   (2,[0],[1.0])| (10,[0],[1.0])|(7,[0],[1.0])|
|      FALSE|             0.0|(145,[10],[1.0])|(2,[0],[1.0])| (6,[0],[1.0])| (18,[0],[1.0])|(5,[0],[1.0])|(8,[2],[1.0])|(4,[1],[1.0])|   (2,[0],[1.0])| (10,[0],[1.0])|(7,[0],[1.0])|
|      FALSE|             0.0|(145,[10],[1.0])|(2,[1],[1.0])| (6,[3],[1.0])| (18,[1],[1.0]

In [None]:
df_h2o = hc.asH2OFrame(processed_df, "process_df")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
df_h2o['is_arrested'] = df_h2o['is_arrested'].asfactor()

In [None]:
df_h2o.summary()

In [None]:
predictors = df_h2o.names[:]
response = "is_arrested"
predictors.remove(response)

In [None]:
predictors

## Split DataFrame into training and validation set

In [None]:
df = hc.asSparkFrame(df_h2o)

In [None]:
dftsets = df.randomSplit([0.8, 0.2], 1)
train = dftsets[0]
valid = dftsets[1]

In [None]:
train_h2o = hc.asH2OFrame(train, "train_df")
valid_h2o = hc.asH2OFrame(valid, "valid_df")

## Fit RandomForest model using H2O 

In [None]:
from h2o.estimators.random_forest import H2ORandomForestEstimator

In [None]:
model_rf = H2ORandomForestEstimator(ntrees=30, max_depth=10, seed=1, nfolds=5)

In [None]:
model_rf.train(x=predictors,
               y=response,
               training_frame=train_h2o,
               validation_frame=valid_h2o)



In [None]:
model_rf.confusion_matrix(valid=True)

In [None]:
print(round(model_rf.auc(train=True),3))

## Fit an XGBoost model using H2O

In [15]:
from h2o.estimators.xgboost import H2OXGBoostEstimator

model_xg = H2OXGBoostEstimator()
model_xg.train(x=predictors,
               y="is_arrested",
               training_frame=train_h2o,
               validation_frame=valid_h2o)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

xgboost Model Build progress: [###########################################] 100%

In [16]:
model_xg.confusion_matrix(valid=True)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Confusion Matrix (Act/Pred) for max f1 @ threshold = 0.41104772116275545: 
       FALSE        TRUE    Error    Rate
-----  -----------  ------  -------  -------------------
FALSE  2.83022e+06  2462    0.0009   (2462.0/2832687.0)
TRUE   17885        55577   0.2435   (17885.0/73462.0)
Total  2.84811e+06  58039   0.007    (20347.0/2906149.0)

In [17]:
print(round(model_xg.auc(train=True),3))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.95

## Fit an deeplearning model using H2O 

In [18]:
from h2o.estimators.deeplearning import H2ODeepLearningEstimator

model_dl = H2ODeepLearningEstimator(variable_importances=True,
                                    loss="Automatic",nfolds=5,max_runtime_secs=1000,seed=1)

model_dl.train(x=predictors,
               y="is_arrested",
               training_frame=train_h2o,
               validation_frame=valid_h2o)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

deeplearning Model Build progress: [######################################] 100%

In [19]:
model_dl.confusion_matrix(valid=True)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Confusion Matrix (Act/Pred) for max f1 @ threshold = 0.9762762197705446: 
       FALSE        TRUE    Error    Rate
-----  -----------  ------  -------  -------------------
FALSE  2.82783e+06  4856    0.0017   (4856.0/2832687.0)
TRUE   19812        53650   0.2697   (19812.0/73462.0)
Total  2.84764e+06  58506   0.0085   (24668.0/2906149.0)

In [20]:
print(round(model_dl.auc(train=True),3))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.919

## Fit GradientBoosting model using H2O 

In [21]:
from h2o.estimators.gbm import H2OGradientBoostingEstimator

model_gbm = H2OGradientBoostingEstimator()
model_gbm.train(x=predictors,
               y="is_arrested",
               training_frame=train_h2o,
               validation_frame=valid_h2o)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

gbm Model Build progress: [###############################################] 100%

In [22]:
model_gbm.confusion_matrix(valid=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Confusion Matrix (Act/Pred) for max f1 @ threshold = 0.37661218578846384: 
       FALSE        TRUE    Error    Rate
-----  -----------  ------  -------  -------------------
FALSE  2.83014e+06  2547    0.0009   (2547.0/2832687.0)
TRUE   17921        55541   0.2439   (17921.0/73462.0)
Total  2.84806e+06  58088   0.007    (20468.0/2906149.0)

In [23]:
print(round(model_gbm.auc(train=True),3))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.945