In [1]:
from pyspark.sql import SparkSession
import pyspark.sql as sparksql
# Start (or reuse) the Spark session used throughout the notebook.
spark = (
    SparkSession.builder
    .appName('stroke')
    .getOrCreate()
)

# Load the training CSV: the first row supplies column names and
# column types are inferred from the data.
df = spark.read.csv(
    '/FileStore/tables/train.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

In [2]:
# Replace missing smoking_status values with the explicit category 'Unknown'.
df = df.fillna('Unknown', subset=['smoking_status'])

In [3]:
import pandas
# Pull the Spark DataFrame into pandas for quick exploratory summaries.
data = df.toPandas()

summary_stats = data.describe()
print(summary_stats)
print(data.dtypes)

# Number of null entries in each column.
print('\nCount missing value of each column:')
missing_per_column = data.isnull().sum()
print(missing_per_column)

In [4]:
# Row count for each value of the `stroke` target column.
data.groupby('stroke').size()   # check whether the dataset is imbalanced

In [5]:
import sklearn
# Show which scikit-learn version is installed in this environment.
print(sklearn.__version__)

In [6]:
%sh
# Needs to run ***ONCE*** to install the SMOTE package (imbalanced-learn).
# The same pip executable is used for the install and for the verification,
# so the check reports on the environment that was actually modified.
PIP=/home/ubuntu/databricks/python/bin/pip
"$PIP" install 'imbalanced-learn<0.2.1'
"$PIP" freeze | grep imbalanced-learn

In [7]:
%sh
# Upgrade pip itself.
# NOTE(review): this bare `pip` is whichever one is first on PATH, which may not be
# the /home/ubuntu/databricks/python/bin/pip used to install imbalanced-learn — confirm.
pip install --upgrade pip

In [8]:
from pyspark.ml.feature import StringIndexer

# String-valued columns; each one gets a numeric "<name>Index" companion column.
categoricalColumns = ['gender', 'ever_married', 'work_type', 'Residence_type', 'smoking_status']

# Apply one StringIndexer per column, feeding each result into the next.
df_indexed = df
for column_name in categoricalColumns:
  indexer = StringIndexer(inputCol=column_name, outputCol=column_name + 'Index')
  indexer_model = indexer.fit(df_indexed)
  df_indexed = indexer_model.transform(df_indexed)

display(df_indexed)

id,gender,age,hypertension,heart_disease,ever_married,work_type,Residence_type,avg_glucose_level,bmi,smoking_status,stroke,genderIndex,ever_marriedIndex,work_typeIndex,Residence_typeIndex,smoking_statusIndex
30669,Male,3.0,0,0,No,children,Rural,95.12,18.0,Unknown,0,1.0,1.0,2.0,1.0,1.0
30468,Male,58.0,1,0,Yes,Private,Urban,87.96,39.2,never smoked,0,1.0,0.0,0.0,0.0,0.0
16523,Female,8.0,0,0,No,Private,Urban,110.89,17.6,Unknown,0,0.0,1.0,0.0,0.0,1.0
56543,Female,70.0,0,0,Yes,Private,Rural,69.04,35.9,formerly smoked,0,0.0,0.0,0.0,1.0,2.0
46136,Male,14.0,0,0,No,Never_worked,Rural,161.28,19.1,Unknown,0,1.0,1.0,4.0,1.0,1.0
32257,Female,47.0,0,0,Yes,Private,Urban,210.95,50.1,Unknown,0,0.0,0.0,0.0,0.0,1.0
52800,Female,52.0,0,0,Yes,Private,Urban,77.59,17.7,formerly smoked,0,0.0,0.0,0.0,0.0,2.0
41413,Female,75.0,0,1,Yes,Self-employed,Rural,243.53,27.0,never smoked,0,0.0,0.0,1.0,1.0,0.0
15266,Female,32.0,0,0,Yes,Private,Rural,77.67,32.3,smokes,0,0.0,0.0,0.0,1.0,3.0
28674,Female,74.0,1,0,Yes,Self-employed,Urban,205.84,54.6,never smoked,0,0.0,0.0,1.0,0.0,0.0


In [9]:
# Keep only the model inputs (indexed categoricals and numeric columns)
# followed by the `stroke` target; the original string columns and `id` are dropped.
model_columns = [
    'genderIndex',
    'age',
    'hypertension',
    'heart_disease',
    'ever_marriedIndex',
    'work_typeIndex',
    'Residence_typeIndex',
    'avg_glucose_level',
    'bmi',
    'smoking_statusIndex',
    'stroke',
]
df_new = df_indexed.select(model_columns)
display(df_new)

genderIndex,age,hypertension,heart_disease,ever_marriedIndex,work_typeIndex,Residence_typeIndex,avg_glucose_level,bmi,smoking_statusIndex,stroke
1.0,3.0,0,0,1.0,2.0,1.0,95.12,18.0,1.0,0
1.0,58.0,1,0,0.0,0.0,0.0,87.96,39.2,0.0,0
0.0,8.0,0,0,1.0,0.0,0.0,110.89,17.6,1.0,0
0.0,70.0,0,0,0.0,0.0,1.0,69.04,35.9,2.0,0
1.0,14.0,0,0,1.0,4.0,1.0,161.28,19.1,1.0,0
0.0,47.0,0,0,0.0,0.0,0.0,210.95,50.1,1.0,0
0.0,52.0,0,0,0.0,0.0,0.0,77.59,17.7,2.0,0
0.0,75.0,0,1,0.0,1.0,1.0,243.53,27.0,0.0,0
0.0,32.0,0,0,0.0,0.0,1.0,77.67,32.3,3.0,0
0.0,74.0,1,0,0.0,1.0,0.0,205.84,54.6,0.0,0


In [10]:
# Convert the selected columns to a pandas DataFrame and show its shape and dtypes.
d = df_new.toPandas()
print(d.shape)
print(d.dtypes)

In [11]:
import numpy
from sklearn.utils import resample
from imblearn.over_sampling import SMOTE

# Oversample with SMOTE and build a shuffled pandas DataFrame with a `label` column.
#
# NOTE(review): the resampling happens here, before the train/test split made later
# in the notebook, so synthetic rows can end up in the test set and inflate the
# reported accuracy — consider splitting first and oversampling only the training part.
# NOTE(review): SMOTE interpolates feature values, so the *Index (category code)
# columns may receive non-integer values — confirm this is acceptable.
feature=['genderIndex', 'age', 'hypertension', 'heart_disease', 'ever_marriedIndex', 'work_typeIndex', 'Residence_typeIndex', 'avg_glucose_level', 'bmi', 'smoking_statusIndex']
target = 'stroke'

sm = SMOTE(random_state=7)
# Select inputs and target by column name instead of by position, so a change
# in column order cannot silently feed the wrong data to SMOTE. Both are cast
# to float so X and y are plain float arrays.
x_val = d[feature].values.astype(float)
y_val = d[target].values.astype(float)
X_res, y_res = sm.fit_sample(x_val, y_val)

oversampled_df = pandas.DataFrame(X_res, columns=feature)
oversampled_df = oversampled_df.assign(label = numpy.asarray(y_res))
# Shuffle the rows with a fixed seed so that reruns produce the same row order
# (and therefore the same downstream data).
oversampled_df = oversampled_df.sample(frac=1, random_state=7).reset_index(drop=True)

oversampling_attr = oversampled_df[feature].values
oversampling_label = oversampled_df['label'].values

# Class sizes after oversampling.
print("oversampled_df", oversampled_df.groupby('label').size())


In [12]:
# Summary statistics of the oversampled data.
oversampled_df.describe()

In [13]:
# Convert the oversampled pandas DataFrame back into a Spark DataFrame and show its schema.
spark_df = spark.createDataFrame(oversampled_df)
spark_df.printSchema()

In [14]:
from pyspark.ml.feature import VectorAssembler

# Columns combined into the single vector column 'features'.
assembler_inputs = [
    'genderIndex',
    'age',
    'hypertension',
    'heart_disease',
    'ever_marriedIndex',
    'work_typeIndex',
    'Residence_typeIndex',
    'avg_glucose_level',
    'bmi',
    'smoking_statusIndex',
]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')

# The stage list holds only the assembler.
stages = [assembler]

In [15]:
from pyspark.ml import Pipeline

# Fit the pipeline on the oversampled data, then apply it to that same data
# to add the assembled 'features' column.
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(spark_df)
df_pipeline = pipelineModel.transform(spark_df)

In [16]:
# 80/20 train/test split. A fixed seed makes the split reproducible across runs.
train, test = df_pipeline.randomSplit([0.8, 0.2], seed=7)

# Both splits are reused by every model fit/evaluation that follows, so cache
# them instead of recomputing the split each time.
train.cache()
test.cache()

print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

### Measure the impact of the number of cores

In [18]:
# Decision tree classifier. maxDepth=5 and impurity="gini" are the library
# defaults, written out here so the configuration is visible in the code.
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=5,
    impurity='gini',
)

In [19]:
# Accuracy evaluator; built first because it does not depend on the model.
dt_acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# Fit on the training split, predict on the test split, report accuracy.
dt_model = dt.fit(train)
dt_predictions = dt_model.transform(test)
dt_acc = dt_acc_evaluator.evaluate(dt_predictions)
print(dt_acc)

In [20]:
# Random forest classifier. maxDepth, impurity, numTrees and subsamplingRate
# are the library defaults, written out here so the configuration is visible.
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=5,
    impurity='gini',
    numTrees=20,
    subsamplingRate=1.0,
)

In [21]:
# Accuracy evaluator; built first because it does not depend on the model.
rf_acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# Fit on the training split, predict on the test split, report accuracy.
rf_model = rf.fit(train)
rf_predictions = rf_model.transform(test)
rf_acc = rf_acc_evaluator.evaluate(rf_predictions)
print(rf_acc)

In [22]:
# Gradient-boosted tree classifier limited to 5 boosting iterations.
# maxDepth=5 and subsamplingRate=1.0 are the library defaults, written out here.

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol='features',
    labelCol='label',
    maxIter=5,
    maxDepth=5,
    subsamplingRate=1.0,
)

In [23]:
# Accuracy evaluator; built first because it does not depend on the model.
gbt_acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# Fit on the training split, predict on the test split, report accuracy.
gbt_model = gbt.fit(train)
gbt_predictions = gbt_model.transform(test)
gbt_acc = gbt_acc_evaluator.evaluate(gbt_predictions)
print(gbt_acc)

In [24]:
# Logistic regression classifier capped at 10 optimizer iterations.
# The decision threshold is not set here, so the library default (0.5) applies.

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
)

In [25]:
# Accuracy evaluator; built first because it does not depend on the model.
lr_acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# Fit on the training split, predict on the test split, report accuracy.
lr_model = lr.fit(train)
lr_predictions = lr_model.transform(test)
lr_acc = lr_acc_evaluator.evaluate(lr_predictions)
print(lr_acc)

### Measure the impact of data scale (training-set size)

In [27]:
# Training subsets capped at increasing row counts.
# limit(n) returns at most n rows, so a cap larger than `train` keeps every row.
subset_sizes = (15000, 30000, 45000, 60000)
tr1, tr2, tr3, tr4 = [train.limit(size) for size in subset_sizes]

In [28]:
# Decision tree classifier for the scale experiment. maxDepth=5 and
# impurity="gini" are the library defaults, written out here.
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=5,
    impurity='gini',
)

In [29]:
# Fit the decision tree on each training subset (tr1..tr4, in that order) and
# print its accuracy on the shared test split. A single evaluator and one loop
# replace four copies of the same fit/transform/evaluate/print sequence; after
# the loop, dt_model / dt_predictions / dt_acc hold the results for tr4.
dt_acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
for train_subset in (tr1, tr2, tr3, tr4):
  dt_model = dt.fit(train_subset)
  dt_predictions = dt_model.transform(test)
  dt_acc = dt_acc_evaluator.evaluate(dt_predictions)
  print(dt_acc)

In [33]:
# Random forest classifier for the scale experiment. maxDepth, impurity,
# numTrees and subsamplingRate are the library defaults, written out here.
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=5,
    impurity='gini',
    numTrees=20,
    subsamplingRate=1.0,
)

In [34]:
# Fit the random forest on each training subset (tr1..tr4, in that order) and
# print its accuracy on the shared test split. A single evaluator and one loop
# replace four copies of the same fit/transform/evaluate/print sequence; after
# the loop, rf_model / rf_predictions / rf_acc hold the results for tr4.
rf_acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
for train_subset in (tr1, tr2, tr3, tr4):
  rf_model = rf.fit(train_subset)
  rf_predictions = rf_model.transform(test)
  rf_acc = rf_acc_evaluator.evaluate(rf_predictions)
  print(rf_acc)

In [38]:
# Gradient-boosted tree classifier for the scale experiment, limited to 5
# boosting iterations. maxDepth=5 and subsamplingRate=1.0 are the library
# defaults, written out here.

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol='features',
    labelCol='label',
    maxIter=5,
    maxDepth=5,
    subsamplingRate=1.0,
)

In [39]:
# Fit the gradient-boosted trees on each training subset (tr1..tr4, in that
# order) and print accuracy on the shared test split. A single evaluator and one
# loop replace four copies of the same fit/transform/evaluate/print sequence;
# after the loop, gbt_model / gbt_predictions / gbt_acc hold the results for tr4.
gbt_acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
for train_subset in (tr1, tr2, tr3, tr4):
  gbt_model = gbt.fit(train_subset)
  gbt_predictions = gbt_model.transform(test)
  gbt_acc = gbt_acc_evaluator.evaluate(gbt_predictions)
  print(gbt_acc)

In [43]:
# Logistic regression classifier for the scale experiment, capped at 10
# optimizer iterations. The decision threshold is not set here, so the library
# default (0.5) applies.

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
)

In [44]:
# Fit logistic regression on each training subset (tr1..tr4, in that order) and
# print its accuracy on the shared test split. A single evaluator and one loop
# replace four copies of the same fit/transform/evaluate/print sequence; after
# the loop, lr_model / lr_predictions / lr_acc hold the results for tr4.
lr_acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
for train_subset in (tr1, tr2, tr3, tr4):
  lr_model = lr.fit(train_subset)
  lr_predictions = lr_model.transform(test)
  lr_acc = lr_acc_evaluator.evaluate(lr_predictions)
  print(lr_acc)