Skip to content

Commit

Permalink
[SPARK-12526][SPARKR] ifelse, when, otherwise` unable to take Col…
Browse files Browse the repository at this point in the history
…umn as value

`ifelse`, `when`, `otherwise` is unable to take `Column` typed S4 object as values.

For example:
```r
ifelse(lit(1) == lit(1), lit(2), lit(3))
ifelse(df$mpg > 0, df$mpg, 0)
```
will both fail with
```r
attempt to replicate an object of type 'environment'
```

The PR replaces `ifelse` calls with `if ... else ...` inside the function implementations to avoid attempt to vectorize(i.e. `rep()`). It remains to be discussed whether we should instead support vectorization in these functions for consistency because `ifelse` in base R is vectorized but I cannot foresee any scenarios these functions will want to be vectorized in SparkR.

For reference, added test cases which trigger failures:
```r
. Error: when(), otherwise() and ifelse() with column on a DataFrame ----------
error in evaluating the argument 'x' in selecting a method for function 'collect':
  error in evaluating the argument 'col' in selecting a method for function 'select':
  attempt to replicate an object of type 'environment'
Calls: when -> when -> ifelse -> ifelse

1: withCallingHandlers(eval(code, new_test_environment), error = capture_calls, message = function(c) invokeRestart("muffleMessage"))
2: eval(code, new_test_environment)
3: eval(expr, envir, enclos)
4: expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1)) at test_sparkSQL.R:1126
5: expect_that(object, equals(expected, label = expected.label, ...), info = info, label = label)
6: condition(object)
7: compare(actual, expected, ...)
8: collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))
Error: Test failures
Execution halted
```

Author: Forest Fang <forest.fang@outlook.com>

Closes #10481 from saurfang/spark-12526.
  • Loading branch information
saurfang authored and shivaram committed Dec 29, 2015
1 parent 73862a1 commit d80cc90
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
4 changes: 2 additions & 2 deletions R/pkg/R/column.R
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ setMethod("%in%",

#' otherwise
#'
#' If values in the specified column are null, returns the value.
#' If values in the specified column are null, returns the value.
#' Can be used in conjunction with `when` to specify a default value for expressions.
#'
#' @rdname otherwise
Expand All @@ -225,7 +225,7 @@ setMethod("%in%",
setMethod("otherwise",
signature(x = "Column", value = "ANY"),
function(x, value) {
value <- ifelse(class(value) == "Column", value@jc, value)
value <- if (class(value) == "Column") { value@jc } else { value }
jc <- callJMethod(x@jc, "otherwise", value)
column(jc)
})
13 changes: 8 additions & 5 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ setMethod("lit", signature("ANY"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions",
"lit",
ifelse(class(x) == "Column", x@jc, x))
if (class(x) == "Column") { x@jc } else { x })
column(jc)
})

Expand Down Expand Up @@ -2262,7 +2262,7 @@ setMethod("unix_timestamp", signature(x = "Column", format = "character"),
setMethod("when", signature(condition = "Column", value = "ANY"),
function(condition, value) {
condition <- condition@jc
value <- ifelse(class(value) == "Column", value@jc, value)
value <- if (class(value) == "Column") { value@jc } else { value }
jc <- callJStatic("org.apache.spark.sql.functions", "when", condition, value)
column(jc)
})
Expand All @@ -2277,13 +2277,16 @@ setMethod("when", signature(condition = "Column", value = "ANY"),
#' @name ifelse
#' @seealso \link{when}
#' @export
#' @examples \dontrun{ifelse(df$a > 1 & df$b > 2, 0, 1)}
#' @examples \dontrun{
#' ifelse(df$a > 1 & df$b > 2, 0, 1)
#' ifelse(df$a > 1, df$a, 1)
#' }
setMethod("ifelse",
signature(test = "Column", yes = "ANY", no = "ANY"),
function(test, yes, no) {
test <- test@jc
yes <- ifelse(class(yes) == "Column", yes@jc, yes)
no <- ifelse(class(no) == "Column", no@jc, no)
yes <- if (class(yes) == "Column") { yes@jc } else { yes }
no <- if (class(no) == "Column") { no@jc } else { no }
jc <- callJMethod(callJStatic("org.apache.spark.sql.functions",
"when",
test, yes),
Expand Down
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,14 @@ test_that("when(), otherwise() and ifelse() on a DataFrame", {
expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0))
})

test_that("when(), otherwise() and ifelse() with column on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1))
expect_equal(collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))[, 1], c(0, 1))
expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0))
})

test_that("group by, agg functions", {
df <- read.json(sqlContext, jsonPath)
df1 <- agg(df, name = "max", age = "sum")
Expand Down

0 comments on commit d80cc90

Please sign in to comment.