# Task 4

We haven't discussed MLlib in detail in class, so treat MLlib as just another Python package that you are using, like scikit-learn. Code you write with this package is run by PySpark on the Spark engine. I have put guidelines and helpful links (as comments) in this notebook to take you through the task.

## Imports

In [1]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 9 | application_1618954864898_0010 | pyspark | idle | Link | Link | ✔ |



SparkSession available as 'spark'.



## Read

#### Read 100 data points to test the code; once you get to the bottom, read the entire dataset

In [2]:
aws_credentials = {"key": "","secret": ""}
# Read only the first 100 data points for testing the code (rows with missing values are dropped)
pandas_df = pd.read_csv("s3://mds-s3-student29/output/ml_data_SYD.csv", index_col=0, parse_dates=True).iloc[:100].dropna()
feature_cols = list(pandas_df.drop(columns="observed").columns)


In [13]:
# Create the pandas DataFrame for the entire dataset
pandas_df_en = pd.read_csv("s3://mds-s3-student29/output/ml_data_SYD.csv", index_col=0, parse_dates=True).dropna()
feature_cols_en = list(pandas_df_en.drop(columns="observed").columns)


## Preparing dataset for ML

In [3]:
# Convert the pandas DataFrame to a Spark DataFrame, then combine the feature
# columns into a single vector column called "Features".
# This is a requirement of MLlib.
# Here "spark" is the Spark session; I will discuss this in our Wed class.
# Read more here: https://blog.knoldus.com/spark-createdataframe-vs-todf/
training = spark.createDataFrame(pandas_df)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training = assembler.transform(training).select("Features", "observed")


In [14]:
training_en = spark.createDataFrame(pandas_df_en)
assembler_en = VectorAssembler(inputCols=feature_cols_en, outputCol="Features")
training_en = assembler_en.transform(training_en).select("Features", "observed")


## Find best hyperparameter settings

You can use [this page](https://www.sparkitecture.io/machine-learning/regression/random-forest) and [this page](https://www.silect.is/blog/random-forest-models-in-spark-ml/) as references. Everything you need to complete this task is in there.

Some additional info [here](https://projector-video-pdf-converter.datacamp.com/14989/chapter4.pdf)

The official MLlib documentation for random forest regression is [here](http://spark.apache.org/docs/3.0.1/ml-classification-regression.html#random-forest-regression). When using the Spark documentation, always keep in mind that the API sometimes changes between versions and that every release brings new updates and features, so make sure you choose the documentation for the correct Spark version. You can find the documentation for the version you use [here](http://spark.apache.org/docs/).

Use the following parameter values to find the ideal settings. You could try more, but unfortunately this single-node cluster doesn't have enough power for it.

    - Use numTrees as [10, 50, 100]
    - maxDepth as [5, 10]
    - bootstrap as [False, True]

    - In the CrossValidator, set the evaluator to RegressionEvaluator(labelCol="observed")
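
To get a feel for how much work this grid implies, the sketch below enumerates the combinations in plain Python. It assumes 5 folds (the `numFolds` value used later in this notebook) and relies on the fact that `CrossValidator` fits every combination once per fold and then refits the best one on the full data. The variable names are illustrative only.

```python
from itertools import product

# Candidate values from the list above
num_trees = [10, 50, 100]
max_depth = [5, 10]
bootstrap = [False, True]

# Every combination that a full grid search evaluates
grid = list(product(num_trees, max_depth, bootstrap))

num_folds = 5  # assumption: matches numFolds in the CrossValidator
# One fit per combination per fold, plus one final refit of the best setting
total_fits = len(grid) * num_folds + 1

print(f"{len(grid)} combinations, {total_fits} model fits in total")
```

Adding one more value to any of the three lists multiplies the number of fits, which is why the grid is kept small on a single node.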

In [4]:
## Once you finish testing the model on 100 data points, load the entire dataset and run again; this could take ~15 min.
## Write code here.

rf = RandomForestRegressor(labelCol="observed", featuresCol="Features")

rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [5, 10])
               .addGrid(rf.bootstrap, [False, True])
               .addGrid(rf.numTrees, [10, 50, 100])
             .build())


In [6]:
rfevaluator = RegressionEvaluator(labelCol="observed")

# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

cvModel = rfcv.fit(training)
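
For intuition about what a 5-fold `CrossValidator` does with the data, here is a minimal pure-Python sketch of k-fold splitting. It is not Spark's implementation: Spark assigns rows to folds randomly, so its fold sizes are only approximately equal, whereas the hypothetical `k_fold_indices` helper below deals rows out round-robin.

```python
def k_fold_indices(n_rows, k):
    """Yield (train, validation) index lists for each of k folds.

    Hypothetical helper for illustration; rows are dealt out round-robin.
    """
    folds = [list(range(i, n_rows, k)) for i in range(k)]
    for i, validation in enumerate(folds):
        # Training rows are all rows that are not in the current validation fold
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validation


# With 100 rows and 5 folds, each model trains on 80 rows and is scored on 20
splits = list(k_fold_indices(100, 5))
for train, validation in splits:
    print(len(train), len(validation))
```

Every row ends up in exactly one validation fold, so each parameter setting is scored on all of the data without ever being scored on rows it was trained on.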


In [12]:
# Print run info
print("\nBest model")
print("==========")
print(f"\nCV Score: {min(cvModel.avgMetrics):.2f}")
print(f"numTrees: {cvModel.bestModel.getNumTrees}")
print(f"Max Depth : {cvModel.bestModel.getMaxDepth()}")
print(f"Bootstrap : {cvModel.bestModel.getBootstrap()}")



Best model

CV Score: 4.08
numTrees: 50
Max Depth : 5
Bootstrap : True
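
The summary above reports only the winning setting. `avgMetrics` holds one cross-validated score per entry of the parameter grid, in the same order, so the two can be paired to compare all settings. The sketch below does this in plain Python with made-up scores and dictionary keys (not results from this notebook); with the real objects one would pass `rfparamGrid` and `cvModel.avgMetrics` instead. The `rank_settings` helper is hypothetical.

```python
def rank_settings(param_maps, avg_metrics, larger_is_better=False):
    """Pair each parameter setting with its score and sort best-first."""
    paired = zip(avg_metrics, param_maps)
    # Sort on the score only; the parameter maps themselves are not orderable
    return sorted(paired, key=lambda pair: pair[0], reverse=larger_is_better)


# Hypothetical settings and scores, for illustration only
example_maps = [
    {"numTrees": 10, "maxDepth": 5},
    {"numTrees": 50, "maxDepth": 5},
    {"numTrees": 100, "maxDepth": 10},
]
example_scores = [4.9, 4.1, 4.5]

ranked = rank_settings(example_maps, example_scores)
for score, params in ranked:
    print(f"{score:.2f}  {params}")
```

Lower is better for the evaluator's default metric, which is why the notebook reports `min(cvModel.avgMetrics)` as the CV score.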

In [15]:
# Fit the cross-validator on the entire dataset
cvModel_en = rfcv.fit(training_en)



In [16]:
print("\nBest model")
print("==========")
print(f"\nCV Score: {min(cvModel_en.avgMetrics):.2f}")
print(f"numTrees: {cvModel_en.bestModel.getNumTrees}")
print(f"Max Depth : {cvModel_en.bestModel.getMaxDepth()}")
print(f"Bootstrap : {cvModel_en.bestModel.getBootstrap()}")



Best model

CV Score: 8.18
numTrees: 100
Max Depth : 5
Bootstrap : True
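
`RegressionEvaluator` uses RMSE when no `metricName` is given, so the CV scores above are in the same units as `observed`. A minimal sketch of that metric in plain Python, using made-up values rather than data from this notebook:

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up numbers: the errors are 0, 0 and 3, so RMSE = sqrt(9 / 3)
score = rmse([1.0, 2.0, 6.0], [1.0, 2.0, 3.0])
print(f"RMSE: {score:.3f}")
```

Because the errors are squared before averaging, a single large miss raises the score more than several small ones.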