
[SPARK-10188] [Pyspark] Pyspark CrossValidator with RMSE selects incorrect model #8399

Status: Closed. 6 commits; the diff below shows changes from 4 commits.
python/pyspark/ml/evaluation.py: 6 additions, 0 deletions

@@ -66,6 +66,9 @@ def evaluate(self, dataset, params=None):

```python
        else:
            raise ValueError("Params must be a param map but got %s." % type(params))

    def isLargerBetter(self):
        return True


@inherit_doc
class JavaEvaluator(Evaluator, JavaWrapper):
```

Review comment (Member), on the new isLargerBetter:
Can you please copy the doc from Scala here? (no need to copy to child classes since the "inherit_doc" tag will handle that)
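For reference, a sketch of what the requested docstring could look like, adapted from the Scala Evaluator.isLargerBetter scaladoc; the wording below is a paraphrase, not code from this PR:

```python
    def isLargerBetter(self):
        """
        Indicates whether the metric returned by :py:meth:`evaluate` should be
        maximized (True, default) or minimized (False).
        A given evaluator may support multiple metrics which may be maximized
        or minimized.
        """
        return True
```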
@@ -85,6 +88,9 @@ def _evaluate(self, dataset):

```python
        self._transfer_params_to_java()
        return self._java_obj.evaluate(dataset._jdf)

    def isLargerBetter(self):
        return self._java_obj.isLargerBetter()


@inherit_doc
class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol):
```

Review comment (Member), on JavaEvaluator.isLargerBetter:
This works for current use since it's called after evaluate has been called, but it could fail if a user calls it since the params may not have been transferred to java yet. Call self._transfer_params_to_java() first.
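A minimal sketch of the fix the reviewer is asking for (this is my reading of the suggestion, not the code at this commit):

```python
    def isLargerBetter(self):
        # Sync Python-side param values to the JVM object first, so this call
        # is safe even if evaluate() has not been invoked yet.
        self._transfer_params_to_java()
        return self._java_obj.isLargerBetter()
```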
python/pyspark/ml/tests.py: 82 additions, 0 deletions

@@ -32,11 +32,14 @@

```python
from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
from pyspark.sql import DataFrame, SQLContext
from pyspark.sql.functions import rand
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
from pyspark.ml.util import keyword_only
from pyspark.ml import Estimator, Model, Pipeline, Transformer
from pyspark.ml.feature import *
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.mllib.linalg import DenseVector
```
@@ -264,5 +267,84 @@ def test_ngram(self):

```python
        self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"])


# Test fixtures: an estimator whose fitted model adds a tunable amount of
# uniform noise to the feature, so the best param map (inducedError=0.0)
# is known in advance.
class HasInducedError(Params):

    def __init__(self):
        super(HasInducedError, self).__init__()
        self.inducedError = Param(self, "inducedError",
                                  "Uniformly-distributed error added to feature")

    def getInducedError(self):
        return self.getOrDefault(self.inducedError)


class InducedErrorModel(Model, HasInducedError):

    def __init__(self):
        super(InducedErrorModel, self).__init__()

    def _transform(self, dataset):
        return dataset.withColumn("prediction",
                                  dataset.feature + (rand(0) * self.getInducedError()))


class InducedErrorEstimator(Estimator, HasInducedError):

    def __init__(self, inducedError=1.0):
        super(InducedErrorEstimator, self).__init__()
        self._set(inducedError=inducedError)

    def _fit(self, dataset):
        model = InducedErrorModel()
        self._copyValues(model)
        return model


class CrossValidatorTests(PySparkTestCase):

    def test_fit_minimize_metric(self):
        sqlContext = SQLContext(self.sc)
        dataset = sqlContext.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="rmse")

        grid = (ParamGridBuilder()
                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                .build())
        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        bestModel = cvModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")

    def test_fit_maximize_metric(self):
        sqlContext = SQLContext(self.sc)
        dataset = sqlContext.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="r2")

        grid = (ParamGridBuilder()
                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                .build())
        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        bestModel = cvModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")


if __name__ == "__main__":
    unittest.main()
```
python/pyspark/ml/tuning.py: 5 additions, 1 deletion

@@ -223,7 +223,11 @@ def _fit(self, dataset):

```diff
                 # TODO: duplicate evaluator to take extra params from input
                 metric = eva.evaluate(model.transform(validation, epm[j]))
                 metrics[j] += metric
-        bestIndex = np.argmax(metrics)
+
+        if eva.isLargerBetter():
+            bestIndex = np.argmax(metrics)
+        else:
+            bestIndex = np.argmin(metrics)
         bestModel = est.fit(dataset, epm[bestIndex])
         return CrossValidatorModel(bestModel)
```
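To make the fix concrete, here is a small standalone illustration of the selection logic, using the metric values from this PR's test grid; the snippet and variable names are mine, not part of the patch:

```python
import numpy as np

# Hypothetical per-param-map cross-validation averages, matching the test's
# inducedError grid [100.0, 0.0, 10000.0]; for RMSE, lower is better.
metrics = [100.0, 0.0, 10000.0]

# Old behavior: argmax unconditionally, which picks the *worst* RMSE.
assert np.argmax(metrics) == 2

# New behavior: consult the evaluator first. RegressionEvaluator with
# metricName="rmse" reports isLargerBetter() == False, so argmin is used
# and the zero-error param map (index 1) wins.
largerBetter = False
bestIndex = np.argmax(metrics) if largerBetter else np.argmin(metrics)
assert bestIndex == 1
```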