In [2]:
import findspark
# findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
findspark.init('../spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('multiple_iterations').getOrCreate()

from pyspark.ml import Pipeline
from pyspark.ml.classification import (RandomForestClassifier, GBTClassifier, DecisionTreeClassifier,\
                                       LogisticRegression, MultilayerPerceptronClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator


from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import ChiSqSelector

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Read the previouly cleaned dataset

In [3]:
df = spark.read.csv('cleaned.csv', header=True)
# turn type string into double
cols = ['HighBP', 'HighChol', 'CholCheck',
       'Smoker', 'Stroke', 'HeartDiseaseorAttack', 'PhysActivity', 'Fruits',
       'Veggies', 'HvyAlcoholConsump', 'AnyHealthcare', 'NoDocbcCost',
       'DiffWalk', 'Sex', 'BMI', 'GenHlth', 'MentHlth', 'PhysHlth', 'Age',
       'Education', 'Income']
df = df.withColumn('Diabetes',col('Diabetes').cast('double'))
for c in cols:
    df = df.withColumn(c,col(c).cast('double'))
    
assembler = VectorAssembler(inputCols=cols, outputCol="features")
df = assembler.transform(df)

from pyspark.sql.functions import col, explode, array, lit
major_df = df.filter(col("Diabetes") == 0)
minor_df = df.filter(col("Diabetes") == 1)
ratio = int(major_df.count()/minor_df.count())
sampled_majority_df = major_df.sample(False, 1/ratio)
balanced_data = sampled_majority_df.unionAll(minor_df)

                                                                                

#### Instead of keeping 15 features, we can iterate with more or less number of features.

13 Features:

In [4]:
selector_13 = ChiSqSelector(featuresCol="features", outputCol="selected_features", \
                         labelCol="Diabetes", numTopFeatures=13)
model_13 = selector_13.fit(balanced_data)
df_13 = model_13.transform(balanced_data)
selected_indices_13 = model_13.selectedFeatures
selected_feature_names_13 = [df_13.columns[index] for index in selected_indices_13]
selected_feature_names_13

                                                                                

['Diabetes',
 'HighBP',
 'HighChol',
 'CholCheck',
 'Smoker',
 'Stroke',
 'HeartDiseaseorAttack',
 'PhysActivity',
 'Fruits',
 'Veggies',
 'AnyHealthcare',
 'NoDocbcCost',
 'DiffWalk']

18 Features:

In [5]:
selector_18 = ChiSqSelector(featuresCol="features", outputCol="selected_features", \
                         labelCol="Diabetes", numTopFeatures=18)
model_18 = selector_18.fit(balanced_data)
df_18 = model_18.transform(balanced_data)
selected_indices_18 = model_18.selectedFeatures
selected_feature_names_18 = [df_18.columns[index] for index in selected_indices_18]
selected_feature_names_18

                                                                                

['Diabetes',
 'HighBP',
 'HighChol',
 'CholCheck',
 'Smoker',
 'Stroke',
 'HeartDiseaseorAttack',
 'PhysActivity',
 'Fruits',
 'Veggies',
 'AnyHealthcare',
 'NoDocbcCost',
 'DiffWalk',
 'Sex',
 'BMI',
 'GenHlth',
 'MentHlth',
 'PhysHlth']

In [6]:
df_18.printSchema()

root
 |-- Diabetes: double (nullable = true)
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- HeartDiseaseorAttack: double (nullable = true)
 |-- PhysActivity: double (nullable = true)
 |-- Fruits: double (nullable = true)
 |-- Veggies: double (nullable = true)
 |-- HvyAlcoholConsump: double (nullable = true)
 |-- AnyHealthcare: double (nullable = true)
 |-- NoDocbcCost: double (nullable = true)
 |-- DiffWalk: double (nullable = true)
 |-- Sex: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- GenHlth: double (nullable = true)
 |-- MentHlth: double (nullable = true)
 |-- PhysHlth: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- selected_features: vector (nullable = true)



Train test data set split:

In [7]:
train_data_13,test_data_13 = df_13.select(['Diabetes', 'features']).randomSplit([0.8,0.2])
train_data_18,test_data_18 = df_18.select(['Diabetes', 'features']).randomSplit([0.8,0.2])

Fit Random Forest models:

In [8]:
rfc_13 = RandomForestClassifier(labelCol='Diabetes',featuresCol='features',numTrees=5)
rfc_model_13 = rfc_13.fit(train_data_13)
rfc_predictions_13 = rfc_model_13.transform(test_data_13)

rfc_18 = RandomForestClassifier(labelCol='Diabetes',featuresCol='features',numTrees=5)
rfc_model_18 = rfc_18.fit(train_data_18)
rfc_predictions_18 = rfc_model_18.transform(test_data_18)

24/05/23 22:21:47 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [9]:
# Random Forest Feature Importances
rfc_importances_13 = rfc_model_13.featureImportances
print("Random Forest Feature Importances with 13 features selected:")
for i, (col, importance) in enumerate(zip(selected_feature_names_13, rfc_importances_13)):
    print(f"{col}: {importance}")
    
rfc_importances_18 = rfc_model_18.featureImportances
print("\nRandom Forest Feature Importances with 18 features selected:")
for i, (col, importance) in enumerate(zip(selected_feature_names_18, rfc_importances_18)):
    print(f"{col}: {importance}")

Random Forest Feature Importances with 13 features selected:
Diabetes: 0.2946739432390753
HighBP: 0.18010727160221165
HighChol: 0.001884897754532452
CholCheck: 0.0
Smoker: 0.0
Stroke: 0.07125229299318833
HeartDiseaseorAttack: 0.00040143573109477387
PhysActivity: 0.0
Fruits: 0.0
Veggies: 0.0037464973386321008
AnyHealthcare: 0.0
NoDocbcCost: 0.0
DiffWalk: 0.08955484588924426

Random Forest Feature Importances with 18 features selected:
Diabetes: 0.19038681054852624
HighBP: 0.18324729287987845
HighChol: 0.0016371884569633514
CholCheck: 0.0
Smoker: 0.0012985031082045101
Stroke: 0.0774202830560662
HeartDiseaseorAttack: 0.0016105034010983695
PhysActivity: 0.00012035488822141431
Fruits: 0.0
Veggies: 0.006218193637338278
AnyHealthcare: 0.0
NoDocbcCost: 0.0
DiffWalk: 0.07963826962368326
Sex: 0.0
BMI: 0.05009016013139143
GenHlth: 0.37843599492649216
MentHlth: 0.00018196688478028434
PhysHlth: 0.0018211825845470414


In [11]:
result_eval = BinaryClassificationEvaluator(labelCol = 'Diabetes')

rfc_auc_13 = result_eval.evaluate(rfc_predictions_13, {result_eval.metricName: "areaUnderROC"})
rfc_acc_13 = result_eval.evaluate(rfc_predictions_13)
print("Area Under Curve for 13 features:",rfc_auc_13)
print("Accuracy for 13 features:",rfc_acc_13)

rfc_auc_18 = result_eval.evaluate(rfc_predictions_18, {result_eval.metricName: "areaUnderROC"})
rfc_acc_18 = result_eval.evaluate(rfc_predictions_18)
print("Area Under Curve for 18 features:",rfc_auc_18)
print("Accuracy for 18 features:",rfc_acc_18)


                                                                                

Area Under Curve for 13 features: 0.8042322455957966
Accuracy for 13 features: 0.8042322455957965




Area Under Curve for 18 features: 0.8057071074055722
Accuracy for 18 features: 0.8057071074055722


                                                                                

In [14]:
# iterate through numTrees = [5, 10, 20, 50, 100, 150, 200]
num_Trees = [5, 10, 20, 50, 100, 150, 200]
result_eval = BinaryClassificationEvaluator(labelCol = 'Diabetes')
for numtree in num_Trees:
    rfc = RandomForestClassifier(labelCol='Diabetes',featuresCol='features',numTrees=numtree)
    rfc_model = rfc.fit(train_data_18)
    rfc_predictions = rfc_model.transform(test_data_18)
    rfc_auc = result_eval.evaluate(rfc_predictions, {result_eval.metricName: "areaUnderROC"})
    rfc_acc = result_eval.evaluate(rfc_predictions)
    print("Area Under Curve for",  numtree, "trees:",rfc_auc)
    print("Accuracy for",  numtree, "trees:",rfc_acc)

                                                                                

Area Under Curve for 5 trees: 0.8057071074055723
Accuracy for 5 trees: 0.805707107405572


                                                                                

Area Under Curve for 10 trees: 0.8084779431638714
Accuracy for 10 trees: 0.8084780250192647


                                                                                

Area Under Curve for 20 trees: 0.809138352476854
Accuracy for 20 trees: 0.8091343927222041


                                                                                

Area Under Curve for 50 trees: 0.8125208015018174
Accuracy for 50 trees: 0.8125212414745564


                                                                                

Area Under Curve for 100 trees: 0.8129806344054917
Accuracy for 100 trees: 0.8129760505034677


                                                                                

Area Under Curve for 150 trees: 0.8134656378425997
Accuracy for 150 trees: 0.8134620566691437


24/05/23 22:36:42 WARN DAGScheduler: Broadcasting large task binary with size 1035.0 KiB
24/05/23 22:36:45 WARN DAGScheduler: Broadcasting large task binary with size 1035.0 KiB
                                                                                

Area Under Curve for 200 trees: 0.8137769952947884
Accuracy for 200 trees: 0.8137680423611482
