diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index adfd3871f3426..0fd08482c4413 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -117,6 +117,7 @@ exportMethods("arrange",
               "dropna",
               "dtypes",
               "except",
+              "exceptAll",
               "explain",
               "fillna",
               "filter",
@@ -131,6 +132,7 @@ exportMethods("arrange",
               "hint",
               "insertInto",
               "intersect",
+              "intersectAll",
               "isLocal",
               "isStreaming",
               "join",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 471ada15d655e..4f2d4c7c002d4 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2848,6 +2848,35 @@ setMethod("intersect",
             dataFrame(intersected)
           })
 
+#' intersectAll
+#'
+#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
+#' and another SparkDataFrame while preserving the duplicates.
+#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
+#' SQL, this function resolves columns by position (not by name).
+#'
+#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame.
+#' @return A SparkDataFrame containing the result of the intersect all operation.
+#' @family SparkDataFrame functions
+#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
+#' @rdname intersectAll
+#' @name intersectAll
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' intersectAllDF <- intersectAll(df1, df2)
+#' }
+#' @note intersectAll since 2.4.0
+setMethod("intersectAll",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            intersected <- callJMethod(x@sdf, "intersectAll", y@sdf)
+            dataFrame(intersected)
+          })
+
 #' except
 #'
 #' Return a new SparkDataFrame containing rows in this SparkDataFrame
@@ -2867,7 +2896,6 @@ setMethod("intersect",
 #' df2 <- read.json(path2)
 #' exceptDF <- except(df, df2)
 #' }
-#' @rdname except
 #' @note except since 1.4.0
 setMethod("except",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
@@ -2876,6 +2904,35 @@ setMethod("except",
             dataFrame(excepted)
           })
 
+#' exceptAll
+#'
+#' Return a new SparkDataFrame containing rows in this SparkDataFrame
+#' but not in another SparkDataFrame while preserving the duplicates.
+#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
+#' SQL, this function resolves columns by position (not by name).
+#'
+#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame.
+#' @return A SparkDataFrame containing the result of the except all operation.
+#' @family SparkDataFrame functions
+#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
+#' @rdname exceptAll
+#' @name exceptAll
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' exceptAllDF <- exceptAll(df1, df2)
+#' }
+#' @note exceptAll since 2.4.0
+setMethod("exceptAll",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            excepted <- callJMethod(x@sdf, "exceptAll", y@sdf)
+            dataFrame(excepted)
+          })
+
 #' Save the contents of SparkDataFrame to a data source.
 #'
 #' The data source is specified by the \code{source} and a set of options (...).
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 4a7210bf1b902..f6f1849787a23 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -471,6 +471,9 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
 #' @rdname except
 setGeneric("except", function(x, y) { standardGeneric("except") })
 
+#' @rdname exceptAll
+setGeneric("exceptAll", function(x, y) { standardGeneric("exceptAll") })
+
 #' @rdname nafunctions
 setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })
 
@@ -495,6 +498,9 @@ setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertIn
 #' @rdname intersect
 setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
 
+#' @rdname intersectAll
+setGeneric("intersectAll", function(x, y) { standardGeneric("intersectAll") })
+
 #' @rdname isLocal
 setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
 
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index adcbbff823a2d..bff6e3512ee2f 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2482,6 +2482,25 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
   unlink(jsonPath2)
 })
 
+test_that("intersectAll() and exceptAll()", {
+  df1 <- createDataFrame(list(list("a", 1), list("a", 1), list("a", 1),
+                              list("a", 1), list("b", 3), list("c", 4)),
+                         schema = c("a", "b"))
+  df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)), schema = c("a", "b"))
+  intersectAllExpected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
+                                     stringsAsFactors = FALSE)
+  exceptAllExpected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
+                                  stringsAsFactors = FALSE)
+  intersectAllDf <- arrange(intersectAll(df1, df2), df1$a)
+  expect_is(intersectAllDf, "SparkDataFrame")
+  exceptAllDf <- arrange(exceptAll(df1, df2), df1$a)
+  expect_is(exceptAllDf, "SparkDataFrame")
+  intersectAllActual <- collect(intersectAllDf)
+  expect_identical(intersectAllActual, intersectAllExpected)
+  exceptAllActual <- collect(exceptAllDf)
+  expect_identical(exceptAllActual, exceptAllExpected)
+})
+
 test_that("withColumn() and withColumnRenamed()", {
   df <- read.json(jsonPath)
   newDF <- withColumn(df, "newAge", df$age + 2)
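
For reviewers who want to try the new methods interactively, a minimal sketch of the multiset semantics they expose (the toy data frames here are made up for illustration and assume a SparkR session has already been started with sparkR.session()):

# Hypothetical toy data; not part of the patch itself.
df1 <- createDataFrame(data.frame(a = c("a", "a", "b", "c"),
                                  b = c(1, 1, 3, 4),
                                  stringsAsFactors = FALSE))
df2 <- createDataFrame(data.frame(a = c("a", "b"),
                                  b = c(1, 3),
                                  stringsAsFactors = FALSE))

# INTERSECT ALL keeps duplicates up to the smaller occurrence count per side:
# ("a", 1) appears twice in df1 but once in df2, so one copy survives,
# along with ("b", 3).
collect(intersectAll(df1, df2))

# EXCEPT ALL subtracts occurrence counts rather than removing every distinct
# match: one ("a", 1) row remains (2 - 1), plus the unmatched ("c", 4).
collect(exceptAll(df1, df2))

Both methods delegate straight to the JVM-side Dataset operations via callJMethod, mirroring how the existing intersect() and except() wrappers are written; the only behavioral difference is the ALL (duplicate-preserving) semantics.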