# Imports & Data Loads

In [14]:
# PySpark Initialization


import findspark
findspark.init('/usr/local/spark')

In [15]:
# Imports

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import classification_report, confusion_matrix





In [16]:
# CReating Spark Session

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('heart_failure_clinical_records_dataset.csv', header = True, inferSchema = True)
df.show(5)

+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+
| age|anaemia|creatinine_phosphokinase|diabetes|ejection_fraction|high_blood_pressure|platelets|serum_creatinine|serum_sodium|sex|smoking|time|DEATH_EVENT|
+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+
|75.0|      0|                     582|       0|               20|                  1| 265000.0|             1.9|         130|  1|      0|   4|          1|
|55.0|      0|                    7861|       0|               38|                  0|263358.03|             1.1|         136|  1|      0|   6|          1|
|65.0|      0|                     146|       0|               20|                  0| 162000.0|             1.3|         129|  1|      1|   7|          1|
|50.0|      1|                     111|       0|               2

In [17]:
# Variables

target = "DEATH_EVENT"
numericCols = ['age',
 'anaemia',
 'creatinine_phosphokinase',
 'diabetes',
 'ejection_fraction',
 'high_blood_pressure',
 'platelets',
 'serum_creatinine',
 'serum_sodium',
 'sex',
 'smoking',
 'time']

In [18]:
df.select(target).show()

+-----------+
|DEATH_EVENT|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          0|
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 20 rows



In [19]:
print("Total data count = ", df.count())
print("Total featues = ", len(df.columns) - 1)
print(df.columns)
print("The Schema", df.printSchema())
print("Statistics", df.describe().show())

Total data count =  299
Total featues =  12
['age', 'anaemia', 'creatinine_phosphokinase', 'diabetes', 'ejection_fraction', 'high_blood_pressure', 'platelets', 'serum_creatinine', 'serum_sodium', 'sex', 'smoking', 'time', 'DEATH_EVENT']
root
 |-- age: double (nullable = true)
 |-- anaemia: integer (nullable = true)
 |-- creatinine_phosphokinase: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- ejection_fraction: integer (nullable = true)
 |-- high_blood_pressure: integer (nullable = true)
 |-- platelets: double (nullable = true)
 |-- serum_creatinine: double (nullable = true)
 |-- serum_sodium: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- smoking: integer (nullable = true)
 |-- time: integer (nullable = true)
 |-- DEATH_EVENT: integer (nullable = true)

The Schema None
+-------+------------------+------------------+------------------------+------------------+------------------+-------------------+------------------+------------------+-------

# Random Forest

In [20]:

assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
print(assembler)

VectorAssembler_fb3e00509ca8


In [21]:
out_df = assembler.transform(df)
out_df.show()

+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+--------------------+
| age|anaemia|creatinine_phosphokinase|diabetes|ejection_fraction|high_blood_pressure|platelets|serum_creatinine|serum_sodium|sex|smoking|time|DEATH_EVENT|            features|
+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+--------------------+
|75.0|      0|                     582|       0|               20|                  1| 265000.0|             1.9|         130|  1|      0|   4|          1|[75.0,0.0,582.0,0...|
|55.0|      0|                    7861|       0|               38|                  0|263358.03|             1.1|         136|  1|      0|   6|          1|[55.0,0.0,7861.0,...|
|65.0|      0|                     146|       0|               20|                  0| 162000.0|             1.3|  

In [22]:
model_df = out_df.select(["features", target])
model_df.show(truncate=False)

+--------------------------------------------------------------+-----------+
|features                                                      |DEATH_EVENT|
+--------------------------------------------------------------+-----------+
|[75.0,0.0,582.0,0.0,20.0,1.0,265000.0,1.9,130.0,1.0,0.0,4.0]  |1          |
|[55.0,0.0,7861.0,0.0,38.0,0.0,263358.03,1.1,136.0,1.0,0.0,6.0]|1          |
|[65.0,0.0,146.0,0.0,20.0,0.0,162000.0,1.3,129.0,1.0,1.0,7.0]  |1          |
|[50.0,1.0,111.0,0.0,20.0,0.0,210000.0,1.9,137.0,1.0,0.0,7.0]  |1          |
|[65.0,1.0,160.0,1.0,20.0,0.0,327000.0,2.7,116.0,0.0,0.0,8.0]  |1          |
|[90.0,1.0,47.0,0.0,40.0,1.0,204000.0,2.1,132.0,1.0,1.0,8.0]   |1          |
|[75.0,1.0,246.0,0.0,15.0,0.0,127000.0,1.2,137.0,1.0,0.0,10.0] |1          |
|[60.0,1.0,315.0,1.0,60.0,0.0,454000.0,1.1,131.0,1.0,1.0,10.0] |1          |
|[65.0,0.0,157.0,0.0,65.0,0.0,263358.03,1.5,138.0,0.0,0.0,10.0]|1          |
|[80.0,1.0,123.0,0.0,35.0,1.0,388000.0,9.4,133.0,1.0,1.0,10.0] |1          |

In [23]:
train_df, test_df = model_df.randomSplit([0.67, 0.33], seed=1)
print(train_df.count())
print(test_df.count())

203
96


In [24]:

rf_model = RandomForestClassifier(labelCol = target, numTrees=50)
rf_classifier = rf_model.fit(train_df)

rf_prediction = rf_classifier.transform(test_df)
rf_prediction.show()


+--------------------+-----------+--------------------+--------------------+----------+
|            features|DEATH_EVENT|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|[40.0,0.0,624.0,0...|          0|[46.8304599995571...|[0.93660919999114...|       0.0|
|[40.0,1.0,101.0,0...|          0|[47.7367168617189...|[0.95473433723437...|       0.0|
|[42.0,1.0,86.0,0....|          0|[47.3845057775709...|[0.94769011555141...|       0.0|
|[42.0,1.0,250.0,1...|          1|[17.6214379648799...|[0.35242875929759...|       1.0|
|[43.0,1.0,358.0,0...|          0|[44.1657455266273...|[0.88331491053254...|       0.0|
|[44.0,0.0,84.0,1....|          0|[47.3833270662320...|[0.94766654132464...|       0.0|
|[44.0,0.0,582.0,1...|          0|[32.4355190924756...|[0.64871038184951...|       0.0|
|[45.0,0.0,2060.0,...|          0|[45.3303029558752...|[0.90660605911750...|       0.0|
|[45.0,0.0,7702.0,...|          

In [25]:


evaluator = BinaryClassificationEvaluator(labelCol="DEATH_EVENT")
accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))


rf_y_true = rf_prediction.select(['DEATH_EVENT']).collect()
rf_y_pred = rf_prediction.select(['prediction']).collect()


print(classification_report(rf_y_true, rf_y_pred))

Accuracy = 0.8818858560794046
Test Error = 0.11811414392059538
              precision    recall  f1-score   support

           0       0.88      0.88      0.88        65
           1       0.74      0.74      0.74        31

    accuracy                           0.83        96
   macro avg       0.81      0.81      0.81        96
weighted avg       0.83      0.83      0.83        96



# Logistic Regression

In [26]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression

In [27]:
#indexer = StringIndexer(inputCol=" ", ouputCol="")

In [34]:
logistic_model = LogisticRegression(labelCol = target)
logistic_classifier = logistic_model.fit(train_df)


logistic_output = logistic_classifier.evaluate(test_df).predictions
logistic_output.show()

# logistic_prediction = logistic_classifier.transform(test_df)
# logistic_prediction.show()

+--------------------+-----------+--------------------+--------------------+----------+
|            features|DEATH_EVENT|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|[40.0,0.0,624.0,0...|          0|[5.43632757946729...|[0.99566343869484...|       0.0|
|[40.0,1.0,101.0,0...|          0|[4.11688092331894...|[0.98396601652362...|       0.0|
|[42.0,1.0,86.0,0....|          0|[4.88112377849984...|[0.99246866989706...|       0.0|
|[42.0,1.0,250.0,1...|          1|[-1.4940742452822...|[0.18331099284352...|       1.0|
|[43.0,1.0,358.0,0...|          0|[1.35309420898581...|[0.79463503312921...|       0.0|
|[44.0,0.0,84.0,1....|          0|[2.14680618482791...|[0.89536994955701...|       0.0|
|[44.0,0.0,582.0,1...|          0|[4.05723605641176...|[0.98299733071852...|       0.0|
|[45.0,0.0,2060.0,...|          0|[7.59640311786337...|[0.99949799739373...|       0.0|
|[45.0,0.0,7702.0,...|          

In [35]:
logistic_output.select([target, "prediction"]).show()

+-----------+----------+
|DEATH_EVENT|prediction|
+-----------+----------+
|          0|       0.0|
|          0|       0.0|
|          0|       0.0|
|          1|       1.0|
|          0|       0.0|
|          0|       0.0|
|          0|       0.0|
|          0|       0.0|
|          1|       1.0|
|          0|       0.0|
|          1|       1.0|
|          0|       0.0|
|          1|       0.0|
|          1|       1.0|
|          1|       1.0|
|          0|       0.0|
|          0|       1.0|
|          0|       0.0|
|          1|       1.0|
|          0|       0.0|
+-----------+----------+
only showing top 20 rows



In [42]:
tp = logistic_output[(logistic_output.DEATH_EVENT==1) & (logistic_output.prediction==1)].count()
print("True Positive, tp = ", tp)

tn = logistic_output[(logistic_output.DEATH_EVENT==0) & (logistic_output.prediction==0)].count()
print("True Negative, tn = ", tn)

fp = logistic_output[(logistic_output.DEATH_EVENT==0) & (logistic_output.prediction==1)].count()
print("False Positive, tp = ", fp)

fn = logistic_output[(logistic_output.DEATH_EVENT==1) & (logistic_output.prediction==0)].count()
print("False Negative, tp = ", fn)

True Positive, tp =  24
True Negative, tn =  54
False Positive, tp =  11
False Negative, tp =  7


In [46]:
accuracy = float(tp+tn) / float(logistic_output.count())
print("Accuracy = ", accuracy)

recall = float(tn) / float(tp+tn)
print("Recall = ", recall)

precesion = float(tp) / float(tp+fn)
print("Precision = ", precesion)

f1_score = 2 * ( (precesion*recall) / (precesion+recall) )
print("F1 Score = ", f1_score)

Accuracy =  0.8125
Recall =  0.6923076923076923
Precision =  0.7741935483870968
F1 Score =  0.7309644670050761


In [48]:
log_evaluator = BinaryClassificationEvaluator(labelCol="DEATH_EVENT")
log_accuracy = log_evaluator.evaluate(logistic_output)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))



log_y_true = logistic_output.select(['DEATH_EVENT']).collect()
log_y_pred = logistic_output.select(['prediction']).collect()


print(classification_report(log_y_true, log_y_pred))

Accuracy = 0.8125
Test Error = 0.1875
              precision    recall  f1-score   support

           0       0.89      0.83      0.86        65
           1       0.69      0.77      0.73        31

    accuracy                           0.81        96
   macro avg       0.79      0.80      0.79        96
weighted avg       0.82      0.81      0.82        96

