In [152]:
#import required libraries
import pyspark
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

Older Spark code created a SparkContext directly, but the entry point for programming Spark with the Dataset and DataFrame API is SparkSession. So a session is built (or reused, if one already exists) and a DataFrame is created from the input dataset.

In [151]:
# Build the SparkSession (or reuse an already-running one) under the app name
# 'Classifier'; it is the entry point for the DataFrame API used below.
spark = (
    SparkSession.builder
    .appName('Classifier')
    .getOrCreate()
)

# Read the CSV treating the first row as column names and letting Spark infer
# each column's type, then print the resulting schema as a sanity check.
data = spark.read.csv('heart.csv', header=True, inferSchema=True)
data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)



Rename the 'target' column to 'label'.

In [153]:
# Later cells address the target column as "label", so rename it here.
data = data.withColumnRenamed(
    "target",
    "label",
)
# Bare expression: displays the DataFrame's column/type summary as cell output.
data

DataFrame[age: int, sex: int, cp: int, trestbps: int, chol: int, fbs: int, restecg: int, thalach: int, exang: int, oldpeak: double, slope: int, ca: int, thal: int, label: int]

We can see that the dataset is now a dataframe.

In [120]:
# Display the Python type of `data` to confirm it is a Spark DataFrame.
type(data)

pyspark.sql.dataframe.DataFrame

In [121]:
# Preview the first rows of the renamed DataFrame (20 is show()'s default count).
data.show(n=20)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+-----+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|label|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+-----+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|    1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|    1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|    1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|    1|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|    1|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|    1|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|    1|
| 44|  1|  1|     120| 263|  0|      1|    173|    0|    0.0|    2|  0|   3|    1|
| 52|  1|  2|     172| 199|  1|      1|    162|    0|    0.5|    2|  0|   3|    1|
| 57

There are 2 labels, where 0 implies that a person has heart disease and 1 implies that the person does not have heart disease.

In [154]:
# List the distinct values present in the label column.
(
    data
    .select("label")
    .distinct()
    .show()
)

+-----+
|label|
+-----+
|    1|
|    0|
+-----+



Before they can be passed to a classifier, the attributes need to be combined into a single vector column called `features`. To do this, the label is first removed from the list of feature columns, and `VectorAssembler()` then concatenates the remaining columns. Finally, the assembler is applied to the original data to produce the transformed DataFrame.

In [155]:
# Start from every column name, then drop the target so only predictors remain.
X = list(data.columns)
X.remove('label')

In [156]:
# Combine the predictor columns listed in X into one vector column, 'features'.
assembler = VectorAssembler(
    inputCols=X,
    outputCol='features',
)

# Apply the assembler; `out` is `data` plus the new 'features' column.
out = assembler.transform(data)

In [157]:
# Keep only the two columns used downstream: the feature vector and the label.
proc_data = out.select(
    'features',
    'label',
)

We can see that the features column has been created.

In [158]:
# Preview the assembled data; 20 rows is show()'s default, matching the output below.
proc_data.show(n=20)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[63.0,1.0,3.0,145...|    1|
|[37.0,1.0,2.0,130...|    1|
|[41.0,0.0,1.0,130...|    1|
|[56.0,1.0,1.0,120...|    1|
|[57.0,0.0,0.0,120...|    1|
|[57.0,1.0,0.0,140...|    1|
|[56.0,0.0,1.0,140...|    1|
|[44.0,1.0,1.0,120...|    1|
|[52.0,1.0,2.0,172...|    1|
|[57.0,1.0,2.0,150...|    1|
|[54.0,1.0,0.0,140...|    1|
|[48.0,0.0,2.0,130...|    1|
|[49.0,1.0,1.0,130...|    1|
|[64.0,1.0,3.0,110...|    1|
|[58.0,0.0,3.0,150...|    1|
|[50.0,0.0,2.0,120...|    1|
|[58.0,0.0,2.0,120...|    1|
|[66.0,0.0,3.0,150...|    1|
|[43.0,1.0,0.0,150...|    1|
|[69.0,0.0,3.0,140...|    1|
+--------------------+-----+
only showing top 20 rows



Index labels, adding metadata to the label column. Fit on whole dataset to include all labels in index.

In [140]:
# Fit a StringIndexer on the full dataset so every label value gets an index;
# it writes the indexed label to 'indexedLabel'.
labelIndexer = (
    StringIndexer(inputCol="label", outputCol="indexedLabel")
    .fit(proc_data)
)

Automatically identify categorical features, and index them. Set maxCategories so features with > 4 distinct values are treated as continuous.

In [159]:
# Fit a VectorIndexer on the full dataset. With maxCategories=4, features with
# more than 4 distinct values are treated as continuous; the rest are indexed
# as categorical. Output goes to 'indexedFeatures'.
featureIndexer = (
    VectorIndexer(
        inputCol="features",
        outputCol="indexedFeatures",
        maxCategories=4,
    )
    .fit(proc_data)
)

Split the data into training and test sets (30% held out for testing)

In [142]:
# Fixed seed so the 70/30 split, and therefore the accuracy reported at the end
# of the notebook, is repeatable across kernel restarts instead of changing on
# every run.
SPLIT_SEED = 42

# Randomly split the rows: roughly 70% for training, 30% held out for testing.
trainingData, testData = proc_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

Configure a RandomForest classifier using Spark ML's built-in implementation, with the number of trees set to 10. The model itself is trained later, when the pipeline is fit.

In [161]:
# Random forest estimator: reads the indexed label and indexed feature columns
# produced by the two indexers and builds 10 trees. Nothing is trained yet.
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=10,
)

Convert indexed labels back to original labels.

In [162]:
# Map the numeric 'prediction' column back to the original label values, using
# the label list learned by labelIndexer; the result goes to 'predictedLabel'.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

Chain indexers and forest in a Pipeline.

In [163]:
# Stage order matters: index the label, index the features, run the forest,
# then decode predictions back to original labels.
pipeline = Pipeline(
    stages=[
        labelIndexer,
        featureIndexer,
        rf,
        labelConverter,
    ]
)

Train the model. This also runs the indexers.

In [164]:
# Fit the pipeline on the training split only. labelIndexer and featureIndexer
# were already fitted on proc_data in earlier cells, so the random forest is the
# stage that is actually trained here.
model = pipeline.fit(trainingData)

In [147]:
# Pipeline stages are [labelIndexer, featureIndexer, rf, labelConverter], so
# index 2 is the fitted random forest model.
rfModel = model.stages[2]
# Print the model's summary line (uid and number of trees).
print(rfModel)  

RandomForestClassificationModel (uid=RandomForestClassifier_fb7a9672f022) with 10 trees


Make predictions by applying the trained model to the test data. Some example rows are displayed.

In [165]:
# Run the fitted pipeline over the held-out test split to get the predictions.
predictions = model.transform(testData)

In [166]:
# Show five test rows: decoded prediction, true label, and the feature vector.
(
    predictions
    .select("predictedLabel", "label", "features")
    .show(5)
)

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|             1|    1|(13,[0,2,3,4,7,10...|
|             1|    1|(13,[0,3,4,7,9,10...|
|             1|    1|(13,[0,3,4,7,9,10...|
|             1|    1|(13,[0,3,4,7,10,1...|
|             1|    1|[29.0,1.0,1.0,130...|
+--------------+-----+--------------------+
only showing top 5 rows



Using MulticlassClassificationEvaluator(), get the accuracy of the predictions.

In [167]:
# Compare the numeric prediction against the indexed label and report accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)

# Score the test-set predictions and print the result.
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % accuracy)

Test Accuracy = 0.850575
