From 13961129e2e6e519114e9ddca61420c0aff91b57 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Sat, 30 Jan 2016 21:26:02 -0800
Subject: [PATCH 01/10] init version of kmeans

---
 R/pkg/NAMESPACE                            |  3 +-
 R/pkg/R/generics.R                         |  4 ++
 R/pkg/R/mllib.R                            | 31 +++++++++++++
 R/pkg/inst/tests/testthat/test_mllib.R     | 10 +++++
 .../apache/spark/ml/r/SparkRWrappers.scala | 44 +++++++++++++++++--
 5 files changed, 88 insertions(+), 4 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 2cc1544bef080..47b8f3075e038 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -13,7 +13,8 @@ export("print.jobj")
 # MLlib integration
 exportMethods("glm",
               "predict",
-              "summary")
+              "summary",
+              "kmeans")

 # Job group lifecycle management methods
 export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 04784d51566cb..fb5f32971a9a2 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1152,3 +1152,7 @@ setGeneric("predict", function(object, ...) { standardGeneric("predict") })
 #' @rdname rbind
 #' @export
 setGeneric("rbind", signature = "...")
+
+#' @rdname kmeans
+#' @export
+setGeneric("kmeans")
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 8d3b4388ae575..895ca05b34778 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -126,3 +126,34 @@ setMethod("summary", signature(object = "PipelineModel"),
               return(list(coefficients = coefficients))
             }
           })
+
+#' @title S4 class that represents a KMeansModel
+#' @param model A Java object reference to the backing Scala KMeansModel
+#' @export
+setClass("KMeansModel", representation(model = "jobj"))
+
+#' Fit a k-means model
+#'
+#' Fit a k-means model, similarly to R's kmeans().
+#'
+#' @param x DataFrame for training
+#' @param centers Number of centers
+#' @param iter.max Maximum iteration number
+#' @param nstart Number of start points
+#' @param algorithm Algorithm choosen to fit the model
+#' @return A k-means model
+#' @rdname kmeans
+#' @export
+#' @examples
+#'\dontrun{
+#' model <- kmeans(x, algorithm="random")
+#'}
+setMethod("kmeans", signature(x = "DataFrame"),
+          function(x) {
+            cat("Am I in the right function?")
+            columnNames <- "Sepal_Length,Sepal_Width,Petal_Length,Petal_Width"
+            model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                 "fitKMeans", "random", x@sdf, 10,
+                                 10, 2, columnNames)
+            return(new("KMeansModel", model = model))
+          })
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 08099dd96a87b..361719a2c09f3 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -113,3 +113,13 @@ test_that("summary works on base GLM models", {
   baseSummary <- summary(baseModel)
   expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
 })
+
+test_that("kmeans", {
+  newIris <- iris
+  newIris$Species <- NULL
+  training <- suppressWarnings(createDataFrame(sqlContext, newIris))
+  model <- kmeans(x = training)
+  print("Is there any problems?")
+  print(model)
+  #expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+})
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index 551e75dc0a02d..1787925c0ee5e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -17,14 +17,16 @@

 package org.apache.spark.ml.api.r

+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.ml.clustering.{KMeansModel, KMeans}
 import org.apache.spark.ml.{Pipeline, PipelineModel}
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
-import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.feature.{VectorAssembler, RFormula}
 import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{Row, SQLContext, DataFrame}

-private[r] object SparkRWrappers {
+object SparkRWrappers {
   def fitRModelFormula(
       value: String,
       df: DataFrame,
@@ -51,6 +53,26 @@ private[r] object SparkRWrappers {
     pipeline.fit(df)
   }

+  def fitKMeans(
+      initMode: String,
+      df: DataFrame,
+      maxIter: Double,
+      initSteps: Double,
+      k: Double,
+      columns: String): KMeansModel = {
+    val assembler = new VectorAssembler().setInputCols(columns.split(",")).setOutputCol("features")
+    val features = assembler.transform(df).select("features")
+    // scalastyle:off println
+    features.collect().foreach { case Row(v) => println(v) }
+    // scalastyle:on println
+    val kMeans = new KMeans()
+      .setInitMode(initMode)
+      .setMaxIter(maxIter.toInt)
+      .setInitSteps(initSteps.toInt)
+      .setK(k.toInt)
+    kMeans.fit(features)
+  }
+
   def getModelCoefficients(model: PipelineModel): Array[Double] = {
     model.stages.last match {
       case m: LinearRegressionModel => {
@@ -114,4 +136,20 @@ private[r] object SparkRWrappers {
       "LogisticRegressionModel"
     }
   }
+
+  // scalastyle:off println
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("test-R").setMaster("local")
+    val sc = new SparkContext(conf)
+    val sqlCtx = new SQLContext(sc)
+    import sqlCtx.implicits._
+    val initMode = "random"
+    val columns = "Sepal_Length,Sepal_Width,Petal_Length,Petal_Width"
+    val df = sc.textFile("iris.txt").map(_.split("\\s+"))
+      .map(ary => (ary(1).toDouble, ary(2).toDouble, ary(3).toDouble, ary(4).toDouble))
+      .toDF("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")
+    val model = fitKMeans(initMode, df, 10, 10, 3, columns)
+    println(model)
+  }
+  // scalastyle:on println
 }

From 509bfedf1b152b87621baff5245e2e74ae379166 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Thu, 4 Feb 2016 11:10:24 -0800
Subject: [PATCH 02/10] refine code

---
 R/pkg/R/mllib.R                                      | 9 ++++-----
 .../scala/org/apache/spark/ml/r/SparkRWrappers.scala | 9 +++------
 2 files changed, 7 insertions(+), 11 deletions(-)

diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 895ca05b34778..2d8f91c59dcde 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -148,12 +148,11 @@ setClass("KMeansModel", representation(model = "jobj"))
 #'\dontrun{
 #' model <- kmeans(x, algorithm="random")
 #'}
-setMethod("kmeans", signature(x = "DataFrame"),
-          function(x) {
-            cat("Am I in the right function?")
-            columnNames <- "Sepal_Length,Sepal_Width,Petal_Length,Petal_Width"
+setMethod("kmeans", signature(x = "DataFrame", centers = "numeric"),
+          function(x, centers) {
+            columnNames <- as.array(colnames(x))
             model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
                                  "fitKMeans", "random", x@sdf, 10,
-                                 10, 2, columnNames)
+                                 10, centers, columnNames)
             return(new("KMeansModel", model = model))
           })
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index 1787925c0ee5e..65d01824ff8a6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -59,12 +59,9 @@ object SparkRWrappers {
       maxIter: Double,
       initSteps: Double,
       k: Double,
-      columns: String): KMeansModel = {
-    val assembler = new VectorAssembler().setInputCols(columns.split(",")).setOutputCol("features")
-    val features = assembler.transform(df).select("features")
-    // scalastyle:off println
-    features.collect().foreach { case Row(v) => println(v) }
-    // scalastyle:on println
+      columns: Array[String]): KMeansModel = {
+    val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("features")
+    val features = assembler.transform(df)
     val kMeans = new KMeans()
       .setInitMode(initMode)
       .setMaxIter(maxIter.toInt)

From 376f0bec2230e6723595c68de70f18040a0d990e Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Thu, 4 Feb 2016 16:08:53 -0800
Subject: [PATCH 03/10] refine spark side wrapper

---
 .../apache/spark/ml/r/SparkRWrappers.scala | 29 ++++++-------------
 1 file changed, 9 insertions(+), 20 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index 65d01824ff8a6..211c35609ebdb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -54,19 +54,24 @@ object SparkRWrappers {
   }

   def fitKMeans(
-      initMode: String,
       df: DataFrame,
-      maxIter: Double,
+      initMode: String,
       initSteps: Double,
+      maxIter: Double,
+      seed: Double,
+      epsilon: Double,
       k: Double,
       columns: Array[String]): KMeansModel = {
-    val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("features")
+    val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("temp-features")
     val features = assembler.transform(df)
     val kMeans = new KMeans()
       .setInitMode(initMode)
-      .setMaxIter(maxIter.toInt)
       .setInitSteps(initSteps.toInt)
+      .setMaxIter(maxIter.toInt)
+      .setSeed(seed.toLong)
+      .setTol(epsilon)
       .setK(k.toInt)
+      .setFeaturesCol("temp-features")
     kMeans.fit(features)
   }

@@ -133,20 +138,4 @@ object SparkRWrappers {
       "LogisticRegressionModel"
     }
   }
-
-  // scalastyle:off println
-  def main(args: Array[String]): Unit = {
-    val conf = new SparkConf().setAppName("test-R").setMaster("local")
-    val sc = new SparkContext(conf)
-    val sqlCtx = new SQLContext(sc)
-    import sqlCtx.implicits._
-    val initMode = "random"
-    val columns = "Sepal_Length,Sepal_Width,Petal_Length,Petal_Width"
-    val df = sc.textFile("iris.txt").map(_.split("\\s+"))
-      .map(ary => (ary(1).toDouble, ary(2).toDouble, ary(3).toDouble, ary(4).toDouble))
-      .toDF("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")
-    val model = fitKMeans(initMode, df, 10, 10, 3, columns)
-    println(model)
-  }
-  // scalastyle:on println
 }

From 4e38f7e104eb8895dfdc067a10933d864b07e708 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Mon, 8 Feb 2016 15:21:06 -0800
Subject: [PATCH 04/10] refine interface

---
 R/pkg/R/mllib.R                            | 22 +++++++--------------
 .../apache/spark/ml/r/SparkRWrappers.scala | 19 ++++++-------------
 2 files changed, 14 insertions(+), 27 deletions(-)

diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 2d8f91c59dcde..6c6c028631485 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -127,11 +127,6 @@ setMethod("summary", signature(object = "PipelineModel"),
             }
           })

-#' @title S4 class that represents a KMeansModel
-#' @param model A Java object reference to the backing Scala KMeansModel
-#' @export
-setClass("KMeansModel", representation(model = "jobj"))
-
 #' Fit a k-means model
 #'
 #' Fit a k-means model, similarly to R's kmeans().
@@ -139,20 +134,19 @@ setClass("KMeansModel", representation(model = "jobj"))
 #' @param x DataFrame for training
 #' @param centers Number of centers
 #' @param iter.max Maximum iteration number
-#' @param nstart Number of start points
 #' @param algorithm Algorithm choosen to fit the model
-#' @return A k-means model
+#' @return A fitted k-means model
 #' @rdname kmeans
 #' @export
 #' @examples
 #'\dontrun{
-#' model <- kmeans(x, algorithm="random")
+#' model <- kmeans(x, centers = 2, algorithm="random")
 #'}
-setMethod("kmeans", signature(x = "DataFrame", centers = "numeric"),
-          function(x, centers) {
+setMethod("kmeans", signature(x = "DataFrame"),
+          function(x, centers, iter.max = 10, algorithm = c("random", "k-means||")) {
            columnNames <- as.array(colnames(x))
-            model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                 "fitKMeans", "random", x@sdf, 10,
-                                 10, centers, columnNames)
-            return(new("KMeansModel", model = model))
+            algorithm <- if(missing(algorithm)) "random" else match.arg(algorithm)
+            model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitKMeans", x@sdf,
+                                 algorithm, iter.max, centers, columnNames)
+            return(new("PipelineModel", model = model))
           })
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index 211c35609ebdb..527247403711b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -17,14 +17,13 @@

 package org.apache.spark.ml.api.r

-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.ml.clustering.{KMeansModel, KMeans}
 import org.apache.spark.ml.{Pipeline, PipelineModel}
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
-import org.apache.spark.ml.feature.{VectorAssembler, RFormula}
+import org.apache.spark.ml.clustering.KMeans
+import org.apache.spark.ml.feature.{RFormula, VectorAssembler}
 import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
-import org.apache.spark.sql.{Row, SQLContext, DataFrame}
+import org.apache.spark.sql.DataFrame

 object SparkRWrappers {
   def fitRModelFormula(
@@ -56,23 +55,17 @@ object SparkRWrappers {
   def fitKMeans(
       df: DataFrame,
       initMode: String,
-      initSteps: Double,
       maxIter: Double,
-      seed: Double,
-      epsilon: Double,
       k: Double,
-      columns: Array[String]): KMeansModel = {
+      columns: Array[String]): PipelineModel = {
     val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("temp-features")
-    val features = assembler.transform(df)
     val kMeans = new KMeans()
       .setInitMode(initMode)
-      .setInitSteps(initSteps.toInt)
       .setMaxIter(maxIter.toInt)
-      .setSeed(seed.toLong)
-      .setTol(epsilon)
       .setK(k.toInt)
       .setFeaturesCol("temp-features")
-    kMeans.fit(features)
+    val pipeline = new Pipeline().setStages(Array(assembler, kMeans))
+    pipeline.fit(df)
   }

   def getModelCoefficients(model: PipelineModel): Array[Double] = {

From fcf749d4e7f21c055fcc35fedf3261bdd11e7af7 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Mon, 8 Feb 2016 15:49:30 -0800
Subject: [PATCH 05/10] add test

---
 R/pkg/inst/tests/testthat/test_mllib.R | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 361719a2c09f3..933844740b08c 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -118,8 +118,13 @@ test_that("kmeans", {
   newIris <- iris
   newIris$Species <- NULL
   training <- suppressWarnings(createDataFrame(sqlContext, newIris))
-  model <- kmeans(x = training)
-  print("Is there any problems?")
-  print(model)
-  #expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+
+  # Cahce the DataFrame here to work around the bug SPARK-13178.
+  cache(training)
+  take(training, 1)
+
+  model <- kmeans(x = training, centers = 2)
+  sample <- take(select(predict(model, training), "prediction"), 1)
+  expect_equal(typeof(sample$prediction), "integer")
+  expect_equal(sample$prediction, 1)
 })

From ca0ea4a2e709f9c381be8fe164216e76e2823603 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Mon, 8 Feb 2016 15:51:39 -0800
Subject: [PATCH 06/10] revert private for SparkRWrappers

---
 mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index 527247403711b..4b3c52bed0e8a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -25,7 +25,7 @@ import org.apache.spark.ml.feature.{RFormula, VectorAssembler}
 import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
 import org.apache.spark.sql.DataFrame

-object SparkRWrappers {
+private[r] object SparkRWrappers {
   def fitRModelFormula(
       value: String,
       df: DataFrame,

From ecd89e3c183030c0d3926f51dc58f5bdc8ef2ea4 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Thu, 11 Feb 2016 16:27:10 -0800
Subject: [PATCH 07/10] add summary and coefficient

---
 R/pkg/R/mllib.R                             |  9 ++++-
 .../apache/spark/ml/clustering/KMeans.scala | 45 ++++++++++++++++++-
 .../apache/spark/ml/r/SparkRWrappers.scala  | 20 +++++++--
 3 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 6c6c028631485..ceac5791843b8 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -119,11 +119,18 @@ setMethod("summary", signature(object = "PipelineModel"),
               colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)")
               rownames(coefficients) <- unlist(features)
               return(list(devianceResiduals = devianceResiduals, coefficients = coefficients))
-            } else {
+            } else if (modelName == "LogisticRegressionModel") {
               coefficients <- as.matrix(unlist(coefficients))
               colnames(coefficients) <- c("Estimate")
               rownames(coefficients) <- unlist(features)
               return(list(coefficients = coefficients))
+            } else if (modelName == "KMeansModel") {
+              k <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                               "getKMeansModelSize", object@model)
+              coefficients <- t(matrix(coefficients, ncol = k))
+              colnames(coefficients) <- unlist(features)
+              rownames(coefficients) <- 1:k
+              return(list(coefficients = coefficients))
             }
           })

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index dc6d5d9280970..403a49293d18d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ml.clustering

 import org.apache.hadoop.fs.Path

+import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.param.{IntParam, Param, ParamMap, Params}
@@ -135,6 +136,26 @@ class KMeansModel private[ml] (

   @Since("1.6.0")
   override def write: MLWriter = new KMeansModel.KMeansModelWriter(this)
+
+  private var trainingSummary: Option[KMeansSummary] = None
+
+  private[clustering] def setSummary(summary: KMeansSummary): this.type = {
+    this.trainingSummary = Some(summary)
+    this
+  }
+
+  /**
+   * Gets summary of model on training set. An exception is
+   * thrown if `trainingSummary == None`.
+   */
+  @Since("2.0.0")
+  def summary: KMeansSummary = trainingSummary match {
+    case Some(summ) => summ
+    case None =>
+      throw new SparkException(
+        s"No training summary available for the ${this.getClass.getSimpleName}",
+        new NullPointerException())
+  }
 }

 @Since("1.6.0")
@@ -249,8 +270,9 @@ class KMeans @Since("1.5.0") (
       .setSeed($(seed))
       .setEpsilon($(tol))
     val parentModel = algo.run(rdd)
-    val model = new KMeansModel(uid, parentModel)
-    copyValues(model)
+    val model = copyValues(new KMeansModel(uid, parentModel))
+    val summary = new KMeansSummary(model.transform(dataset), $(predictionCol), $(featuresCol))
+    model.setSummary(summary)
   }

   @Since("1.5.0")
@@ -266,3 +288,22 @@ object KMeans extends DefaultParamsReadable[KMeans] {

   override def load(path: String): KMeans = super.load(path)
 }
+class KMeansSummary private[clustering] (
+    @Since("2.0.0") @transient val predictions: DataFrame,
+    @Since("2.0.0") val predictionCol: String,
+    @Since("2.0.0") val featuresCol: String) {
+
+  /**
+   * Cluster centers of the transformed data.
+   */
+  @Since("2.0.0")
+  @transient lazy val cluster: DataFrame = predictions.select(predictionCol)
+
+  /**
+   * Size of each cluster.
+   */
+  @Since("2.0.0")
+  lazy val size: Array[Int] = cluster.map {
+    case Row(clusterIdx: Int) => (clusterIdx, 1)
+  }.reduceByKey(_ + _).collect().sortBy(_._1).map(_._2)
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index 4b3c52bed0e8a..663e41f4150d4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.api.r
 import org.apache.spark.ml.{Pipeline, PipelineModel}
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
-import org.apache.spark.ml.clustering.KMeans
+import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
 import org.apache.spark.ml.feature.{RFormula, VectorAssembler}
 import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
 import org.apache.spark.sql.DataFrame
@@ -58,12 +58,12 @@ private[r] object SparkRWrappers {
       maxIter: Double,
       k: Double,
       columns: Array[String]): PipelineModel = {
-    val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("temp-features")
+    val assembler = new VectorAssembler().setInputCols(columns)
     val kMeans = new KMeans()
       .setInitMode(initMode)
       .setMaxIter(maxIter.toInt)
       .setK(k.toInt)
-      .setFeaturesCol("temp-features")
+      .setFeaturesCol(assembler.getOutputCol)
     val pipeline = new Pipeline().setStages(Array(assembler, kMeans))
     pipeline.fit(df)
   }
@@ -89,6 +89,8 @@ private[r] object SparkRWrappers {
           m.coefficients.toArray
         }
       }
+      case m: KMeansModel =>
+        m.clusterCenters.flatMap(_.toArray)
     }
   }
@@ -102,6 +104,12 @@ private[r] object SparkRWrappers {
     }
   }

+  def getKMeansModelSize(model: PipelineModel): Int = {
+    model.stages.last match {
+      case m: KMeansModel => m.getK
+    }
+  }
+
   def getModelFeatures(model: PipelineModel): Array[String] = {
     model.stages.last match {
       case m: LinearRegressionModel =>
@@ -120,6 +128,10 @@ private[r] object SparkRWrappers {
         } else {
           attrs.attributes.get.map(_.name.get)
         }
+      case m: KMeansModel =>
+        val attrs = AttributeGroup.fromStructField(
+          m.summary.predictions.schema(m.summary.featuresCol))
+        attrs.attributes.get.map(_.name.get)
     }
   }
@@ -129,6 +141,8 @@ private[r] object SparkRWrappers {
         "LinearRegressionModel"
       case m: LogisticRegressionModel =>
         "LogisticRegressionModel"
+      case m: KMeansModel =>
+        "KMeansModel"
     }
   }
 }

From 44c759567916bb651103d36535bd77f358ab0186 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Sat, 13 Feb 2016 15:47:55 -0800
Subject: [PATCH 08/10] update fitted to R

---
 R/pkg/NAMESPACE                             |  3 +-
 R/pkg/R/generics.R                          |  4 ++
 R/pkg/R/mllib.R                             | 49 ++++++++++++++++---
 .../apache/spark/ml/clustering/KMeans.scala |  2 +-
 .../apache/spark/ml/r/SparkRWrappers.scala  | 23 ++++++++-
 5 files changed, 70 insertions(+), 11 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 61d74b70d7392..6a3d63f43f785 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -14,7 +14,8 @@ export("print.jobj")
 exportMethods("glm",
               "predict",
               "summary",
-              "kmeans")
+              "kmeans",
+              "fitted")

 # Job group lifecycle management methods
 export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 3967fc8fee52e..ab61bce03df23 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1164,3 +1164,7 @@ setGeneric("rbind", signature = "...")
 #' @rdname kmeans
 #' @export
 setGeneric("kmeans")
+
+#' @rdname fitted
+#' @export
+setGeneric("fitted")
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index ceac5791843b8..346f33d7dab2c 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -104,11 +104,11 @@ setMethod("predict", signature(object = "PipelineModel"),
 setMethod("summary", signature(object = "PipelineModel"),
           function(object, ...) {
             modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                    "getModelName", object@model)
+                                     "getModelName", object@model)
             features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                   "getModelFeatures", object@model)
+                                    "getModelFeatures", object@model)
             coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                       "getModelCoefficients", object@model)
+                                        "getModelCoefficients", object@model)
             if (modelName == "LinearRegressionModel") {
               devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
                                                "getModelDevianceResiduals", object@model)
@@ -125,12 +125,18 @@ setMethod("summary", signature(object = "PipelineModel"),
               rownames(coefficients) <- unlist(features)
               return(list(coefficients = coefficients))
             } else if (modelName == "KMeansModel") {
-              k <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                               "getKMeansModelSize", object@model)
+              modelSize <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                       "getKMeansModelSize", object@model)
+              cluster <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                     "getKMeansCluster", object@model, "classes")
+              k <- unlist(modelSize)[1]
+              size <- unlist(modelSize)[-1]
               coefficients <- t(matrix(coefficients, ncol = k))
               colnames(coefficients) <- unlist(features)
               rownames(coefficients) <- 1:k
-              return(list(coefficients = coefficients))
+              return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster)))
+            } else {
+              stop(paste("Unsupported model", modelName, sep = " "))
             }
           })
@@ -152,8 +158,37 @@ setMethod("summary", signature(object = "PipelineModel"),
 setMethod("kmeans", signature(x = "DataFrame"),
           function(x, centers, iter.max = 10, algorithm = c("random", "k-means||")) {
             columnNames <- as.array(colnames(x))
-            algorithm <- if(missing(algorithm)) "random" else match.arg(algorithm)
+            algorithm <- match.arg(algorithm)
             model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitKMeans", x@sdf,
                                  algorithm, iter.max, centers, columnNames)
             return(new("PipelineModel", model = model))
           })
+
+#' Get fitted result from a model
+#'
+#' Get fitted result from a model, similarly to R's fitted().
+#'
+#' @param object A fitted MLlib model
+#' @return DataFrame containing fitted values
+#' @rdname fitted
+#' @export
+#' @examples
+#'\dontrun{
+#' model <- kmeans(trainingData, 2)
+#' fitted.model <- fitted(model)
+#' showDF(fitted.model)
+#'}
+setMethod("fitted", signature(object = "PipelineModel"),
+          function(object, method = c("centers", "classes"), ...) {
+            modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                     "getModelName", object@model)
+
+            if (modelName == "KMeansModel") {
+              method <- match.arg(method)
+              fittedResult <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                          "getKMeansCluster", object@model, method)
+              return(dataFrame(fittedResult))
+            } else {
+              stop(paste("Unsupported model", modelName, sep = " "))
+            }
+          })
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index 403a49293d18d..cd04e9c785307 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -291,7 +291,7 @@ object KMeans extends DefaultParamsReadable[KMeans] {
 class KMeansSummary private[clustering] (
     @Since("2.0.0") @transient val predictions: DataFrame,
     @Since("2.0.0") val predictionCol: String,
-    @Since("2.0.0") val featuresCol: String) {
+    @Since("2.0.0") val featuresCol: String) extends Serializable {

   /**
    * Cluster centers of the transformed data.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index 663e41f4150d4..d23e4fc9d1f57 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -104,9 +104,28 @@ private[r] object SparkRWrappers {
     }
   }

-  def getKMeansModelSize(model: PipelineModel): Int = {
+  def getKMeansModelSize(model: PipelineModel): Array[Int] = {
     model.stages.last match {
-      case m: KMeansModel => m.getK
+      case m: KMeansModel => Array(m.getK) ++ m.summary.size
+      case other => throw new UnsupportedOperationException(
+        s"KMeansModel required but ${other.getClass.getSimpleName} found.")
+    }
+  }
+
+  def getKMeansCluster(model: PipelineModel, method: String): DataFrame = {
+    model.stages.last match {
+      case m: KMeansModel =>
+        if (method == "centers") {
+          // Drop the assembled vector for easy-print to R side.
+          m.summary.predictions.drop(m.summary.featuresCol)
+        } else if (method == "classes") {
+          m.summary.cluster
+        } else {
+          throw new UnsupportedOperationException(
+            s"Method (centers or classes) required but $method found.")
+        }
+      case other => throw new UnsupportedOperationException(
+        s"KMeansModel required but ${other.getClass.getSimpleName} found.")
     }
   }

From 89200d2bbcb20754f9ccb8ce621d946453645506 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Sat, 13 Feb 2016 17:46:06 -0800
Subject: [PATCH 09/10] add more tests

---
 R/pkg/inst/tests/testthat/test_mllib.R | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 933844740b08c..e2a44b9481c6e 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -127,4 +127,17 @@ test_that("kmeans", {
   sample <- take(select(predict(model, training), "prediction"), 1)
   expect_equal(typeof(sample$prediction), "integer")
   expect_equal(sample$prediction, 1)
+
+  # Test stats::kmeans is working
+  statsModel <- kmeans(x = newIris, centers = 2)
+  expect_equal(unique(statsModel$cluster), c(1, 2))
+
+  # Test fitted works on KMeans
+  fitted.model <- fitted(model)
+  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1))
+
+  # Test summary works on KMeans
+  summary.model <- summary(model)
+  cluster <- summary.model$cluster
+  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
 })

From ecb285003ea94cbdcdda53eb65350c4d5ac53379 Mon Sep 17 00:00:00 2001
From: Xusen Yin
Date: Sat, 13 Feb 2016 17:48:02 -0800
Subject: [PATCH 10/10] fix typo

---
 R/pkg/inst/tests/testthat/test_mllib.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index e2a44b9481c6e..595512e0e0d3a 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -119,7 +119,7 @@ test_that("kmeans", {
   newIris$Species <- NULL
   training <- suppressWarnings(createDataFrame(sqlContext, newIris))

-  # Cahce the DataFrame here to work around the bug SPARK-13178.
+  # Cache the DataFrame here to work around the bug SPARK-13178.
   cache(training)
   take(training, 1)
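
The series is easiest to review against a concrete end-to-end run. The sketch below exercises the API as it stands after PATCH 10; it is illustrative only — it assumes an initialized sqlContext (as the tests do), and the data frame and column choices are examples, not part of the patches:

    # Usage sketch of the final API (assumes a running SparkR session with sqlContext).
    df <- suppressWarnings(createDataFrame(sqlContext, iris[, -5]))  # drop Species, as in the tests
    cache(df)  # the tests cache first to work around SPARK-13178

    model <- kmeans(df, centers = 2, iter.max = 10, algorithm = "random")

    # summary() returns list(coefficients, size, cluster): the centers matrix,
    # per-cluster counts, and a DataFrame of cluster assignments.
    summary(model)

    # fitted() returns a DataFrame; method = "classes" gives just the assignments,
    # method = "centers" gives the predictions with the assembled vector dropped.
    head(fitted(model, method = "classes"))

    # predict() appends an integer "prediction" column, as asserted in the tests.
    head(select(predict(model, df), "prediction"))

Because setGeneric("kmeans") wraps the existing stats::kmeans, dispatch is on the class of x: a SparkR DataFrame goes through the pipeline wrapper above, while a plain data.frame still reaches stats::kmeans — PATCH 09's test checks both paths.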
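One convention worth flagging for reviewers: PATCH 08 widens getKMeansModelSize to a single Array[Int] that carries k in the first slot and the per-cluster sizes after it, and the R side splits it apart. A minimal illustration of that unpacking, with hypothetical values standing in for a real JVM call:

    modelSize <- c(2L, 98L, 52L)   # hypothetical return value: k first, then cluster sizes
    k <- unlist(modelSize)[1]      # 2 -> number of centers, rows of the coefficients matrix
    size <- unlist(modelSize)[-1]  # c(98, 52) -> surfaced as summary(model)$size

Packing both values into one array keeps the R/JVM round trips down (one callJStatic returns both k and the sizes), at the cost of an implicit layout contract between SparkRWrappers.scala and mllib.R.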