# Task 4

We haven't discussed MLlib in detail in class, so think of MLlib as just another Python package, like scikit-learn. The difference is that code you write with this package runs on the Spark engine through PySpark. I have added guidelines and helpful links (as comments) throughout this notebook to walk you through it.

## Imports

In [1]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd

import json

SparkSession available as 'spark'.

## Read

#### Read 100 data points to test the code; once everything runs to the bottom of the notebook, read the entire dataset

In [2]:
# The AWS credentials are stored as JSON on local disk
with open("/tmp/keys.txt", "r") as f:
    aws_credentials = json.load(f)

## Use only the first 100 data points to test the code
pandas_df = pd.read_csv("s3://mds-s3-student34/output/ml_data_SYD.csv", storage_options=aws_credentials, index_col=0, parse_dates=True).iloc[:100].dropna()
feature_cols = list(pandas_df.drop(columns="Observed").columns)


## Preparing dataset for ML

In [3]:
# Load the dataframe and combine the features into a single column called "Features".
# MLlib requires all features to be in one vector column.
# Here we convert your pandas dataframe to a Spark dataframe.
# "spark" is the SparkSession; I will discuss it in our Wednesday class.
# Read more here: https://blog.knoldus.com/spark-createdataframe-vs-todf/
training_100 = spark.createDataFrame(pandas_df)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training_100 = assembler.transform(training_100).select("Features", "Observed")
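Conceptually, `VectorAssembler` takes the listed input columns of each row and packs their values, in `inputCols` order, into a single vector stored in the output column. The plain-Python sketch below only mimics that row-by-row behaviour on a list of dicts so the shape of `training_100` is easier to picture. `assemble_features` and the toy column names are hypothetical; the real transformer stores Spark `DenseVector` or `SparseVector` values rather than lists, and by default (`handleInvalid="error"`) it fails on missing values, which is likely one reason the notebook calls `.dropna()` first.

```python
def assemble_features(rows, input_cols, label_col, output_col="Features"):
    """Mimic VectorAssembler + select(output_col, label_col) on plain dicts (illustration only)."""
    assembled = []
    for row in rows:
        # Pack the input columns into one list, preserving the order of input_cols
        features = [float(row[c]) for c in input_cols]
        # Keep only the assembled vector and the label, like .select("Features", "Observed")
        assembled.append({output_col: features, label_col: row[label_col]})
    return assembled


# Toy rows with hypothetical column names; the real feature_cols come from the CSV header
rows = [
    {"model_a": 0.04, "model_b": 1.2, "Observed": 0.0066},
    {"model_a": 0.07, "model_b": 0.0, "Observed": 0.0904},
]
print(assemble_features(rows, ["model_a", "model_b"], "Observed"))
```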


## Find best hyperparameter settings

You can use [here](https://www.sparkitecture.io/machine-learning/regression/random-forest) and [here](https://www.silect.is/blog/random-forest-models-in-spark-ml/) as references; everything you need to complete this task is there.

Some additional info [here](https://projector-video-pdf-converter.datacamp.com/14989/chapter4.pdf)

The official MLlib documentation for random forest regression is [here](http://spark.apache.org/docs/3.0.1/ml-classification-regression.html#random-forest-regression). When using the Spark documentation, always keep in mind that APIs sometimes change between versions and new features arrive with every release, so make sure you choose the documentation for the Spark version you are using. You can find the documentation for each version [here](http://spark.apache.org/docs/).

Use the following values to find the best hyperparameters. You could try more values, but unfortunately this single-node cluster does not have enough compute to do so.

- `numTrees`: [10, 50, 100]
- `maxDepth`: [5, 10]
- `bootstrap`: [False, True]
- In the `CrossValidator`, use `RegressionEvaluator(labelCol="Observed")` as the evaluator.
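The grid above has 3 × 2 × 2 = 12 combinations, and with `numFolds=5` cross-validation trains every combination once per fold, then refits the winning combination on the whole training set. The sketch below enumerates the grid with `itertools.product` (a cartesian product, which is what `ParamGridBuilder.build()` produces) and counts the model fits; it is only meant to show why the full-data run is slow, not to reproduce MLlib.

```python
from itertools import product

# Hyperparameter values from the task description
grid = {
    "numTrees": [10, 50, 100],
    "maxDepth": [5, 10],
    "bootstrap": [False, True],
}
num_folds = 5  # matches numFolds=5 in the CrossValidator below

# Cartesian product: one dict per candidate setting
param_maps = [dict(zip(grid.keys(), values)) for values in product(*grid.values())]

# Each candidate is trained once per fold, then the best one is refit on all data
fits = len(param_maps) * num_folds + 1

print(len(param_maps))  # 12
print(fits)             # 61
print(param_maps[0])    # {'numTrees': 10, 'maxDepth': 5, 'bootstrap': False}
```

`CrossValidator` also accepts a `parallelism` argument to evaluate several candidates at once, though on a single-node cluster the speed-up may be limited.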

In [4]:
## Once you finish testing the model on 100 data points, load the entire dataset and run it again; this could take ~15 min.
## Write your code here.

# Looking at the data
training_100.show()


+--------------------+------------+
|            Features|    Observed|
+--------------------+------------+
|[0.04042703467348...|0.0066115437|
|[0.07377680940408...|  0.09042179|
|[0.23265613926923...|   1.4014516|
|[0.91131909575779...|   14.869798|
|[0.69801307108718...|  0.46762803|
|[0.17684631748124...|         0.0|
|[0.34710155159700...|  0.40364525|
|[0.04275485698599...|  0.11174101|
|[0.02940914982900...|         0.0|
|[0.02364127412874...|         0.0|
|[0.00594996538438...|         0.0|
|[0.00470868758384...|         0.0|
|[1.25690648274030...|         0.0|
|[0.26763034402392...| 0.047772575|
|[0.01480196910961...|   0.5970082|
|[0.19029487157240...|  0.54629517|
|[4.04952006647363...|   1.5323128|
|[5.93606340698897...|   1.6153923|
|[0.66639061260502...|  0.49775785|
|[5.21085580112412...|  0.34187767|
+--------------------+------------+
only showing top 20 rows

In [5]:
def get_cv_score(training, title):
    """Create a RandomForestRegressor model and run grid search
    to find the best hyperparameters
    """
    # Building a model using RandomForestRegressor()
    rf = RandomForestRegressor(featuresCol="Features", labelCol="Observed")

    # Constructing a grid of parameters to search over
    paramGrid = ParamGridBuilder()\
        .addGrid(rf.numTrees, [10, 50, 100]) \
        .addGrid(rf.maxDepth, [5, 10])\
        .addGrid(rf.bootstrap, [False, True])\
        .build()

    # Running cross-validation, and choosing the best hyperparameters
    crossval = CrossValidator(estimator=rf,
                              estimatorParamMaps=paramGrid,
                              evaluator=RegressionEvaluator(labelCol="Observed"),
                              numFolds=5)

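    cvModel = crossval.fit(training)

    print("\nBest model for", title)
    print("==========")
    # RegressionEvaluator defaults to RMSE (lower is better), so the best score is the minimum
    print(f"\nCV Score: {min(cvModel.avgMetrics):.2f}")
    # On the fitted model, getNumTrees is a property (no parentheses); getMaxDepth() is a method
    print(f"numTrees: {cvModel.bestModel.getNumTrees}")
    print(f"maxDepth: {cvModel.bestModel.getMaxDepth()}")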
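`RegressionEvaluator` defaults to `metricName="rmse"`, so each entry of `cvModel.avgMetrics` is an RMSE averaged over the 5 folds, listed in the same order as the parameter grid, and lower is better, which is why `get_cv_score` reports `min(...)`. The plain-Python sketch below mirrors that bookkeeping: `rmse` implements the metric and `best_setting` picks the candidate with the smallest average. Both helpers and the toy numbers are hypothetical illustrations, not MLlib code or results from this notebook.

```python
import math
from statistics import mean


def rmse(y_true, y_pred):
    """Root-mean-squared error, the default metric of RegressionEvaluator."""
    return math.sqrt(mean((t - p) ** 2 for t, p in zip(y_true, y_pred)))


def best_setting(param_maps, fold_scores):
    """Average per-fold RMSE for each candidate and return (best map, its average).

    fold_scores[i] holds the RMSE of param_maps[i] on each validation fold,
    so the averages line up with param_maps the way avgMetrics lines up with the grid.
    """
    avg_metrics = [mean(scores) for scores in fold_scores]
    best_idx = min(range(len(avg_metrics)), key=avg_metrics.__getitem__)
    return param_maps[best_idx], avg_metrics[best_idx]


# Toy example: two candidate settings scored on three folds (illustrative numbers)
print(rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0]))
candidates = [{"numTrees": 10, "maxDepth": 5}, {"numTrees": 50, "maxDepth": 5}]
scores = [[4.0, 3.0, 5.0], [3.0, 3.5, 4.0]]
print(best_setting(candidates, scores))
```

On the fitted object, `cvModel.getEstimatorParamMaps()` returns the grid in the same order as `cvModel.avgMetrics`, so the winning settings (including `bootstrap`, which `get_cv_score` does not print) can be looked up by the index of the smallest average.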


In [6]:
# Running for 100 data points
get_cv_score(training_100, "100 data points")



Best model for 100 data points

CV Score: 3.54
numTrees: 50
maxDepth: 5

In [7]:
# Loading the entire dataset
pandas_df_full = pd.read_csv("s3://mds-s3-student34/output/ml_data_SYD.csv", storage_options=aws_credentials, index_col=0, parse_dates=True).dropna()
feature_cols = list(pandas_df_full.drop(columns="Observed").columns)

# Converting the pandas dataframe to a spark dataframe
training_full = spark.createDataFrame(pandas_df_full)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
training_full = assembler.transform(training_full).select("Features", "Observed")


In [8]:
# Running for all data points
get_cv_score(training_full, "all data points")



Best model for all data points

CV Score: 8.17
numTrees: 100
maxDepth: 5