In [103]:
# load these packages
import pyspark
from pyspark.ml import feature, classification
from pyspark.ml import Pipeline, pipeline
from pyspark.sql import functions as fn
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml import feature as ft, regression, evaluation, Pipeline
from pyspark.sql import functions as fn, Row
import matplotlib.pyplot as plt
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
import pandas as pd

We will analyze the Mid-atlantic wage dataset (https://rdrr.io/cran/ISLR/man/Wage.html). 

In [104]:
# read-only
drop_cols = ['_c0', 'logwage', 'sex', 'region']
wage_df = spark.read.csv('/datasets/ISLR/Wage.csv', header=True, inferSchema=True).drop(*drop_cols)
training_df, validation_df, testing_df = wage_df.randomSplit([0.6, 0.3, 0.1], seed=0)
wage_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- maritl: string (nullable = true)
 |-- race: string (nullable = true)
 |-- education: string (nullable = true)
 |-- jobclass: string (nullable = true)
 |-- health: string (nullable = true)
 |-- health_ins: string (nullable = true)
 |-- wage: double (nullable = true)



In [105]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler

In [106]:
# explore the data
wage_df.toPandas()['wage'].max()


318.342430056529

Codify the data using transformers 

Create a fitted pipeline to the entire data `wage_df` and call it `pipe_feat`. This pipeline should codify the columns `maritl`, `race`, `education`, `jobclass`, `health`, and `health_ins`. The codification should be a combination of a `StringIndexer` and a `OneHotEncoder`. For example, for `maritl`, `StringIndexer` should create a column `maritl_index` and `OneHotEncoder` should create a column `maritl_feat`. Investigate the parameters of `StringIndexer` so that the labels are indexed alphabetically in ascending order so that, for example, the 1st index for `maritl_index` corresponds to `1. Never Married`, the 2nd index corresponds to `2. Married`, and so forth. Also, investigate the parameters of  `OneHotEncoder` so that there are no columns dropped as it is usually done for dummy variables. This is, marital status should have one column for each of the classes.

The pipeline should create a column `features` that combines `year`, `age`, and all codified columns.

In [107]:
# create `pipe_feat` below
#columns = ["maritl",'race','education','jobclass','health','health_ins']
#strindex = [ft.StringIndexer(inputCol=cols, outputCol=cols + "_index", stringOrderType = "alphabetAsc") for cols in list(set(wage_df.columns))]
#encoder = [ft.OneHotEncoder(inputCol=cols+ "_index", outputCol=cols + "_feat", dropLast = False) for cols in list(set(wage_df.columns))]
va = ft.VectorAssembler(inputCols= ['year','age',"maritl_feat",'race_feat','education_feat','jobclass_feat','health_feat','health_ins_feat'],outputCol="features" )
#codify=[strindex,encoder,va]
pipe_feat = Pipeline(stages=[StringIndexer(inputCol="maritl", outputCol="maritl_index", stringOrderType = "alphabetAsc") 
                              ,StringIndexer(inputCol='race', outputCol="race_index", stringOrderType = "alphabetAsc")
                              ,StringIndexer(inputCol='education', outputCol="education_index", stringOrderType = "alphabetAsc")
                              ,StringIndexer(inputCol='jobclass', outputCol= "jobclass_index", stringOrderType = "alphabetAsc")
                              ,StringIndexer(inputCol='health', outputCol="health_index", stringOrderType = "alphabetAsc")
                              ,StringIndexer(inputCol='health_ins', outputCol="health_ins_index", stringOrderType = "alphabetAsc")
                              ,OneHotEncoder(inputCol="maritl_index", outputCol="maritl_feat", dropLast = False)
                             ,OneHotEncoder(inputCol="race_index", outputCol="race_feat", dropLast = False)
                              ,OneHotEncoder(inputCol="education_index", outputCol="education_feat", dropLast = False)
                             ,OneHotEncoder(inputCol="jobclass_index", outputCol="jobclass_feat", dropLast = False)
                              ,OneHotEncoder(inputCol="health_index", outputCol="health_feat", dropLast = False)
                             ,OneHotEncoder(inputCol="health_ins_index", outputCol="health_ins_feat", dropLast = False)
                             ,va
                             ]).fit(wage_df) 
#pipe_feat2 = Pipeline(stages=encoder)  
#pipe1=pipe_feat.fit(wage_df).transform(wage_df)
#raise NotImplementedError()

In [108]:
pipe_feat.stages

[StringIndexer_1cf8ad51477f,
 StringIndexer_3d2af357cc3d,
 StringIndexer_273e9d5912db,
 StringIndexer_a9d9e98db5eb,
 StringIndexer_4e01e6a02c5e,
 StringIndexer_df277c103f41,
 OneHotEncoder_422af6e6c991,
 OneHotEncoder_fb1323fd245b,
 OneHotEncoder_6a7785b97220,
 OneHotEncoder_4b9b653e4bb5,
 OneHotEncoder_b92cca740e77,
 OneHotEncoder_68fae43da7ab,
 VectorAssembler_9bc6539478cc]

In [109]:
# investigate the results
pipe_feat.transform(wage_df).limit(5).toPandas().T

Unnamed: 0,0,1,2,3,4
year,2006,2004,2003,2003,2005
age,18,24,45,43,50
maritl,1. Never Married,1. Never Married,2. Married,2. Married,4. Divorced
race,1. White,1. White,1. White,3. Asian,1. White
education,1. < HS Grad,4. College Grad,3. Some College,4. College Grad,2. HS Grad
jobclass,1. Industrial,2. Information,1. Industrial,2. Information,2. Information
health,1. <=Good,2. >=Very Good,1. <=Good,2. >=Very Good,1. <=Good
health_ins,2. No,2. No,1. Yes,1. Yes,1. Yes
wage,75.0432,70.476,130.982,154.685,75.0432
maritl_index,0,0,1,1,3




Create three pipelines that contain three different random forest regressions that take in all features from the `wage_df` to predict `wage`. These pipelines should have as first stage the pipeline created in question 1 and should be fitted to the training data.

- `pipe_rf1`: Random forest with `maxDepth=1` and `numTrees=60`
- `pipe_rf2`: Random forest with `maxDepth=3` and `numTrees=40`
- `pipe_rf3`: Random forest with `maxDepth=6`, `numTrees=20`

In [111]:

from pyspark.ml.regression import RandomForestRegressor
pipe_rf1 = Pipeline(stages=[pipe_feat, RandomForestRegressor(labelCol='wage', featuresCol='features',maxDepth=1, numTrees=60)]).fit(training_df)
pipe_rf2 = Pipeline(stages=[pipe_feat, RandomForestRegressor(labelCol='wage', featuresCol='features',maxDepth=3, numTrees=40)]).fit(training_df)
pipe_rf3 = Pipeline(stages=[pipe_feat, RandomForestRegressor(labelCol='wage', featuresCol='features',maxDepth=6, numTrees=20)]).fit(training_df)



In [113]:
evaluator = evaluation.RegressionEvaluator(labelCol='wage', metricName='rmse')

In [114]:

print(evaluator.evaluate(pipe_rf1.transform(validation_df))) 
print(evaluator.evaluate(pipe_rf2.transform(validation_df)))
print(evaluator.evaluate(pipe_rf3.transform(validation_df)))
best_model = Pipeline(stages=[pipe_feat, RandomForestRegressor(labelCol='wage', featuresCol='features',maxDepth=6, numTrees=20)]).fit(training_df)


36.810232083476
33.75261578062595
32.99183604380521



Compute the RMSE of the model on testing data, print it, and assign it to variable `RMSE_best`

In [116]:

RMSE_best = evaluator.evaluate(best_model.transform(testing_df))
RMSE_best


34.047279900775656



Using the parameters of the best model, create a new pipeline called `final_model` and fit it to the entire data (`wage_df`)

In [118]:

final_model = Pipeline(stages=[pipe_feat, RandomForestRegressor(labelCol='wage', featuresCol='features',maxDepth=6, numTrees=20)]).fit(wage_df)
finaldf= final_model.transform(wage_df)
finaldf.columns


['year',
 'age',
 'maritl',
 'race',
 'education',
 'jobclass',
 'health',
 'health_ins',
 'wage',
 'maritl_index',
 'race_index',
 'education_index',
 'jobclass_index',
 'health_index',
 'health_ins_index',
 'maritl_feat',
 'race_feat',
 'education_feat',
 'jobclass_feat',
 'health_feat',
 'health_ins_feat',
 'features',
 'prediction']


Create a pandas dataframe `feature_importance` with the columns `feature` and `importance` which contains the names of the features. Give appropriate column names such as `maritl_1._Never_Married`. Sort the pandas dataframe by `importance` in descending order and display.

In [120]:
featrs = list()

In [121]:
newlist = list()

In [122]:
for i in range(0,6):
    c=pipe_feat.stages[i].labels
    print(c)
    featrs.extend(c)
    

['1. Never Married', '2. Married', '3. Widowed', '4. Divorced', '5. Separated']
['1. White', '2. Black', '3. Asian', '4. Other']
['1. < HS Grad', '2. HS Grad', '3. Some College', '4. College Grad', '5. Advanced Degree']
['1. Industrial', '2. Information']
['1. <=Good', '2. >=Very Good']
['1. Yes', '2. No']


In [123]:
featrs

['1. Never Married',
 '2. Married',
 '3. Widowed',
 '4. Divorced',
 '5. Separated',
 '1. White',
 '2. Black',
 '3. Asian',
 '4. Other',
 '1. < HS Grad',
 '2. HS Grad',
 '3. Some College',
 '4. College Grad',
 '5. Advanced Degree',
 '1. Industrial',
 '2. Information',
 '1. <=Good',
 '2. >=Very Good',
 '1. Yes',
 '2. No']

In [124]:
j=0
for i in featrs:
    #print(j)
    #print(i)
    cd=i.replace(" ","_")
    if 0<=j<=4:
        cd1= "maritl_" + cd
        newlist.append(cd1)
        j = j+1
        if j == 5:
            continue
    if 5<=j<=8:
        cd2= "race_"+ cd
        newlist.append(cd2)
        j = j+1
        if j == 9:
            continue
    if 9<=j<=13:
        cd3= "education_" + cd
        newlist.append(cd3)
        j = j+1
        if j == 14:
            continue
    if 14<=j<=15:
        cd4= "jobclass_" + cd
        newlist.append(cd4)
        j = j+1
        if j == 16:
            continue
    if 16<=j<=17:
        cd5= "health_" + cd
        newlist.append(cd5)
        j = j+1
        if j == 18:
            continue
    if 18<=j<=19:
        cd6= "health_ins_" + cd
        newlist.append(cd6)
        j = j+1

In [125]:
newlist.insert(0, "age") 

In [126]:
newlist.insert(0, "year") 

In [127]:
newlist

['year',
 'age',
 'maritl_1._Never_Married',
 'maritl_2._Married',
 'maritl_3._Widowed',
 'maritl_4._Divorced',
 'maritl_5._Separated',
 'race_1._White',
 'race_2._Black',
 'race_3._Asian',
 'race_4._Other',
 'education_1._<_HS_Grad',
 'education_2._HS_Grad',
 'education_3._Some_College',
 'education_4._College_Grad',
 'education_5._Advanced_Degree',
 'jobclass_1._Industrial',
 'jobclass_2._Information',
 'health_1._<=Good',
 'health_2._>=Very_Good',
 'health_ins_1._Yes',
 'health_ins_2._No']

In [128]:

feature_importance = pd.DataFrame(list(zip(newlist, final_model.stages[1].featureImportances.toArray())),
            columns = ['feature', 'importance']).sort_values('importance',ascending=False)


In [129]:
final_model.stages[1].featureImportances.toArray()

array([2.61552344e-02, 1.18986142e-01, 3.04166943e-02, 5.91385547e-02,
       2.31234528e-04, 2.22026283e-03, 5.00807767e-03, 4.31444287e-03,
       4.55125730e-03, 5.78938587e-03, 1.21916150e-03, 3.19796988e-02,
       6.22135896e-02, 2.33157506e-02, 7.66209547e-02, 3.08061742e-01,
       1.10382872e-02, 1.37374353e-02, 1.01517752e-02, 1.43274929e-02,
       7.40298549e-02, 1.16492971e-01])

In [130]:
# display your feature importances here
feature_importance

Unnamed: 0,feature,importance
15,education_5._Advanced_Degree,0.308062
1,age,0.118986
21,health_ins_2._No,0.116493
14,education_4._College_Grad,0.076621
20,health_ins_1._Yes,0.07403
12,education_2._HS_Grad,0.062214
3,maritl_2._Married,0.059139
11,education_1._<_HS_Grad,0.03198
2,maritl_1._Never_Married,0.030417
0,year,0.026155


Education (education_3._Advanced_Degree) was given the highest importance. I think this is reasonable as higher education is directly correlated with higher wages.It shows that an advanced degree can significantly improve chances of higher wages. Age was also given high importance as wages vary with age.Similarly, the model has given health_ins_no high importance. This is reasonable too as if person cannot afford health insurance, it means his/her wage is low. Similarly, other education like college grad and HS grad was given high importance.



Pick any of the trees from the final model and assign its `toDebugString` property to a variable `example_tree`. Print this variable and add comments to the cell describing how you think this particular tree is fitting the data

In [132]:
# create a variable example_tree with the toDebugString property of a tree from final_model.
# print this string and comment in this same cell about the branches that this tree fit

example_tree = final_model.stages[1].trees[5].toDebugString
print(example_tree)


DecisionTreeRegressionModel (uid=dtr_b05f42e380d5) of depth 6 with 121 nodes
  If (feature 20 in {0.0})
   If (feature 14 in {0.0})
    If (feature 3 in {0.0})
     If (feature 0 <= 2003.5)
      If (feature 11 in {1.0})
       If (feature 1 <= 22.5)
        Predict: 40.226148274439225
       Else (feature 1 > 22.5)
        Predict: 65.66588403451458
      Else (feature 11 not in {1.0})
       If (feature 15 in {0.0})
        Predict: 66.13014972244899
       Else (feature 15 not in {0.0})
        Predict: 143.13494081113413
     Else (feature 0 > 2003.5)
      If (feature 1 <= 28.5)
       If (feature 19 in {0.0})
        Predict: 66.86167201902865
       Else (feature 19 not in {0.0})
        Predict: 71.90594620149272
      Else (feature 1 > 28.5)
       If (feature 18 in {1.0})
        Predict: 76.39473167239255
       Else (feature 18 not in {1.0})
        Predict: 88.64680243901937
    Else (feature 3 not in {0.0})
     If (feature 11 in {1.0})
      If (feature 16 in {0.0})
    

In [133]:
# display the tree here
print(example_tree)

DecisionTreeRegressionModel (uid=dtr_b05f42e380d5) of depth 6 with 121 nodes
  If (feature 20 in {0.0})
   If (feature 14 in {0.0})
    If (feature 3 in {0.0})
     If (feature 0 <= 2003.5)
      If (feature 11 in {1.0})
       If (feature 1 <= 22.5)
        Predict: 40.226148274439225
       Else (feature 1 > 22.5)
        Predict: 65.66588403451458
      Else (feature 11 not in {1.0})
       If (feature 15 in {0.0})
        Predict: 66.13014972244899
       Else (feature 15 not in {0.0})
        Predict: 143.13494081113413
     Else (feature 0 > 2003.5)
      If (feature 1 <= 28.5)
       If (feature 19 in {0.0})
        Predict: 66.86167201902865
       Else (feature 19 not in {0.0})
        Predict: 71.90594620149272
      Else (feature 1 > 28.5)
       If (feature 18 in {1.0})
        Predict: 76.39473167239255
       Else (feature 18 not in {1.0})
        Predict: 88.64680243901937
    Else (feature 3 not in {0.0})
     If (feature 11 in {1.0})
      If (feature 16 in {0.0})
    

The top node is health_ins_1._Yes. The tree next branches on education_college grad and so on. The tree has depth of 6 and total of 121 nodes.

The feature at the top of this tree is "health_ins_1._Yes". Since random forest model generates various trees by selecting random features from original data set, we can say that it makes sense for this feature to come at the top in this tree.