From d0fe433b5ee7ae3ba41838dd30b5dbe9f6c4e713 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 24 May 2020 08:42:42 -0500 Subject: [PATCH] [SPARK-31768][ML] add getMetrics in Evaluators ### What changes were proposed in this pull request? add getMetrics in Evaluators to get the corresponding Metrics instance, so users can use it to get any of the metrics scores. For example: ``` val trainer = new LinearRegression val model = trainer.fit(dataset) val predictions = model.transform(dataset) val evaluator = new RegressionEvaluator() val metrics = evaluator.getMetrics(predictions) val rmse = metrics.rootMeanSquaredError val r2 = metrics.r2 val mae = metrics.meanAbsoluteError val variance = metrics.explainedVariance ``` ### Why are the changes needed? Currently, Evaluator.evaluate only access to one metrics, but most users may need to get multiple metrics. This PR adds getMetrics in all the Evaluators, so users can use it to get an instance of the corresponding Metrics to get any of the metrics they want. ### Does this PR introduce _any_ user-facing change? Yes. Add getMetrics in Evaluators. For example: ``` /** * Get a RegressionMetrics, which can be used to get any of the regression * metrics such as rootMeanSquaredError, meanSquaredError, etc. * * param dataset a dataset that contains labels/observations and predictions. * return RegressionMetrics */ Since("3.1.0") def getMetrics(dataset: Dataset[_]): RegressionMetrics ``` ### How was this patch tested? Add new unit tests Closes #28590 from huaxingao/getMetrics. Authored-by: Huaxin Gao Signed-off-by: Sean Owen --- .../BinaryClassificationEvaluator.scala | 26 +- .../ml/evaluation/ClusteringEvaluator.scala | 559 +---------------- .../ml/evaluation/ClusteringMetrics.scala | 575 ++++++++++++++++++ .../MulticlassClassificationEvaluator.scala | 54 +- .../MultilabelClassificationEvaluator.scala | 36 +- .../ml/evaluation/RankingEvaluator.scala | 28 +- .../ml/evaluation/RegressionEvaluator.scala | 28 +- .../mllib/evaluation/MulticlassMetrics.scala | 3 +- .../BinaryClassificationEvaluatorSuite.scala | 23 + .../evaluation/ClusteringEvaluatorSuite.scala | 16 + ...lticlassClassificationEvaluatorSuite.scala | 29 + ...ltilabelClassificationEvaluatorSuite.scala | 48 ++ .../ml/evaluation/RankingEvaluatorSuite.scala | 38 ++ .../evaluation/RegressionEvaluatorSuite.scala | 33 + 14 files changed, 905 insertions(+), 591 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index 82b8e14f010af..fac4d92b1810c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -98,6 +98,24 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va @Since("2.0.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + val metric = $(metricName) match { + case "areaUnderROC" => metrics.areaUnderROC() + case "areaUnderPR" => metrics.areaUnderPR() + } + metrics.unpersist() + metric + } + + /** + * Get a BinaryClassificationMetrics, which can be used to get binary classification + * metrics such as areaUnderROC and areaUnderPR. + * + * @param dataset a dataset that contains labels/observations and predictions. 
+ * @return BinaryClassificationMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): BinaryClassificationMetrics = { val schema = dataset.schema SchemaUtils.checkColumnTypes(schema, $(rawPredictionCol), Seq(DoubleType, new VectorUDT)) SchemaUtils.checkNumericType(schema, $(labelCol)) @@ -119,13 +137,7 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va case Row(rawPrediction: Double, label: Double, weight: Double) => (rawPrediction, label, weight) } - val metrics = new BinaryClassificationMetrics(scoreAndLabelsWithWeights, $(numBins)) - val metric = $(metricName) match { - case "areaUnderROC" => metrics.areaUnderROC() - case "areaUnderPR" => metrics.areaUnderPR() - } - metrics.unpersist() - metric + new BinaryClassificationMetrics(scoreAndLabelsWithWeights, $(numBins)) } @Since("1.5.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala index 641a1eb5f61db..63b99a0de4b65 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala @@ -17,16 +17,12 @@ package org.apache.spark.ml.evaluation -import org.apache.spark.SparkContext import org.apache.spark.annotation.Since -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors} import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} import org.apache.spark.ml.util._ -import org.apache.spark.sql.{Column, DataFrame, Dataset} -import org.apache.spark.sql.functions.{avg, col, udf} -import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.functions.col /** * Evaluator for clustering results. @@ -102,22 +98,33 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str @Since("2.3.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + + $(metricName) match { + case ("silhouette") => metrics.silhouette + case (other) => + throw new IllegalArgumentException(s"No support for metric $other") + } + } + + /** + * Get a ClusteringMetrics, which can be used to get clustering metrics such as + * silhouette score. + * + * @param dataset a dataset that contains labels/observations and predictions. 
+ * @return ClusteringMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): ClusteringMetrics = { SchemaUtils.validateVectorCompatibleColumn(dataset.schema, $(featuresCol)) SchemaUtils.checkNumericType(dataset.schema, $(predictionCol)) val vectorCol = DatasetUtils.columnToVector(dataset, $(featuresCol)) val df = dataset.select(col($(predictionCol)), vectorCol.as($(featuresCol), dataset.schema($(featuresCol)).metadata)) - - ($(metricName), $(distanceMeasure)) match { - case ("silhouette", "squaredEuclidean") => - SquaredEuclideanSilhouette.computeSilhouetteScore( - df, $(predictionCol), $(featuresCol)) - case ("silhouette", "cosine") => - CosineSilhouette.computeSilhouetteScore(df, $(predictionCol), $(featuresCol)) - case (mn, dm) => - throw new IllegalArgumentException(s"No support for metric $mn, distance $dm") - } + val metrics = new ClusteringMetrics(df) + metrics.setDistanceMeasure($(distanceMeasure)) + metrics } @Since("3.0.0") @@ -136,523 +143,3 @@ object ClusteringEvaluator override def load(path: String): ClusteringEvaluator = super.load(path) } - - -private[evaluation] abstract class Silhouette { - - /** - * It computes the Silhouette coefficient for a point. - */ - def pointSilhouetteCoefficient( - clusterIds: Set[Double], - pointClusterId: Double, - pointClusterNumOfPoints: Long, - averageDistanceToCluster: (Double) => Double): Double = { - if (pointClusterNumOfPoints == 1) { - // Single-element clusters have silhouette 0 - 0.0 - } else { - // Here we compute the average dissimilarity of the current point to any cluster of which the - // point is not a member. - // The cluster with the lowest average dissimilarity - i.e. the nearest cluster to the current - // point - is said to be the "neighboring cluster". - val otherClusterIds = clusterIds.filter(_ != pointClusterId) - val neighboringClusterDissimilarity = otherClusterIds.map(averageDistanceToCluster).min - // adjustment for excluding the node itself from the computation of the average dissimilarity - val currentClusterDissimilarity = - averageDistanceToCluster(pointClusterId) * pointClusterNumOfPoints / - (pointClusterNumOfPoints - 1) - if (currentClusterDissimilarity < neighboringClusterDissimilarity) { - 1 - (currentClusterDissimilarity / neighboringClusterDissimilarity) - } else if (currentClusterDissimilarity > neighboringClusterDissimilarity) { - (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1 - } else { - 0.0 - } - } - } - - /** - * Compute the mean Silhouette values of all samples. - */ - def overallScore(df: DataFrame, scoreColumn: Column): Double = { - df.select(avg(scoreColumn)).collect()(0).getDouble(0) - } -} - -/** - * SquaredEuclideanSilhouette computes the average of the - * Silhouette over all the data of the dataset, which is - * a measure of how appropriately the data have been clustered. - * - * The Silhouette for each point `i` is defined as: - * - *
- * $$
- * s_{i} = \frac{b_{i}-a_{i}}{max\{a_{i},b_{i}\}}
- * $$
- *
- *
- * which can be rewritten as
- *
- *
- * $$
- * s_{i}= \begin{cases}
- * 1-\frac{a_{i}}{b_{i}} & \text{if } a_{i} \leq b_{i} \\
- * \frac{b_{i}}{a_{i}}-1 & \text{if } a_{i} \gt b_{i} \end{cases}
- * $$
- *
- *
- * where `$a_{i}$` is the average dissimilarity of `i` with all other data
- * within the same cluster, `$b_{i}$` is the lowest average dissimilarity
- * of `i` to any other cluster, of which `i` is not a member.
- * `$a_{i}$` can be interpreted as how well `i` is assigned to its cluster
- * (the smaller the value, the better the assignment), while `$b_{i}$` is
- * a measure of how well `i` has not been assigned to its "neighboring cluster",
- * ie. the nearest cluster to `i`.
- *
- * Unfortunately, the naive implementation of the algorithm requires to compute
- * the distance of each couple of points in the dataset. Since the computation of
- * the distance measure takes `D` operations - if `D` is the number of dimensions
- * of each point, the computational complexity of the algorithm is `O(N^2^*D)`, where
- * `N` is the cardinality of the dataset. Of course this is not scalable in `N`,
- * which is the critical number in a Big Data context.
- *
- * The algorithm which is implemented in this object, instead, is an efficient
- * and parallel implementation of the Silhouette using the squared Euclidean
- * distance measure.
- *
- * With this assumption, the total distance of the point `X`
- * to the points `$C_{i}$` belonging to the cluster `$\Gamma$` is:
- *
- *
- * $$
- * \sum\limits_{i=1}^N d(X, C_{i} ) =
- * \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D (x_{j}-c_{ij})^2 \Big)
- * = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D x_{j}^2 +
- * \sum\limits_{j=1}^D c_{ij}^2 -2\sum\limits_{j=1}^D x_{j}c_{ij} \Big)
- * = \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 +
- * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2
- * -2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij}
- * $$
- *
- *
- * where `$x_{j}$` is the `j`-th dimension of the point `X` and
- * `$c_{ij}$` is the `j`-th dimension of the `i`-th point in cluster `$\Gamma$`.
- *
- * Then, the first term of the equation can be rewritten as:
- *
- *
- * $$
- * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 = N \xi_{X} \text{ ,
- * with } \xi_{X} = \sum\limits_{j=1}^D x_{j}^2
- * $$
- *
- *
- * where `$\xi_{X}$` is fixed for each point and it can be precomputed.
- *
- * Moreover, the second term is fixed for each cluster too,
- * thus we can name it `$\Psi_{\Gamma}$`
- *
- *
- * $$
- * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 =
- * \sum\limits_{i=1}^N \xi_{C_{i}} = \Psi_{\Gamma}
- * $$
- *
- *
- * Last, the third element becomes
- *
- *
- * $$
- * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} =
- * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j}
- * $$
- *
- *
- * thus defining the vector
- *
- *
- * $$
- * Y_{\Gamma}:Y_{\Gamma j} = \sum\limits_{i=1}^N c_{ij} , j=0, ..., D
- * $$
- *
- *
- * which is fixed for each cluster `$\Gamma$`, we have
- *
- *
- * $$
- * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} =
- * \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}
- * $$
- *
- *
- * In this way, the previous equation becomes
- *
- *
- * $$
- * N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}
- * $$
- *
- *
- * and the average distance of a point to a cluster can be computed as
- *
- *
- * $$
- * \frac{\sum\limits_{i=1}^N d(X, C_{i} )}{N} =
- * \frac{N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} =
- * \xi_{X} + \frac{\Psi_{\Gamma} }{N} - 2 \frac{\sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N}
- * $$
- *
- * - * Thus, it is enough to precompute: the constant `$\xi_{X}$` for each point `X`; the - * constants `$\Psi_{\Gamma}$`, `N` and the vector `$Y_{\Gamma}$` for - * each cluster `$\Gamma$`. - * - * In the implementation, the precomputed values for the clusters - * are distributed among the worker nodes via broadcasted variables, - * because we can assume that the clusters are limited in number and - * anyway they are much fewer than the points. - * - * The main strengths of this algorithm are the low computational complexity - * and the intrinsic parallelism. The precomputed information for each point - * and for each cluster can be computed with a computational complexity - * which is `O(N/W)`, where `N` is the number of points in the dataset and - * `W` is the number of worker nodes. After that, every point can be - * analyzed independently of the others. - * - * For every point we need to compute the average distance to all the clusters. - * Since the formula above requires `O(D)` operations, this phase has a - * computational complexity which is `O(C*D*N/W)` where `C` is the number of - * clusters (which we assume quite low), `D` is the number of dimensions, - * `N` is the number of points in the dataset and `W` is the number - * of worker nodes. - */ -private[evaluation] object SquaredEuclideanSilhouette extends Silhouette { - - private[this] var kryoRegistrationPerformed: Boolean = false - - /** - * This method registers the class - * [[org.apache.spark.ml.evaluation.SquaredEuclideanSilhouette.ClusterStats]] - * for kryo serialization. - * - * @param sc `SparkContext` to be used - */ - def registerKryoClasses(sc: SparkContext): Unit = { - if (!kryoRegistrationPerformed) { - sc.getConf.registerKryoClasses( - Array( - classOf[SquaredEuclideanSilhouette.ClusterStats] - ) - ) - kryoRegistrationPerformed = true - } - } - - case class ClusterStats(featureSum: Vector, squaredNormSum: Double, numOfPoints: Long) - - /** - * The method takes the input dataset and computes the aggregated values - * about a cluster which are needed by the algorithm. - * - * @param df The DataFrame which contains the input data - * @param predictionCol The name of the column which contains the predicted cluster id - * for the point. - * @param featuresCol The name of the column which contains the feature vector of the point. - * @return A [[scala.collection.immutable.Map]] which associates each cluster id - * to a [[ClusterStats]] object (which contains the precomputed values `N`, - * `$\Psi_{\Gamma}$` and `$Y_{\Gamma}$` for a cluster). 
- */ - def computeClusterStats( - df: DataFrame, - predictionCol: String, - featuresCol: String): Map[Double, ClusterStats] = { - val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol) - val clustersStatsRDD = df.select( - col(predictionCol).cast(DoubleType), col(featuresCol), col("squaredNorm")) - .rdd - .map { row => (row.getDouble(0), (row.getAs[Vector](1), row.getDouble(2))) } - .aggregateByKey[(DenseVector, Double, Long)]((Vectors.zeros(numFeatures).toDense, 0.0, 0L))( - seqOp = { - case ( - (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long), - (features, squaredNorm) - ) => - BLAS.axpy(1.0, features, featureSum) - (featureSum, squaredNormSum + squaredNorm, numOfPoints + 1) - }, - combOp = { - case ( - (featureSum1, squaredNormSum1, numOfPoints1), - (featureSum2, squaredNormSum2, numOfPoints2) - ) => - BLAS.axpy(1.0, featureSum2, featureSum1) - (featureSum1, squaredNormSum1 + squaredNormSum2, numOfPoints1 + numOfPoints2) - } - ) - - clustersStatsRDD - .collectAsMap() - .mapValues { - case (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long) => - SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, numOfPoints) - } - .toMap - } - - /** - * It computes the Silhouette coefficient for a point. - * - * @param broadcastedClustersMap A map of the precomputed values for each cluster. - * @param point The [[org.apache.spark.ml.linalg.Vector]] representing the current point. - * @param clusterId The id of the cluster the current point belongs to. - * @param squaredNorm The `$\Xi_{X}$` (which is the squared norm) precomputed for the point. - * @return The Silhouette for the point. - */ - def computeSilhouetteCoefficient( - broadcastedClustersMap: Broadcast[Map[Double, ClusterStats]], - point: Vector, - clusterId: Double, - squaredNorm: Double): Double = { - - def compute(targetClusterId: Double): Double = { - val clusterStats = broadcastedClustersMap.value(targetClusterId) - val pointDotClusterFeaturesSum = BLAS.dot(point, clusterStats.featureSum) - - squaredNorm + - clusterStats.squaredNormSum / clusterStats.numOfPoints - - 2 * pointDotClusterFeaturesSum / clusterStats.numOfPoints - } - - pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet, - clusterId, - broadcastedClustersMap.value(clusterId).numOfPoints, - compute) - } - - /** - * Compute the Silhouette score of the dataset using squared Euclidean distance measure. - * - * @param dataset The input dataset (previously clustered) on which compute the Silhouette. - * @param predictionCol The name of the column which contains the predicted cluster id - * for the point. - * @param featuresCol The name of the column which contains the feature vector of the point. - * @return The average of the Silhouette values of the clustered data. 
- */ - def computeSilhouetteScore( - dataset: Dataset[_], - predictionCol: String, - featuresCol: String): Double = { - SquaredEuclideanSilhouette.registerKryoClasses(dataset.sparkSession.sparkContext) - - val squaredNormUDF = udf { - features: Vector => math.pow(Vectors.norm(features, 2.0), 2.0) - } - val dfWithSquaredNorm = dataset.withColumn("squaredNorm", squaredNormUDF(col(featuresCol))) - - // compute aggregate values for clusters needed by the algorithm - val clustersStatsMap = SquaredEuclideanSilhouette - .computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol) - - // Silhouette is reasonable only when the number of clusters is greater then 1 - assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.") - - val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap) - - val computeSilhouetteCoefficientUDF = udf { - computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double) - } - - val silhouetteScore = overallScore(dfWithSquaredNorm, - computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType), - col("squaredNorm"))) - - bClustersStatsMap.destroy() - - silhouetteScore - } -} - - -/** - * The algorithm which is implemented in this object, instead, is an efficient and parallel - * implementation of the Silhouette using the cosine distance measure. The cosine distance - * measure is defined as `1 - s` where `s` is the cosine similarity between two points. - * - * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$` - * is: - * - *
- * $$
- * \sum\limits_{i=1}^N d(X, C_{i} ) =
- * \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
- * = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
- * \frac{c_{ij}}{\|C_{i}\|}
- * = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
- * \frac{c_{ij}}{\|C_{i}\|} \Big)
- * $$
- *
- *
- * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension
- * of the `i`-th point in cluster `$\Gamma$`.
- *
- * Then, we can define the vector:
- *
- *
- * $$
- * \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
- * $$
- *
- *
- * which can be precomputed for each point and the vector
- *
- *
- * $$
- * \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
- * $$
- *
- *
- * which can be precomputed too for each cluster `$\Gamma$` by its points `$C_{i}$`.
- *
- * With these definitions, the numerator becomes:
- *
- *
- * $$
- * N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
- * $$
- *
- *
- * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
- *
- *
- * $$
- * 1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
- * $$
- *
- * - * In the implementation, the precomputed values for the clusters are distributed among the worker - * nodes via broadcasted variables, because we can assume that the clusters are limited in number. - * - * The main strengths of this algorithm are the low computational complexity and the intrinsic - * parallelism. The precomputed information for each point and for each cluster can be computed - * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the - * dataset and `W` is the number of worker nodes. After that, every point can be analyzed - * independently from the others. - * - * For every point we need to compute the average distance to all the clusters. Since the formula - * above requires `O(D)` operations, this phase has a computational complexity which is - * `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the number - * of dimensions, `N` is the number of points in the dataset and `W` is the number of worker - * nodes. - */ -private[evaluation] object CosineSilhouette extends Silhouette { - - private[this] val normalizedFeaturesColName = "normalizedFeatures" - - /** - * The method takes the input dataset and computes the aggregated values - * about a cluster which are needed by the algorithm. - * - * @param df The DataFrame which contains the input data - * @param predictionCol The name of the column which contains the predicted cluster id - * for the point. - * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a - * its statistics (ie. the precomputed values `N` and `$\Omega_{\Gamma}$`). - */ - def computeClusterStats( - df: DataFrame, - featuresCol: String, - predictionCol: String): Map[Double, (Vector, Long)] = { - val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol) - val clustersStatsRDD = df.select( - col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName)) - .rdd - .map { row => (row.getDouble(0), row.getAs[Vector](1)) } - .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))( - seqOp = { - case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) => - BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum) - (normalizedFeaturesSum, numOfPoints + 1) - }, - combOp = { - case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) => - BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1) - (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2) - } - ) - - clustersStatsRDD - .collectAsMap() - .toMap - } - - /** - * It computes the Silhouette coefficient for a point. - * - * @param broadcastedClustersMap A map of the precomputed values for each cluster. - * @param normalizedFeatures The [[org.apache.spark.ml.linalg.Vector]] representing the - * normalized features of the current point. - * @param clusterId The id of the cluster the current point belongs to. 
- */ - def computeSilhouetteCoefficient( - broadcastedClustersMap: Broadcast[Map[Double, (Vector, Long)]], - normalizedFeatures: Vector, - clusterId: Double): Double = { - - def compute(targetClusterId: Double): Double = { - val (normalizedFeatureSum, numOfPoints) = broadcastedClustersMap.value(targetClusterId) - 1 - BLAS.dot(normalizedFeatures, normalizedFeatureSum) / numOfPoints - } - - pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet, - clusterId, - broadcastedClustersMap.value(clusterId)._2, - compute) - } - - /** - * Compute the Silhouette score of the dataset using the cosine distance measure. - * - * @param dataset The input dataset (previously clustered) on which compute the Silhouette. - * @param predictionCol The name of the column which contains the predicted cluster id - * for the point. - * @param featuresCol The name of the column which contains the feature vector of the point. - * @return The average of the Silhouette values of the clustered data. - */ - def computeSilhouetteScore( - dataset: Dataset[_], - predictionCol: String, - featuresCol: String): Double = { - val normalizeFeatureUDF = udf { - features: Vector => { - val norm = Vectors.norm(features, 2.0) - BLAS.scal(1.0 / norm, features) - features - } - } - val dfWithNormalizedFeatures = dataset.withColumn(normalizedFeaturesColName, - normalizeFeatureUDF(col(featuresCol))) - - // compute aggregate values for clusters needed by the algorithm - val clustersStatsMap = computeClusterStats(dfWithNormalizedFeatures, featuresCol, - predictionCol) - - // Silhouette is reasonable only when the number of clusters is greater then 1 - assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.") - - val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap) - - val computeSilhouetteCoefficientUDF = udf { - computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double) - } - - val silhouetteScore = overallScore(dfWithNormalizedFeatures, - computeSilhouetteCoefficientUDF(col(normalizedFeaturesColName), - col(predictionCol).cast(DoubleType))) - - bClustersStatsMap.destroy() - - silhouetteScore - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala new file mode 100644 index 0000000000000..30970337d7d3b --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.Since +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors} +import org.apache.spark.ml.util.MetadataUtils +import org.apache.spark.sql.{Column, DataFrame, Dataset} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.DoubleType + + +/** + * Metrics for clustering, which expects two input columns: prediction and label. + */ +@Since("3.1.0") +class ClusteringMetrics private[spark](dataset: Dataset[_]) { + + private var distanceMeasure: String = "squaredEuclidean" + + def getDistanceMeasure: String = distanceMeasure + + def setDistanceMeasure(value: String) : Unit = distanceMeasure = value + + /** + * Returns the silhouette score + */ + @Since("3.1.0") + lazy val silhouette: Double = { + val columns = dataset.columns.toSeq + if (distanceMeasure.equalsIgnoreCase("squaredEuclidean")) { + SquaredEuclideanSilhouette.computeSilhouetteScore( + dataset, columns(0), columns(1)) + } else { + CosineSilhouette.computeSilhouetteScore(dataset, columns(0), columns(1)) + } + } +} + + +private[evaluation] abstract class Silhouette { + + /** + * It computes the Silhouette coefficient for a point. + */ + def pointSilhouetteCoefficient( + clusterIds: Set[Double], + pointClusterId: Double, + pointClusterNumOfPoints: Long, + averageDistanceToCluster: (Double) => Double): Double = { + if (pointClusterNumOfPoints == 1) { + // Single-element clusters have silhouette 0 + 0.0 + } else { + // Here we compute the average dissimilarity of the current point to any cluster of which the + // point is not a member. + // The cluster with the lowest average dissimilarity - i.e. the nearest cluster to the current + // point - is said to be the "neighboring cluster". + val otherClusterIds = clusterIds.filter(_ != pointClusterId) + val neighboringClusterDissimilarity = otherClusterIds.map(averageDistanceToCluster).min + // adjustment for excluding the node itself from the computation of the average dissimilarity + val currentClusterDissimilarity = + averageDistanceToCluster(pointClusterId) * pointClusterNumOfPoints / + (pointClusterNumOfPoints - 1) + if (currentClusterDissimilarity < neighboringClusterDissimilarity) { + 1 - (currentClusterDissimilarity / neighboringClusterDissimilarity) + } else if (currentClusterDissimilarity > neighboringClusterDissimilarity) { + (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1 + } else { + 0.0 + } + } + } + + /** + * Compute the mean Silhouette values of all samples. + */ + def overallScore(df: DataFrame, scoreColumn: Column): Double = { + df.select(avg(scoreColumn)).collect()(0).getDouble(0) + } +} + +/** + * SquaredEuclideanSilhouette computes the average of the + * Silhouette over all the data of the dataset, which is + * a measure of how appropriately the data have been clustered. + * + * The Silhouette for each point `i` is defined as: + * + *
+ * $$
+ * s_{i} = \frac{b_{i}-a_{i}}{max\{a_{i},b_{i}\}}
+ * $$
+ *
+ *
+ * which can be rewritten as
+ *
+ *
+ * $$
+ * s_{i}= \begin{cases}
+ * 1-\frac{a_{i}}{b_{i}} & \text{if } a_{i} \leq b_{i} \\
+ * \frac{b_{i}}{a_{i}}-1 & \text{if } a_{i} \gt b_{i} \end{cases}
+ * $$
+ *
+ *
+ * where `$a_{i}$` is the average dissimilarity of `i` with all other data
+ * within the same cluster, `$b_{i}$` is the lowest average dissimilarity
+ * of `i` to any other cluster, of which `i` is not a member.
+ * `$a_{i}$` can be interpreted as how well `i` is assigned to its cluster
+ * (the smaller the value, the better the assignment), while `$b_{i}$` is
+ * a measure of how well `i` has not been assigned to its "neighboring cluster",
+ * ie. the nearest cluster to `i`.
+ *
+ * Unfortunately, the naive implementation of the algorithm requires to compute
+ * the distance of each couple of points in the dataset. Since the computation of
+ * the distance measure takes `D` operations - if `D` is the number of dimensions
+ * of each point, the computational complexity of the algorithm is `O(N^2^*D)`, where
+ * `N` is the cardinality of the dataset. Of course this is not scalable in `N`,
+ * which is the critical number in a Big Data context.
+ *
+ * The algorithm which is implemented in this object, instead, is an efficient
+ * and parallel implementation of the Silhouette using the squared Euclidean
+ * distance measure.
+ *
+ * With this assumption, the total distance of the point `X`
+ * to the points `$C_{i}$` belonging to the cluster `$\Gamma$` is:
+ *
+ *
+ * $$
+ * \sum\limits_{i=1}^N d(X, C_{i} ) =
+ * \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D (x_{j}-c_{ij})^2 \Big)
+ * = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D x_{j}^2 +
+ * \sum\limits_{j=1}^D c_{ij}^2 -2\sum\limits_{j=1}^D x_{j}c_{ij} \Big)
+ * = \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 +
+ * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2
+ * -2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij}
+ * $$
+ *
+ *
+ * where `$x_{j}$` is the `j`-th dimension of the point `X` and
+ * `$c_{ij}$` is the `j`-th dimension of the `i`-th point in cluster `$\Gamma$`.
+ *
+ * Then, the first term of the equation can be rewritten as:
+ *
+ *
+ * $$
+ * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 = N \xi_{X} \text{ ,
+ * with } \xi_{X} = \sum\limits_{j=1}^D x_{j}^2
+ * $$
+ *
+ *
+ * where `$\xi_{X}$` is fixed for each point and it can be precomputed.
+ *
+ * Moreover, the second term is fixed for each cluster too,
+ * thus we can name it `$\Psi_{\Gamma}$`
+ *
+ *
+ * $$
+ * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 =
+ * \sum\limits_{i=1}^N \xi_{C_{i}} = \Psi_{\Gamma}
+ * $$
+ *
+ *
+ * Last, the third element becomes
+ *
+ *
+ * $$
+ * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} =
+ * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j}
+ * $$
+ *
+ *
+ * thus defining the vector
+ *
+ *
+ * $$
+ * Y_{\Gamma}:Y_{\Gamma j} = \sum\limits_{i=1}^N c_{ij} , j=0, ..., D
+ * $$
+ *
+ *
+ * which is fixed for each cluster `$\Gamma$`, we have
+ *
+ *
+ * $$
+ * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} =
+ * \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}
+ * $$
+ *
+ *
+ * In this way, the previous equation becomes
+ *
+ *
+ * $$
+ * N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}
+ * $$
+ *
+ *
+ * and the average distance of a point to a cluster can be computed as
+ *
+ *
+ * $$
+ * \frac{\sum\limits_{i=1}^N d(X, C_{i} )}{N} =
+ * \frac{N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} =
+ * \xi_{X} + \frac{\Psi_{\Gamma} }{N} - 2 \frac{\sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N}
+ * $$
+ *
+ * + * Thus, it is enough to precompute: the constant `$\xi_{X}$` for each point `X`; the + * constants `$\Psi_{\Gamma}$`, `N` and the vector `$Y_{\Gamma}$` for + * each cluster `$\Gamma$`. + * + * In the implementation, the precomputed values for the clusters + * are distributed among the worker nodes via broadcasted variables, + * because we can assume that the clusters are limited in number and + * anyway they are much fewer than the points. + * + * The main strengths of this algorithm are the low computational complexity + * and the intrinsic parallelism. The precomputed information for each point + * and for each cluster can be computed with a computational complexity + * which is `O(N/W)`, where `N` is the number of points in the dataset and + * `W` is the number of worker nodes. After that, every point can be + * analyzed independently of the others. + * + * For every point we need to compute the average distance to all the clusters. + * Since the formula above requires `O(D)` operations, this phase has a + * computational complexity which is `O(C*D*N/W)` where `C` is the number of + * clusters (which we assume quite low), `D` is the number of dimensions, + * `N` is the number of points in the dataset and `W` is the number + * of worker nodes. + */ +private[evaluation] object SquaredEuclideanSilhouette extends Silhouette { + + private[this] var kryoRegistrationPerformed: Boolean = false + + /** + * This method registers the class + * [[org.apache.spark.ml.evaluation.SquaredEuclideanSilhouette.ClusterStats]] + * for kryo serialization. + * + * @param sc `SparkContext` to be used + */ + def registerKryoClasses(sc: SparkContext): Unit = { + if (!kryoRegistrationPerformed) { + sc.getConf.registerKryoClasses( + Array( + classOf[SquaredEuclideanSilhouette.ClusterStats] + ) + ) + kryoRegistrationPerformed = true + } + } + + case class ClusterStats(featureSum: Vector, squaredNormSum: Double, numOfPoints: Long) + + /** + * The method takes the input dataset and computes the aggregated values + * about a cluster which are needed by the algorithm. + * + * @param df The DataFrame which contains the input data + * @param predictionCol The name of the column which contains the predicted cluster id + * for the point. + * @param featuresCol The name of the column which contains the feature vector of the point. + * @return A [[scala.collection.immutable.Map]] which associates each cluster id + * to a [[ClusterStats]] object (which contains the precomputed values `N`, + * `$\Psi_{\Gamma}$` and `$Y_{\Gamma}$` for a cluster). 
+ */ + def computeClusterStats( + df: DataFrame, + predictionCol: String, + featuresCol: String): Map[Double, ClusterStats] = { + val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol) + val clustersStatsRDD = df.select( + col(predictionCol).cast(DoubleType), col(featuresCol), col("squaredNorm")) + .rdd + .map { row => (row.getDouble(0), (row.getAs[Vector](1), row.getDouble(2))) } + .aggregateByKey[(DenseVector, Double, Long)]((Vectors.zeros(numFeatures).toDense, 0.0, 0L))( + seqOp = { + case ( + (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long), + (features, squaredNorm) + ) => + BLAS.axpy(1.0, features, featureSum) + (featureSum, squaredNormSum + squaredNorm, numOfPoints + 1) + }, + combOp = { + case ( + (featureSum1, squaredNormSum1, numOfPoints1), + (featureSum2, squaredNormSum2, numOfPoints2) + ) => + BLAS.axpy(1.0, featureSum2, featureSum1) + (featureSum1, squaredNormSum1 + squaredNormSum2, numOfPoints1 + numOfPoints2) + } + ) + + clustersStatsRDD + .collectAsMap() + .mapValues { + case (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long) => + SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, numOfPoints) + } + .toMap + } + + /** + * It computes the Silhouette coefficient for a point. + * + * @param broadcastedClustersMap A map of the precomputed values for each cluster. + * @param point The [[org.apache.spark.ml.linalg.Vector]] representing the current point. + * @param clusterId The id of the cluster the current point belongs to. + * @param squaredNorm The `$\Xi_{X}$` (which is the squared norm) precomputed for the point. + * @return The Silhouette for the point. + */ + def computeSilhouetteCoefficient( + broadcastedClustersMap: Broadcast[Map[Double, ClusterStats]], + point: Vector, + clusterId: Double, + squaredNorm: Double): Double = { + + def compute(targetClusterId: Double): Double = { + val clusterStats = broadcastedClustersMap.value(targetClusterId) + val pointDotClusterFeaturesSum = BLAS.dot(point, clusterStats.featureSum) + + squaredNorm + + clusterStats.squaredNormSum / clusterStats.numOfPoints - + 2 * pointDotClusterFeaturesSum / clusterStats.numOfPoints + } + + pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet, + clusterId, + broadcastedClustersMap.value(clusterId).numOfPoints, + compute) + } + + /** + * Compute the Silhouette score of the dataset using squared Euclidean distance measure. + * + * @param dataset The input dataset (previously clustered) on which compute the Silhouette. + * @param predictionCol The name of the column which contains the predicted cluster id + * for the point. + * @param featuresCol The name of the column which contains the feature vector of the point. + * @return The average of the Silhouette values of the clustered data. 
+ */ + def computeSilhouetteScore( + dataset: Dataset[_], + predictionCol: String, + featuresCol: String): Double = { + SquaredEuclideanSilhouette.registerKryoClasses(dataset.sparkSession.sparkContext) + + val squaredNormUDF = udf { + features: Vector => math.pow(Vectors.norm(features, 2.0), 2.0) + } + val dfWithSquaredNorm = dataset.withColumn("squaredNorm", squaredNormUDF(col(featuresCol))) + + // compute aggregate values for clusters needed by the algorithm + val clustersStatsMap = SquaredEuclideanSilhouette + .computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol) + + // Silhouette is reasonable only when the number of clusters is greater then 1 + assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.") + + val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap) + + val computeSilhouetteCoefficientUDF = udf { + computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double) + } + + val silhouetteScore = overallScore(dfWithSquaredNorm, + computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType), + col("squaredNorm"))) + + bClustersStatsMap.destroy() + + silhouetteScore + } +} + + +/** + * The algorithm which is implemented in this object, instead, is an efficient and parallel + * implementation of the Silhouette using the cosine distance measure. The cosine distance + * measure is defined as `1 - s` where `s` is the cosine similarity between two points. + * + * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$` + * is: + * + *
+ * $$
+ * \sum\limits_{i=1}^N d(X, C_{i} ) =
+ * \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
+ * = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
+ * \frac{c_{ij}}{\|C_{i}\|}
+ * = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
+ * \frac{c_{ij}}{\|C_{i}\|} \Big)
+ * $$
+ *
+ *
+ * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension
+ * of the `i`-th point in cluster `$\Gamma$`.
+ *
+ * Then, we can define the vector:
+ *
+ *
+ * $$
+ * \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
+ * $$
+ *
+ *
+ * which can be precomputed for each point and the vector
+ *
+ *
+ * $$
+ * \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
+ * $$
+ *
+ *
+ * which can be precomputed too for each cluster `$\Gamma$` by its points `$C_{i}$`.
+ *
+ * With these definitions, the numerator becomes:
+ *
+ *
+ * $$
+ * N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
+ * $$
+ *
+ *
+ * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
+ *
+ *
+ * $$
+ * 1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
+ * $$
+ *
+ * + * In the implementation, the precomputed values for the clusters are distributed among the worker + * nodes via broadcasted variables, because we can assume that the clusters are limited in number. + * + * The main strengths of this algorithm are the low computational complexity and the intrinsic + * parallelism. The precomputed information for each point and for each cluster can be computed + * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the + * dataset and `W` is the number of worker nodes. After that, every point can be analyzed + * independently from the others. + * + * For every point we need to compute the average distance to all the clusters. Since the formula + * above requires `O(D)` operations, this phase has a computational complexity which is + * `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the number + * of dimensions, `N` is the number of points in the dataset and `W` is the number of worker + * nodes. + */ +private[evaluation] object CosineSilhouette extends Silhouette { + + private[this] val normalizedFeaturesColName = "normalizedFeatures" + + /** + * The method takes the input dataset and computes the aggregated values + * about a cluster which are needed by the algorithm. + * + * @param df The DataFrame which contains the input data + * @param predictionCol The name of the column which contains the predicted cluster id + * for the point. + * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a + * its statistics (ie. the precomputed values `N` and `$\Omega_{\Gamma}$`). + */ + def computeClusterStats( + df: DataFrame, + featuresCol: String, + predictionCol: String): Map[Double, (Vector, Long)] = { + val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol) + val clustersStatsRDD = df.select( + col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName)) + .rdd + .map { row => (row.getDouble(0), row.getAs[Vector](1)) } + .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))( + seqOp = { + case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) => + BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum) + (normalizedFeaturesSum, numOfPoints + 1) + }, + combOp = { + case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) => + BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1) + (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2) + } + ) + + clustersStatsRDD + .collectAsMap() + .toMap + } + + /** + * It computes the Silhouette coefficient for a point. + * + * @param broadcastedClustersMap A map of the precomputed values for each cluster. + * @param normalizedFeatures The [[org.apache.spark.ml.linalg.Vector]] representing the + * normalized features of the current point. + * @param clusterId The id of the cluster the current point belongs to. 
+ */ + def computeSilhouetteCoefficient( + broadcastedClustersMap: Broadcast[Map[Double, (Vector, Long)]], + normalizedFeatures: Vector, + clusterId: Double): Double = { + + def compute(targetClusterId: Double): Double = { + val (normalizedFeatureSum, numOfPoints) = broadcastedClustersMap.value(targetClusterId) + 1 - BLAS.dot(normalizedFeatures, normalizedFeatureSum) / numOfPoints + } + + pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet, + clusterId, + broadcastedClustersMap.value(clusterId)._2, + compute) + } + + /** + * Compute the Silhouette score of the dataset using the cosine distance measure. + * + * @param dataset The input dataset (previously clustered) on which compute the Silhouette. + * @param predictionCol The name of the column which contains the predicted cluster id + * for the point. + * @param featuresCol The name of the column which contains the feature vector of the point. + * @return The average of the Silhouette values of the clustered data. + */ + def computeSilhouetteScore( + dataset: Dataset[_], + predictionCol: String, + featuresCol: String): Double = { + val normalizeFeatureUDF = udf { + features: Vector => { + val norm = Vectors.norm(features, 2.0) + BLAS.scal(1.0 / norm, features) + features + } + } + val dfWithNormalizedFeatures = dataset.withColumn(normalizedFeaturesColName, + normalizeFeatureUDF(col(featuresCol))) + + // compute aggregate values for clusters needed by the algorithm + val clustersStatsMap = computeClusterStats(dfWithNormalizedFeatures, featuresCol, + predictionCol) + + // Silhouette is reasonable only when the number of clusters is greater then 1 + assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.") + + val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap) + + val computeSilhouetteCoefficientUDF = udf { + computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double) + } + + val silhouetteScore = overallScore(dfWithNormalizedFeatures, + computeSilhouetteCoefficientUDF(col(normalizedFeaturesColName), + col(predictionCol).cast(DoubleType))) + + bClustersStatsMap.destroy() + + silhouetteScore + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala index 1d6540e970383..ad1b70915e157 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala @@ -153,6 +153,34 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid @Since("2.0.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + $(metricName) match { + case "f1" => metrics.weightedFMeasure + case "accuracy" => metrics.accuracy + case "weightedPrecision" => metrics.weightedPrecision + case "weightedRecall" => metrics.weightedRecall + case "weightedTruePositiveRate" => metrics.weightedTruePositiveRate + case "weightedFalsePositiveRate" => metrics.weightedFalsePositiveRate + case "weightedFMeasure" => metrics.weightedFMeasure($(beta)) + case "truePositiveRateByLabel" => metrics.truePositiveRate($(metricLabel)) + case "falsePositiveRateByLabel" => metrics.falsePositiveRate($(metricLabel)) + case "precisionByLabel" => metrics.precision($(metricLabel)) + case "recallByLabel" => metrics.recall($(metricLabel)) + case "fMeasureByLabel" => 
metrics.fMeasure($(metricLabel), $(beta)) + case "hammingLoss" => metrics.hammingLoss + case "logLoss" => metrics.logLoss($(eps)) + } + } + + /** + * Get a MulticlassMetrics, which can be used to get multiclass classification + * metrics such as accuracy, weightedPrecision, etc. + * + * @param dataset a dataset that contains labels/observations and predictions. + * @return MulticlassMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): MulticlassMetrics = { val schema = dataset.schema SchemaUtils.checkColumnType(schema, $(predictionCol), DoubleType) SchemaUtils.checkNumericType(schema, $(labelCol)) @@ -163,9 +191,13 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid lit(1.0) } - val rdd = if ($(metricName) == "logLoss") { + if ($(metricName) == "logLoss") { // probabilityCol is only needed to compute logloss - require(isDefined(probabilityCol) && $(probabilityCol).nonEmpty) + require(schema.fieldNames.contains($(probabilityCol)), + "probabilityCol is needed to compute logloss") + } + + val rdd = if (schema.fieldNames.contains($(probabilityCol))) { val p = DatasetUtils.columnToVector(dataset, $(probabilityCol)) dataset.select(col($(predictionCol)), col($(labelCol)).cast(DoubleType), w, p) .rdd.map { @@ -179,23 +211,7 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid } } - val metrics = new MulticlassMetrics(rdd) - $(metricName) match { - case "f1" => metrics.weightedFMeasure - case "accuracy" => metrics.accuracy - case "weightedPrecision" => metrics.weightedPrecision - case "weightedRecall" => metrics.weightedRecall - case "weightedTruePositiveRate" => metrics.weightedTruePositiveRate - case "weightedFalsePositiveRate" => metrics.weightedFalsePositiveRate - case "weightedFMeasure" => metrics.weightedFMeasure($(beta)) - case "truePositiveRateByLabel" => metrics.truePositiveRate($(metricLabel)) - case "falsePositiveRateByLabel" => metrics.falsePositiveRate($(metricLabel)) - case "precisionByLabel" => metrics.precision($(metricLabel)) - case "recallByLabel" => metrics.recall($(metricLabel)) - case "fMeasureByLabel" => metrics.fMeasure($(metricLabel), $(beta)) - case "hammingLoss" => metrics.hammingLoss - case "logLoss" => metrics.logLoss($(eps)) - } + new MulticlassMetrics(rdd) } @Since("1.5.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluator.scala index a8db5452bd56c..1a82ac7a9472f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluator.scala @@ -98,18 +98,7 @@ class MultilabelClassificationEvaluator @Since("3.0.0") (@Since("3.0.0") overrid @Since("3.0.0") override def evaluate(dataset: Dataset[_]): Double = { - val schema = dataset.schema - SchemaUtils.checkColumnTypes(schema, $(predictionCol), - Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) - SchemaUtils.checkColumnTypes(schema, $(labelCol), - Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) - - val predictionAndLabels = - dataset.select(col($(predictionCol)), col($(labelCol))) - .rdd.map { row => - (row.getSeq[Double](0).toArray, row.getSeq[Double](1).toArray) - } - val metrics = new MultilabelMetrics(predictionAndLabels) + val metrics = getMetrics(dataset) $(metricName) match { case "subsetAccuracy" => metrics.subsetAccuracy case "accuracy" => 
metrics.accuracy @@ -126,6 +115,29 @@ class MultilabelClassificationEvaluator @Since("3.0.0") (@Since("3.0.0") overrid } } + /** + * Get a MultilabelMetrics, which can be used to get multilabel classification + * metrics such as accuracy, precision, precisionByLabel, etc. + * + * @param dataset a dataset that contains labels/observations and predictions. + * @return MultilabelMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): MultilabelMetrics = { + val schema = dataset.schema + SchemaUtils.checkColumnTypes(schema, $(predictionCol), + Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) + SchemaUtils.checkColumnTypes(schema, $(labelCol), + Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) + + val predictionAndLabels = + dataset.select(col($(predictionCol)), col($(labelCol))) + .rdd.map { row => + (row.getSeq[Double](0).toArray, row.getSeq[Double](1).toArray) + } + new MultilabelMetrics(predictionAndLabels) + } + @Since("3.0.0") override def isLargerBetter: Boolean = { $(metricName) match { diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala index c5dea6c177e21..82dda4109771d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala @@ -95,6 +95,25 @@ class RankingEvaluator @Since("3.0.0") (@Since("3.0.0") override val uid: String @Since("3.0.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + $(metricName) match { + case "meanAveragePrecision" => metrics.meanAveragePrecision + case "meanAveragePrecisionAtK" => metrics.meanAveragePrecisionAt($(k)) + case "precisionAtK" => metrics.precisionAt($(k)) + case "ndcgAtK" => metrics.ndcgAt($(k)) + case "recallAtK" => metrics.recallAt($(k)) + } + } + + /** + * Get a RankingMetrics, which can be used to get ranking metrics + * such as meanAveragePrecision, meanAveragePrecisionAtK, etc. + * + * @param dataset a dataset that contains labels/observations and predictions. 
+   * @return RankingMetrics
+   */
+  @Since("3.1.0")
+  def getMetrics(dataset: Dataset[_]): RankingMetrics[Double] = {
     val schema = dataset.schema
     SchemaUtils.checkColumnTypes(schema, $(predictionCol),
       Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true)))
@@ -106,14 +125,7 @@ class RankingEvaluator @Since("3.0.0") (@Since("3.0.0") override val uid: String
         .rdd.map { row =>
           (row.getSeq[Double](0).toArray, row.getSeq[Double](1).toArray)
         }
-    val metrics = new RankingMetrics[Double](predictionAndLabels)
-    $(metricName) match {
-      case "meanAveragePrecision" => metrics.meanAveragePrecision
-      case "meanAveragePrecisionAtK" => metrics.meanAveragePrecisionAt($(k))
-      case "precisionAtK" => metrics.precisionAt($(k))
-      case "ndcgAtK" => metrics.ndcgAt($(k))
-      case "recallAtK" => metrics.recallAt($(k))
-    }
+    new RankingMetrics[Double](predictionAndLabels)
   }
 
   @Since("3.0.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
index 18a8dda0c76ef..aca017762deca 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
@@ -97,6 +97,25 @@ final class RegressionEvaluator @Since("1.4.0") (@Since("1.4.0") override val ui
 
   @Since("2.0.0")
   override def evaluate(dataset: Dataset[_]): Double = {
+    val metrics = getMetrics(dataset)
+    $(metricName) match {
+      case "rmse" => metrics.rootMeanSquaredError
+      case "mse" => metrics.meanSquaredError
+      case "r2" => metrics.r2
+      case "mae" => metrics.meanAbsoluteError
+      case "var" => metrics.explainedVariance
+    }
+  }
+
+  /**
+   * Get a RegressionMetrics, which can be used to get regression
+   * metrics such as rootMeanSquaredError, meanSquaredError, etc.
+   *
+   * @param dataset a dataset that contains labels/observations and predictions.
+   * @return RegressionMetrics
+   */
+  @Since("3.1.0")
+  def getMetrics(dataset: Dataset[_]): RegressionMetrics = {
     val schema = dataset.schema
     SchemaUtils.checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, FloatType))
     SchemaUtils.checkNumericType(schema, $(labelCol))
@@ -107,14 +126,7 @@ final class RegressionEvaluator @Since("1.4.0") (@Since("1.4.0") override val ui
       .rdd
       .map { case Row(prediction: Double, label: Double, weight: Double) =>
         (prediction, label, weight) }
-    val metrics = new RegressionMetrics(predictionAndLabelsWithWeights, $(throughOrigin))
-    $(metricName) match {
-      case "rmse" => metrics.rootMeanSquaredError
-      case "mse" => metrics.meanSquaredError
-      case "r2" => metrics.r2
-      case "mae" => metrics.meanAbsoluteError
-      case "var" => metrics.explainedVariance
-    }
+    new RegressionMetrics(predictionAndLabelsWithWeights, $(throughOrigin))
   }
 
   @Since("1.4.0")
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
index 050ebb0fa4fbd..1a91801a9da28 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
@@ -283,7 +283,8 @@ class MulticlassMetrics @Since("1.1.0") (predictionAndLabels: RDD[_ <: Product])
           (loss * weight, weight)
         case other =>
-          throw new IllegalArgumentException(s"Expected quadruples, got $other")
+          throw new IllegalArgumentException(s"Invalid RDD value for MulticlassMetrics.logLoss. " +
+            s"Expected quadruples, got $other")
     }.treeReduce { case ((l1, w1), (l2, w2)) =>
       (l1 + l2, w1 + w2)
     }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala
index 83b213ab51d43..008bf0e108e13 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala
@@ -102,4 +102,27 @@ class BinaryClassificationEvaluatorSuite
     val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("prediction")
     MLTestingUtils.checkNumericTypes(evaluator, spark)
   }
+
+  test("getMetrics") {
+    val weightCol = "weight"
+    // get metric with weight column
+    val evaluator = new BinaryClassificationEvaluator()
+      .setWeightCol(weightCol)
+    val vectorDF = Seq(
+      (0.0, Vectors.dense(2.5, 12), 1.0),
+      (1.0, Vectors.dense(1, 3), 1.0),
+      (0.0, Vectors.dense(10, 2), 1.0)
+    ).toDF("label", "rawPrediction", weightCol)
+
+    val metrics = evaluator.getMetrics(vectorDF)
+    val roc = metrics.areaUnderROC()
+    val pr = metrics.areaUnderPR()
+
+    // default = areaUnderROC
+    assert(evaluator.evaluate(vectorDF) == roc)
+
+    // areaUnderPR
+    evaluator.setMetricName("areaUnderPR")
+    assert(evaluator.evaluate(vectorDF) == pr)
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index 6cf3b1deeac93..29fed5322c9c9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -145,4 +145,20 @@ class ClusteringEvaluatorSuite
 
     assert(evaluator.evaluate(twoSingleItemClusters) === 0.0)
   }
+  test("getMetrics") {
+    val evaluator = new ClusteringEvaluator()
+      .setFeaturesCol("features")
+      .setPredictionCol("label")
+
+    val metrics1 = evaluator.getMetrics(irisDataset)
+    val silhouetteScoreEuclidean = metrics1.silhouette
+
+    assert(evaluator.evaluate(irisDataset) == silhouetteScoreEuclidean)
+
+    evaluator.setDistanceMeasure("cosine")
+    val metrics2 = evaluator.getMetrics(irisDataset)
+    val silhouetteScoreCosine = metrics2.silhouette
+
+    assert(evaluator.evaluate(irisDataset) == silhouetteScoreCosine)
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala
index 5b5212abdf7cc..3dfd860a5b9d8 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala
@@ -80,4 +80,33 @@ class MulticlassClassificationEvaluatorSuite
       .setMetricName("logLoss")
     assert(evaluator.evaluate(df) ~== 0.9682005730687164 absTol 1e-5)
   }
+
+  test("getMetrics") {
+    val predictionAndLabels = Seq((0.0, 0.0), (0.0, 1.0),
+      (0.0, 0.0), (1.0, 0.0), (1.0, 1.0),
+      (1.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)).toDF("prediction", "label")
+
+    val evaluator = new MulticlassClassificationEvaluator()
+
+    val metrics = evaluator.getMetrics(predictionAndLabels)
+    val f1 = metrics.weightedFMeasure
+    val accuracy = metrics.accuracy
+    val precisionByLabel = metrics.precision(evaluator.getMetricLabel)
+
+    // default = f1
+    assert(evaluator.evaluate(predictionAndLabels) == f1)
+
+    // accuracy
+    evaluator.setMetricName("accuracy")
+    assert(evaluator.evaluate(predictionAndLabels) == accuracy)
+
+    // precisionByLabel
+    evaluator.setMetricName("precisionByLabel")
+    assert(evaluator.evaluate(predictionAndLabels) == precisionByLabel)
+
+    // truePositiveRateByLabel
+    evaluator.setMetricName("truePositiveRateByLabel").setMetricLabel(1.0)
+    assert(evaluator.evaluate(predictionAndLabels) ==
+      metrics.truePositiveRate(evaluator.getMetricLabel))
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluatorSuite.scala
index f41fc04a5faed..520103d6aed92 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluatorSuite.scala
@@ -59,4 +59,52 @@ class MultilabelClassificationEvaluatorSuite
       .setMetricName("precisionByLabel")
     testDefaultReadWrite(evaluator)
   }
+
+  test("getMetrics") {
+    val scoreAndLabels = Seq((Array(0.0, 1.0), Array(0.0, 2.0)),
+      (Array(0.0, 2.0), Array(0.0, 1.0)),
+      (Array.empty[Double], Array(0.0)),
+      (Array(2.0), Array(2.0)),
+      (Array(2.0, 0.0), Array(2.0, 0.0)),
+      (Array(0.0, 1.0, 2.0), Array(0.0, 1.0)),
+      (Array(1.0), Array(1.0, 2.0))).toDF("prediction", "label")
+
+    val evaluator = new MultilabelClassificationEvaluator()
+
+    val metrics = evaluator.getMetrics(scoreAndLabels)
+    val f1 = metrics.f1Measure
+    val accuracy = metrics.accuracy
+    val precision = metrics.precision
+    val recall = metrics.recall
+    val hammingLoss = metrics.hammingLoss
+    val precisionByLabel = metrics.precision(evaluator.getMetricLabel)
+
+    // default = f1
+    assert(evaluator.evaluate(scoreAndLabels) == f1)
+
+    // accuracy
+    evaluator.setMetricName("accuracy")
+    assert(evaluator.evaluate(scoreAndLabels) == accuracy)
+
+    // precision
+    evaluator.setMetricName("precision")
+    assert(evaluator.evaluate(scoreAndLabels) == precision)
+
+    // recall
+    evaluator.setMetricName("recall")
+    assert(evaluator.evaluate(scoreAndLabels) == recall)
+
+    // hammingLoss
+    evaluator.setMetricName("hammingLoss")
+    assert(evaluator.evaluate(scoreAndLabels) == hammingLoss)
+
+    // precisionByLabel
+    evaluator.setMetricName("precisionByLabel")
+    assert(evaluator.evaluate(scoreAndLabels) == precisionByLabel)
+
+    // recallByLabel
+    evaluator.setMetricName("recallByLabel").setMetricLabel(1.0)
+    assert(evaluator.evaluate(scoreAndLabels) ==
+      metrics.recall(evaluator.getMetricLabel))
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/RankingEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/RankingEvaluatorSuite.scala
index 02d26d7eb351f..b3457981a08e9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/RankingEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/RankingEvaluatorSuite.scala
@@ -59,4 +59,42 @@ class RankingEvaluatorSuite
       .setK(2)
     assert(evaluator.evaluate(scoreAndLabels) ~== 1.0 / 3 absTol 1e-5)
   }
+
+  test("getMetrics") {
+    val scoreAndLabels = Seq(
+      (Array(1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0),
+        Array(1.0, 2.0, 3.0, 4.0, 5.0)),
+      (Array(4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0),
+        Array(1.0, 2.0, 3.0)),
+      (Array(1.0, 2.0, 3.0, 4.0, 5.0), Array.empty[Double])
+    ).toDF("prediction", "label")
+
+    val evaluator = new RankingEvaluator().setK(5)
+
+    val metrics = evaluator.getMetrics(scoreAndLabels)
+    val meanAveragePrecision = metrics.meanAveragePrecision
+    val meanAveragePrecisionAtK = metrics.meanAveragePrecisionAt(evaluator.getK)
+    val precisionAtK = metrics.precisionAt(evaluator.getK)
+    val ndcgAtK = metrics.ndcgAt(evaluator.getK)
+    val recallAtK = metrics.recallAt(evaluator.getK)
+
+    // default = meanAveragePrecision
+    assert(evaluator.evaluate(scoreAndLabels) == meanAveragePrecision)
+
+    // meanAveragePrecisionAtK
+    evaluator.setMetricName("meanAveragePrecisionAtK")
+    assert(evaluator.evaluate(scoreAndLabels) == meanAveragePrecisionAtK)
+
+    // precisionAtK
+    evaluator.setMetricName("precisionAtK")
+    assert(evaluator.evaluate(scoreAndLabels) == precisionAtK)
+
+    // ndcgAtK
+    evaluator.setMetricName("ndcgAtK")
+    assert(evaluator.evaluate(scoreAndLabels) == ndcgAtK)
+
+    // recallAtK
+    evaluator.setMetricName("recallAtK")
+    assert(evaluator.evaluate(scoreAndLabels) == recallAtK)
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala
index f4f858c3e92dc..5ee161ce8dd33 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala
@@ -93,4 +93,37 @@ class RegressionEvaluatorSuite
   test("should support all NumericType labels and not support other types") {
     MLTestingUtils.checkNumericTypes(new RegressionEvaluator, spark)
   }
+
+  test("getMetrics") {
+    val dataset = LinearDataGenerator.generateLinearInput(
+      6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1)
+      .map(_.asML).toDF()
+
+    val trainer = new LinearRegression
+    val model = trainer.fit(dataset)
+    val predictions = model.transform(dataset)
+
+    val evaluator = new RegressionEvaluator()
+
+    val metrics = evaluator.getMetrics(predictions)
+    val rmse = metrics.rootMeanSquaredError
+    val r2 = metrics.r2
+    val mae = metrics.meanAbsoluteError
+    val variance = metrics.explainedVariance
+
+    // default = rmse
+    assert(evaluator.evaluate(predictions) == rmse)
+
+    // r2 score
+    evaluator.setMetricName("r2")
+    assert(evaluator.evaluate(predictions) == r2)
+
+    // mae
+    evaluator.setMetricName("mae")
+    assert(evaluator.evaluate(predictions) == mae)
+
+    // var
+    evaluator.setMetricName("var")
+    assert(evaluator.evaluate(predictions) == variance)
+  }
 }
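
Usage note (not part of the patch): a minimal sketch of how the new `getMetrics` API can be called once this change lands, shown here with `MulticlassClassificationEvaluator`. The `spark` session and the toy prediction/label rows are assumptions made for the example, not code from this PR.
```
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Assumes an existing SparkSession in scope as `spark`.
import spark.implicits._

// Toy data; any DataFrame with numeric "prediction" and "label" columns works.
val predictionAndLabels = Seq(
  (0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)
).toDF("prediction", "label")

val evaluator = new MulticlassClassificationEvaluator()

// One call builds a MulticlassMetrics instance ...
val metrics = evaluator.getMetrics(predictionAndLabels)

// ... from which several scores can be read without calling evaluate() once per metric.
val accuracy = metrics.accuracy
val weightedF1 = metrics.weightedFMeasure
val precisionOfLabelOne = metrics.precision(1.0)
```
This is the same pattern the new `getMetrics` tests above exercise for each evaluator.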