Skip to content

Commit

Permalink
[SPARK-18476][SPARKR][ML] SparkR Logistic Regression should su…
Browse files Browse the repository at this point in the history
…pport output original label.

## What changes were proposed in this pull request?

Similar to SPARK-18401, logistic regression, as a classification algorithm, should output the original label instead of the label index.

In this PR, original label output is supported, and test cases are modified and added. The documentation is also updated.

## How was this patch tested?

Unit tests.

Author: wm624@hotmail.com <wm624@hotmail.com>

Closes #15910 from wangmiao1981/audit.
  • Loading branch information
wangmiao1981 authored and yanboliang committed Dec 1, 2016
1 parent 0a81121 commit 2eb6764
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 30 deletions.
19 changes: 9 additions & 10 deletions R/pkg/R/mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,6 @@ setMethod("predict", signature(object = "KMeansModel"),
#' of L1 and L2. Default is 0.0 which is an L2 penalty.
#' @param maxIter maximum iteration number.
#' @param tol convergence tolerance of iterations.
#' @param fitIntercept whether to fit an intercept term.
#' @param family the name of family which is a description of the label distribution to be used in the model.
#' Supported options:
#' \itemize{
Expand Down Expand Up @@ -747,11 +746,11 @@ setMethod("predict", signature(object = "KMeansModel"),
#' \dontrun{
#' sparkR.session()
#' # binary logistic regression
#' label <- c(1.0, 1.0, 1.0, 0.0, 0.0)
#' feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
#' binary_data <- as.data.frame(cbind(label, feature))
#' label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
#' features <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
#' binary_data <- as.data.frame(cbind(label, features))
#' binary_df <- createDataFrame(binary_data)
#' blr_model <- spark.logit(binary_df, label ~ feature, thresholds = 1.0)
#' blr_model <- spark.logit(binary_df, label ~ features, thresholds = 1.0)
#' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
#'
#' # summary of binary logistic regression
Expand Down Expand Up @@ -783,7 +782,7 @@ setMethod("predict", signature(object = "KMeansModel"),
#' @note spark.logit since 2.1.0
setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100,
tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE,
tol = 1E-6, family = "auto", standardization = TRUE,
thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
probabilityCol = "probability") {
formula <- paste(deparse(formula), collapse = "")
Expand All @@ -795,10 +794,10 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit",
data@sdf, formula, as.numeric(regParam),
as.numeric(elasticNetParam), as.integer(maxIter),
as.numeric(tol), as.logical(fitIntercept),
as.character(family), as.logical(standardization),
as.array(thresholds), as.character(weightCol),
as.integer(aggregationDepth), as.character(probabilityCol))
as.numeric(tol), as.character(family),
as.logical(standardization), as.array(thresholds),
as.character(weightCol), as.integer(aggregationDepth),
as.character(probabilityCol))
new("LogisticRegressionModel", jobj = jobj)
})

Expand Down
26 changes: 18 additions & 8 deletions R/pkg/inst/tests/testthat/test_mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -646,30 +646,30 @@ test_that("spark.isotonicRegression", {

test_that("spark.logit", {
# test binary logistic regression
label <- c(1.0, 1.0, 1.0, 0.0, 0.0)
label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
binary_data <- as.data.frame(cbind(label, feature))
binary_df <- createDataFrame(binary_data)

blr_model <- spark.logit(binary_df, label ~ feature, thresholds = 1.0)
blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
expect_equal(blr_predict$prediction, c(0, 0, 0, 0, 0))
expect_equal(blr_predict$prediction, c("0.0", "0.0", "0.0", "0.0", "0.0"))
blr_model1 <- spark.logit(binary_df, label ~ feature, thresholds = 0.0)
blr_predict1 <- collect(select(predict(blr_model1, binary_df), "prediction"))
expect_equal(blr_predict1$prediction, c(1, 1, 1, 1, 1))
expect_equal(blr_predict1$prediction, c("1.0", "1.0", "1.0", "1.0", "1.0"))

# test summary of binary logistic regression
blr_summary <- summary(blr_model)
blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure"))
expect_equal(blr_fmeasure$threshold, c(0.8221347, 0.7884005, 0.6674709, 0.3785437, 0.3434487),
expect_equal(blr_fmeasure$threshold, c(0.6565513, 0.6214563, 0.3325291, 0.2115995, 0.1778653),
tolerance = 1e-4)
expect_equal(blr_fmeasure$"F-Measure", c(0.5000000, 0.8000000, 0.6666667, 0.8571429, 0.7500000),
expect_equal(blr_fmeasure$"F-Measure", c(0.6666667, 0.5000000, 0.8000000, 0.6666667, 0.5714286),
tolerance = 1e-4)
blr_precision <- collect(select(blr_summary$precisionByThreshold, "threshold", "precision"))
expect_equal(blr_precision$precision, c(1.0000000, 1.0000000, 0.6666667, 0.7500000, 0.6000000),
expect_equal(blr_precision$precision, c(1.0000000, 0.5000000, 0.6666667, 0.5000000, 0.4000000),
tolerance = 1e-4)
blr_recall <- collect(select(blr_summary$recallByThreshold, "threshold", "recall"))
expect_equal(blr_recall$recall, c(0.3333333, 0.6666667, 0.6666667, 1.0000000, 1.0000000),
expect_equal(blr_recall$recall, c(0.5000000, 0.5000000, 1.0000000, 1.0000000, 1.0000000),
tolerance = 1e-4)

# test model save and read
Expand All @@ -683,6 +683,16 @@ test_that("spark.logit", {
expect_error(summary(blr_model2))
unlink(modelPath)

# test prediction label as text
training <- suppressWarnings(createDataFrame(iris))
binomial_training <- training[training$Species %in% c("versicolor", "virginica"), ]
binomial_model <- spark.logit(binomial_training, Species ~ Sepal_Length + Sepal_Width)
prediction <- predict(binomial_model, binomial_training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
expected <- c("virginica", "virginica", "virginica", "versicolor", "virginica",
"versicolor", "virginica", "versicolor", "virginica", "versicolor")
expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)

# test multinomial logistic regression
label <- c(0.0, 1.0, 2.0, 0.0, 0.0)
feature1 <- c(4.845940, 5.64480, 7.430381, 6.464263, 5.555667)
Expand All @@ -694,7 +704,7 @@ test_that("spark.logit", {

model <- spark.logit(df, label ~., family = "multinomial", thresholds = c(0, 1, 1))
predict1 <- collect(select(predict(model, df), "prediction"))
expect_equal(predict1$prediction, c(0, 0, 0, 0, 0))
expect_equal(predict1$prediction, c("0.0", "0.0", "0.0", "0.0", "0.0"))
# Summary of multinomial logistic regression is not implemented yet
expect_error(summary(model))
})
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark

import java.io._
import java.lang.reflect.Constructor
import java.net.{MalformedURLException, URI}
import java.net.{URI}
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.feature.{IndexToString, RFormula}
import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}

Expand All @@ -34,6 +34,8 @@ private[r] class LogisticRegressionWrapper private (
val features: Array[String],
val isLoaded: Boolean = false) extends MLWritable {

import LogisticRegressionWrapper._

private val logisticRegressionModel: LogisticRegressionModel =
pipeline.stages(1).asInstanceOf[LogisticRegressionModel]

Expand All @@ -57,7 +59,11 @@ private[r] class LogisticRegressionWrapper private (
lazy val recallByThreshold: DataFrame = blrSummary.recallByThreshold

def transform(dataset: Dataset[_]): DataFrame = {
pipeline.transform(dataset).drop(logisticRegressionModel.getFeaturesCol)
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(logisticRegressionModel.getFeaturesCol)
.drop(logisticRegressionModel.getLabelCol)

}

override def write: MLWriter = new LogisticRegressionWrapper.LogisticRegressionWrapperWriter(this)
Expand All @@ -66,14 +72,16 @@ private[r] class LogisticRegressionWrapper private (
private[r] object LogisticRegressionWrapper
extends MLReadable[LogisticRegressionWrapper] {

val PREDICTED_LABEL_INDEX_COL = "pred_label_idx"
val PREDICTED_LABEL_COL = "prediction"

def fit( // scalastyle:ignore
data: DataFrame,
formula: String,
regParam: Double,
elasticNetParam: Double,
maxIter: Int,
tol: Double,
fitIntercept: Boolean,
family: String,
standardization: Boolean,
thresholds: Array[Double],
Expand All @@ -84,14 +92,14 @@ private[r] object LogisticRegressionWrapper

val rFormula = new RFormula()
.setFormula(formula)
RWrapperUtils.checkDataColumns(rFormula, data)
.setForceIndexLabel(true)
checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)

// get feature names from output schema
val schema = rFormulaModel.transform(data).schema
val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
.attributes.get
val features = featureAttrs.map(_.name.get)
val fitIntercept = rFormula.hasIntercept

// get labels and feature names from output schema
val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)

// assemble and fit the pipeline
val logisticRegression = new LogisticRegression()
Expand All @@ -105,16 +113,23 @@ private[r] object LogisticRegressionWrapper
.setWeightCol(weightCol)
.setAggregationDepth(aggregationDepth)
.setFeaturesCol(rFormula.getFeaturesCol)
.setLabelCol(rFormula.getLabelCol)
.setProbabilityCol(probability)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)

if (thresholds.length > 1) {
logisticRegression.setThresholds(thresholds)
} else {
logisticRegression.setThreshold(thresholds(0))
}

val idxToStr = new IndexToString()
.setInputCol(PREDICTED_LABEL_INDEX_COL)
.setOutputCol(PREDICTED_LABEL_COL)
.setLabels(labels)

val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, logisticRegression))
.setStages(Array(rFormulaModel, logisticRegression, idxToStr))
.fit(data)

new LogisticRegressionWrapper(pipeline, features)
Expand Down

0 comments on commit 2eb6764

Please sign in to comment.