In [7]:
from pyspark.sql import SparkSession,DataFrame,Column,Row,GroupedData,DataFrameNaFunctions,DataFrameStatFunctions,functions,Window
import pyspark.sql as sparksql
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder,StringIndexer)
from pyspark.sql.functions import mean
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import os

# Create the Spark session for this notebook, or reuse one that is already running.
# NOTE(review): the "Unsupported class file major version 57" errors recorded
# further down presumably mean the JVM is newer than this Spark build supports
# (class file version 57 corresponds to Java 13) -- confirm JAVA_HOME.
spark = (
    SparkSession.builder
    .appName('stroke')
    .getOrCreate()
)


In [2]:
# Directory that holds the stroke CSV files. Set the STROKE_DATA_DIR environment
# variable to run this notebook on another machine; the default is the original
# author's local path, so existing behaviour is unchanged.
DATA_DIR = os.environ.get(
    'STROKE_DATA_DIR',
    '/Users/ferdinand/Desktop/CEBD1261_assignment/assignment4/healthcare-dataset-stroke-data',
)

# Load the training set; the first row is the header and column types are inferred.
train = spark.read.csv(os.path.join(DATA_DIR, 'train_2v.csv'), inferSchema=True, header=True)

In [8]:
# Data exploration: print the column names and the types inferred while reading the CSV.
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [9]:
# Number of rows per value of the `stroke` label (shows how balanced the classes are).
stroke_counts = train.groupBy('stroke').count()
stroke_counts.show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  783|
|     0|42617|
+------+-----+



In [3]:
# Register the DataFrame as a temporary view named 'table' so it can be queried with spark.sql.
train.createOrReplaceTempView('table')

In [None]:
# `stroke` is an integer column, so it is compared against the integer literal 1
# (not the string '1') and with the standard SQL `=` operator throughout.

# Stroke cases per work type, most frequent first.
spark.sql("SELECT work_type, count(work_type) AS work_type_count FROM table WHERE stroke = 1 GROUP BY work_type ORDER BY work_type_count DESC").show()
# Overall gender distribution of the data set.
spark.sql("SELECT gender, count(gender) AS count_gender FROM table GROUP BY gender").show()
# Stroke cases among male patients.
spark.sql("SELECT gender, count(gender) FROM table WHERE stroke = 1 and gender = 'Male' GROUP BY gender").show()
# Stroke cases among female patients.
spark.sql("SELECT gender, count(gender) FROM table WHERE stroke = 1 and gender = 'Female' GROUP BY gender").show()
# Stroke cases per age, most frequent first.
spark.sql("SELECT age, count(age) as age_count FROM table WHERE stroke = 1 GROUP BY age ORDER BY age_count DESC").show()

In [10]:
# Count stroke cases in patients older than 50.
# `age` is a double column, so compare it with a numeric literal rather than the
# string '50' (which only worked through an implicit cast).
train.filter((train['stroke'] == 1) & (train['age'] > 50)).count()

IllegalArgumentException: 'Unsupported class file major version 57'

In [11]:
# Fill in missing values.
# Missing smoking status becomes its own 'No Info' category.
train_f = train.na.fill('No Info', subset=['smoking_status'])

# Missing BMI values are replaced by the mean BMI of the data set.
# The result is stored under its own name: the original code assigned it to
# `mean`, which overwrote the imported `mean` function and made this cell fail
# when it was run a second time.
bmi_stats = train_f.select(mean(train_f['bmi'])).collect()
mean_bmi = bmi_stats[0][0]
train_f = train_f.na.fill(mean_bmi,['bmi'])

IllegalArgumentException: 'Unsupported class file major version 57'

In [None]:
# Columns that are combined into the single 'features' vector column.
# The '*Vec' names are the one-hot encoded versions of the categorical columns.
feature_columns = [
    'genderVec',
    'age',
    'hypertension',
    'heart_disease',
    'ever_marriedVec',
    'work_typeVec',
    'Residence_typeVec',
    'avg_glucose_level',
    'bmi',
    'smoking_statusVec',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [None]:
# Decision tree classifier that predicts the `stroke` label from the assembled feature vector.
dtc = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='stroke',
)

In [None]:
# The indexer/encoder stages were referenced by the pipeline but never defined in
# the notebook, so this cell raised a NameError on a fresh kernel. Define them here.
# Each categorical column is first mapped to a numeric index (StringIndexer) and
# then one-hot encoded (OneHotEncoder) into the '<column>Vec' column that the
# VectorAssembler reads.
gender_indexer = StringIndexer(inputCol='gender', outputCol='genderIndex')
ever_married_indexer = StringIndexer(inputCol='ever_married', outputCol='ever_marriedIndex')
work_type_indexer = StringIndexer(inputCol='work_type', outputCol='work_typeIndex')
Residence_type_indexer = StringIndexer(inputCol='Residence_type', outputCol='Residence_typeIndex')
smoking_status_indexer = StringIndexer(inputCol='smoking_status', outputCol='smoking_statusIndex')

gender_encoder = OneHotEncoder(inputCol='genderIndex', outputCol='genderVec')
ever_married_encoder = OneHotEncoder(inputCol='ever_marriedIndex', outputCol='ever_marriedVec')
work_type_encoder = OneHotEncoder(inputCol='work_typeIndex', outputCol='work_typeVec')
Residence_type_encoder = OneHotEncoder(inputCol='Residence_typeIndex', outputCol='Residence_typeVec')
smoking_status_encoder = OneHotEncoder(inputCol='smoking_statusIndex', outputCol='smoking_statusVec')

# Full pipeline: index -> one-hot encode -> assemble features -> decision tree.
pipeline = Pipeline(stages=[gender_indexer, ever_married_indexer, work_type_indexer, Residence_type_indexer,
                           smoking_status_indexer, gender_encoder, ever_married_encoder, work_type_encoder,
                           Residence_type_encoder, smoking_status_encoder, assembler, dtc])

In [None]:
# 70/30 train/test split. A fixed seed makes the split (and therefore the reported
# metrics) reproducible across kernel restarts.
train_data,test_data = train_f.randomSplit([0.7,0.3], seed=42)

In [None]:
# Fit every pipeline stage on the training split; the result is the fitted PipelineModel.
model = pipeline.fit(train_data)

In [None]:
# Apply the fitted pipeline to the held-out test split to obtain predictions.
dtc_predictions = model.transform(test_data)

In [None]:
# Compare the predictions with the true `stroke` label and compute the test accuracy
# (the fraction of correctly classified rows, not the error).
acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

# The classes are heavily imbalanced (783 stroke rows vs 42617 non-stroke rows in
# the full data set), so accuracy can be high even when no stroke is ever predicted.
# Show the label/prediction cross-tabulation so the result can be judged properly.
dtc_predictions.groupBy('stroke', 'prediction').count().orderBy('stroke', 'prediction').show()