In [1]:
#The goal of this notebook is to create two custom evaluators

In [2]:
#Everything except evaluators comes from:
#https://spark.apache.org/docs/2.1.3/ml-classification-regression.html#decision-tree-regression

In [3]:
from pyspark.sql import SparkSession

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import Evaluator

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

In [5]:
import random
# Seed Python's global `random` module; RandomEvaluator._evaluate draws from it.
random.seed(42)

In [6]:
# Load training data
# `spark` is not created anywhere in the visible cells, so obtain the session
# explicitly; getOrCreate() hands back the active session when the kernel
# already provides one, and builds a new one otherwise.
spark = SparkSession.builder.getOrCreate()

# NOTE(review): machine-specific absolute path to the sample file inside the
# local pyspark installation; adjust it when running in another environment.
SAMPLE_LIBSVM_PATH = "/Users/jennifer.heffernan/anaconda/envs/myenv_exercises/lib/python3.7/site-packages/pyspark/data/mllib/sample_libsvm_data.txt"

data = spark.read.format("libsvm").load(SAMPLE_LIBSVM_PATH)
print(data)
data.show()

DataFrame[label: double, features: vector]
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [7]:
# Fit a VectorIndexer on the full dataset so categorical features are detected
# and indexed automatically. With maxCategories=4, any feature that has more
# than 4 distinct values is treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(data)

In [8]:
# Split the data into training and test sets (30% held out for testing).
# An explicit seed keeps the split the same every time this cell is re-run,
# instead of depending on whatever RNG state exists when the cell executes.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

In [9]:
# Configure the decision-tree regressor to read the indexed feature vector.
# Nothing is trained here; fitting happens when the pipeline is fitted.
dt = DecisionTreeRegressor(
    featuresCol="indexedFeatures",
)

In [10]:
# Two-stage pipeline: index the features, then apply the regressor.
pipeline = Pipeline(
    stages=[
        featureIndexer,  # indexer already fitted on `data`
        dt,              # decision-tree regressor, fitted by pipeline.fit()
    ]
)

In [11]:
# Fit the pipeline on the training split; the indexer stage is applied to the
# data on the way to the regressor.
model = pipeline.fit(dataset=trainingData)

In [12]:
# Apply the fitted pipeline to the held-out split to obtain predictions.
predictions = model.transform(dataset=testData)

In [13]:
# Preview the first five predictions next to their labels and features.
(
    predictions
    .select("prediction", "label", "features")
    .show(n=5)
)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|  0.0|(692,[98,99,100,1...|
|       0.0|  0.0|(692,[121,122,123...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[127,128,129...|
+----------+-----+--------------------+
only showing top 5 rows



In [14]:
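# Original RMSE evaluation from the Spark docs example, kept for reference only.
# To run it, first add: from pyspark.ml.evaluation import RegressionEvaluator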
#evaluator = RegressionEvaluator(
#    labelCol="label", predictionCol="prediction", metricName="rmse")
#rmse = evaluator.evaluate(predictions)
#print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [15]:
class RandomEvaluator(Evaluator):
    """Toy evaluator whose metric is a random number, independent of the data.

    It exists to demonstrate the minimum needed for a custom ``Evaluator``:
    a constructor and an ``_evaluate`` implementation.

    Parameters
    ----------
    predictionCol : str
        Name of the prediction column. Stored but not used by ``_evaluate``.
    labelCol : str
        Name of the label column. Stored but not used by ``_evaluate``.
    """

    def __init__(self, predictionCol="prediction", labelCol="label"):
        # Run the base-class initialiser so the evaluator carries the state
        # (e.g. uid and param maps) that inherited methods such as copy(),
        # evaluate(dataset, params) and repr() rely on.
        super().__init__()
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _evaluate(self, dataset):
        """Return a float drawn from ``random.random()``; ``dataset`` is ignored."""
        return random.random()

In [16]:
class SpearmanRegressionEvaluator(Evaluator):
    """Evaluator that scores predictions by their Spearman rank correlation
    with the labels.

    Parameters
    ----------
    predictionCol : str
        Name of the column holding the model predictions.
    labelCol : str
        Name of the column holding the true labels.
    """

    def __init__(self, predictionCol="prediction", labelCol="label"):
        # Run the base-class initialiser so the evaluator carries the state
        # (e.g. uid and param maps) that inherited methods such as copy(),
        # evaluate(dataset, params) and repr() rely on.
        super().__init__()
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _evaluate(self, dataset):
        """Compute the Spearman correlation between prediction and label.

        Parameters
        ----------
        dataset : DataFrame
            Must contain the configured prediction and label columns.

        Returns
        -------
        float
            Off-diagonal entry of the 2x2 Spearman correlation matrix.
        """
        # Honour the configured column names instead of hard-coded ones.
        pair = dataset.select(self.predictionCol, self.labelCol)

        # Correlation.corr works on a single vector column, so pack the two
        # scalar columns into one vector per row.
        assembler = VectorAssembler(inputCols=pair.columns, outputCol="features")
        vectors = assembler.transform(pair).select("features")

        # 'spearman' (not the default 'pearson') gives the rank correlation
        # this evaluator is named after.
        matrix = Correlation.corr(vectors, "features", "spearman").head()[0]

        # Entry [0, 1] is the correlation between the first and second column.
        return float(matrix[0, 1])

In [17]:
# Score the predictions with RandomEvaluator. The value is a random number,
# not a real error metric.
evaluator = RandomEvaluator(predictionCol="prediction", labelCol="label")
random_metric = evaluator.evaluate(predictions)
print("Random Metric on test data = %g" % random_metric)

Random Metric on test data = 0.0250108


In [18]:
# Score the predictions with the SpearmanRegressionEvaluator defined above.
evaluator = SpearmanRegressionEvaluator(predictionCol="prediction", labelCol="label")
spear = evaluator.evaluate(predictions)
print("Spearman Correlation on test data = %g" % spear)

Spearman Correlation on test data = 0.828571


In [19]:
# Cross-check the evaluator's value against pandas' Spearman implementation.
import pandas as pd
df = predictions.select("prediction", "label").toPandas()
df["prediction"].corr(df["label"], method="spearman")

0.8285714285714285