In [1]:
from sklearn.datasets import load_diabetes
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

from spark_shap import *

# Create dataset
We shall use a regression dataset from scikit learn. We also include some categorical variables to show that our implementation works with categorical variables.

In [2]:
x = load_diabetes()

data = pd.DataFrame(x['data'], columns=x['feature_names'])
data['target'] = x['target']

In [3]:
#create one binary variable
data['sex_cat'] = 'F'
data.loc[data['sex'] < 0, ['sex_cat']] = 'M'

data['sex'] = data['sex_cat']
data = data.drop('sex_cat', axis=1)

In [4]:
#revert age to create category (to show it also works with categories)
data = data.replace({"age": dict(zip(sorted(data['age'].unique()), range(19,19+data['age'].nunique())))})
data['age'] = pd.qcut(data['age'], 5).astype(str)

# Spark

We initialize and create the train/test set on spark

In [5]:
#create spark environment
spark = SparkSession.Builder().appName("Example").getOrCreate()

In [6]:
#convert to spark dataframe
data_df = spark.createDataFrame(data)

In [7]:
train_df, test_df = data_df.randomSplit([0.7, 0.3], seed=42)

# Train Random Forest Regression model

In [8]:
#convert age and sex from string to integer
transformers_pipe = Pipeline(stages=[StringIndexer(inputCol="age", outputCol="age_int", handleInvalid='error'),
                                     StringIndexer(inputCol="sex", outputCol="sex_int", handleInvalid='error')])
    
transformers_pipe = transformers_pipe.fit(train_df)

#then create feature vector as input for model
vectorizer = VectorAssembler(inputCols=['age_int', 'sex_int', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6'], outputCol='features')

In [9]:
#create model to train
rf_model = RandomForestRegressor(featuresCol='features', labelCol='target', predictionCol='prediction',
                            minInstancesPerNode=1, impurity="variance", seed=0, numTrees=50, maxDepth=5)

In [10]:
#train model
t_df = vectorizer.transform(transformers_pipe.transform(train_df)) #transform input into final feature vector
rf_model = rf_model.fit(t_df)

# Results

We shall compute RMSE metric to check if our model is somewhat good. We also compute the feature importance

In [11]:
# feature importance
def extract_feature_importance(feature_imp, feature_names):
    """
    Convierte las feature importante en un dataframe de pandas
    """
    varlist = pd.DataFrame(feature_names, columns = ["name"]).reset_index().rename(columns = {"index": "idx"})

    #asocia nombre columna a su score
    varlist['score'] = varlist['idx'].apply(lambda x: feature_imp[x])
    varlist = varlist.sort_values('score', ascending=False)
    return varlist  

feature_imp = extract_feature_importance(rf_model.featureImportances, ['age_int', 'sex_int', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6'])
print("Feature importance:\n{}".format(feature_imp.to_string(index=False)))

Feature importance:
 idx    name    score
   2     bmi 0.289361
   8      s5 0.247779
   6      s3 0.100881
   9      s6 0.095658
   3      bp 0.068733
   7      s4 0.068496
   5      s2 0.041516
   0 age_int 0.040309
   4      s1 0.038783
   1 sex_int 0.008483


In [12]:
# RMSE
reg_eva = RegressionEvaluator(predictionCol='prediction', labelCol='target', metricName='rmse')

# train
print(reg_eva.evaluate(rf_model.transform(t_df)))

# test
n_df = rf_model.transform(vectorizer.transform(transformers_pipe.transform(test_df)))
print(reg_eva.evaluate(n_df))

40.75282146435476
58.402904767375034


The model overfits a little.

# Study individual results using SHAP

We shall consider a random test example to analize the effect of each feature on its prediction.

In [13]:
#select random test example
row = n_df.limit(1).toPandas().iloc[0]

In [14]:
print(row)

age                                              (18.999, 36.0]
sex                                                           F
bmi                                                   -0.022373
bp                                                     0.001215
s1                                                    -0.037344
s2                                                    -0.026366
s3                                                     0.015505
s4                                                    -0.039493
s5                                                    -0.072128
s6                                                    -0.017646
target                                                     49.0
age_int                                                     1.0
sex_int                                                     1.0
features      [1.0, 1.0, -0.0223731352440218, 0.001215130832...
prediction                                            98.684199
Name: 0, dtype: object


- For SHAP we first estimate the mean prediction
- Then all the computed features should add to the prediction

In [15]:
avg_val = n_df.select(F.avg('prediction')).toPandas().iloc[0][0]
print(avg_val)

148.40146099831344


- SHAP values should be computed with the test dataset
- SHAP works with monte carlo sampling, so we shall duplicate the test_df by some factor, to have more examples to sample. The more examples, the more exact our approximation will be to the actual values

In [16]:
def duplicate_df(df, n):
    """
    Duplicate dataset for more accurate monte carlo sampling
    Since we are duplicating the average will not change
    """
    
    res = df
    for _ in range(n):
        res = res.union(df)
        
    return res

entire_df = duplicate_df(n_df.drop('target','features','prediction'), 20)
print(entire_df.persist().count())

3003


We shall compute shap for some variables. For this example we shall ignore the two variables with the smallest feature importance. 

Note that you can compute the SHAP values for all variables, but the more variables you insert, the bigger the sampling data should be.

In [17]:
all_features = ['age_int', 'sex_int', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6']
important_features = feature_imp['name'].values[:-2].tolist() #we want to see if the more important features are the ones with a bigger impact on the result

In [18]:
import importlib
import spark_shap
importlib.reload(spark_shap)
from spark_shap import *

In [19]:
shap_values = compute_shap_values(entire_df, 
                                  row, 
                                  all_features, 
                                  important_features, 
                                  prediction_fun = lambda df: rf_model.transform(df), 
                                  features_col='features', 
                                  prediction_col='prediction')

bmi:-15.656472827859787
s5:-20.949190445891805
s3:-1.5008425770699845
s6:-4.155754621024916
bp:-2.265371745520624
s4:-5.5344540853024995
s2:-0.4375498302434104
age_int:1.4347488888394524


The goal of computing the shap values is that the sum of all shap values should equate the value predicted for that row:

In [20]:
#sum of shap values
avg_val + sum(shap_values.values())

99.33657375423986

In [21]:
#prediction of row being analized
row['prediction']

98.68419853727168

Due to monte carlo sampling, the values will always be an approximation. But the sum is close enough to the actual value, so we can explain the impact of each important feature of our Spark Random Forest Regression on the row.

In [22]:
#order values by importance
print(sorted( ((k,v) for k,v in shap_values.items()), key=lambda x: x[-1]) )

[('s5', -20.949190445891805), ('bmi', -15.656472827859787), ('s4', -5.5344540853024995), ('s6', -4.155754621024916), ('bp', -2.265371745520624), ('s3', -1.5008425770699845), ('s2', -0.4375498302434104), ('age_int', 1.4347488888394524)]


We can see in this case that *bmi* and *s5* are the features with the biggest impact on the prediction of this example row and also the features with the biggest feature importance.