# Prediction of potential risk of coronary heart disease

In [53]:
from pyspark import Row

from pyspark.sql import SparkSession
from pyspark.sql.functions import corr

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("heart_disease")
    .getOrCreate()
)

# Load the CSV: the first line supplies column names and column types are inferred.
df = spark.read.csv("data_cardiovascular_risk.csv", header=True, inferSchema=True)

# Print the inferred schema so the column types can be checked by eye.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- education: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- is_smoking: string (nullable = true)
 |-- cigsPerDay: double (nullable = true)
 |-- BPMeds: double (nullable = true)
 |-- prevalentStroke: integer (nullable = true)
 |-- prevalentHyp: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- totChol: double (nullable = true)
 |-- sysBP: double (nullable = true)
 |-- diaBP: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- heartRate: double (nullable = true)
 |-- glucose: double (nullable = true)
 |-- TenYearCHD: integer (nullable = true)



### Count rows before dropping rows with null values

In [54]:
# Row count of the dataset exactly as loaded, before any rows are removed.
print(df.count())

3390


In [55]:
# Remove rows with missing values from the loaded frame.
# NOTE(review): called with no arguments, so this relies on dropna()'s defaults
# (expected: drop a row if ANY column is null) — confirm against the PySpark docs.
# NOTE(review): the name `droped` (sic) is referenced by later cells, so it is kept as-is.
droped = df.dropna()

### Count rows after dropping rows with null values

In [56]:
# Row count after the null-containing rows were dropped; compare with the count above.
print(droped.count())

2927


### Replace categorical columns with numeric columns

In [58]:
# String-valued columns and the numeric index columns that will be derived from them.
categorical_cols = ['sex', 'is_smoking']
indexed_cols = ['sex_cat', 'is_smoking_cat']

# Learn the label -> index mapping from the cleaned rows, then apply it to them.
indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)
indexer_model = indexer.fit(droped)
indexed = indexer_model.transform(droped)

# Show the original and indexed columns side by side as a sanity check.
indexed.select(categorical_cols + indexed_cols).show(10)

+---+----------+-------+--------------+
|sex|is_smoking|sex_cat|is_smoking_cat|
+---+----------+-------+--------------+
|  M|        NO|    1.0|           0.0|
|  F|       YES|    0.0|           1.0|
|  M|       YES|    1.0|           1.0|
|  F|       YES|    0.0|           1.0|
|  F|        NO|    0.0|           0.0|
|  M|        NO|    1.0|           0.0|
|  M|       YES|    1.0|           1.0|
|  F|        NO|    0.0|           0.0|
|  F|        NO|    0.0|           0.0|
|  M|       YES|    1.0|           1.0|
+---+----------+-------+--------------+
only showing top 10 rows



### Create the feature vector

In [59]:
# Input columns of the model; their order here is their order inside the feature vector.
feature_cols = [
    'age', 'education', 'cigsPerDay', 'BPMeds', 'prevalentStroke',
    'prevalentHyp', 'diabetes', 'totChol', 'sysBP', 'diaBP', 'BMI',
    'heartRate', 'glucose', 'sex_cat', 'is_smoking_cat',
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Pack the feature columns into a single vector column.
output = assembler.transform(indexed)

# Keep only what the model needs: the feature vector and the label.
final_data = output.select("features", "TenYearCHD")
final_data.show(10)

+--------------------+----------+
|            features|TenYearCHD|
+--------------------+----------+
|[36.0,4.0,0.0,0.0...|         0|
|[46.0,1.0,10.0,0....|         0|
|[50.0,1.0,20.0,0....|         1|
|[64.0,1.0,30.0,0....|         0|
|[61.0,3.0,0.0,0.0...|         1|
|[61.0,1.0,0.0,0.0...|         0|
|[36.0,4.0,35.0,0....|         0|
|(15,[0,1,7,8,9,10...|         0|
|(15,[0,1,7,8,9,10...|         0|
|[44.0,1.0,40.0,0....|         0|
+--------------------+----------+
only showing top 10 rows



### Split the data into train and test sets and check the distribution

In [60]:
# Fix the seed so the 70/30 split is reproducible across kernel restarts.
# Without a seed every run draws a different partition, so the counts and all
# metrics computed from train_data / test_data change from run to run.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Summary statistics of the label in each partition; the means should be close
# if the split preserved the class balance.
train_data.describe().show()

test_data.describe().show()

+-------+-------------------+
|summary|         TenYearCHD|
+-------+-------------------+
|  count|               2034|
|   mean|0.15191740412979352|
| stddev| 0.3590290796596648|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

+-------+------------------+
|summary|        TenYearCHD|
+-------+------------------+
|  count|               893|
|   mean|0.1511758118701008|
| stddev|0.3584209036421307|
|    min|                 0|
|    max|                 1|
+-------+------------------+



### Train the model and evaluate it on the test data

In [None]:
# Linear regression from the assembled "features" vector to the TenYearCHD label.
# NOTE(review): TenYearCHD only takes the values 0 and 1 in this dataset, so the
# model's outputs are unbounded scores rather than probabilities — consider
# whether a classifier would be a better fit.
disease_lr = LinearRegression(featuresCol="features", labelCol="TenYearCHD")

# Fit on the training partition only.
trained_disease_model = disease_lr.fit(train_data)

# Score the fitted model on the held-out partition; the returned summary
# exposes the residuals, r2 and rootMeanSquaredError used in the cells below.
prediction_results = trained_disease_model.evaluate(test_data)

### Check results

In [63]:
# Preview the first 10 residuals from the evaluation on the test data.
prediction_results.residuals.show(10)

+--------------------+
|           residuals|
+--------------------+
| 0.08666855499698228|
|  0.0900765171492714|
|-0.00729876292817...|
| 0.05401499782812674|
| 0.05133043671483739|
| 0.04138121357884006|
| 0.06971411606445549|
| 0.03793237749182621|
| 0.05907279914849878|
|0.006055685773997954|
+--------------------+
only showing top 10 rows



In [64]:
# R² of the fitted model, measured on the test data.
prediction_results.r2

0.10477742909698706

In [65]:
# Root mean squared error of the fitted model, measured on the test data.
prediction_results.rootMeanSquaredError

0.3389343143480262

### The column most correlated with TenYearCHD

In [66]:
# Correlation between the label and age, computed over the cleaned, indexed rows.
chd_age_corr = corr('TenYearCHD', 'age')
indexed.select(chd_age_corr).show()

+---------------------+
|corr(TenYearCHD, age)|
+---------------------+
|  0.23442278429915242|
+---------------------+



### Test with real data

In [68]:
# Two hand-written example people. The categorical fields are given as raw
# labels ('M'/'F', 'YES'/'NO') instead of hand-coded indices, so they cannot
# drift out of sync with the mapping that StringIndexer learns from the data.
real_data = [
    Row(age=29, education=4.0, cigsPerDay=0.0, BPMeds=0.0, prevalentStroke=0,
        prevalentHyp=1, diabetes=0, totChol=250.0, sysBP=130.0, diaBP=80.0,
        BMI=26.2, heartRate=72.0, glucose=87.0, sex='M', is_smoking='NO'),
    Row(age=64, education=4.0, cigsPerDay=10.0, BPMeds=1.0, prevalentStroke=1,
        prevalentHyp=1, diabetes=1, totChol=300.0, sysBP=146.0, diaBP=90.0,
        BMI=37.0, heartRate=100.0, glucose=190.0, sex='M', is_smoking='YES'),
]
real_df = spark.createDataFrame(real_data)

# Encode 'sex' / 'is_smoking' with the same indexer definition, fitted on the
# same cleaned rows as the training data, so the indices match what the model saw.
real_indexed = indexer.fit(droped).transform(real_df)

# Assemble the feature vector exactly as was done for the training data.
transform_real_data = assembler.transform(real_indexed)

# Use a dedicated name for the features frame instead of rebinding `real_data`
# (a list of Rows) to a DataFrame.
real_features = transform_real_data.select("features")

real_prediction = trained_disease_model.transform(real_features)
real_prediction.show()

+--------------------+-------------------+
|            features|         prediction|
+--------------------+-------------------+
|[29.0,4.0,0.0,0.0...|0.02699482613603954|
|[64.0,4.0,10.0,1....| 0.6910128092139037|
+--------------------+-------------------+



In [69]:
# Stop the Spark session created at the top of the notebook.
spark.stop()