### Introduction

Using a well-known and clean dataset containing cancer diagnoses, I'll implement and evaluate a Random Forest Classifier using PySpark. 

Spark is by no means necessary for a dataset this small (569 rows), but a flash challenge like this is a good opportunity to put together a quick machine learning pipeline with it.

### Imports, Spark Session etc

In [1]:
import os, sys, pyspark
from pyspark.sql import SparkSession
import pandas as pd

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['SPARK_HOME'] = pyspark.__path__[0]

In [2]:
#Set up SparkSession
spark = SparkSession \
        .builder \
        .master('local[*]') \
        .appName("Cancer Data ML test") \
        .getOrCreate()

In [3]:
spark

In [4]:
#Read CSV and display schema
cancer_df = spark.read.csv('../data/cancer_data.csv', header=True, inferSchema=True)

cancer_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- diagnosis: string (nullable = true)
 |-- radius_mean: double (nullable = true)
 |-- texture_mean: double (nullable = true)
 |-- perimeter_mean: double (nullable = true)
 |-- area_mean: double (nullable = true)
 |-- smoothness_mean: double (nullable = true)
 |-- compactness_mean: double (nullable = true)
 |-- concavity_mean: double (nullable = true)
 |-- concave points_mean: double (nullable = true)
 |-- symmetry_mean: double (nullable = true)
 |-- fractal_dimension_mean: double (nullable = true)
 |-- radius_se: double (nullable = true)
 |-- texture_se: double (nullable = true)
 |-- perimeter_se: double (nullable = true)
 |-- area_se: double (nullable = true)
 |-- smoothness_se: double (nullable = true)
 |-- compactness_se: double (nullable = true)
 |-- concavity_se: double (nullable = true)
 |-- concave points_se: double (nullable = true)
 |-- symmetry_se: double (nullable = true)
 |-- fractal_dimension_se: double (nullable = true)
 |-- radi

### Quick exploration

In [5]:
#What does one entry look like? 
cancer_df.take(1)

[Row(id=842302, diagnosis='M', radius_mean=17.99, texture_mean=10.38, perimeter_mean=122.8, area_mean=1001.0, smoothness_mean=0.1184, compactness_mean=0.2776, concavity_mean=0.3001, concave points_mean=0.1471, symmetry_mean=0.2419, fractal_dimension_mean=0.07871, radius_se=1.095, texture_se=0.9053, perimeter_se=8.589, area_se=153.4, smoothness_se=0.006399, compactness_se=0.04904, concavity_se=0.05373, concave points_se=0.01587, symmetry_se=0.03003, fractal_dimension_se=0.006193, radius_worst=25.38, texture_worst=17.33, perimeter_worst=184.6, area_worst=2019.0, smoothness_worst=0.1622, compactness_worst=0.6656, concavity_worst=0.7119, concave points_worst=0.2654, symmetry_worst=0.4601, fractal_dimension_worst=0.1189, _c32=None)]

In [6]:
#How is our label distribution? 
cancer_df.groupBy('diagnosis').count().show()

+---------+-----+
|diagnosis|count|
+---------+-----+
|        B|  357|
|        M|  212|
+---------+-----+
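The classes are moderately imbalanced, which is worth keeping in mind when reading accuracy figures later on. As a rough reference point, the sketch below computes the accuracy of a trivial model that always predicts the majority class. It is plain Python using the counts printed above; the variable names are my own and not part of the notebook.

```python
# Label counts copied from the groupBy('diagnosis') output above.
counts = {'B': 357, 'M': 212}

total = sum(counts.values())
majority_label = max(counts, key=counts.get)

# Accuracy of a model that always predicts the majority class.
baseline_accuracy = counts[majority_label] / total
malignant_share = counts['M'] / total

print("Rows: {}".format(total))
print("Majority class: {}".format(majority_label))
print("Majority-class baseline accuracy: {:.3f}".format(baseline_accuracy))
print("Share of malignant cases: {:.3f}".format(malignant_share))
```

Any model worth keeping should clearly beat that baseline.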



In [7]:
#Quick data overview - no NaNs!
cancer_df.describe().toPandas().T

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| id | 569 | 3.0371831432337433E7 | 1.2502058561222367E8 | 8670 | 911320502 |
| diagnosis | 569 | | | B | M |
| radius_mean | 569 | 14.127291739894563 | 3.5240488262120793 | 6.981 | 28.11 |
| texture_mean | 569 | 19.28964850615117 | 4.301035768166948 | 9.71 | 39.28 |
| perimeter_mean | 569 | 91.96903339191566 | 24.2989810387549 | 43.79 | 188.5 |
| area_mean | 569 | 654.8891036906857 | 351.9141291816529 | 143.5 | 2501.0 |
| smoothness_mean | 569 | 0.096360281195079 | 0.014064128137673616 | 0.05263 | 0.1634 |
| compactness_mean | 569 | 0.10434098418277686 | 0.0528127579325122 | 0.01938 | 0.3454 |
| concavity_mean | 569 | 0.08879931581722322 | 0.07971980870789354 | 0.0 | 0.4268 |


### Preprocessing

In [8]:
#Get rid of some columns we don't need
def fix_df(df):
    return df.drop('id') \
    .drop('_c32')
cancer_df_fixed = fix_df(cancer_df)

In [9]:
(trainingData, testData) = cancer_df_fixed.randomSplit([0.75, 0.25])
trainingData.cache()
testData.cache()

DataFrame[diagnosis: string, radius_mean: double, texture_mean: double, perimeter_mean: double, area_mean: double, smoothness_mean: double, compactness_mean: double, concavity_mean: double, concave points_mean: double, symmetry_mean: double, fractal_dimension_mean: double, radius_se: double, texture_se: double, perimeter_se: double, area_se: double, smoothness_se: double, compactness_se: double, concavity_se: double, concave points_se: double, symmetry_se: double, fractal_dimension_se: double, radius_worst: double, texture_worst: double, perimeter_worst: double, area_worst: double, smoothness_worst: double, compactness_worst: double, concavity_worst: double, concave points_worst: double, symmetry_worst: double, fractal_dimension_worst: double]
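Note that `randomSplit` is called without a seed, so the split (and every metric further down) will differ between runs. As far as I understand it, the split is also made by an independent random draw per row rather than by cutting at exact positions, so the 75/25 weights are only met approximately. The sketch below imitates that behaviour in plain Python; `simulate_random_split` is a hypothetical helper for illustration, not Spark's actual implementation.

```python
import random

def simulate_random_split(n_rows, weights, seed):
    """Assign each row index to a split by an independent weighted draw."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in range(n_rows):
        draw = rng.random() * total
        cumulative = 0.0
        for i, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding in the cumulative sum.
            splits[-1].append(row)
    return splits

# 569 rows, as in the dataset; the seeds are arbitrary.
for seed in (1, 2, 3):
    train, test = simulate_random_split(569, [0.75, 0.25], seed)
    print("seed={}: train={}, test={}".format(seed, len(train), len(test)))
```

For a reproducible split in the notebook itself, `randomSplit` accepts an optional `seed` argument, e.g. `randomSplit([0.75, 0.25], seed=42)`.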

### Pipeline

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.classification import RandomForestClassifier

In [11]:
stages = []
#Index M/B to 1/0: StringIndexer assigns indices by descending label frequency,
#so Benign (357 rows) becomes 0 and Malignant (212 rows) becomes 1
indexer = StringIndexer(inputCol='diagnosis', outputCol='label')
labels = indexer.fit(cancer_df_fixed).labels

#Assemble all remaining columns (everything except the label columns) into a single feature column
assembler_input = [name for name in cancer_df_fixed.schema.names if name not in ('diagnosis', 'label')]
assembler = VectorAssembler(inputCols=assembler_input, outputCol='features')

#Random Forest - leave everything default, we'll do cross-validation with a param grid anyway
ranfor = RandomForestClassifier(featuresCol='features', labelCol='label')

#Reset labels to M/B
label_converter = IndexToString(inputCol='prediction', outputCol='predictedLabel', labels=labels)

#Bring it all together
pipeline = Pipeline(stages=[indexer, assembler, ranfor, label_converter])
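The M/B to 1/0 mapping in this cell is not arbitrary: `StringIndexer` defaults to ordering labels by descending frequency, so the most common label gets index 0. The sketch below emulates that ordering in plain Python using the label counts from the exploration step. The alphabetical tie-break is an assumption of this sketch, and `label_to_index` / `index_to_label` are illustrative names only.

```python
from collections import Counter

# Label counts taken from the groupBy('diagnosis') output earlier in the notebook.
label_counts = Counter({'B': 357, 'M': 212})

# Emulate a "most frequent label first" ordering.
# Ties are broken alphabetically here, which is an assumption of this sketch.
ordered_labels = [
    label
    for label, _ in sorted(label_counts.items(), key=lambda kv: (-kv[1], kv[0]))
]

# Forward mapping (what the indexer produces) and reverse mapping
# (what IndexToString does with the `labels` list).
label_to_index = {label: float(i) for i, label in enumerate(ordered_labels)}
index_to_label = dict(enumerate(ordered_labels))

print(label_to_index)
print(index_to_label)
```

Because the reverse mapping depends on the same label order, it matters that `labels` passed to `IndexToString` comes from an indexer that sees the same majority class as the one fitted inside the pipeline.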

### Hyper-parameter Tuning and Fitting

In [12]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [13]:
param_grid = (ParamGridBuilder()
            .addGrid(ranfor.maxDepth, [2, 5, 10, 15])
            .addGrid(ranfor.maxBins, [8, 16, 32, 48])
            .addGrid(ranfor.numTrees, [10, 15, 20, 25])
            .build())

In [14]:
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=BinaryClassificationEvaluator(), numFolds=5)

In [15]:
#Don't run this too often: every parameter combination in the grid is fitted once per fold, so it's quite slow.
model = cv.fit(trainingData)
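To put a number on "slow", the size of the search can be worked out from the grid and fold count defined above. This is a small plain-Python sketch; the value lists are copied from the `param_grid` cell, and the single final refit on the full training set is counted separately.

```python
from itertools import product

# Candidate values copied from the param_grid cell above.
max_depth = [2, 5, 10, 15]
max_bins = [8, 16, 32, 48]
num_trees = [10, 15, 20, 25]
num_folds = 5

# Every combination of the three parameters is one grid point.
combinations = list(product(max_depth, max_bins, num_trees))

# Each grid point is fitted once per fold during cross-validation.
cv_fits = len(combinations) * num_folds

# After the search, the best combination is refitted once on all training data.
total_fits = cv_fits + 1

# Total number of individual trees grown during the cross-validation fits.
trees_grown = sum(n for _, _, n in combinations) * num_folds

print("Grid points: {}".format(len(combinations)))
print("Pipeline fits during cross-validation: {}".format(cv_fits))
print("Pipeline fits including final refit: {}".format(total_fits))
print("Trees grown during cross-validation: {}".format(trees_grown))
```

Trimming any one of the three value lists shrinks the search proportionally, which is the easiest lever if the fit takes too long.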

### Results and evaluation

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [17]:
#Get our predictions
predictions = model.transform(testData)

In [18]:
#Instantiate
roc_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
f1 = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
accuracy = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
recall = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedRecall')

In [19]:
#Cross-Validation results
print('Max Depth: ', model.bestModel.stages[2]._java_obj.parent().getMaxDepth())
print('Max Bins: ', model.bestModel.stages[2]._java_obj.parent().getMaxBins())
print('Number of Trees: ', model.bestModel.stages[2]._java_obj.parent().getNumTrees())

Max Depth:  10
Max Bins:  8
Number of Trees:  25


In [20]:
#Evaluate the model itself
print("Accuracy on test set: {:.3f}".format(accuracy.evaluate(predictions)))
print("Area under ROC: {:.3f}".format(roc_evaluator.evaluate(predictions)))
print("Weighted Recall: {:.3f}".format(recall.evaluate(predictions)))
print("F1-metric: {:.3f}".format(f1.evaluate(predictions)))

Accuracy on test set: 0.950
Area under ROC: 0.996
Weighted Recall: 0.950
F1-metric: 0.950


In [21]:
#Those metrics look strong, but they are aggregates over both classes.
#Break the predictions down by class: how many malignant diagnoses did we miss?
predictions.groupBy('diagnosis','predictedLabel').count().show()

+---------+--------------+-----+
|diagnosis|predictedLabel|count|
+---------+--------------+-----+
|        B|             M|    2|
|        M|             B|    5|
|        M|             M|   50|
|        B|             B|   84|
+---------+--------------+-----+



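Most relevant here is the recall for the malignant class (the share of all malignant tumors that the model detected). Note that the 0.950 reported above is the *weighted* recall across both classes, which is the same number as the overall accuracy. In the test set, out of a total of 55 malignant tumors, 5 were "missed", so the malignant recall is 50/55 ≈ 0.909: still high, but lower than the weighted figure suggests. Of course, the false positives (2) are also unwanted, but are not *as* bad as false negatives in this specific case.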

The weighted F1-score (0.950) and the area under the ROC curve (0.996) further support the notion that we've established a high-quality model.
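The per-class figures can be checked by hand from the four counts in the table above. The following is a minimal sketch in plain Python that treats malignant as the positive class; the variable names are my own and the counts are copied from the `groupBy('diagnosis','predictedLabel')` output.

```python
# Counts from the diagnosis / predictedLabel table (malignant = positive class).
tp = 50  # malignant, predicted malignant
fn = 5   # malignant, predicted benign ("missed")
fp = 2   # benign, predicted malignant
tn = 84  # benign, predicted benign

total = tp + fn + fp + tn
accuracy = (tp + tn) / total

# Per-class recall and precision.
recall_m = tp / (tp + fn)
precision_m = tp / (tp + fp)
recall_b = tn / (tn + fp)

# Per-class F1, written directly in terms of the counts.
f1_m = 2 * tp / (2 * tp + fp + fn)
f1_b = 2 * tn / (2 * tn + fn + fp)

# Weighted metrics weight each class by its share of the true labels.
n_m, n_b = tp + fn, tn + fp
weighted_recall = (n_m * recall_m + n_b * recall_b) / total
weighted_f1 = (n_m * f1_m + n_b * f1_b) / total

print("Accuracy:          {:.3f}".format(accuracy))
print("Malignant recall:  {:.3f}".format(recall_m))
print("Malignant precision: {:.3f}".format(precision_m))
print("Benign recall:     {:.3f}".format(recall_b))
print("Weighted recall:   {:.3f}".format(weighted_recall))
print("Weighted F1:       {:.3f}".format(weighted_f1))
```

The weighted recall collapses to (50 + 84) / 141, which is why it matches the accuracy exactly; the malignant recall is the number to watch when missed diagnoses are the main concern.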