diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 00634c1a70c26..2cc1544bef080 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -39,6 +39,7 @@ exportMethods("arrange",
               "describe",
               "dim",
               "distinct",
+              "drop",
               "dropDuplicates",
               "dropna",
               "dtypes",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 629c1ce2eddc1..4653a73e11be3 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1192,23 +1192,10 @@ setMethod("$", signature(x = "DataFrame"),
 setMethod("$<-", signature(x = "DataFrame"),
           function(x, name, value) {
             stopifnot(class(value) == "Column" || is.null(value))
-            cols <- columns(x)
-            if (name %in% cols) {
-              if (is.null(value)) {
-                cols <- Filter(function(c) { c != name }, cols)
-              }
-              cols <- lapply(cols, function(c) {
-                if (c == name) {
-                  alias(value, name)
-                } else {
-                  col(c)
-                }
-              })
-              nx <- select(x, cols)
+
+            if (is.null(value)) {
+              nx <- drop(x, name)
             } else {
-              if (is.null(value)) {
-                return(x)
-              }
               nx <- withColumn(x, name, value)
             }
             x@sdf <- nx@sdf
@@ -1386,12 +1373,13 @@ setMethod("selectExpr",
 
 #' WithColumn
 #'
-#' Return a new DataFrame with the specified column added.
+#' Return a new DataFrame by adding a column or replacing the existing column
+#' that has the same name.
 #'
 #' @param x A DataFrame
-#' @param colName A string containing the name of the new column.
+#' @param colName A column name.
 #' @param col A Column expression.
-#' @return A DataFrame with the new column added.
+#' @return A DataFrame with the new column added or the existing column replaced.
 #' @family DataFrame functions
 #' @rdname withColumn
 #' @name withColumn
@@ -1404,12 +1392,16 @@ setMethod("selectExpr",
 #' path <- "path/to/file.json"
 #' df <- read.json(sqlContext, path)
 #' newDF <- withColumn(df, "newCol", df$col1 * 5)
+#' # Replace an existing column
+#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
 #' }
 setMethod("withColumn",
           signature(x = "DataFrame", colName = "character", col = "Column"),
           function(x, colName, col) {
-            select(x, x$"*", alias(col, colName))
+            sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc)
+            dataFrame(sdf)
           })
+
 #' Mutate
 #'
 #' Return a new DataFrame with the specified columns added.
@@ -2401,4 +2393,47 @@ setMethod("str",
               cat(paste0("\nDisplaying first ", ncol(localDF), " columns only."))
             }
           }
-        })
\ No newline at end of file
+        })
+
+#' drop
+#'
+#' Returns a new DataFrame with columns dropped.
+#' This is a no-op if the schema doesn't contain the column name(s).
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param col A character vector of column names or a Column.
+#' @return A DataFrame
+#'
+#' @family DataFrame functions
+#' @rdname drop
+#' @name drop
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlCtx, path)
+#' drop(df, "col1")
+#' drop(df, c("col1", "col2"))
+#' drop(df, df$col1)
+#' }
+setMethod("drop",
+          signature(x = "DataFrame"),
+          function(x, col) {
+            stopifnot(class(col) == "character" || class(col) == "Column")
+
+            if (class(col) == "Column") {
+              sdf <- callJMethod(x@sdf, "drop", col@jc)
+            } else {
+              sdf <- callJMethod(x@sdf, "drop", as.list(col))
+            }
+            dataFrame(sdf)
+          })
+
+# Expose base::drop
+setMethod("drop",
+          signature(x = "ANY"),
+          function(x) {
+            base::drop(x)
+          })
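
Before the generics and tests, a minimal usage sketch of the methods this patch adds or
changes. It assumes a SparkR 1.6-era session; the JSON path is a placeholder (any file with
"name"/"age" fields works), and the initialization idiom mirrors the roxygen examples above.

    library(SparkR)
    sc <- sparkR.init()
    sqlContext <- sparkRSQL.init(sc)
    df <- read.json(sqlContext, "path/to/people.json")  # placeholder path

    # drop() accepts a single name, a character vector of names, or a Column
    df1 <- drop(df, "age")
    df2 <- drop(df, c("name", "age"))
    df3 <- drop(df, df$name)

    # withColumn() now replaces an existing column of the same name
    # instead of appending a duplicate "age" column
    df4 <- withColumn(df, "age", df$age + 1)

    # Assigning NULL through $<- now routes through drop()
    df$age <- NULL
    columns(df)  # "name"
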
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index d616266ead41b..9a8ab97bb8f9a 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -428,6 +428,10 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
 
+#' @rdname drop
+#' @export
+setGeneric("drop", function(x, ...) { standardGeneric("drop") })
+
 #' @rdname dropduplicates
 #' @export
 setGeneric("dropDuplicates",
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 3b14a497b487a..ad3f9722a4802 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
   maskedBySparkR <- masked[funcSparkROrEmpty]
   namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
                      "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
-                     "summary", "transform")
+                     "summary", "transform", "drop")
   expect_equal(length(maskedBySparkR), length(namesOfMasked))
   expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
   # above are those reported as masked when `library(SparkR)`
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 14d40d5066e78..f9b8bd9940c84 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -824,11 +824,6 @@ test_that("select operators", {
   df$age2 <- df$age * 2
   expect_equal(columns(df), c("name", "age", "age2"))
   expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
-
-  df$age2 <- NULL
-  expect_equal(columns(df), c("name", "age"))
-  df$age3 <- NULL
-  expect_equal(columns(df), c("name", "age"))
 })
 
 test_that("select with column", {
@@ -854,6 +849,27 @@
                "To select multiple columns, use a character vector or list for col")
 })
 
+test_that("drop column", {
+  df <- select(read.json(sqlContext, jsonPath), "name", "age")
+  df1 <- drop(df, "name")
+  expect_equal(columns(df1), c("age"))
+
+  df$age2 <- df$age
+  df1 <- drop(df, c("name", "age"))
+  expect_equal(columns(df1), c("age2"))
+
+  df1 <- drop(df, df$age)
+  expect_equal(columns(df1), c("name", "age2"))
+
+  df$age2 <- NULL
+  expect_equal(columns(df), c("name", "age"))
+  df$age3 <- NULL
+  expect_equal(columns(df), c("name", "age"))
+
+  # Test to make sure base::drop is not masked
+  expect_equal(drop(1:3 %*% 2:4), 20)
+})
+
 test_that("subsetting", {
   # read.json returns columns in random order
   df <- select(read.json(sqlContext, jsonPath), "name", "age")
@@ -1462,6 +1478,11 @@ test_that("withColumn() and withColumnRenamed()", {
   expect_equal(columns(newDF)[3], "newAge")
   expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)
 
+  # Replace existing column
+  newDF <- withColumn(df, "age", df$age + 2)
+  expect_equal(length(columns(newDF)), 2)
+  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)
+
   newDF2 <- withColumnRenamed(df, "age", "newerAge")
   expect_equal(length(columns(newDF2)), 2)
   expect_equal(columns(newDF2)[1], "newerAge")
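
One note on the final assertion in the new "drop column" test: defining a `drop` generic in
SparkR masks `base::drop`, which is why the `ANY`-signature method in DataFrame.R forwards
non-DataFrame arguments back to base R. The expected value of 20 is plain matrix arithmetic,
checkable without a Spark session:

    m <- 1:3 %*% 2:4   # 1x1 matrix: 1*2 + 2*3 + 3*4 = 20
    dim(m)             # c(1, 1)
    drop(m)            # base::drop strips the singleton dimensions, returning the scalar 20
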
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index bc89c781562bd..fddc51379406b 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2150,6 +2150,8 @@ options.
      --conf spark.sql.hive.thriftServer.singleSession=true \
      ...
    {% endhighlight %}
+ - Since 1.6.1, the withColumn method in SparkR supports adding a new column to, or replacing
+   existing columns of the same name in, a DataFrame.
 - From Spark 1.6, LongType casts to TimestampType expect seconds instead of microseconds.
   This change was made to match the behavior of Hive 1.2 for more consistent type casting to
   TimestampType
@@ -2183,6 +2185,7 @@ options.
   users can use `REFRESH TABLE` SQL command or `HiveContext`'s `refreshTable` method
   to include those new files to the table. For a DataFrame representing a JSON dataset, users need
   to recreate the DataFrame and the new DataFrame will include new files.
+ - The DataFrame.withColumn method in PySpark supports adding a new column or replacing existing columns of the same name.
 
 ## Upgrading from Spark SQL 1.3 to 1.4
@@ -2262,6 +2265,16 @@
 sqlContext.setConf("spark.sql.retainGroupColumns", "false")
 {% endhighlight %}
 
+#### Behavior change on DataFrame.withColumn
+
+Prior to 1.4, DataFrame.withColumn() supported only adding a column. The column was always appended
+as a new column with its specified name in the result DataFrame, even if an existing column had the
+same name. Since 1.4, DataFrame.withColumn() supports adding a column whose name differs from the
+names of all existing columns, or replacing an existing column of the same name.
+
+Note that this change applies only to the Scala API, not to PySpark or SparkR.
+
+
 ## Upgrading from Spark SQL 1.0-1.2 to 1.3
 
 In Spark 1.3 we removed the "Alpha" label from Spark SQL and as part of this did a cleanup of the