# Homework 6: Implement random forest in Spark

#### Author: Akshat Gaur
#### AndrewID: agaur

#### Task:

You will now create a Spark implementation of the random forest method that you are familiar with. This will help you understand how the Spark architecture differs from MapReduce and standard serial computing, particularly with reference to the pipelines API. Spark ML enables large in-memory parallel learning work, and it can be much faster than MapReduce for certain tasks. This assignment will also give you practice using the pipelines API and help you understand how to construct a meaningful analytics project in Spark.

## 1. Create Spark Session

In [129]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local") \
        .appName("Random Forest") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

## 2. Create a schema and assign headers to the dataframe

In [130]:
from pyspark.sql.types import *

schema = StructType([StructField('Sex', StringType(), True), StructField('Length', DoubleType(), True), 
                    StructField('Diameter', DoubleType(), True), StructField('Height', DoubleType(), True),
                    StructField("WholeWeight", DoubleType(), True), StructField("ShuckedWeight", DoubleType(), True),
                    StructField("VisceraWeight", DoubleType(), True), StructField("ShellWeight", DoubleType(), True),
                    StructField("Rings", IntegerType(), True)])

# The CSV reader is built into Spark 2.x, so the external databricks format name is not needed
df = spark.read.csv('./abalone.data', header=False, schema=schema)

print 'Schema:'
df.printSchema()


Schema:
root
 |-- Sex: string (nullable = true)
 |-- Length: double (nullable = true)
 |-- Diameter: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- WholeWeight: double (nullable = true)
 |-- ShuckedWeight: double (nullable = true)
 |-- VisceraWeight: double (nullable = true)
 |-- ShellWeight: double (nullable = true)
 |-- Rings: integer (nullable = true)



## 3. Transform categorical features

In [131]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol='Sex', outputCol='NewSex')
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol='NewSex', outputCol='NumericSex')
encoded = encoder.transform(indexed)
encoded.show()

+---+------+--------+------+-----------+-------------+-------------+-----------+-----+------+-------------+
|Sex|Length|Diameter|Height|WholeWeight|ShuckedWeight|VisceraWeight|ShellWeight|Rings|NewSex|   NumericSex|
+---+------+--------+------+-----------+-------------+-------------+-----------+-----+------+-------------+
|  M| 0.455|   0.365| 0.095|      0.514|       0.2245|        0.101|       0.15|   15|   0.0|(2,[0],[1.0])|
|  M|  0.35|   0.265|  0.09|     0.2255|       0.0995|       0.0485|       0.07|    7|   0.0|(2,[0],[1.0])|
|  F|  0.53|    0.42| 0.135|      0.677|       0.2565|       0.1415|       0.21|    9|   2.0|    (2,[],[])|
|  M|  0.44|   0.365| 0.125|      0.516|       0.2155|        0.114|      0.155|   10|   0.0|(2,[0],[1.0])|
|  I|  0.33|   0.255|  0.08|      0.205|       0.0895|       0.0395|      0.055|    7|   1.0|(2,[1],[1.0])|
|  I| 0.425|     0.3| 0.095|     0.3515|        0.141|       0.0775|       0.12|    8|   1.0|(2,[1],[1.0])|
|  F|  0.53|   0.415|  0.15|
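
The output above shows `F` encoded as the empty sparse vector `(2,[],[])`. A minimal plain-Python sketch of why, assuming the default behaviour of the two stages: `StringIndexer` gives the most frequent label index 0, and `OneHotEncoder` drops the last category (`dropLast=True`), so the highest index becomes an all-zeros vector. The helper names `string_index` and `one_hot_drop_last` are hypothetical and only illustrate the idea; the alphabetical tie-break is an assumption and is not exercised by the toy sample.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index, most frequent label first.
    Ties are broken alphabetically here (an assumption for this sketch)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot_drop_last(index, num_categories):
    """Dense one-hot vector of length num_categories - 1.
    The last category index maps to the all-zeros vector."""
    vec = [0.0] * (num_categories - 1)
    if int(index) < num_categories - 1:
        vec[int(index)] = 1.0
    return vec

# Toy sample with the same frequency order as the output above: M, then I, then F
sexes = ['M', 'M', 'F', 'M', 'I', 'I']
mapping = string_index(sexes)
encoded_rows = [one_hot_drop_last(mapping[s], len(mapping)) for s in sexes]
```

Here `M` becomes `[1.0, 0.0]`, `I` becomes `[0.0, 1.0]`, and `F` becomes `[0.0, 0.0]`, which is the dense form of `(2,[],[])`.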

## 4. Print top 10 rows of the final features 

In [132]:
encoded = encoded.drop('Sex')
encoded = encoded.drop('NewSex')
encoded = encoded.withColumnRenamed('Rings', 'truelabels')
encoded.show(10)

+------+--------+------+-----------+-------------+-------------+-----------+----------+-------------+
|Length|Diameter|Height|WholeWeight|ShuckedWeight|VisceraWeight|ShellWeight|truelabels|   NumericSex|
+------+--------+------+-----------+-------------+-------------+-----------+----------+-------------+
| 0.455|   0.365| 0.095|      0.514|       0.2245|        0.101|       0.15|        15|(2,[0],[1.0])|
|  0.35|   0.265|  0.09|     0.2255|       0.0995|       0.0485|       0.07|         7|(2,[0],[1.0])|
|  0.53|    0.42| 0.135|      0.677|       0.2565|       0.1415|       0.21|         9|    (2,[],[])|
|  0.44|   0.365| 0.125|      0.516|       0.2155|        0.114|      0.155|        10|(2,[0],[1.0])|
|  0.33|   0.255|  0.08|      0.205|       0.0895|       0.0395|      0.055|         7|(2,[1],[1.0])|
| 0.425|     0.3| 0.095|     0.3515|        0.141|       0.0775|       0.12|         8|(2,[1],[1.0])|
|  0.53|   0.415|  0.15|     0.7775|        0.237|       0.1415|       0.33|      

## 5. Split data in 80:20 

In [133]:
(train, test) = encoded.randomSplit([0.8, 0.2])
print 'total: ', encoded.count()
print 'train: ', train.count()
print 'test: ', test.count()
train.printSchema()

total:  4177
train:  3388
test:  789
root
 |-- Length: double (nullable = true)
 |-- Diameter: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- WholeWeight: double (nullable = true)
 |-- ShuckedWeight: double (nullable = true)
 |-- VisceraWeight: double (nullable = true)
 |-- ShellWeight: double (nullable = true)
 |-- truelabels: integer (nullable = true)
 |-- NumericSex: vector (nullable = true)
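
The counts above (3388 and 789) are close to, but not exactly, 80% and 20% of 4177. A rough sketch of why, under the assumption that each row is assigned to a split independently by a random draw against the normalized weights. The function `random_split` below is a hypothetical plain-Python illustration, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign every row to one split by drawing a uniform number per row."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative boundaries of the normalized weights, e.g. [0.8, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last bucket also catches any floating-point rounding
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(4177), [0.8, 0.2], seed=42)
```

Because the assignment is random per row, the split sizes vary from run to run. In Spark, passing a seed, for example `encoded.randomSplit([0.8, 0.2], seed=42)`, should make the split repeatable for the same input data.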



## 6. Train a Random Forest Regression Model and make predictions

In [134]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
            inputCols = [x for x in encoded.columns if x != 'truelabels'],
            outputCol = 'features')

rf = RandomForestRegressor(labelCol = 'truelabels', numTrees=10)

## 7. Create pipeline and train model

In [135]:
pipeline = Pipeline(stages = [assembler, rf])
model = pipeline.fit(train)

## 8. Print top 10 rows of "predictions", "true labels" and "features"

In [136]:
predictions = model.transform(test)
predictions.select('prediction', 'truelabels', 'features').show(10)

+-----------------+----------+--------------------+
|       prediction|truelabels|            features|
+-----------------+----------+--------------------+
|4.301595486377783|         3|[0.13,0.1,0.03,0....|
|4.301595486377783|         4|[0.14,0.105,0.035...|
|4.301595486377783|         5|[0.16,0.12,0.035,...|
|4.301595486377783|         4|[0.165,0.11,0.02,...|
|4.301595486377783|         6|[0.17,0.125,0.055...|
|4.353837619244915|         4|[0.175,0.125,0.04...|
|4.301595486377783|         4|[0.175,0.125,0.04...|
|4.301595486377783|         3|[0.18,0.13,0.045,...|
|4.301595486377783|         4|[0.2,0.15,0.04,0....|
|4.301595486377783|         4|[0.2,0.155,0.04,0...|
+-----------------+----------+--------------------+
only showing top 10 rows



## 9. Evaluate Metrics

In [137]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='truelabels', predictionCol='prediction')

# Root Mean Squared Error
rmse = evaluator.evaluate(predictions, {evaluator.metricName:'rmse'})
print 'Root Mean Squared Error: ', rmse

# Mean Squared Error
mse = evaluator.evaluate(predictions, {evaluator.metricName:'mse'})
print 'Mean Squared Error: ', mse

# R-Squared (coefficient of determination)
rsq = evaluator.evaluate(predictions, {evaluator.metricName:"r2"})
print 'R-Squared: ', rsq

# Mean Absolute Error
mae = evaluator.evaluate(predictions, {evaluator.metricName:"mae"})
print 'Mean Absolute Error: ', mae



Root Mean Squared Error:  2.502507697
Mean Squared Error:  6.26254477356
R-Squared:  0.443364702709
Mean Absolute Error:  1.73269342845
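
For reference, the four regression metrics can be written out by hand. The sketch below uses the standard textbook definitions on a small made-up sample; the function name `regression_metrics` and the toy values are illustrative assumptions, not part of the homework data.

```python
import math

def regression_metrics(labels, predictions):
    """Compute RMSE, MSE, R-squared and MAE from paired labels and predictions."""
    n = float(len(labels))
    errors = [y - p for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    # Total sum of squares around the mean label
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    # R-squared = 1 - (residual sum of squares / total sum of squares)
    r2 = 1.0 - (mse * n) / ss_tot
    return {'rmse': math.sqrt(mse), 'mse': mse, 'r2': r2, 'mae': mae}

# Toy values only
metrics = regression_metrics([3.0, 4.0, 5.0, 4.0], [4.0, 4.0, 4.0, 5.0])
```

RMSE is the square root of MSE, which matches the reported values (2.5025 squared is about 6.2625). R-squared is not an error: higher is better, and it turns negative when the model does worse than always predicting the mean label, as in this toy sample.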


## 10. Train a Random Forest Classification Model

In [138]:
from pyspark.ml.classification import RandomForestClassifier

rfc = RandomForestClassifier(labelCol='truelabels', featuresCol='features', numTrees=10)
pipeline_rfc = Pipeline(stages = [assembler, rfc])
model_rfc = pipeline_rfc.fit(train)


## 11. Print top 10 rows of "predictions", "true labels" and "features" for RF Classifier

In [139]:
predictions_rfc = model_rfc.transform(test)
predictions_rfc.select('prediction', 'truelabels', 'features').show(10)

+----------+----------+--------------------+
|prediction|truelabels|            features|
+----------+----------+--------------------+
|       4.0|         3|[0.13,0.1,0.03,0....|
|       4.0|         4|[0.14,0.105,0.035...|
|       4.0|         5|[0.16,0.12,0.035,...|
|       4.0|         4|[0.165,0.11,0.02,...|
|       4.0|         6|[0.17,0.125,0.055...|
|       5.0|         4|[0.175,0.125,0.04...|
|       4.0|         4|[0.175,0.125,0.04...|
|       4.0|         3|[0.18,0.13,0.045,...|
|       4.0|         4|[0.2,0.15,0.04,0....|
|       4.0|         4|[0.2,0.155,0.04,0...|
+----------+----------+--------------------+
only showing top 10 rows



## 12. Evaluate Metrics

In [140]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='truelabels', predictionCol='prediction')

# Accuracy
accuracy = evaluator.evaluate(predictions_rfc, {evaluator.metricName:'accuracy'})
print 'Accuracy: ', accuracy

# Weighted Precision
weighted_precision = evaluator.evaluate(predictions_rfc, {evaluator.metricName:'weightedPrecision'})
print 'Weighted Precision: ', weighted_precision

# Weighted Recall
weighted_recall = evaluator.evaluate(predictions_rfc, {evaluator.metricName:'weightedRecall'})
print 'Weighted Recall: ', weighted_recall

# F1
f1 = evaluator.evaluate(predictions_rfc, {evaluator.metricName:'f1'})
print 'F1: ', f1


Accuracy:  0.247148288973
Weighted Precision:  0.193466730316
Weighted Recall:  0.247148288973
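
Accuracy and weighted recall are identical in the output above. That is expected: weighting each class's recall by its share of the true labels sums to the overall fraction of correct predictions. The sketch below writes the weighted metrics out in plain Python on made-up values; `weighted_metrics` is a hypothetical helper, and scoring a class that is never predicted with precision 0 is an assumption of this sketch.

```python
from collections import Counter

def weighted_metrics(labels, predictions):
    """Accuracy plus precision, recall and F1 weighted by true-label frequency."""
    n = float(len(labels))
    label_counts = Counter(labels)
    pred_counts = Counter(predictions)
    true_pos = Counter(y for y, p in zip(labels, predictions) if y == p)
    accuracy = sum(true_pos.values()) / n
    w_precision = w_recall = w_f1 = 0.0
    for cls, count in label_counts.items():
        weight = count / n
        tp = true_pos[cls]
        # A class that is never predicted gets precision 0 (assumption)
        precision = tp / float(pred_counts[cls]) if pred_counts[cls] else 0.0
        recall = tp / float(count)
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1
    return {'accuracy': accuracy, 'weightedPrecision': w_precision,
            'weightedRecall': w_recall, 'f1': w_f1}

# Toy values only
scores = weighted_metrics([3, 4, 5, 4, 6, 4], [4, 4, 4, 4, 4, 5])
```

On this toy sample only class 4 is ever predicted correctly, so accuracy and weighted recall both come out to 1/3.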


## 13. Observations and Conclusion about the metrics and the two models

From the two metric evaluations above, we can conclude that neither model performed well on this dataset.
Without feature engineering, the results are not enough to compare the models properly: the regression model is scored by error (MSE of 6.26), while the classification model is scored by accuracy (0.247), and the two measures are not directly comparable.
Difference between the two models:
The splitting criterion differs between the two models and depends on the algorithm used.

As per the link: http://www.math.usu.edu/adele/RandomForests/Ovronnaz.pdf
<br>
RF Regression: For regression, the predicted value at a node is the average of the response variable over all observations in the node. This model is used when the predictions are continuous values. It uses variance (equivalently, the residual sum of squares) as the splitting criterion: each node is split where the reduction in variance is largest.
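
A minimal sketch of a variance-based split on a single numeric feature, under simplifying assumptions: every distinct feature value is tried as a threshold, whereas Spark bins continuous features first. The names `variance` and `best_split` and the toy values are hypothetical.

```python
def variance(values):
    """Population variance of a list of numbers."""
    n = float(len(values))
    mean = sum(values) / n
    return sum((v - mean) ** 2 for v in values) / n

def best_split(xs, ys):
    """Return (threshold, variance_reduction) for the best split x <= threshold."""
    n = float(len(ys))
    parent = variance(ys)
    best = (None, 0.0)
    # The largest value is skipped because it would leave the right side empty
    for t in sorted(set(xs))[:-1]:
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        gain = (parent
                - (len(left) / n) * variance(left)
                - (len(right) / n) * variance(right))
        if gain > best[1]:
            best = (t, gain)
    return best

# Toy values only: shell length against ring count
lengths = [0.2, 0.3, 0.5, 0.6]
rings = [4.0, 5.0, 10.0, 11.0]
threshold, gain = best_split(lengths, rings)
```

With these values the best threshold is 0.3, and each child node would predict the mean of its own ring counts (4.5 and 10.5).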
<br>
RF Classifier: For classification, the predicted class is the most common class in the node (majority vote). This model is used when the predictions are discrete values. It uses information gain or the Gini criterion to split the nodes.
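
The classification side can be sketched the same way. The block below computes the Gini impurity of a node and its majority-vote prediction; `gini` and `majority_class` are hypothetical helper names, and the labels are toy values.

```python
from collections import Counter

def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = float(len(labels))
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())

def majority_class(labels):
    """Most common class among the observations in a node."""
    return Counter(labels).most_common(1)[0][0]

# Toy node: three observations with 4 rings, one with 5, one with 3
node = [4, 4, 4, 5, 3]
node_impurity = gini(node)
node_prediction = majority_class(node)
```

A pure node (all labels equal) has impurity 0, so a split is preferred when it lowers the weighted impurity of the child nodes the most.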