From 45b76bc6e2e181eb895d0c72e7fb6d08f0196592 Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Tue, 15 Dec 2015 19:34:58 +0800
Subject: [PATCH] [SPARK-12337][SPARKR] Implement dropDuplicates() method of DataFrame in SparkR.

---
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 30 ++++++++++++++++++
 R/pkg/R/generics.R                        |  7 +++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 38 ++++++++++++++++++++++-
 4 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cab39d68c3f52..6f241a9b423c3 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -39,6 +39,7 @@ exportMethods("arrange",
               "describe",
               "dim",
               "distinct",
+              "dropDuplicates",
               "dropna",
               "dtypes",
               "except",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 764597d1e32b4..8a19911f73280 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1575,6 +1575,36 @@ setMethod("where",
             filter(x, condition)
           })
 
+#' dropDuplicates
+#'
+#' Returns a new DataFrame with duplicate rows removed, considering only
+#' the subset of columns.
+#'
+#' @param x A DataFrame.
+#' @param colNames A character vector of column names; defaults to \code{columns(x)}.
+#' @return A DataFrame with duplicate rows removed.
+#' @family DataFrame functions
+#' @rdname dropduplicates
+#' @name dropDuplicates
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
+#' dropDuplicates(df)
+#' dropDuplicates(df, c("col1", "col2"))
+#' }
+setMethod("dropDuplicates",
+          signature(x = "DataFrame"),
+          function(x, colNames = columns(x)) {
+            stopifnot(is.character(colNames))
+
+            sdf <- callJMethod(x@sdf, "dropDuplicates", as.list(colNames))
+            dataFrame(sdf)
+          })
+
 #' Join
 #'
 #' Join two DataFrames based on the given join expression.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index c383e6e78b8b4..79b3e1da32a17 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -421,6 +421,13 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
 
+#' @rdname dropduplicates
+#' @export
+setGeneric("dropDuplicates",
+           function(x, colNames = columns(x)) {
+             standardGeneric("dropDuplicates")
+           })
+
 #' @rdname nafunctions
 #' @export
 setGeneric("dropna",
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 071fd310fd58a..16bc551df49ae 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -698,7 +698,7 @@ test_that("head() and first() return the correct data", {
   expect_equal(ncol(testFirst), 2)
 })
 
-test_that("distinct() and unique on DataFrames", {
+test_that("distinct(), unique() and dropDuplicates() on DataFrames", {
   lines <- c("{\"name\":\"Michael\"}",
              "{\"name\":\"Andy\", \"age\":30}",
              "{\"name\":\"Justin\", \"age\":19}",
@@ -714,6 +714,42 @@ test_that("distinct() and unique on DataFrames", {
   uniques2 <- unique(df)
   expect_is(uniques2, "DataFrame")
   expect_equal(count(uniques2), 3)
+
+  # Test dropDuplicates(): all columns (default), two columns, one column
+  df <- createDataFrame(
+    sqlContext,
+    list(
+      list(2, 1, 2), list(1, 1, 1),
+      list(1, 2, 1), list(2, 1, 2),
+      list(2, 2, 2), list(2, 2, 1),
+      list(2, 1, 1), list(1, 1, 2),
+      list(1, 2, 2), list(1, 2, 1)),
+    schema = c("key", "value1", "value2"))
+  result <- collect(dropDuplicates(df))
+  expected <- rbind.data.frame(
+    c(1, 1, 1), c(1, 1, 2), c(1, 2, 1),
+    c(1, 2, 2), c(2, 1, 1), c(2, 1, 2),
+    c(2, 2, 1), c(2, 2, 2))
+  names(expected) <- c("key", "value1", "value2")
+  expect_equivalent(
+    result[order(result$key, result$value1, result$value2),],
+    expected)
+
+  result <- collect(dropDuplicates(df, c("key", "value1")))
+  expected <- rbind.data.frame(
+    c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
+  names(expected) <- c("key", "value1", "value2")
+  expect_equivalent(
+    result[order(result$key, result$value1, result$value2),],
+    expected)
+
+  result <- collect(dropDuplicates(df, "key"))
+  expected <- rbind.data.frame(
+    c(1, 1, 1), c(2, 1, 2))
+  names(expected) <- c("key", "value1", "value2")
+  expect_equivalent(
+    result[order(result$key, result$value1, result$value2),],
+    expected)
 })
 
 test_that("sample on a DataFrame", {