In [5]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("Heart Failure Predictions")
    .getOrCreate()
)

# Load the raw dataset; the first line of the file supplies the column names.
df = (
    spark.read
    .option("header", True)
    .csv("heart_failure.csv")
)

In [6]:
# Columns kept for modelling: seven predictors followed by the DEATH_EVENT
# outcome column.
numeric_features = [
    "age",
    "creatinine_phosphokinase",
    "ejection_fraction",
    "high_blood_pressure",
    "platelets",
    "serum_creatinine",
    "serum_sodium",
    "DEATH_EVENT",
]

# Project the frame down to exactly those columns, in that order.
df = df.select(*numeric_features)

In [7]:
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as f

# The CSV was read without a schema, so every modelling column is cast to
# double explicitly before it reaches the ML stages.
double_columns = (
    "age",
    "creatinine_phosphokinase",
    "ejection_fraction",
    "high_blood_pressure",
    "platelets",
    "serum_creatinine",
    "serum_sodium",
    "DEATH_EVENT",
)
for column_name in double_columns:
    df = df.withColumn(column_name, f.col(column_name).cast(DoubleType()))

# Downstream estimators and evaluators look for the target under "label".
df = df.withColumnRenamed("DEATH_EVENT", "label")

In [8]:
from pyspark.sql import Window
from pyspark.sql.types import BooleanType

# serum_creatinine is highly correlated with the label, so rows are ranked by
# it inside each label class; picking every fifth rank spreads the held-out
# rows across both classes and across the serum_creatinine range.
# NOTE(review): rows with equal serum_creatinine have no tie-breaker, so their
# relative order may not be stable between runs - confirm.
window = Window.partitionBy("label").orderBy("serum_creatinine")


def _is_fifth_row(row_number):
    """Return True when the given row number is a multiple of five."""
    return row_number % 5 == 0


udf = f.udf(_is_fifth_row, BooleanType())

In [9]:
# Flag every fifth row (by rank within the window) as belonging to the test set.
# The check is written as a native Column expression rather than a Python UDF:
# the resulting boolean column is the same, but Spark evaluates it inside the
# JVM without shipping each row number to a Python worker.
df = df.withColumn("_test_set", (f.row_number().over(window) % 5) == 0)

In [10]:
# Split on the boolean marker column, then discard the marker from both halves.
is_test_row = df["_test_set"]
test = df.filter(is_test_row).drop("_test_set")
train = df.filter(~is_test_row).drop("_test_set")

In [11]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

# Predictor columns that are packed into a single vector column.
feature_columns = [
    "age",
    "creatinine_phosphokinase",
    "ejection_fraction",
    "high_blood_pressure",
    "platelets",
    "serum_creatinine",
    "serum_sodium",
]

# Stage 1: collect the predictors into the intermediate "vec" column.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="vec")

# Stage 2: rescale "vec" and publish the result as "features".
scaler = MinMaxScaler(inputCol="vec", outputCol="features")

# Unfitted feature pipeline: assemble first, then scale.
pipeline = Pipeline(stages=[assembler, scaler])

In [12]:
# Fit the feature pipeline on the training rows only, then apply the fitted
# model to both splits so they share the same scaling.
# The fitted model gets its own name: rebinding `pipeline` would replace the
# unfitted Pipeline with a PipelineModel, which has no `fit`, so this cell could
# not be run again without first re-creating the pipeline.
pipeline_model = pipeline.fit(train)
train = pipeline_model.transform(train)
test = pipeline_model.transform(test)

In [13]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()

# Candidate hyper-parameters: two regularisation strengths crossed with two
# elastic-net mixing values, i.e. four combinations in total.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.elasticNetParam, [0.5, 0.8])
    .build()
)

# Fixed seed for the fold assignment. Without an explicit seed the folds (and
# therefore possibly the selected model and the reported metric) can differ
# from one kernel session to the next.
CV_SEED = 42

# 3-fold cross-validation, selecting on area under the precision-recall curve.
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(metricName="areaUnderPR"),
    numFolds=3,
    seed=CV_SEED,
)
cvModel = crossval.fit(train)

In [14]:
# Take the model that cross-validation selected and apply it to the held-out
# test rows. `test` is rebound to the transformed frame, which the evaluation
# step reads next.
# NOTE(review): re-running this cell feeds the already-transformed `test` back
# into transform; presumably that fails on the existing output columns -
# confirm, and consider a separate name (e.g. `predictions`) changed together
# with the evaluation cell.
bestModel = cvModel.bestModel
test = bestModel.transform(test)

In [15]:
# Score the held-out predictions with the same metric that drove model
# selection: area under the precision-recall curve.
evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderPR")

# Bare expression so the notebook displays the metric as the cell output.
evaluator.evaluate(test)

0.7002535895081374