[SPARK-25117][R] Add EXCEPT ALL and INTERSECT ALL support in R
## What changes were proposed in this pull request?
[SPARK-21274](https://issues.apache.org/jira/browse/SPARK-21274) added support for EXCEPT ALL and INTERSECT ALL in Spark SQL. This PR adds the corresponding support in R.
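
For illustration, here is a minimal usage sketch of the two new methods (assumes a running Spark session; the sample data is illustrative and not taken from this patch):

```r
library(SparkR)
sparkR.session()

df1 <- createDataFrame(data.frame(a = c("a", "a", "b", "c"),
                                  b = c(1, 1, 3, 4)))
df2 <- createDataFrame(data.frame(a = c("a", "b"),
                                  b = c(1, 3)))

# INTERSECT ALL keeps rows occurring in both inputs, preserving duplicates.
collect(intersectAll(df1, df2))   # multiset result: ("a", 1), ("b", 3)

# EXCEPT ALL removes only as many duplicates as occur in the other input.
collect(exceptAll(df1, df2))      # multiset result: ("a", 1), ("c", 4)
```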

## How was this patch tested?
Added a test in test_sparkSQL.R.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #22107 from dilipbiswal/SPARK-25117.
dilipbiswal authored and Felix Cheung committed Aug 17, 2018
1 parent c1ffb3c commit 162326c
Showing 4 changed files with 85 additions and 1 deletion.
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -117,6 +117,7 @@ exportMethods("arrange",
"dropna",
"dtypes",
"except",
"exceptAll",
"explain",
"fillna",
"filter",
@@ -131,6 +132,7 @@ exportMethods("arrange",
"hint",
"insertInto",
"intersect",
"intersectAll",
"isLocal",
"isStreaming",
"join",
59 changes: 58 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -2848,6 +2848,35 @@ setMethod("intersect",
dataFrame(intersected)
})

#' intersectAll
#'
#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
#' and another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the intersect all operation.
#' @family SparkDataFrame functions
#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
#' @rdname intersectAll
#' @name intersectAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' intersectAllDF <- intersectAll(df1, df2)
#' }
#' @note intersectAll since 2.4.0
setMethod("intersectAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
intersected <- callJMethod(x@sdf, "intersectAll", y@sdf)
dataFrame(intersected)
})

#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
@@ -2867,7 +2896,6 @@ setMethod("intersect",
#' df2 <- read.json(path2)
#' exceptDF <- except(df, df2)
#' }
#' @rdname except
#' @note except since 1.4.0
setMethod("except",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
@@ -2876,6 +2904,35 @@ setMethod("except",
dataFrame(excepted)
})

#' exceptAll
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the except all operation.
#' @family SparkDataFrame functions
#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
#' @rdname exceptAll
#' @name exceptAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' exceptAllDF <- exceptAll(df1, df2)
#' }
#' @note exceptAll since 2.4.0
setMethod("exceptAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
excepted <- callJMethod(x@sdf, "exceptAll", y@sdf)
dataFrame(excepted)
})

#' Save the contents of SparkDataFrame to a data source.
#'
#' The data source is specified by the \code{source} and a set of options (...).
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
@@ -471,6 +471,9 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @rdname except
setGeneric("except", function(x, y) { standardGeneric("except") })

#' @rdname exceptAll
setGeneric("exceptAll", function(x, y) { standardGeneric("exceptAll") })

#' @rdname nafunctions
setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })

@@ -495,6 +498,9 @@ setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertInto") })
#' @rdname intersect
setGeneric("intersect", function(x, y) { standardGeneric("intersect") })

#' @rdname intersectAll
setGeneric("intersectAll", function(x, y) { standardGeneric("intersectAll") })

#' @rdname isLocal
setGeneric("isLocal", function(x) { standardGeneric("isLocal") })

19 changes: 19 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2482,6 +2482,25 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataFrame", {
unlink(jsonPath2)
})

test_that("intersectAll() and exceptAll()", {
df1 <- createDataFrame(list(list("a", 1), list("a", 1), list("a", 1),
list("a", 1), list("b", 3), list("c", 4)),
schema = c("a", "b"))
df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)), schema = c("a", "b"))
intersectAllExpected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
stringsAsFactors = FALSE)
exceptAllExpected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
stringsAsFactors = FALSE)
intersectAllDf <- arrange(intersectAll(df1, df2), df1$a)
expect_is(intersectAllDf, "SparkDataFrame")
exceptAllDf <- arrange(exceptAll(df1, df2), df1$a)
expect_is(exceptAllDf, "SparkDataFrame")
intersectAllActual <- collect(intersectAllDf)
expect_identical(intersectAllActual, intersectAllExpected)
exceptAllActual <- collect(exceptAllDf)
expect_identical(exceptAllActual, exceptAllExpected)
})

test_that("withColumn() and withColumnRenamed()", {
df <- read.json(jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
