# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your http://notebook.acuna.io workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ tests for your answers. Think about cases your code should handle even if it passes all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).
- Good luck!

In [1]:
# load these packages
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark
from pyspark.ml import Pipeline, pipeline, feature, classification, regression, evaluation
from pyspark.sql import SparkSession, Row, functions as fn

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

We will analyze the Mid-Atlantic wage dataset (https://rdrr.io/cran/ISLR/man/Wage.html).

In [2]:
# read-only
drop_cols = ['_c0', 'logwage', 'sex', 'region']
wage_df = spark.read.csv('/datasets/ISLR/Wage.csv', header=True, inferSchema=True).drop(*drop_cols)
training_df, validation_df, testing_df = wage_df.randomSplit([0.6, 0.3, 0.1], seed=0)
wage_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- maritl: string (nullable = true)
 |-- race: string (nullable = true)
 |-- education: string (nullable = true)
 |-- jobclass: string (nullable = true)
 |-- health: string (nullable = true)
 |-- health_ins: string (nullable = true)
 |-- wage: double (nullable = true)
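
The 60/30/10 split used above is random, so the three parts only approximate those proportions. The following plain-Python sketch illustrates the idea of a seeded, weighted random split. It is a conceptual illustration only and not Spark's actual implementation; the function name `random_split` and the toy size of 1000 rows are assumptions made for this example.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds of each split on the [0, 1) interval.
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point leftovers.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

# Toy data: 1000 row ids (arbitrary size, for illustration only).
train, valid, test = random_split(range(1000), [0.6, 0.3, 0.1], seed=0)
print(len(train), len(valid), len(test))
```

Every row lands in exactly one split, and fixing the seed makes the assignment reproducible.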



In [3]:
# explore the data
wage_df.limit(10).toPandas()

Unnamed: 0,year,age,maritl,race,education,jobclass,health,health_ins,wage
0,2006,18,1. Never Married,1. White,1. < HS Grad,1. Industrial,1. <=Good,2. No,75.043154
1,2004,24,1. Never Married,1. White,4. College Grad,2. Information,2. >=Very Good,2. No,70.47602
2,2003,45,2. Married,1. White,3. Some College,1. Industrial,1. <=Good,1. Yes,130.982177
3,2003,43,2. Married,3. Asian,4. College Grad,2. Information,2. >=Very Good,1. Yes,154.685293
4,2005,50,4. Divorced,1. White,2. HS Grad,2. Information,1. <=Good,1. Yes,75.043154
5,2008,54,2. Married,1. White,4. College Grad,2. Information,2. >=Very Good,1. Yes,127.115744
6,2009,44,2. Married,4. Other,3. Some College,1. Industrial,2. >=Very Good,1. Yes,169.528538
7,2008,30,1. Never Married,3. Asian,3. Some College,2. Information,1. <=Good,1. Yes,111.720849
8,2006,41,1. Never Married,2. Black,3. Some College,2. Information,2. >=Very Good,1. Yes,118.884359
9,2004,52,2. Married,1. White,2. HS Grad,2. Information,2. >=Very Good,1. Yes,128.680488


# Question 1: Codify the data using transformers (20 pts)

Create a pipeline fitted to the entire data `wage_df` and call it `pipe_feat`. This pipeline should codify the columns `maritl`, `race`, `education`, `jobclass`, `health`, and `health_ins`. Each column should be codified with a `StringIndexer` followed by a `OneHotEncoder`. For example, for `maritl`, `StringIndexer` should create a column `maritl_index` and `OneHotEncoder` should create a column `maritl_feat`. Investigate the parameters of `StringIndexer` so that the labels are indexed alphabetically in ascending order: for example, the 1st index of `maritl_index` should correspond to `1. Never Married`, the 2nd to `2. Married`, and so forth. Also investigate the parameters of `OneHotEncoder` so that no column is dropped, as is usually done for dummy variables. That is, marital status should have one column for each of its classes.

The pipeline should create a column `features` that combines `year`, `age`, and all codified columns.
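
To make the requested encoding concrete, here is a minimal plain-Python sketch (no Spark involved) of alphabetical indexing followed by one-hot encoding that keeps every category. The helper names `alphabetical_index` and `one_hot` are made up for this illustration. In Spark, the corresponding settings are the `stringOrderType` parameter of `StringIndexer` and the `dropLast` parameter of `OneHotEncoder`.

```python
def alphabetical_index(values):
    """Map each distinct label to an index, in ascending alphabetical order."""
    labels = sorted(set(values))
    return {label: i for i, label in enumerate(labels)}

def one_hot(index, size):
    """Return a vector with one slot per category (no category is dropped)."""
    return [1.0 if i == index else 0.0 for i in range(size)]

# Marital-status values of the first five rows of the dataset shown above.
maritl = ["1. Never Married", "1. Never Married", "2. Married", "2. Married", "4. Divorced"]

mapping = alphabetical_index(maritl)
encoded = [one_hot(mapping[v], len(mapping)) for v in maritl]

print(mapping)
print(encoded)
```

Because only three distinct labels appear in these five rows, the toy vectors have three slots; on the full column there would be one slot per marital status.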

In [4]:
# create `pipe_feat` below
# YOUR CODE HERE

# Choose only the categorical columns (maritl through health_ins)
categorical_cols = wage_df.columns[2:8]

# One StringIndexer per categorical column (default ordering: label frequency, descending)
string_indexer = [feature.StringIndexer(inputCol=c, outputCol=c + '_index') for c in categorical_cols]

# One OneHotEncoder per indexed column (default dropLast=True omits the last category)
onehot_encoder = [feature.OneHotEncoder(inputCol=c + '_index', outputCol=c + '_feat') for c in categorical_cols]

# Inputs to the VectorAssembler: one-hot columns, then index columns, then year and age
feature_col = [c + '_feat' for c in categorical_cols]
feature_col += [c + '_index' for c in categorical_cols]
feature_col += ['year', 'age']

vector_assembler = feature.VectorAssembler(inputCols=feature_col, outputCol='features')

# Combine all the stages
all_stages = string_indexer + onehot_encoder + [vector_assembler]

# Build the fitted pipeline
pipe_feat = Pipeline(stages=all_stages).fit(wage_df)

In [5]:
# investigate the results
pipe_feat.transform(wage_df).limit(5).toPandas().T

Unnamed: 0,0,1,2,3,4
year,2006,2004,2003,2003,2005
age,18,24,45,43,50
maritl,1. Never Married,1. Never Married,2. Married,2. Married,4. Divorced
race,1. White,1. White,1. White,3. Asian,1. White
education,1. < HS Grad,4. College Grad,3. Some College,4. College Grad,2. HS Grad
jobclass,1. Industrial,2. Information,1. Industrial,2. Information,2. Information
health,1. <=Good,2. >=Very Good,1. <=Good,2. >=Very Good,1. <=Good
health_ins,2. No,2. No,1. Yes,1. Yes,1. Yes
wage,75.0432,70.476,130.982,154.685,75.0432
maritl_index,1,1,0,0,2


In [6]:
# (20 pts)
assert set(type(pm) for pm in pipe_feat.stages) == {feature.OneHotEncoder, feature.StringIndexerModel, feature.VectorAssembler}
assert len(pipe_feat.transform(wage_df).first().features) == 22


# Question 2: (15 pts)

Create three pipelines, each containing a different random forest regression that takes all features from `wage_df` to predict `wage`. Each pipeline should have the pipeline created in Question 1 as its first stage and should be fitted to the training data.

- `pipe_rf1`: Random forest with `maxDepth=1` and `numTrees=60`
- `pipe_rf2`: Random forest with `maxDepth=3` and `numTrees=40`
- `pipe_rf3`: Random forest with `maxDepth=6` and `numTrees=20`

In [7]:
# create the fitted pipelines `pipe_rf1`, `pipe_rf2`, and `pipe_rf3` here
# YOUR CODE HERE

# Random forest regressors with the requested depth and number of trees
rf1 = regression.RandomForestRegressor(featuresCol="features", labelCol="wage", predictionCol="prediction", maxDepth=1, numTrees=60)
rf2 = regression.RandomForestRegressor(featuresCol="features", labelCol="wage", predictionCol="prediction", maxDepth=3, numTrees=40)
rf3 = regression.RandomForestRegressor(featuresCol="features", labelCol="wage", predictionCol="prediction", maxDepth=6, numTrees=20)

# Fitted pipelines: the first stage is the fitted feature pipeline, the second is the random forest regressor
pipe_rf1 = Pipeline(stages=[pipe_feat, rf1]).fit(training_df)
pipe_rf2 = Pipeline(stages=[pipe_feat, rf2]).fit(training_df)
pipe_rf3 = Pipeline(stages=[pipe_feat, rf3]).fit(training_df)

In [8]:
# tests for 15 pts
np.testing.assert_equal(type(pipe_rf1.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(pipe_rf2.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(pipe_rf3.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(pipe_rf1.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf2.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf3.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf1.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf2.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf3.transform(training_df)), pyspark.sql.dataframe.DataFrame)

# Question 3 (10 pts)

Use the following evaluator to compute the RMSE of the models on the validation data. Print the RMSE of the three models and assign the best one (i.e., the best pipeline) to a variable `best_model`.

In [9]:
evaluator = evaluation.RegressionEvaluator(labelCol='wage', metricName='rmse')
# use it as follows:
#   evaluator.evaluate(fitted_pipeline.transform(df)) -> RMSE

In [10]:
# print RMSE of each model and define `best_model`
# YOUR CODE HERE

# Compute the RMSE of the three pipelines on the validation set
rmse1 = evaluator.evaluate(pipe_rf1.transform(validation_df))
print(rmse1)
rmse2 = evaluator.evaluate(pipe_rf2.transform(validation_df))
print(rmse2)
rmse3 = evaluator.evaluate(pipe_rf3.transform(validation_df))
print(rmse3)

# Assign the pipeline with the lowest validation RMSE to best_model
candidates = [(rmse1, pipe_rf1), (rmse2, pipe_rf2), (rmse3, pipe_rf3)]
best_model = min(candidates, key=lambda pair: pair[0])[1]

36.202853882645535
33.617519747629125
33.44699490860644
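
As a reminder of what the evaluator reports, the sketch below computes RMSE by hand in plain Python and then picks the pipeline with the lowest validation RMSE from the three values printed above. The `rmse` helper and the toy label/prediction numbers are assumptions for illustration; only the three validation values come from this notebook.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Toy numbers, made up for illustration only.
toy_rmse = rmse([100.0, 120.0], [110.0, 110.0])
print(toy_rmse)

# Validation RMSEs as printed above, keyed by pipeline name.
validation_rmse = {
    "pipe_rf1": 36.202853882645535,
    "pipe_rf2": 33.617519747629125,
    "pipe_rf3": 33.44699490860644,
}

# The best model is the one with the smallest error.
best_name = min(validation_rmse, key=validation_rmse.get)
print(best_name)
```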


In [11]:
# tests for 10 pts
np.testing.assert_equal(type(best_model.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(best_model.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(best_model.transform(training_df)), pyspark.sql.dataframe.DataFrame)

# Question 4: 5 pts

Compute the RMSE of the best model on the testing data, print it, and assign it to the variable `RMSE_best`.

In [12]:
# create RMSE_best below
# YOUR CODE HERE

# Used the best model on the test data, and calculated the RMSE
RMSE_best = evaluator.evaluate(best_model.transform(testing_df))
print(RMSE_best)

34.28723242088405


In [13]:
# tests for 5 pts
np.testing.assert_array_less(RMSE_best, 40)
np.testing.assert_array_less(30, RMSE_best)

# Question 5: 5 pts

Using the parameters of the best model, create a new pipeline called `final_model` and fit it to the entire data (`wage_df`).

In [14]:
# create final_model pipeline below
# YOUR CODE HERE

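# Assemble the final pipeline from the stages of best_model and call fit on the entire dataset.
# Both stages are already-fitted models (transformers), so Pipeline.fit() keeps them unchanged;
# passing the unfitted estimator `rf3` instead would retrain the forest on wage_df.
final_model = Pipeline(stages=[best_model.stages[0], best_model.stages[1]]).fit(wage_df)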

In [41]:
# Printing the stages of the final model
final_model.stages

[PipelineModel_3884e0caa527,
 RandomForestRegressionModel (uid=RandomForestRegressor_463cb1b263d6) with 20 trees]

In [15]:
# tests for 5 pts
np.testing.assert_equal(type(final_model.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(final_model.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(final_model.transform(wage_df)), pyspark.sql.dataframe.DataFrame)

# Question 6: 30 pts

Create a pandas dataframe `feature_importance` with the columns `feature` and `importance`, containing the name and the importance of each feature. Give the features appropriate names such as `maritl_1._Never_Married`. You can build these feature names by using the labels from the fitted `StringIndexer` used in Question 1. Use the feature importances determined by the random forest of the final model (`final_model`). Sort the pandas dataframe by `importance` in descending order and display it.
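
One way to build names of the requested form is sketched below in plain Python. It assumes the labels of each categorical column are available as a list (in Spark, a fitted `StringIndexerModel` exposes them through its `labels` attribute) and that the names are generated in the same order as the inputs to the `VectorAssembler`. The helper `feature_names` and the choice of two columns are illustrative only; the labels are the ones visible in the data above.

```python
def feature_names(numeric_cols, labels_by_col):
    """Build one name per feature slot: numeric columns first, then column_label pairs."""
    names = list(numeric_cols)
    for col, labels in labels_by_col.items():
        # Replace spaces with underscores, e.g. "1. Yes" -> "1._Yes".
        names.extend(f"{col}_{label.replace(' ', '_')}" for label in labels)
    return names

# Labels for two of the categorical columns, as they appear in the dataset.
labels_by_col = {
    "education": ["1. < HS Grad", "2. HS Grad", "3. Some College",
                  "4. College Grad", "5. Advanced Degree"],
    "health_ins": ["1. Yes", "2. No"],
}

names = feature_names(["year", "age"], labels_by_col)
for name in names:
    print(name)
```

Pairing such a list with the importance vector, position by position, gives the rows of the dataframe.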

In [38]:
# f = []
# for key, value in dr1.items():
#     temp = [key, value]
#     f.append(temp)

# for key, value in dr2.items():
#     temp = [key, value]
#     f.append(temp)
    
# for key, value in dr3.items():
#     temp = [key, value]
#     f.append(temp)
    
# for key, value in dr4.items():
#     temp = [key, value]
#     f.append(temp)
    
# for key, value in dr5.items():
#     temp = [key, value]
#     f.append(temp)

# for key, value in dr6.items():
#     temp = [key, value]
#     f.append(temp)
    
# feature_importance = pd.DataFrame(f,columns = ["features","importance"])

# print(feature_importance)

In [63]:
# create feature_importance below
# YOUR CODE HERE
# Return the feature importances of the random forest model

from itertools import chain

# Transform the entire data with the fitted feature pipeline and refit the best forest (rf3) on it
pipe = final_model.stages[0].transform(wage_df)
postrf_df = rf3.fit(pipe)

# Feature importances of the refitted forest, and the transformed data that carries the feature metadata
feature_imp = postrf_df.featureImportances
feat = postrf_df.transform(pipe)

# Sorted (index, name) pairs for every slot of the `features` vector, read from the column metadata
attrs = sorted(
    (attr["idx"], attr["name"]) for attr in (chain(*feat
        .schema["features"]
        .metadata["ml_attr"]["attrs"].values())))

In [69]:
# Build the list of (feature name, importance) pairs, skipping features with zero importance
a = [(name, postrf_df.featureImportances[idx])
     for idx, name in attrs
     if postrf_df.featureImportances[idx]]

# Build the dataframe from that list, name the columns, and sort by importance in descending order
feature_importance = pd.DataFrame(a, columns=['feature', 'importance'])
feature_importance = feature_importance.sort_values('importance', ascending=False)
feature_importance

Unnamed: 0,feature,importance
16,education_index,0.31707
10,education_feat_5. Advanced Degree,0.132371
13,health_ins_feat_1. Yes,0.091746
21,age,0.083808
19,health_ins_index,0.073294
14,maritl_index,0.070556
8,education_feat_4. College Grad,0.037428
20,year,0.029142
7,education_feat_2. HS Grad,0.028323
1,maritl_feat_1. Never Married,0.025999


In [78]:
#######################################################################################################################
# Tried to rename the columns as per the specifications, but no regular expression fit the requirements
#######################################################################################################################

# edu_values=list(wage_df.toPandas().education.unique())
# edu_dir={}
# for ele in edu_values:
#     edu_dir[int(ele[0])]=ele
# print(edu_dir)

# race_values=list(wage_df.toPandas().race.unique())
# race_dir = {}
# for ele in race_values:
#     race_dir[int(ele[0])]=ele
# print(race_dir)

# maritl_values=list(wage_df.toPandas().maritl.unique())
# maritl_dir = {}
# for ele in maritl_values:
#     maritl_dir[int(ele[0])]=ele
# print(maritl_dir)

# jobclass_values=list(wage_df.toPandas().jobclass.unique())
# jobclass_dir = {}
# for ele in jobclass_values:
#     jobclass_dir[int(ele[0])]=ele
# print(jobclass_dir)

# health_values=list(wage_df.toPandas().health.unique())
# health_dir = {}
# for ele in health_values:
#     health_dir[int(ele[0])]=ele
# print(health_dir)

# health_ins_values=list(wage_df.toPandas().health_ins.unique())
# health_ins_dir = {}
# for ele in health_ins_values:
#     health_ins_dir[int(ele[0])]=ele
# print(health_ins_dir)

In [85]:
import re
edu_idx=edu_dir.keys()
edu_list=[]
dr1={}
for ele in list(feature_importance1["feature"].unique()):
    if re.search("education",ele):
        edu_list.append(ele)
        dr1[str(ele)]=float(feature_importance1.loc[feature_importance1["feature"]==ele,"importance"])
flag=0
for idx in edu_idx:
    for col in edu_list:
        if str(idx) in col:
            flag=1
            break
    if flag==0:
        miss=idx

edu_list[-1]="education_feat_"+str(edu_dir[int(miss)])
dr1_keys=list(dr1.keys())
dr1[edu_list[-1]]=dr1.pop(dr1_keys[-1])
print(dr1)

race_idx=race_dir.keys()
race_list=[]
dr2={}
for ele in list(feature_importance1["feature"].unique()):
    if re.search("race",ele):
        race_list.append(ele)
        dr2[str(ele)]=float(feature_importance1.loc[feature_importance1["feature"]==ele,"importance"])
flag=0
for idx in race_idx:
    for col in race_list:
        if str(idx) in col:
            flag=1
            break
    if flag==0:
        miss=idx

race_list[-1]="race_feat_"+str(race_dir[int(miss)])
dr2_keys=list(dr2.keys())
dr2[race_list[-1]]=dr2.pop(dr2_keys[-1])
print(dr2)

maritl_idx=maritl_dir.keys()
maritl_list=[]
dr3={}
for ele in list(feature_importance1["feature"].unique()):
    if re.search("maritl",ele):
        maritl_list.append(ele)
        dr3[str(ele)]=float(feature_importance1.loc[feature_importance1["feature"]==ele,"importance"])
flag=0
for idx in maritl_idx:
    for col in maritl_list:
        if str(idx) in col:
            flag=1
            break
    if flag==0:
        miss=idx

maritl_list[-1]="maritl_feat_"+str(maritl_dir[int(miss)])
dr3_keys=list(dr3.keys())
dr3[maritl_list[-1]]=dr3.pop(dr3_keys[-1])
print(dr3)

jobclass_idx=jobclass_dir.keys()
jobclass_list=[]
dr4={}
for ele in list(feature_importance1["feature"].unique()):
    if re.search("jobclass",ele):
        jobclass_list.append(ele)
        dr4[str(ele)]=float(feature_importance1.loc[feature_importance1["feature"]==ele,"importance"])
flag=0
for idx in jobclass_idx:
    for col in jobclass_list:
        if str(idx) in col:
            flag=1
            break
    if flag==0:
        miss=idx

jobclass_list[-1]="jobclass_feat_"+str(jobclass_dir[int(miss)])
dr4_keys=list(dr4.keys())
dr4[jobclass_list[-1]]=dr4.pop(dr4_keys[-1])
print(dr4)

health_idx=health_dir.keys()
health_list=[]
dr5={}
for ele in list(feature_importance1["feature"].unique()):
    if re.search("health_feat",ele):
        health_list.append(ele)
        dr5[str(ele)]=float(feature_importance1.loc[feature_importance1["feature"]==ele,"importance"])
flag=0
for idx in health_idx:
    for col in health_list:
        if str(idx) in col:
            flag=1
            break
    if flag==0:
        miss=idx

health_list[-1]="health_feat_"+str(health_dir[int(miss)])
dr5_keys=list(dr5.keys())
dr5[health_list[-1]]=dr5.pop(dr5_keys[-1])
print(dr5)

health_ins_idx=health_ins_dir.keys()
health_ins_list=[]
dr6={}
for ele in list(feature_importance1["feature"].unique()):
    if re.search("health_ins",ele):
        health_ins_list.append(ele)
        dr6[str(ele)]=float(feature_importance1.loc[feature_importance1["feature"]==ele,"importance"])
flag=0
for idx in health_ins_idx:
    for col in health_ins_list:
        if str(idx) in col:
            flag=1
            break
    if flag==0:
        miss=idx

health_ins_list[-1]="health_ins_feat_"+str(health_ins_dir[int(miss)])
dr6_keys=list(dr6.keys())
dr6[health_ins_list[-1]]=dr6.pop(dr6_keys[-1])
print(dr6)

{'education_feat_4. College Grad': 0.2991217048703026, 'education_feat_3. Some College': 0.015599903745202131, 'education_feat_2. HS Grad': 0.01312854815331528, 'education_feat_5. Advanced Degree': 0.009456500988297544, 'education_feat_1. < HS Grad': 0.0052934928295298835}
{'race_feat_2. Black': 0.06454927286165087, 'race_feat_3. Asian': 0.052493458043539824, 'race_feat_1. White': 0.0001209745135330191}
{'maritl_feat_4. Divorced': 0.14902578576404216, 'maritl_feat_2. Married': 0.04386079962086879, 'maritl_feat_5. Separated': 0.014966275072135401, 'maritl_feat_1. Never Married': 0.0036178147587944787}
{'jobclass_feat_1. Industrial': 0.003444107108784947}
{'health_feat_1. <=Good': 0.043042763162742026}
{'health_ins_feat_1. Yes': 0.0992499239804704, 'health_ins_feat_2. No': 0.018427146210230692}


In [81]:
# tests for 25 pts
assert type(feature_importance) == pd.core.frame.DataFrame
np.testing.assert_array_equal(list(feature_importance.columns), ['feature', 'importance'])

**(5 pts)** Comment below on the importance that the random forest has given to each feature. Are they reasonable? Do they tell you anything valuable about the Wage dataset? Answer in the cell below.

YOUR ANSWER HERE

# Question 7:  15 pts.

Pick any of the trees from the final model and assign its `toDebugString` property to a variable `example_tree`. Print this variable and add comments to the cell describing how you think this particular tree is fitting the data.

In [82]:
# create a variable example_tree with the toDebugString property of a tree from final_model.
# print this string and comment in this same cell about the branches that this tree fit
# YOUR CODE HERE

# Using the tree at index 8 (the ninth tree) to evaluate how a particular tree fits the data
example_tree = final_model.stages[1].trees[8].toDebugString

In [83]:
# display the tree here
print(example_tree)

DecisionTreeRegressionModel (uid=dtr_e0d332c284ac) of depth 6 with 119 nodes
  If (feature 16 in {0.0,2.0,4.0})
   If (feature 13 in {0.0})
    If (feature 21 <= 28.5)
     If (feature 0 in {0.0})
      If (feature 21 <= 24.5)
       If (feature 16 in {2.0,4.0})
        Predict: 62.44825501585237
       Else (feature 16 not in {2.0,4.0})
        Predict: 70.02706576550946
      Else (feature 21 > 24.5)
       If (feature 4 in {0.0})
        Predict: 62.96143718082374
       Else (feature 4 not in {0.0})
        Predict: 76.80127438255825
     Else (feature 0 not in {0.0})
      If (feature 21 <= 24.5)
       If (feature 16 in {0.0})
        Predict: 67.22541563344623
       Else (feature 16 not in {0.0})
        Predict: 137.7644514573805
      Else (feature 21 > 24.5)
       If (feature 4 in {1.0})
        Predict: 70.6756099066326
       Else (feature 4 not in {1.0})
        Predict: 80.0801440199342
    Else (feature 21 > 28.5)
     If (feature 9 in {0.0})
      If (feature 6 in {1.

In [84]:
# tests for 10 points
assert type(example_tree) == str
assert 'DecisionTreeRegressionModel' in example_tree
assert 'feature 0' in example_tree
assert 'If' in example_tree
assert 'Else' in example_tree
assert 'Predict' in example_tree

**(5 pts)** Comment on the feature that is at the top of the tree. Does it make sense that that is the feature there?

The feature at the root of a tree is the one whose split, among the candidate features considered, gives the largest reduction in error on the data that tree was trained on, so it is the most important feature for that tree. Every prediction passes through the root, which means a change in the value of that feature can move an example to an entirely different part of the tree.

In tree 8, the root splits on feature 16. Given the order of the inputs to the `VectorAssembler`, feature 16 is `education_index`, which is also the most important feature in the `feature_importance` dataframe. The split puts indices 0, 2, and 4 on one side; the feature names in that dataframe show that indices 1 and 3 are `4. College Grad` and `5. Advanced Degree`, so the root separates workers without a college degree from workers with one. This makes sense because education level changes the wage one can earn significantly.

NOTE: The output differs from tree to tree in the random forest; here we are only looking at the behavior of tree 8.
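
Reading the printed tree is easier when the numeric feature ids are replaced by names. The sketch below does this with a regular expression in plain Python. The helper `name_features` is hypothetical, and the index-to-name pairs are read off the `feature_importance` table above, assuming its row labels coincide with positions in the feature vector (which matches the order of the `VectorAssembler` inputs).

```python
import re

def name_features(debug_string, names):
    """Replace each 'feature N' with its name when N is a known index."""
    def substitute(match):
        idx = int(match.group(1))
        # Leave the text unchanged when the index has no known name.
        return names.get(idx, match.group(0))
    return re.sub(r"feature (\d+)", substitute, debug_string)

# Index-to-name pairs taken from the feature_importance table above.
names = {16: "education_index", 13: "health_ins_feat_1. Yes", 21: "age"}

# The first three lines of the tree printed above.
snippet = """If (feature 16 in {0.0,2.0,4.0})
 If (feature 13 in {0.0})
  If (feature 21 <= 28.5)"""

renamed = name_features(snippet, names)
print(renamed)
```

Indices without an entry in `names` are left as `feature N`, so a partial mapping is still usable.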