In [81]:
# Importing Packages

import numpy as np
import pandas as pd
import time
import six
import sys
import requests
import subprocess
import re
from google.cloud import bigquery
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from matplotlib import pyplot as plt
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import RandomForestRegressor


In [82]:
# Starting and Creating Spark Session

sc = SparkContext.getOrCreate()
start = time.time()
sqlContext = SQLContext(sc)

In [83]:
###################
# Importing TCI/VCI Data and Output Variable of Interest
###################
csv_path = "gs://bigdatabucketcolumbia/final/data/maine/finalmainedatanick-tcitest.csv"

In [84]:
# Loading Data

data = spark.read.csv(csv_path, header="true", inferSchema="true")
data.show()

+------------+------------+----------+----------+-------+--------+
|centroid_lon|centroid_lat|    tasmin|    tasmax|     pr|     tci|
+------------+------------+----------+----------+-------+--------+
|  -68.598904|   46.658916|274.504189|286.380914|2.04E-5|0.022787|
|  -68.598904|   46.658916|272.747242| 287.68392|1.76E-6|0.012174|
|  -68.598904|   46.658916|270.779844|281.652701|1.66E-5|0.031852|
|  -68.598904|   46.658916|270.088352|287.826573|2.47E-6|0.033768|
|  -68.598904|   46.658916|274.572469|289.075817|4.44E-6| 0.05191|
|  -68.598904|   46.658916|273.365276|290.612225|2.34E-5|0.030312|
|  -68.598904|   46.658916|270.955019|285.772168|1.46E-5|0.021773|
|  -68.598904|   46.658916|271.374597|289.092053|5.91E-8|0.026801|
|  -68.598904|   46.658916|270.667428|289.947926|3.16E-6|0.023726|
|  -68.598904|   46.658916|276.431317|297.264688|1.08E-6|    0.02|
|  -68.598904|   46.658916|273.717478| 295.37846|4.62E-8|0.025048|
|  -68.598904|   46.658916|277.204342|300.034611| 1.9E-8|0.023

In [85]:
data.take(1)

[Row(centroid_lon=-68.598904, centroid_lat=46.658916, tasmin=274.504189, tasmax=286.380914, pr=2.04e-05, tci=0.022787)]

In [86]:
data.cache()
data.printSchema()

root
 |-- centroid_lon: double (nullable = true)
 |-- centroid_lat: double (nullable = true)
 |-- tasmin: double (nullable = true)
 |-- tasmax: double (nullable = true)
 |-- pr: double (nullable = true)
 |-- tci: double (nullable = true)



In [87]:
data.describe().toPandas()

Unnamed: 0,summary,centroid_lon,centroid_lat,tasmin,tasmax,pr,tci
0,count,42.0,42.0,42.0,42.0,42.0,42.0
1,mean,-68.59890399999996,46.65891600000003,278.8533380238096,296.8393392380953,8.508419047619049e-06,0.0322491904761904
2,stddev,0.0,0.0,7.075998518396975,7.683194022882964,1.255629090548013e-05,0.0137233783052561
3,min,-68.598904,46.658916,269.460985,281.652701,1.9e-08,0.012174
4,max,-68.598904,46.658916,290.520231,309.680348,5.61e-05,0.080826


In [88]:
# Vector Assembler
    
feature_columns = data.columns[:-1] # Output 
vectorAssembler = VectorAssembler(inputCols=feature_columns,outputCol="features")
vdata = vectorAssembler.transform(data)
vdata = vdata.select(['features', 'tci'])
vdata.show(3)

+--------------------+--------+
|            features|     tci|
+--------------------+--------+
|[-68.598904,46.65...|0.022787|
|[-68.598904,46.65...|0.012174|
|[-68.598904,46.65...|0.031852|
+--------------------+--------+
only showing top 3 rows



In [89]:
splits = vdata.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [90]:
lr = LinearRegression(featuresCol = 'features', labelCol='tci', maxIter=20, regParam=0.01, elasticNetParam=0.1)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.0,0.0,0.0,82.71126817690886]
Intercept: 0.031357542048902426


In [91]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.013962
r2: 0.013536


In [92]:
train_df.describe().show()

+-------+--------------------+
|summary|                 tci|
+-------+--------------------+
|  count|                  30|
|   mean|0.031992266666666665|
| stddev|0.014297869354011028|
|    min|            0.012174|
|    max|            0.080826|
+-------+--------------------+



In [93]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","tci","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="tci",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+--------------------+--------+--------------------+
|          prediction|     tci|            features|
+--------------------+--------+--------------------+
| 0.03145762268339648|0.035789|[-68.598904,46.65...|
|  0.0313613633094922|0.025048|[-68.598904,46.65...|
|0.031724780079607905| 0.05191|[-68.598904,46.65...|
| 0.03183395895360142|0.032738|[-68.598904,46.65...|
|0.031446870218533485|    0.02|[-68.598904,46.65...|
+--------------------+--------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.162878


In [94]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.0111627


In [95]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 5
objectiveHistory: [0.5000000000000004, 0.49830710916198984, 0.497788884971253, 0.49778873398644324, 0.4977887339424533]
+--------------------+
|           residuals|
+--------------------+
| 0.00460029700000545|
| 0.00220616111870061|
|-0.00613022796652...|
|0.005419772033479...|
|-0.00789290965634146|
|-0.00100336976025...|
|-8.78549100639114...|
| 0.02962364632952262|
|-0.01079212656428...|
|-0.00456143028485...|
|-0.01932911388089...|
|-0.00298098572424...|
|0.005307853352494937|
|-0.01025785191971...|
|-0.01608242233162086|
|0.007525365356012...|
|0.012209618865065226|
| 0.04691267976443109|
|-0.00701127339100...|
|0.005605392452689...|
+--------------------+
only showing top 20 rows



In [96]:
# Linear Regression Model
predictions = lr_model.transform(test_df)
#predictions.select("prediction","tci","features").show(3)
predictions.select("features","tci", "prediction").show(3)

+--------------------+--------+--------------------+
|            features|     tci|          prediction|
+--------------------+--------+--------------------+
|[-68.598904,46.65...|0.035789| 0.03145762268339648|
|[-68.598904,46.65...|0.025048|  0.0313613633094922|
|[-68.598904,46.65...| 0.05191|0.031724780079607905|
+--------------------+--------+--------------------+
only showing top 3 rows



In [97]:
#################################################

# Decision Tree Regression
# https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a

from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'tci')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(labelCol="tci", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 0.0156863


In [98]:
dt_model.featureImportances

SparseVector(5, {2: 0.0657, 3: 0.1774, 4: 0.7569})

In [99]:
data.take(1)

[Row(centroid_lon=-68.598904, centroid_lat=46.658916, tasmin=274.504189, tasmax=286.380914, pr=2.04e-05, tci=0.022787)]

In [100]:
##############################################

# Gradient-Boosted Tree Regression

gbt = GBTRegressor(featuresCol = 'features', labelCol = 'tci', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('features', 'tci', 'prediction').show(3)

+--------------------+--------+-------------------+
|            features|     tci|         prediction|
+--------------------+--------+-------------------+
|[-68.598904,46.65...|0.035789|0.03278491249895562|
|[-68.598904,46.65...|0.025048|0.02828629821263735|
|[-68.598904,46.65...| 0.05191|0.02828146186845441|
+--------------------+--------+-------------------+
only showing top 3 rows



In [101]:
gbt_evaluator = RegressionEvaluator(labelCol="tci", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.017353


In [102]:
#########################################

# Random Forest Regression

rf = RandomForestRegressor(featuresCol="features", labelCol = 'tci')
rf_model = rf.fit(train_df)
rf_predictions = gbt_model.transform(test_df)
rf_predictions.select('features','prediction', 'tci' ).show(5)

+--------------------+--------------------+--------+
|            features|          prediction|     tci|
+--------------------+--------------------+--------+
|[-68.598904,46.65...| 0.03278491249895562|0.035789|
|[-68.598904,46.65...| 0.02828629821263735|0.025048|
|[-68.598904,46.65...| 0.02828146186845441| 0.05191|
|[-68.598904,46.65...|0.030801852536936614|0.032738|
|[-68.598904,46.65...|0.025855105045989413|    0.02|
+--------------------+--------------------+--------+
only showing top 5 rows



In [103]:
rf_evaluator = RegressionEvaluator(labelCol="tci", predictionCol="prediction", metricName="rmse")
rmse = rf_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.017353
