# Loading and defining the dataframe

In [0]:
# File location and type
file_location = "/FileStore/tables/healthcare_dataset_stroke_data-1.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

id,gender,age,hypertension,heart_disease,ever_married,work_type,Residence_type,avg_glucose_level,bmi,smoking_status,stroke
9046,Male,67.0,0,1,Yes,Private,Urban,228.69,36.6,formerly smoked,1
51676,Female,61.0,0,0,Yes,Self-employed,Rural,202.21,,never smoked,1
31112,Male,80.0,0,1,Yes,Private,Rural,105.92,32.5,never smoked,1
60182,Female,49.0,0,0,Yes,Private,Urban,171.23,34.4,smokes,1
1665,Female,79.0,1,0,Yes,Self-employed,Rural,174.12,24.0,never smoked,1
56669,Male,81.0,0,0,Yes,Private,Urban,186.21,29.0,formerly smoked,1
53882,Male,74.0,1,1,Yes,Private,Rural,70.09,27.4,never smoked,1
10434,Female,69.0,0,0,No,Private,Urban,94.39,22.8,never smoked,1
27419,Female,59.0,0,0,Yes,Private,Rural,76.15,,Unknown,1
60491,Female,78.0,0,0,Yes,Private,Urban,58.57,24.2,Unknown,1


In [0]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [0]:
print("Before Count - "+str(df.count()))
# Removing Age more than 110
df = df.filter(f"age<100.0")
print("After Count - "+str(df.count()))

Before Count - 5110
After Count - 5110


In [0]:
from pyspark.sql.functions import col
# Defining Each column to either Numerical or Categorical. Ignored Id column as its just Primary Key and doesnt add Any Information to the model.
numerical_cols =  ["age","avg_glucose_level","bmi"]
categorical_cols = ["gender","hypertension","heart_disease","ever_married","work_type","Residence_type","smoking_status"]
target = "stroke"

#Converting BMI to Double as its referred as String due to NA values.
df = df.withColumn("bmi",col("bmi").cast("double"))

### Missing values imputation: numerical columns

In [0]:
# Imputing all numerical Columns for missing values with median
from pyspark.ml.feature import Imputer
df = Imputer(inputCols = numerical_cols,outputCols =numerical_cols,strategy="median").fit(df).transform(df)

### Normalizing numerical data

In [0]:
from pyspark.ml.feature import VectorAssembler,MinMaxScaler

numeric_assembler = VectorAssembler(inputCols = numerical_cols,outputCol = "numerical_cols")
df = numeric_assembler.transform(df)
scaler = MinMaxScaler(inputCol = "numerical_cols",outputCol = "numerical_scaled").fit(df)
df  = scaler.transform(df)

### Removing outliers with Z score

In [0]:
"""from pyspark.sql.functions import col,countDistinct,mean,stddev,lit
print("Count of dataframe rows before removing outliers = "+str(df.count()))
temp = df.select(numerical_cols).describe().toPandas()
for column in numerical_cols:
    avg = temp[column][1]
    std = temp[column][2]
    df = df.withColumn(column+"_zscore",((col(column)-lit(avg))/lit(std))).filter(f"{column+'_zscore'}<3").drop(column+"_zscore")
print("Count of dataframe rows after removing outliers = "+str(df.count()))"""

Out[7]: 'from pyspark.sql.functions import col,countDistinct,mean,stddev,lit\nprint("Count of dataframe rows before removing outliers = "+str(df.count()))\ntemp = df.select(numerical_cols).describe().toPandas()\nfor column in numerical_cols:\n    avg = temp[column][1]\n    std = temp[column][2]\n    df = df.withColumn(column+"_zscore",((col(column)-lit(avg))/lit(std))).filter(f"{column+\'_zscore\'}<3").drop(column+"_zscore")\nprint("Count of dataframe rows after removing outliers = "+str(df.count()))'

In [0]:
#StringIndexer - maps a string column of labels to an ML column of label indices
#OneHotEncoder - creates n-1 columns for column with 'n' categories
    #maps a column of category indices to a column of binary vectors
#VectorAssembler - merges multiple columns into a vector column

from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler

categorical_indexed = [item+"_indexed" for item in categorical_cols]
categorical_encoded = [item+"_encoded" for item in categorical_cols]

# String Indexing all categorical cols
indexer = StringIndexer(inputCols = categorical_cols,outputCols =categorical_indexed,handleInvalid= "keep").fit(df)
df = indexer.transform(df)

#Converting String Indexed cols to One hot Encoded Cols
encoder = OneHotEncoder(inputCols= categorical_indexed, outputCols = categorical_encoded).fit(df)
df = encoder.transform(df)


#Combining all OneHot Encoded cols to one vector
categorical_assembler = VectorAssembler(inputCols = categorical_encoded,outputCol = "categorical_cols")
df = categorical_assembler.transform(df)

In [0]:
#Combining categorical and numerical features
assembler = VectorAssembler(inputCols = ["categorical_cols","numerical_scaled"],outputCol = "features")
df = assembler.transform(df)

### Logistic regression

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
train,test = df.randomSplit([0.8,0.2], seed=0)
evaluator = BinaryClassificationEvaluator(labelCol=target)
lr = LogisticRegression(featuresCol = 'features', labelCol=target)

paramGrid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10,20]) \
    .addGrid(lr.regParam, [0.1]) \
    .build()

#three fold cross validation 
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,seed=1)
model = crossval.fit(train)



In [0]:
lr_predictions = model.transform(test)
print("areaUnderROC on test data = %g" % evaluator.evaluate(lr_predictions))

areaUnderROC on test data = 0.845928


### Random forest

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
train,test = df.randomSplit([0.8,0.2], seed=0)
evaluator = BinaryClassificationEvaluator(labelCol=target)
rf = RandomForestClassifier(featuresCol = 'features', labelCol=target)
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [40,50]) \
    .addGrid(rf.maxDepth, [3,5]) \
    .build()

#three fold cross validation 
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,seed=1)
model = crossval.fit(train)

In [0]:
rf_predictions = model.transform(test)
print("areaUnderROC on test data = %g" % evaluator.evaluate(rf_predictions))

areaUnderROC on test data = 0.872463


### MLP - Multilayer perceptron

In [0]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
train,test = df.randomSplit([0.8,0.2], seed=0)
evaluator = BinaryClassificationEvaluator(labelCol=target)
layers = [23, 5, 4, 2]
trainer = MultilayerPerceptronClassifier( layers=layers, blockSize=128, labelCol=target)
paramGrid = ParamGridBuilder() \
    .addGrid(trainer.maxIter, [90,100]) \
    .build()

#three fold cross validation 
crossval = CrossValidator(estimator=trainer,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,seed=1)
model = crossval.fit(train)

In [0]:
mlp_predictions = model.transform(test)
print("areaUnderROC on test data = %g" % evaluator.evaluate(mlp_predictions))

areaUnderROC on test data = 0.827418


### Gradient boosted trees (generally fits well for imbalanced datasets)

In [0]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
train,test = df.randomSplit([0.8,0.2], seed=0)
evaluator = BinaryClassificationEvaluator(labelCol=target)
gbt = GBTClassifier(featuresCol = 'features', labelCol=target)
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [25,35]) \
    .addGrid(gbt.maxDepth, [3]) \
    .build()

#three fold cross validation 
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,seed=1)
model = crossval.fit(train)

In [0]:
gbt_predictions = model.transform(test)
print("areaUnderROC on test data = %g" % evaluator.evaluate(gbt_predictions))

areaUnderROC on test data = 0.877655
