[SPARK-23975][ML] Add support of array input for all clustering methods #21195
lu-wang-dl wants to merge 6 commits into apache:master
Conversation
Looking now.

add to whitelist
Test build #89981 has finished for PR 21195 at commit

Rerunning tests in case the R CRAN failure was from flakiness.
Test build #4166 has finished for PR 21195 at commit
Thanks Lu! I had a pass over this PR and it looks pretty straightforward. One thing I noticed is that there are two patterns we keep repeating, and I think we should add private APIs for them and delegate to those. The first pattern is the validate-schema method defined in terms of typeCandidates. The second pattern is going from a DataFrame and column name to an RDD[OldVector]; let's add a method that does this. Both are sketched below.
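A minimal sketch of what those two private helpers could look like, assuming the code lives inside Spark's ml package; the names validateVectorCompatibleColumn and columnToOldVectors are illustrative, not taken from the PR, and nullability handling is elided:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, StructType}

// Pattern 1: schema validation driven by a list of accepted column types.
def validateVectorCompatibleColumn(schema: StructType, colName: String): Unit = {
  val typeCandidates = Seq(VectorType,
    ArrayType(DoubleType, containsNull = false),
    ArrayType(FloatType, containsNull = false))
  val actual = schema(colName).dataType
  require(typeCandidates.contains(actual),
    s"Column $colName must be one of ${typeCandidates.mkString(", ")} but was $actual.")
}

// Pattern 2: DataFrame & column name -> RDD[OldVector], converting array
// columns to dense vectors along the way.
def columnToOldVectors(dataset: Dataset[_], colName: String): RDD[OldVector] = {
  val featureCol = dataset.schema(colName).dataType match {
    case dt if dt == VectorType => col(colName)
    case _: ArrayType => col(colName).cast(ArrayType(DoubleType)) // widens Float to Double
    case other => throw new IllegalArgumentException(s"Unsupported features type $other")
  }
  dataset.select(featureCol).rdd.map {
    case Row(v: Vector) => OldVectors.fromML(v)
    case Row(values: Seq[Double] @unchecked) => OldVectors.dense(values.toArray)
  }
}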
Test build #90160 has finished for PR 21195 at commit
mengxr left a comment:
In general, we should keep each PR minimal. First implement Array support for one estimator, get it reviewed and merged, and then implement support for other estimators. If other estimators share exactly the same pattern, we may put the rest in a single PR. But it is totally fine if we split them into multiple PRs. This helps avoid unnecessary code refactoring during code review.
| /**
|  * Check whether the given column in the schema is one of the supported vector types:
|  * Vector, Array[Float], Array[Double].
| val featuresColNameF = "array_float_features"
| val doubleUDF = udf { (features: Vector) =>
|   val featureArray = Array.fill[Double](features.size)(0.0)
|   features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
- If .toFloat is there to keep the same precision, we should leave an inline comment; features.toArray.map(_.toFloat.toDouble) should do the work.
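A minimal sketch of that suggestion applied to the UDF quoted above, with the inline comment the reviewer asks for (variable names follow the test code):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

val doubleUDF = udf { (features: Vector) =>
  // Truncate to single precision first, so the Array[Double] column carries
  // exactly the same values as the Array[Float] column built from it.
  features.toArray.map(_.toFloat.toDouble)
}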
|   featureArray
| }
| val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
|   .drop("features")
- Unnecessary to drop features. Or you can simply replace the features column:

val newdatasetD = dataset.withColumn(FEATURES, doubleUDF(col(FEATURES)))

| val transformedF = modelF.transform(newdatasetF)
| val predictDifference = transformedD.select("prediction")
|   .except(transformedF.select("prediction"))
| assert(predictDifference.count() == 0)
This only verifies that Array[Double] and Array[Float] inputs are handled the same way; it doesn't guarantee that the result is correct. We can define a method that takes a dataset, applies one iteration, and returns the cost:

def trainAndComputeCost(dataset: DataFrame): Double = {
  val model = new BisectingKMeans()
    .setK(k).setMaxIter(1).setSeed(1)
    .fit(dataset)
  model.computeCost(dataset)
}

val trueCost = trainAndComputeCost(dataset)
val floatArrayCost = trainAndComputeCost(newDatasetF)
assert(floatArrayCost === trueCost)
val doubleArrayCost = trainAndComputeCost(newDatasetD)
assert(doubleArrayCost === trueCost)

We can map the original dataset to single precision to get an exact match, or we can test equality with a threshold. See https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala
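A small sketch of the threshold-based variant, using the ~== comparator from TestingUtils (the tolerance 1e-6 is an arbitrary choice here, not from the review):

import org.apache.spark.mllib.util.TestingUtils._

val trueCost = trainAndComputeCost(dataset)
assert(trainAndComputeCost(newDatasetF) ~== trueCost absTol 1e-6)
assert(trainAndComputeCost(newDatasetD) ~== trueCost absTol 1e-6)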
| assert(predictDifference.count() == 0)
| val probabilityDifference = transformedD.select("probability")
|   .except(transformedF.select("probability"))
| assert(probabilityDifference.count() == 0)
| val lpF = modelF.logPerplexity(newdatasetF)
| // assert(lpD == lpF)
| assert(lpD >= 0.0 && lpD != Double.NegativeInfinity)
| assert(lpF >= 0.0 && lpF != Double.NegativeInfinity)
Test build #90233 has finished for PR 21195 at commit

Test build #90335 has finished for PR 21195 at commit
| val floatLikelihood = trainAndComputlogLikelihood(newDatasetF)
| // checking the cost is fine enough as a sanity check
| assert(trueLikelihood == doubleLikelihood)
minor: should use === instead of == for assertions; the former gives a better error message. (not necessary to update in this PR)
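For illustration, with ScalaTest's === the failure output includes both operands, whereas plain == does not:

assert(trueLikelihood === doubleLikelihood) // fails with e.g. "-3.2 did not equal -3.3"
assert(trueLikelihood == doubleLikelihood)  // fails with only "assertion failed"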
| }
| val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
| val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
minor: the outputs are not used. I expect they will be used once we fix SPARK-22210.
Yes. I want to use this as the base for the comparison after we fix SPARK-22210.
| /**
|  * Helper function for testing different input types for features. Given a DataFrame, generate
|  * three output DataFrames: one having vector feature column with float precision, one having
minor: should say features column to make the contract clear.
| val toFloatVectorUDF = udf { (features: Vector) => features.toArray.map(_.toFloat).toVector }
| val toDoubleArrayUDF = udf { (features: Vector) => features.toArray }
| val toFloatArrayUDF = udf { (features: Vector) => features.toArray.map(_.toFloat) }
| val newDataset = dataset.withColumn("features", toFloatVectorUDF(col("features")))
minor: maybe useful to define "features" as a constant at the beginning of the function
| val toDoubleArrayUDF = udf { (features: Vector) => features.toArray }
| val toFloatArrayUDF = udf { (features: Vector) => features.toArray.map(_.toFloat) }
| val newDataset = dataset.withColumn("features", toFloatVectorUDF(col("features")))
| val newDatasetD = dataset.withColumn("features", toDoubleArrayUDF(col("features")))
This doesn't truncate the precision to single. Did you want to use newDataset instead of dataset?
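A hedged sketch of the fix this comment points at: derive both array columns from newDataset (already truncated to float precision) rather than from the original dataset, so all three DataFrames carry identical values. This assumes toFloatVectorUDF is also adjusted to return an ml Vector so the downstream UDFs still see a Vector column:

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.{col, udf}

val toFloatVectorUDF = udf { (features: Vector) =>
  // Round-trip through Float to truncate to single precision.
  Vectors.dense(features.toArray.map(_.toFloat.toDouble))
}
val newDataset = dataset.withColumn("features", toFloatVectorUDF(col("features")))
val newDatasetD = newDataset.withColumn("features", toDoubleArrayUDF(col("features")))
val newDatasetF = newDataset.withColumn("features", toFloatArrayUDF(col("features")))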
Test build #90344 has finished for PR 21195 at commit
LGTM. Merged into master. Thanks!
What changes were proposed in this pull request?
Add support for array-typed input (Array[Float] and Array[Double], in addition to Vector) for the features column of all clustering methods.
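As an illustration of the new behavior (a sketch assuming a SparkSession named spark), a clustering estimator can now be fit directly on an array column:

import org.apache.spark.ml.clustering.KMeans

// Two tiny, well-separated points; features is an Array[Double] column, not a Vector.
val df = spark.createDataFrame(Seq(
  Tuple1(Array(0.0, 0.1)),
  Tuple1(Array(9.0, 9.1))
)).toDF("features")

val model = new KMeans().setK(2).setSeed(1L).fit(df) // no manual conversion to Vector needed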
How was this patch tested?
Unit tests added.
Please review http://spark.apache.org/contributing.html before opening a pull request.