## Demo 10

### using pytorch_env

## Run this demo using the Shell below(Run on Standalone mode, before run the below, you should check if spark set to Standalone mod)

In [None]:
! export SPARK_LOCAL_IP=127.0.0.1

! sh /home/sparkuser/jupyter/Bin/H2O/h2o_sparklin_simple.sh

20/04/15 17:18:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/04/15 17:18:31 WARN spark.SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


## Load Library

In [3]:
from pyspark.sql import SparkSession
from pysparkling import *
from datetime import datetime
import h2o
import numpy as np

## Start SparkSession and H2OContext

In [4]:
spark=SparkSession.builder.appName("SparklingWaterApp").getOrCreate()
hc=H2OContext.getOrCreate(spark=spark)

Proxy is defined in the environment: http_proxy. This may interfere with your H2O Connection.


Connecting to H2O server at http://10.16.4.4:54325 ....... failed.


H2OConnectionError: Could not establish link to the H2O cloud http://10.16.4.4:54325 after 5 retries
[37:28.90] H2OConnectionError: Timeout after 3.086s
[37:32.11] H2OConnectionError: Timeout after 3.010s
[37:35.32] H2OConnectionError: Timeout after 3.011s
[37:38.53] H2OConnectionError: Timeout after 3.010s
[37:41.74] H2OConnectionError: Timeout after 3.011s

## Data Prepare

In [None]:
time1=datetime.now()
print("====================Data Prepare=======================")
data=h2o.import_file(path="/home/sparkuser/jupyter/Bin/NYC_Taxi_Fare/input/trainCleanData.csv")
time1_1=datetime.now()
print("data load time:")
print(time1_1-time1)

df_train=hc.asSparkFrame(data)

df_train.createOrReplaceTempView("nyc")

nyc_data=spark.sql("""SELECT pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, fare_amount from nyc""")



train=hc.asH2OFrame(nyc_data, "nyc_dataTable")

predictor_columns=train.col_names[0:4]
lable_column="fare_amount"
time2=datetime.now()
data_prepare_time=time2-time1
print("Data Prepare Consuming Time:")
print(data_prepare_time)
print("===================End Data Prepare==================")

In [None]:
from h2o.estimators.random_forest import H2ORandomForestEstimator

## Model Train

In [None]:
print("======================Model Train============================")
rf_model=H2ORandomForestEstimator(ntrees=20, max_depth=5, nfolds=10)
rf_model.train(x=predictor_columns,y=lable_column,training_frame=train)
print(rf_model)
time3=datetime.now()
model_train_time=time3-time2
print("Model Train Consuming Time:")
print(model_train_time)

print("====================End Model Train==========================")

## Prediction and Evaluation

In [None]:
print("===================Evaluation result=========================")


test_data=h2o.import_file(path="/home/sparkuser/jupyter/Bin/NYC_Taxi_Fare/input/testCleanData.csv")


df_test=hc.asSparkFrame(test_data)

df_test.createOrReplaceTempView("nyc_test")

nyc_test_data=spark.sql("""SELECT pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, fare_amount from nyc_test""")



test=hc.asH2OFrame(nyc_test_data, "nyc_test_dataTable")


test_result=rf_model.predict(test)

def RMSE(x, y):
    return np.sqrt(((x - y) ** 2).mean())


rmse_value=RMSE(test_result,test["fare_amount"])
print(test_result)
print("RMSE value")
print(rmse_value)


In [None]:
print("Data Prepare Consuming Time:")
print(data_prepare_time)
print("Model Train Consuming Time:")
print(model_train_time)

In [None]:
#two nodes, ntrees=20, max_depth=20, nfolds=10,10 cores
RMSE value
[19.29719719]
Data Prepare Consuming Time:
0:00:27.503874
Model Train Consuming Time:
0:34:31.678458

In [None]:
#two nodes: ntrees=20, max_depth=20, nfolds=10,10 cores
RMSE value
[19.60689014]
Data Prepare Consuming Time:
0:01:07.487494
Model Train Consuming Time:
0:35:37.384876

In [None]:
#single node: ntrees=20, max_depth=20, nfolds=10,10 cores
RMSE value
[19.30460253]
Data Prepare Consuming Time:
0:01:06.381584
Model Train Consuming Time:
0:35:44.261877

In [None]:
#single node: ntrees=20, max_depth=5, nfolds=10,10 cores
data load time:
0:00:51.688446
RMSE value
[19.72957374]
Data Prepare Consuming Time:
0:01:08.218417
Model Train Consuming Time:
0:08:03.625978

In [None]:
#single node: ntrees=20, max_depth=5, nfolds=10,10 cores
data load time:
0:00:53.886734
RMSE value
[19.7442661]
Data Prepare Consuming Time:
0:01:07.930413
Model Train Consuming Time:
0:07:38.896936

In [None]:
#single node: ntrees=20, max_depth=5, nfolds=10,20 cores
data load time:
0:01:00.702585
RMSE value
[19.70716208]
Data Prepare Consuming Time:
0:01:17.942207
Model Train Consuming Time:
0:04:34.257820

In [None]:
#single node: ntrees=20, max_depth=5, nfolds=10,20 cores
RMSE value
[19.74521829]
Data Prepare Consuming Time:
0:01:15.422524
Model Train Consuming Time:
0:04:36.551092

In [None]:
#single node: ntrees=20, max_depth=20, nfolds=10,20 cores
data load time:
0:01:00.741514
RMSE value
[19.38341794]
Data Prepare Consuming Time:
0:01:18.341583
Model Train Consuming Time:
0:23:21.249120

In [None]:
#single node: ntrees=20, max_depth=20, nfolds=10,40 cores
RMSE value
[19.41381396]
Data Prepare Consuming Time:
0:01:42.192490
Model Train Consuming Time:
0:18:25.526213