Skip to content

Commit

Permalink
[SPARK-10188] [PYSPARK] Pyspark CrossValidator with RMSE selects inco…
Browse files Browse the repository at this point in the history
…rrect model

* Added isLargerBetter() method to Pyspark Evaluator to match the Scala version.
* JavaEvaluator delegates isLargerBetter() to underlying Scala object.
* Added check for isLargerBetter() in CrossValidator to determine whether to use argmin or argmax.
* Added test cases for metrics where smaller is better (RMSE) and where larger is better (R-squared).

(This contribution is my original work, and I license the work to the project under Spark's open source license.)

Author: noelsmith <mail@noelsmith.com>

Closes #8399 from noel-smith/pyspark-rmse-xval-fix.

(cherry picked from commit 7583681)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
  • Loading branch information
noel-smith authored and jkbradley committed Aug 28, 2015
1 parent c77cf86 commit bcb8fa8
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 1 deletion.
12 changes: 12 additions & 0 deletions python/pyspark/ml/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ def evaluate(self, dataset, params=None):
else:
raise ValueError("Params must be a param map but got %s." % type(params))

def isLargerBetter(self):
    """
    Report the direction in which the metric returned by :py:meth:`evaluate`
    should be optimized.

    A single evaluator may support several metrics, some of which are to be
    maximized and others minimized. This implementation always answers
    ``True``.

    :return: ``True`` if larger metric values are better (the default),
             ``False`` if smaller values are better.
    """
    return True


@inherit_doc
class JavaEvaluator(Evaluator, JavaWrapper):
Expand All @@ -85,6 +93,10 @@ def _evaluate(self, dataset):
self._transfer_params_to_java()
return self._java_obj.evaluate(dataset._jdf)

def isLargerBetter(self):
    """
    Delegate the "is larger better" question to the wrapped Java object.

    ``_transfer_params_to_java()`` is invoked first, so the Java side is
    queried only after that transfer has run.

    :return: whatever ``isLargerBetter()`` returns on ``self._java_obj``.
    """
    self._transfer_params_to_java()
    java_evaluator = self._java_obj
    return java_evaluator.isLargerBetter()


@inherit_doc
class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol):
Expand Down
87 changes: 87 additions & 0 deletions python/pyspark/ml/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@

from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
from pyspark.sql import DataFrame, SQLContext
from pyspark.sql.functions import rand
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
from pyspark.ml.util import keyword_only
from pyspark.ml import Estimator, Model, Pipeline, Transformer
from pyspark.ml.feature import *
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.mllib.linalg import DenseVector


Expand Down Expand Up @@ -264,5 +267,89 @@ def test_ngram(self):
self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"])


class HasInducedError(Params):
    """
    Mixin that declares an ``inducedError`` param on a params holder.
    """

    def __init__(self):
        super(HasInducedError, self).__init__()
        description = "Uniformly-distributed error added to feature"
        self.inducedError = Param(self, "inducedError", description)

    def getInducedError(self):
        """
        Look up ``inducedError`` through ``getOrDefault`` and return it.
        """
        induced_error = self.getOrDefault(self.inducedError)
        return induced_error


class InducedErrorModel(Model, HasInducedError):
    """
    Test model whose prediction is the ``feature`` column plus ``rand(0)``
    noise scaled by the ``inducedError`` param.
    """

    def __init__(self):
        super(InducedErrorModel, self).__init__()

    def _transform(self, dataset):
        """
        Return ``dataset`` with an added ``prediction`` column computed as
        ``feature + rand(0) * inducedError``.
        """
        scaled_noise = rand(0) * self.getInducedError()
        noisy_feature = dataset.feature + scaled_noise
        return dataset.withColumn("prediction", noisy_feature)


class InducedErrorEstimator(Estimator, HasInducedError):
    """
    Test estimator that performs no training: fitting simply produces an
    :class:`InducedErrorModel` and copies this estimator's values onto it.
    """

    def __init__(self, inducedError=1.0):
        super(InducedErrorEstimator, self).__init__()
        self._set(inducedError=inducedError)

    def _fit(self, dataset):
        """
        Create an :class:`InducedErrorModel`, copy this estimator's values to
        it via ``_copyValues`` and return it. ``dataset`` is not inspected.
        """
        fitted = InducedErrorModel()
        self._copyValues(fitted)
        return fitted


class CrossValidatorTests(PySparkTestCase):
    """
    Checks that CrossValidator selects the expected param map both when the
    evaluation metric is "rmse" and when it is "r2".
    """

    def _make_dataset(self):
        # Four (feature, label) rows where label equals feature, repeated
        # ten times.
        rows = [(10, 10.0),
                (50, 50.0),
                (100, 100.0),
                (500, 500.0)] * 10
        return SQLContext(self.sc).createDataFrame(rows, ["feature", "label"])

    def _cross_validate(self, metric_name):
        # Cross-validate InducedErrorEstimator over three inducedError
        # candidates using the given RegressionEvaluator metric. Returns the
        # selected model and that model's metric on the full dataset.
        dataset = self._make_dataset()
        estimator = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName=metric_name)

        param_maps = (ParamGridBuilder()
                      .addGrid(estimator.inducedError, [100.0, 0.0, 10000.0])
                      .build())
        validator = CrossValidator(estimator=estimator,
                                   estimatorParamMaps=param_maps,
                                   evaluator=evaluator)
        best = validator.fit(dataset).bestModel
        return best, evaluator.evaluate(best.transform(dataset))

    def test_fit_minimize_metric(self):
        bestModel, bestModelMetric = self._cross_validate("rmse")

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")

    def test_fit_maximize_metric(self):
        bestModel, bestModelMetric = self._cross_validate("r2")

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")


# Script entry point: hand control to unittest.main() when this module is executed directly.
if __name__ == "__main__":
unittest.main()
6 changes: 5 additions & 1 deletion python/pyspark/ml/tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,11 @@ def _fit(self, dataset):
# TODO: duplicate evaluator to take extra params from input
metric = eva.evaluate(model.transform(validation, epm[j]))
metrics[j] += metric
bestIndex = np.argmax(metrics)

if eva.isLargerBetter():
bestIndex = np.argmax(metrics)
else:
bestIndex = np.argmin(metrics)
bestModel = est.fit(dataset, epm[bestIndex])
return CrossValidatorModel(bestModel)

Expand Down

0 comments on commit bcb8fa8

Please sign in to comment.