## Start Spark and H2O

### Create Spark session

In [1]:
import pyspark
from pyspark.sql import SparkSession, functions

In [2]:
# Reuse the active Spark session if there is one; otherwise start a new
# session registered under the application name "sparkling".
spark = (
    SparkSession.builder
    .appName("sparkling")
    .getOrCreate()
)

In [3]:
# Echo the session object so the notebook renders it as this cell's output.
spark

### Create H2OContext

In [4]:
import h2o
from pysparkling import H2OContext

In [5]:
# Get (or create) the H2OContext attached to the Spark session. `hc` is the
# bridge used later to convert between H2O frames and Spark DataFrames.
hc = H2OContext.getOrCreate(spark)

Connecting to H2O server at http://172.31.13.96:54321... successful.


0,1
H2O cluster uptime:,10 secs
H2O cluster timezone:,Etc/UTC
H2O data parsing timezone:,UTC
H2O cluster version:,3.18.0.4
H2O cluster version age:,16 days
H2O cluster name:,sparkling-water-ubuntu_local-1521928013156
H2O cluster total nodes:,1
H2O cluster free memory:,916 Mb
H2O cluster total cores:,4
H2O cluster allowed cores:,4



Sparkling Water Context:
 * H2O name: sparkling-water-ubuntu_local-1521928013156
 * cluster size: 1
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (driver,172.31.13.96,54321)
  ------------------------

  Open H2O Flow in browser: http://172.31.13.96:54321 (CMD + click in Mac OSX)

    


## Data

### Loading data

Here we load the data with H2O and then convert it to Spark DataFrames, because `h2o.import_file` is easy to use.
In general, though, it would be more common to load the data directly with Spark, for example from HDFS.

In [6]:
# 2005 airline on-time records (gzip-compressed CSV), parsed into an H2OFrame.
allFlights = h2o.import_file(
    path="http://h2o-public-test-data.s3.amazonaws.com/smalldata/airlines/year2005.csv.gz"
)
# Daily weather observations for Chicago O'Hare International Airport.
weatherTable = h2o.import_file(
    path="http://h2o-public-test-data.s3.amazonaws.com/smalldata/chicago/Chicago_Ohare_International_Airport.csv"
)

Parse progress: |█████████████████████████████████████████████████████████| 100%
Parse progress: |█████████████████████████████████████████████████████████| 100%


In [7]:
# Preview the first five flight records.
allFlights.head(rows=5)

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,IsArrDelayed,IsDepDelayed
2005,6,19,7,1948,1950,2316,2124,OH,5321,N403CA,208,94,192,112,-2,CVG,AVP,499,3,13,0,,0,0,0,112,0,0,YES,NO
2005,3,28,1,2035,1908,2217,2104,EV,4392,N931EV,102,116,90,73,87,ATL,GRR,640,2,10,0,,0,2,71,0,0,0,YES,YES
2005,8,8,1,1035,1035,1701,1640,WN,1972,N445,266,245,233,21,0,SLC,BWI,1864,5,28,0,,0,0,0,21,0,0,YES,NO
2005,7,8,5,902,905,936,952,MQ,3922,N648AE,34,47,24,-16,-3,ORD,MSN,109,2,8,0,,0,0,0,0,0,0,NO,NO
2005,6,13,1,2020,2015,2158,2135,UA,1181,N914UA,158,140,126,23,5,DEN,BUR,850,9,23,0,,0,0,0,23,0,0,YES,YES




In [8]:
# Preview the first five daily weather records.
weatherTable.head(rows=5)

Date,TmaxF,TminF,TmeanF,PrcpIn,SnowIn,CDD,HDD,GDD
2005-01-01 00:00:00,41,25,33.0,0.31,0.0,0,32.0,0
2005-01-02 00:00:00,54,33,43.5,0.08,0.0,0,21.5,0
2005-01-03 00:00:00,36,32,34.0,0.36,0.0,0,31.0,0
2005-01-04 00:00:00,35,30,32.5,0.05,1.2,0,32.5,0
2005-01-05 00:00:00,31,26,28.5,0.38,6.2,0,36.5,0




In [9]:
# Convert both H2O frames into Spark DataFrames so the filtering and joining
# below can be done with the Spark SQL API.
allFlightsDF = hc.as_spark_frame(allFlights)
weatherTableDF = hc.as_spark_frame(weatherTable)

In [10]:
# Sanity check on the converted flights table: total row count and first row.
(
    allFlightsDF.count(),
    allFlightsDF.head(),
)

(100000,
 Row(Year=2005, Month=6, DayofMonth=19, DayOfWeek=7, DepTime=1948, CRSDepTime=1950, ArrTime=2316, CRSArrTime=2124, UniqueCarrier='OH', FlightNum=5321, TailNum='N403CA', ActualElapsedTime=208, CRSElapsedTime=94, AirTime=192, ArrDelay=112, DepDelay=-2, Origin='CVG', Dest='AVP', Distance=499, TaxiIn=3, TaxiOut=13, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=112, SecurityDelay=0, LateAircraftDelay=0, IsArrDelayed='YES', IsDepDelayed='NO'))

In [11]:
# Sanity check on the converted weather table: total row count and first row.
(
    weatherTableDF.count(),
    weatherTableDF.head(),
)

(1461,
 Row(Date=datetime.datetime(2005, 1, 1, 0, 0), TmaxF=41, TminF=25, TmeanF=33.0, PrcpIn=0.31, SnowIn=0.0, CDD=0.0, HDD=32.0, GDD=0.0))

### Clean data using Spark

In [12]:
# Keep only flights whose destination is ORD, the airport covered by the
# Chicago O'Hare weather table loaded above.
flightsToORD = allFlightsDF.where(allFlightsDF["Dest"] == "ORD")

In [13]:
# Number of flights left after restricting the destination to ORD.
flightsToORD.count()

4690

In [14]:
# Attach the weather observation for each flight's calendar day. The flight
# table stores the date as separate Year / Month / DayofMonth columns, so each
# part is compared with the matching component of the weather timestamp.
# An inner join means a flight with no weather record for its day is dropped.
datasetDF = flightsToORD.join(
    weatherTableDF,
    on=(
        (flightsToORD.Year == functions.year(weatherTableDF.Date))
        & (flightsToORD.Month == functions.month(weatherTableDF.Date))
        & (flightsToORD.DayofMonth == functions.dayofmonth(weatherTableDF.Date))
    ),
    how="inner",
)

In [15]:
# Row count after the join; compare it with the pre-join flight count to see
# whether any flight failed to match a weather record.
datasetDF.count()

4690

In [16]:
# Inspect one joined row: flight columns followed by that day's weather columns.
datasetDF.head()

Row(Year=2005, Month=1, DayofMonth=15, DayOfWeek=6, DepTime=1106, CRSDepTime=1050, ArrTime=1202, CRSArrTime=1151, UniqueCarrier='DH', FlightNum=1158, TailNum='N662BR', ActualElapsedTime=116, CRSElapsedTime=121, AirTime=98, ArrDelay=11, DepDelay=16, Origin='IAD', Dest='ORD', Distance=589, TaxiIn=4, TaxiOut=13, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, IsArrDelayed='YES', IsDepDelayed='YES', Date=datetime.datetime(2005, 1, 15, 0, 0), TmaxF=15, TminF=4, TmeanF=9.5, PrcpIn=0.0, SnowIn=0.0, CDD=0.0, HDD=55.5, GDD=0.0)

In [17]:
# Narrow the joined table to the modelling columns. Column order matters:
# ArrDelay is deliberately last because the model cells below pick the target
# by position (last column) and treat every earlier column as a predictor.
datasetDF = datasetDF.select(
    # scheduled date and times
    "Year", "Month", "DayofMonth", "CRSDepTime", "CRSArrTime", "CRSElapsedTime",
    # carrier, aircraft and route
    "UniqueCarrier", "FlightNum", "TailNum", "Origin", "Distance",
    # weather for the day
    "TmaxF", "TminF", "TmeanF", "PrcpIn", "SnowIn", "CDD", "HDD", "GDD",
    # prediction target
    "ArrDelay",
)

## Build model

We convert the Spark DataFrame into an H2OFrame so that H2O can train a model on it.

In [18]:
from h2o.estimators.deeplearning import H2ODeepLearningEstimator

In [19]:
# Convert the prepared Spark DataFrame to an H2OFrame, the input type used by
# the H2O estimator below.
flightsWithWeather = hc.as_h2o_frame(datasetDF)

In [20]:
# Preview the first five rows of the modelling frame.
flightsWithWeather.head(rows=5)

Year,Month,DayofMonth,CRSDepTime,CRSArrTime,CRSElapsedTime,UniqueCarrier,FlightNum,TailNum,Origin,Distance,TmaxF,TminF,TmeanF,PrcpIn,SnowIn,CDD,HDD,GDD,ArrDelay
2005,1,15,1050,1151,121,DH,1158,N662BR,IAD,589,15,4,9.5,0,0,0,55.5,0,11
2005,1,15,835,1000,85,MQ,4042,N811AE,SGF,438,15,4,9.5,0,0,0,55.5,0,-10
2005,1,15,1228,1315,107,MQ,4168,N523AE,BUF,473,15,4,9.5,0,0,0,55.5,0,-7
2005,1,15,647,822,95,MQ,3954,N815AE,MEM,491,15,4,9.5,0,0,0,55.5,0,29
2005,1,15,1100,1236,156,AA,321,N4XTAA,LGA,733,15,4,9.5,0,0,0,55.5,0,-18




In [21]:
# Split the frame into train / validation / test at roughly 70% / 15% / 15%
# (the last share is whatever remains after the two listed ratios).
# H2O assigns rows to splits at random, so the proportions are approximate.
# A fixed seed keeps the split, and the metrics computed on it, repeatable
# across notebook re-runs.
train, valid, test = flightsWithWeather.split_frame(ratios=[.7, .15], seed=42)

In [22]:
# The target is the last column of the frame (by position); every column
# before it is used as a predictor.
target_col = flightsWithWeather.ncol - 1
predictor_columns = list(range(target_col))

In [36]:
# Feed-forward network with two hidden layers of 100 units, trained for 5 epochs.
# The seed fixes the random number stream used for initialisation and sampling.
# NOTE: H2O deep learning is only exactly repeatable when it is also run with
# reproducible=True (single-threaded, slower). That is not enabled here, so
# metrics can still vary slightly between runs.
dl_model = H2ODeepLearningEstimator(hidden=[100, 100], epochs=5, seed=42)

In [37]:
# Fit on the training split; the validation split is scored during training.
dl_model.train(
    x=predictor_columns,
    y=target_col,
    training_frame=train,
    validation_frame=valid,
)

deeplearning Model Build progress: |██████████████████████████████████████| 100%


In [38]:
# Score the trained model on the held-out test split.
dl_model.model_performance(test_data=test)


ModelMetricsRegression: deeplearning
** Reported on test data. **

MSE: 1643.4886249256167
RMSE: 40.539963306910096
MAE: 21.379430026187702
RMSLE: NaN
Mean Residual Deviance: 1643.4886249256167




In [32]:
# Actual arrival delays for the first five test rows, for comparison with the
# model's predictions for the same rows.
test["ArrDelay"].head(rows=5)

ArrDelay
59.0
""
-14.0
100.0
10.0




In [33]:
# Predicted arrival delay for the first five test rows.
dl_model.predict(test_data=test.head(rows=5))

deeplearning prediction progress: |███████████████████████████████████████| 100%


predict
17.3427
25.6626
43.4875
26.1504
21.8099


