# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>
- TAs: Tong Zeng <tozeng@syr.edu>, Priya Matnani <psmatnan@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your http://notebook.acuna.io workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ tests for your answers. Think about cases where your code should still run correctly, even if it passes all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).
- Good luck!

In [1]:
# load these packages
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark
from pyspark.ml import Pipeline, classification, evaluation, feature, regression
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as fn

# create (or reuse) the Spark session and its context
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Part 2: Random Forest

In these questions, we will examine the famous Titanic dataset.

In [2]:
# read-only
drop_cols = ['boat', 'body']
titanic_df = spark.read.csv('/datasets/titanic_original.csv', header=True, inferSchema=True).\
    drop(*drop_cols).\
    fillna('O').\
    dropna(subset=['pclass', 'age', 'sibsp', 'parch', 'fare', 'survived']).\
    select((fn.col('sex') == 'male').alias('is_male').cast('float'),           
           'pclass',
           'age',
           'sibsp',
           'parch',
           'fare',
           'survived')
training_df, validation_df, testing_df = titanic_df.randomSplit([0.6, 0.3, 0.1], seed=0)
titanic_df.printSchema()

root
 |-- is_male: float (nullable = false)
 |-- pclass: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- survived: integer (nullable = true)
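
The `randomSplit([0.6, 0.3, 0.1], seed=0)` call above assigns rows to the training, validation, and testing sets at random, so the resulting sizes are only approximately 60/30/10. The plain-Python sketch below illustrates the general idea under simplifying assumptions (one uniform draw per row, buckets defined by cumulative weights). The helper `random_split` is hypothetical and is not Spark's implementation.

```python
import random


def random_split(rows, weights, seed=0):
    """Assign each row independently to one bucket, with probability
    proportional to that bucket's weight (hypothetical helper)."""
    total = float(sum(weights))
    # Cumulative upper bounds of each bucket on [0, 1), e.g. 0.6, 0.9, 1.0
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift in the last bound

    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    buckets = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                buckets[i].append(row)
                break
    return buckets


train, valid, test = random_split(range(1000), [0.6, 0.3, 0.1], seed=0)
print(len(train), len(valid), len(test))
```

Every row lands in exactly one bucket, but the bucket sizes vary around their expected values, which is why the three splits of `titanic_df` will not have exactly 60%, 30%, and 10% of the rows.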



# Question 1: (10 pts)

Create three pipelines that contain three different random forest classifiers that take in all features from `titanic_df` (`is_male`, `pclass`, `age`, `sibsp`, `parch`, and `fare`) to predict whether someone survived (`survived`). Fit these pipelines to the training data.

- `pipe_rf1`: Random forest with `maxDepth=1` and `numTrees=60`
- `pipe_rf2`: Random forest with `maxDepth=3` and `numTrees=40`
- `pipe_rf3`: Random forest with `maxDepth=6` and `numTrees=20`

In [3]:
# create the fitted pipelines `pipe_rf1`, `pipe_rf2`, and `pipe_rf3` here
# YOUR CODE HERE
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# assemble the six predictors (every column except `survived`) into one vector column
va = VectorAssembler(inputCols=training_df.columns[0:6], outputCol='features')
# three random forests that differ only in tree depth and number of trees
rf1 = RandomForestClassifier(labelCol='survived', featuresCol='features', maxDepth=1, numTrees=60)
rf2 = RandomForestClassifier(labelCol='survived', featuresCol='features', maxDepth=3, numTrees=40)
rf3 = RandomForestClassifier(labelCol='survived', featuresCol='features', maxDepth=6, numTrees=20)
# fit each pipeline on the training split
pipe_rf1 = Pipeline(stages=[va, rf1]).fit(training_df)
pipe_rf2 = Pipeline(stages=[va, rf2]).fit(training_df)
pipe_rf3 = Pipeline(stages=[va, rf3]).fit(training_df)
#raise NotImplementedError()

In [4]:
# tests for 10 pts
np.testing.assert_equal(type(pipe_rf1.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf2.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf3.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf1.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(pipe_rf2.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(pipe_rf3.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(pipe_rf1.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf2.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf3.transform(training_df)), pyspark.sql.dataframe.DataFrame)

# Question 2 (10 pts)

Use the following evaluator to compute the area under the curve (AUC) of the models on the validation data. Print the AUC of the three models and assign the best one (i.e., the best pipeline) to a variable `best_model`.

In [5]:
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='survived')
# use it as follows:
# evaluator.evaluate(fitted_pipeline.transform(df)) -> AUC
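
By default, `BinaryClassificationEvaluator` reports the area under the ROC curve. One way to read that number is as the probability that a randomly chosen survivor receives a higher score than a randomly chosen non-survivor. The sketch below computes that quantity directly on a tiny made-up example. The function `auc_pairwise` and the toy labels and scores are illustrative assumptions, and Spark computes the metric differently (by integrating the curve), so this is meant for intuition only.

```python
def auc_pairwise(labels, scores):
    """AUC as the fraction of (positive, negative) pairs in which the
    positive example has the higher score; ties count as half a win."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy data (made up for illustration): 1 = survived, 0 = did not survive
labels = [1, 1, 0, 1, 0, 0]
scores = [0.9, 0.8, 0.7, 0.4, 0.3, 0.1]
auc = auc_pairwise(labels, scores)
print(auc)
```

In the toy data, 8 of the 9 positive-negative pairs are ordered correctly, so the AUC is 8/9. A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.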

In [6]:
#YOUR CODE HERE

AUC1=evaluator.evaluate(pipe_rf1.transform(validation_df))
AUC2=evaluator.evaluate(pipe_rf2.transform(validation_df))
AUC3=evaluator.evaluate(pipe_rf3.transform(validation_df))

#raise NotImplementedError()

In [7]:
AUC1
#0.8182622582872928

0.8182622582872928

In [8]:
AUC2
#0.8348584254143648

0.8348584254143648

In [9]:
AUC3
#0.8445916781767949

0.8445916781767949

In [10]:
best_model=pipe_rf3
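
The best pipeline was chosen here by reading the three AUC values. A minimal sketch of making the same choice programmatically is shown below. It uses the validation AUCs reported above, and the dictionary name `validation_auc` is an assumption introduced for this illustration.

```python
# Validation AUCs copied from the outputs of the three cells above
validation_auc = {
    "pipe_rf1": 0.8182622582872928,
    "pipe_rf2": 0.8348584254143648,
    "pipe_rf3": 0.8445916781767949,
}

# Print the models from best to worst, then pick the name with the largest AUC
for name, auc in sorted(validation_auc.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name}: {auc:.4f}")

best_name = max(validation_auc, key=validation_auc.get)
print("best:", best_name)
```

In the notebook, the same pattern could be applied to a dictionary that maps each name to its fitted pipeline, so that the best pipeline is looked up by `best_name` instead of being typed by hand.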

In [11]:
# tests for 10 pts
np.testing.assert_equal(type(best_model.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(best_model.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(best_model.transform(validation_df)), pyspark.sql.dataframe.DataFrame)

# Question 3: 5 pts

Compute the AUC of the best model on the testing data, print it, and assign it to the variable `AUC_best`.

In [12]:
# create AUC_best below
# YOUR CODE HERE
AUC_best = evaluator.evaluate(best_model.transform(testing_df))
#raise NotImplementedError()

In [13]:
# tests for 5 pts
np.testing.assert_array_less(AUC_best, 1.)
np.testing.assert_array_less(0.5, AUC_best)

In [14]:
AUC_best
#0.9104813315339629

0.9104813315339629

# Question 4: 5 pts

Using the parameters of the best model, create a new pipeline called `final_model` and fit it to the entire data (`titanic_df`).

In [15]:
# create the fitted pipeline `final_model` here
# YOUR CODE HERE
final_model= Pipeline(stages=[va, rf3]).fit(titanic_df)
#raise NotImplementedError()

In [16]:
# tests for 5 pts
np.testing.assert_equal(type(final_model.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(final_model.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(final_model.transform(titanic_df)), pyspark.sql.dataframe.DataFrame)

# Question 5: 10 + 5 pts

Create a pandas dataframe `feature_importance` with the columns `feature` and `importance` which contains the names of the features (`is_male`, `pclass`, etc.) and their feature importance as determined by the random forest of the final model. Sort the dataframe by `importance` in descending order.

In [17]:
# create feature_importance below
# YOUR CODE HERE
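# the last pipeline stage is the fitted random forest
rf_model = final_model.stages[-1]
# pair each assembled input column with its importance, then sort from most to least important
feature_importance = pd.DataFrame(
    list(zip(final_model.stages[0].getInputCols(), rf_model.featureImportances.toArray())),
    columns=['feature', 'importance']).sort_values('importance', ascending=False)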

#raise NotImplementedError()

In [18]:
# display it here
feature_importance

|   | feature | importance |
|---|---------|------------|
| 0 | is_male | 0.465462 |
| 5 | fare | 0.168049 |
| 1 | pclass | 0.148183 |
| 2 | age | 0.143203 |
| 3 | sibsp | 0.039139 |
| 4 | parch | 0.035964 |
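
Position `i` of the importance vector corresponds to the `i`-th input column of the `VectorAssembler`, and Spark normalizes the importances so that they sum to 1. The plain-Python sketch below reproduces the ranking from the rounded values displayed above and checks that sum. The list names are assumptions introduced for this illustration.

```python
# Feature order as passed to the VectorAssembler
feature_names = ["is_male", "pclass", "age", "sibsp", "parch", "fare"]
# Importances in that same order, rounded as displayed in the table above
importances = [0.465462, 0.148183, 0.143203, 0.039139, 0.035964, 0.168049]

# Pair names with importances and sort from most to least important
ranked = sorted(zip(feature_names, importances), key=lambda pair: pair[1], reverse=True)
for name, value in ranked:
    print(f"{name:8s} {value:.3f}")

# The importances should add up to (approximately) 1
total = sum(importances)
print("total:", round(total, 4))
```

Checking the total is a quick way to catch a mismatch between the feature names and the importance vector, for example when a column is added to the assembler but not to the list of names.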


In [19]:
# tests for 10 pts
assert type(feature_importance) == pd.core.frame.DataFrame
np.testing.assert_array_equal(list(feature_importance.columns), ['feature', 'importance'])

In [23]:
titanic_df.show(50)

+-------+------+------+-----+-----+--------+--------+
|is_male|pclass|   age|sibsp|parch|    fare|survived|
+-------+------+------+-----+-----+--------+--------+
|    0.0|     1|  29.0|    0|    0|211.3375|       1|
|    1.0|     1|0.9167|    1|    2|  151.55|       1|
|    0.0|     1|   2.0|    1|    2|  151.55|       0|
|    1.0|     1|  30.0|    1|    2|  151.55|       0|
|    0.0|     1|  25.0|    1|    2|  151.55|       0|
|    1.0|     1|  48.0|    0|    0|   26.55|       1|
|    0.0|     1|  63.0|    1|    0| 77.9583|       1|
|    1.0|     1|  39.0|    0|    0|     0.0|       0|
|    0.0|     1|  53.0|    2|    0| 51.4792|       1|
|    1.0|     1|  71.0|    0|    0| 49.5042|       0|
|    1.0|     1|  47.0|    1|    0| 227.525|       0|
|    0.0|     1|  18.0|    1|    0| 227.525|       1|
|    0.0|     1|  24.0|    0|    0|    69.3|       1|
|    0.0|     1|  26.0|    0|    0|   78.85|       1|
|    1.0|     1|  80.0|    0|    0|    30.0|       1|
|    1.0|     1|  24.0|    0

**(5 pts)** Comment below on the importance that the random forest has given to each feature. Are they reasonable? Do they tell you anything valuable about the Titanic dataset? Answer in the cell below.

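The random forest assigns the highest importance to sex (`is_male`, about 0.47), followed by `fare` (0.17), `pclass` (0.15), `age` (0.14), `sibsp` (0.04), and `parch` (0.04). In other words, splits on `is_male` yield more information gain than splits on any other feature. This ranking is reasonable: women and children were saved first and men only where possible, so sex should be the strongest predictor of survival and age should also carry signal. According to this model, survival depended mostly on who a passenger was (sex, fare and class, age) and much less on how many relatives were aboard (`sibsp`, `parch`).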

# Question 6: 5 pts

Pick any of the trees from the final model and assign its `toDebugString` property to a variable `example_tree`. Print this variable and add comments to the cell describing how you think this particular tree is fitting the data.

In [21]:
# create a variable example_tree with the toDebugString property of a tree from final_model.
# print this string and comment in this same cell about the branches that this tree fit
# YOUR CODE HERE
# any index below len(rf_model.trees) selects one tree of the forest
example_tree = rf_model.trees[9].toDebugString
print(example_tree)
#raise NotImplementedError()

DecisionTreeClassificationModel (uid=dtc_c550ad529565) of depth 6 with 97 nodes
  If (feature 5 <= 15.8)
   If (feature 0 <= 0.5)
    If (feature 2 <= 23.25)
     If (feature 5 <= 8.658349999999999)
      If (feature 2 <= 19.5)
       Predict: 1.0
      Else (feature 2 > 19.5)
       If (feature 2 <= 20.75)
        Predict: 0.0
       Else (feature 2 > 20.75)
        Predict: 1.0
     Else (feature 5 > 8.658349999999999)
      If (feature 3 <= 1.5)
       If (feature 3 <= 0.5)
        Predict: 1.0
       Else (feature 3 > 0.5)
        Predict: 0.0
      Else (feature 3 > 1.5)
       Predict: 0.0
    Else (feature 2 > 23.25)
     If (feature 1 <= 2.5)
      If (feature 3 <= 0.5)
       If (feature 2 <= 50.5)
        Predict: 1.0
       Else (feature 2 > 50.5)
        Predict: 0.0
      Else (feature 3 > 0.5)
       Predict: 1.0
     Else (feature 1 > 2.5)
      If (feature 4 <= 0.5)
       If (feature 2 <= 30.75)
        Predict: 0.0
       Else (feature 2 > 30.75)
        Predict: 1.0


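This tree first splits on `fare` (feature 5, at 15.8) and then on `is_male` (feature 0, at 0.5). These are the two most important features in the table above, so the top-level splits of this tree agree with the forest as a whole. Deeper levels refine the prediction with `age` (feature 2), `sibsp` (feature 3), `pclass` (feature 1), and `parch` (feature 4). The root split is on `fare` rather than `is_male`, most likely because each tree in a random forest is grown on a bootstrap sample and considers only a random subset of the features at each split. The split order by itself does not show whether the tree overfits. Some of the deepest branches, such as the prediction flipping for ages between 19.5 and 20.75, are narrow enough that they may be fitting noise, which the forest offsets by averaging over many trees.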
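
The debug string refers to features by index, which makes it hard to read. A minimal sketch of substituting the column names is shown below, assuming the feature order used by the `VectorAssembler` (`is_male`, `pclass`, `age`, `sibsp`, `parch`, `fare`). The helper `name_features` is hypothetical, and the excerpt is the first three lines of the tree printed above.

```python
import re

# Feature order as passed to the VectorAssembler (index 0 to 5)
feature_names = ["is_male", "pclass", "age", "sibsp", "parch", "fare"]


def name_features(debug_string, names):
    """Replace every 'feature N' in a tree's debug string with names[N]."""
    return re.sub(r"feature (\d+)", lambda m: names[int(m.group(1))], debug_string)


# First three splits of the tree printed above
excerpt = """If (feature 5 <= 15.8)
 If (feature 0 <= 0.5)
  If (feature 2 <= 23.25)"""

readable = name_features(excerpt, feature_names)
print(readable)
```

Applied to the full `example_tree` string, the same substitution would make each branch readable as a condition on a named column.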

In [22]:
# tests for 5 points
assert type(example_tree) == str
assert 'DecisionTreeClassificationModel' in example_tree
assert 'feature 0' in example_tree
assert 'If' in example_tree
assert 'Else' in example_tree
assert 'Predict' in example_tree