In [1]:
# Load the raw heart-disease CSV as an RDD of text lines (header row included).
from pyspark.context import SparkContext

# Presumably a Databricks DBFS FileStore path; kept in one constant so the
# notebook can be repointed at another copy of the data in a single place.
HEART_DATA_PATH = "/FileStore/tables/data.csv"

# Reuse the running SparkContext if there is one, otherwise start one.
sc = SparkContext.getOrCreate()

heartRdd = sc.textFile(HEART_DATA_PATH)
# Cache the raw lines so actions on RDDs derived from them do not re-read the file.
heartRdd.cache()
heartRdd.count()

In [2]:
# Drop the CSV header row. The header is recognised by the column name "cp";
# data rows are assumed never to contain that substring — TODO confirm.
def _isDataRow(line):
    """Return True for data lines, False for the header line (the one containing "cp")."""
    return "cp" not in line


heartRdd = heartRdd.filter(_isDataRow)
heartRdd.count()

In [3]:
import math
from pyspark.ml.linalg import Vectors, VectorUDT
# Hard-coded per-column fallbacks (presumably column averages, given the
# "avg" names — NOTE(review): confirm they match the data) substituted for
# missing "?" fields. Broadcast so each executor gets one read-only copy.
avgChol = sc.broadcast(250)
avgFbs = sc.broadcast(0.06)
avgRrestecg = sc.broadcast(0.21)
avgThalach = sc.broadcast(139.1)
avgExang = sc.broadcast(0.3)
avgSlop = sc.broadcast(1.89)
avgCa = sc.broadcast(0)
avgThal = sc.broadcast(5.6)
avgTres = sc.broadcast(132.5)

# CSV column index -> broadcast default used when that field is "?".
# Columns 2, 9 and 13 have no default: a "?" there still raises ValueError.
_IMPUTE_DEFAULTS = {
    3: avgTres,       # treset
    4: avgChol,       # chol
    5: avgFbs,        # fbs
    6: avgRrestecg,   # restecg
    7: avgThalach,    # thalach
    8: avgExang,      # exang
    10: avgSlop,      # slope
    11: avgCa,        # ca
    12: avgThal,      # thal
}

# Output layout: the label (column 13) first, then feature columns 2..12.
# Columns 0 and 1 are intentionally dropped ("not wanted at this stage").
_VECTOR_COLUMNS = [13, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]


def _parseField(fields, idx):
    """Return fields[idx] as a float, imputing the column's default for "?"."""
    raw = fields[idx].strip()
    default = _IMPUTE_DEFAULTS.get(idx)
    if raw == "?" and default is not None:
        return float(default.value)
    # Explicit conversion instead of relying on Vectors.dense to parse strings.
    return float(raw)


def transformToVector(inputStr):
    """Parse one CSV data line into a dense vector ``[label, f1, ..., f11]``.

    Missing values ("?") in the imputable columns are replaced by the
    broadcast defaults above. Every field is converted to float explicitly.

    Args:
        inputStr: one comma-separated data line (header already removed).

    Returns:
        A ``DenseVector`` of 12 floats: column 13 (used as the label by the
        next step) followed by columns 2..12 in order.

    Raises:
        ValueError: if a non-imputable column (2, 9, 13) is "?" or any field
            is not numeric.
        IndexError: if the line has fewer than 14 fields.
    """
    attList = inputStr.split(",")
    return Vectors.dense([_parseField(attList, idx) for idx in _VECTOR_COLUMNS])





In [4]:
# Parse every data line into [label, features...]. Only a small sample is
# displayed: collect() would pull the entire RDD back to the driver to print it.
heartVectors = heartRdd.map(transformToVector)
heartVectors.take(5)

In [5]:
def transformToLabeledPoint(inStr) :
    """Split a parsed vector into a ``(label, features)`` pair.

    ``inStr`` is a vector produced by ``transformToVector``: element 0 is the
    label and elements 1..11 become an 11-element dense feature vector.
    """
    label = float(inStr[0])
    features = Vectors.dense([inStr[i] for i in range(1, 12)])
    return (label, features)
  
from pyspark.sql import SparkSession

# `sqlContext` is never created in this notebook (only `sc` is), so it only
# exists where the runtime pre-injects it (presumably Databricks). The active
# SparkSession works everywhere; getOrCreate() reuses an existing one.
spark = SparkSession.builder.getOrCreate()

heartLp = heartVectors.map(transformToLabeledPoint)
heartDF = spark.createDataFrame(heartLp, ["label", "features"])
heartDF.select("label", "features").show(10)

In [6]:
# 90/10 train/test split. A fixed seed makes the split, and therefore every
# metric reported below, reproducible across Restart & Run All.
SPLIT_SEED = 42
(trainingData, testData) = heartDF.randomSplit([0.9, 0.1], seed=SPLIT_SEED)
trainingData.count()

In [7]:
# Number of rows in the held-out test split (about 10% of the data).
testData.count()

In [8]:
#Setup pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Baseline 1: Naive Bayes wrapped in a single-stage Pipeline.
# The column names are spelled out explicitly; they equal the defaults.
# NOTE(review): NaiveBayes' default "multinomial" model requires non-negative
# feature values — confirm that holds for every feature column.
nbClassifier = NaiveBayes(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[nbClassifier])
model = pipeline.fit(trainingData)

In [9]:
# Score the held-out split with the fitted Naive Bayes pipeline; this adds
# (among others) the prediction, probability and rawPrediction columns.
predictions = model.transform(dataset=testData)

In [10]:
# Test rows the Naive Bayes model assigns to class 0, ordered by the
# probability column (descending). n / truncate only limit the display.
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("label", "features", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=23, truncate=30)
)

In [11]:
# Area under the ROC curve for the Naive Bayes predictions, computed from the
# rawPrediction column; the metric is selected per call via a param map.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
nbAuc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("Test: Area Under ROC: " + str(nbAuc))

In [12]:
# Confusion-matrix counts for Naive Bayes: one row per (label, prediction) pair.
predictions.groupBy(["label", "prediction"]).count().show()

In [13]:

# Baseline 2: a single decision tree limited to depth 3 (presumably kept
# shallow to stay small/interpretable — confirm intent).
dtClassifer = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=3,
)
dtModel = dtClassifer.fit(trainingData)


In [14]:
# Score the held-out split with the decision tree, then list the rows it
# assigns to class 0, ordered by the probability column (descending).
predictions1 = dtModel.transform(testData)

(
    predictions1
    .filter(predictions1["prediction"] == 0)
    .select("label", "features", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=23, truncate=30)
)


In [15]:

# Area under the ROC curve for the decision-tree predictions, computed from
# the rawPrediction column; the metric is selected per call via a param map.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
dtAuc = evaluator.evaluate(predictions1, {evaluator.metricName: "areaUnderROC"})
print("Test: Area Under ROC: " + str(dtAuc))

In [16]:
# NOTE(review): this repeats the transform already stored in `predictions1`
# under a near-identical name; `predictions1` could be reused directly.
prediction1 = dtModel.transform(dataset=testData)

In [17]:
# Confusion-matrix counts for the decision tree: one row per (label, prediction) pair.
prediction1.groupBy(["label", "prediction"]).count().show()

In [18]:
# Misclassification rate of the decision tree on the test split.
# The binary `evaluator` defined earlier reports areaUnderROC, not accuracy,
# so "1 - evaluator.evaluate(...)" would be 1 - AUC. Use the multiclass
# evaluator's "accuracy" metric, which compares prediction against label.
accuracyEvaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = accuracyEvaluator.evaluate(predictions1)
print("Test Error of DTModel = %g" % (1.0 - accuracy))

In [19]:
# Misclassification rate of the Naive Bayes pipeline (`predictions`) on the
# test split. The binary `evaluator` reports areaUnderROC, not accuracy, so
# the multiclass evaluator's "accuracy" metric is used instead.
accuracyEvaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = accuracyEvaluator.evaluate(predictions)
print("Test Error of BCModel = %g" % (1.0 - accuracy))