From 5440dc598cf973f83fec1e0356325ebd6d762430 Mon Sep 17 00:00:00 2001
From: yuhaoyang
Date: Mon, 12 Oct 2015 00:13:44 -0700
Subject: [PATCH 1/3] add computeCost to ml.Kmeans

---
 .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index f40ab71fb22a6..b20eae9218112 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -24,6 +24,7 @@ import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.sql.types.{IntegerType, StructType}
 import org.apache.spark.sql.{DataFrame, Row}
@@ -117,6 +118,10 @@ class KMeansModel private[ml] (
 
   @Since("1.5.0")
   def clusterCenters: Array[Vector] = parentModel.clusterCenters
+
+  // TODO: Replace the temp fix until we have proper evaluators defined for clustering.
+  @Since("1.6.0")
+  def computeCost(data: RDD[Vector]): Double = parentModel.computeCost(data)
 }
 
 /**

From 738ab032cf7ad10c081d2199fe2855e4803ee585 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Mon, 12 Oct 2015 15:16:55 -0700
Subject: [PATCH 2/3] change to DataFrame and add ut

---
 .../scala/org/apache/spark/ml/clustering/KMeans.scala      | 8 ++++++--
 .../org/apache/spark/ml/clustering/KMeansSuite.scala       | 1 +
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index b20eae9218112..e48af39ef7d0c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -119,9 +119,13 @@ class KMeansModel private[ml] (
   @Since("1.5.0")
   def clusterCenters: Array[Vector] = parentModel.clusterCenters
 
-  // TODO: Replace the temp fix until we have proper evaluators defined for clustering.
+  // TODO: Replace the temp fix when we have proper evaluators defined for clustering.
   @Since("1.6.0")
-  def computeCost(data: RDD[Vector]): Double = parentModel.computeCost(data)
+  def computeCost(dataset: DataFrame): Double = {
+    SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
+    val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
+    parentModel.computeCost(data)
+  }
 }
 
 /**
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 688b0e31f91dc..c05f90550d161 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -104,5 +104,6 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet
     assert(clusters.size === k)
     assert(clusters === Set(0, 1, 2, 3, 4))
+    assert(model.computeCost(dataset) < 0.1)
   }
 }

From cff3d5a3333dd8fb10490cd744d26143a9311d8a Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Fri, 16 Oct 2015 15:20:28 -0700
Subject: [PATCH 3/3] add comments

---
 .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index e48af39ef7d0c..509be63002396 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -24,7 +24,6 @@ import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.sql.types.{IntegerType, StructType}
 import org.apache.spark.sql.{DataFrame, Row}
@@ -119,6 +118,10 @@ class KMeansModel private[ml] (
   @Since("1.5.0")
   def clusterCenters: Array[Vector] = parentModel.clusterCenters
 
+  /**
+   * Return the K-means cost (sum of squared distances of points to their nearest center) for this
+   * model on the given data.
+   */
   // TODO: Replace the temp fix when we have proper evaluators defined for clustering.
   @Since("1.6.0")
   def computeCost(dataset: DataFrame): Double = {