# Random forest implementation

#### Discussion of decision tree algorithms (as it precursor to random forest)

Decision tree algorithm for classification is in the classification tree family of algorithms.

A decision tree is a machine learning model based upon binary trees (trees with at most a left and right child). A decision tree learns the relationship between observations in a training set, represented as feature vectors $x$ and target values $y$, by examining and condensing training data into a binary tree of interior nodes and leaf nodes.

A decision tree carves up the feature space into groups of observations that share similar target values and each leaf represents one of these groups. For classification, it means that most or all targets are of a single class.

We predict that each feature instance belongs to the most commonly occurring class of feature observations in the region to which is belongs. Following the ISL section 8.1 reading, we are interested in the class proportions among the training observations falling into that feature region. Binary splits in growing the classification tree are made by using the classification error rate. The classification error rate is the fraction of feature value instances in the feature region that do not belong to the most common feature value:

$\begin{equation}
E = 1-max_k(\hat{p}_{mk})\end{equation}$ where $\hat{p}_{mk}$ represents the proportion of feature value observations in the $m$th feature region that are from the $k$th feature value class.

However, since the classification error is not sensitive to the distribution of data points across binary splits (for tree-growing), and hence, the purity of a split, the entropy measure is preferred to measure the quality of a particular split. The entropy measure is given by:

$D = - \sum_{k=1}^{K}\hat{p}_{mk}log\hat{p}_{mk}$

where 0 $\le$ $\hat{p}_{mk}$ $\le$ 1 and 0$\le$ $-\hat{p}_{mk}log\hat{p}_{mk}$   



#### Random forest explanation

Random forests are an ensemble algorithm in which a number of decision trees are trained and then used for prediction in classification or regression. The decision trees are created on samples of the training data using a process called bootstrap aggregation or "bagging". Bagging works by sampling with replacement $B$ samples of size $n$ from a dataset $X$ giving datasets $X_i$ for $i \in \{1, ..., |B|\}$. Each dataset $X_i$ is used to train its own decision tree, resulting in a total of $|B|$ decision trees. During inference, each tree in the ensemble receives a copy of the input and produces its own prediction. In the case of classification, the class with the most results is the final prediction. For regression, the predictions are averaged to produce a single prediction.

Random forests improve over bagging by taking random samples of $m$ predictor features chosen as a subset of candidates for splitting from the full set of $p$ predictors. The split is allowed to use only one of the predictor features and a new sample of $m$ predictors is taken at each split. Typically, $m$ $\approx$ $\sqrt{p}$ such that the number of predictors considered at each split is equal to the square root of the total number of predictors (nominally, not even a majority of the entire set of predictors). This random sampling approach effectively reduces the variance attributed to a tree resulting from single strong predictor by considering only a subset of predictors each sample. The average prediction of the resulting trees is less variable and more reliable. That is, random forests effectively de-correlates the trees. The difference between bagging and random forests is the choice predictor subset size $m$, where $m$ $=$ $p$ implies bagging and $m$ $\approx$ $\sqrt{p}$ implies random forests.

#### Set autoscaling policy to run on dataproc

In [None]:
# to create autoscaling policy:
# gcloud dataproc autoscaling-policies import autoscaling-policy --source=autoscaling.yml
#
# to run:
# python dataproc/submit_job_to_cluster.py \
# --project_id=${PROJECT_ID} \
# --zone=${ZONE} \
# --cluster_name=${CLUSTER_NAME} \
# --gcs_bucket=${BUCKET} \
# --key_file=${KEY} \
# --create_new_cluster \
# --pyspark_file=train.py \
# --instance_type=n1-highmem-4

#### Load libraries and set Spark configurations

In [26]:
import math
import random
from datetime import datetime

import numpy as np
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession 

from pyspark.ml import Pipeline
from pyspark.ml.classification import (DecisionTreeClassifier,
                                       RandomForestClassifier)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import (OneHotEncoderEstimator, StringIndexer,
                                VectorAssembler, VectorIndexer)
from pyspark.mllib.util import MLUtils
from pyspark.sql import Row
from pyspark.sql.functions import col, column, udf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import DoubleType
from sklearn import neighbors
from pyspark.conf import SparkConf

app_name = "random_forest"

# needed for autoscaling in dataproc
SparkContext.setSystemProperty('spark.dynamicAllocation.enabled', 'true')
SparkContext.setSystemProperty('spark.shuffle.service.enabled', 'true')

#initiate the spark session
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

spark
#print all the session options and settings
for object in sc.getConf().getAll():
    print(object)

('spark.shuffle.service.enabled', 'true')
('spark.rdd.compress', 'True')
('spark.serializer.objectStreamReset', '100')
('spark.master', 'local[*]')
('spark.executor.id', 'driver')
('spark.driver.port', '41099')
('spark.app.name', 'random_forest')
('spark.submit.deployMode', 'client')
('spark.app.id', 'local-1576352004449')
('spark.dynamicAllocation.enabled', 'true')
('spark.ui.showConsoleProgress', 'true')
('spark.driver.host', 'docker.w261')


#### Discussion on data pre-processing prior to algorithm implementation

PySpark provides a `RandomForestClassifier` algorithm in the `pyspark.ml.classification` package and we chose to use this rather than write one from scratch. In order to use this classifier, we must perform some preprocessing on the input data to get it into the format the algorithm expects.

### Preprocesing - Random Forest

1. One hot encode categorical features but only emit features for categorical values who have more than 10,000 instances
2. Combine these one hot encoded features with the numerical values and vectorize them to a single dataframe column
3. Convert the target variable from a boolean value to a string value

For the first step, we chose to filter out categorical values who have less than 10,000 instances as there were several columns that had over 1 million distinct values (in the full dataset), many of which only had a handful of instances. If we were to include these in the final dataset, it would increase the dimensionality of our data by many magnitudes while leading to a high variance and failing to generalize.

Many of the included algorithms in the `pyspark.ml` package require the data to be in a vectorized format, so this step was required. Given a list of columns, the `pyspark.ml.feature.VectorAssembler` class generates a single vector column of the concatenated features with the elements ordered by the input columns. For example given this dataframe:

|`x1`|`x2`|`x3`|
|---|---|---|
|`5`|`3`|`-10`|

The `VectorAssembler` would output this value to a new column: `[5,3,-10]`.

Lastly, the target variable was changed from a boolean type to a string type which is required by the `pyspark.ml.feature.StringIndexer` class. The `StringIndexer` indexes each class and is required by the classification algorithms in PySpark.

Since random forests do not require feature engineering and are able to handle `NA` values, no other preprocessing was done. This data was saved to parquet for later use when we run the training of our algorithm. This code can be found in `random_forest/transform.py` and was executed as a PySpark job.

### 4.3.2 Training - Random Forest 
 
The data used for training was read in from the parquet files generated during the preprocessing step above but was first split into a train and test dataset with a 70/30 split. The training dataset was used for training and the test dataset was used for testing the performance of the trained model. This code for this section is located in `random_forest/train.py` and was also executed as a PySpark job.

On our first several trainings, we noticed the loss was fairly high and we discovered the class imbalance in our dataset was the likely culprit since random forests are sensitive to class imbalances in the training data. Given an imbalance, they will bias towards the class(es) that have more examples. To address this, we performed an oversampling of the underrepresented class in the training dataset. In this case the "not clicked" class ($y = 0$) comprised around 75% of the dataset, while the "clicked" class ($y = 1$) comprised the remaining 25%. Our oversampling strategy then became to duplicate the "clicked" examples twice giving a more balanced dataset. This approach lowered our loss over 35% on the test dataset. Note that this oversampling was only performed on the training dataset and not the test dataset.

In [27]:
def oversample(df, target_col='y', feature_col='features', majority_class='0.0', minority_class='1.0', percentage_over=2):
    minority = df[df[target_col] == minority_class].cache()

    for i in range(percentage_over):
        df = df.unionAll(minority)

    return df

def log_loss(actual, predicted, epsilon=1e-15):
    acutal = int(actual)
    predicted = predicted[0]  # class 0

    # clip value between [epsion, 1-epsilon]
    predicted = max(predicted, epsilon)
    predicted = min(predicted, 1-epsilon)

    if actual == 1:
        return -math.log(predicted)
    else:
        return -math.log(1 - predicted)


spark.udf.register('log_loss', log_loss, DoubleType())
log_loss_udf = udf(log_loss, DoubleType())

# read in processed parquet file into an RDD
# df = spark.read.parquet(
#     'gs://w261-f19-team1/ohe_10k_numeric').\
#     select(['label', 'features']).\
#     where(col('label').isNotNull())
    
#import one hot encoded file
#INPUT_FILE = 'gs://261_projectdata/261project_data/df_ohe_10k.parquet'
INPUT_FILE = 'data/df_ohe_300.parquet'

# read in Spark dataframe
df = spark.read.parquet(INPUT_FILE).select('y', 'features')

# split test/train datasets
train, test = df.randomSplit([0.7, 0.3], seed=0)

train = train.cache()
test = test.cache()

print('original dataset:')
print('train count = ', train.count())
print('train class = 0', train.filter(train.y == '0.0').count())
print('train class = 1', train.filter(train.y == '1.0').count())
print('test count = ', test.count())

print('-' * 100)

# make 2 artificial examples of clicks per each example since we have a 25/75 split of click/non-click examples
train = oversample(train, minority_class = '1.0', majority_class = '0.0', percentage_over = 2).cache()

print('oversample dataset:')
print('train count = ', train.count())
print('train class = 0', train.filter(train.y == '0.0').count())
print('train class = 1', train.filter(train.y == '1.0').count())

# random forest training
n_trees = 40
max_depth = 5
max_bins = 32

print(f'Random forest classifier with {n_trees} trees, max depth of {max_depth} and max bins of {max_bins}')

rf = RandomForestClassifier(featuresCol='features',
                            labelCol='y', numTrees=n_trees, maxDepth=max_depth, maxBins=max_bins)
rfModel = rf.fit(train)
rfPredictions = rfModel.transform(test)
rfPredictions = rfPredictions.select('y',
                                     'prediction',
                                     'rawPrediction',
                                     'probability',
                                     log_loss_udf('y', 'probability').alias('log_loss')).\
    cache()

print('Log loss: ', rfPredictions.groupBy().mean('log_loss').collect()[0]['avg(log_loss)'])

rfPredictions.filter(rfPredictions.y == '0.0').show(10, truncate=False)
rfPredictions.filter(rfPredictions.y == '1.0').show(10, truncate=False)


original dataset:
train count =  32293
train class = 0 24194
train class = 1 8099
test count =  13755
----------------------------------------------------------------------------------------------------
oversample dataset:
train count =  48491
train class = 0 24194
train class = 1 24297
Random forest classifier with 40 trees, max depth of 5 and max bins of 32
Log loss:  0.7616105161396188
+---+----------+---------------------------------------+----------------------------------------+------------------+
|y  |prediction|rawPrediction                          |probability                             |log_loss          |
+---+----------+---------------------------------------+----------------------------------------+------------------+
|0  |0.0       |[25.212588834658035,14.787411165341963]|[0.6303147208664509,0.3696852791335491] |0.9951032322430656|
|0  |0.0       |[24.443691892172758,15.55630810782724] |[0.6110922973043189,0.388907702695681]  |0.9444132316507237|
|0  |0.0       |[24.790

#### Show confusion matrix metrics

In [28]:
analyze = rfPredictions.select('y', 'prediction')
analyze = analyze.withColumn("prediction", analyze["prediction"])
analyze = analyze.withColumn("y", analyze["y"])
rfPredictions.unpersist()
#Create SQL queries table
sqlContext.registerDataFrameAsTable(analyze, "results")
#Get metrics for precision recall calculations
TP = spark.sql("""SELECT COUNT(prediction)
                    FROM results
                    WHERE y = 1 AND prediction = 1
                    """).collect()[0][0]

TN = spark.sql("""SELECT COUNT(prediction)
                    FROM results
                    WHERE y = 0 AND prediction = 0
                    """).collect()[0][0]

FP = spark.sql("""SELECT COUNT(prediction)
                    FROM results
                    WHERE y = 0 AND prediction = 1
                    """).collect()[0][0]

FN = spark.sql("""SELECT COUNT(prediction)
                    FROM results
                    WHERE y = 1 AND prediction = 0
                    """).collect()[0][0]


print(f'Precision = {TP/(TP + FP) * 100} %')

print(f'Recall = {TP/(TP + FN) * 100} %')

Precision = 39.01987201077804 %
Recall = 64.058612109483 %


Save model outputs and predictions

In [29]:
# Save model outputs and predictions

# rfModel.save(
#     f'gs://w261-f19-team1/oversample_rf_{n_trees}_{max_depth}_{max_bins}/model')
rfModel.save(f'data/oversample_rf_{n_trees}_{max_depth}_{max_bins}/model')

# rfPredictions.write.parquet(
#     f'gs://w261-f19-team1/oversample_rf_{n_trees}_{max_depth}_{max_bins}/predictions')

rfPredictions.write.parquet(f'data/oversample_rf_{n_trees}_{max_depth}_{max_bins}/predictions', compression='snappy', mode='overwrite')

Py4JJavaError: An error occurred while calling o1883.save.
: java.io.IOException: Path data/oversample_rf_40_5_32/model already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:702)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:179)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
