# IST 718: Big Data Analytics

- Prepared by: Prof Daniel E Acuna <deacuna@syr.edu>
- Modified by: Prof Humayun Khan <hhkhan@syr.edu>
- Faculty Assistant: Eashani Deorukhkar <edeorukh@syr.edu>
- Faculty Assistant: Yash Kapadia <ykapadia@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers from your classmates. Short code snippets from the internet are allowed. Code from the class textbooks or class-provided code can be copied in its entirety.__
- __Do not change homework file names.__ The FAs and the professor use these names to grade your homework.  Changing file names may result in a point reduction penalty.
- There could be tests in some cells (i.e., `assert` and `np.testing.` statements). These tests (if present) are used to grade your answers. **However, the professor and FAs could use __additional__ tests for your answer. Think about cases where your code should run even if it passes all the tests you see.**
- Before submitting your work, remember to check for run time errors with the following procedure:
`Kernel`$\rightarrow$`Restart and Run All`.
- All plots shall include a title, and axis labels.
- Grading feedback cells are there for graders to provide feedback to students.  Don't change or remove grading feedback cells.
- Good Luck!

In [1]:
# load these packages
import findspark
findspark.init()

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark
from pyspark.ml import Pipeline, classification, evaluation, feature, regression
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as fn

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Part 1: Random Forest and gradient boosted trees

In these questions, we will examine the famous [Auto dataset](https://vincentarelbundock.github.io/Rdatasets/doc/ISLR/Auto.html). With this dataset, the goal is to predict the miles per gallon (`mpg`) performance based on characteristics of the car such as number of cylinders (`cylinders`), engine displacement (`displacement`), horsepower of the engine (`horsepower`), weight of the car (`weight`), time to accelerate from 0 to 60 mph (`acceleration`), model year (`year`), and origin (`origin`).

In [2]:
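# data: read the CSV and drop the row-index column `_c0`;
# casting `horsepower` to int turns non-numeric entries into nulls, which dropna() removes
mpg_df = spark.read.csv('Auto.csv', header=True, inferSchema=True).\
    drop('_c0').\
    withColumn('horsepower2', fn.col('horsepower').cast('int')).\
    drop('horsepower').\
    withColumnRenamed('horsepower2', 'horsepower').\
    dropna()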
training_df, validation_df, testing_df = mpg_df.randomSplit([0.6, 0.3, 0.1], seed=0)
mpg_df.printSchema()

root
 |-- mpg: double (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- displacement: double (nullable = true)
 |-- weight: integer (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- origin: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- horsepower: integer (nullable = true)
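
The 60/30/10 split above is random, so the three DataFrames only approximately match those proportions. The following is a minimal plain-Python sketch of the idea, assuming each row is assigned by one uniform random draw against cumulative weights; the helper `random_split` and the 1000-row input are hypothetical and are not Spark's implementation.

```python
import random


def random_split(rows, weights, seed=0):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    bounds = []          # cumulative, normalized upper bounds, e.g. [0.6, 0.9, 1.0]
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # the last split catches any floating-point leftover
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, valid, test = random_split(range(1000), [0.6, 0.3, 0.1])
print(len(train), len(valid), len(test))  # close to, but not exactly, 600/300/100
```

Fixing the seed makes the assignment reproducible, which is why the notebook passes `seed=0` to `randomSplit`.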



# Question 1: (20 pts)

Create three pipelines that contain three different random forests that take in all features from `mpg_df` (`cylinders`, `displacement`, `horsepower`, `weight`, `acceleration`, `year`, and `origin`) to predict (`mpg`). **Set the `seed` parameter of the random forest to 0.** Fit these pipelines to the training data (`training_df`):

- `pipe_rf1`: Random forest with `maxDepth=1` and `numTrees=60`
- `pipe_rf2`: Random forest with `maxDepth=3` and `numTrees=40`
- `pipe_rf3`: Random forest with `maxDepth=6`, `numTrees=20`

In [3]:
# create the fitted pipelines `pipe_rf1`, `pipe_rf2`, and `pipe_rf3` here
from pyspark.ml.regression import RandomForestRegressor

feature_cols = ['cylinders', 'displacement', 'horsepower', 'weight', 'acceleration', 'year', 'origin']

def fit_rf_pipeline(max_depth, num_trees):
    # assemble the predictors into one vector column, then fit a random forest on it
    assembler = feature.VectorAssembler(inputCols=feature_cols, outputCol='features')
    rf = RandomForestRegressor(featuresCol='features', labelCol='mpg',
                               maxDepth=max_depth, numTrees=num_trees, seed=0)
    return Pipeline(stages=[assembler, rf]).fit(training_df)

pipe_rf1 = fit_rf_pipeline(max_depth=1, num_trees=60)
pipe_rf2 = fit_rf_pipeline(max_depth=3, num_trees=40)
pipe_rf3 = fit_rf_pipeline(max_depth=6, num_trees=20)

In [4]:
# tests
np.testing.assert_equal(type(pipe_rf1.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf2.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf3.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf1.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf2.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf3.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf1.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf2.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf3.transform(training_df)), pyspark.sql.dataframe.DataFrame)

# Question 2 (20 pts)

Use the following evaluator to compute the $R^2$ of the models on the validation data. Assign the $R^2$ of the three models to `R2_1`, `R2_2`, and `R2_3`, respectively, and print them. Assign the best pipeline based on validation performance to the variable `best_model`.
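
As a reminder of what the metric measures, $R^2 = 1 - SS_{res}/SS_{tot}$ compares the model's squared error with that of always predicting the mean label. The sketch below computes it in plain Python on made-up numbers; the helper `r2_score` and the sample values are illustrative assumptions, not part of the assignment.

```python
def r2_score(labels, predictions):
    """R^2 = 1 - (residual sum of squares) / (total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


labels = [18.0, 25.0, 31.0, 36.0]        # made-up mpg values
predictions = [20.0, 24.0, 30.0, 34.0]   # made-up model outputs
r2 = r2_score(labels, predictions)
print(r2)

# A model that always predicts the mean label scores exactly 0.
r2_mean_only = r2_score(labels, [sum(labels) / len(labels)] * len(labels))
print(r2_mean_only)
```

A value close to 1 means the predictions explain most of the variance in `mpg`; a value near 0 means the model is no better than predicting the mean.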

In [5]:
evaluator = evaluation.RegressionEvaluator(labelCol='mpg', metricName='r2')
# use it as follows:
#   evaluator.evaluate(fitted_pipeline.transform(df)) -> R2

In [6]:
# YOUR CODE HERE
R2_1=evaluator.evaluate(pipe_rf1.transform(validation_df))
R2_2=evaluator.evaluate(pipe_rf2.transform(validation_df))
R2_3=evaluator.evaluate(pipe_rf3.transform(validation_df))
print(R2_1)
print(R2_2)
print(R2_3)
best_model=pipe_rf3

0.6356640531609501
0.8222168008753123
0.8833324964226545
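
Rather than reading the winner off the printed values, the selection can be made programmatically. This is a small sketch using the three validation scores printed above; the dictionary `validation_r2` and its string keys are illustrative stand-ins for the fitted pipelines.

```python
# Validation R^2 values copied from the output above, keyed by pipeline name.
validation_r2 = {
    "pipe_rf1": 0.6356640531609501,
    "pipe_rf2": 0.8222168008753123,
    "pipe_rf3": 0.8833324964226545,
}

# Pick the key whose score is largest.
best_name = max(validation_r2, key=validation_r2.get)
print(best_name, validation_r2[best_name])
```

In the notebook itself, the same `max(...)` pattern could be applied to a list of `(score, pipeline)` pairs so that `best_model` follows the scores automatically.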


In [7]:
# tests
np.testing.assert_equal(type(best_model.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(best_model.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(best_model.transform(validation_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_array_less(R2_1, 1.)
np.testing.assert_array_less(0.5, R2_1)
np.testing.assert_array_less(R2_2, 1.)
np.testing.assert_array_less(0.5, R2_2)
np.testing.assert_array_less(R2_3, 1.)
np.testing.assert_array_less(0.5, R2_3)

# Question 3: 10 pts

Compute the $R^2$ of the model on testing data, print it, and assign it to variable `R2_best`

In [8]:
# create R2_best below
# YOUR CODE HERE
R2_best=evaluator.evaluate(best_model.transform(testing_df))
print(R2_best)

0.8116746291631238


In [9]:
# tests
np.testing.assert_array_less(R2_best, 1.)
np.testing.assert_array_less(0.5, R2_best)

# Question 4: 10 pts

Using the parameters of the best model, create a new pipeline called `final_model` and fit it to the entire data (`mpg_df`)

In [10]:
# create the fitted pipeline `final_model` here
# YOUR CODE HERE
final_model = Pipeline(stages=[
    feature.VectorAssembler(
        inputCols=['cylinders', 'displacement', 'horsepower', 'weight', 'acceleration', 'year', 'origin'],
        outputCol='features'),
    # same hyperparameters as the best validation model (pipe_rf3)
    RandomForestRegressor(featuresCol='features', labelCol='mpg', maxDepth=6, numTrees=20, seed=0),
]).fit(mpg_df)

In [11]:
# tests
np.testing.assert_equal(type(final_model.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(final_model.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(final_model.transform(mpg_df)), pyspark.sql.dataframe.DataFrame)

# Question 5: 10 pts

Create a pandas dataframe `feature_importance` with the columns `feature` and `importance` which contains the names of the features (`cylinders`, `displacement`, etc.) and their feature importances as determined by the random forest of the final model. Sort the dataframe by `importance` in descending order.

In [12]:
# create feature_importance below
# YOUR CODE HERE
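# pair each input column with its importance (converted from a Spark vector to an array)
feature_importance = pd.DataFrame(
    list(zip(final_model.stages[0].getInputCols(),
             final_model.stages[1].featureImportances.toArray())),
    columns=['feature', 'importance']
).sort_values('importance', ascending=False)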

In [13]:
# display it here
feature_importance

|   | feature | importance |
|---|---------|------------|
| 1 | displacement | 0.379758 |
| 2 | horsepower | 0.172883 |
| 3 | weight | 0.143796 |
| 0 | cylinders | 0.134298 |
| 5 | year | 0.133876 |
| 4 | acceleration | 0.024922 |
| 6 | origin | 0.010467 |


In [14]:
# tests
assert type(feature_importance) == pd.core.frame.DataFrame
np.testing.assert_array_equal(list(feature_importance.columns), ['feature', 'importance'])

Comment below on the importance that random forest has given to each feature. Are they reasonable? Do they tell you anything valuable about the mpg dataset? Answer in the cell below

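***The random forest gives the highest importance to `displacement` (0.38), followed by `horsepower` (0.17), `weight` (0.14), `cylinders` (0.13), and `year` (0.13); `acceleration` (0.02) and `origin` (0.01) contribute very little. This is reasonable: engine size, power, and vehicle weight directly drive fuel consumption, and newer model years tend to be more efficient. Because displacement, horsepower, weight, and cylinders are strongly related to one another, the forest can split on any of them, so their individual importances should be read as shared rather than independent contributions.***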
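
The importances in the table are normalized, so they can be read as shares of a total of 1. The sketch below uses the values copied from the displayed table to accumulate those shares in plain Python; the variable names are illustrative.

```python
# (feature, importance) pairs copied from the sorted table above.
importances = [
    ("displacement", 0.379758),
    ("horsepower", 0.172883),
    ("weight", 0.143796),
    ("cylinders", 0.134298),
    ("year", 0.133876),
    ("acceleration", 0.024922),
    ("origin", 0.010467),
]

cumulative = []
running = 0.0
for name, value in importances:
    running += value
    cumulative.append(running)
    print(f"{name:<13}{value:.6f}  cumulative={running:.6f}")
```

The running total shows that the top three features account for roughly 70% of the total importance, while `acceleration` and `origin` together add only about 3.5%.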

# Question 6:  10 pts.

Pick any of the trees from the final model and assign its `toDebugString` property to a variable `example_tree`. Print this variable and add comments to the cell describing how you think this particular tree is fitting the data

In [15]:
# create a variable example_tree with the toDebugString property of a tree from final_model.
# print this string and comment in this same cell about the branches that this tree fit
# YOUR CODE HERE

example_tree = final_model.stages[1].trees[14].toDebugString
print(example_tree)

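# The root node splits on feature 1 (displacement): cars with displacement <= 182.0 go down the left branch, where the next split is on feature 3 (weight <= 2224.5).
# Each If/Else pair is a binary split on one feature; following the branches down (to at most depth 6) ends in a leaf whose Predict value is the mpg the tree predicts for cars in that region.
# For example, light, small-displacement cars with acceleration > 21.25 and weight <= 2139.5 reach a leaf that predicts about 44.2 mpg.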

DecisionTreeRegressionModel: uid=dtr_80ab275624c1, depth=6, numNodes=87, numFeatures=7
  If (feature 1 <= 182.0)
   If (feature 3 <= 2224.5)
    If (feature 4 <= 21.25)
     If (feature 2 <= 74.5)
      If (feature 6 <= 2.5)
       If (feature 5 <= 77.5)
        Predict: 28.416666666666668
       Else (feature 5 > 77.5)
        Predict: 34.67727272727273
      Else (feature 6 > 2.5)
       If (feature 1 <= 93.5)
        Predict: 36.08461538461539
       Else (feature 1 > 93.5)
        Predict: 31.316666666666645
     Else (feature 2 > 74.5)
      If (feature 4 <= 13.75)
       Predict: 18.0
      Else (feature 4 > 13.75)
       If (feature 1 <= 113.5)
        Predict: 29.6875
       Else (feature 1 > 113.5)
        Predict: 25.5
    Else (feature 4 > 21.25)
     If (feature 3 <= 2139.5)
      Predict: 44.224999999999994
     Else (feature 3 > 2139.5)
      Predict: 24.5
   Else (feature 3 > 2224.5)
    If (feature 2 <= 84.5)
     If (feature 2 <= 63.5)
      Predict: 23.0
     Else (fe
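
The debug string refers to features by their index in the `VectorAssembler` input order, which makes the tree harder to read. This is a minimal sketch, assuming the column order used in the pipelines above, of a hypothetical helper `name_features` that substitutes the column names; the excerpt is the first three lines of the printed tree.

```python
import re

# Column order passed to the VectorAssembler in the pipelines above.
feature_cols = ['cylinders', 'displacement', 'horsepower', 'weight',
                'acceleration', 'year', 'origin']


def name_features(debug_string, columns):
    """Replace every 'feature N' with the N-th column name."""
    return re.sub(r"feature (\d+)",
                  lambda match: columns[int(match.group(1))],
                  debug_string)


excerpt = """If (feature 1 <= 182.0)
 If (feature 3 <= 2224.5)
  If (feature 4 <= 21.25)"""

named = name_features(excerpt, feature_cols)
print(named)
```

Applied to the full `example_tree` string, the same helper would label every split with its column name.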

In [16]:
# tests
assert type(example_tree) == str
assert 'DecisionTreeRegressionModel' in example_tree
assert 'feature 0' in example_tree
assert 'If' in example_tree
assert 'Else' in example_tree
assert 'Predict' in example_tree

# **Question 7 (20 pts)**

Gradient boosted trees are becoming increasingly popular for competitions. There is a high-performance implementation, [xgboost](https://en.wikipedia.org/wiki/XGBoost), that is particularly popular. Compare gradient boosted regression to the best model found with random forest in Question 3. Use the validation set. For GBR, use all the default parameters except make `seed=0`. Assign the pipeline and the $R^2$ of the model to `gbr_pipe` and `R2_gbr`, respectively. Does it have an amazing or disappointing $R^2$? Comment.

In [17]:
# YOUR CODE HERE
from pyspark.ml.regression import GBTRegressor

gbr_pipe = Pipeline(stages=[
    feature.VectorAssembler(
        inputCols=['cylinders', 'displacement', 'horsepower', 'weight', 'acceleration', 'year', 'origin'],
        outputCol='features'),
    # default GBT parameters, with only the seed fixed
    GBTRegressor(featuresCol='features', labelCol='mpg', seed=0),
]).fit(training_df)
R2_gbr=evaluator.evaluate(gbr_pipe.transform(validation_df))

In [18]:
# test your models here
print("Performance of best RF: ", evaluator.evaluate(best_model.transform(validation_df)))
print("Performance of GBR: ", R2_gbr)

Performance of best RF:  0.8833324964226545
Performance of GBR:  0.8404903251080562


In [19]:
# tests
np.testing.assert_equal(type(gbr_pipe.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(gbr_pipe.stages[1]), regression.GBTRegressionModel)
np.testing.assert_equal(type(gbr_pipe.transform(validation_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_array_less(R2_gbr, 1.)
np.testing.assert_array_less(0.5, R2_gbr)

***On the validation set, the best random forest ($R^2 \approx 0.883$) outperforms the gradient boosted tree model with default parameters ($R^2 \approx 0.840$). The GBT result is not disappointing, though: it is close to the random forest and was obtained without any tuning.***
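
To make the contrast with random forests concrete, here is a toy sketch of the boosting idea for squared error: start from the mean label and repeatedly fit a small tree to the current residuals. Everything in it (the one-feature data, the depth-1 "stump" learner, the helper names, the step size) is an illustrative assumption and not how `GBTRegressor` is implemented internally.

```python
# Toy gradient boosting for squared error using depth-1 trees (stumps).
# Illustration only: hypothetical data, not Spark's GBTRegressor internals.

def fit_stump(xs, residuals):
    """Find the threshold split on x that best fits the residuals."""
    best = None
    for t in sorted(set(xs))[:-1]:  # x <= t goes left, x > t goes right
        left = [r for x, r in zip(xs, residuals) if x <= t]
        right = [r for x, r in zip(xs, residuals) if x > t]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = (sum((r - left_mean) ** 2 for r in left)
               + sum((r - right_mean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, t, left_mean, right_mean)
    return best[1:]


def boost(xs, ys, n_rounds=50, step=0.1):
    """Start from the mean label, then add small corrections round by round."""
    base = sum(ys) / len(ys)
    preds = [base] * len(ys)
    for _ in range(n_rounds):
        residuals = [y - p for y, p in zip(ys, preds)]
        t, left_mean, right_mean = fit_stump(xs, residuals)
        preds = [p + step * (left_mean if x <= t else right_mean)
                 for x, p in zip(xs, preds)]
    return base, preds


def mse(ys, preds):
    return sum((y - p) ** 2 for y, p in zip(ys, preds)) / len(ys)


weights = [1800, 2100, 2600, 3000, 3500, 4200]   # made-up car weights
mpgs = [36.0, 32.0, 27.0, 22.0, 18.0, 14.0]      # made-up mpg labels
base, preds = boost(weights, mpgs)
print("MSE of mean-only model:", mse(mpgs, [base] * len(mpgs)))
print("MSE after boosting:    ", mse(mpgs, preds))
```

Unlike a random forest, where trees are fit independently and averaged, each boosting round depends on the previous ones, so the number of rounds and the step size are natural parameters to tune before concluding that one method is better than the other.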