From fb1bca43fc13fa5509539e4a6d4fe20cd26d1dd5 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 29 Feb 2016 21:15:21 -0800 Subject: [PATCH 01/23] runable draft --- R/pkg/NAMESPACE | 3 +- R/pkg/R/generics.R | 5 ++ R/pkg/R/mllib.R | 65 +++++++++++++++++++ R/pkg/inst/tests/testthat/test_mllib.R | 26 ++++++++ .../apache/spark/ml/r/SparkRWrappers.scala | 34 +++++++++- 5 files changed, 130 insertions(+), 3 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 6a3d63f43f785..ee46108d419f3 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -15,7 +15,8 @@ exportMethods("glm", "predict", "summary", "kmeans", - "fitted") + "fitted", + "naiveBayes") # Job group lifecycle management methods export("setJobGroup", diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index ab61bce03df23..83a3745e16c73 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1168,3 +1168,8 @@ setGeneric("kmeans") #' @rdname fitted #' @export setGeneric("fitted") + +#' @rdname naiveBayes +#' @export +setGeneric("naiveBayes", + function(object, ...) { standardGeneric("naiveBayes") }) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 346f33d7dab2c..9a6d77fe7deb1 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -192,3 +192,68 @@ setMethod("fitted", signature(object = "PipelineModel"), stop(paste("Unsupported model", modelName, sep = " ")) } }) + +#' Fit a naive Bayes model +#' +#' Fit a naive Bayes model, similarly to R's naiveBayes() except for omitting two arguments 'subset' +#' and 'na.action'. Users can use 'subset' function and 'fillna' or 'na.omit' function of DataFrame, +#' respectviely, to preprocess their DataFrame. +#' +#' @param object A symbolic description of the model to be fitted. Currently only a few formula +#' operators are supported, including '~', '.', ':', '+', and '-'. +#' @param data DataFrame for training +#' @param laplace Smoothing parameter +#' @param modelType Either 'multinomial' or 'bernoulli'. Default "multinomial". +#' @param ... Undefined parameters +#' @return A fitted naive Bayes model. +#' @rdname naiveBayes +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlContext <- sparkRSQL.init(sc) +#' df <- createDataFrame(sqlContext, iris) +#' model <- glm(Sepal_Length ~ Sepal_Width, df, laplace = 1, modelType = "multinomial") +#'} +setMethod("naiveBayes", signature(object = "formula"), + function(object, data, laplace = 0, modelType = c("multinomial", "bernoulli"), ...) { + + formula <- paste(deparse(object), collapse = "") + modelType <- if (missing(modelType)) "multinomial" else match.arg(modelType) + model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes", + formula, data@sdf, laplace, modelType) + return(new("PipelineModel", model = model)) + }) + +#' Fit a naive Bayes model +#' +#' Fit a naive Bayes model, similarly to R's naiveBayes(). +#' +#' @param object DataFrame as features for training. +#' @param y DataFrame as label for training. +#' @param laplace Smoothing parameter +#' @param modelType Either 'multinomial' or 'bernoulli'. Default "multinomial". +#' @param ... Undefined parameters +#' @return A fitted naive Bayes model. 
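+#' @details x and y are combined positionally (row i of x with row i of y), so the two
+#' DataFrames must have the same number of rows, and y must contain exactly one column.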
+#' @rdname naiveBayes +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlContext <- sparkRSQL.init(sc) +#' df <- createDataFrame(sqlContext, iris) +#' cache(df) +#' take(df, 1) +#' model <- glm(df[, -2], df[, 2], laplace = 1, modelType = "multinomial") +#'} +setMethod("naiveBayes", signature(object = "DataFrame"), + function(object, y, laplace = 0, modelType = c("multinomial", "bernoulli"), ...) { + yNames <- as.array(colnames(y)) + if (length(yNames) != 1) { + stop(paste("Only one-dimensional y is supported, we get", length(yNames), sep = " ")) + } + modelType <- if (missing(modelType)) "multinomial" else match.arg(modelType) + model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes", + object@sdf, y@sdf, laplace, modelType) + return(new("PipelineModel", model = model)) + }) \ No newline at end of file diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index af84a0abcf94d..7f024596f7572 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -141,3 +141,29 @@ test_that("kmeans", { cluster <- summary.model$cluster expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1)) }) + +test_that("naiveBayes", { + training <- suppressWarnings(createDataFrame(sqlContext, iris)) + + # Cache the DataFrame here to work around the bug SPARK-13178. + cache(training) + take(training, 1) + + model <- naiveBayes(Sepal_Width ~ ., data = training, laplace = 1, modelType = "multinomial") + # sample <- take(select(predict(model, training), "prediction"), 1) + # expect_equal(typeof(sample$prediction), "integer") + # expect_equal(sample$prediction, 1) + + # # Test stats::kmeans is working + # statsModel <- kmeans(x = newIris, centers = 2) + # expect_equal(sort(unique(statsModel$cluster)), c(1, 2)) + + # # Test fitted works on KMeans + # fitted.model <- fitted(model) + # expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1)) + + # # Test summary works on KMeans + # summary.model <- summary(model) + # cluster <- summary.model$cluster + # expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1)) +}) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index d23e4fc9d1f57..8c6825962c6f8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -19,11 +19,12 @@ package org.apache.spark.ml.api.r import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.attribute._ -import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel} +import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, NaiveBayes} import org.apache.spark.ml.clustering.{KMeans, KMeansModel} import org.apache.spark.ml.feature.{RFormula, VectorAssembler} import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.types.StructType private[r] object SparkRWrappers { def fitRModelFormula( @@ -52,6 +53,35 @@ private[r] object SparkRWrappers { pipeline.fit(df) } + def fitNaiveBayes( + value: String, + df: DataFrame, + laplace: Double, + modelType: String): PipelineModel = { + + val formula = new RFormula().setFormula(value) + val naiveBayes = new 
NaiveBayes().setSmoothing(laplace).setModelType(modelType) + val pipeline = new Pipeline().setStages(Array(formula, naiveBayes)) + pipeline.fit(df) + } + + def fitNaiveBayes( + x: DataFrame, + y: DataFrame, + laplace: Double, + modelType: String): PipelineModel = { + + val (formulaValue, data) = { + val cBindData = x.rdd.zip(y.rdd).map(r => Row.merge(r._1, r._2)) + val schema = StructType(x.schema.fields ++ y.schema.fields) + val cBindDF = x.sqlContext.createDataFrame(cBindData, schema) + val autoFormula = s"${y.schema.fieldNames.head} ~ ." + (autoFormula, cBindDF) + } + + fitNaiveBayes(formulaValue, data, laplace, modelType) + } + def fitKMeans( df: DataFrame, initMode: String, From 787f25f5a84330632dff5c8d9fd8d7c0de02de8c Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 29 Feb 2016 22:50:17 -0800 Subject: [PATCH 02/23] refine test and na handler --- R/pkg/R/mllib.R | 12 +++++++----- R/pkg/inst/tests/testthat/test_mllib.R | 18 ++++++++++-------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 9a6d77fe7deb1..9f582141d2541 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -197,7 +197,8 @@ setMethod("fitted", signature(object = "PipelineModel"), #' #' Fit a naive Bayes model, similarly to R's naiveBayes() except for omitting two arguments 'subset' #' and 'na.action'. Users can use 'subset' function and 'fillna' or 'na.omit' function of DataFrame, -#' respectviely, to preprocess their DataFrame. +#' respectviely, to preprocess their DataFrame. We use na.omit in this interface to avoid potential +# errors. #' #' @param object A symbolic description of the model to be fitted. Currently only a few formula #' operators are supported, including '~', '.', ':', '+', and '-'. @@ -217,9 +218,9 @@ setMethod("fitted", signature(object = "PipelineModel"), #'} setMethod("naiveBayes", signature(object = "formula"), function(object, data, laplace = 0, modelType = c("multinomial", "bernoulli"), ...) { - + data <- na.omit(data) formula <- paste(deparse(object), collapse = "") - modelType <- if (missing(modelType)) "multinomial" else match.arg(modelType) + modelType <- match.arg(modelType) model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes", formula, data@sdf, laplace, modelType) return(new("PipelineModel", model = model)) @@ -227,7 +228,8 @@ setMethod("naiveBayes", signature(object = "formula"), #' Fit a naive Bayes model #' -#' Fit a naive Bayes model, similarly to R's naiveBayes(). +#' Fit a naive Bayes model, similarly to R's naiveBayes(). With this interface, users need to handle +#' NA value themselves. #' #' @param object DataFrame as features for training. #' @param y DataFrame as label for training. 
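
A note on the na.omit/match.arg hunk above: na.omit() here is SparkR's DataFrame method, dropping rows that contain NA before the JVM fit, and the missing() guard around modelType is redundant because base R's match.arg() already returns the first choice when the argument is left at its default. A minimal sketch of that behavior (plain R; none of these names come from the patch):

    f <- function(modelType = c("multinomial", "bernoulli")) match.arg(modelType)
    f()              # "multinomial" -- the first choice is the default
    f("bernoulli")   # "bernoulli"
    f("gaussian")    # error: 'arg' should be one of "multinomial", "bernoulli"
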
@@ -252,7 +254,7 @@ setMethod("naiveBayes", signature(object = "DataFrame"), if (length(yNames) != 1) { stop(paste("Only one-dimensional y is supported, we get", length(yNames), sep = " ")) } - modelType <- if (missing(modelType)) "multinomial" else match.arg(modelType) + modelType <- match.arg(modelType) model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes", object@sdf, y@sdf, laplace, modelType) return(new("PipelineModel", model = model)) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 7f024596f7572..ade0c8907766e 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -143,20 +143,22 @@ test_that("kmeans", { }) test_that("naiveBayes", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + data(HouseVotes84, package = "mlbench") + training <- createDataFrame(sqlContext, HouseVotes84) # Cache the DataFrame here to work around the bug SPARK-13178. cache(training) take(training, 1) - model <- naiveBayes(Sepal_Width ~ ., data = training, laplace = 1, modelType = "multinomial") - # sample <- take(select(predict(model, training), "prediction"), 1) - # expect_equal(typeof(sample$prediction), "integer") - # expect_equal(sample$prediction, 1) + model <- naiveBayes(Class ~ ., data = training, laplace = 1, modelType = "multinomial") + sample <- take(select(predict(model, training), "prediction"), 1) + expect_equal(typeof(sample$prediction), "integer") + expect_equal(sample$prediction, 0) - # # Test stats::kmeans is working - # statsModel <- kmeans(x = newIris, centers = 2) - # expect_equal(sort(unique(statsModel$cluster)), c(1, 2)) + # Test e1071::naiveBayes is working + library(e1071) + model <- naiveBayes(Class ~ ., data = HouseVotes84) + expect_equal(predict(model, HouseVotes84[1:3,]), c("republican", "republican", "republican")) # # Test fitted works on KMeans # fitted.model <- fitted(model) From b66d3e5ef0803dad949a53d4210a455856c8a400 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 1 Mar 2016 09:55:36 -0800 Subject: [PATCH 03/23] refine getModelName --- R/pkg/inst/tests/testthat/test_mllib.R | 9 --------- .../org/apache/spark/ml/r/SparkRWrappers.scala | 17 +++++++++-------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index ade0c8907766e..8c7c98e0b4832 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -159,13 +159,4 @@ test_that("naiveBayes", { library(e1071) model <- naiveBayes(Class ~ ., data = HouseVotes84) expect_equal(predict(model, HouseVotes84[1:3,]), c("republican", "republican", "republican")) - - # # Test fitted works on KMeans - # fitted.model <- fitted(model) - # expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1)) - - # # Test summary works on KMeans - # summary.model <- summary(model) - # cluster <- summary.model$cluster - # expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1)) }) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index 8c6825962c6f8..ba05d55b9f866 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -53,6 +53,9 @@ private[r] object SparkRWrappers { pipeline.fit(df) } + /** + * Fit a Naive Bayes model with a formula 
value and a DataFrame. + */ def fitNaiveBayes( value: String, df: DataFrame, @@ -65,6 +68,11 @@ private[r] object SparkRWrappers { pipeline.fit(df) } + /** + * Fit a Naive Bayes model with DataFrame x as features and DataFrame y as labels. DataFrame x and + * y should have the same number of rows for binding together. Note that the DataFrame y should + * contains only one column, otherwise its first column will be used as the default label column. + */ def fitNaiveBayes( x: DataFrame, y: DataFrame, @@ -185,13 +193,6 @@ private[r] object SparkRWrappers { } def getModelName(model: PipelineModel): String = { - model.stages.last match { - case m: LinearRegressionModel => - "LinearRegressionModel" - case m: LogisticRegressionModel => - "LogisticRegressionModel" - case m: KMeansModel => - "KMeansModel" - } + model.stages.last.getClass.getSimpleName } } From a5ab2e678c660fbee957cbb20dced0b5f5a4a256 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 1 Mar 2016 14:26:39 -0800 Subject: [PATCH 04/23] remove default interface --- R/pkg/R/generics.R | 3 +- R/pkg/R/mllib.R | 44 +++---------------- R/pkg/inst/tests/testthat/test_mllib.R | 6 +-- .../apache/spark/ml/r/SparkRWrappers.scala | 28 +----------- 4 files changed, 8 insertions(+), 73 deletions(-) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 83a3745e16c73..5c5c60aefa31d 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1171,5 +1171,4 @@ setGeneric("fitted") #' @rdname naiveBayes #' @export -setGeneric("naiveBayes", - function(object, ...) { standardGeneric("naiveBayes") }) +setGeneric("naiveBayes", function(formula, ...) { standardGeneric("naiveBayes") }) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 9f582141d2541..84d2b6d1f9b92 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -203,7 +203,7 @@ setMethod("fitted", signature(object = "PipelineModel"), #' @param object A symbolic description of the model to be fitted. Currently only a few formula #' operators are supported, including '~', '.', ':', '+', and '-'. #' @param data DataFrame for training -#' @param laplace Smoothing parameter +#' @param lambda Smoothing parameter #' @param modelType Either 'multinomial' or 'bernoulli'. Default "multinomial". #' @param ... Undefined parameters #' @return A fitted naive Bayes model. @@ -214,48 +214,14 @@ setMethod("fitted", signature(object = "PipelineModel"), #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' df <- createDataFrame(sqlContext, iris) -#' model <- glm(Sepal_Length ~ Sepal_Width, df, laplace = 1, modelType = "multinomial") +#' model <- glm(Sepal_Length ~ Sepal_Width, df, lambda = 1, modelType = "multinomial") #'} -setMethod("naiveBayes", signature(object = "formula"), - function(object, data, laplace = 0, modelType = c("multinomial", "bernoulli"), ...) { +setMethod("naiveBayes", signature(formula = "formula"), + function(formula, data, lambda = 1, modelType = c("multinomial", "bernoulli"), ...) { data <- na.omit(data) formula <- paste(deparse(object), collapse = "") modelType <- match.arg(modelType) model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes", - formula, data@sdf, laplace, modelType) + formula, data@sdf, lambda, modelType) return(new("PipelineModel", model = model)) }) - -#' Fit a naive Bayes model -#' -#' Fit a naive Bayes model, similarly to R's naiveBayes(). With this interface, users need to handle -#' NA value themselves. -#' -#' @param object DataFrame as features for training. -#' @param y DataFrame as label for training. 
-#' @param laplace Smoothing parameter -#' @param modelType Either 'multinomial' or 'bernoulli'. Default "multinomial". -#' @param ... Undefined parameters -#' @return A fitted naive Bayes model. -#' @rdname naiveBayes -#' @export -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' sqlContext <- sparkRSQL.init(sc) -#' df <- createDataFrame(sqlContext, iris) -#' cache(df) -#' take(df, 1) -#' model <- glm(df[, -2], df[, 2], laplace = 1, modelType = "multinomial") -#'} -setMethod("naiveBayes", signature(object = "DataFrame"), - function(object, y, laplace = 0, modelType = c("multinomial", "bernoulli"), ...) { - yNames <- as.array(colnames(y)) - if (length(yNames) != 1) { - stop(paste("Only one-dimensional y is supported, we get", length(yNames), sep = " ")) - } - modelType <- match.arg(modelType) - model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes", - object@sdf, y@sdf, laplace, modelType) - return(new("PipelineModel", model = model)) - }) \ No newline at end of file diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 8c7c98e0b4832..5d6bddb1d64b3 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -146,11 +146,7 @@ test_that("naiveBayes", { data(HouseVotes84, package = "mlbench") training <- createDataFrame(sqlContext, HouseVotes84) - # Cache the DataFrame here to work around the bug SPARK-13178. - cache(training) - take(training, 1) - - model <- naiveBayes(Class ~ ., data = training, laplace = 1, modelType = "multinomial") + model <- naiveBayes(Class ~ ., data = training, lambda = 1, modelType = "multinomial") sample <- take(select(predict(model, training), "prediction"), 1) expect_equal(typeof(sample$prediction), "integer") expect_equal(sample$prediction, 0) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index ba05d55b9f866..59ccfc3d0f4e1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -23,8 +23,7 @@ import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressio import org.apache.spark.ml.clustering.{KMeans, KMeansModel} import org.apache.spark.ml.feature.{RFormula, VectorAssembler} import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} -import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.DataFrame private[r] object SparkRWrappers { def fitRModelFormula( @@ -53,9 +52,6 @@ private[r] object SparkRWrappers { pipeline.fit(df) } - /** - * Fit a Naive Bayes model with a formula value and a DataFrame. - */ def fitNaiveBayes( value: String, df: DataFrame, @@ -68,28 +64,6 @@ private[r] object SparkRWrappers { pipeline.fit(df) } - /** - * Fit a Naive Bayes model with DataFrame x as features and DataFrame y as labels. DataFrame x and - * y should have the same number of rows for binding together. Note that the DataFrame y should - * contains only one column, otherwise its first column will be used as the default label column. 
- */ - def fitNaiveBayes( - x: DataFrame, - y: DataFrame, - laplace: Double, - modelType: String): PipelineModel = { - - val (formulaValue, data) = { - val cBindData = x.rdd.zip(y.rdd).map(r => Row.merge(r._1, r._2)) - val schema = StructType(x.schema.fields ++ y.schema.fields) - val cBindDF = x.sqlContext.createDataFrame(cBindData, schema) - val autoFormula = s"${y.schema.fieldNames.head} ~ ." - (autoFormula, cBindDF) - } - - fitNaiveBayes(formulaValue, data, laplace, modelType) - } - def fitKMeans( df: DataFrame, initMode: String, From 9215fafd3295f488ba6d827b49eddb91c3032438 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 1 Mar 2016 15:42:51 -0800 Subject: [PATCH 05/23] refine code --- R/pkg/R/generics.R | 2 +- R/pkg/R/mllib.R | 16 +++++++++++----- R/pkg/inst/tests/testthat/test_mllib.R | 7 +------ 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 5c5c60aefa31d..09685e7e61355 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1171,4 +1171,4 @@ setGeneric("fitted") #' @rdname naiveBayes #' @export -setGeneric("naiveBayes", function(formula, ...) { standardGeneric("naiveBayes") }) +setGeneric("naiveBayes", function(formula, data, ...) { standardGeneric("naiveBayes") }) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 84d2b6d1f9b92..81effb7285c3e 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -78,6 +78,11 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram #'} setMethod("predict", signature(object = "PipelineModel"), function(object, newData) { + modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getModelName", object@model) + if (modelName == "NaiveBayesModel") { + newData <- na.omit(newData) + } return(dataFrame(callJMethod(object@model, "transform", newData@sdf))) }) @@ -198,7 +203,7 @@ setMethod("fitted", signature(object = "PipelineModel"), #' Fit a naive Bayes model, similarly to R's naiveBayes() except for omitting two arguments 'subset' #' and 'na.action'. Users can use 'subset' function and 'fillna' or 'na.omit' function of DataFrame, #' respectviely, to preprocess their DataFrame. We use na.omit in this interface to avoid potential -# errors. +#' errors. #' #' @param object A symbolic description of the model to be fitted. Currently only a few formula #' operators are supported, including '~', '.', ':', '+', and '-'. @@ -211,15 +216,16 @@ setMethod("fitted", signature(object = "PipelineModel"), #' @export #' @examples #'\dontrun{ +#' data(HouseVotes84, package = "mlbench") #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) -#' df <- createDataFrame(sqlContext, iris) -#' model <- glm(Sepal_Length ~ Sepal_Width, df, lambda = 1, modelType = "multinomial") +#' df <- createDataFrame(sqlContext, HouseVotes84) +#' model <- glm(Class ~ ., df, lambda = 1, modelType = "multinomial") #'} -setMethod("naiveBayes", signature(formula = "formula"), +setMethod("naiveBayes", signature(formula = "formula", data = "DataFrame"), function(formula, data, lambda = 1, modelType = c("multinomial", "bernoulli"), ...) 
{ data <- na.omit(data) - formula <- paste(deparse(object), collapse = "") + formula <- paste(deparse(formula), collapse = "") modelType <- match.arg(modelType) model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes", formula, data@sdf, lambda, modelType) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 5d6bddb1d64b3..d7236db20c3f8 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -148,11 +148,6 @@ test_that("naiveBayes", { model <- naiveBayes(Class ~ ., data = training, lambda = 1, modelType = "multinomial") sample <- take(select(predict(model, training), "prediction"), 1) - expect_equal(typeof(sample$prediction), "integer") + expect_equal(typeof(sample$prediction), "double") expect_equal(sample$prediction, 0) - - # Test e1071::naiveBayes is working - library(e1071) - model <- naiveBayes(Class ~ ., data = HouseVotes84) - expect_equal(predict(model, HouseVotes84[1:3,]), c("republican", "republican", "republican")) }) From 388e85dbf41faeea74f5aaa084664d9d52cce184 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 2 Mar 2016 21:31:32 -0800 Subject: [PATCH 06/23] add summary for NaiveBayes --- R/pkg/R/mllib.R | 13 +++++ .../spark/ml/classification/NaiveBayes.scala | 58 ++++++++++++++++--- .../apache/spark/ml/feature/RFormula.scala | 12 ++++ .../apache/spark/ml/r/SparkRWrappers.scala | 46 +++++++++++++-- 4 files changed, 117 insertions(+), 12 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 81effb7285c3e..e49fbe250cf71 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -140,6 +140,19 @@ setMethod("summary", signature(object = "PipelineModel"), colnames(coefficients) <- unlist(features) rownames(coefficients) <- 1:k return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster))) + } else if (modelName == "NaiveBayesModel") { + labels <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getNaiveBayesLabels", object@model) + pi <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getNaiveBayesPi", object@model) + theta <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getNaiveBayesTheta", object@model) + pi <- t(as.matrix(unlist(pi))) + colnames(pi) <- unlist(labels) + theta <- matrix(theta, nrow = length(labels)) + rownames(theta) <- unlist(labels) + colnames(theta) <- unlist(features) + return(list(pi = pi, theta = theta)) } else { stop(paste("Unsupported model", modelName, sep = " ")) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index 718f49d3aedcd..ce7a894a8e415 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -104,7 +104,16 @@ class NaiveBayes @Since("1.5.0") ( override protected def train(dataset: DataFrame): NaiveBayesModel = { val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset) val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) - NaiveBayesModel.fromOld(oldModel, this) + val nbModel = copyValues(NaiveBayesModel.fromOld(oldModel, this)) + val summary = new NaiveBayesSummary( + nbModel.transform(dataset), + $(predictionCol), + $(rawPredictionCol), + $(probabilityCol), + $(featuresCol), + $(labelCol)) + nbModel.setSummary(summary) + nbModel } @Since("1.5.0") @@ -129,6 +138,7 @@ object NaiveBayes extends 
DefaultParamsReadable[NaiveBayes] { @Experimental class NaiveBayesModel private[ml] ( @Since("1.5.0") override val uid: String, + @Since("2.0.0") val labels: Vector, @Since("1.5.0") val pi: Vector, @Since("1.5.0") val theta: Matrix) extends ProbabilisticClassificationModel[Vector, NaiveBayesModel] @@ -136,6 +146,10 @@ class NaiveBayesModel private[ml] ( import OldNaiveBayes.{Bernoulli, Multinomial} + private[ml] def this(uid: String, pi: Vector, theta: Matrix) = { + this(uid, Vectors.dense((0 until pi.size).map(_.toDouble).toArray), pi, theta) + } + /** * Bernoulli scoring requires log(condprob) if 1, log(1-condprob) if 0. * This precomputes log(1.0 - exp(theta)) and its sum which are used for the linear algebra @@ -217,7 +231,7 @@ class NaiveBayesModel private[ml] ( @Since("1.5.0") override def copy(extra: ParamMap): NaiveBayesModel = { - copyValues(new NaiveBayesModel(uid, pi, theta).setParent(this.parent), extra) + copyValues(new NaiveBayesModel(uid, labels, pi, theta).setParent(this.parent), extra) } @Since("1.5.0") @@ -227,6 +241,25 @@ class NaiveBayesModel private[ml] ( @Since("1.6.0") override def write: MLWriter = new NaiveBayesModel.NaiveBayesModelWriter(this) + + private var trainingSummary: Option[NaiveBayesSummary] = None + + private[classification] def setSummary(summary: NaiveBayesSummary): this.type = { + this.trainingSummary = Some(summary) + this + } + + /** + * Gets summary of model on training set. An exception is thrown if `trainingSummary == None`. + */ + @Since("2.0.0") + def summary: NaiveBayesSummary = trainingSummary match { + case Some(summ) => summ + case None => + throw new SparkException( + s"No training summary available for the ${this.getClass.getSimpleName}", + new NullPointerException()) + } } @Since("1.6.0") @@ -241,7 +274,7 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { val pi = Vectors.dense(oldModel.pi) val theta = new DenseMatrix(oldModel.labels.length, oldModel.theta(0).length, oldModel.theta.flatten, true) - new NaiveBayesModel(uid, pi, theta) + new NaiveBayesModel(uid, labels, pi, theta) } @Since("1.6.0") @@ -253,13 +286,13 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { /** [[MLWriter]] instance for [[NaiveBayesModel]] */ private[NaiveBayesModel] class NaiveBayesModelWriter(instance: NaiveBayesModel) extends MLWriter { - private case class Data(pi: Vector, theta: Matrix) + private case class Data(labels: Vector, pi: Vector, theta: Matrix) override protected def saveImpl(path: String): Unit = { // Save metadata and Params DefaultParamsWriter.saveMetadata(instance, path, sc) // Save model data: pi, theta - val data = Data(instance.pi, instance.theta) + val data = Data(instance.labels, instance.pi, instance.theta) val dataPath = new Path(path, "data").toString sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) } @@ -275,12 +308,21 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { val dataPath = new Path(path, "data").toString val data = sqlContext.read.parquet(dataPath).select("pi", "theta").head() - val pi = data.getAs[Vector](0) - val theta = data.getAs[Matrix](1) - val model = new NaiveBayesModel(metadata.uid, pi, theta) + val labels = data.getAs[Vector](0) + val pi = data.getAs[Vector](1) + val theta = data.getAs[Matrix](2) + val model = new NaiveBayesModel(metadata.uid, labels, pi, theta) DefaultParamsReader.getAndSetParams(model, metadata) model } } } + +class NaiveBayesSummary private[classification] ( + @Since("2.0.0") @transient val predictions: DataFrame, + 
@Since("2.0.0") val predictionCol: String, + @Since("2.0.0") val rawPredictionCol: String, + @Since("2.0.0") val probabilityCol: String, + @Since("2.0.0") val featuresCol: String, + @Since("2.0.0") val labelCol: String) extends Serializable {} diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index c21da218b36d6..720cf76125ac3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -248,6 +248,18 @@ class RFormulaModel private[feature]( !columnNames.contains($(labelCol)) || schema($(labelCol)).dataType == DoubleType, "Label column already exists and is not of type DoubleType.") } + + /** + * Get the original array of labels if exists. + */ + private[ml] def getOriginalLabels: Option[Array[String]] = { + // According to the sequences of transformers in RFormula, if the last stage is a + // StringIndexerModel, then we can extract the original labels from it. + pipelineModel.stages.last match { + case m: StringIndexerModel => Some(m.labels) + case _ => None + } + } } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index 59ccfc3d0f4e1..fa93b9c71ce4e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -19,9 +19,9 @@ package org.apache.spark.ml.api.r import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.attribute._ -import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, NaiveBayes} +import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, NaiveBayes, NaiveBayesModel} import org.apache.spark.ml.clustering.{KMeans, KMeansModel} -import org.apache.spark.ml.feature.{RFormula, VectorAssembler} +import org.apache.spark.ml.feature._ import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.DataFrame @@ -55,11 +55,11 @@ private[r] object SparkRWrappers { def fitNaiveBayes( value: String, df: DataFrame, - laplace: Double, + lambda: Double, modelType: String): PipelineModel = { val formula = new RFormula().setFormula(value) - val naiveBayes = new NaiveBayes().setSmoothing(laplace).setModelType(modelType) + val naiveBayes = new NaiveBayes().setSmoothing(lambda).setModelType(modelType) val pipeline = new Pipeline().setStages(Array(formula, naiveBayes)) pipeline.fit(df) } @@ -103,6 +103,7 @@ private[r] object SparkRWrappers { } case m: KMeansModel => m.clusterCenters.flatMap(_.toArray) + case m: NaiveBayesModel => Array() // A dummy result to prevent unmatched error. } } @@ -141,6 +142,39 @@ private[r] object SparkRWrappers { } } + def getNaiveBayesLabels(model: PipelineModel): Array[String] = { + val lastModel = model.stages.last + assert(lastModel.isInstanceOf[NaiveBayesModel], + s"Naive Bayes model expected, ${lastModel.getClass.getName} found.") + val numOfStages = model.stages.length + assert(numOfStages == 2, + "The number of Pipeline stages does not match with the training phase.") + // If the original label column is a String column, the next to last stage should be a + // StringIndexerModel. Otherwise we transform the original label column to a String array. 
+ val rFormulaModel = model.stages.head.asInstanceOf[RFormulaModel] + + rFormulaModel.getOriginalLabels match { + case Some(labels) => labels + case None => lastModel.asInstanceOf[NaiveBayesModel].labels.toArray.map(_.toString) + } + } + + def getNaiveBayesPi(model: PipelineModel): Array[Double] = { + model.stages.last match { + case m: NaiveBayesModel => m.pi.toArray + case other => throw new UnsupportedOperationException( + s"NaiveBayesModel required but ${other.getClass.getSimpleName} found.") + } + } + + def getNaiveBayesTheta(model: PipelineModel): Array[Double] = { + model.stages.last match { + case m: NaiveBayesModel => m.theta.toArray + case other => throw new UnsupportedOperationException( + s"NaiveBayesModel required but ${other.getClass.getSimpleName} found.") + } + } + def getModelFeatures(model: PipelineModel): Array[String] = { model.stages.last match { case m: LinearRegressionModel => @@ -163,6 +197,10 @@ private[r] object SparkRWrappers { val attrs = AttributeGroup.fromStructField( m.summary.predictions.schema(m.summary.featuresCol)) attrs.attributes.get.map(_.name.get) + case m: NaiveBayesModel => + val attrs = AttributeGroup.fromStructField( + m.summary.predictions.schema(m.summary.featuresCol)) + attrs.attributes.get.map(_.name.get) } } From 26d38e1baa0574221fc8cca104dfeeb1e057755f Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 2 Mar 2016 22:06:49 -0800 Subject: [PATCH 07/23] refine --- R/pkg/inst/tests/testthat/test_mllib.R | 8 ++++++++ .../main/scala/org/apache/spark/ml/r/SparkRWrappers.scala | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index d7236db20c3f8..42b6dca02a9d8 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -150,4 +150,12 @@ test_that("naiveBayes", { sample <- take(select(predict(model, training), "prediction"), 1) expect_equal(typeof(sample$prediction), "double") expect_equal(sample$prediction, 0) + + # Test summary works on naiveBayes + summary.model <- summary(model) + expect_equal(length(summary.model$pi), 2) + l1 <- summary(model)$theta[1,] + l2 <- summary(model)$theta[2,] + expect_true(all.equal(Reduce(`+`, l1), 1)) + expect_true(all.equal(Reduce(`+`, l2), 1)) }) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index fa93b9c71ce4e..43f36b3b89122 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -161,7 +161,7 @@ private[r] object SparkRWrappers { def getNaiveBayesPi(model: PipelineModel): Array[Double] = { model.stages.last match { - case m: NaiveBayesModel => m.pi.toArray + case m: NaiveBayesModel => m.pi.toArray.map(math.exp) // Use exp to reveal the probability case other => throw new UnsupportedOperationException( s"NaiveBayesModel required but ${other.getClass.getSimpleName} found.") } @@ -169,7 +169,7 @@ private[r] object SparkRWrappers { def getNaiveBayesTheta(model: PipelineModel): Array[Double] = { model.stages.last match { - case m: NaiveBayesModel => m.theta.toArray + case m: NaiveBayesModel => m.theta.toArray.map(math.exp) // Use exp to reveal the probability case other => throw new UnsupportedOperationException( s"NaiveBayesModel required but ${other.getClass.getSimpleName} found.") } From a07beb2a26a4650b12b2fa72a8b802125b6b5560 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 2 Mar 
2016 23:19:47 -0800 Subject: [PATCH 08/23] fix bugs --- .../scala/org/apache/spark/ml/classification/NaiveBayes.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index ce7a894a8e415..ef5e662266f38 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -291,7 +291,7 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { override protected def saveImpl(path: String): Unit = { // Save metadata and Params DefaultParamsWriter.saveMetadata(instance, path, sc) - // Save model data: pi, theta + // Save model data: labels, pi, theta val data = Data(instance.labels, instance.pi, instance.theta) val dataPath = new Path(path, "data").toString sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) @@ -307,7 +307,7 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val data = sqlContext.read.parquet(dataPath).select("pi", "theta").head() + val data = sqlContext.read.parquet(dataPath).select("labels", "pi", "theta").head() val labels = data.getAs[Vector](0) val pi = data.getAs[Vector](1) val theta = data.getAs[Matrix](2) From afaba4a22b40bafe5f9fb5c2796f7a72deff8a61 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 7 Mar 2016 09:58:39 -0800 Subject: [PATCH 09/23] revert NaiveBayes labels --- .../spark/ml/classification/NaiveBayes.scala | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index ef5e662266f38..9e73dd5082bca 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -138,7 +138,6 @@ object NaiveBayes extends DefaultParamsReadable[NaiveBayes] { @Experimental class NaiveBayesModel private[ml] ( @Since("1.5.0") override val uid: String, - @Since("2.0.0") val labels: Vector, @Since("1.5.0") val pi: Vector, @Since("1.5.0") val theta: Matrix) extends ProbabilisticClassificationModel[Vector, NaiveBayesModel] @@ -146,10 +145,6 @@ class NaiveBayesModel private[ml] ( import OldNaiveBayes.{Bernoulli, Multinomial} - private[ml] def this(uid: String, pi: Vector, theta: Matrix) = { - this(uid, Vectors.dense((0 until pi.size).map(_.toDouble).toArray), pi, theta) - } - /** * Bernoulli scoring requires log(condprob) if 1, log(1-condprob) if 0. 
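   * (theta holds log(condprob); for condprob = 0.8, theta = log(0.8) ≈ -0.223 and
   * log(1.0 - exp(-0.223)) = log(0.2) ≈ -1.609.)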
* This precomputes log(1.0 - exp(theta)) and its sum which are used for the linear algebra @@ -231,7 +226,7 @@ class NaiveBayesModel private[ml] ( @Since("1.5.0") override def copy(extra: ParamMap): NaiveBayesModel = { - copyValues(new NaiveBayesModel(uid, labels, pi, theta).setParent(this.parent), extra) + copyValues(new NaiveBayesModel(uid, pi, theta).setParent(this.parent), extra) } @Since("1.5.0") @@ -270,11 +265,10 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { oldModel: OldNaiveBayesModel, parent: NaiveBayes): NaiveBayesModel = { val uid = if (parent != null) parent.uid else Identifiable.randomUID("nb") - val labels = Vectors.dense(oldModel.labels) val pi = Vectors.dense(oldModel.pi) val theta = new DenseMatrix(oldModel.labels.length, oldModel.theta(0).length, oldModel.theta.flatten, true) - new NaiveBayesModel(uid, labels, pi, theta) + new NaiveBayesModel(uid, pi, theta) } @Since("1.6.0") @@ -286,13 +280,13 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { /** [[MLWriter]] instance for [[NaiveBayesModel]] */ private[NaiveBayesModel] class NaiveBayesModelWriter(instance: NaiveBayesModel) extends MLWriter { - private case class Data(labels: Vector, pi: Vector, theta: Matrix) + private case class Data(pi: Vector, theta: Matrix) override protected def saveImpl(path: String): Unit = { // Save metadata and Params DefaultParamsWriter.saveMetadata(instance, path, sc) // Save model data: labels, pi, theta - val data = Data(instance.labels, instance.pi, instance.theta) + val data = Data(instance.pi, instance.theta) val dataPath = new Path(path, "data").toString sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) } @@ -307,11 +301,10 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val data = sqlContext.read.parquet(dataPath).select("labels", "pi", "theta").head() - val labels = data.getAs[Vector](0) - val pi = data.getAs[Vector](1) - val theta = data.getAs[Matrix](2) - val model = new NaiveBayesModel(metadata.uid, labels, pi, theta) + val data = sqlContext.read.parquet(dataPath).select("pi", "theta").head() + val pi = data.getAs[Vector](0) + val theta = data.getAs[Matrix](1) + val model = new NaiveBayesModel(metadata.uid, pi, theta) DefaultParamsReader.getAndSetParams(model, metadata) model From 1a685e1d345f53ae9f7cfb270f110052df818f4c Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 7 Mar 2016 10:22:39 -0800 Subject: [PATCH 10/23] refine extracting labels --- .../org/apache/spark/ml/r/SparkRWrappers.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index 43f36b3b89122..187d2839165ea 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -142,20 +142,24 @@ private[r] object SparkRWrappers { } } + /** + * Extract labels' names for NaiveBayesModel. 
+ */ def getNaiveBayesLabels(model: PipelineModel): Array[String] = { val lastModel = model.stages.last assert(lastModel.isInstanceOf[NaiveBayesModel], s"Naive Bayes model expected, ${lastModel.getClass.getName} found.") - val numOfStages = model.stages.length - assert(numOfStages == 2, - "The number of Pipeline stages does not match with the training phase.") - // If the original label column is a String column, the next to last stage should be a - // StringIndexerModel. Otherwise we transform the original label column to a String array. + val rFormulaModel = model.stages.head.asInstanceOf[RFormulaModel] + // If RFormula reindex the labels with a StringIndexer, then we extract the original labels. + // Otherwise, we extract the labels out and sort them as what mllib.NaiveBayes does. rFormulaModel.getOriginalLabels match { case Some(labels) => labels - case None => lastModel.asInstanceOf[NaiveBayesModel].labels.toArray.map(_.toString) + case None => + val summary = lastModel.asInstanceOf[NaiveBayesModel].summary + summary.predictions.select(summary.labelCol) + .distinct().map(_.getDouble(0)).collect().sorted.map(_.toString) } } From 30e9c372207ed206a7dc294b5726ad008a18ed12 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 8 Mar 2016 20:37:37 -0800 Subject: [PATCH 11/23] fix error --- mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index 187d2839165ea..b944aee781b46 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -159,7 +159,7 @@ private[r] object SparkRWrappers { case None => val summary = lastModel.asInstanceOf[NaiveBayesModel].summary summary.predictions.select(summary.labelCol) - .distinct().map(_.getDouble(0)).collect().sorted.map(_.toString) + .distinct().collect().map(_.getDouble(0)).sorted.map(_.toString) } } From 390f8e62ed1eccaf22b5d4da1123a6f98080e4ba Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 8 Mar 2016 22:04:42 -0800 Subject: [PATCH 12/23] fix typos --- R/pkg/R/mllib.R | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index e49fbe250cf71..87b1f8de27f38 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -215,15 +215,14 @@ setMethod("fitted", signature(object = "PipelineModel"), #' #' Fit a naive Bayes model, similarly to R's naiveBayes() except for omitting two arguments 'subset' #' and 'na.action'. Users can use 'subset' function and 'fillna' or 'na.omit' function of DataFrame, -#' respectviely, to preprocess their DataFrame. We use na.omit in this interface to avoid potential -#' errors. +#' respectively, to preprocess their DataFrame. We use na.omit in this interface to remove rows with +#' NA values. #' #' @param object A symbolic description of the model to be fitted. Currently only a few formula #' operators are supported, including '~', '.', ':', '+', and '-'. #' @param data DataFrame for training #' @param lambda Smoothing parameter #' @param modelType Either 'multinomial' or 'bernoulli'. Default "multinomial". -#' @param ... Undefined parameters #' @return A fitted naive Bayes model. 
#' @rdname naiveBayes #' @export @@ -233,7 +232,7 @@ setMethod("fitted", signature(object = "PipelineModel"), #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' df <- createDataFrame(sqlContext, HouseVotes84) -#' model <- glm(Class ~ ., df, lambda = 1, modelType = "multinomial") +#' model <- naiveBayes(Class ~ ., df, lambda = 1, modelType = "multinomial") #'} setMethod("naiveBayes", signature(formula = "formula", data = "DataFrame"), function(formula, data, lambda = 1, modelType = c("multinomial", "bernoulli"), ...) { From dbaf4e622dd20d646e0cc26d5df1ba3aec02f827 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 9 Mar 2016 11:53:20 -0800 Subject: [PATCH 13/23] resolve dependency issue --- R/pkg/DESCRIPTION | 3 ++- R/pkg/inst/tests/testthat/test_mllib.R | 15 ++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 465bc37788e5d..2bff229705f81 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -11,7 +11,8 @@ Depends: R (>= 3.0), methods, Suggests: - testthat + testthat, + e1071 Description: R frontend for Spark License: Apache License (== 2.0) Collate: diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 42b6dca02a9d8..9e533432543d2 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -143,19 +143,24 @@ test_that("kmeans", { }) test_that("naiveBayes", { - data(HouseVotes84, package = "mlbench") - training <- createDataFrame(sqlContext, HouseVotes84) + training <- suppressWarnings(createDataFrame(sqlContext, iris)) - model <- naiveBayes(Class ~ ., data = training, lambda = 1, modelType = "multinomial") + model <- naiveBayes(Sepal_Width ~ ., data = training, lambda = 1, modelType = "multinomial") sample <- take(select(predict(model, training), "prediction"), 1) expect_equal(typeof(sample$prediction), "double") - expect_equal(sample$prediction, 0) + expect_equal(sample$prediction, 9) # Test summary works on naiveBayes summary.model <- summary(model) - expect_equal(length(summary.model$pi), 2) + expect_equal(length(summary.model$pi), 23) + expect_equal(sum(summary.model$pi), 1) l1 <- summary(model)$theta[1,] l2 <- summary(model)$theta[2,] expect_true(all.equal(Reduce(`+`, l1), 1)) expect_true(all.equal(Reduce(`+`, l2), 1)) + + # Test e1071::naiveBayes + if (requireNamespace("e1071", quietly = TRUE)) { + model2 <- e1071::naiveBayes(Class ~ ., data = HouseVotes84) + } }) From 9991e7993d425acf54471ddf4380d4c106138501 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Sat, 12 Mar 2016 19:11:11 -0800 Subject: [PATCH 14/23] fix nit --- R/pkg/inst/tests/testthat/test_mllib.R | 2 +- .../scala/org/apache/spark/ml/classification/NaiveBayes.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 9e533432543d2..f3331753d71fe 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -161,6 +161,6 @@ test_that("naiveBayes", { # Test e1071::naiveBayes if (requireNamespace("e1071", quietly = TRUE)) { - model2 <- e1071::naiveBayes(Class ~ ., data = HouseVotes84) + expect_that(e1071::naiveBayes(Sepal.Width ~ ., data = iris), not(throws_error())) } }) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index 9e73dd5082bca..fdbb9299724e1 100644 --- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -285,7 +285,7 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { override protected def saveImpl(path: String): Unit = { // Save metadata and Params DefaultParamsWriter.saveMetadata(instance, path, sc) - // Save model data: labels, pi, theta + // Save model data: pi, theta val data = Data(instance.pi, instance.theta) val dataPath = new Path(path, "data").toString sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) From 6c97cefdba5686704d31555ee71423d4afb888f4 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 16 Mar 2016 16:29:52 -0700 Subject: [PATCH 15/23] fix nits --- R/pkg/R/mllib.R | 17 ++++++++--------- R/pkg/inst/tests/testthat/test_mllib.R | 18 +++++++++--------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 87b1f8de27f38..53c824e05e173 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -42,7 +42,7 @@ setClass("PipelineModel", representation(model = "jobj")) #' @rdname glm #' @export #' @examples -#'\dontrun{ +#' \dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' data(iris) @@ -71,7 +71,7 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram #' @rdname predict #' @export #' @examples -#'\dontrun{ +#' \dontrun{ #' model <- glm(y ~ x, trainingData) #' predicted <- predict(model, testData) #' showDF(predicted) @@ -102,7 +102,7 @@ setMethod("predict", signature(object = "PipelineModel"), #' @rdname summary #' @export #' @examples -#'\dontrun{ +#' \dontrun{ #' model <- glm(y ~ x, trainingData) #' summary(model) #'} @@ -170,7 +170,7 @@ setMethod("summary", signature(object = "PipelineModel"), #' @rdname kmeans #' @export #' @examples -#'\dontrun{ +#' \dontrun{ #' model <- kmeans(x, centers = 2, algorithm="random") #'} setMethod("kmeans", signature(x = "DataFrame"), @@ -191,7 +191,7 @@ setMethod("kmeans", signature(x = "DataFrame"), #' @rdname fitted #' @export #' @examples -#'\dontrun{ +#' \dontrun{ #' model <- kmeans(trainingData, 2) #' fitted.model <- fitted(model) #' showDF(fitted.model) @@ -227,12 +227,11 @@ setMethod("fitted", signature(object = "PipelineModel"), #' @rdname naiveBayes #' @export #' @examples -#'\dontrun{ -#' data(HouseVotes84, package = "mlbench") +#' \dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) -#' df <- createDataFrame(sqlContext, HouseVotes84) -#' model <- naiveBayes(Class ~ ., df, lambda = 1, modelType = "multinomial") +#' df <- createDataFrame(sqlContext, infert) +#' model <- naiveBayes(education ~ ., df, lambda = 1, modelType = "multinomial") #'} setMethod("naiveBayes", signature(formula = "formula", data = "DataFrame"), function(formula, data, lambda = 1, modelType = c("multinomial", "bernoulli"), ...) 
{ diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index f3331753d71fe..2cf2e7ce31bed 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -143,24 +143,24 @@ test_that("kmeans", { }) test_that("naiveBayes", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- createDataFrame(sqlContext, infert) - model <- naiveBayes(Sepal_Width ~ ., data = training, lambda = 1, modelType = "multinomial") + model <- naiveBayes(education ~ ., data = training, lambda = 1, modelType = "multinomial") sample <- take(select(predict(model, training), "prediction"), 1) expect_equal(typeof(sample$prediction), "double") - expect_equal(sample$prediction, 9) + expect_equal(sample$prediction, 2) # Test summary works on naiveBayes summary.model <- summary(model) - expect_equal(length(summary.model$pi), 23) + expect_equal(length(summary.model$pi), 3) expect_equal(sum(summary.model$pi), 1) - l1 <- summary(model)$theta[1,] - l2 <- summary(model)$theta[2,] - expect_true(all.equal(Reduce(`+`, l1), 1)) - expect_true(all.equal(Reduce(`+`, l2), 1)) + l1 <- summary.model$theta[1,] + l2 <- summary.model$theta[2,] + expect_equal(sum(unlist(l1)), 1) + expect_equal(sum(unlist(l2)), 1) # Test e1071::naiveBayes if (requireNamespace("e1071", quietly = TRUE)) { - expect_that(e1071::naiveBayes(Sepal.Width ~ ., data = iris), not(throws_error())) + expect_that(e1071::naiveBayes(education ~ ., data = infert), not(throws_error())) } }) From 721a8b75abcff2970b4f74817e754dcff047c810 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 16 Mar 2016 17:19:06 -0700 Subject: [PATCH 16/23] remove NaiveBayesModelSummary --- .../spark/ml/classification/NaiveBayes.scala | 53 +++++++++---------- .../apache/spark/ml/r/SparkRWrappers.scala | 16 ++---- 2 files changed, 30 insertions(+), 39 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index fdbb9299724e1..46e8ba2ab595d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.PredictorParams +import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators} import org.apache.spark.ml.util._ import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes} @@ -105,15 +106,11 @@ class NaiveBayes @Since("1.5.0") ( val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset) val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) val nbModel = copyValues(NaiveBayesModel.fromOld(oldModel, this)) - val summary = new NaiveBayesSummary( - nbModel.transform(dataset), - $(predictionCol), - $(rawPredictionCol), - $(probabilityCol), - $(featuresCol), - $(labelCol)) - nbModel.setSummary(summary) - nbModel + val attr = AttributeGroup.fromStructField(dataset.schema($(featuresCol))).attributes + if (attr.isDefined) { + nbModel.setFeatureNames(attr.get.map(_.name.getOrElse("NA"))) + } + nbModel.setLabelNames(oldModel.labels.map(_.toString)) } @Since("1.5.0") @@ -237,22 +234,32 @@ class NaiveBayesModel private[ml] ( @Since("1.6.0") override def write: MLWriter = new NaiveBayesModel.NaiveBayesModelWriter(this) 
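
For reference, a sketch of the R-level summary() this bookkeeping ultimately feeds, assuming a live sqlContext (infert is simply the dataset the tests use; suppressWarnings() mirrors the column-name conversion warning in the tests):

    df <- suppressWarnings(createDataFrame(sqlContext, infert))
    model <- naiveBayes(education ~ ., data = df, lambda = 1)
    s <- summary(model)
    sum(s$pi)         # class priors come back exp()'d out of log space, so ~1
    rowSums(s$theta)  # each label's multinomial conditionals also sum to ~1
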
- private var trainingSummary: Option[NaiveBayesSummary] = None + private var featureNames: Option[Array[String]] = None + private var labelNames: Option[Array[String]] = None - private[classification] def setSummary(summary: NaiveBayesSummary): this.type = { - this.trainingSummary = Some(summary) + private[classification] def setFeatureNames(names: Array[String]): this.type = { + this.featureNames = Some(names) this } - /** - * Gets summary of model on training set. An exception is thrown if `trainingSummary == None`. - */ - @Since("2.0.0") - def summary: NaiveBayesSummary = trainingSummary match { - case Some(summ) => summ + private[classification] def setLabelNames(names: Array[String]): this.type = { + this.labelNames = Some(names) + this + } + + private[ml] def getFeatureNames: Array[String] = featureNames match { + case Some(names) => names case None => throw new SparkException( - s"No training summary available for the ${this.getClass.getSimpleName}", + s"No training result available for the ${this.getClass.getSimpleName}", + new NullPointerException()) + } + + private[ml] def getLabelNames: Array[String] = labelNames match { + case Some(names) => names + case None => + throw new SparkException( + s"No training result available for the ${this.getClass.getSimpleName}", new NullPointerException()) } } @@ -311,11 +318,3 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { } } } - -class NaiveBayesSummary private[classification] ( - @Since("2.0.0") @transient val predictions: DataFrame, - @Since("2.0.0") val predictionCol: String, - @Since("2.0.0") val rawPredictionCol: String, - @Since("2.0.0") val probabilityCol: String, - @Since("2.0.0") val featuresCol: String, - @Since("2.0.0") val labelCol: String) extends Serializable {} diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index b944aee781b46..3f78946861d6f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -82,7 +82,7 @@ private[r] object SparkRWrappers { def getModelCoefficients(model: PipelineModel): Array[Double] = { model.stages.last match { - case m: LinearRegressionModel => { + case m: LinearRegressionModel => val coefficientStandardErrorsR = Array(m.summary.coefficientStandardErrors.last) ++ m.summary.coefficientStandardErrors.dropRight(1) val tValuesR = Array(m.summary.tValues.last) ++ m.summary.tValues.dropRight(1) @@ -93,14 +93,12 @@ private[r] object SparkRWrappers { } else { m.coefficients.toArray ++ coefficientStandardErrorsR ++ tValuesR ++ pValuesR } - } - case m: LogisticRegressionModel => { + case m: LogisticRegressionModel => if (m.getFitIntercept) { Array(m.intercept) ++ m.coefficients.toArray } else { m.coefficients.toArray } - } case m: KMeansModel => m.clusterCenters.flatMap(_.toArray) case m: NaiveBayesModel => Array() // A dummy result to prevent unmatched error. @@ -156,10 +154,7 @@ private[r] object SparkRWrappers { // Otherwise, we extract the labels out and sort them as what mllib.NaiveBayes does. 
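    // (mllib.NaiveBayes emits its labels in ascending order, so the names returned
    // here stay aligned with the rows of pi and theta.)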
rFormulaModel.getOriginalLabels match { case Some(labels) => labels - case None => - val summary = lastModel.asInstanceOf[NaiveBayesModel].summary - summary.predictions.select(summary.labelCol) - .distinct().collect().map(_.getDouble(0)).sorted.map(_.toString) + case None => lastModel.asInstanceOf[NaiveBayesModel].getLabelNames } } @@ -201,10 +196,7 @@ private[r] object SparkRWrappers { val attrs = AttributeGroup.fromStructField( m.summary.predictions.schema(m.summary.featuresCol)) attrs.attributes.get.map(_.name.get) - case m: NaiveBayesModel => - val attrs = AttributeGroup.fromStructField( - m.summary.predictions.schema(m.summary.featuresCol)) - attrs.attributes.get.map(_.name.get) + case m: NaiveBayesModel => m.getFeatureNames } } From 8e2139379313f2b7094e750fba816e5a701a413a Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 16 Mar 2016 19:22:48 -0700 Subject: [PATCH 17/23] add raw label prediction --- R/pkg/R/mllib.R | 8 ++++++-- R/pkg/inst/tests/testthat/test_mllib.R | 11 ++++++----- .../org/apache/spark/ml/r/SparkRWrappers.scala | 13 +++++++++++++ 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 53c824e05e173..e69e1c0ec73b3 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -80,10 +80,14 @@ setMethod("predict", signature(object = "PipelineModel"), function(object, newData) { modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "getModelName", object@model) - if (modelName == "NaiveBayesModel") { + jdf <- if (modelName == "NaiveBayesModel") { newData <- na.omit(newData) + callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getNaiveBayesRawLabelsPrediction", object@model, newData@sdf) + } else { + callJMethod(object@model, "transform", newData@sdf) } - return(dataFrame(callJMethod(object@model, "transform", newData@sdf))) + return(dataFrame(jdf)) }) #' Get the summary of a model diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 2cf2e7ce31bed..8e27d37779f32 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -143,12 +143,12 @@ test_that("kmeans", { }) test_that("naiveBayes", { - training <- createDataFrame(sqlContext, infert) + training <- suppressWarnings(createDataFrame(sqlContext, infert)) model <- naiveBayes(education ~ ., data = training, lambda = 1, modelType = "multinomial") - sample <- take(select(predict(model, training), "prediction"), 1) - expect_equal(typeof(sample$prediction), "double") - expect_equal(sample$prediction, 2) + sample <- take(select(predict(model, training), "rawLabelsPrediction"), 1) + expect_equal(typeof(sample$rawLabelsPrediction), "character") + expect_equal(sample$rawLabelsPrediction, "0-5yrs") # Test summary works on naiveBayes summary.model <- summary(model) @@ -161,6 +161,7 @@ test_that("naiveBayes", { # Test e1071::naiveBayes if (requireNamespace("e1071", quietly = TRUE)) { - expect_that(e1071::naiveBayes(education ~ ., data = infert), not(throws_error())) + expect_that(m <- e1071::naiveBayes(education ~ ., data = infert), not(throws_error())) + expect_equal(as.character(predict(m, infert[1, ])), "0-5yrs") } }) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index 3f78946861d6f..1c1d5fed5b1e4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -24,6 +24,7 @@ import 
org.apache.spark.ml.clustering.{KMeans, KMeansModel} import org.apache.spark.ml.feature._ import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions._ private[r] object SparkRWrappers { def fitRModelFormula( @@ -174,6 +175,18 @@ private[r] object SparkRWrappers { } } + def getNaiveBayesRawLabelsPrediction(model: PipelineModel, test: DataFrame): DataFrame = { + model.stages.last match { + case m: NaiveBayesModel => + val labels = getNaiveBayesLabels(model) + val predictRawLabelsUDF = udf { (label: Double) => labels(label.toInt) } + model.transform(test) + .withColumn("rawLabelsPrediction", predictRawLabelsUDF.apply(col(m.getLabelCol))) + case other => throw new UnsupportedOperationException( + s"NaiveBayesModel required but ${other.getClass.getSimpleName} found.") + } + } + def getModelFeatures(model: PipelineModel): Array[String] = { model.stages.last match { case m: LinearRegressionModel => From 90b6ad9ebd91d8cdfe9680c9c89355eaf3936b12 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 16 Mar 2016 19:42:25 -0700 Subject: [PATCH 18/23] fix r style --- R/pkg/inst/tests/testthat/test_mllib.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 8e27d37779f32..b30d8d5f099c9 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -154,8 +154,8 @@ test_that("naiveBayes", { summary.model <- summary(model) expect_equal(length(summary.model$pi), 3) expect_equal(sum(summary.model$pi), 1) - l1 <- summary.model$theta[1,] - l2 <- summary.model$theta[2,] + l1 <- summary.model$theta[1, ] + l2 <- summary.model$theta[2, ] expect_equal(sum(unlist(l1)), 1) expect_equal(sum(unlist(l2)), 1) From 87fa0aa25f897ffef755557d2a9320eda86e74ed Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Sun, 20 Mar 2016 01:06:34 -0700 Subject: [PATCH 19/23] add IndexToString to extract labels --- R/pkg/R/mllib.R | 11 +-- .../spark/ml/classification/NaiveBayes.scala | 16 +--- .../apache/spark/ml/feature/RFormula.scala | 9 +- .../apache/spark/ml/r/SparkRWrappers.scala | 91 ++++++++++--------- 4 files changed, 55 insertions(+), 72 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index edf01dc343ddb..87c97f90c7208 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -78,16 +78,7 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram #'} setMethod("predict", signature(object = "PipelineModel"), function(object, newData) { - modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelName", object@model) - jdf <- if (modelName == "NaiveBayesModel") { - newData <- na.omit(newData) - callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getNaiveBayesRawLabelsPrediction", object@model, newData@sdf) - } else { - callJMethod(object@model, "transform", newData@sdf) - } - return(dataFrame(jdf)) + return(dataFrame(callJMethod(object@model, "transform", newData@sdf))) }) #' Get the summary of a model diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index 8cea3a5d10e23..c4b4eebb65b11 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -110,7 +110,7 @@ class NaiveBayes @Since("1.5.0") ( if (attr.isDefined) { 
nbModel.setFeatureNames(attr.get.map(_.name.getOrElse("NA"))) } - nbModel.setLabelNames(oldModel.labels.map(_.toString)) + nbModel } @Since("1.5.0") @@ -235,18 +235,12 @@ class NaiveBayesModel private[ml] ( override def write: MLWriter = new NaiveBayesModel.NaiveBayesModelWriter(this) private var featureNames: Option[Array[String]] = None - private var labelNames: Option[Array[String]] = None private[classification] def setFeatureNames(names: Array[String]): this.type = { this.featureNames = Some(names) this } - private[classification] def setLabelNames(names: Array[String]): this.type = { - this.labelNames = Some(names) - this - } - private[ml] def getFeatureNames: Array[String] = featureNames match { case Some(names) => names case None => @@ -254,14 +248,6 @@ class NaiveBayesModel private[ml] ( s"No training result available for the ${this.getClass.getSimpleName}", new NullPointerException()) } - - private[ml] def getLabelNames: Array[String] = labelNames match { - case Some(names) => names - case None => - throw new SparkException( - s"No training result available for the ${this.getClass.getSimpleName}", - new NullPointerException()) - } } @Since("1.6.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index 211089ffe370d..ee68a1d127579 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -77,15 +77,13 @@ class RFormula(override val uid: String) /** * R formula parameter. The formula is provided in string form. - * - * @group param + * @group param */ val formula: Param[String] = new Param(this, "formula", "R model formula") /** * Sets the formula to use for this transformer. Must be called before use. - * - * @group setParam + * @group setParam * @param value an R formula in string form (e.g. "y ~ x + z") */ def setFormula(value: String): this.type = set(formula, value) @@ -195,8 +193,7 @@ object RFormula extends DefaultParamsReadable[RFormula] { /** * :: Experimental :: * A fitted RFormula. Fitting is required to determine the factor levels of formula terms. - * - * @param resolvedFormula the fitted R formula. + * @param resolvedFormula the fitted R formula. * @param pipelineModel the fitted feature model, including factor to index mappings. 
*/ @Experimental diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index 1c1d5fed5b1e4..597a594817be7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.api.r -import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage} import org.apache.spark.ml.attribute._ import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, NaiveBayes, NaiveBayesModel} import org.apache.spark.ml.clustering.{KMeans, KMeansModel} @@ -25,6 +25,7 @@ import org.apache.spark.ml.feature._ import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.StringType private[r] object SparkRWrappers { def fitRModelFormula( @@ -59,10 +60,37 @@ private[r] object SparkRWrappers { lambda: Double, modelType: String): PipelineModel = { + // Transform data with RFormula val formula = new RFormula().setFormula(value) + val fModel = formula.fit(df) + val rawLabels = fModel.getOriginalLabels + val naiveBayes = new NaiveBayes().setSmoothing(lambda).setModelType(modelType) - val pipeline = new Pipeline().setStages(Array(formula, naiveBayes)) - pipeline.fit(df) + val rawLabelsIndexer = new IndexToString() + .setInputCol(naiveBayes.getLabelCol).setOutputCol("rawLabelsPrediction") + + if (fModel.getOriginalLabels.isDefined) { + // String labels have already been re-indexed by RFormula. + val stages: Array[PipelineStage] = + Array(fModel, naiveBayes, rawLabelsIndexer.setLabels(rawLabels.get)) + new Pipeline().setStages(stages).fit(df) + } else { + // Re-index numerical labels for NaiveBayes since it assumes labels are indices. + val labelIndexer = new StringIndexer().setInputCol(fModel.getLabelCol).fit(df) + val stages: Array[PipelineStage] = + Array( + labelIndexer, + fModel, + naiveBayes.setLabelCol(labelIndexer.getOutputCol), + rawLabelsIndexer.setLabels(labelIndexer.labels)) + new Pipeline().setStages(stages).fit(df) + } + } + + def isNaiveBayesModel(model: PipelineModel): Boolean = { + model.stages.length >= 2 && + model.stages(model.stages.length - 2).isInstanceOf[NaiveBayesModel] && + model.stages.last.isInstanceOf[IndexToString] } def fitKMeans( @@ -102,7 +130,7 @@ private[r] object SparkRWrappers { } case m: KMeansModel => m.clusterCenters.flatMap(_.toArray) - case m: NaiveBayesModel => Array() // A dummy result to prevent unmatched error. + case _ if isNaiveBayesModel(model) => Array() // A dummy result to prevent unmatched error. } } @@ -145,49 +173,24 @@ private[r] object SparkRWrappers { * Extract labels' names for NaiveBayesModel. */ def getNaiveBayesLabels(model: PipelineModel): Array[String] = { - val lastModel = model.stages.last - assert(lastModel.isInstanceOf[NaiveBayesModel], - s"Naive Bayes model expected, ${lastModel.getClass.getName} found.") - - val rFormulaModel = model.stages.head.asInstanceOf[RFormulaModel] - - // If RFormula reindex the labels with a StringIndexer, then we extract the original labels. - // Otherwise, we extract the labels out and sort them as what mllib.NaiveBayes does. 
- rFormulaModel.getOriginalLabels match { - case Some(labels) => labels - case None => lastModel.asInstanceOf[NaiveBayesModel].getLabelNames - } + assert(isNaiveBayesModel(model), + s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.") + model.stages.last.asInstanceOf[IndexToString].getLabels } def getNaiveBayesPi(model: PipelineModel): Array[Double] = { - model.stages.last match { - case m: NaiveBayesModel => m.pi.toArray.map(math.exp) // Use exp to reveal the probability - case other => throw new UnsupportedOperationException( - s"NaiveBayesModel required but ${other.getClass.getSimpleName} found.") - } + assert(isNaiveBayesModel(model), + s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.") + model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].pi.toArray.map(math.exp) } def getNaiveBayesTheta(model: PipelineModel): Array[Double] = { - model.stages.last match { - case m: NaiveBayesModel => m.theta.toArray.map(math.exp) // Use exp to reveal the probability - case other => throw new UnsupportedOperationException( - s"NaiveBayesModel required but ${other.getClass.getSimpleName} found.") - } - } - - def getNaiveBayesRawLabelsPrediction(model: PipelineModel, test: DataFrame): DataFrame = { - model.stages.last match { - case m: NaiveBayesModel => - val labels = getNaiveBayesLabels(model) - val predictRawLabelsUDF = udf { (label: Double) => labels(label.toInt) } - model.transform(test) - .withColumn("rawLabelsPrediction", predictRawLabelsUDF.apply(col(m.getLabelCol))) - case other => throw new UnsupportedOperationException( - s"NaiveBayesModel required but ${other.getClass.getSimpleName} found.") - } + assert(isNaiveBayesModel(model), + s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.") + model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].theta.toArray.map(math.exp) } - def getModelFeatures(model: PipelineModel): Array[String] = { + def getModelFeatures(model: PipelineModel): Array[String] = { model.stages.last match { case m: LinearRegressionModel => val attrs = AttributeGroup.fromStructField( @@ -209,11 +212,17 @@ private[r] object SparkRWrappers { val attrs = AttributeGroup.fromStructField( m.summary.predictions.schema(m.summary.featuresCol)) attrs.attributes.get.map(_.name.get) - case m: NaiveBayesModel => m.getFeatureNames + case _ if isNaiveBayesModel(model) => + model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].getFeatureNames } } def getModelName(model: PipelineModel): String = { - model.stages.last.getClass.getSimpleName + model.stages.last match { + case m: LinearRegressionModel => "LinearRegressionModel" + case m: LogisticRegressionModel => "LogisticRegressionModel" + case m: KMeansModel => "KMeansModel" + case _ if isNaiveBayesModel(model) => "NaiveBayesModel" + } } } From 3d291de561bc9155e32a0c286309e8b7ddde48c4 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Sun, 20 Mar 2016 01:22:15 -0700 Subject: [PATCH 20/23] remove useless imports --- mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index 597a594817be7..bbe7c0739575a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -24,8 +24,6 @@ import org.apache.spark.ml.clustering.{KMeans, KMeansModel} 
import org.apache.spark.ml.feature._ import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.StringType private[r] object SparkRWrappers { def fitRModelFormula( From 49f36f304fd92130d55509ac0309f5f7d74d0e5c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 21 Mar 2016 23:05:14 -0700 Subject: [PATCH 21/23] refactor with NaiveBayesWrapper --- R/pkg/R/mllib.R | 99 +++++++++++++------ R/pkg/inst/tests/testthat/test_mllib.R | 69 +++++++++---- .../spark/ml/classification/NaiveBayes.scala | 24 +---- .../apache/spark/ml/feature/RFormula.scala | 12 --- .../apache/spark/ml/r/NaiveBayesWrapper.scala | 75 ++++++++++++++ .../apache/spark/ml/r/SparkRWrappers.scala | 87 +++------------- 6 files changed, 211 insertions(+), 155 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 87c97f90c7208..8374a7ae7555b 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -22,6 +22,11 @@ #' @export setClass("PipelineModel", representation(model = "jobj")) +#' @tile S4 class that represents a NaiveBayesModel +#' @param jobj a Java object reference to the backing Scala NaiveBayesWrapper +#' @export +setClass("NaiveBayesModel", representation(jobj = "jobj")) + #' Fits a generalized linear model #' #' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package. @@ -61,7 +66,7 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram return(new("PipelineModel", model = model)) }) -#' Make predictions from a model +#' Make predictions from a amodel #' #' Makes predictions from a model produced by glm(), similarly to R's predict(). #' @@ -81,6 +86,26 @@ setMethod("predict", signature(object = "PipelineModel"), return(dataFrame(callJMethod(object@model, "transform", newData@sdf))) }) +#' Make predictions from a naive Bayes model +#' +#' Makes predictions from a model produced by naiveBayes(), similarly to R package e1071's predict. +#' +#' @param object A fitted naive Bayes model +#' @param newData DataFrame for testing +#' @return DataFrame containing predicted labels in a column named "prediction" +#' @rdname predict +#' @export +#' @examples +#' \dontrun{ +#' model <- naiveBayes(y ~ x, trainingData) +#' predicted <- predict(model, testData) +#' showDF(predicted) +#'} +setMethod("predict", signature(object = "NaiveBayesModel"), + function(object, newData) { + return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) + }) + #' Get the summary of a model #' #' Returns the summary of a model produced by glm(), similarly to R's summary(). 
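As an aside, here is an end-to-end sketch of the R API this patch introduces. This is a hedged illustration, not part of the patch: it assumes a running SparkR session with a `sqlContext`, uses base R's Titanic data exactly as the reworked test below does, and the "prediction" column name comes from the wrapper's IndexToString stage:

    t1 <- as.data.frame(Titanic)
    t1 <- t1[t1$Freq > 0, -5]                     # drop the Freq column; keep observed rows only
    df <- suppressWarnings(createDataFrame(sqlContext, t1))
    model <- naiveBayes(Survived ~ ., data = df)  # Bernoulli NB; laplace = 0 by default
    preds <- predict(model, df)                   # appends a string "prediction" column
    head(select(preds, "Survived", "prediction"))
    s <- summary(model)                           # list(apriori = ..., tables = ...)
    s$apriori                                     # 1 x #labels matrix of class priors
    s$tables["Yes", "Age_Adult"]                  # conditional probability lookup
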
@@ -135,24 +160,40 @@ setMethod("summary", signature(object = "PipelineModel"),
         colnames(coefficients) <- unlist(features)
         rownames(coefficients) <- 1:k
         return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster)))
-      } else if (modelName == "NaiveBayesModel") {
-        labels <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                              "getNaiveBayesLabels", object@model)
-        pi <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                          "getNaiveBayesPi", object@model)
-        theta <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                             "getNaiveBayesTheta", object@model)
-        pi <- t(as.matrix(unlist(pi)))
-        colnames(pi) <- unlist(labels)
-        theta <- matrix(theta, nrow = length(labels))
-        rownames(theta) <- unlist(labels)
-        colnames(theta) <- unlist(features)
-        return(list(pi = pi, theta = theta))
       } else {
         stop(paste("Unsupported model", modelName, sep = " "))
       }
     })
 
+#' Get the summary of a naive Bayes model
+#'
+#' Returns the summary of a naive Bayes model produced by naiveBayes(), similarly to R's summary().
+#'
+#' @param object A fitted naive Bayes model
+#' @return a list containing 'apriori', the label distribution, and 'tables', conditional
+#'         probabilities given the target label
+#' @rdname summary
+#' @export
+#' @examples
+#' \dontrun{
+#' model <- naiveBayes(y ~ x, trainingData)
+#' summary(model)
+#'}
+setMethod("summary", signature(object = "NaiveBayesModel"),
+          function(object, ...) {
+            jobj <- object@jobj
+            features <- callJMethod(jobj, "features")
+            labels <- callJMethod(jobj, "labels")
+            apriori <- callJMethod(jobj, "apriori")
+            apriori <- t(as.matrix(unlist(apriori)))
+            colnames(apriori) <- unlist(labels)
+            tables <- callJMethod(jobj, "tables")
+            tables <- matrix(tables, nrow = length(labels))
+            rownames(tables) <- unlist(labels)
+            colnames(tables) <- unlist(features)
+            return(list(apriori = apriori, tables = tables))
+          })
+
 #' Fit a k-means model
 #'
 #' Fit a k-means model, similarly to R's kmeans().
@@ -206,34 +247,30 @@ setMethod("fitted", signature(object = "PipelineModel"),
   }
 })
 
-#' Fit a naive Bayes model
+#' Fit a Bernoulli naive Bayes model
 #'
-#' Fit a naive Bayes model, similarly to R's naiveBayes() except for omitting two arguments 'subset'
-#' and 'na.action'. Users can use 'subset' function and 'fillna' or 'na.omit' function of DataFrame,
-#' respectively, to preprocess their DataFrame. We use na.omit in this interface to remove rows with
-#' NA values.
+#' Fit a Bernoulli naive Bayes model, similarly to R package e1071's naiveBayes(). Currently only
+#' categorical features are supported. The input should be a DataFrame of observations instead of
+#' a contingency table.
 #'
 #' @param object A symbolic description of the model to be fitted. Currently only a few formula
-#'               operators are supported, including '~', '.', ':', '+', and '-'.
+#'                operators are supported, including '~', '.', ':', '+', and '-'.
 #' @param data DataFrame for training
-#' @param lambda Smoothing parameter
-#' @param modelType Either 'multinomial' or 'bernoulli'. Default "multinomial".
-#' @return A fitted naive Bayes model.
+#' @param laplace Smoothing parameter +#' @return a fitted naive Bayes model #' @rdname naiveBayes +#' @seealso e1071: \url{https://cran.r-project.org/web/packages/e1071/} #' @export #' @examples #' \dontrun{ -#' sc <- sparkR.init() -#' sqlContext <- sparkRSQL.init(sc) #' df <- createDataFrame(sqlContext, infert) -#' model <- naiveBayes(education ~ ., df, lambda = 1, modelType = "multinomial") +#' model <- naiveBayes(education ~ ., df, laplace = 0) #'} setMethod("naiveBayes", signature(formula = "formula", data = "DataFrame"), - function(formula, data, lambda = 1, modelType = c("multinomial", "bernoulli"), ...) { + function(formula, data, laplace = 0, ...) { data <- na.omit(data) formula <- paste(deparse(formula), collapse = "") - modelType <- match.arg(modelType) - model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes", - formula, data@sdf, lambda, modelType) - return(new("PipelineModel", model = model)) + jobj <- callJStatic("org.apache.spark.ml.r.NaiveBayesWrapper", "fit", + formula, data@sdf, laplace) + return(new("NaiveBayesModel", jobj = jobj)) }) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index b62dcd873ce5c..74e26f3c94d09 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -143,25 +143,60 @@ test_that("kmeans", { }) test_that("naiveBayes", { - training <- suppressWarnings(createDataFrame(sqlContext, infert)) - - model <- naiveBayes(education ~ ., data = training, lambda = 1, modelType = "multinomial") - sample <- take(select(predict(model, training), "rawLabelsPrediction"), 1) - expect_equal(typeof(sample$rawLabelsPrediction), "character") - expect_equal(sample$rawLabelsPrediction, "0-5yrs") - - # Test summary works on naiveBayes - summary.model <- summary(model) - expect_equal(length(summary.model$pi), 3) - expect_equal(sum(summary.model$pi), 1) - l1 <- summary.model$theta[1, ] - l2 <- summary.model$theta[2, ] - expect_equal(sum(unlist(l1)), 1) - expect_equal(sum(unlist(l2)), 1) + # R code to reproduce the result. + # We do not support instance weights yet. So we ignore the frequencies. 
+ # + # library(e1071) + # t <- as.data.frame(Titanic) + # t1 <- t[t$Freq > 0, -5] + # m <- naiveBayes(Survived ~ ., data = t1) + # m + # predict(m, t1) + # + # -- output of 'm' + # + # A-priori probabilities: + # Y + # No Yes + # 0.4166667 0.5833333 + # + # Conditional probabilities: + # Class + # Y 1st 2nd 3rd Crew + # No 0.2000000 0.2000000 0.4000000 0.2000000 + # Yes 0.2857143 0.2857143 0.2857143 0.1428571 + # + # Sex + # Y Male Female + # No 0.5 0.5 + # Yes 0.5 0.5 + # + # Age + # Y Child Adult + # No 0.2000000 0.8000000 + # Yes 0.4285714 0.5714286 + # + # -- output of 'predict(m, t1)' + # + # Yes Yes Yes Yes No No Yes Yes No No Yes Yes Yes Yes Yes Yes Yes Yes No No Yes Yes No No + # + + t <- as.data.frame(Titanic) + t1 <- t[t$Freq > 0, -5] + df <- suppressWarnings(createDataFrame(sqlContext, t1)) + m <- naiveBayes(Survived ~ ., data = df) + s <- summary(m) + expect_equal(s$apriori[1, "Yes"], 0.5833333, tolerance = 1e-6) + expect_equal(sum(s$apriori), 1) + expect_equal(s$tables["Yes", "Age_Adult"], 0.5714286, tolerance = 1e-6) + p <- collect(select(predict(m, df), "prediction")) + expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No", + "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No", + "Yes", "Yes", "No", "No")) # Test e1071::naiveBayes if (requireNamespace("e1071", quietly = TRUE)) { - expect_that(m <- e1071::naiveBayes(education ~ ., data = infert), not(throws_error())) - expect_equal(as.character(predict(m, infert[1, ])), "0-5yrs") + expect_that(m <- e1071::naiveBayes(Survived ~ ., data = t1), not(throws_error())) + expect_equal(as.character(predict(m, t1[1, ])), "Yes") } }) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index c4b4eebb65b11..483ef0d88ca64 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.PredictorParams -import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators} import org.apache.spark.ml.util._ import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes} @@ -105,12 +104,7 @@ class NaiveBayes @Since("1.5.0") ( override protected def train(dataset: DataFrame): NaiveBayesModel = { val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset) val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) - val nbModel = copyValues(NaiveBayesModel.fromOld(oldModel, this)) - val attr = AttributeGroup.fromStructField(dataset.schema($(featuresCol))).attributes - if (attr.isDefined) { - nbModel.setFeatureNames(attr.get.map(_.name.getOrElse("NA"))) - } - nbModel + NaiveBayesModel.fromOld(oldModel, this) } @Since("1.5.0") @@ -233,21 +227,6 @@ class NaiveBayesModel private[ml] ( @Since("1.6.0") override def write: MLWriter = new NaiveBayesModel.NaiveBayesModelWriter(this) - - private var featureNames: Option[Array[String]] = None - - private[classification] def setFeatureNames(names: Array[String]): this.type = { - this.featureNames = Some(names) - this - } - - private[ml] def getFeatureNames: Array[String] = featureNames match { - case Some(names) => names - case None => - throw new SparkException( - s"No training 
result available for the ${this.getClass.getSimpleName}", - new NullPointerException()) - } } @Since("1.6.0") @@ -258,6 +237,7 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { oldModel: OldNaiveBayesModel, parent: NaiveBayes): NaiveBayesModel = { val uid = if (parent != null) parent.uid else Identifiable.randomUID("nb") + val labels = Vectors.dense(oldModel.labels) val pi = Vectors.dense(oldModel.pi) val theta = new DenseMatrix(oldModel.labels.length, oldModel.theta(0).length, oldModel.theta.flatten, true) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index ee68a1d127579..e7ca7ada74c8c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -257,18 +257,6 @@ class RFormulaModel private[feature]( "Label column already exists and is not of type DoubleType.") } - /** - * Get the original array of labels if exists. - */ - private[ml] def getOriginalLabels: Option[Array[String]] = { - // According to the sequences of transformers in RFormula, if the last stage is a - // StringIndexerModel, then we can extract the original labels from it. - pipelineModel.stages.last match { - case m: StringIndexerModel => Some(m.labels) - case _ => None - } - } - @Since("2.0.0") override def write: MLWriter = new RFormulaModel.RFormulaModelWriter(this) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala new file mode 100644 index 0000000000000..0e4ef6c52dc04 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.r + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.attribute.{AttributeGroup, Attribute, NominalAttribute} +import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel} +import org.apache.spark.ml.feature.{IndexToString, RFormula} +import org.apache.spark.sql.DataFrame + +private[r] class NaiveBayesWrapper private ( + pipeline: PipelineModel, + val labels: Array[String], + val features: Array[String]) { + + import NaiveBayesWrapper._ + + private val naiveBayesModel: NaiveBayesModel = pipeline.stages(1).asInstanceOf[NaiveBayesModel] + + lazy val apriori: Array[Double] = naiveBayesModel.pi.toArray.map(math.exp) + + lazy val tables: Array[Double] = naiveBayesModel.theta.toArray.map(math.exp) + + def transform(dataset: DataFrame): DataFrame = { + pipeline.transform(dataset).drop(PREDICTED_LABEL_INDEX_COL) + } +} + +private[r] object NaiveBayesWrapper { + + val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" + val PREDICTED_LABEL_COL = "prediction" + + def fit(formula: String, data: DataFrame, laplace: Double): NaiveBayesWrapper = { + val rFormula = new RFormula() + .setFormula(formula) + .fit(data) + // get labels and feature names from output schema + val schema = rFormula.transform(data).schema + val labelAttr = Attribute.fromStructField(schema(rFormula.getLabelCol)) + .asInstanceOf[NominalAttribute] + val labels = labelAttr.values.get + val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol)) + .attributes.get + val features = featureAttrs.map(_.name.get) + // assemble and fit the pipeline + val naiveBayes = new NaiveBayes() + .setSmoothing(laplace) + .setModelType("bernoulli") + .setPredictionCol(PREDICTED_LABEL_INDEX_COL) + val idxToStr = new IndexToString() + .setInputCol(PREDICTED_LABEL_INDEX_COL) + .setOutputCol(PREDICTED_LABEL_COL) + .setLabels(labels) + val pipeline = new Pipeline() + .setStages(Array(rFormula, naiveBayes, idxToStr)) + .fit(data) + new NaiveBayesWrapper(pipeline, labels, features) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala index bbe7c0739575a..d23e4fc9d1f57 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala @@ -17,11 +17,11 @@ package org.apache.spark.ml.api.r -import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage} +import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.attribute._ -import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, NaiveBayes, NaiveBayesModel} +import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel} import org.apache.spark.ml.clustering.{KMeans, KMeansModel} -import org.apache.spark.ml.feature._ +import org.apache.spark.ml.feature.{RFormula, VectorAssembler} import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.DataFrame @@ -52,45 +52,6 @@ private[r] object SparkRWrappers { pipeline.fit(df) } - def fitNaiveBayes( - value: String, - df: DataFrame, - lambda: Double, - modelType: String): PipelineModel = { - - // Transform data with RFormula - val formula = new RFormula().setFormula(value) - val fModel = formula.fit(df) - val rawLabels = fModel.getOriginalLabels - - val naiveBayes = new NaiveBayes().setSmoothing(lambda).setModelType(modelType) - val rawLabelsIndexer = new IndexToString() - 
.setInputCol(naiveBayes.getLabelCol).setOutputCol("rawLabelsPrediction") - - if (fModel.getOriginalLabels.isDefined) { - // String labels have already been re-indexed by RFormula. - val stages: Array[PipelineStage] = - Array(fModel, naiveBayes, rawLabelsIndexer.setLabels(rawLabels.get)) - new Pipeline().setStages(stages).fit(df) - } else { - // Re-index numerical labels for NaiveBayes since it assumes labels are indices. - val labelIndexer = new StringIndexer().setInputCol(fModel.getLabelCol).fit(df) - val stages: Array[PipelineStage] = - Array( - labelIndexer, - fModel, - naiveBayes.setLabelCol(labelIndexer.getOutputCol), - rawLabelsIndexer.setLabels(labelIndexer.labels)) - new Pipeline().setStages(stages).fit(df) - } - } - - def isNaiveBayesModel(model: PipelineModel): Boolean = { - model.stages.length >= 2 && - model.stages(model.stages.length - 2).isInstanceOf[NaiveBayesModel] && - model.stages.last.isInstanceOf[IndexToString] - } - def fitKMeans( df: DataFrame, initMode: String, @@ -109,7 +70,7 @@ private[r] object SparkRWrappers { def getModelCoefficients(model: PipelineModel): Array[Double] = { model.stages.last match { - case m: LinearRegressionModel => + case m: LinearRegressionModel => { val coefficientStandardErrorsR = Array(m.summary.coefficientStandardErrors.last) ++ m.summary.coefficientStandardErrors.dropRight(1) val tValuesR = Array(m.summary.tValues.last) ++ m.summary.tValues.dropRight(1) @@ -120,15 +81,16 @@ private[r] object SparkRWrappers { } else { m.coefficients.toArray ++ coefficientStandardErrorsR ++ tValuesR ++ pValuesR } - case m: LogisticRegressionModel => + } + case m: LogisticRegressionModel => { if (m.getFitIntercept) { Array(m.intercept) ++ m.coefficients.toArray } else { m.coefficients.toArray } + } case m: KMeansModel => m.clusterCenters.flatMap(_.toArray) - case _ if isNaiveBayesModel(model) => Array() // A dummy result to prevent unmatched error. } } @@ -167,28 +129,7 @@ private[r] object SparkRWrappers { } } - /** - * Extract labels' names for NaiveBayesModel. 
- */ - def getNaiveBayesLabels(model: PipelineModel): Array[String] = { - assert(isNaiveBayesModel(model), - s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.") - model.stages.last.asInstanceOf[IndexToString].getLabels - } - - def getNaiveBayesPi(model: PipelineModel): Array[Double] = { - assert(isNaiveBayesModel(model), - s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.") - model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].pi.toArray.map(math.exp) - } - - def getNaiveBayesTheta(model: PipelineModel): Array[Double] = { - assert(isNaiveBayesModel(model), - s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.") - model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].theta.toArray.map(math.exp) - } - - def getModelFeatures(model: PipelineModel): Array[String] = { + def getModelFeatures(model: PipelineModel): Array[String] = { model.stages.last match { case m: LinearRegressionModel => val attrs = AttributeGroup.fromStructField( @@ -210,17 +151,17 @@ private[r] object SparkRWrappers { val attrs = AttributeGroup.fromStructField( m.summary.predictions.schema(m.summary.featuresCol)) attrs.attributes.get.map(_.name.get) - case _ if isNaiveBayesModel(model) => - model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].getFeatureNames } } def getModelName(model: PipelineModel): String = { model.stages.last match { - case m: LinearRegressionModel => "LinearRegressionModel" - case m: LogisticRegressionModel => "LogisticRegressionModel" - case m: KMeansModel => "KMeansModel" - case _ if isNaiveBayesModel(model) => "NaiveBayesModel" + case m: LinearRegressionModel => + "LinearRegressionModel" + case m: LogisticRegressionModel => + "LogisticRegressionModel" + case m: KMeansModel => + "KMeansModel" } } } From 12a41bb71facbf33a49347ea19d5946424b516f5 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 22 Mar 2016 08:55:41 -0700 Subject: [PATCH 22/23] fix tests --- R/pkg/R/mllib.R | 5 ++--- R/pkg/inst/tests/testthat/test_mllib.R | 4 ++-- .../main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 8374a7ae7555b..25550193690bb 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -22,7 +22,7 @@ #' @export setClass("PipelineModel", representation(model = "jobj")) -#' @tile S4 class that represents a NaiveBayesModel +#' @title S4 class that represents a NaiveBayesModel #' @param jobj a Java object reference to the backing Scala NaiveBayesWrapper #' @export setClass("NaiveBayesModel", representation(jobj = "jobj")) @@ -66,7 +66,7 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram return(new("PipelineModel", model = model)) }) -#' Make predictions from a amodel +#' Make predictions from a model #' #' Makes predictions from a model produced by glm(), similarly to R's predict(). #' @@ -268,7 +268,6 @@ setMethod("fitted", signature(object = "PipelineModel"), #'} setMethod("naiveBayes", signature(formula = "formula", data = "DataFrame"), function(formula, data, laplace = 0, ...) 
{ - data <- na.omit(data) formula <- paste(deparse(formula), collapse = "") jobj <- callJStatic("org.apache.spark.ml.r.NaiveBayesWrapper", "fit", formula, data@sdf, laplace) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 74e26f3c94d09..ec7f62c43f1f8 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -186,9 +186,9 @@ test_that("naiveBayes", { df <- suppressWarnings(createDataFrame(sqlContext, t1)) m <- naiveBayes(Survived ~ ., data = df) s <- summary(m) - expect_equal(s$apriori[1, "Yes"], 0.5833333, tolerance = 1e-6) + expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6) expect_equal(sum(s$apriori), 1) - expect_equal(s$tables["Yes", "Age_Adult"], 0.5714286, tolerance = 1e-6) + expect_equal(as.double(s$tables["Yes", "Age_Adult"]), 0.5714286, tolerance = 1e-6) p <- collect(select(predict(m, df), "prediction")) expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No", diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala index 0e4ef6c52dc04..07383d393d637 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.r import org.apache.spark.ml.{Pipeline, PipelineModel} -import org.apache.spark.ml.attribute.{AttributeGroup, Attribute, NominalAttribute} +import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute} import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel} import org.apache.spark.ml.feature.{IndexToString, RFormula} import org.apache.spark.sql.DataFrame From 0ac224ef1a3efb64f92ecac8b84cfe487e17231e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 22 Mar 2016 10:29:06 -0700 Subject: [PATCH 23/23] fix linter --- R/pkg/inst/tests/testthat/test_mllib.R | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index ec7f62c43f1f8..44b48369ef2b5 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -146,12 +146,12 @@ test_that("naiveBayes", { # R code to reproduce the result. # We do not support instance weights yet. So we ignore the frequencies. # - # library(e1071) - # t <- as.data.frame(Titanic) - # t1 <- t[t$Freq > 0, -5] - # m <- naiveBayes(Survived ~ ., data = t1) - # m - # predict(m, t1) + #' library(e1071) + #' t <- as.data.frame(Titanic) + #' t1 <- t[t$Freq > 0, -5] + #' m <- naiveBayes(Survived ~ ., data = t1) + #' m + #' predict(m, t1) # # -- output of 'm' #
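For completeness, the reference computation quoted in the comment above runs locally with the e1071 package installed (no Spark needed). A hedged sketch using base R's Titanic data, matching the expected output asserted by the test:

    library(e1071)
    t <- as.data.frame(Titanic)
    t1 <- t[t$Freq > 0, -5]           # same preprocessing as the SparkR test
    m <- naiveBayes(Survived ~ ., data = t1)
    m                                 # prints a-priori and conditional probabilities
    as.character(predict(m, t1))      # first element expected to be "Yes"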