# Part 3 - Learning
In this part we will try to train a machine learning model to try and predict the amount of Precipitation that will be recorded
by a given <b>Station</b> at a specific <b>Year</b> and <b>Month</b>.<br>
As we have seen in our analysis, there doesn't seem to be such a strong correlation between Precipitation and Geospatial information such as longitude and Elevation.
We have also seen that there isn't any correlation (both linear and Nonlinear) with any of the other
variables TMIN, TMAX, and SNWD, so we will avoid using models like Linear Regression or Generalized Linear Regression. <br>
Therefore, we will attempt to use a Random Forest Regressor and a Decision Tree Regressor.


We will begin by importing all the necessary libraries and setting up our environment.

In [1]:
import findspark

findspark.init()
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
os.environ[
    'PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,"\
                             "com.microsoft.azure:spark-mssql-connector:1.0.2 pyspark-shell"


Start our spark session and define out global parameters

In [3]:
def init_spark(app_name):
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    sc = spark.sparkContext
    return spark, sc


server_name = "ServerName"
database_name = "DatabaseName"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "TableName"
username = "UserName"
password = "Password"
spark, sc = init_spark('project')
kafka_server = 'dds2020s-kafka.eastus.cloudapp.azure.com:9092'

We will now load our data from the data store into two different Pyspark Dataframes.

In [4]:
jdbcDF = spark.read\
    .format("jdbc")\
    .option("url", url)\
    .option("dbtable", table_name)\
    .option("user", username)\
    .option("password", password).load()

stationData = spark.read\
    .format("jdbc")\
    .option("url", url)\
    .option("dbtable", 'StationData')\
    .option("user", username)\
    .option("password", password).load()
jdbcDF.createOrReplaceTempView("DataStore_Stage2")
stationData.createOrReplaceTempView("stationData")

From our data store, we will build our dataset.<br>
Our chosen features for the predictive model are:<br>
<ul>
<li> <b>TMAX & TMIN</b> from the same Month Year and Station of which we are trying to predict it's PRCP levels.</li>
<li> The <b>Season</b> of the given month.</li>
<li> <b> Elevation & Latitude </b> of the station.</li>
<li> The average <b>SNWD</b> two seasons prior to the current season.</li>
</ul><br>
Our hypothesis is that given a station, year and month the model will be able to capture the phenomena of melting ice as a source of precipitation.<br>
We hope to see that given a Season with high temperatures (both maximum and minimum) with high elevation and low latitude and a high amount of snow two seasons ago will produce high precipitation,
while low tempretures with low elevation and high latitudes and low snow amounts will produce lower amounts of precipitation.

In [5]:
jdbcDF = spark.sql(
    """select d1.StationId ,d1.Year_Rec,d1.Month_Rec, d1.Season,d1.PRCP,d1.TMAX,d1.TMIN,s1.Elevation,
        s1.Latitude, avg(d2.SNWD) Season_SNWD
        from DataStore_Stage2 d1, DataStore_Stage2 d2, StationData s1
        where d1.StationId = d2.StationId and
              d1.StationId = s1.StationId and
              ((d1.Season % 4 = (d2.Season % 4 ) + 2 and d1.Year_Rec=d2.Year_Rec and d1.Season in (3,4)) or
              (d1.Season % 4 = (d2.Season % 4 ) + 2 and d1.Year_Rec=d2.Year_Rec+1 and d1.Season in (1,2)))
            and d2.SNWD is not null and d1.PRCP is not null and d1.TMIN is not null and d1.TMAX is not null
        group by d1.StationId, d1.Year_Rec, d1.Month_Rec, d1.Season, d1.PRCP, d1.TMAX, d1.TMIN,
        s1.Elevation,s1.Latitude""")


Now we begin training our model

In [6]:
assembler = VectorAssembler(
    inputCols=['Season', 'TMAX', 'TMIN', 'Elevation','Latitude', 'Season_SNWD'],
    outputCol="features")
output = assembler.transform(jdbcDF).drop("StationId", "Year_Rec", "Month_Rec", "TMAX", "TMIN", "SNWD", 'Season_SNWD',\
                                          "Elevation", 'Latitude', 'Season')\
    .withColumn("label", F.col("PRCP")).drop("PRCP")

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(output)

We will split the data in a ratio of 7:3 so that we could try to have balanced samples in both the train and test set, all the while preserving a ratio that is translated to a large train set and a smaller test set. <br>
We will train a few models and compare results. <br>


In [7]:
(trainingData, testData) = output.randomSplit([0.7, 0.3])
rf1 = RandomForestRegressor(featuresCol="indexedFeatures", maxDepth=5, numTrees=10)
rf2 = RandomForestRegressor(featuresCol="indexedFeatures", maxDepth=5, numTrees=20)
rf3 = RandomForestRegressor(featuresCol="indexedFeatures", maxDepth=10, numTrees=10)
rf4 = RandomForestRegressor(featuresCol="indexedFeatures", maxDepth=10, numTrees=20)
dt1 = DecisionTreeRegressor(featuresCol="indexedFeatures", maxDepth=10)
dt2 = DecisionTreeRegressor(featuresCol="indexedFeatures", maxDepth=5)


In [8]:
pipeline = Pipeline(stages=[featureIndexer, rf1])
pipeline2 = Pipeline(stages=[featureIndexer, rf2])
pipeline3 = Pipeline(stages=[featureIndexer, rf3])
pipeline4 = Pipeline(stages=[featureIndexer, rf4])
pipeline5 = Pipeline(stages=[featureIndexer, dt1])
pipeline6 = Pipeline(stages=[featureIndexer, dt2])


In [9]:
# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)
model2 = pipeline2.fit(trainingData)
model3 = pipeline3.fit(trainingData)
model4 = pipeline4.fit(trainingData)
model5 = pipeline5.fit(trainingData)
model6 = pipeline6.fit(trainingData)

After fitting our models we will begin predicting.

In [10]:
test_predictions_rf1 = model.transform(testData)
train_predictions_rf1 = model.transform(trainingData)

test_predictions_rf2 = model2.transform(testData)
train_predictions_rf2 = model2.transform(trainingData)

test_predictions_rf3 = model3.transform(testData)
train_predictions_rf3 = model4.transform(trainingData)

test_predictions_rf4 = model4.transform(testData)
train_predictions_rf4 = model5.transform(trainingData)

test_predictions_dt1 = model5.transform(testData)
train_predictions_dt1 = model5.transform(trainingData)

test_predictions_dt2 = model6.transform(testData)
train_predictions_dt2 = model6.transform(trainingData)

Evaluating our models

In [11]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse_test_rf1 = evaluator.evaluate(test_predictions_rf1)
rmse_train_rf1 = evaluator.evaluate(train_predictions_rf1)

print("Random Forest MaxDepth=5, NumTrees=10 - RMSE on test data = %g" % rmse_test_rf1)
print("Random Forest MaxDepth=5, NumTrees=10 - RMSE on train data = %g" % rmse_train_rf1)

rmse_test_rf2 = evaluator.evaluate(test_predictions_rf2)
rmse_train_rf2 = evaluator.evaluate(train_predictions_rf2)

print("Random Forest MaxDepth=5, NumTrees=20 - RMSE on test data = %g" % rmse_test_rf2)
print("Random Forest MaxDepth=5, NumTrees=20 - RMSE on train data = %g" % rmse_train_rf2)

rmse_test_rf3 = evaluator.evaluate(test_predictions_rf3)
rmse_train_rf3 = evaluator.evaluate(train_predictions_rf3)

print("Random Forest MaxDepth=10, NumTrees=10 - RMSE on test data = %g" % rmse_test_rf3)
print("Random Forest MaxDepth=10, NumTrees=10 - RMSE on train data = %g" % rmse_train_rf3)

rmse_test_rf4 = evaluator.evaluate(test_predictions_rf4)
rmse_train_rf4 = evaluator.evaluate(train_predictions_rf4)

print("Random Forest MaxDepth=10, NumTrees=20 - RMSE on test data = %g" % rmse_test_rf4)
print("Random Forest MaxDepth=10, NumTrees=20 - RMSE on train data = %g" % rmse_train_rf4)

rmse_test_dt1 = evaluator.evaluate(test_predictions_dt1)
rmse_train_dt1 = evaluator.evaluate(train_predictions_dt1)
print("Decision Tree MaxDepth=10 - RMSE on test data = %g" % rmse_test_dt1)
print("Decision Tree MaxDepth=10 - RMSE on train data = %g" % rmse_train_dt1)

rmse_test_dt2 = evaluator.evaluate(test_predictions_dt2)
rmse_train_dt2 = evaluator.evaluate(train_predictions_dt2)
print("Decision Tree MaxDepth=5 - RMSE on test data = %g" % rmse_test_dt2)
print("Decision Tree MaxDepth=5 - RMSE on train data = %g" % rmse_train_dt2)

Random Forest MaxDepth=5, NumTrees=10 - RMSE on test data = 12.1776
Random Forest MaxDepth=5, NumTrees=10 - RMSE on train data = 12.1992
Random Forest MaxDepth=5, NumTrees=20 - RMSE on test data = 12.11
Random Forest MaxDepth=5, NumTrees=20 - RMSE on train data = 12.136
Random Forest MaxDepth=10, NumTrees=10 - RMSE on test data = 11.3922
Random Forest MaxDepth=10, NumTrees=10 - RMSE on train data = 11.2317
Random Forest MaxDepth=10, NumTrees=20 - RMSE on test data = 11.3508
Random Forest MaxDepth=10, NumTrees=20 - RMSE on train data = 11.1468
Decision Tree MaxDepth=10 - RMSE on test data = 11.4237
Decision Tree MaxDepth=10 - RMSE on train data = 11.1468
Decision Tree MaxDepth=5 - RMSE on test data = 12.1716
Decision Tree MaxDepth=5 - RMSE on train data = 12.2021


As can be seen, the Random Forest model with max depth = 10 and 20 decision trees
 produces the best result for the <b>Test Set</b> with an RMSE of 11.3508, and a
  Random Forest model with max depth = 10 and 20 decision trees produces the best result on the
  <b>Train Set</b> with an RMSE of 11.1468. <br>
  Overall it doesn't seem like one model is boldly preferable than another, and a Decision Tree of max depth = 10 seems to produce relatively good results. <br><br>
  It is worth mentioning, that we suffer from imbalanced data, since high Precipitation comes from places of high altitudes and high summer temperatures we have significantly less
  observations of high Precipitation, so our model may have trouble predicting high amounts of Precipitation accurately.



