# Task 4 (Guided Exercise)

This notebook is a guided exercise for Milestone 3, Question 4. Guidelines and helpful links are included throughout the notebook (some as code comments) to guide you. Daniel's tutorial will also help, but even if you haven't completed it, you should be able to finish this exercise with the links and guidelines provided here.

For some of you, this may be the first time you explore a package on your own using its documentation, so it might take some time to get used to. With practice, you will get better at it. At work, you will come across many packages you have never used before and will have to learn them on your own, so this exercise is good practice.

In this exercise, you will use Spark's MLlib to tune some hyperparameters of a Random Forest and find the best model. Once you know the optimal settings, you will train a Random Forest in scikit-learn and save it with joblib (see `Milestone3-task3.ipynb`, part 2) so that you can deploy it next week.
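Once tuning has produced the best `numTrees`, `maxDepth`, and `bootstrap`, they map onto scikit-learn's `RandomForestRegressor` as `n_estimators`, `max_depth`, and `bootstrap`. The sketch below shows that mapping plus a joblib save/load round-trip. The parameter values and the random training data are placeholders (not tuning results), and the helper `spark_to_sklearn_params` and the file name `rf_model.joblib` are hypothetical.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.ensemble import RandomForestRegressor


def spark_to_sklearn_params(spark_best):
    """Translate Spark RandomForestRegressor param names to scikit-learn names."""
    return {
        "n_estimators": spark_best["numTrees"],
        "max_depth": spark_best["maxDepth"],
        "bootstrap": spark_best["bootstrap"],
    }


# Placeholder values: replace with the settings printed by the tuning cell.
spark_best = {"numTrees": 10, "maxDepth": 5, "bootstrap": True}
params = spark_to_sklearn_params(spark_best)

# Hypothetical stand-in data; the milestone uses the real feature columns and "Observed".
rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))
y = X @ np.array([1.0, -2.0, 0.5]) + rng.normal(scale=0.1, size=200)

model = RandomForestRegressor(random_state=0, **params).fit(X, y)

# Persist the fitted model and load it back, as you would before deployment.
path = os.path.join(tempfile.gettempdir(), "rf_model.joblib")
joblib.dump(model, path)
restored = joblib.load(path)
```

Other defaults differ between the libraries (for example, Spark's `featureSubsetStrategy="auto"` considers one third of the features per split for regression forests), so the two forests are not guaranteed to be equivalent even with the same three settings.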

Here, treat MLlib as just another Python package, like scikit-learn. You will find many classes and methods in MLlib that resemble scikit-learn's, although some scikit-learn functionality is not yet implemented in MLlib. Code you write with the PySpark package runs on the Spark engine and therefore gets all the benefits of distributed computing that we discussed in class.

Note: Whenever you use Spark, make sure you refer to the documentation for the version you are using. You can select the Spark version [here](https://spark.apache.org/docs/) to reach the matching documentation. We are using Spark 3.3.1, so refer to:

- [MLlib Documentation](https://spark.apache.org/docs/3.3.1/ml-guide.html)
- [MLlib API Reference](https://spark.apache.org/docs/3.3.1/api/python/reference/pyspark.ml.html)

The documentation covers two APIs: the RDD-based API and the DataFrame-based API (the "Main Guide"). Focus on the DataFrame-based API: as of Spark 2.0, the RDD-based `spark.mllib` API is in maintenance mode, and the DataFrame-based `spark.ml` API is the primary machine learning API. We will discuss the difference in class.

Before you start, make sure you have set up your EMR notebook, uploaded this notebook to it, and selected the PySpark kernel.

## Install packages

You only need to install the following packages for this exercise:

In [1]:
sc.install_pypi_package("pandas")
sc.install_pypi_package("s3fs")


Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 6 | application_1681773940944_0007 | pyspark | idle | Link | Link |  | ✔ |



SparkSession available as 'spark'.



Collecting pandas
  Downloading pandas-1.3.5-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.3 MB)
Collecting python-dateutil>=2.7.3
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.3.5 python-dateutil-2.8.2

Collecting s3fs
  Downloading s3fs-2023.1.0-py3-none-any.whl (27 kB)
Collecting aiohttp!=4.0.0a0,!=4.0.0a1
  Downloading aiohttp-3.8.4-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (948 kB)
Collecting aiobotocore~=2.4.2
  Downloading aiobotocore-2.4.2-py3-none-any.whl (66 kB)
Collecting fsspec==2023.1.0
  Downloading fsspec-2023.1.0-py3-none-any.whl (143 kB)
Collecting typing-extensions>=3.7.4; python_version < "3.8"
  Downloading typing_extensions-4.5.0-py3-none-any.whl (27 kB)
Collecting aiosignal>=1.1.2
  Downloading aiosignal-1.3.1-py3-none-any.whl (7.6 kB)
Collecting frozenlist>=1.1.1
  Downloading frozenlist-1.3.3-cp37-cp37m-manylinux_2_5_x86_64.m

## Import necessary libraries

In [2]:
from pyspark.ml import Pipeline
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import VectorAssembler, UnivariateFeatureSelector
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor as sparkRFR
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd


## Read the data

To start with, read only 100 data points for development. Once your code works, run it on the whole dataset.
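Slicing with `.iloc[:100]` still parses the whole file before keeping the first 100 rows. One option, sketched below, is the `nrows` argument of `pandas.read_csv`, which stops parsing after the first `n_rows` data rows. `load_ml_data` is a hypothetical helper; because `dropna()` runs after the read, you may get back fewer than `n_rows` rows.

```python
import io

import pandas as pd


def load_ml_data(source, n_rows=None):
    """Read the ML table (first column as a date index), optionally only the
    first n_rows data rows, then drop rows containing missing values."""
    df = pd.read_csv(source, index_col=0, parse_dates=True, nrows=n_rows)
    return df.dropna()


# Tiny in-memory CSV standing in for the S3 file (hypothetical data).
csv_text = (
    "time,feature_a,Observed\n"
    "2020-01-01,1.0,2.0\n"
    "2020-01-02,,3.0\n"
    "2020-01-03,2.0,4.0\n"
)
dev_df = load_ml_data(io.StringIO(csv_text), n_rows=2)  # development subset
full_df = load_ml_data(io.StringIO(csv_text))            # whole "dataset"
```

In the notebook, the call would look like `load_ml_data("s3://mds-s3-20-lauren/output/ml_data_SYD.csv", n_rows=100)` during development, and without `n_rows` for the full run.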

In [8]:
## By default, AWS credentials are looked up in the home directory (e.g. ~/.aws/credentials)
## of the machine running this code, so make sure your updated credentials are there.
## Where does this notebook run? Clue: on the cluster's master node, so SSH into the
## master node and update the credentials there.
## Alternatively, pass credentials explicitly via storage_options=aws_credentials (not a good idea):
# aws_credentials = {"key": "", "secret": "", "token": ""}
# Replace the path below with the S3 path to your data.
## For development, read only the first 100 data points to test the code:
# pandas_df = pd.read_csv("s3://mds-s3-20-lauren/output/ml_data_SYD.csv", index_col=0, parse_dates=True).iloc[:100].dropna()
pandas_df = pd.read_csv("s3://mds-s3-20-lauren/output/ml_data_SYD.csv", index_col=0, parse_dates=True).dropna()
# Every column except the label "Observed" is used as a feature
feature_cols = list(pandas_df.drop(columns="Observed").columns)


## Preparing dataset for ML

In [9]:
# Convert the pandas DataFrame to a Spark DataFrame.
# `spark` is the SparkSession (discussed in class); it is created automatically in this notebook.
# Read more here: https://blog.knoldus.com/spark-createdataframe-vs-todf/
training = spark.createDataFrame(pandas_df)
# MLlib estimators expect all features in a single vector column,
# so combine the feature columns into one column called "Features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training = assembler.transform(training).select("Features", "Observed")


In [10]:
training


DataFrame[Features: vector, Observed: double]
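`VectorAssembler` concatenates the listed columns, row by row, into one vector column, which is why the schema above has a single `Features: vector` column next to `Observed`. As a rough pandas analogue (an illustration only, not how Spark implements it), the hypothetical helper below builds one list per row. Because `VectorAssembler` defaults to `handleInvalid="error"`, null or NaN feature values raise an error, which is one reason the reading step calls `dropna()`.

```python
import pandas as pd


def assemble_features(df, feature_cols, label_col="Observed", output_col="Features"):
    """Pandas analogue of VectorAssembler followed by select(output_col, label_col):
    each row's feature values are packed into a single list-valued column."""
    out = pd.DataFrame({output_col: df[feature_cols].to_numpy().tolist()}, index=df.index)
    out[label_col] = df[label_col]
    return out


# Hypothetical two-row frame with two feature columns and the label.
example = pd.DataFrame({"m1": [1.0, 2.0], "m2": [3.0, 4.0], "Observed": [0.5, 0.7]})
assembled = assemble_features(example, ["m1", "m2"])
```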

## Find best hyperparameter settings

The official MLlib documentation on random forest regression is [here](https://spark.apache.org/docs/3.3.1/ml-classification-regression.html#random-forest-regression).

We will mainly use the following classes and methods:

- [RandomForestRegressor](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.RandomForestRegressor.html)
- [ParamGridBuilder](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html)
    - addGrid
    - build
- [CrossValidator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html)
    - fit
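Conceptually, `CrossValidator.fit` trains every param map from `ParamGridBuilder.build()` on each fold's training part, scores it on the held-out fold, averages the metric per param map (exposed as `avgMetrics`), and refits the best map on all the data. The plain-Python sketch below illustrates that loop with a pluggable `fit`/`score` pair and a toy estimator. It is a generic k-fold grid search, not Spark's implementation (Spark assigns rows to folds randomly and runs distributed), and `kfold_indices`, `grid_search_cv`, `fit_mean`, and `rmse` are hypothetical names.

```python
import math
import random
from statistics import mean


def kfold_indices(n_rows, num_folds, seed=0):
    """Shuffle row indices and deal them into num_folds roughly equal folds."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    return [idx[i::num_folds] for i in range(num_folds)]


def grid_search_cv(X, y, param_maps, fit, score, num_folds=5, larger_is_better=False, seed=0):
    """Average each param map's validation metric over the folds,
    pick the best map, and refit it on all of the data."""
    folds = kfold_indices(len(y), num_folds, seed)
    avg_metrics = []
    for params in param_maps:
        fold_metrics = []
        for k, val_idx in enumerate(folds):
            # Training rows are all rows outside the k-th fold.
            train_idx = [i for j, fold in enumerate(folds) if j != k for i in fold]
            model = fit([X[i] for i in train_idx], [y[i] for i in train_idx], **params)
            fold_metrics.append(score(model, [X[i] for i in val_idx], [y[i] for i in val_idx]))
        avg_metrics.append(mean(fold_metrics))
    pick = max if larger_is_better else min
    best = pick(range(len(param_maps)), key=avg_metrics.__getitem__)
    best_model = fit(X, y, **param_maps[best])
    return best_model, avg_metrics


# Toy estimator for illustration: "predicts" the training mean plus a fixed offset.
def fit_mean(X_train, y_train, offset):
    return mean(y_train) + offset


def rmse(model, X_val, y_val):
    return math.sqrt(mean((yv - model) ** 2 for yv in y_val))


y_toy = [float(v) for v in range(1, 11)]
best_model, avg_metrics = grid_search_cv(
    y_toy, y_toy, [{"offset": 0.0}, {"offset": 100.0}], fit_mean, rmse
)
```

With RMSE as the metric, lower is better, so the sketch uses `min`, which matches how `RegressionEvaluator`'s default metric is treated.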

Use the following hyperparameter values to find the best settings. You could try more values, but make sure your cluster has enough compute to do so; only the values below are required. Tuning on the entire dataset takes around 15 minutes.

- `numTrees`: [10, 50, 100]
- `maxDepth`: [5, 10]
- `bootstrap`: [False, True]
- In the `CrossValidator`, use `RegressionEvaluator(labelCol="Observed")` as the evaluator.

***Additional references:*** see [here](https://www.sparkitecture.io/machine-learning/regression/random-forest) and [here](https://www.silect.is/blog/random-forest-models-in-spark-ml/).

Some additional reading [here](https://projector-video-pdf-converter.datacamp.com/14989/chapter4.pdf)
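The required grid expands to every combination of the listed values. The plain-Python sketch below reproduces that expansion with `itertools.product` (assuming, as its documentation describes, that `ParamGridBuilder.build()` returns all combinations of the `addGrid` value lists) and counts the model fits that 5-fold cross-validation performs: each of the 12 param maps is trained on each of the 5 folds, and the best map is then refit once on all the data, for 61 fits in total. That count is why tuning on the full dataset takes a while. `expand_grid` and `count_fits` are hypothetical helpers.

```python
import itertools

# Values required by this task.
grid = {"numTrees": [10, 50, 100], "maxDepth": [5, 10], "bootstrap": [False, True]}


def expand_grid(grid):
    """Return one dict per combination of values (the Cartesian product)."""
    keys = list(grid)
    return [dict(zip(keys, values)) for values in itertools.product(*grid.values())]


def count_fits(n_param_maps, num_folds):
    """Every param map is fit on every fold, plus one final refit of the best map."""
    return n_param_maps * num_folds + 1


param_maps = expand_grid(grid)
total_fits = count_fits(len(param_maps), num_folds=5)
```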

In [None]:
## Once the code works on 100 data points, load the entire dataset and rerun; this can take ~15 min.
## Write your code below.
#
# Construct the random forest regressor
rf = sparkRFR(labelCol="Observed", featuresCol="Features")

# Set up the parameter grid (3 x 2 x 2 = 12 combinations)
rf_paramGrid = (ParamGridBuilder()
               .addGrid(rf.numTrees, [10, 50, 100])
               .addGrid(rf.maxDepth, [5, 10])
               .addGrid(rf.bootstrap, [False, True])
               .build())

# Set up the evaluator (RegressionEvaluator uses RMSE by default; lower is better)
rf_evaluator = RegressionEvaluator(labelCol="Observed")

# Set up 5-fold cross-validation over the parameter grid
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=rf_paramGrid,
                    evaluator=rf_evaluator,
                    numFolds=5)

# Run cross-validation; the best parameter combination is refit on the full training set
cvModel = cv.fit(training)


In [None]:
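# Print run info
# cvModel is the CrossValidatorModel returned by cv.fit(training);
# cvModel.bestModel is the random forest refit with the best parameters.
print("\nBest model")
print("==========")
# avgMetrics holds the mean RMSE for each parameter combination; lower is better, hence min()
print(f"\nCV RMSE: {min(cvModel.avgMetrics):.2f}")
# On the fitted PySpark model, getNumTrees is a property, so it is not called with ()
print(f"numTrees: {cvModel.bestModel.getNumTrees}")
print(f"maxDepth: {cvModel.bestModel.getMaxDepth()}")
print(f"bootstrap: {cvModel.bestModel.getBootstrap()}")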
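The cell above reports only the best model. To compare all 12 settings, one option is to pair `cvModel.avgMetrics` with `cvModel.getEstimatorParamMaps()` (both are in the same order) and sort them. The hypothetical helper below does the sorting in plain Python; the PySpark call that would feed it is shown as comments and is not executed here.

```python
def rank_param_maps(param_maps, avg_metrics, larger_is_better=False):
    """Pair each param map with its cross-validated metric and sort best-first.
    For RMSE (RegressionEvaluator's default), lower is better."""
    if len(param_maps) != len(avg_metrics):
        raise ValueError("param_maps and avg_metrics must have the same length")
    pairs = list(zip(avg_metrics, param_maps))
    # Sort on the metric only, so the dicts themselves are never compared.
    return sorted(pairs, key=lambda pair: pair[0], reverse=larger_is_better)


# Example with the notebook's objects (PySpark; not run here):
# ranked = rank_param_maps(
#     [{param.name: value for param, value in pm.items()} for pm in cvModel.getEstimatorParamMaps()],
#     cvModel.avgMetrics,
# )
# for metric, params in ranked:
#     print(f"{metric:.2f}  {params}")
```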