# Machine learning using Databricks

### Logistic Regression model for Coronary Heart Disease Prediction

1. Data Cleaning in Spark
2. Feature Engineering in Databricks
3. Logistic regression model training & evaluation
4. Model tuning

In [3]:
from pyspark.sql.types import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [4]:
hd = spark.read.format("delta").load("/newwave_workshop/predict_heart_disease/delta/heart_disease")
hd.count()

In [5]:
display(hd)

Demographic:  
• Sex: male or female(Nominal)  
• Age: Age of the patient;(Continuous - Although the recorded ages have been truncated to whole numbers, the concept of age is continuous)

Behavioral  
• Current Smoker: whether or not the patient is a current smoker (Nominal)  
• Cigs Per Day: the number of cigarettes that the person smoked on average in one day.(can be considered continuous as one can have any number of cigarettes, even half a cigarette.) 

Medical( history)  
• BP Meds: whether or not the patient was on blood pressure medication (Nominal)  
• Prevalent Stroke: whether or not the patient had previously had a stroke (Nominal)  
• Prevalent Hyp: whether or not the patient was hypertensive (Nominal)  
• Diabetes: whether or not the patient had diabetes (Nominal)  

Medical(current)  
• Tot Chol: total cholesterol level (Continuous)  
• Sys BP: systolic blood pressure (Continuous)  
• Dia BP: diastolic blood pressure (Continuous)  
• BMI: Body Mass Index (Continuous)  
• Heart Rate: heart rate (Continuous - In medical research, variables such as heart rate though in fact discrete, yet are considered continuous because of large number of possible values.)  
• Glucose: glucose level (Continuous)  

Predict variable (desired target)  
• 10 year risk of coronary heart disease CHD (binary: “1”, means “Yes”, “0” means “No”)

In [7]:
hd.printSchema()

#### 1. Data cleaning in Spark

In [9]:
display(hd.describe())

In [10]:
hd = hd.withColumnRenamed('male','gender')

In [11]:
display(hd.select([count(when(col(c)=='NA',c)).alias(c) for c in hd.columns]))

In [12]:
# education (1,2,3,4 four levels)
display(hd.groupBy('education').count().orderBy('education'))

In [13]:
# currentSmoker(1,0)
# cigsPerDay: the number of cigarettes that the person smoked on average in one day.(can be considered continuous as one can have any number of cigarettes, even half a cigarette.)
display(hd.select('currentSmoker','cigsPerDay').where(col('cigsPerDay')=='NA'))

In [14]:
# BPMeds: whether or not the patient was on blood pressure medication (1:96%,0:3%)
display(hd.select('BPMeds'))

In [15]:
display(hd.select('heartRate'))

In [16]:
# education: (1,2,3,4 four levels) use most populer one: 1
# cigsPerDay:use average 
# BPMeds:whether or not the patient was on blood pressure medication (1:96%,0:3%), use most populer one: 0
# totChol:total cholesterol level (Continuous),use average
# BMI:Body Mass Index (Continuous),use average
# heartRate:use average
# glucose:glucose level,use average

In [17]:
avg_cigsPerDay = hd.where((col("currentSmoker") == 1) & (col("cigsPerDay") != 'NA')).agg(avg("cigsPerDay")).first()[0]
avg_totChol = hd.where((col("totChol") != 'NA')).agg(avg("totChol")).first()[0]
avg_BMI = hd.where((col("BMI") != 'NA')).agg(avg("BMI")).first()[0]
avg_heartRate = hd.where((col("heartRate") != 'NA')).agg(avg("heartRate")).first()[0]
avg_glucose = hd.where((col("glucose") != 'NA')).agg(avg("glucose")).first()[0]

In [18]:
# data imputation

hd = (hd.withColumn("education_imp", when(col("education") == 'NA', 1).otherwise(col("education")).cast(DoubleType()))
        .withColumn("cigsPerDay_imp", when(col("cigsPerDay") == 'NA', avg_cigsPerDay).otherwise(col("cigsPerDay")).cast(DoubleType()))
        .withColumn("BPMeds_imp", when(col("BPMeds") == 'NA', 0).otherwise(col("BPMeds")).cast(DoubleType()))
        .withColumn("totChol_imp", when(col("totChol") == 'NA', avg_totChol).otherwise(col("totChol")).cast(DoubleType()))
        .withColumn("BMI_imp", when(col("BMI") == 'NA', avg_BMI).otherwise(col("BMI")).cast(DoubleType()))
        .withColumn("heartRate_imp", when(col("heartRate") == 'NA', avg_heartRate).otherwise(col("heartRate")).cast(DoubleType()))
        .withColumn("glucose_imp", when(col("glucose") == 'NA', avg_glucose).otherwise(col("glucose")).cast(DoubleType()))
     )

In [19]:
display(hd.describe())

####2. Feature Engineering in Databricks

In [21]:
display(hd.groupBy("TenYearCHD").count().alias("count"))

In [22]:
# BalancingRatio= numNegatives/dataset_size
BalancingRatio = hd.where(col("TenYearCHD") =='0').count()/hd.count()
BalancingRatio

In [23]:
# For Outcome = 1, we put BalancingRatio in column “classWeights”
# For Outcome = 0, we put (1-BalancingRatio) in column “classWeights”
hd1 = hd.withColumn("classWeights", when(col("TenYearCHD") =='1',BalancingRatio).otherwise(1-BalancingRatio))

In [24]:
hd1.write.mode("overwrite").format('delta').option("overwriteSchema", True).save('/newwave_workshop/predict_heart_disease/delta/hd_for_lr/')

In [25]:
from pyspark.ml.feature import VectorAssembler

# We will use `VectorAssembler` to combine all feature columns into a feature vector (optimized data structure for ML)
input_cols = ['gender', 'age', 'currentSmoker', 'prevalentStroke', 'prevalentHyp', 'diabetes', 'sysBP', 'diaBP', 'education_imp', 'cigsPerDay_imp', 'BPMeds_imp', 'totChol_imp', 'BMI_imp', 'heartRate_imp', 'glucose_imp']

vectorAssembler = VectorAssembler(inputCols=input_cols, outputCol="features", handleInvalid='skip')

hd2 = vectorAssembler.transform(hd1)

df = hd2.withColumnRenamed('TenYearCHD','label').select("features", "label", "classWeights")
display(df)

In [26]:
display(hd1)

#### 3. Logistic regression model training & evaluation

https://en.wikipedia.org/wiki/Logistic_regression#:~:text=Logistic%20regression%20is%20a%20statistical,a%20form%20of%20binary%20regression

In [29]:
#Logistic function
k = 1.
x0 = 0.
x = np.arange(-10., 10, 0.1)
y = [ 1/(1+ np.exp(-k * (x_ - x0))) for x_ in x]
display(plt.plot(x, y))

15 features : ![15 features](http://www.sciweavers.org/upload/Tex2Img_1594121026/render.png)  
Logistic function : ![Logistic Regression](http://www.sciweavers.org/upload/Tex2Img_1594120942/render.png)

In [31]:
train_df, test_df = df.randomSplit([0.75, 0.25])

In [32]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, weightCol="classWeights" )

# Train model with Training Data
lrModel = lr.fit(train_df)

In [33]:
predictions = lrModel.transform(test_df)

In [34]:
selected = predictions.select("label", "prediction", "probability")
display(selected)

In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# default metric for the BinaryClassificationEvaluator is areaUnderROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

print("areaUnderROC is: ", evaluator.evaluate(predictions))

https://en.wikipedia.org/wiki/Binary_classification
https://en.wikipedia.org/wiki/Receiver_operating_characteristic#:~:text=A%20receiver%20operating%20characteristic%20curve,its%20discrimination%20threshold%20is%20varied.

In [37]:
df_result = predictions.select("label", "prediction").toPandas()

cnf_matrix = pd.DataFrame(df_result.groupby(['label','prediction']).size()).reset_index()
cnf_matrix.columns=['label', 'prediction', 'count']
cnf_matrix

In [38]:
sensitivity = predictions.where(col('label')==1).agg(avg('prediction')).first()[0]
specificity = predictions.where(col('label')==0).agg(avg(lit(1) - col('prediction'))).first()[0]
print("sensitivity is: ", sensitivity)
print("specificity is: ", specificity)

####4. Model tuning

In [40]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [41]:
paramGrid

Cross Validation

https://en.wikipedia.org/wiki/Cross-validation_(statistics)#:~:text=Cross%2Dvalidation%2C%20sometimes%20called%20rotation,to%20an%20independent%20data%20set.

In [43]:
lr = LogisticRegression(labelCol="label", featuresCol="features", weightCol="classWeights")
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_df)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

In [44]:
cvModel.bestModel.intercept

In [45]:
cvModel.bestModel.coefficientMatrix

In [46]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(test_df)

# cvModel uses the best model found from the Cross Validation
# Evaluate best model
print("areaUnderROC is: ", evaluator.evaluate(predictions))

In [47]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability")
display(selected)

In [48]:
df_result = predictions.select("label", "prediction").toPandas()

cnf_matrix = pd.DataFrame(df_result.groupby(['label','prediction']).size()).reset_index()
cnf_matrix.columns=['label', 'prediction', 'count']
cnf_matrix

In [49]:
sensitivity = predictions.where(col('label')==1).agg(avg('prediction')).first()[0]
specificity = predictions.where(col('label')==0).agg(avg(lit(1) - col('prediction'))).first()[0]
print("sensitivity is: ", sensitivity)
print("specificity is: ", specificity)

&copy; 2020 NewWave. Data Science Group