# Building a Random Forest Classifier with Spark in Batch Mode

In this notebook, you'll learn the basics of working with Spark in batch mode to build a random forest classifier. Before digging in, make sure to read the background context in the related reading in the curriculum!

##  Import dependencies

First off, we need to import the tools we'll need from PySpark. The imports below allow us to connect to the Spark server, load our data, clean it, and prepare, execute, and evaluate a model.

In [22]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col

## Set our constants

Next, we create a set of constants that we can refer to throughout the notebook. These are values that the rest of our code needs to run, but that we might need to change at some point (for instance, if the location of our data changes). 

In [2]:
CSV_PATH = "/home/ds/notebooks/datasets/UCI_HAR/allData.csv"
CSV_ACTIVITY_LABEL_PATH = "/home/ds/notebooks/datasets/UCI_HAR/activity_labels.csv"
APP_NAME = "UCI HAR Random Forest Example"
SPARK_URL = "local[*]"
RANDOM_SEED = 141107
TRAINING_DATA_RATIO = 0.8
RF_NUM_TREES = 10
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

## Connect to the server and load data

Now we're ready to connect to the Spark server. We do that (relying on the constants set above) and then load our labels (loaded into `activity_labels`) and activity data (loaded into `df`). 

In [3]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()
activity_labels = spark.read.options(inferschema = "true").csv(CSV_ACTIVITY_LABEL_PATH)
df = spark.read.options(inferschema = "true").csv(CSV_PATH)

## Validate the data

If our data has been properly cleaned and prepared, it will meet the following criteria, which we'll verify in just a moment:

* The dataframe shape should be 10,299 rows by 562 columns
* All feature columns should be doubles. Note than one of the columns is for our label and it will not be double.
* There should be no nulls. This point is crucial because Spark will fail to build our vector variables for our classifier if there are any null values.

Let's confirm these points.

In [4]:
# Confirm the dataframe shape is 10,299 rows by 562 columns
print(f"Dataset shape is {df.count():d} rows by {len(df.columns):d} columns.")

Dataset shape is 10299 rows by 562 columns.


In [5]:
# Confirm that all feature columns are doubles via a list comprehension
# We're expecting 561 of 562 here, accounting for the labels column
double_cols = [col[0] for col in df.dtypes if col[1] == 'double']
print(f"{len(double_cols):d} columns out of {len(df.columns):d} total are type double.")

561 columns out of 562 total are type double.


In [6]:
# Confirm there are no null values. We use the dataframe select method to build a 
# list that is then converted to a Python dict. This way it's easy to sum up the nulls.
null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) 
                         for c in df.columns]).toPandas().to_dict(orient='records')

print(f"There are {sum(null_counts[0].values()):d} null values in the dataset.")

There are 0 null values in the dataset.


## Set up and run our classifier in Spark

After confirming our data is clean, we're ready to reshape the data and run the random forest model.

In Spark, we manipulate the data to work in a Spark pipeline, define each of the steps in the pipeline, chain them together, and finally run the pipeline.

Apache Spark classifiers expect 2 columns of input:

1. __labels__: an indexed set of numeric variables that represent the classification from the set of features we provide.
2. __features__: an indexed, vector variable that contains all of the feature values in each row. 

In order to do this, we need to create these 2 columns from our dataset - the data is there, but not yet in a format we can use in the classifier.

To create the indexed labels column, we'll create a column called `indexedLabel` using the `StringIndexer` method. We use the column `_c0` as the source for our label index since that contains our labels. The column contains only one value per index.
    
To create the indexed features column, we'll need to do two things. First, we'll create the vector of features using the `VectorAssembler` method. To create this vector, we'll need to use all 561 numeric columns from our data frame. The vector assembler will create a new column called `features`, and each row of this column will contain a 561-element vector that is built from the 561 features in the dataset.

Finally, we'll complete the data preparation by creating an indexed vector from the `features` column. We'll call this vector `indexedFeatures`.
    
Since the classifier expects indexed labels and an indexed vector column of data, we'll use the `indexedLabel` and `indexedFeatures` as inputs to our random forest classifier.

In [7]:
# Generate our feature vector.
# Note that we're doing the work on the `df` object - we don't create new dataframes, 
# just add columns to the one we already are using.

# the transform method creates the column.

df = VectorAssembler(inputCols=double_cols, outputCol="features").transform(df)

Let's confirm that the features are there. It's easy to do this in Apache Spark using the `select` and `show` methods on the dataframe.  

In [8]:
df.select("_c0", "features").show(5)

+---+--------------------+
|_c0|            features|
+---+--------------------+
|  5|[0.289,-0.0203,-0...|
|  5|[0.278,-0.0164,-0...|
|  5|[0.28,-0.0195,-0....|
|  5|[0.279,-0.0262,-0...|
|  5|[0.277,-0.0166,-0...|
+---+--------------------+
only showing top 5 rows



In [9]:
df.columns

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13',
 '_c14',
 '_c15',
 '_c16',
 '_c17',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24',
 '_c25',
 '_c26',
 '_c27',
 '_c28',
 '_c29',
 '_c30',
 '_c31',
 '_c32',
 '_c33',
 '_c34',
 '_c35',
 '_c36',
 '_c37',
 '_c38',
 '_c39',
 '_c40',
 '_c41',
 '_c42',
 '_c43',
 '_c44',
 '_c45',
 '_c46',
 '_c47',
 '_c48',
 '_c49',
 '_c50',
 '_c51',
 '_c52',
 '_c53',
 '_c54',
 '_c55',
 '_c56',
 '_c57',
 '_c58',
 '_c59',
 '_c60',
 '_c61',
 '_c62',
 '_c63',
 '_c64',
 '_c65',
 '_c66',
 '_c67',
 '_c68',
 '_c69',
 '_c70',
 '_c71',
 '_c72',
 '_c73',
 '_c74',
 '_c75',
 '_c76',
 '_c77',
 '_c78',
 '_c79',
 '_c80',
 '_c81',
 '_c82',
 '_c83',
 '_c84',
 '_c85',
 '_c86',
 '_c87',
 '_c88',
 '_c89',
 '_c90',
 '_c91',
 '_c92',
 '_c93',
 '_c94',
 '_c95',
 '_c96',
 '_c97',
 '_c98',
 '_c99',
 '_c100',
 '_c101',
 '_c102',
 '_c103',
 '_c104',
 '_c105',
 '_c106',
 '_c107',
 '_c108',
 '_c109',
 '_c110',


Now we're ready to build the indexers, split our data for training and testing, define our model, and finally chain everything together into a pipeline.

__It's important to note that when we execute this cell, we're not actually running our model. At this point, we're only defining its parameters__.

In [10]:
# Build the training indexers / split data / classifier
# first we'll generate a labelIndexer
labelIndexer = StringIndexer(inputCol="_c0", outputCol="indexedLabel").fit(df)

# now generate the indexed feature vector
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df)
    
# Split the data into training and validation sets (30% held out for testing)
(trainingData, testData) = df.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

This next cell runs the pipeline, delivering a trained model at the end of the process.

In [11]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

It is now easy to test our model and make predictions simply by using the model's `transform` method on the `testData` dataset.

In [12]:
# Make predictions.
predictions = model.transform(testData)

## Evaluate the model

Now we can use the MulticlassClassificationEvaluator to test the model's accuracy.

In [13]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.10103
Accuracy = 0.89897


In [14]:
# Select example rows to display.
predictions.select("predictedLabel", "_c0", "features").show(5)

+--------------+---+--------------------+
|predictedLabel|_c0|            features|
+--------------+---+--------------------+
|             1|  1|[0.168,-0.00914,-...|
|             1|  1|[0.175,-0.0138,-0...|
|             1|  1|[0.181,-0.00228,-...|
|             1|  1|[0.182,-0.0262,-0...|
|             1|  1|[0.184,-0.0227,-0...|
+--------------+---+--------------------+
only showing top 5 rows



In [15]:
rfModel = model.stages#[2]
print(rfModel)  # summary only

[StringIndexer_47579353622939374cc0, VectorIndexer_4c9fb613531f0f8f407e, RandomForestClassificationModel (uid=RandomForestClassifier_4462a0b8ed9fd22340f1) with 10 trees, IndexToString_4ca496815288eb00a03b]


## Next Steps

We've seen how to prepare data and build a classifier in Spark. You might want to play around with this notebook and learn more about how Spark works. Here are some ideas:

- Look at the set of labels, and see if there are any features that would make sense to combine. Spark allows you to map values into a new column.
- Identify the most important features among the 561 source features (using PCA or something similar), then reduce the feature set and see if the model performs better.
- Modify the settings of the random forest to see if the performance improves.
- Use Spark's tools to find other techniques to evaluate the performance of your model. See if you can figure out how to generate an ROC plot, find the AUC value, or plot a confusion matrix.

- ask about what to look for in labels that would indicate anything about combining features|
-- poor question - look for feature engineering relationships

### PySpark PCA

In [16]:
from pyspark.ml.feature import PCA

In [17]:
pca = PCA(k=4, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
df_pca = model.transform(df)


In [18]:
# Build the training indexers / split data / classifier
# first we'll generate a labelIndexer
labelIndexer = StringIndexer(inputCol="_c0", outputCol="indexedLabel").fit(df_pca)

# now generate the indexed feature vector
featureIndexer = VectorIndexer(inputCol="pcaFeatures", outputCol="indexedFeatures", maxCategories=4).fit(df_pca)
    
# Split the data into training and validation sets (20% held out for testing)
(trainingData, testData) = df_pca.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

In [19]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select (prediction, true label) and compute test error
evaluator_pca= MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_pca.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.247702
Accuracy = 0.752298


### PySpark Evaluation

In [20]:
# Select (prediction, true label) and compute test error with f1 score instead of accuracy
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName='f1') # default
f1 = evaluator_f1.evaluate(predictions)

print(f"Test F1 Error = {(1.0 - f1):g}")
print(f"F1 = {f1:g}")

Test F1 Error = 0.255265
F1 = 0.744735


In [25]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark import SparkConf

conf = SparkConf().setAppName(APP_NAME).setMaster(SPARK_URL)
sc = SparkContext.getOrCreate(conf)

# Load training data in LIBSVM format
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_multiclass_classification_data.txt")

# Split data into training (60%) and test (40%)
training, test = data.randomSplit([0.6, 0.4], seed=11)
training.cache()

# Run training algorithm to build the model
model = LogisticRegressionWithLBFGS.train(training, numClasses=3)

# Compute raw scores on the test set
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))

# Instantiate metrics object
metrics = MulticlassMetrics(predictionAndLabels)

# Overall statistics
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Statistics by class
labels = data.map(lambda lp: lp.label).distinct().collect()
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))

# Weighted stats
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/ds/notebooks/data/mllib/sample_multiclass_classification_data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:53)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:53)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:467)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark import SparkConf

conf = SparkConf().setAppName(APP_NAME).setMaster(SPARK_URL)
sc = SparkContext.getOrCreate(conf)

# Load training data in LIBSVM format
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_multiclass_classification_data.txt")

# Split data into training (60%) and test (40%)
training, test = data.randomSplit([0.6, 0.4], seed=11)
training.cache()

# Run training algorithm to build the model
model = LogisticRegressionWithLBFGS.train(training, numClasses=3)

# Compute raw scores on the test set
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))

# Instantiate metrics object
metrics = MulticlassMetrics(predictionAndLabels)

# Overall statistics
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Statistics by class
labels = data.map(lambda lp: lp.label).distinct().collect()
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))

# Weighted stats
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
metrics = MulticlassMetrics(predandlab)
    
# Overall statistics
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)