# 03 - Model Execution & Evaluation
##### In this notebook, we:
##### 1. Load and transform the data
##### 2. Train tree based Model
##### 3. Implement AutoML based approach for Mass Model Training
##### 4. Evaluate results and give model insight

## 1. Loading Data

Loading master table into model pipeline

In [3]:
cmc_model = spark.table("cmc_db.model_data_csv")

cmc_model.cache()

#display(cmc_model.select("*"))

## Transforming Data

Applying transformations to generalize data

In [5]:
from pyspark.sql.functions import *

names = cmc_model.schema.names

for name in names:
  cmc_model = cmc_model.withColumn(name, regexp_replace(name, 'nan', ''))
  cmc_model = cmc_model.withColumn(name, regexp_replace(name, 'None', ''))
  cmc_model = cmc_model.withColumn(name, regexp_replace(name, 'mm', ''))
  cmc_model = cmc_model.withColumn(name, regexp_replace(name, 'Knots', ''))
  cmc_model = cmc_model.withColumn(name, regexp_replace(name, 'knots', ''))
  cmc_model = cmc_model.withColumn(name, regexp_replace(name, '\\"\"\""', ''))
  cmc_model = cmc_model.withColumn(name, regexp_replace(name, '>', ''))

#display(cmc_model)

In [6]:
from pyspark.sql.types import IntegerType, TimestampType, DoubleType, StringType, DecimalType, FloatType

cmc_model_schema = cmc_model.withColumn("time", cmc_model["time"].cast(StringType()))
cmc_model_schema = cmc_model_schema.withColumn("Latitude", cmc_model_schema["Latitude"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("Longitude", cmc_model_schema["Longitude"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("month", cmc_model_schema["month"].cast(FloatType()))
cmc_model_schema = cmc_model_schema.withColumn("year", cmc_model_schema["year"].cast(FloatType()))
cmc_model_schema = cmc_model_schema.withColumn("do", cmc_model_schema["do"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("tn", cmc_model_schema["tn"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("tp", cmc_model_schema["tp"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("watertemp", cmc_model_schema["watertemp"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("airtemp_narr", cmc_model_schema["airtemp_narr"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("precip3_narr", cmc_model_schema["precip3_narr"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("precip24_narr", cmc_model_schema["precip24_narr"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("precip48_narr", cmc_model_schema["precip48_narr"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("windspeed_narr\r", cmc_model_schema["windspeed_narr\r"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("HUC12", cmc_model_schema["HUC12"].cast(DoubleType()))
cmc_model_schema = cmc_model_schema.withColumn("StationName", cmc_model_schema["StationName"].cast(StringType()))
cmc_model_schema = cmc_model_schema.withColumn("GroupCode", cmc_model_schema["GroupCode"].cast(StringType()))
cmc_model_schema = cmc_model_schema.withColumnRenamed("windspeed_narr\r","windspeed_narr")

#display(cmc_model_schema)

In [7]:
from pyspark.sql.functions import lower, col

cmc_model_schema_rep = cmc_model_schema.replace('', 'None')
cmc_model_schema_rep = cmc_model_schema_rep.fillna('') # needs to eliminate nulls

names = ["StationName", "GroupCode", "time"]

for name in names:
  cmc_model_schema_rep = cmc_model_schema_rep.withColumn(name,lower(col(name)))

#display(cmc_model_schema_rep)

In [8]:
cmc_model_schema_rep.printSchema()

## StringIndexer

Transform categorical strings with StringIndexer

In [10]:
from pyspark.ml.feature import StringIndexer

iStationName = StringIndexer(inputCol="StationName", outputCol="cat_StationName", handleInvalid="skip")
iGroupCode = StringIndexer(inputCol="GroupCode", outputCol="cat_GroupCode", handleInvalid="skip")
iTime = StringIndexer(inputCol="time", outputCol="cat_Time", handleInvalid="skip")

## Imputer

Impute possible missing double values

In [12]:
from pyspark.ml.feature import Imputer

imputer = Imputer()
imputer.setStrategy("median")

imputeCols = ["Latitude",
  "Longitude",
  "month",
  "year",
  "HUC12",
  "do",
  "tn",
  "tp",
  "watertemp",
  "airtemp_narr",
  "precip3_narr",
  "precip24_narr",
  "precip48_narr",
  "windspeed_narr"]
imputer.setInputCols(imputeCols)
imputer.setOutputCols(imputeCols)

cmc_model_schema_rep = imputer.fit(cmc_model_schema_rep).transform(cmc_model_schema_rep)

#display(cmc_model_schema_rep)

In [13]:
recordCount = cmc_model_schema_rep.count()
noNullsRecordCount = cmc_model_schema_rep.na.drop().count()

print("We have {} records that contain null values.".format(recordCount - noNullsRecordCount))

## OneHotEncoder

One hot encoding of indexed strings

In [15]:
from pyspark.ml.feature import OneHotEncoderEstimator

oneHotEnc = OneHotEncoderEstimator()
oneHotEnc.setInputCols(["cat_StationName", "cat_GroupCode", "cat_Time"])
oneHotEnc.setOutputCols(["vec_StationName", "vec_GroupCode", "vec_Time"])

## 2. Train/Test Split

80/20 split for model train and test

In [17]:
seed = 1234

testDF, trainDF = cmc_model_schema_rep.randomSplit((0.2, 0.8), seed=seed)

print(testDF.count(), trainDF.count())

display(testDF)

StationName,Latitude,Longitude,GroupCode,time,month,year,HUC12,do,tn,tp,watertemp,airtemp_narr,precip3_narr,precip24_narr,precip48_narr,windspeed_narr
8-col-40-laca,37.984722,-77.766944,laca,10:10:00,8.0,2017.0,20801060603.0,7.87,0.34,0.01,29.45,24.526764,1.4765625,0.59713733,12.6953125,2.9420943
8-col-40-laca,37.984722,-77.766944,laca,10:20:00,6.0,2013.0,20801060603.0,7.14,0.4,0.01,28.28,24.393707,0.0625,0.31296083,8.664523,3.4805393
8-col-40-laca,37.984722,-77.766944,laca,9:00:00,8.0,2018.0,20801060603.0,8.09,0.44,0.02,31.79,22.972809,0.25258133,2.0625,4.6683507,9.122026
8-col-40-laca,37.984722,-77.766944,laca,9:00:00,10.0,2018.0,20801060603.0,8.5,0.48,0.02,25.67,19.233307,0.0,0.0,0.03125,13.361147
8-col-40-laca,37.984722,-77.766944,laca,9:16:00,8.0,2019.0,20801060603.0,7.76,0.38,0.02,29.03,24.447418,0.1640625,0.0,14.3828125,20.511784
8-col-40-laca,37.984722,-77.766944,laca,9:18:00,4.0,2019.0,20801060603.0,9.79,0.44,0.05,18.71,18.951813,0.0,5.7578125,5.7578125,5.4579177
8-col-40-laca,37.984722,-77.766944,laca,9:45:00,4.0,2016.0,20801060603.0,8.97,0.48,0.02,15.54,-0.4128723,0.0,1.6875,1.6875,95.04044
8-col-40-laca,37.984722,-77.766944,laca,9:50:00,8.0,2016.0,20801060603.0,6.86,0.38,0.01,32.8,24.508453,0.0,0.09611061,3.414179,2.6908665
8-crc-44-laca,38.088111,-77.901194,laca,9:00:00,10.0,2019.0,20801060404.0,8.32,0.64,0.06,21.68,15.24057,0.109375,0.1875,0.59375,57.71276
8-crc-44-laca,38.088111,-77.901194,laca,9:45:00,8.0,2019.0,20801060404.0,6.34,0.68,0.07,28.3,24.447418,0.1640625,0.0,14.3828125,20.511784


In [18]:
featureCols = [
  "Latitude",
  "Longitude",
  "month",
  "year",
  "HUC12",
  "do",
  "tp",
  "watertemp",
  "airtemp_narr",
  "precip3_narr",
  "precip24_narr",
  "precip48_narr",
  "windspeed_narr",
  "vec_StationName", 
  "vec_GroupCode", 
  "vec_Time"
]

## Baseline Model Linear Regression

Creating baseline model to extablish metrics for other models to beat

In [20]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

vectorAssembler = VectorAssembler(inputCols=featureCols, outputCol="features")

lr = (LinearRegression()
      .setLabelCol("tn")
      .setFeaturesCol("features"))

lrpipeline = Pipeline(stages = [iStationName, iGroupCode, iTime, oneHotEnc, vectorAssembler, lr])

lrpipeline.getStages()

lrPipelineModel = lrpipeline.fit(trainDF)

predictedDF_lr = lrPipelineModel.transform(testDF)

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics

def printEval(df):
  out = df\
  .select("prediction", "tn").rdd.map(lambda x: (float(x[0]), float(x[1])))
  
  metrics = RegressionMetrics(out)

  mse = str(metrics.meanSquaredError)
  rmse = str(metrics.rootMeanSquaredError)
  r2 = str(metrics.r2)
  mae = str(metrics.meanAbsoluteError)
  ve = str(metrics.explainedVariance)

  print("RMSE: {}\nR2: {}\nMSE: {}\nMAE: {}\nVariance Explained: {}".format(rmse, r2, mse, mae, ve))

In [22]:
# Metrics to beat

printEval(predictedDF_lr)

## Random Forrest

In [24]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="tn", 
                           featuresCol="features",
                           maxDepth=15, 
                           maxBins=10, 
                           minInstancesPerNode=1, 
                           minInfoGain=0.0, 
                           maxMemoryInMB=256, 
                           cacheNodeIds=False, 
                           checkpointInterval=10, 
                           subsamplingRate=1.0, 
                           seed=None, 
                           numTrees=500, 
                           featureSubsetStrategy='auto'
                          )

rfpipeline = Pipeline(stages = [iStationName, iGroupCode, iTime, oneHotEnc, vectorAssembler, rf])

rfPipelineModel = rfpipeline.fit(trainDF)

predictedDF_rf = rfPipelineModel.transform(testDF)

In [25]:
printEval(predictedDF_rf)

## GBT

In [27]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(labelCol="tn", 
                   featuresCol="features", 
                   maxIter=10)

gbtpipeline = Pipeline(stages = [iStationName, iGroupCode, iTime, oneHotEnc, vectorAssembler, gbt])

gbtPipelineModel = gbtpipeline.fit(trainDF)

predictedDF_gbt = gbtPipelineModel.transform(testDF)

In [28]:
printEval(predictedDF_gbt)

## XGBoost

Please install xgboost and numpy!

In [30]:
import xgboost as xgb
from sklearn.metrics import accuracy_score, explained_variance_score, mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import pandas as pd
import math

seed = 1234

pipeline = Pipeline(stages=[iStationName, iGroupCode, iTime, oneHotEnc])

pipelineModel = pipeline.fit(cmc_model_schema_rep)
trainingData = pipelineModel.transform(cmc_model_schema_rep)

xgbfeatureCols = ["Latitude",
  "Longitude",
  "month",
  "year",
  "HUC12",
  "do",
  "tp",
  "watertemp",
  "airtemp_narr",
  "precip3_narr",
  "precip24_narr",
  "precip48_narr",
  "windspeed_narr",
  "tn"
   ]

xgbInputTrainingDF = trainingData.select(xgbfeatureCols)

xgbtestDF, xgbtrainDF = xgbInputTrainingDF.randomSplit((0.2, 0.8), seed=seed)
print(xgbtestDF.count(), xgbtrainDF.count())

#change to pandas for the xgb
pandas_xgbtrainDF = xgbtrainDF.toPandas()
pandas_xgbtestDF = xgbtestDF.toPandas()

In [31]:
xg_reg = xgb.XGBRegressor(objective ='reg:squarederror', 
                          alpha = 0.0, 
                          min_child_weight = 1.0,
                          tweedie_variance_power = 1.5,
                          random_state = 1234,
                          max_bin = 256,
                          gamma = 0.01,
                          max_depth = 10, 
                          colsample_bylevel = 1.0,
                          scale_pos_weight = 1,       
                          max_delta_step = 0.0, 
                          learning_rate = 1, 
                          n_estimators = 2000,
                          n_jobs = 1,
                          tree_method = 'auto', 
                          num_parallel_tree = 1,
                          colsample_bytree = 0.5,
                          subsample = 1.0, 
                          reg_lambda = 1.0)

xg_reg.fit(pandas_xgbtrainDF[pandas_xgbtrainDF.columns[:-1]], pandas_xgbtrainDF['tn'])

y_pred = xg_reg.predict(pandas_xgbtestDF[pandas_xgbtestDF.columns[:-1]])
y_true = pd.DataFrame({'actuals': pandas_xgbtestDF['tn'] })
y_pred = pd.DataFrame({'prediction': np.array(y_pred) })

In [32]:
r2 = r2_score(y_true, y_pred)
mse = mean_squared_error(y_true, y_pred)
rmse = math.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
ve = explained_variance_score(y_true, y_pred)

print("RMSE: {}\nR2: {}\nMSE: {}\nMAE: {}\nVariance Explained: {}".format(r2, rmse, mse, mae, ve))

## 3. H2O AutoML

Pysparkling is not installed to the Databricks environment by default. You can install additional libraries very easily. Go ahead and install h2o_pysparkling_2.4, pysparkling, colorama==0.3.8.

In [34]:
from pysparkling import *
from pyspark.sql import SparkSession
import h2o
import xgboost

hc = H2OContext.getOrCreate(spark)

In [35]:
# Using non Spark Version

import h2o
from h2o.automl import H2OAutoML

folds = 5

aml = H2OAutoML(max_models = 60,
                max_runtime_secs = 5000,
                max_runtime_secs_per_model = 120,
                nfolds = folds,
                seed = 1234)

response = "tn"
predictor = ["Latitude",
  "Longitude",
  "month",
  "year",
  "HUC12",
  "do",
  "tp",
  "watertemp",
  "airtemp_narr",
  "precip3_narr",
  "precip24_narr",
  "precip48_narr",
  "windspeed_narr",
  "StationName", 
  "GroupCode", 
  "time"]

training_frame = h2o.H2OFrame(trainDF.toPandas())

aml.train(x = predictor, y = response, training_frame = training_frame)

In [36]:
leaderboard = h2o.automl.get_leaderboard(aml, extra_columns = 'ALL')
aml_leaderboard_df=leaderboard.as_data_frame()
aml_leaderboard_df

Unnamed: 0,model_id,mean_residual_deviance,rmse,mse,mae,rmsle,training_time_ms,predict_time_per_row_ms
0,StackedEnsemble_BestOfFamily_AutoML_20200612_0...,0.478472,0.691717,0.478472,0.299250,0.140713,1935,0.103678
1,XGBoost_grid__1_AutoML_20200612_004216_model_13,0.501005,0.707817,0.501005,0.316772,,11804,0.016899
2,GBM_grid__1_AutoML_20200612_004216_model_4,0.503178,0.709350,0.503178,0.320131,0.149007,5850,0.091005
3,StackedEnsemble_AllModels_AutoML_20200612_004216,0.507509,0.712397,0.507509,0.295363,0.139292,4535,0.310028
4,XGBoost_grid__1_AutoML_20200612_004216_model_3,0.508372,0.713002,0.508372,0.321309,0.150678,18387,0.018961
5,XGBoost_grid__1_AutoML_20200612_004216_model_19,0.514605,0.717360,0.514605,0.321414,0.150402,14694,0.015378
6,XGBoost_grid__1_AutoML_20200612_004216_model_11,0.515831,0.718214,0.515831,0.329205,,9121,0.008341
7,GBM_grid__1_AutoML_20200612_004216_model_9,0.515835,0.718216,0.515835,0.320651,0.149351,6653,0.068468
8,XGBoost_grid__1_AutoML_20200612_004216_model_9,0.522853,0.723086,0.522853,0.330457,0.152388,15562,0.012913
9,GBM_3_AutoML_20200612_004216,0.524458,0.724195,0.524458,0.343013,,5342,0.105492


In [37]:
for m in aml.leaderboard.as_data_frame()['model_id']:
    print(m)
    print(h2o.get_model(m))

In [38]:
model_set=aml_leaderboard_df['model_id']

mod_best=h2o.get_model(model_set[0])
mod_best

In [39]:
mod_best._id

In [40]:
testing_frame = h2o.H2OFrame(testDF.toPandas())

predictedDF_automl = mod_best.predict(testing_frame)

perf = mod_best.model_performance(testing_frame)
perf

In [41]:
# Using non ensable model for Variance importance
mod_best1=h2o.get_model(model_set[1])
mod_best1

In [42]:
mod_best1.varimp(use_pandas=True)

Unnamed: 0,variable,relative_importance,scaled_importance,percentage
0,Longitude,46752.085938,1.000000e+00,3.652842e-01
1,Latitude,31911.300781,6.825642e-01,2.493299e-01
2,HUC12,13435.775391,2.873834e-01,1.049766e-01
3,tp,9104.670898,1.947436e-01,7.113677e-02
4,GroupCode.vadeq,4242.400879,9.074249e-02,3.314680e-02
5,GroupCode.padep,3408.214844,7.289974e-02,2.662912e-02
6,watertemp,2152.591797,4.604269e-02,1.681867e-02
7,StationName.cobr3,1948.132080,4.166942e-02,1.522118e-02
8,airtemp_narr,1777.897949,3.802821e-02,1.389110e-02
9,precip24_narr,1492.083374,3.191480e-02,1.165797e-02


In [43]:
mod_best1.varimp_plot()

## 4. Model comparison & Selection

Novel comparison of models

In [45]:
# Linear Regression
print("Linear Regression Metrics: ")
printEval(predictedDF_lr)
print(" ")

# Random Forest
print("Random Forest Metrics: ")
printEval(predictedDF_rf)
print(" ")

# GBT
print("GBT Metrics: ")
printEval(predictedDF_gbt)
print(" ")

# xgBoost
print("xgBoost Metrics: ")
print("RMSE: {}\nR2: {}\nMSE: {}\nMAE: {}\nVariance Explained: {}".format(r2, rmse, mse, mae, ve))
print(" ")

In [46]:
# AutoML
print("H2o AutoML Non - Ensamble Metrics: AutoML xgBoost ")
mod_best1

In [47]:
# AutoML
print("H2o AutoML Ensamble Metrics: ")
mod_best

In [48]:
perf