In [26]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import os
import findspark
# Prefer the SPARK_HOME environment variable so the notebook also runs on machines where Spark is
# installed elsewhere; fall back to the original hard-coded install location.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
findspark.init(SPARK_HOME)
# pyspark is only importable after findspark.init() has put it on sys.path, so keep this order.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tree_methods_adv').getOrCreate()

In [27]:
# Load the dataset: the first CSV row holds the column names, and Spark infers each column's type.
csv_path = './Full_data.csv'
data = spark.read.csv(csv_path, header=True, inferSchema=True)

In [28]:
# Let's get an idea of what the data looks like: printSchema() prints each column's name,
# inferred type, and nullability.
data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- ap_hi: integer (nullable = true)
 |-- ap_lo: integer (nullable = true)
 |-- cholesterol: integer (nullable = true)
 |-- gluc: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- Age_Group: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- cardio: integer (nullable = true)



In [29]:
# Peek at the first row (DataFrame.first() is the same as head() with no argument).
data.first()

Row(age=10878, gender=1, weight=59.0, ap_hi=120, ap_lo=80, cholesterol=1, gluc=1, active=1, Age_Group=2, BMI=19.26530612, cardio=0)

In [30]:
# We can use the describe method to get general statistics (count, mean, stddev, min, max) for each
# column. Remember to call .show() to display the resulting DataFrame!
# describe() does not report column types; use data.printSchema() for those.
# NOTE(review): with this many columns the table is very wide and hard to read.
data.describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+----------------+------------------+------------------+-------------------+-----------------+------------------+
|summary|              age|            gender|            weight|             ap_hi|             ap_lo|     cholesterol|              gluc|            active|          Age_Group|              BMI|            cardio|
+-------+-----------------+------------------+------------------+------------------+------------------+----------------+------------------+------------------+-------------------+-----------------+------------------+
|  count|            10000|             10000|             10000|             10000|             10000|           10000|             10000|             10000|              10000|            10000|             10000|
|   mean|       19449.3376|            1.3454| 74.30370999999998|           128.037|           97.9541|           1.365|            1.22

In [31]:
# A few things we need to do before Spark ML can accept the data!
# The estimators expect one vector column of features plus a numeric label column. Their default names
# are "features" and "label", but other names can be passed via featuresCol / labelCol.

# Import VectorAssembler and Vectors
# NOTE(review): Vectors does not appear to be used in the visible cells — confirm before relying on it.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [32]:
# List the column names to help decide which ones go into the feature vector
# ('cardio' is the target; the rest are candidate features).
data.columns

['age',
 'gender',
 'weight',
 'ap_hi',
 'ap_lo',
 'cholesterol',
 'gluc',
 'active',
 'Age_Group',
 'BMI',
 'cardio']

In [33]:
# Every input column except the target 'cardio', in schema order.
FEATURE_COLS = [
    'age', 'gender', 'weight',
    'ap_hi', 'ap_lo',
    'cholesterol', 'gluc', 'active',
    'Age_Group', 'BMI',
]

# Pack the feature columns into a single vector column called "features".
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")

In [34]:
# Let's transform the data: the result keeps the original columns and adds the "features" vector column.
output = assembler.transform(data)

In [35]:
# Let's import the string indexer (similar to the logistic regression exercises).
from pyspark.ml.feature import StringIndexer

In [36]:
# Index the target into CardioIndex. StringIndexer (rather than a plain cast) attaches ML label
# metadata to the new column.
# Caveat: StringIndexer orders labels by descending frequency, not by value. CardioIndex 0.0 is therefore
# the *most common* cardio value, which is not necessarily cardio == 0. Print the mapping so the
# positive class behind the binary metrics below is explicit.
indexer = StringIndexer(inputCol="cardio", outputCol="CardioIndex")
indexer_model = indexer.fit(output)
print('CardioIndex -> cardio:', dict(enumerate(indexer_model.labels)))
output_fixed = indexer_model.transform(output)

In [37]:
# Let's select the two columns we want: "features" (the assembled vectors) and CardioIndex (the label
# the models will learn to predict).
final_data = output_fixed.select("features",'CardioIndex')

In [38]:
# Split into roughly 70% training / 30% test rows.
# A fixed seed makes the split — and therefore every metric reported below — reproducible.
# Without it, each run draws a different split.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [39]:
# Sanity-check the training split: row count and class balance
# (the mean of the 0/1 CardioIndex is the share of rows with index 1.0).
train_data.describe().show()

+-------+------------------+
|summary|       CardioIndex|
+-------+------------------+
|  count|              7066|
|   mean|0.4973110670818002|
| stddev|0.5000281535279102|
|    min|               0.0|
|    max|               1.0|
+-------+------------------+



In [40]:
# Same sanity check for the test split; its class balance should be close to the training split's.
test_data.describe().show()

+-------+-------------------+
|summary|        CardioIndex|
+-------+-------------------+
|  count|               2934|
|   mean|0.49591002044989774|
| stddev| 0.5000684986306095|
|    min|                0.0|
|    max|                1.0|
+-------+-------------------+



In [41]:
# Let's import the relevant classifiers. 
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [42]:
# All three classifiers read the same label/feature columns. Every other setting is left at Spark's
# defaults to keep the comparison "fair" and simple (numTrees=20 is spelled out explicitly).
shared_cols = {'labelCol': 'CardioIndex', 'featuresCol': 'features'}

dtc = DecisionTreeClassifier(**shared_cols)
rfc = RandomForestClassifier(numTrees=20, **shared_cols)
gbt = GBTClassifier(**shared_cols)

In [43]:
# Fit each classifier on the same training split, in order DTC, RFC, GBT
# (three separate fits, so this cell can take a while).
dtc_model, rfc_model, gbt_model = [estimator.fit(train_data) for estimator in (dtc, rfc, gbt)]

In [44]:
# Score the held-out test split with each fitted model (same order: DTC, RFC, GBT).
dtc_predictions, rfc_predictions, gbt_predictions = (
    fitted.transform(test_data) for fitted in (dtc_model, rfc_model, gbt_model)
)

In [45]:
# Let's start off with binary classification metrics.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named "label"; it's named CardioIndex in this case, so pass it explicitly.
# metricName='areaUnderROC' is the evaluator's default; it is spelled out so the reported metric is unambiguous.
my_binary_eval = BinaryClassificationEvaluator(labelCol='CardioIndex', metricName='areaUnderROC')

In [46]:
# Area under the ROC curve on the held-out test set: 0.5 is chance level, 1.0 is a perfect ranking.
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

# A random forest averages many trees, which usually reduces variance compared with a single tree,
# at the cost of a larger, less interpretable model.
print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))

# The GBT model in this Spark version (2.1) does not emit a 'rawPrediction' column — compare
# gbt_predictions.printSchema() with the other two — so the evaluator above cannot score it.
# Use rawPrediction when it exists (newer Spark releases); otherwise fall back to the hard 0/1 'prediction'.
gbt_score_col = 'rawPrediction' if 'rawPrediction' in gbt_predictions.columns else 'prediction'
my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='CardioIndex',
                                                   rawPredictionCol=gbt_score_col,
                                                   metricName='areaUnderROC')
print("GBT")
print(my_binary_gbt_eval.evaluate(gbt_predictions))

# Caveat: an AUC computed from hard 0/1 predictions reflects a single threshold and is typically lower
# than one computed from continuous scores. A fallback GBT number is therefore not directly comparable
# with the DTC/RFC AUCs above. All models use default settings; tuning (e.g. maxDepth, maxIter, numTrees)
# may change these results.


DTC
0.7442374224248296
RFC
0.7927381973052269
GBT
0.7282846448213128


In [47]:
# Let's import the evaluator.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [48]:
# Compare the prediction column with the true label (CardioIndex) and report accuracy:
# the fraction of test rows predicted correctly (test error = 1 - accuracy).
acc_evaluator = MulticlassClassificationEvaluator(labelCol="CardioIndex", predictionCol="prediction", metricName="accuracy")

In [49]:
# Accuracy of each model on its test-set predictions (order: DTC, RFC, GBT).
dtc_acc, rfc_acc, gbt_acc = map(acc_evaluator.evaluate, (dtc_predictions, rfc_predictions, gbt_predictions))

In [50]:
# Print a small formatted report: one separator line before each model's accuracy (shown as a percentage).
report_rows = [
    ('A single decision tree has an accuracy of: {0:2.2f}%', dtc_acc),
    ('A random forest ensemble has an accuracy of: {0:2.2f}%', rfc_acc),
    ('An ensemble using GBT has an accuracy of: {0:2.2f}%', gbt_acc),
]

print("Here are the results!")
for template, score in report_rows:
    print('-' * 40)
    print(template.format(score * 100))

Here are the results!
----------------------------------------
A single decision tree has an accuracy of: 72.70%
----------------------------------------
A random forest ensemble has an accuracy of: 72.87%
----------------------------------------
An ensemble using GBT has an accuracy of: 72.77%
