# Group JABS 
## Notebook for Pipeline Creation and Tuning

## __Please note that there is some repetition of code from the previous notebook so that this file may be run independently.__


The purpose of this notebook is to showcase pipeline creation and more robust model creation and evaluation. Please see the section labelled "Pipeline and Cross Validation" for new code (compared to the previous notebook).

Please also note that the benchmark model was create in this notebook.

# Pulling in the data

In [2]:
# import zipfile

In [3]:
# with zipfile.ZipFile("ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1.zip", "r") as zip_ref:


In [4]:
#import wfdb
import pandas as pd
import numpy as np
import ast
import pyspark.sql.functions as F
from pyspark.sql.functions import col,sum, isnan
from scipy.signal import find_peaks
from scipy import sparse
from scipy.sparse.linalg import spsolve
import matplotlib.pyplot as plt 



## Load from csv

In [5]:
Y = pd.read_csv('ECG_features.csv', index_col='ecg_id')

In [6]:
#Resetting the index so we can include the ecg_id column when converting to a Spark datafram
Y.reset_index(drop=False, inplace=True)
Y.index += 1

In [7]:
len(Y.columns)

42

In [8]:
Y.sample(10)

Unnamed: 0,ecg_id,patient_id,age,sex,height,weight,nurse,site,device,recording_date,...,HYP,CD,bpm,bif,bif2,TRinterval,TRratio,PRinterval,PRratio,QRinterval
19434,19434,9321.0,69.0,0,,,0.0,0.0,CS100 3,1998-07-26 12:52:32,...,50.0,,70.257611,0.046838,0.017525,266.0,0.268318,327.0,0.101439,43.0
21588,21588,17435.0,67.0,0,,,0.0,0.0,CS100 3,2000-11-26 13:34:18,...,,,66.257669,0.077301,0.024478,766.666667,0.193526,148.888889,0.193526,71.111111
21377,21377,18433.0,82.0,0,,,0.0,0.0,CS100 3,2000-09-20 07:55:14,...,,,129.032258,0.666667,0.187788,163.684211,0.124978,210.0,0.094251,153.157895
17884,17884,13588.0,33.0,1,,50.0,0.0,0.0,CS-12 E,1997-05-11 09:42:14,...,,,80.783354,0.080783,0.025084,407.272727,0.138047,139.090909,0.076199,61.818182
8254,8254,12080.0,60.0,1,,,1.0,2.0,CS-12,1992-06-22 17:28:17,...,,,61.146497,0.621656,0.162294,435.0,0.231547,162.5,0.187036,62.5
13145,13145,18250.0,67.0,1,,,1.0,2.0,CS-12,1994-10-06 18:16:39,...,,,68.35443,0.04557,0.014977,263.333333,0.211568,138.888889,0.041927,37.777778
10990,10990,1122.0,44.0,1,158.0,,10.0,1.0,AT-6 C 5.5,1993-09-11 10:59:06,...,,,126.666667,1.33,0.320694,209.473684,0.073049,171.052632,0.050314,53.684211
17862,17862,1895.0,76.0,1,174.0,73.0,6.0,1.0,AT-6 6,1997-05-05 14:14:00,...,,100.0,56.40423,0.169213,0.0627,501.25,0.181249,187.5,0.146324,68.75
11500,11500,13887.0,90.0,1,,,1.0,2.0,CS-12,1993-12-11 15:48:16,...,,100.0,96.10984,0.048055,0.013145,241.428571,0.182264,170.0,0.127693,43.571429
15963,15963,12535.0,34.0,1,,59.0,0.0,0.0,CS-12 E,1996-04-18 09:00:13,...,,,82.191781,0.534247,0.176672,245.833333,0.138094,321.666667,0.098816,179.166667


In [9]:
Y = Y.replace(float("nan"), 0)

In [10]:
Y['NORM'] = Y['NORM'].astype(bool).astype(int)
Y['STTC'] = Y['STTC'].astype(bool).astype(int)
Y['MI'] = Y['MI'].astype(bool).astype(int)
Y['HYP'] = Y['HYP'].astype(bool).astype(int)
Y['CD'] = Y['CD'].astype(bool).astype(int)

In [11]:
#converting to Spark df
from pyspark.sql import SparkSession
from pyspark import SQLContext
import os

spark = SparkSession.builder \
        .master("local") \
        .appName("mllib_classifier") \
        .getOrCreate()
sc = spark.sparkContext

sql = SQLContext(sc)

In [12]:
# spark_df = sql.createDataFrame(Y)
from pyspark.sql.types import *
mySchema = StructType([ 
    StructField("ecg_id", IntegerType(), True)\
    ,StructField("patient_id", FloatType(), True)\
    ,StructField("age", FloatType(), True)\
    ,StructField("sex", IntegerType(), True)\
    ,StructField("height", FloatType(), True)\
    ,StructField("weight", FloatType(), True)\
    ,StructField("nurse", FloatType(), True)\
    ,StructField("site", StringType(), True)\
    ,StructField("device", StringType(), True)\
    ,StructField("recording_date", StringType(), True)\
    ,StructField("report", StringType(), True)\
    ,StructField("scp_codes", StringType(), True)\
    ,StructField("heart_axis", StringType(), True)\
    ,StructField("infarction_stadium1", StringType(), True)\
    ,StructField("infarction_stadium2", StringType(), True)\
    ,StructField("validated_by", FloatType(), True)\
    ,StructField("second_opinion", StringType(), True)\
    ,StructField("initial_autogenerated_report", StringType(), True)\
    ,StructField("validated_by_human", StringType(), True)\
    ,StructField("baseline_drift", StringType(), True)\
    ,StructField("static_noise", StringType(), True)\
    ,StructField("burst_noise", StringType(), True)\
    ,StructField("electrodes_problems", StringType(), True)\
    ,StructField("extra_beats", StringType(), True)\
    ,StructField("pacemaker", StringType(), True)\
    ,StructField("strat_fold", StringType(), True)\
    ,StructField("filename_lr", StringType(), True)\
    ,StructField("filename_hr", StringType(), True)\
    ,StructField("diagnostic_superclass", StringType(), True)\
    ,StructField("NORM", IntegerType(), True)\
    ,StructField("MI", IntegerType(), True)\
    ,StructField("STTC", IntegerType(), True)\
    ,StructField("HYP", IntegerType(), True)\
    ,StructField("CD", IntegerType(), True)\
    ,StructField("bpm", FloatType(), True)\
    ,StructField("bif", FloatType(), True)\
    ,StructField("bif2", FloatType(), True)\
    ,StructField("TRinterval", FloatType(), True)\
    ,StructField("TRratio", FloatType(), True)\
    ,StructField("PRinterval", FloatType(), True)\
    ,StructField("PRratio", FloatType(), True)\
    ,StructField("QRinterval", FloatType(), True)\
])



In [13]:
df = spark.createDataFrame(Y,schema=mySchema)

In [14]:
print('counts of MI: ', df.filter(df['MI']==1).count())

counts of MI:  5486


In [15]:
df.count()

21837

### __Statistical summary of response variable__

In [16]:
#these numbers match the numbers on the physionet website 
print('counts of NORM: ', df.filter(df['NORM']!=0).count())
print('counts of MI: ', df.filter(df['MI']!=0).count())
print('counts of STTC: ', df.filter(df['STTC']!=0).count())
print('counts of CD: ', df.filter(df['CD']!=0).count())
print('counts of HYP: ', df.filter(df['HYP']!=0).count())

counts of NORM:  9528
counts of MI:  5486
counts of STTC:  5123
counts of CD:  4907
counts of HYP:  2655


In [16]:
df.groupby('strat_fold').count().show()

+----------+-----+
|strat_fold|count|
+----------+-----+
|         7| 2178|
|         3| 2194|
|         8| 2179|
|         5| 2176|
|         6| 2178|
|         9| 2193|
|         1| 2177|
|        10| 2203|
|         4| 2175|
|         2| 2184|
+----------+-----+



In [17]:
type(df)

pyspark.sql.dataframe.DataFrame

## Keep only columns we will need

In [26]:
# retain these predictors for Part 1
vars_to_keep = ["diagnostic_superclass","age","sex", "bpm", "bif", "bif2", "TRinterval", "TRratio","PRinterval","PRratio","QRinterval","NORM", "MI","STTC","HYP","CD",
               "strat_fold"]

# subset the dataframe on these predictors
df2 = df[vars_to_keep]
df2.show()

+---------------------+----+---+---------+-----------+-----------+----------+----------+----------+-----------+----------+----+---+----+---+---+----------+
|diagnostic_superclass| age|sex|      bpm|        bif|       bif2|TRinterval|   TRratio|PRinterval|    PRratio|QRinterval|NORM| MI|STTC|HYP| CD|strat_fold|
+---------------------+----+---+---------+-----------+-----------+----------+----------+----------+-----------+----------+----+---+----+---+---+----------+
|      {'NORM': 100.0}|56.0|  1|47.244095| 0.16535433|0.058571953|     285.0|0.22329463| 151.66667| 0.10876027|      45.0|   1|  0|   0|  0|  0|         3|
|       {'NORM': 80.0}|19.0|  0| 63.75443| 0.07438017|0.022087706|  478.8889|0.22907318| 168.88889| 0.17119157|  61.11111|   1|  0|   0|  0|  0|         2|
|      {'NORM': 100.0}|37.0|  1| 74.40812| 0.14881623|0.045572452| 231.81818|0.25935754| 204.54546| 0.09707572| 75.454544|   1|  0|   0|  0|  0|         5|
|      {'NORM': 100.0}|24.0|  0| 66.29834| 0.16574585| 0.0541887

In [18]:
from pyspark.ml.linalg import DenseVector

In [19]:
#Vector Assembler is a useful function for this, 
# here is a useful tutorial: https://spark.apache.org/docs/latest/ml-features#vectorassembler

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2", "TRinterval", "TRratio","PRinterval","PRratio","QRinterval"],
    outputCol="features")

df2 = assembler.transform(df2)
df2.show()

+---------------------+----+---+---------+-----------+-----------+----------+----------+----------+-----------+----------+----+---+----+---+---+----------+--------------------+
|diagnostic_superclass| age|sex|      bpm|        bif|       bif2|TRinterval|   TRratio|PRinterval|    PRratio|QRinterval|NORM| MI|STTC|HYP| CD|strat_fold|            features|
+---------------------+----+---+---------+-----------+-----------+----------+----------+----------+-----------+----------+----+---+----+---+---+----------+--------------------+
|      {'NORM': 100.0}|56.0|  1|47.244095| 0.16535433|0.058571953|     285.0|0.22329463| 151.66667| 0.10876027|      45.0|   1|  0|   0|  0|  0|         3|[56.0,1.0,47.2440...|
|       {'NORM': 80.0}|19.0|  0| 63.75443| 0.07438017|0.022087706|  478.8889|0.22907318| 168.88889| 0.17119157|  61.11111|   1|  0|   0|  0|  0|         2|[19.0,0.0,63.7544...|
|      {'NORM': 100.0}|37.0|  1| 74.40812| 0.14881623|0.045572452| 231.81818|0.25935754| 204.54546| 0.09707572| 75.

In [20]:
# load modules
import pandas as pd
import pyspark.sql.functions as F
import pyspark.mllib.regression as reg
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.regression import LabeledPoint

In [21]:
# Feature scaling
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)

# Fit the DataFrame to the scaler; this computes the mean, standard deviation of each feature
scaler = standardScaler.fit(df2)

# Transform the data in `df2` with the scaler
scaled_df = scaler.transform(df2)

scaled_df.show()

+---------------------+----+---+---------+-----------+-----------+----------+----------+----------+-----------+----------+----+---+----+---+---+----------+--------------------+--------------------+
|diagnostic_superclass| age|sex|      bpm|        bif|       bif2|TRinterval|   TRratio|PRinterval|    PRratio|QRinterval|NORM| MI|STTC|HYP| CD|strat_fold|            features|     features_scaled|
+---------------------+----+---+---------+-----------+-----------+----------+----------+----------+-----------+----------+----+---+----+---+---+----------+--------------------+--------------------+
|      {'NORM': 100.0}|56.0|  1|47.244095| 0.16535433|0.058571953|     285.0|0.22329463| 151.66667| 0.10876027|      45.0|   1|  0|   0|  0|  0|         3|[56.0,1.0,47.2440...|[3.22901707019790...|
|       {'NORM': 80.0}|19.0|  0| 63.75443| 0.07438017|0.022087706|  478.8889|0.22907318| 168.88889| 0.17119157|  61.11111|   1|  0|   0|  0|  0|         2|[19.0,0.0,63.7544...|[1.09555936310286...|
|      {'N

In [23]:
scaled_df.select('features_scaled').show(10,False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features_scaled                                                                                                                                                                                |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[3.2290170701979064,2.0017353669023907,2.7237711054696416,0.6134522334681485,0.731419493527598,1.3959372577100275,1.1713254574007597,2.099795681158586,0.7921416087791956,0.7367600389800208]  |
|[1.095559363102861,0.0,3.6756439453557546,0.2759448783056445,0.27582107140359,2.3456099563858066,1.2016377134218348,2.3382338932194275,1.2468521021593948,1.0005383176012221]                  |
|[2.133457707095045,2.00173536

In [27]:
# according to the documentation, we should reserve "strat_fold"'s #9 and #10 for testing
# seed = 314
train_data = scaled_df[scaled_df.strat_fold < 9]
test_data = scaled_df[scaled_df.strat_fold > 8]


In [25]:
print(train_data.count())
print(test_data.count())

17441
4396


## Training a few different models

In [27]:
from pyspark.ml.classification import LogisticRegression
maxIter=10
regParam=0.3
elasticNetParam=0.8


In [22]:
def sum_col(df, col): 
    return df.select(F.sum(col)).collect()[0][0]

def getpredictions(model, test_dat, column):
    predictions = model.transform(test_dat)
    predictions = predictions.withColumn('TP', F.when((F.col(column) == 1) & (F.col("prediction")==1), 1).otherwise(0))
    predictions = predictions.withColumn('TN', F.when((F.col(column) == 0) & (F.col("prediction")==0), 1).otherwise(0))
    predictions = predictions.withColumn('FP', F.when((F.col(column) == 0) & (F.col("prediction")==1), 1).otherwise(0))
    predictions = predictions.withColumn('FN', F.when((F.col(column) == 1) & (F.col("prediction")==0), 1).otherwise(0))
    accuracy = (sum_col(predictions, "TP")+sum_col(predictions, "TN"))/predictions.count()
    if (sum_col(predictions, "TP")+sum_col(predictions, "FP"))>0:
        precision = sum_col(predictions, "TP")/(sum_col(predictions, "TP")+sum_col(predictions, "FP"))
    else:
        precision = "Precision could not be calculated due to a lack of TP/FP values"
    if (sum_col(predictions, "TP")+sum_col(predictions, "FN"))>0:
        recall = sum_col(predictions, "TP")/(sum_col(predictions, "TP")+sum_col(predictions, "FN"))
    else:
        precision = "Recall could not be calculated due to a lack of TP/FN values"
    if ((2*sum_col(predictions, "TP")+sum_col(predictions, "FP")+sum_col(predictions, "FN")))>0:
        fmeasure = (2*sum_col(predictions, "TP"))/(2*sum_col(predictions, "TP")+sum_col(predictions, "FP")+sum_col(predictions, "FN"))
    else:
        fmeasure = "Fmeasure could not be calculated due to a lack of TP/FN/FP values"
    print("accuracy:"+str(accuracy))
    print("precision:"+str(precision))
    print("recall:"+str(recall))
    print("fmeasure:"+str(fmeasure))

### Benchmark Model with only 2 variables, age and sex

In [100]:
assembler = VectorAssembler(
    inputCols=["age","sex"],
    outputCol="ageandsex")

df_bm = assembler.transform(df)

model1 = LogisticRegression(featuresCol="ageandsex", labelCol="NORM",maxIter=3, regParam=1, elasticNetParam=1)
model_bm = model1.fit(df_bm)
print("this benchmark are for training data:")
getpredictions(model_bm, df_bm, "NORM")

this benchmark are for training data:
accuracy:0.5636763291660942
precision:Precision could not be calculated due to a lack of TP/FP values
recall:0.0
fmeasure:0.0


### Logistic Regression

In [98]:
from pyspark.ml.classification import LogisticRegression
maxIter=10
regParam=0.3
elasticNetParam=0.8

In [34]:
lrNORM = LogisticRegression(featuresCol="features_scaled", labelCol="NORM",maxIter=100, )
modelLR_NORM = lrNORM.fit(train_data)
getpredictions(modelLR_NORM, test_data, "NORM")

accuracy:0.6831210191082803
precision:0.6924198250728864
recall:0.49453409682457056
fmeasure:0.5769814758578804


In [35]:
lrSTTC = LogisticRegression(featuresCol="features_scaled", labelCol="STTC",maxIter=20, regParam=0.0, elasticNetParam=0)
modelLR_STTC = lrSTTC.fit(train_data)
getpredictions(modelLR_STTC, test_data, "STTC")

accuracy:0.7659235668789809
precision:Precision could not be calculated due to a lack of TP/FP values
recall:0.0
fmeasure:0.0


In [36]:
lrMI = LogisticRegression(featuresCol="features_scaled", labelCol="MI",maxIter=20, regParam=0.0, elasticNetParam=0.)
modelLR_MI = lrMI.fit(train_data)
getpredictions(modelLR_MI, test_data, "MI")

accuracy:0.7538671519563239
precision:0.5773195876288659
recall:0.05104831358249772
fmeasure:0.09380234505862646


In [37]:
lrHYP = LogisticRegression(featuresCol="features_scaled", labelCol="HYP",maxIter=20, regParam=0.01, elasticNetParam=0.01)
modelLR_HYP = lrHYP.fit(train_data)
getpredictions(modelLR_HYP, test_data, "HYP")

accuracy:0.87852593266606
precision:Precision could not be calculated due to a lack of TP/FP values
recall:0.0
fmeasure:0.0


In [38]:
lrCD = LogisticRegression(featuresCol="features_scaled", labelCol="CD",maxIter=20, regParam=0.01, elasticNetParam=0.01)
modelLR_CD = lrCD.fit(train_data)
getpredictions(modelLR_CD, test_data, "CD")

accuracy:0.7734303912647862
precision:0.0
recall:0.0
fmeasure:0.0


### Random Forest 

In [39]:
from pyspark.ml.classification import RandomForestClassifier

In [40]:
rfNORM = RandomForestClassifier(featuresCol="features_scaled", labelCol="NORM",maxDepth=10, numTrees=10, seed=10)
modelrf_NORM = rfNORM.fit(train_data)
getpredictions(modelrf_NORM, test_data, "NORM")

accuracy:0.6762966333030027
precision:0.6828193832599119
recall:0.4841228526808954
fmeasure:0.5665549802010357


In [41]:
rfSTTC = RandomForestClassifier(featuresCol="features_scaled", labelCol="STTC",maxDepth=10, numTrees=10, seed=10)
modelrf_STTC = rfSTTC.fit(train_data)
getpredictions(modelrf_STTC, train_data, "STTC")

accuracy:0.7859067714007224
precision:0.9522613065326633
recall:0.09257449926722032
fmeasure:0.16874443455031166


### Gradient Boost Tree

In [42]:
from pyspark.ml.classification import GBTClassifier

In [43]:
gbtNORM = GBTClassifier(featuresCol="features_scaled", labelCol="NORM", maxDepth=10, maxIter=10, seed=10)
modelgbt_NORM = gbtNORM.fit(train_data)
getpredictions(modelgbt_NORM, test_data, "NORM")

accuracy:0.6603730664240218
precision:0.6453804347826086
recall:0.49453409682457056
fmeasure:0.5599764220453876


### Linear SVM classifier

In [44]:
from pyspark.ml.classification import LinearSVC

In [45]:
svmNORM = LinearSVC(featuresCol="features_scaled", labelCol="NORM", maxIter=20, regParam=0.001)
modelsvm_NORM = svmNORM.fit(train_data)
getpredictions(modelsvm_NORM, test_data, "NORM")

accuracy:0.6364877161055505
precision:0.6551392891450528
recall:0.3550234252993233
fmeasure:0.46049966239027684


# Pipeline and Cross Validation 

### Logistic regression

In [23]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC


In [46]:
#start with df table loaded from .csv file
vars_to_keep = ["diagnostic_superclass","age","sex", "bpm", "bif", "bif2", "TRinterval", "TRratio","PRinterval","PRratio","QRinterval","NORM", "MI","STTC","HYP","CD",
               "strat_fold"]

# subset the dataframe on these predictors
df2 = df[vars_to_keep]

#re-define train_data and test_data, assembler and standardScaler will be part of pipeline
train_data = df2[df2.strat_fold < 9]
test_data = df2[df2.strat_fold > 8]


In [56]:
train=train_data.withColumnRenamed("NORM","label")
test=test_data.withColumnRenamed("NORM","label")

# Configure an ML pipeline, which consists of tree stages: assembler, standardScaler, and lr.
assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2", "TRinterval", "TRratio","PRinterval","PRratio","QRinterval"],
    outputCol="features")
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)
lr = LogisticRegression(featuresCol="features_scaled", maxIter=100)
pipeline = Pipeline(stages=[assembler, standardScaler, lr])

# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0, 0.2, 0.4, 0.6, 0.8, 1.0]) \
    .addGrid(lr.regParam, [0, 0.01, 0.05, 1, 2, 5]) \
    .build()

print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=8)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_lr=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_lr.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_reg = bestModel_lr.stages[2]._java_obj.getRegParam()
best_elastic_net = bestModel_lr.stages[2]._java_obj.getElasticNetParam()

print("best regParam is:", best_reg)
print("best elasticNetParam is:", best_elastic_net )

len(paramGrid): 36
ROC for test is 0.7585495769774779
best regParam is: 0.01
best elasticNetParam is: 0.2


In [64]:
getpredictions(bestModel_lr, test, "label")

accuracy:0.6808462238398544
precision:0.6938622754491018
recall:0.48256116605934407
fmeasure:0.5692354927847713


In [65]:
#for MI

train=train_data.withColumnRenamed("MI","label")
test=test_data.withColumnRenamed("MI","label")

assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2", "TRinterval", "TRratio","PRinterval","PRratio","QRinterval"],
    outputCol="features")
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)

lr = LogisticRegression(featuresCol="features_scaled", maxIter=100)


pipeline = Pipeline(stages=[assembler, standardScaler, lr])

# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0, 0.2, 0.4, 0.6, 0.8, 1.0]) \
    .addGrid(lr.regParam, [0, 0.01, 0.05, 1, 2, 5]) \
    .build()

print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the lr as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=8)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_lr_MI=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_lr_MI.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_reg = bestModel_lr_MI.stages[2]._java_obj.getRegParam()
best_elastic_net = bestModel_lr_MI.stages[2]._java_obj.getElasticNetParam()

print("best regParam is:", best_reg)
print("best elasticNetParam is:", best_elastic_net )


len(paramGrid): 36
ROC for test is 0.6895960019928093
best regParam is: 0.01
best elasticNetParam is: 0.4


In [66]:
getpredictions(bestModel_lr_MI, test, "label")

accuracy:0.7511373976342129
precision:0.5405405405405406
recall:0.018231540565177756
fmeasure:0.03527336860670194


### Random Forest

In [68]:
#for NORM

train=train_data.withColumnRenamed("NORM","label")
test=test_data.withColumnRenamed("NORM","label")

# Configure an ML pipeline, which consists of tree stages: assembler, standardScaler, and rf.
assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2", "TRinterval", "TRratio","PRinterval","PRratio","QRinterval"],
    outputCol="features")
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)
rf = RandomForestClassifier(featuresCol="features_scaled", seed=2020)
pipeline = Pipeline(stages=[assembler, standardScaler, rf])




# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [4, 7, 10, 12]) \
    .addGrid(rf.numTrees, [10,100,200]) \
    .build()

print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=10)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_rf=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_rf.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_Depth = bestModel_rf.stages[2]._java_obj.getMaxDepth()
best_numTree = bestModel_rf.stages[2]._java_obj.getNumTrees()

print("best maxDepth is:", best_Depth)
print("best numTrees is:", best_numTree )


len(paramGrid): 12
ROC for test is 0.7581011573307237
best maxDepth is: 7
best numTrees is: 200


In [69]:
getpredictions(bestModel_rf, test, "label")

accuracy:0.6867606915377616
precision:0.7111801242236024
recall:0.47683498178032274
fmeasure:0.5708943596135868


### Gradient Boost Tree

In [70]:
from pyspark.ml.classification import GBTClassifier

train=train_data.withColumnRenamed("NORM","label")
test=test_data.withColumnRenamed("NORM","label")


# Configure an ML pipeline, which consists of tree stages: assembler, standardScaler, and gbt.
assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2", "TRinterval", "TRratio","PRinterval","PRratio","QRinterval"],
    outputCol="features")
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)
gbt = GBTClassifier(featuresCol="features_scaled", maxIter=10, seed=2020)
pipeline = Pipeline(stages=[assembler, standardScaler, gbt])



# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [4,7,10,15]) \
    .addGrid(gbt.stepSize, [0.25,0.1,0.05,0.01]) \
    .build()



print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=10)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_gbt=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_gbt.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_Depth = bestModel_gbt.stages[2]._java_obj.getMaxDepth()
best_step = bestModel_gbt.stages[2]._java_obj.getStepSize()

print("best maxDepth is:", best_Depth)
print("best stepSize is:", best_step )

len(paramGrid): 16
ROC for test is 0.756104512064948
best maxDepth is: 4
best stepSize is: 0.1


In [71]:
getpredictions(bestModel_gbt, test, "label")

accuracy:0.6792538671519563
precision:0.7085714285714285
recall:0.45184799583550234
fmeasure:0.5518118245390973


### SVM

In [72]:
from pyspark.ml.classification import LinearSVC

train=train_data.withColumnRenamed("NORM","label")
test=test_data.withColumnRenamed("NORM","label")

# Configure an ML pipeline, which consists of tree stages: assembler, standardScaler, and svm.
assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2", "TRinterval", "TRratio","PRinterval","PRratio","QRinterval"],
    outputCol="features")
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)
svm = LinearSVC(featuresCol="features_scaled")
pipeline = Pipeline(stages=[assembler, standardScaler, svm])


# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(svm.maxIter, [20,40,60]) \
    .addGrid(svm.regParam, [0.25,0.1,0.01,0.001]) \
    .build()

print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=10)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_svm=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_svm.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_Iter = bestModel_svm.stages[2]._java_obj.getMaxIter()
best_regParam = bestModel_svm.stages[2]._java_obj.getRegParam()

print("best maxIter is:", best_Iter)
print("best regParam is:", best_regParam )

len(paramGrid): 12
ROC for test is 0.7518752333328067
best maxIter is: 60
best regParam is: 0.25


In [73]:
getpredictions(bestModel_svm, test, "label")

accuracy:0.6530937215650592
precision:0.7487437185929648
recall:0.31025507548152004
fmeasure:0.4387191755612808


# Use only three generated features: bpm, bif, bif2, plus age and sex

### Logistic regression

In [74]:
train=train_data.withColumnRenamed("NORM","label")
test=test_data.withColumnRenamed("NORM","label")

# Configure an ML pipeline, which consists of tree stages: assembler, standardScaler, and lr.
assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2"],
    outputCol="features2")
standardScaler = StandardScaler(inputCol="features2", outputCol="features2_scaled", 
                                withStd=True, withMean=False)
lr = LogisticRegression(featuresCol="features2_scaled", maxIter=100)
pipeline = Pipeline(stages=[assembler, standardScaler, lr])

# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0, 0.2, 0.4, 0.6, 0.8, 1.0]) \
    .addGrid(lr.regParam, [0, 0.01, 0.05, 1, 2, 5]) \
    .build()

print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=8)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_lr=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_lr.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_reg = bestModel_lr.stages[2]._java_obj.getRegParam()
best_elastic_net = bestModel_lr.stages[2]._java_obj.getElasticNetParam()

print("best regParam is:", best_reg)
print("best elasticNetParam is:", best_elastic_net )



len(paramGrid): 36
ROC for test is 0.7590557317053933
best regParam is: 0.01
best elasticNetParam is: 0.6


In [75]:
getpredictions(bestModel_lr, test, "label")

accuracy:0.6819836214740673
precision:0.6955871353777113
recall:0.4841228526808954
fmeasure:0.570902394106814


### Random Forest

In [77]:
train=train_data.withColumnRenamed("NORM","label")
test=test_data.withColumnRenamed("NORM","label")

# Configure an ML pipeline, which consists of tree stages: assembler, standardScaler, and rf.
assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2"],
    outputCol="features2")
standardScaler = StandardScaler(inputCol="features2", outputCol="features2_scaled", 
                                withStd=True, withMean=False)
rf = RandomForestClassifier(featuresCol="features2_scaled", seed=2020)
pipeline = Pipeline(stages=[assembler, standardScaler, rf])

# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [4, 7, 10, 12]) \
    .addGrid(rf.numTrees, [10,100,200]) \
    .build()

print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=8)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_rf=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_rf.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_Depth = bestModel_rf.stages[2]._java_obj.getMaxDepth()
best_numTree = bestModel_rf.stages[2]._java_obj.getNumTrees()

print("best maxDepth is:", best_Depth)
print("best numTrees is:", best_numTree )


len(paramGrid): 12
ROC for test is 0.7594174961483713
best maxDepth is: 7
best numTrees is: 200


In [78]:
getpredictions(bestModel_rf, test, "label")

accuracy:0.6813011828935396
precision:0.708
recall:0.46069755335762624
fmeasure:0.5581835383159887


### Gradient Boost Tree

In [80]:
train=train_data.withColumnRenamed("NORM","label")
test=test_data.withColumnRenamed("NORM","label")


# Configure an ML pipeline, which consists of tree stages: assembler, standardScaler, and gbt.
assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2"],
    outputCol="features2")
standardScaler = StandardScaler(inputCol="features2", outputCol="features2_scaled", 
                                withStd=True, withMean=False)
gbt = GBTClassifier(featuresCol="features2_scaled", maxIter=10, seed=2020)
pipeline = Pipeline(stages=[assembler, standardScaler, gbt])



# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [4,7,10,15]) \
    .addGrid(gbt.stepSize, [0.25,0.1,0.05,0.01]) \
    .build()



print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=10)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_gbt=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_gbt.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_Depth = bestModel_gbt.stages[2]._java_obj.getMaxDepth()
best_step = bestModel_gbt.stages[2]._java_obj.getStepSize()

print("best maxDepth is:", best_Depth)
print("best stepSize is:", best_step )

len(paramGrid): 16
ROC for test is 0.7587829360760141
best maxDepth is: 4
best stepSize is: 0.1


In [81]:
getpredictions(bestModel_gbt, test, "label")

accuracy:0.6799363057324841
precision:0.7086038961038961
recall:0.45445080687142114
fmeasure:0.5537583254043768


### SVM

In [82]:
train=train_data.withColumnRenamed("NORM","label")
test=test_data.withColumnRenamed("NORM","label")

# Configure an ML pipeline, which consists of tree stages: assembler, standardScaler, and svm.
assembler = VectorAssembler(
    inputCols=["age","sex", "bpm", "bif", "bif2"],
    outputCol="features2")
standardScaler = StandardScaler(inputCol="features2", outputCol="features2_scaled", 
                                withStd=True, withMean=False)
svm = LinearSVC(featuresCol="features2_scaled")
pipeline = Pipeline(stages=[assembler, standardScaler, svm])


# Set up the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(svm.maxIter, [20,40,60]) \
    .addGrid(svm.regParam, [0.25,0.1,0.01,0.001]) \
    .build()

print('len(paramGrid): {}'.format(len(paramGrid)))

# Treat the pipeline as an Estimator, wrapping it in a CrossValidator instance.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          parallelism=10)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

bestModel_svm=cvModel.bestModel


# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = bestModel_svm.transform(test)

evaluator=BinaryClassificationEvaluator()
print("ROC for test is {}".format(evaluator.evaluate(prediction)))

best_Iter = bestModel_svm.stages[2]._java_obj.getMaxIter()
best_regParam = bestModel_svm.stages[2]._java_obj.getRegParam()

print("best maxIter is:", best_Iter)
print("best regParam is:", best_regParam )

len(paramGrid): 12
ROC for test is 0.7590226050194803
best maxIter is: 60
best regParam is: 0.25


In [83]:
getpredictions(bestModel_svm, test, "label")

accuracy:0.6599181073703366
precision:0.75177304964539
recall:0.3310775637688704
fmeasure:0.459703650162631
