# Task 4 (Guided Exercise)

This notebook is part of Milestone 3, task 3 and is a guided exercise. I have put guidelines and helpful links (as comments) along with this notebook to take you through this.

In this exercise you will be using Spark's MLlib. The idea is to tune some hyperparameters of a Random Forest to find an optimum model. Once we know the optimum settings, we'll train a Random Forest in sklearn (task 4)and save it with joblib (task 5) (so that we can use it next week to deploy).

Here consider MLlib as another python package that you are using, like the scikit-learn. You will be seeing many scikit-learn similar classes and methods available in MLlib for various ML related tasks, you might also notice that some of them are not yet implimented in MLlib. What you write using pyspark package will be using the spark engine to run your code, and hence all the benefits of distributed computing what we discussed in class.

NOTE: Here whenever you use spark makes sure that you refer to the right documentation based on the version what you will be using. [Here](https://spark.apache.org/docs/) you can select the version of the spark and go to the correct documentation. In our case we are using spark 3.1.2, and here is the link to spark documetation that you can refer to,
- [MLlib Documentation](https://spark.apache.org/docs/3.1.2/ml-guide.html)
- [MLlib API Reference](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.ml.html)

You may notice that there are RDD-based API and DataFrame-based (Main Guide) API available in the documentation. You want to focus on DataFrame based API as no one these days use RDD based API. We will discuss the difference in class.

Before you start this notebook make sure that you are using EMR jupyterHub and the kernal that you selected is PySpark.

## Import necessary libraries

In [1]:
from pyspark.ml import Pipeline
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import VectorAssembler, UnivariateFeatureSelector
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor as sparkRFR
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1649901246105_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read the data

To start with; read 100 data points for development purpose. Once your code is ready then try on the whole dataset.

In [3]:
aws_credentials = {
    "key": "ASIASXY3VX7TWKMKMD7Y",
    "secret": "A6LSnU1uZ3dG7gAStdrvkPUwWaaEYsc6JMgeTQ2p",
    "token": "FwoGZXIvYXdzENP//////////wEaDEtXmZftXKPrHBqcXiLGAbQzq0tBkHboGfbBipB4PlGoxVXOzf0/Y3wpApJ45/yfM+nhsrEjI04JOPH6ZGgZSGKn0HFoqNdDQnl0NBg1XbVO/FKJZEa+Gq1NN49fU2hWsX5fgGZ8XjliQJIF5lOzXPWlN0fROTQ2Ek038MRThU6Fsgaxgn74NVKq4mUjviz8HmksYYL19MTkqqugzi/fluzpABhRKPKfLpf5DDuv1z0cS/Qo4FBkPl4IisILC+DuY0U1st349SJZAVIyWOnY7JhpkD4NeSi0+N2SBjIt5w2/vAmBy3Ub/QIhWcQwPmlq+M9lsk4rcximK//DRTJDf8sKD5DbKeRIkKkL",
}

pandas_df = pd.read_csv("s3://mds-s3-group11/output/ml_data_SYD.csv", index_col=0, storage_options=aws_credentials)
feature_cols = list(pandas_df.drop(columns="observed").columns)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
for col in pandas_df:
    if pandas_df[col].isna().any:
        print(f"""
                Fillna column: {col}, Na num:{pandas_df[col].isna().sum()}, Mean num:{pandas_df[col].mean()}.
              """)
        pandas_df[col] = pandas_df[col].fillna(pandas_df[col].mean())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


                Fillna column: ACCESS-CM2, Na num:0, Mean num:2.430848328289297.
              

                Fillna column: ACCESS-ESM1-5, Na num:0, Mean num:2.9125832605921573.
              

                Fillna column: AWI-ESM-1-1-LR, Na num:0, Mean num:3.6834341542209.
              

                Fillna column: BCC-CSM2-MR, Na num:0, Mean num:2.214538016348578.
              

                Fillna column: BCC-ESM1, Na num:0, Mean num:2.76620100414919.
              

                Fillna column: CMCC-CM2-HR4, Na num:0, Mean num:3.0940779373859755.
              

                Fillna column: CMCC-CM2-SR5, Na num:0, Mean num:3.592108612623952.
              

                Fillna column: CMCC-ESM2, Na num:0, Mean num:3.4983364677602813.
              

                Fillna column: CanESM5, Na num:0, Mean num:2.906266970531277.
              

                Fillna column: EC-Earth3-Veg-LR, Na num:0, Mean num:2.561699759195896.
              

                F

## Preparing dataset for ML

In [21]:
# Load dataframe and coerce features into a single column called "Features"
# This is a requirement of MLlib
# Here we are converting your pandas dataframe to a spark dataframe, 
# Here "spark" is a spark session I will discuss this in our Wed class. 
# It is automatically created for you in this notebook.
# read more  here https://blog.knoldus.com/spark-createdataframe-vs-todf/

training = spark.createDataFrame(pandas_df)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training = assembler.transform(training).select("Features", "observed")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Find best hyperparameter settings

Official Documentation of MLlib, Random forest regression [here](http://spark.apache.org/docs/3.0.1/ml-classification-regression.html#random-forest-regression).

Here we will be mainly using following classes and methods;

- [RandomForestRegressor](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.RandomForestRegressor.html)
- [ParamGridBuilder](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html)
    - addGrid
    - build
- [CrossValidator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html)
    - fit

Use these parameters for coming up with ideal parameters, you could try more parameters, but make sure you have enough power to do it. But you are required to try only following parameters. This will take around 15 min on entire dataset....

    - Use numTrees as [10, 50,100]
    - maxDepth as [5, 10]
    - bootstrap as [False, True]
    - In the CrossValidator use evaluator to be RegressionEvaluator(labelCol="Observed")
    
***Additional reference:*** You can refer to [here](https://www.sparkitecture.io/machine-learning/regression/random-forest) and [here](https://www.silect.is/blog/random-forest-models-in-spark-ml/).
Some additional reading [here](https://projector-video-pdf-converter.datacamp.com/14989/chapter4.pdf)

In [22]:
##Once you finish testing the model on 100 data points, then load entire dataset and run , this could take ~15 min.
## write code here.

from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol = "Features", labelCol="observed")

rf_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 50, 100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .addGrid(rf.bootstrap, [False, True]) \
    .build()

evaluator = RegressionEvaluator(labelCol = "observed")

cv = CrossValidator(estimator=rf, 
                    estimatorParamMaps=rf_grid, 
                    evaluator=evaluator,
                    numFolds = 5)

cvModel = cv.fit(training)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Print run info
print("\nBest model")
print("==========")
print(f"\nCV Score: {min(cvModel.avgMetrics):.2f}")
print(f"numTrees: {cvModel.bestModel.getNumTrees}")
print(f"MaxDepth: {cvModel.bestModel.getMaxDepth()}")
print(f"BootStrap: {cvModel.bestModel.getBootstrap()}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Best model

CV Score: 8.18
numTrees: 50
numTrees: 5