From a4c4ff190235489c080fa69b62264fe73fbc4833 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 3 Nov 2017 18:54:12 +0100 Subject: [PATCH 1/7] initial impl --- .../ml/evaluation/ClusteringEvaluator.scala | 128 +++++++++++++++++- 1 file changed, 122 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala index d6ec5223237bb..ff9855632c629 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala @@ -24,6 +24,7 @@ import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.functions.{avg, col, udf} import org.apache.spark.sql.types.DoubleType @@ -66,14 +67,14 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str /** * param for metric name in evaluation - * (supports `"silhouette"` (default)) + * (supports `"silhouette"` (default), "calinski-harabasz") * @group param */ @Since("2.3.0") val metricName: Param[String] = { - val allowedParams = ParamValidators.inArray(Array("silhouette")) - new Param( - this, "metricName", "metric name in evaluation (silhouette)", allowedParams) + val allowedParams = ParamValidators.inArray(Array("silhouette", "calinski-harabasz")) + new Param(this, "metricName", "metric name in evaluation (silhouette, calinski-harabasz)", + allowedParams) } /** @group getParam */ @@ -94,8 +95,9 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str $(metricName) match { case "silhouette" => SquaredEuclideanSilhouette.computeSilhouetteScore( - dataset, $(predictionCol), $(featuresCol) - ) + dataset, $(predictionCol), $(featuresCol)) + case "calinski-harabasz" => + CalinskiHarabasz.computeScore(dataset, ${predictionCol}, ${featuresCol}) } } } @@ -434,3 +436,117 @@ private[evaluation] object SquaredEuclideanSilhouette { silhouetteScore } } + +private[evaluation] object CalinskiHarabasz { + + def computeScore( + dataset: Dataset[_], + predictionCol: String, + featuresCol: String): Double = { + val predictionAndFeaturesDf = dataset.select( + col(predictionCol).cast(DoubleType), col(featuresCol)) + val predictionAndFeaturesRDD = predictionAndFeaturesDf.rdd + .map { row => (row.getDouble(0), row.getAs[Vector](1)) } + + val numFeatures = dataset.select(col(featuresCol)).first().getAs[Vector](0).size + + val featureSumsSquaredNormSumsAndNumOfPoints = computeFeatureSumsSquaredNormSumsAndNumOfPoints( + predictionAndFeaturesRDD, + predictionCol, featuresCol, numFeatures) + + val (datasetCenter, datasetFeatureSums, datasetNumOfPoints) = + computeDatasetCenterDatasetFeatureSumsAndNumOfPoints( + featureSumsSquaredNormSumsAndNumOfPoints, numFeatures) + + val clustersCenters = featureSumsSquaredNormSumsAndNumOfPoints.mapValues { + case (featureSum, _, numOfPoints) => + BLAS.scal(1 / numOfPoints, featureSum) + featureSum + } + + val datasetSquaredNormSum = featureSumsSquaredNormSumsAndNumOfPoints.map { + case (_, (_, clusterSquaredNormSum, _)) => clusterSquaredNormSum + }.sum + + val tss = 
totalSumOfSquares(datasetCenter, datasetNumOfPoints, datasetFeatureSums, + datasetSquaredNormSum) + val SSW = withinClusterVariance(predictionAndFeaturesRDD, clustersCenters) + // SSB is the overall between-cluster variance + val SSB = tss - SSW + val numOfClusters = clustersCenters.size + + SSB / SSW * (datasetNumOfPoints - numOfClusters) / (numOfClusters -1) + } + + def computeFeatureSumsSquaredNormSumsAndNumOfPoints( + predictionAndFeaturesRDD: RDD[(Double, Vector)], + predictionCol: String, + featuresCol: String, + numFeatures: Int): Map[Double, (Vector, Double, Long)] = { + val featureSumsSquaredNormSumsAndNumOfPoints = predictionAndFeaturesRDD + .aggregateByKey[(DenseVector, Double, Long)]((Vectors.zeros(numFeatures).toDense, 0.0, 0L))( + seqOp = { + case ((featureSum: DenseVector, sumOfSquaredNorm: Double, numOfPoints: Long), + (features)) => + BLAS.axpy(1.0, features, featureSum) + val squaredNorm = math.pow(Vectors.norm(features, 2.0), 2.0) + (featureSum, squaredNorm + sumOfSquaredNorm, numOfPoints + 1) + }, + combOp = { + case ((featureSum1, sumOfSquaredNorm1, numOfPoints1), + (featureSum2, sumOfSquaredNorm2, numOfPoints2)) => + BLAS.axpy(1.0, featureSum2, featureSum1) + (featureSum1, sumOfSquaredNorm1 + sumOfSquaredNorm2, numOfPoints1 + numOfPoints2) + } + ) + + featureSumsSquaredNormSumsAndNumOfPoints + .collectAsMap() + .toMap + } + + def computeDatasetCenterDatasetFeatureSumsAndNumOfPoints( + featureSumsSquaredNormSumsAndNumOfPoints: Map[Double, (Vector, Double, Long)], + numFeatures: Int): (Vector, Vector, Long) = { + val (featureSums, datasetNumOfPoints) = featureSumsSquaredNormSumsAndNumOfPoints + .aggregate((Vectors.zeros(numFeatures).toDense, 0L))( + seqop = { + case ((featureSum: DenseVector, numOfPoints: Long), + (_: Double, (clusterFeatureSum: DenseVector, _: Double, clusterNumOfPoints: Long))) => + BLAS.axpy(1.0, clusterFeatureSum, featureSum) + (featureSum, clusterNumOfPoints + numOfPoints) + }, + combop = { + case ((featureSum1, numOfPoints1), (featureSum2, numOfPoints2)) => + BLAS.axpy(1.0, featureSum2, featureSum1) + (featureSum1, numOfPoints1 + numOfPoints2) + } + ) + val datasetCenter = featureSums.copy + BLAS.scal(1 / datasetNumOfPoints, datasetCenter) + (datasetCenter, featureSums, datasetNumOfPoints) + } + + def totalSumOfSquares( + datasetCenter: Vector, + datasetNumOfPoints: Long, + datasetFeatureSums: Vector, + datasetSquaredNormSum: Double): Double = { + val datasetCenterSquaredNorm = math.pow(Vectors.norm(datasetCenter, 2.0), 2.0) + + datasetSquaredNormSum + datasetNumOfPoints * datasetCenterSquaredNorm - + 2 * BLAS.dot(datasetFeatureSums, datasetCenter) + } + + def withinClusterVariance( + predictionAndFeaturesRDD: RDD[(Double, Vector)], + clusterCenters: Map[Double, Vector]): Double = { + val bClusterCenters = predictionAndFeaturesRDD.sparkContext.broadcast(clusterCenters) + val withinClusterVariance = predictionAndFeaturesRDD.map { + case (clusterId, features) => + Vectors.sqdist(features, bClusterCenters.value(clusterId)) + }.sum() + bClusterCenters.destroy() + withinClusterVariance + } +} From 95a6e4ec6743536a7a1c0f9effd3628cd2b1adef Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 5 Nov 2017 12:07:04 +0100 Subject: [PATCH 2/7] adding doc --- .../ml/evaluation/ClusteringEvaluator.scala | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala index 
ff9855632c629..8c2b5e015b1f4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala @@ -437,6 +437,83 @@ private[evaluation] object SquaredEuclideanSilhouette { } } + +/** + * [[CalinskiHarabasz]] computes the Calinski-Harabasz index. + * + * This implementation differs slightly from the original formulation, to better fit a big data + * environment. + * Indeed, the Calinski-Harabasz index is defined as: + * + *
+ * $$ + * \frac{SSB}{SSW} \cdot \frac{N - k}{k - 1} + * $$ + *
+ * + * where `SSB` is the overall between-cluster variance, `SSW` is the overall within-cluster + * variance, `N` is the number of points in the dataset and `k` is the number of clusters. + * + * In the original formulation, `SSW` is the sum of the squared distances between each point and + * the centroid of its cluster, and `SSB` is computed as the sum of the squared distances between + * each point and the centroid of the whole dataset (defined as the total sum of squares, `tss`) + * minus `SSW`. + * Here `SSW` and `SSB` are computed in the same way, but `tss` is computed slightly differently. + * Indeed, we can write it as: + * + *
+ * $$ + * tss = \sum\limits_{i=1}^N (X_{i} - C)^2 = + * \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D (x_{ij}-c_{j})^2 \Big) + * = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D x_{ij}^2 + + * \sum\limits_{j=1}^D c_{j}^2 -2\sum\limits_{j=1}^D x_{ij}c_{j} \Big) + * = \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{ij}^2 + + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{j}^2 + * -2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{ij}c_{j} + * $$ + *
+ * + * In the last formula we can notice that: + * + *
+ * $$ + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{ij}^2 = \sum\limits_{i=1}^N \|X_{i}\|^2 + * $$ + *
+ * + * i.e. the first term is the sum of the squared norms of all the points in the dataset, and + * + *
+ * $$ + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{j}^2 = N \|C\|^2 + * $$ + *
+ * + * i.e. the second term is `N` multiplied by the squared norm of the dataset's centroid, and, if we + * define `Y` as the vector which is the element-wise sum of all the points in the dataset, then + * the third and last term becomes + * + *
+ * $$ + * 2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{ij}c_{j} = 2 Y \cdot C + * $$ + *
+ * + * Thus, `tss` becomes: + * + *
+ * $$ + * tss = \sum\limits_{i=1}^N \|X_{i}\|^2 + N \|C\|^2 - 2 Y \cdot C + * $$ + *
+ * + * where `$\sum\limits_{i=1}^N \|X_{i}\|^2$` and `Y` are precomputed with a single pass over the + * dataset. + * + * @see + * T. Calinski and J. Harabasz, 1974. “A dendrite method for cluster analysis”. + * Communications in Statistics + */ private[evaluation] object CalinskiHarabasz { def computeScore( From ca7797f3c73678b316c7630639826952910f5b8e Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 6 Nov 2017 10:58:25 +0100 Subject: [PATCH 3/7] added ut --- .../evaluation/ClusteringEvaluatorSuite.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala index e60ebbd7c852d..f4d47dac0bc9c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala @@ -74,6 +74,26 @@ class ClusteringEvaluatorSuite assert(e.getMessage.contains("Number of clusters must be greater than one")) } + /* + Use the following Python code to load the data and evaluate it using the scikit-learn package. + + from sklearn import datasets + from sklearn.metrics import calinski_harabaz_score + iris = datasets.load_iris() + round(calinski_harabaz_score(iris.data, iris.target), 10) + + 486.3208393186 + */ + test("Calinski-Harabasz") { + val iris = ClusteringEvaluatorSuite.irisDataset(spark) + val evaluator = new ClusteringEvaluator() + .setFeaturesCol("features") + .setPredictionCol("label") + .setMetricName("calinski-harabasz") + + assert(evaluator.evaluate(iris) ~== 486.3208393186 relTol 1e-5) + } + } object ClusteringEvaluatorSuite { From 8a8d016599b3e940934cc4a53d93cbfa081f6279 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 6 Nov 2017 10:58:48 +0100 Subject: [PATCH 4/7] fixes --- .../apache/spark/ml/evaluation/ClusteringEvaluator.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala index 8c2b5e015b1f4..fc1c9ed5d3c86 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala @@ -537,9 +537,9 @@ private[evaluation] object CalinskiHarabasz { val clustersCenters = featureSumsSquaredNormSumsAndNumOfPoints.mapValues { case (featureSum, _, numOfPoints) => - BLAS.scal(1 / numOfPoints, featureSum) + BLAS.scal(1 / numOfPoints.toDouble, featureSum) featureSum - } + }.map(identity) // required by https://issues.scala-lang.org/browse/SI-7005 val datasetSquaredNormSum = featureSumsSquaredNormSumsAndNumOfPoints.map { case (_, (_, clusterSquaredNormSum, _)) => clusterSquaredNormSum }.sum @@ -600,7 +600,7 @@ private[evaluation] object CalinskiHarabasz { } ) val datasetCenter = featureSums.copy - BLAS.scal(1 / datasetNumOfPoints, datasetCenter) + BLAS.scal(1 / datasetNumOfPoints.toDouble, datasetCenter) (datasetCenter, featureSums, datasetNumOfPoints) } From 633b21d2762877c89486b9b9e2e2ec029830bfaa Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 6 Nov 2017 11:53:49 +0100 Subject: [PATCH 5/7] minor --- .../apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala index f4d47dac0bc9c..ae223fd01773f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala @@ -23,7 +23,6 @@ import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.types.IntegerType class ClusteringEvaluatorSuite From 3e5e8c5f9c24c0185802f031f80de57b0d79c84c Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 6 Nov 2017 14:26:48 +0100 Subject: [PATCH 6/7] fix copy and paste issue --- .../org/apache/spark/ml/evaluation/ClusteringEvaluator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala index fc1c9ed5d3c86..eb5d3167518f2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala @@ -511,7 +511,7 @@ private[evaluation] object SquaredEuclideanSilhouette { * dataset. * * @see - * T. Calinski and J. Harabasz, 1974. “A dendrite method for cluster analysis”. + * T. Calinski and J. Harabasz, 1974. "A dendrite method for cluster analysis". * Communications in Statistics */ private[evaluation] object CalinskiHarabasz { From 05c77b2d8af86554e0adfc0cac0723eae3922547 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 10 Nov 2017 13:05:38 +0100 Subject: [PATCH 7/7] fix merge error --- .../apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala index 46935db2c1dce..79f05ac17f5bf 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala @@ -89,13 +89,12 @@ class ClusteringEvaluatorSuite 486.3208393186 */ test("Calinski-Harabasz") { - val iris = ClusteringEvaluatorSuite.irisDataset(spark) val evaluator = new ClusteringEvaluator() .setFeaturesCol("features") .setPredictionCol("label") .setMetricName("calinski-harabasz") - assert(evaluator.evaluate(iris) ~== 486.3208393186 relTol 1e-5) + assert(evaluator.evaluate(irisDataset) ~== 486.3208393186 relTol 1e-5) } }
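
As a sanity check on the algebra in the PATCH 2 scaladoc, here is a minimal standalone sketch (not part of the patch series) that recomputes `tss` both directly from its definition and through the one-pass decomposition `tss = \sum_i \|X_i\|^2 + N \|C\|^2 - 2 Y \cdot C`, and then derives the Calinski-Harabasz index from `SSB` and `SSW`. It uses plain Scala collections rather than Spark; the object name, helper functions and toy dataset are invented purely for illustration.

// Standalone sketch: verifies the tss decomposition used by the patch and computes
// the Calinski-Harabasz index on a tiny hard-coded dataset. Not part of the patches.
object CalinskiHarabaszSketch {

  type Point = Array[Double]

  private def add(a: Point, b: Point): Point = a.zip(b).map { case (x, y) => x + y }
  private def scale(a: Point, s: Double): Point = a.map(_ * s)
  private def dot(a: Point, b: Point): Double = a.zip(b).map { case (x, y) => x * y }.sum
  private def sqNorm(a: Point): Double = dot(a, a)
  private def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def main(args: Array[String]): Unit = {
    // Two well-separated clusters in two dimensions, labelled 0.0 and 1.0.
    val data: Seq[(Double, Point)] = Seq(
      0.0 -> Array(0.0, 0.0), 0.0 -> Array(1.0, 0.0), 0.0 -> Array(0.0, 1.0),
      1.0 -> Array(10.0, 10.0), 1.0 -> Array(11.0, 10.0), 1.0 -> Array(10.0, 11.0))

    val n = data.size
    val k = data.map(_._1).distinct.size

    // Y = element-wise sum of all points, C = centroid of the whole dataset.
    val y = data.map(_._2).reduce(add)
    val c = scale(y, 1.0 / n)

    // tss computed directly from its definition and via the one-pass decomposition.
    val tssDirect = data.map { case (_, x) => sqDist(x, c) }.sum
    val tssDecomposed = data.map { case (_, x) => sqNorm(x) }.sum + n * sqNorm(c) - 2.0 * dot(y, c)

    // SSW: squared distances of each point to its own cluster centroid; SSB = tss - SSW.
    val centroids: Map[Double, Point] = data.groupBy(_._1).map { case (label, pts) =>
      label -> scale(pts.map(_._2).reduce(add), 1.0 / pts.size)
    }
    val ssw = data.map { case (label, x) => sqDist(x, centroids(label)) }.sum
    val ssb = tssDirect - ssw

    // Calinski-Harabasz index: SSB / SSW * (N - k) / (k - 1).
    val chIndex = ssb / ssw * (n - k) / (k - 1)

    println(s"tss (direct)      = $tssDirect")
    println(s"tss (decomposed)  = $tssDecomposed")
    println(s"Calinski-Harabasz = $chIndex")
  }
}

The two printed `tss` values agree up to floating-point rounding, which is exactly the identity the patch relies on to obtain `tss` from quantities gathered in a single pass over the data.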