From b611fee00ead1941e794b935f666bc7599ca727b Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Sun, 12 Jul 2015 16:50:13 -0400 Subject: [PATCH 1/4] SPARK-8671. Added first version of isotonic regression to pipeline API --- .../ml/regression/IsotonicRegression.scala | 97 +++++++++++++++++++ .../regression/IsotonicRegressionSuite.scala | 85 ++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala new file mode 100644 index 0000000000000..0e41fa6c39b8a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -0,0 +1,97 @@ +package org.apache.spark.ml.regression + +import org.apache.spark.annotation.Experimental +import org.apache.spark.ml.PredictorParams +import org.apache.spark.ml.param.{Param, ParamMap, BooleanParam} +import org.apache.spark.ml.util.Identifiable +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.{DoubleType, DataType} +import org.apache.spark.sql.{Row, DataFrame} +import org.apache.spark.storage.StorageLevel + +/** + * Params for isotonic regression. + */ +private[regression] trait IsotonicRegressionParams extends PredictorParams { + + /** + * Param for weight column name. + * @group param + */ + final val weightCol: Param[String] = new Param[String](this, "weightCol", "weight column name") + + /** @group getParam */ + final def getWeightCol: String = $(weightCol) + + /** + * Param for isotonic parameter. + * @group param + */ + final val isotonicParam: BooleanParam = new BooleanParam(this, "isotonicParam", "isotonic parameter") + + /** @group getParam */ + final def getIsotonicParam: Boolean = $(isotonicParam) +} + +@Experimental +class IsotonicRegression(override val uid: String) + extends Regressor[Double, IsotonicRegression, IsotonicRegressionModel] + with IsotonicRegressionParams { + + def this() = this(Identifiable.randomUID("isoReg")) + + /** + * Set the isotonic parameter. + * Default is true. + * @group setParam + */ + def setIsotonicParam(value: Boolean): this.type = set(isotonicParam, value) + setDefault(isotonicParam -> true) + + /** + * Set the isotonic parameter. + * Default is true. + * @group setParam + */ + def setWeightParam(value: String): this.type = set(weightCol, value) + setDefault(weightCol -> "weight") + + override def featuresDataType: DataType = DoubleType + + override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra) + + private def extractWeightedLabeledPoints(dataset: DataFrame): RDD[(Double, Double, Double)] = { + dataset.select($(labelCol), $(featuresCol), $(weightCol)) + .map { case Row(label: Double, features: Double, weights: Double) => (label, features, weights) } + } + + override protected def train(dataset: DataFrame): IsotonicRegressionModel = { + // Extract columns from data. If dataset is persisted, do not persist oldDataset. 
+ val instances = extractWeightedLabeledPoints(dataset) + val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + + val isotonicRegression = new org.apache.spark.mllib.regression.IsotonicRegression().setIsotonic($(isotonicParam)) + val model = isotonicRegression.run(instances) + + new IsotonicRegressionModel(uid, model) + } +} + +class IsotonicRegressionModel private[ml] ( + override val uid: String, + val model: org.apache.spark.mllib.regression.IsotonicRegressionModel) + extends RegressionModel[Double, IsotonicRegressionModel] + with IsotonicRegressionParams { + + override def featuresDataType: DataType = DoubleType + + override protected def predict(features: Double): Double = { + model.predict(features) + } + + override def copy(extra: ParamMap): IsotonicRegressionModel = { + copyValues(new IsotonicRegressionModel(uid, model), extra) + } +} + diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala new file mode 100644 index 0000000000000..ff92b91f18ca9 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala @@ -0,0 +1,85 @@ +package org.apache.spark.ml.regression + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.classification.{LogisticRegressionModel, LogisticRegression} +import org.apache.spark.ml.param.ParamsSuite +import org.apache.spark.mllib.classification.LogisticRegressionSuite._ +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext} +import org.apache.spark.sql.{Row, DataFrame} +import org.apache.spark.sql.types.{DoubleType, StructField, StructType} + +class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { + private val schema = StructType( + Array( + StructField("label", DoubleType), + StructField("features", DoubleType), + StructField("weight", DoubleType))) + + @transient var dataset: DataFrame = _ + + override def beforeAll(): Unit = { + super.beforeAll() + val data = sc.parallelize( + Seq( + Row(1d, 0d, 1d), + Row(2d, 1d, 1d), + Row(3d, 2d, 1d), + Row(1d, 3d, 1d), + Row(6d, 4d, 1d), + Row(17d, 5d, 1d), + Row(16d, 6d, 1d), + Row(17d, 7d, 1d), + Row(18d, 8d, 1d))) + + dataset = sqlContext.createDataFrame(data, schema) + } + + test("isotonic regression") { + val trainer = new IsotonicRegression() + .setIsotonicParam(true) + + val model = trainer.fit(dataset) + + val predictions = model + .transform(dataset) + .select("prediction").map { + case Row(pred) => pred + } + .collect() + + assert(predictions === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18)) + + assert(model.model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8)) + assert(model.model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0)) + assert(model.model.isotonic) + } + + test("params") { + val ir = new IsotonicRegression + ParamsSuite.checkParams(ir) + val model = ir.fit(dataset) + ParamsSuite.checkParams(model) + } + + test("isotonic regression: default params") { + val ir = new IsotonicRegression() + assert(ir.getLabelCol === "label") + assert(ir.getFeaturesCol === "features") + assert(ir.getWeightCol === "weight") + assert(ir.getPredictionCol === "prediction") + assert(ir.getIsotonicParam === true) + + val model = ir.fit(dataset) + model.transform(dataset) + .select("label", "features", "prediction", "weight") + .collect() + + 
assert(model.getLabelCol === "label") + assert(model.getFeaturesCol === "features") + assert(model.getWeightCol === "weight") + assert(model.getPredictionCol === "prediction") + assert(model.getIsotonicParam === true) + assert(model.hasParent) + } +} From 07c12bd2b8139945d6cac3b92b4c2655ccf4bf60 Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Sun, 19 Jul 2015 12:31:34 -0400 Subject: [PATCH 2/4] Comments and refactoring. --- .../ml/regression/IsotonicRegression.scala | 70 +++++++++++++--- .../regression/IsotonicRegressionSuite.scala | 84 +++++++++++++------ 2 files changed, 114 insertions(+), 40 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index 0e41fa6c39b8a..be98611d2ad6c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -1,9 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.ml.regression import org.apache.spark.annotation.Experimental import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.param.{Param, ParamMap, BooleanParam} import org.apache.spark.ml.util.Identifiable +import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression} +import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel} import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{DoubleType, DataType} import org.apache.spark.sql.{Row, DataFrame} @@ -18,7 +37,8 @@ private[regression] trait IsotonicRegressionParams extends PredictorParams { * Param for weight column name. * @group param */ - final val weightCol: Param[String] = new Param[String](this, "weightCol", "weight column name") + final val weightCol: Param[String] = + new Param[String](this, "weightCol", "weight column name") /** @group getParam */ final def getWeightCol: String = $(weightCol) @@ -27,12 +47,22 @@ private[regression] trait IsotonicRegressionParams extends PredictorParams { * Param for isotonic parameter. * @group param */ - final val isotonicParam: BooleanParam = new BooleanParam(this, "isotonicParam", "isotonic parameter") + final val isotonicParam: BooleanParam = + new BooleanParam(this, "isotonicParam", "isotonic parameter") /** @group getParam */ final def getIsotonicParam: Boolean = $(isotonicParam) } +/** + * :: Experimental :: + * Isotonic regression. + * + * Currently implemented using parallelized pool adjacent violators algorithm. + * Only univariate (single feature) algorithm supported. + * + * Uses [[org.apache.spark.mllib.regression.IsotonicRegression]]. 
+ */ @Experimental class IsotonicRegression(override val uid: String) extends Regressor[Double, IsotonicRegression, IsotonicRegressionModel] @@ -49,20 +79,24 @@ class IsotonicRegression(override val uid: String) setDefault(isotonicParam -> true) /** - * Set the isotonic parameter. - * Default is true. + * Set weight column param. + * Default is weight. * @group setParam */ def setWeightParam(value: String): this.type = set(weightCol, value) setDefault(weightCol -> "weight") - override def featuresDataType: DataType = DoubleType + override private[ml] def featuresDataType: DataType = DoubleType override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra) - private def extractWeightedLabeledPoints(dataset: DataFrame): RDD[(Double, Double, Double)] = { + private[this] def extractWeightedLabeledPoints( + dataset: DataFrame): RDD[(Double, Double, Double)] = { + dataset.select($(labelCol), $(featuresCol), $(weightCol)) - .map { case Row(label: Double, features: Double, weights: Double) => (label, features, weights) } + .map { + case Row(label: Double, features: Double, weights: Double) => (label, features, weights) + } } override protected def train(dataset: DataFrame): IsotonicRegressionModel = { @@ -71,27 +105,37 @@ class IsotonicRegression(override val uid: String) val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - val isotonicRegression = new org.apache.spark.mllib.regression.IsotonicRegression().setIsotonic($(isotonicParam)) - val model = isotonicRegression.run(instances) + val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonicParam)) + val parentModel = isotonicRegression.run(instances) - new IsotonicRegressionModel(uid, model) + new IsotonicRegressionModel(uid, parentModel) } } +/** + * :: Experimental :: + * Model fitted by IsotonicRegression. + * Predicts using a piecewise linear function. + * + * For detailed rules see [[org.apache.spark.mllib.regression.IsotonicRegressionModel.predict()]]. + * + * @param parentModel A [[org.apache.spark.mllib.regression.IsotonicRegressionModel]] + * model trained by [[org.apache.spark.mllib.regression.IsotonicRegression]]. + */ class IsotonicRegressionModel private[ml] ( override val uid: String, - val model: org.apache.spark.mllib.regression.IsotonicRegressionModel) + private[ml] val parentModel: MLlibIsotonicRegressionModel) extends RegressionModel[Double, IsotonicRegressionModel] with IsotonicRegressionParams { override def featuresDataType: DataType = DoubleType override protected def predict(features: Double): Double = { - model.predict(features) + parentModel.predict(features) } override def copy(extra: ParamMap): IsotonicRegressionModel = { - copyValues(new IsotonicRegressionModel(uid, model), extra) + copyValues(new IsotonicRegressionModel(uid, parentModel), extra) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala index ff92b91f18ca9..a002d6978d08b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala @@ -1,13 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.ml.regression import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.classification.{LogisticRegressionModel, LogisticRegression} import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.mllib.classification.LogisticRegressionSuite._ -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext} -import org.apache.spark.sql.{Row, DataFrame} +import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.types.{DoubleType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Row} class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { private val schema = StructType( @@ -16,28 +30,25 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { StructField("features", DoubleType), StructField("weight", DoubleType))) - @transient var dataset: DataFrame = _ - - override def beforeAll(): Unit = { - super.beforeAll() - val data = sc.parallelize( - Seq( - Row(1d, 0d, 1d), - Row(2d, 1d, 1d), - Row(3d, 2d, 1d), - Row(1d, 3d, 1d), - Row(6d, 4d, 1d), - Row(17d, 5d, 1d), - Row(16d, 6d, 1d), - Row(17d, 7d, 1d), - Row(18d, 8d, 1d))) - - dataset = sqlContext.createDataFrame(data, schema) + private val predictionSchema = StructType(Array(StructField("features", DoubleType))) + + private def generateIsotonicInput(labels: Seq[Double]): DataFrame = { + val data = Seq.tabulate(labels.size)(i => Row(labels(i), i.toDouble, 1d)) + val parallelData = sc.parallelize(data) + + sqlContext.createDataFrame(parallelData, schema) + } + + private def generatePredictionInput(features: Seq[Double]): DataFrame = { + val data = Seq.tabulate(features.size)(i => Row(features(i))) + + val parallelData = sc.parallelize(data) + sqlContext.createDataFrame(parallelData, predictionSchema) } test("isotonic regression") { - val trainer = new IsotonicRegression() - .setIsotonicParam(true) + val dataset = generateIsotonicInput(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18)) + val trainer = new IsotonicRegression().setIsotonicParam(true) val model = trainer.fit(dataset) @@ -50,12 +61,30 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { assert(predictions === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18)) - assert(model.model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8)) - assert(model.model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0)) - assert(model.model.isotonic) + assert(model.parentModel.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8)) + assert(model.parentModel.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0)) + assert(model.parentModel.isotonic) + } + + test("antitonic regression") { + val dataset = generateIsotonicInput(Seq(7, 5, 3, 5, 1)) + val trainer = new IsotonicRegression().setIsotonicParam(false) + + val model = trainer.fit(dataset) + val features = generatePredictionInput(Seq(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 
9.0)) + + val predictions = model + .transform(features) + .select("prediction").map { + case Row(pred) => pred + } + .collect() + + assert(predictions === Array(7, 7, 6, 5.5, 5, 4, 1)) } test("params") { + val dataset = generateIsotonicInput(Seq(1, 2, 3)) val ir = new IsotonicRegression ParamsSuite.checkParams(ir) val model = ir.fit(dataset) @@ -63,6 +92,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { } test("isotonic regression: default params") { + val dataset = generateIsotonicInput(Seq(1, 2, 3)) val ir = new IsotonicRegression() assert(ir.getLabelCol === "label") assert(ir.getFeaturesCol === "features") From b68efc0737edb4870d848772eb37ff57832b1bf5 Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Sun, 19 Jul 2015 15:05:00 -0400 Subject: [PATCH 3/4] Added tests for param validation. --- .../ml/regression/IsotonicRegression.scala | 4 +- .../regression/IsotonicRegressionSuite.scala | 43 +++++++++++++++++-- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index be98611d2ad6c..9768f580b7ab5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -20,7 +20,7 @@ package org.apache.spark.ml.regression import org.apache.spark.annotation.Experimental import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.param.{Param, ParamMap, BooleanParam} -import org.apache.spark.ml.util.Identifiable +import org.apache.spark.ml.util.{SchemaUtils, Identifiable} import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression} import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel} import org.apache.spark.rdd.RDD @@ -100,6 +100,7 @@ class IsotonicRegression(override val uid: String) } override protected def train(dataset: DataFrame): IsotonicRegressionModel = { + SchemaUtils.checkColumnType(dataset.schema, $(weightCol), DoubleType) // Extract columns from data. If dataset is persisted, do not persist oldDataset. 
val instances = extractWeightedLabeledPoints(dataset) val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE @@ -138,4 +139,3 @@ class IsotonicRegressionModel private[ml] ( copyValues(new IsotonicRegressionModel(uid, parentModel), extra) } } - diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala index a002d6978d08b..9dbe1211a0261 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala @@ -46,7 +46,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { sqlContext.createDataFrame(parallelData, predictionSchema) } - test("isotonic regression") { + test("isotonic regression predictions") { val dataset = generateIsotonicInput(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18)) val trainer = new IsotonicRegression().setIsotonicParam(true) @@ -66,7 +66,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { assert(model.parentModel.isotonic) } - test("antitonic regression") { + test("antitonic regression predictions") { val dataset = generateIsotonicInput(Seq(7, 5, 3, 5, 1)) val trainer = new IsotonicRegression().setIsotonicParam(false) @@ -83,7 +83,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { assert(predictions === Array(7, 7, 6, 5.5, 5, 4, 1)) } - test("params") { + test("params validation") { val dataset = generateIsotonicInput(Seq(1, 2, 3)) val ir = new IsotonicRegression ParamsSuite.checkParams(ir) @@ -91,7 +91,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { ParamsSuite.checkParams(model) } - test("isotonic regression: default params") { + test("default params") { val dataset = generateIsotonicInput(Seq(1, 2, 3)) val ir = new IsotonicRegression() assert(ir.getLabelCol === "label") @@ -112,4 +112,39 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { assert(model.getIsotonicParam === true) assert(model.hasParent) } + + test("set parameters") { + val isotonicRegression = new IsotonicRegression() + .setIsotonicParam(false) + .setWeightParam("w") + .setFeaturesCol("f") + .setLabelCol("l") + .setPredictionCol("p") + + assert(isotonicRegression.getIsotonicParam === false) + assert(isotonicRegression.getWeightCol === "w") + assert(isotonicRegression.getFeaturesCol === "f") + assert(isotonicRegression.getLabelCol === "l") + assert(isotonicRegression.getPredictionCol === "p") + } + + test("missing column") { + val dataset = generateIsotonicInput(Seq(1, 2, 3)) + + intercept[IllegalArgumentException] { + new IsotonicRegression().setWeightParam("w").fit(dataset) + } + + intercept[IllegalArgumentException] { + new IsotonicRegression().setFeaturesCol("f").fit(dataset) + } + + intercept[IllegalArgumentException] { + new IsotonicRegression().setLabelCol("l").fit(dataset) + } + + intercept[IllegalArgumentException] { + new IsotonicRegression().fit(dataset).setFeaturesCol("f").transform(dataset) + } + } } From 8c435c1f07913b2a6c8dbf89434c469da75ceb9a Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Mon, 20 Jul 2015 20:17:36 -0400 Subject: [PATCH 4/4] Review https://github.com/apache/spark/pull/7517 feedback update. 
--- .../ml/regression/IsotonicRegression.scala | 19 +++++++++++-------- .../regression/IsotonicRegressionSuite.scala | 8 +++----- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index 9768f580b7ab5..4ece8cf8cf0b6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -35,6 +35,8 @@ private[regression] trait IsotonicRegressionParams extends PredictorParams { /** * Param for weight column name. + * TODO: Move weightCol to sharedParams. + * * @group param */ final val weightCol: Param[String] = @@ -45,13 +47,14 @@ private[regression] trait IsotonicRegressionParams extends PredictorParams { /** * Param for isotonic parameter. + * Isotonic (increasing) or antitonic (decreasing) sequence. * @group param */ - final val isotonicParam: BooleanParam = - new BooleanParam(this, "isotonicParam", "isotonic parameter") + final val isotonic: BooleanParam = + new BooleanParam(this, "isotonic", "isotonic (increasing) or antitonic (decreasing) sequence") /** @group getParam */ - final def getIsotonicParam: Boolean = $(isotonicParam) + final def getIsotonicParam: Boolean = $(isotonic) } /** @@ -75,8 +78,8 @@ class IsotonicRegression(override val uid: String) * Default is true. * @group setParam */ - def setIsotonicParam(value: Boolean): this.type = set(isotonicParam, value) - setDefault(isotonicParam -> true) + def setIsotonicParam(value: Boolean): this.type = set(isotonic, value) + setDefault(isotonic -> true) /** * Set weight column param. @@ -94,8 +97,8 @@ class IsotonicRegression(override val uid: String) dataset: DataFrame): RDD[(Double, Double, Double)] = { dataset.select($(labelCol), $(featuresCol), $(weightCol)) - .map { - case Row(label: Double, features: Double, weights: Double) => (label, features, weights) + .map { case Row(label: Double, features: Double, weights: Double) => + (label, features, weights) } } @@ -106,7 +109,7 @@ class IsotonicRegression(override val uid: String) val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonicParam)) + val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic)) val parentModel = isotonicRegression.run(instances) new IsotonicRegressionModel(uid, parentModel) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala index 9dbe1211a0261..66e4b170bae80 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala @@ -56,8 +56,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { .transform(dataset) .select("prediction").map { case Row(pred) => pred - } - .collect() + }.collect() assert(predictions === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18)) @@ -76,9 +75,8 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { val predictions = model .transform(features) .select("prediction").map { - case Row(pred) => pred - } - .collect() + case Row(pred) => pred + }.collect() assert(predictions === Array(7, 
7, 6, 5.5, 5, 4, 1)) }
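
A minimal usage sketch of the new pipeline API, assembled from the estimator and the test suite in the patches above. It assumes a spark-shell style environment where `sc` and `sqlContext` are already in scope, and it relies on the estimator's default column names ("label", "features", "weight") and on the univariate DoubleType features column; it is a sketch of intended usage rather than code taken from the patches themselves.

import org.apache.spark.ml.regression.IsotonicRegression
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Schema matching the estimator's defaults: label, features and weight are all
// plain Doubles, since featuresDataType is overridden to DoubleType (univariate only).
val schema = StructType(Array(
  StructField("label", DoubleType),
  StructField("features", DoubleType),
  StructField("weight", DoubleType)))

val rows = sc.parallelize(Seq(
  Row(1.0, 0.0, 1.0),
  Row(2.0, 1.0, 1.0),
  Row(3.0, 2.0, 1.0),
  Row(1.0, 3.0, 1.0),
  Row(6.0, 4.0, 1.0)))
val training = sqlContext.createDataFrame(rows, schema)

// Fit an increasing (isotonic) sequence; setIsotonicParam(false) would fit a
// decreasing (antitonic) one instead.
val model = new IsotonicRegression()
  .setIsotonicParam(true)
  .fit(training)

// transform() appends the "prediction" column computed by the piecewise linear model.
model.transform(training)
  .select("features", "prediction")
  .show()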