From 75254c91cf8d9c2f3638a3f9b1cfd5c029e10996 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Thu, 9 Jun 2016 11:22:53 -0700
Subject: [PATCH 1/5] [SPARK-9623] [ML] Provide variance for
 RandomForestRegressor predictions

---
 .../spark/ml/param/shared/sharedParams.scala  |  2 +
 .../ml/regression/RandomForestRegressor.scala | 32 +++++++++++-
 .../org/apache/spark/ml/tree/treeParams.scala | 18 ++++++-
 .../RandomForestRegressorSuite.scala          | 52 +++++++++++++++++++
 4 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 64d6af2766ca9..7e16688ec2530 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -149,6 +149,8 @@ private[ml] trait HasVarianceCol extends Params {
    */
   final val varianceCol: Param[String] = new Param[String](this, "varianceCol", "Column name for the biased sample variance of prediction")
 
+  setDefault(varianceCol, "variance")
+
   /** @group getParam */
   final def getVarianceCol: String = $(varianceCol)
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index a6dbf21d55e2b..2965970fb6076 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -34,6 +34,7 @@ import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestMo
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.DoubleType
 
 
 /**
@@ -93,6 +94,10 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
   override def setFeatureSubsetStrategy(value: String): this.type =
     super.setFeatureSubsetStrategy(value)
 
+  @Since("2.1.0")
+  /** @group getParam */
+  def setVarianceCol(value: String): this.type = set(varianceCol, value)
+
   override protected def train(dataset: Dataset[_]): RandomForestRegressionModel = {
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
@@ -168,15 +173,40 @@ class RandomForestRegressionModel private[ml] (
   // Note: We may add support for weights (based on tree performance) later on.
   private lazy val _treeWeights: Array[Double] = Array.fill[Double](_trees.length)(1.0)
 
+  @Since("2.1.0")
+  /** @group getParam */
+  def setVarianceCol(value: String): this.type = set(varianceCol, value)
+
   @Since("1.4.0")
   override def treeWeights: Array[Double] = _treeWeights
 
   override protected def transformImpl(dataset: Dataset[_]): DataFrame = {
     val bcastModel = dataset.sparkSession.sparkContext.broadcast(this)
+
+    var output = dataset
+
     val predictUDF = udf { (features: Any) =>
       bcastModel.value.predict(features.asInstanceOf[Vector])
     }
-    dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
+    val predictions = predictUDF(col($(featuresCol)))
+    output = dataset.withColumn($(predictionCol), predictions)
+
+    val varianceUDF = udf { (features: Any) =>
+      val leafNodes = bcastModel.value.returnLeafNodes(features.asInstanceOf[Vector])
+      val variance = leafNodes.map(_.impurityStats.calculate()).sum / getNumTrees
+      val predSquared = leafNodes.map(x => math.pow(x.prediction, 2)).sum / getNumTrees
+      val pred = leafNodes.map(_.prediction).sum / getNumTrees
+      variance + predSquared - math.pow(pred, 2)
+    }
+    val variance = varianceUDF(col($(featuresCol)))
+
+    output = output.withColumn($(varianceCol), variance)
+    output.toDF
+  }
+
+  private def returnLeafNodes(features: Vector): Array[LeafNode] = {
+    // Return the leaf nodes of each forest.
+    _trees.map(_.rootNode.predictImpl(features))
   }
 
   override protected def predict(features: Vector): Double = {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
index d7559f8950c3d..72f1b0ee2ac3f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
@@ -409,10 +409,24 @@ private[ml] trait RandomForestClassificationModelParams extends TreeEnsemblePara
   with HasFeatureSubsetStrategy with TreeClassifierParams
 
 private[ml] trait RandomForestRegressorParams
-  extends RandomForestParams with TreeRegressorParams
+  extends RandomForestParams with TreeRegressorParams with HasVarianceCol
 
 private[ml] trait RandomForestRegressionModelParams extends TreeEnsembleParams
-  with HasFeatureSubsetStrategy with TreeRegressorParams
+  with HasFeatureSubsetStrategy with TreeRegressorParams with HasVarianceCol {
+
+  override protected def validateAndTransformSchema(
+      schema: StructType,
+      fitting: Boolean,
+      featuresDataType: DataType): StructType = {
+    val newSchema = super.validateAndTransformSchema(schema, fitting, featuresDataType)
+    if (isDefined(varianceCol) && $(varianceCol).nonEmpty) {
+      SchemaUtils.appendColumn(newSchema, $(varianceCol), DoubleType)
+    } else {
+      newSchema
+    }
+  }
+
+}
 
 /**
 * Parameters for Gradient-Boosted Tree algorithms.
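For reference (commentary, not part of the patch): the per-row statistic computed by `varianceUDF` above is the law of total variance applied across the T trees of the forest. In LaTeX form:

    % sigma_t^2(x): impurity (within-leaf variance) of the leaf x reaches in tree t
    % yhat_t(x):    that leaf's prediction
    \[
    \widehat{\mathrm{Var}}(y \mid x)
      = \underbrace{\frac{1}{T}\sum_{t=1}^{T} \sigma_t^2(x)}_{\texttt{variance}}
      + \underbrace{\frac{1}{T}\sum_{t=1}^{T} \hat{y}_t(x)^2}_{\texttt{predSquared}}
      - \underbrace{\Big(\frac{1}{T}\sum_{t=1}^{T} \hat{y}_t(x)\Big)^2}_{\texttt{math.pow(pred, 2)}}
    \]

The first term is the average within-leaf variance; the last two together form the biased (divide-by-T) sample variance of the per-tree predictions, which is why the shared param is documented as the "biased sample variance of prediction". The `validateAndTransformSchema` override appends this column as `DoubleType` only when `varianceCol` is defined and non-empty.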
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
index c08335f9f84af..943c651469bd4 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ml.regression
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.tree.impl.TreeTests
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
@@ -27,6 +28,8 @@ import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions._
 
 /**
  * Test suite for [[RandomForestRegressor]].
@@ -105,6 +108,55 @@ class RandomForestRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
     }
   }
 
+  test("Random Forest variance") {
+    val categoricalFeatures = Map.empty[Int, Int]
+    val df: DataFrame = TreeTests.setMetadata(
+      orderedLabeledPoints50_1000, categoricalFeatures, 0)
+
+    // RF with one tree should have the same variance as that of the tree.
+    val rf = new RandomForestRegressor()
+      .setImpurity("variance")
+      .setMaxDepth(30)
+      .setNumTrees(1)
+      .setMaxBins(10)
+      .setFeatureSubsetStrategy("all")
+      .setSubsamplingRate(1.0)
+      .setSeed(123)
+
+    val rfModel = rf.fit(df)
+    val rfVariances = rfModel.transform(df).select("variance").collect()
+
+    val dt = new DecisionTreeRegressor()
+      .setImpurity("variance")
+      .setMaxDepth(30)
+      .setMaxBins(10)
+      .setSeed(123)
+    val dtModel = dt.fit(df)
+    val dtVariances = dtModel.transform(df).select("variance").collect()
+    val nSamples = dtVariances.size
+    (0 to nSamples - 1).foreach { i =>
+      val diff = math.abs(rfVariances(i).getDouble(0) - dtVariances(i).getDouble(0))
+      assert(diff < 1e-6)
+    }
+
+    rf.setMaxDepth(2)
+    rf.setNumTrees(20)
+    val rfNewModel = rf.fit(df)
+    val results = rfNewModel.transform(df).select("features", "variance").collect()
+    val features = col("features")
+    val trees = rfNewModel.trees
+    val numTrees = rfNewModel.getNumTrees
+    results.map { case Row(features: Vector, variance: Double) =>
+      val rootNodes = trees.map(_.rootNode.predictImpl(features))
+      val predsquared = rootNodes.map(x => math.pow(x.prediction, 2)).sum / numTrees
+      val variance = rootNodes.map(_.impurityStats.calculate()).sum / numTrees
+      val predictions = rootNodes.map(_.prediction).sum / numTrees
+      val expectedVariance = -math.pow(predictions, 2) + variance + predsquared
+      assert(variance === expectedVariance,
+        s"Expected variance $expectedVariance but got $variance.")
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////

From 0e4e82fb778e94aa4641b63e09d848a0362e5939 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Mon, 13 Jun 2016 14:45:56 -0700
Subject: [PATCH 2/5] Optimize

---
 .../spark/ml/regression/RandomForestRegressor.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index 2965970fb6076..5d6701b65eb2f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -193,14 +193,13 @@ class RandomForestRegressionModel private[ml] (
 
     val varianceUDF = udf { (features: Any) =>
       val leafNodes = bcastModel.value.returnLeafNodes(features.asInstanceOf[Vector])
-      val variance = leafNodes.map(_.impurityStats.calculate()).sum / getNumTrees
-      val predSquared = leafNodes.map(x => math.pow(x.prediction, 2)).sum / getNumTrees
-      val pred = leafNodes.map(_.prediction).sum / getNumTrees
-      variance + predSquared - math.pow(pred, 2)
+      leafNodes.map { leafNode =>
+        leafNode.impurityStats.calculate() + math.pow(leafNode.prediction, 2)
+      }.sum / getNumTrees
     }
     val variance = varianceUDF(col($(featuresCol)))
 
-    output = output.withColumn($(varianceCol), variance)
+    output = output.withColumn($(varianceCol), variance - pow(predictions, 2))
     output.toDF
   }
 
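The "Optimize" commit relies on the identity mean(sigma^2) + mean(yhat^2) - pred^2 = mean(sigma^2 + yhat^2) - pred^2, so the UDF makes a single pass over the matched leaves and the final subtraction moves into the column expression `variance - pow(predictions, 2)`, reusing the already-computed prediction column. A self-contained sanity check of the identity (plain Scala with made-up leaf statistics; not Spark code):

    object OnePassVsThreePass extends App {
      // (within-leaf variance, leaf prediction) for the leaf each tree routes x to
      val leaves = Seq((0.50, 2.0), (0.25, 2.5), (0.40, 1.5))
      val t = leaves.size.toDouble

      // Patch 1: three separate aggregations inside the UDF
      val variance    = leaves.map(_._1).sum / t
      val predSquared = leaves.map { case (_, p) => p * p }.sum / t
      val pred        = leaves.map(_._2).sum / t
      val threePass   = variance + predSquared - math.pow(pred, 2)

      // Patch 2: one aggregation in the UDF, subtraction done on the column
      val onePass = leaves.map { case (v, p) => v + p * p }.sum / t - math.pow(pred, 2)

      assert(math.abs(threePass - onePass) < 1e-12)
      println(s"three-pass = $threePass, one-pass = $onePass")
    }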
From a48f19828ae1a7be5252d0319b5f6c9b4c8ddc09 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Wed, 29 Jun 2016 10:38:52 -0700
Subject: [PATCH 3/5] fix tests

---
 .../spark/ml/param/shared/SharedParamsCodeGen.scala      | 3 ++-
 .../spark/ml/regression/RandomForestRegressorSuite.scala | 8 ++++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 4ab0c16a1b4d0..d24c764640851 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -44,7 +44,8 @@ private[shared] object SharedParamsCodeGen {
       " probabilities. Note: Not all models output well-calibrated probability estimates!" +
       " These probabilities should be treated as confidences, not precise probabilities",
       Some("\"probability\"")),
-    ParamDesc[String]("varianceCol", "Column name for the biased sample variance of prediction"),
+    ParamDesc[String]("varianceCol", "Column name for the biased sample variance of prediction",
+      Some("\"variance\"")),
     ParamDesc[Double]("threshold",
       "threshold in binary classification prediction, in range [0, 1]", Some("0.5"),
       isValid = "ParamValidators.inRange(0, 1)", finalMethods = false),
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
index 943c651469bd4..df550f19c2d8f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.tree.impl.TreeTests
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
 import org.apache.spark.mllib.tree.{EnsembleTestHelper, RandomForest => OldRandomForest}
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
@@ -135,8 +136,7 @@ class RandomForestRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
     val dtVariances = dtModel.transform(df).select("variance").collect()
     val nSamples = dtVariances.size
     (0 to nSamples - 1).foreach { i =>
-      val diff = math.abs(rfVariances(i).getDouble(0) - dtVariances(i).getDouble(0))
-      assert(diff < 1e-6)
+      assert(rfVariances(i).getDouble(0) ~== dtVariances(i).getDouble(0) absTol 1e-6)
     }
 
     rf.setMaxDepth(2)
@@ -149,9 +149,9 @@ class RandomForestRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
     results.map { case Row(features: Vector, variance: Double) =>
       val rootNodes = trees.map(_.rootNode.predictImpl(features))
       val predsquared = rootNodes.map(x => math.pow(x.prediction, 2)).sum / numTrees
-      val variance = rootNodes.map(_.impurityStats.calculate()).sum / numTrees
+      val treeVariance = rootNodes.map(_.impurityStats.calculate()).sum / numTrees
       val predictions = rootNodes.map(_.prediction).sum / numTrees
-      val expectedVariance = -math.pow(predictions, 2) + variance + predsquared
+      val expectedVariance = -math.pow(predictions, 2) + treeVariance + predsquared
       assert(variance === expectedVariance,
         s"Expected variance $expectedVariance but got $variance.")
     }
From 0048cc8e89d20a5d989ef990ff4e25008ac2a316 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Wed, 29 Jun 2016 11:16:10 -0700
Subject: [PATCH 4/5] Refactor predictVariance method

---
 .../ml/regression/RandomForestRegressor.scala | 24 +++++++++----------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index 5d6701b65eb2f..d6bac66a5b8c3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -180,6 +180,14 @@ class RandomForestRegressionModel private[ml] (
   @Since("1.4.0")
   override def treeWeights: Array[Double] = _treeWeights
 
+  private def predictVariance(features: Vector): Double = {
+    val meanSumOfPredictionsAndVariance = _trees.map { tree =>
+      val leafNode = tree.rootNode.predictImpl(features)
+      leafNode.impurityStats.calculate() + math.pow(leafNode.prediction, 2)
+    }.sum / getNumTrees
+    meanSumOfPredictionsAndVariance - math.pow(predict(features), 2)
+  }
+
   override protected def transformImpl(dataset: Dataset[_]): DataFrame = {
     val bcastModel = dataset.sparkSession.sparkContext.broadcast(this)
 
@@ -191,23 +199,13 @@ class RandomForestRegressionModel private[ml] (
     val predictions = predictUDF(col($(featuresCol)))
     output = dataset.withColumn($(predictionCol), predictions)
 
-    val varianceUDF = udf { (features: Any) =>
-      val leafNodes = bcastModel.value.returnLeafNodes(features.asInstanceOf[Vector])
-      leafNodes.map { leafNode =>
-        leafNode.impurityStats.calculate() + math.pow(leafNode.prediction, 2)
-      }.sum / getNumTrees
+    val predictVarianceUDF = udf { (features: Any) =>
+      bcastModel.value.predictVariance(features.asInstanceOf[Vector])
     }
-    val variance = varianceUDF(col($(featuresCol)))
-
-    output = output.withColumn($(varianceCol), variance - pow(predictions, 2))
+    output = output.withColumn($(varianceCol), predictVarianceUDF(col($(featuresCol))))
     output.toDF
   }
 
-  private def returnLeafNodes(features: Vector): Array[LeafNode] = {
-    // Return the leaf nodes of each forest.
-    _trees.map(_.rootNode.predictImpl(features))
-  }
-
   override protected def predict(features: Vector): Double = {
     // TODO: When we add a generic Bagging class, handle transform there. SPARK-7128
     // Predict average of tree predictions.
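After this refactor the variance path mirrors the prediction path: a private point-wise `predictVariance`, wrapped by a thin UDF over the broadcast model, replaces the column arithmetic of patch 2. One trade-off accepted here (commentary): `predictVariance` calls `predict` internally, so each row walks the forest once for the prediction column and twice more for the variance column. If that ever showed up in profiles, a single-pass UDF returning both values as a struct would be one possible alternative. A hypothetical sketch as it might look inside `transformImpl`, where `bcastModel` is the broadcast variable already defined there (not part of the patch):

    // Hypothetical one-pass alternative (not in the patch): walk the forest
    // once per row and emit (prediction, variance) together as a struct.
    val predictWithVarianceUDF = udf { (features: Any) =>
      val leaves = bcastModel.value.trees.map(
        _.rootNode.predictImpl(features.asInstanceOf[Vector]))
      val t = leaves.length.toDouble
      val pred = leaves.map(_.prediction).sum / t
      val meanSum = leaves.map(l =>
        l.impurityStats.calculate() + math.pow(l.prediction, 2)).sum / t
      (pred, meanSum - math.pow(pred, 2))  // Spark encodes the tuple as a struct
    }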
From 505e3c289f19f5d04f9fd51d8f5050822f155738 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Fri, 19 Aug 2016 11:39:41 -0700
Subject: [PATCH 5/5] Addresses @yanboliang 's comments

---
 .../ml/regression/RandomForestRegressor.scala    |  7 +++----
 .../regression/RandomForestRegressorSuite.scala  | 15 +++++++--------
 2 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index d6bac66a5b8c3..2b3ed080581d7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -95,7 +95,7 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
     super.setFeatureSubsetStrategy(value)
 
   @Since("2.1.0")
-  /** @group getParam */
+  /** @group setParam */
   def setVarianceCol(value: String): this.type = set(varianceCol, value)
 
   override protected def train(dataset: Dataset[_]): RandomForestRegressionModel = {
@@ -174,7 +174,7 @@ class RandomForestRegressionModel private[ml] (
   private lazy val _treeWeights: Array[Double] = Array.fill[Double](_trees.length)(1.0)
 
   @Since("2.1.0")
-  /** @group getParam */
+  /** @group setParam */
   def setVarianceCol(value: String): this.type = set(varianceCol, value)
 
   @Since("1.4.0")
@@ -202,8 +202,7 @@ class RandomForestRegressionModel private[ml] (
     val predictVarianceUDF = udf { (features: Any) =>
       bcastModel.value.predictVariance(features.asInstanceOf[Vector])
     }
-    output = output.withColumn($(varianceCol), predictVarianceUDF(col($(featuresCol))))
-    output.toDF
+    output.withColumn($(varianceCol), predictVarianceUDF(col($(featuresCol)))).toDF
   }
 
   override protected def predict(features: Vector): Double = {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
index df550f19c2d8f..a19229c82fca2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
@@ -135,7 +135,7 @@ class RandomForestRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
     val dtModel = dt.fit(df)
     val dtVariances = dtModel.transform(df).select("variance").collect()
     val nSamples = dtVariances.size
-    (0 to nSamples - 1).foreach { i =>
+    (0 until nSamples).foreach { i =>
       assert(rfVariances(i).getDouble(0) ~== dtVariances(i).getDouble(0) absTol 1e-6)
     }
 
@@ -143,15 +143,14 @@ class RandomForestRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
     rf.setNumTrees(20)
     val rfNewModel = rf.fit(df)
     val results = rfNewModel.transform(df).select("features", "variance").collect()
-    val features = col("features")
     val trees = rfNewModel.trees
     val numTrees = rfNewModel.getNumTrees
-    results.map { case Row(features: Vector, variance: Double) =>
-      val rootNodes = trees.map(_.rootNode.predictImpl(features))
-      val predsquared = rootNodes.map(x => math.pow(x.prediction, 2)).sum / numTrees
-      val treeVariance = rootNodes.map(_.impurityStats.calculate()).sum / numTrees
-      val predictions = rootNodes.map(_.prediction).sum / numTrees
-      val expectedVariance = -math.pow(predictions, 2) + treeVariance + predsquared
+    results.foreach { case Row(features: Vector, variance: Double) =>
+      val predEachTree = trees.map(_.rootNode.predictImpl(features))
+      val avgPredSquared = predEachTree.map(x => math.pow(x.prediction, 2)).sum / numTrees
+      val treeVariance = predEachTree.map(_.impurityStats.calculate()).sum / numTrees
+      val avgPrediction = predEachTree.map(_.prediction).sum / numTrees
+      val expectedVariance = -math.pow(avgPrediction, 2) + treeVariance + avgPredSquared
       assert(variance === expectedVariance,
         s"Expected variance $expectedVariance but got $variance.")
     }
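With the series applied, the new param can be exercised end to end. A minimal usage sketch, assuming `spark` is an active SparkSession and `train` is a DataFrame with the usual "features" (Vector) and "label" (Double) columns (illustrative, not part of the patch):

    import org.apache.spark.ml.regression.RandomForestRegressor

    val rf = new RandomForestRegressor()
      .setNumTrees(20)
      .setVarianceCol("variance")  // optional: "variance" is already the default

    val model = rf.fit(train)

    // transform() now appends both columns: the mean prediction and the
    // per-row biased sample variance across the ensemble.
    model.transform(train).select("prediction", "variance").show(5)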