Skip to content

Commit

Permalink
Fix intermittent performance bug in CMS caused by a Catalyst issue
Browse files Browse the repository at this point in the history
  • Loading branch information
imatiach-msft authored and elibarzilay committed Feb 10, 2018
1 parent 8723e26 commit 0e79693
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
Expand Up @@ -280,8 +280,7 @@
"* Mean absolute error\n",
"\n",
"Use the `ComputeModelStatistics` API to compute basic statistics for\n",
"the Poisson and the Random Forest models. Note that these computations\n",
"can take a relatively long time, and they are therefore commented out."
"the Poisson and the Random Forest models."
]
},
{
Expand All @@ -290,11 +289,10 @@
"metadata": {},
"outputs": [],
"source": [
"# Uncomment the following to evaluate the PoissonRegressor using ComputeStatistics\n",
"# from mmlspark import ComputeModelStatistics\n",
"# poissonMetrics = ComputeModelStatistics().transform(poissonPrediction)\n",
"# print(\"Poisson Metrics\")\n",
"# poissonMetrics.toPandas()"
"from mmlspark import ComputeModelStatistics\n",
"poissonMetrics = ComputeModelStatistics().transform(poissonPrediction)\n",
"print(\"Poisson Metrics\")\n",
"poissonMetrics.toPandas()"
]
},
{
Expand All @@ -303,17 +301,16 @@
"metadata": {},
"outputs": [],
"source": [
"# and uncomment the following to evaluate the RandomForestRegressor\n",
"# randomForestMetrics = ComputeModelStatistics().transform(randomForestPrediction)\n",
"# print(\"Random Forest Metrics\")\n",
"# randomForestMetrics.toPandas()"
"randomForestMetrics = ComputeModelStatistics().transform(randomForestPrediction)\n",
"print(\"Random Forest Metrics\")\n",
"randomForestMetrics.toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Instead of the above, we evaluate some predictions with `poissonPrediction`:"
"We can also compute per instance statistics for `poissonPrediction`:"
]
},
{
Expand Down
Expand Up @@ -257,7 +257,10 @@ class ComputeModelStatistics(override val uid: String) extends Transformer with
private def selectAndCastToDF(dataset: Dataset[_],
                              predictionColumnName: String,
                              labelColumnName: String): DataFrame = {
  // Project down to the prediction and label columns, coercing the label to Double.
  // NOTE: cache() works around a Catalyst issue (Spark <= 2.2.0) where CMS appears to
  // get stuck in a loop; revisit whether it can be removed after a future Spark upgrade.
  val projected = dataset
    .select(col(predictionColumnName), col(labelColumnName).cast(DoubleType))
    .cache()
  // Discard rows where either the prediction or the label is null.
  projected.na.drop(Array(predictionColumnName, labelColumnName))
}
Expand All @@ -278,7 +281,10 @@ class ComputeModelStatistics(override val uid: String) extends Transformer with
scoredLabelsColumnName: String,
levelsToIndexMap: Map[Any, Double]): RDD[(Double, Double)] = {
// Calculate confusion matrix and output it as DataFrame
// TODO: We call cache in order to avoid a bug with catalyst where CMS seems to get stuck in a loop
// For future spark upgrade past 2.2.0, we should try to see if the cache() call can be removed
dataset.select(col(scoredLabelsColumnName), col(labelColumnName))
.cache()
.na
.drop(Array(scoredLabelsColumnName, labelColumnName))
.rdd
Expand All @@ -303,7 +309,10 @@ class ComputeModelStatistics(override val uid: String) extends Transformer with
labelColumnName: String,
scoresColumnName: String,
levelsToIndexMap: Map[Any, Double]): RDD[(Double, Double)] = {
// TODO: We call cache in order to avoid a bug with catalyst where CMS seems to get stuck in a loop
// For future spark upgrade past 2.2.0, we should try to see if the cache() call can be removed
dataset.select(col(scoresColumnName), col(labelColumnName))
.cache()
.na
.drop(Array(scoresColumnName, labelColumnName))
.rdd
Expand Down
Expand Up @@ -3,6 +3,7 @@

package com.microsoft.ml.spark

import com.microsoft.ml.spark.FileUtilities.File
import com.microsoft.ml.spark.TrainRegressorTestUtilities._
import com.microsoft.ml.spark.TrainClassifierTestUtilities._
import com.microsoft.ml.spark.metrics.MetricConstants
Expand All @@ -11,6 +12,7 @@ import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.FastVectorAssembler
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.regression.GeneralizedLinearRegression
import org.apache.spark.ml.util.MLReadable
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
Expand Down Expand Up @@ -100,6 +102,23 @@ class VerifyComputeModelStatistics extends TransformerFuzzing[ComputeModelStatis
assert(firstRow.getDouble(3) === 0.0)
}

test("Verify compute model statistics does not get stuck in a loop in catalyst") {
  // Regression test: prior to the cache() workaround in ComputeModelStatistics,
  // evaluating a scored dataset like this one could hang inside Catalyst.
  val fileName = "AutomobilePriceRaw.csv"
  val trainFile = new File(s"${sys.env("DATASETS_HOME")}/MissingValuesRegression/Train/", fileName)
  // Load the raw automobile price data; "?" and empty cells are read as nulls.
  val rawData = session.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("nullValue", "?")
    .option("treatEmptyValuesAsNulls", "true")
    .option("delimiter", ",")
    .csv(trainFile.toString)
  // Train a Poisson GLM on the price column and score the same data.
  val poissonGlm = new GeneralizedLinearRegression().setFamily("poisson").setLink("log")
  val regressor = new TrainRegressor()
    .setModel(poissonGlm)
    .setLabelCol("price")
    .setNumFeatures(256)
  val scored = regressor.fit(rawData).transform(rawData)
  val statistics = new ComputeModelStatistics().transform(scored)
  // The third column of the first statistics row is the expected metric value.
  assert(statistics.collect()(0)(2).asInstanceOf[Double] === 0.977733)
}

test("Smoke test to train regressor, score and evaluate on a dataset using all three modules") {
val dataset = session.createDataFrame(Seq(
(0, 2, 0.50, 0.60, 0),
Expand Down

0 comments on commit 0e79693

Please sign in to comment.