# DSCI 525 - Web and Cloud Computing
### Group 04: Heidi Ye, Junting He, Kamal Moravej, Tanmay Sharma
#### Date: 23-04-2021
### Repo Link: https://github.com/UBC-MDS/group4-525

# Milestone 3: Task 4

We haven't discussed MLlib in detail in class, so treat MLlib as just another Python package that you are using, like scikit-learn. Whatever you write with this package, PySpark runs on the Spark engine. I have put guidelines and helpful links (as comments) throughout this notebook to take you through it.

## Imports

In [1]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 1 | application_1618868345593_0002 | pyspark | idle | Link | Link | ✔ |


SparkSession available as 'spark'.


## Read

#### Read 100 data points to test the code; once you reach the bottom, read the entire dataset

In [3]:
aws_credentials = {"key": "", "secret": ""}
# Read only the first 100 rows to test the code
pandas_df = pd.read_csv("s3://mds-s3-student47/output/ml_data_SYD.csv", storage_options=aws_credentials, index_col=0, parse_dates=True).iloc[:100].dropna()
feature_cols = list(pandas_df.drop(columns="Observed").columns)

## Preparing dataset for ML

In [4]:
# Convert the pandas dataframe to a Spark dataframe, then combine the feature
# columns into a single vector column called "Features".
# This is a requirement of MLlib.
# "spark" is the Spark session; I will discuss this in our Wed class.
# Read more here: https://blog.knoldus.com/spark-createdataframe-vs-todf/
training = spark.createDataFrame(pandas_df)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training = assembler.transform(training).select("Features", "Observed")
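
As a rough plain-Python analogy (not the Spark API), the assembler step above turns each row's feature columns into one vector and keeps the label alongside it. The helper `assemble_row` and the column names `model_a` and `model_b` are hypothetical and exist only for this illustration; the real feature names come from `feature_cols`.

```python
def assemble_row(row, feature_cols, label_col="Observed"):
    """Collect the feature columns of one row into a single list (the 'vector')."""
    features = [float(row[col]) for col in feature_cols]
    return {"Features": features, label_col: row[label_col]}


# Hypothetical row; column names and values are made up for illustration.
row = {"model_a": 1.5, "model_b": 0.0, "Observed": 2.0}
feature_cols = [col for col in row if col != "Observed"]

assembled = assemble_row(row, feature_cols)
print(assembled)  # {'Features': [1.5, 0.0], 'Observed': 2.0}
```

In Spark the same idea is applied to every row of the dataframe, and the vector is stored in a dedicated vector column type rather than a Python list.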

## Find best hyperparameter settings

You can refer to [here](https://www.sparkitecture.io/machine-learning/regression/random-forest) and [here](https://www.silect.is/blog/random-forest-models-in-spark-ml/) as references. Everything you need to complete this task is in there.

Some additional info [here](https://projector-video-pdf-converter.datacamp.com/14989/chapter4.pdf)

Official MLlib documentation for random forest regression is [here](http://spark.apache.org/docs/3.0.1/ml-classification-regression.html#random-forest-regression). When using the Spark documentation, always keep in mind that the API sometimes changes between versions and that new updates and features come with every release, so make sure you choose the documentation for the correct Spark version. You can find the documentation for the version you use [here](http://spark.apache.org/docs/).

Use these parameters to find the ideal settings. You could try more parameters, but unfortunately this single-node cluster doesn't have enough power to do it.

    - Use numTrees as [10, 50, 100]
    - maxDepth as [5, 10]
    - bootstrap as [False, True]
    - In the CrossValidator, set evaluator to RegressionEvaluator(labelCol="Observed")
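
To see why the grid is kept small on a single-node cluster, it may help to count the model fits a cross-validated grid search needs. The sketch below uses plain Python (no Spark) and the grid listed above; `num_folds = 5` is an assumption taken from the `numFolds` value used in the CrossValidator cell of this notebook, and `expand_grid` is a hypothetical helper.

```python
from itertools import product

# Grid from the list above.
param_grid = {
    "numTrees": [10, 50, 100],
    "maxDepth": [5, 10],
    "bootstrap": [False, True],
}
num_folds = 5  # assumption: same value as numFolds in the CrossValidator cell


def expand_grid(grid):
    """Return every combination of the grid values as a list of dicts."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]


combinations = expand_grid(param_grid)
# One fit per combination per fold, plus one final refit of the best
# combination on the full training data.
total_fits = len(combinations) * num_folds + 1

print(len(combinations), total_fits)  # 12 61
```

Each extra value added to any one parameter multiplies the number of combinations, so the cost grows quickly as the grid widens.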

In [5]:
# Once you finish testing the model on 100 data points, load the entire dataset and rerun; this could take ~15 min.

#Initialize Random Forest object
rf = RandomForestRegressor(labelCol="Observed", featuresCol="Features")

#Create a parameter grid for tuning the model
rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.numTrees, [5, 20, 100])
               .addGrid(rf.maxDepth, [5, 10])         
               .addGrid(rf.bootstrap, [True,False])           
               .build())

#Define how you want the model to be evaluated
rfevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Observed", metricName="rmse")

#Define the type of cross-validation you want to perform
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

#Fit the model to the data
rfcvModel = rfcv.fit(training)
print(rfcvModel)

CrossValidatorModel_86faacdbeb41
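
The evaluator in the cell above is configured with `metricName="rmse"`, so every cross-validation score it reports is a root-mean-square error in the units of `Observed`. As a minimal plain-Python sketch of that metric (the helper `rmse` and the toy numbers are illustrative, not part of MLlib or of this dataset):

```python
import math


def rmse(observed, predicted):
    """Root-mean-square error between two equal-length sequences of numbers."""
    if len(observed) != len(predicted) or len(observed) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(o - p) ** 2 for o, p in zip(observed, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Toy values for illustration only.
observed = [1.0, 2.0, 3.0]
predicted = [1.0, 2.0, 5.0]
print(round(rmse(observed, predicted), 3))  # 1.155
```

Because large errors are squared before averaging, a few badly predicted rows can dominate the score, and lower values are better.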

In [7]:
# Print run info
print("\nBest model")
print("==========")
# avgMetrics holds the mean RMSE for each grid point, so the minimum is the best score
print(f"\nCV Score: {min(rfcvModel.avgMetrics):.2f}")
print(f"numTrees: {rfcvModel.bestModel.getNumTrees}")
print(f"maxDepth: {rfcvModel.bestModel.getMaxDepth()}")

Best model

CV Score: 4.08
numTrees: 20
maxDepth: 5
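
The cell above reports only the best score. To see how every grid point performed, the scores can be paired with their parameter settings and sorted. The sketch below is plain Python; `rank_param_maps` is a hypothetical helper, and the parameter maps and scores are made-up placeholders, not results from this notebook.

```python
def rank_param_maps(param_maps, avg_metrics, larger_is_better=False):
    """Pair each parameter map with its score and sort from best to worst."""
    if len(param_maps) != len(avg_metrics):
        raise ValueError("param_maps and avg_metrics must have the same length")
    pairs = list(zip(avg_metrics, param_maps))
    # Sort on the score only, so ties never try to compare two dicts.
    return sorted(pairs, key=lambda pair: pair[0], reverse=larger_is_better)


# Placeholder values for illustration only; not results from this notebook.
param_maps = [
    {"numTrees": 5, "maxDepth": 5},
    {"numTrees": 20, "maxDepth": 5},
    {"numTrees": 100, "maxDepth": 10},
]
avg_metrics = [3.0, 1.0, 2.0]

best_score, best_params = rank_param_maps(param_maps, avg_metrics)[0]
print(best_score, best_params)  # 1.0 {'numTrees': 20, 'maxDepth': 5}
```

With the fitted model, the two inputs would be `rfcvModel.getEstimatorParamMaps()` and `rfcvModel.avgMetrics`, which are listed in the same order. For RMSE smaller is better, which is why `larger_is_better` defaults to `False` here.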

## Tuning hyperparameters of the model for the entire dataset 

In [8]:
# read the whole dataset
pandas_df = pd.read_csv("s3://mds-s3-student40/output/ml_data_SYD.csv", 
                        storage_options=aws_credentials, index_col=0, 
                        parse_dates=True).dropna()
feature_cols = list(pandas_df.drop(columns="Observed").columns)

In [9]:
# Convert the pandas dataframe to a Spark dataframe, then combine the feature
# columns into a single vector column called "Features".
# This is a requirement of MLlib.
# "spark" is the Spark session; I will discuss this in our Wed class.
# Read more here: https://blog.knoldus.com/spark-createdataframe-vs-todf/
training = spark.createDataFrame(pandas_df)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training = assembler.transform(training).select("Features", "Observed")

In [10]:
#Initialize Random Forest object
rf = RandomForestRegressor(labelCol="Observed", featuresCol="Features")

#Create a parameter grid for tuning the model
rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [5, 10])         
               .addGrid(rf.bootstrap, [True,False])
               .addGrid(rf.numTrees, [5, 20, 100])
               .build())

#Define how you want the model to be evaluated
rfevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Observed", metricName="rmse")

#Define the type of cross-validation you want to perform
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

#Fit the model to the data
rfcvModel = rfcv.fit(training)

In [11]:
# Print best model info
print("\nBest model")
print("==========")
# avgMetrics holds the mean RMSE for each grid point, so the minimum is the best score
print(f"\nCV Score: {min(rfcvModel.avgMetrics):.2f}")
print(f"numTrees: {rfcvModel.bestModel.getNumTrees}")
print(f"maxDepth: {rfcvModel.bestModel.getMaxDepth()}")

Best model

CV Score: 8.18
numTrees: 20
maxDepth: 5