[SPARK-12204][SPARKR] Implement drop method for DataFrame in SparkR.
Author: Sun Rui <rui.sun@intel.com>

Closes #10201 from sun-rui/SPARK-12204.
Sun Rui authored and shivaram committed Jan 21, 2016
1 parent d741599 commit 1b2a918
Showing 6 changed files with 101 additions and 27 deletions.
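At a glance, the commit adds a drop method to SparkR's DataFrame API and reimplements the `$<-` NULL assignment on top of it. A minimal usage sketch, assuming an initialized SparkR session and a placeholder JSON path; the column names are illustrative, taken from the roxygen examples in the diff below:

# Hypothetical session setup, mirroring the roxygen examples in this commit.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- read.json(sqlContext, "path/to/file.json")

df1 <- drop(df, "col1")              # drop one column by name
df2 <- drop(df, c("col1", "col2"))   # drop several columns at once
df3 <- drop(df, df$col1)             # drop by Column reference

df$col1 <- NULL                      # `$<-` with NULL now delegates to drop()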
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -39,6 +39,7 @@ exportMethods("arrange",
"describe",
"dim",
"distinct",
"drop",
"dropDuplicates",
"dropna",
"dtypes",
77 changes: 56 additions & 21 deletions R/pkg/R/DataFrame.R
@@ -1192,23 +1192,10 @@ setMethod("$", signature(x = "DataFrame"),
setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column" || is.null(value))
cols <- columns(x)
if (name %in% cols) {
if (is.null(value)) {
cols <- Filter(function(c) { c != name }, cols)
}
cols <- lapply(cols, function(c) {
if (c == name) {
alias(value, name)
} else {
col(c)
}
})
nx <- select(x, cols)

if (is.null(value)) {
nx <- drop(x, name)
} else {
if (is.null(value)) {
return(x)
}
nx <- withColumn(x, name, value)
}
x@sdf <- nx@sdf
@@ -1386,12 +1373,13 @@ setMethod("selectExpr",

#' WithColumn
#'
#' Return a new DataFrame with the specified column added.
#' Return a new DataFrame by adding a column or replacing the existing column
#' that has the same name.
#'
#' @param x A DataFrame
#' @param colName A string containing the name of the new column.
#' @param colName A column name.
#' @param col A Column expression.
#' @return A DataFrame with the new column added.
#' @return A DataFrame with the new column added or the existing column replaced.
#' @family DataFrame functions
#' @rdname withColumn
#' @name withColumn
@@ -1404,12 +1392,16 @@ setMethod("selectExpr",
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' newDF <- withColumn(df, "newCol", df$col1 * 5)
#' # Replace an existing column
#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
#' }
setMethod("withColumn",
signature(x = "DataFrame", colName = "character", col = "Column"),
function(x, colName, col) {
select(x, x$"*", alias(col, colName))
sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc)
dataFrame(sdf)
})

#' Mutate
#'
#' Return a new DataFrame with the specified columns added.
@@ -2401,4 +2393,47 @@ setMethod("str",
cat(paste0("\nDisplaying first ", ncol(localDF), " columns only."))
}
}
})
})

#' drop
#'
#' Returns a new DataFrame with columns dropped.
#' This is a no-op if the schema doesn't contain the given column name(s).
#'
#' @param x A SparkSQL DataFrame.
#' @param cols A character vector of column names or a Column.
#' @return A DataFrame
#'
#' @family DataFrame functions
#' @rdname drop
#' @name drop
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlCtx, path)
#' drop(df, "col1")
#' drop(df, c("col1", "col2"))
#' drop(df, df$col1)
#' }
setMethod("drop",
signature(x = "DataFrame"),
function(x, col) {
stopifnot(class(col) == "character" || class(col) == "Column")

if (class(col) == "Column") {
sdf <- callJMethod(x@sdf, "drop", col@jc)
} else {
sdf <- callJMethod(x@sdf, "drop", as.list(col))
}
dataFrame(sdf)
})

# Expose base::drop
setMethod("drop",
signature(x = "ANY"),
function(x) {
base::drop(x)
})
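Since base R already has a drop function (it deletes array dimensions of length one), the ANY-signature method above exists purely so that turning drop into an S4 generic does not mask that behavior. A small sketch of the resulting dispatch, assuming SparkR is attached:

m <- 1:3 %*% 2:4     # 1x1 matrix: 1*2 + 2*3 + 3*4 = 20
drop(m)              # non-DataFrame argument -> base::drop, returns the scalar 20
# drop(df, "col1")   # DataFrame argument -> the SparkR method defined above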
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -428,6 +428,10 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })

#' @rdname drop
#' @export
setGeneric("drop", function(x, ...) { standardGeneric("drop") })

#' @rdname dropduplicates
#' @export
setGeneric("dropDuplicates",
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_context.R
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform")
"summary", "transform", "drop")
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
31 changes: 26 additions & 5 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -824,11 +824,6 @@ test_that("select operators", {
df$age2 <- df$age * 2
expect_equal(columns(df), c("name", "age", "age2"))
expect_equal(count(where(df, df$age2 == df$age * 2)), 2)

df$age2 <- NULL
expect_equal(columns(df), c("name", "age"))
df$age3 <- NULL
expect_equal(columns(df), c("name", "age"))
})

test_that("select with column", {
@@ -854,6 +849,27 @@ test_that("select with column", {
"To select multiple columns, use a character vector or list for col")
})

test_that("drop column", {
df <- select(read.json(sqlContext, jsonPath), "name", "age")
df1 <- drop(df, "name")
expect_equal(columns(df1), c("age"))

df$age2 <- df$age
df1 <- drop(df, c("name", "age"))
expect_equal(columns(df1), c("age2"))

df1 <- drop(df, df$age)
expect_equal(columns(df1), c("name", "age2"))

df$age2 <- NULL
expect_equal(columns(df), c("name", "age"))
df$age3 <- NULL
expect_equal(columns(df), c("name", "age"))

# Test to make sure base::drop is not masked
expect_equal(drop(1:3 %*% 2:4), 20)
})

test_that("subsetting", {
# read.json returns columns in random order
df <- select(read.json(sqlContext, jsonPath), "name", "age")
@@ -1462,6 +1478,11 @@ test_that("withColumn() and withColumnRenamed()", {
expect_equal(columns(newDF)[3], "newAge")
expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)

# Replace existing column
newDF <- withColumn(df, "age", df$age + 2)
expect_equal(length(columns(newDF)), 2)
expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)

newDF2 <- withColumnRenamed(df, "age", "newerAge")
expect_equal(length(columns(newDF2)), 2)
expect_equal(columns(newDF2)[1], "newerAge")
13 changes: 13 additions & 0 deletions docs/sql-programming-guide.md
@@ -2150,6 +2150,8 @@ options.
--conf spark.sql.hive.thriftServer.singleSession=true \
...
{% endhighlight %}
 - Since 1.6.1, the withColumn method in SparkR supports adding a new column to a DataFrame or
   replacing an existing column of the same name.

- From Spark 1.6, LongType casts to TimestampType expect seconds instead of microseconds. This
change was made to match the behavior of Hive 1.2 for more consistent type casting to TimestampType
@@ -2183,6 +2185,7 @@ options.
users can use `REFRESH TABLE` SQL command or `HiveContext`'s `refreshTable` method
to include those new files to the table. For a DataFrame representing a JSON dataset, users need to recreate
the DataFrame and the new DataFrame will include new files.
 - The DataFrame.withColumn method in PySpark supports adding a new column or replacing an existing column of the same name.

## Upgrading from Spark SQL 1.3 to 1.4

@@ -2262,6 +2265,16 @@ sqlContext.setConf("spark.sql.retainGroupColumns", "false")
</div>


#### Behavior change on DataFrame.withColumn

Prior to 1.4, DataFrame.withColumn() supported only adding a column. The column was always added as a
new column under its specified name in the result DataFrame, even if an existing column already had the
same name. Since 1.4, DataFrame.withColumn() supports adding a column whose name differs from all
existing columns, or replacing an existing column of the same name.

Note that this change applies only to the Scala API, not to PySpark or SparkR.
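For SparkR, the equivalent replace-by-name behavior arrives with this commit (the 1.6.1 note above): withColumn now delegates to the Scala withColumn rather than building a select with an alias. A sketch grounded in the new test case in test_sparkSQL.R, assuming df already has "name" and "age" columns:

newDF <- withColumn(df, "newCol", df$age * 5)      # adds a new column
newDF <- withColumn(newDF, "age", newDF$age + 2)   # replaces the existing "age" column
length(columns(newDF))                             # still 3 columns; "age" was replaced, not duplicated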


## Upgrading from Spark SQL 1.0-1.2 to 1.3

In Spark 1.3 we removed the "Alpha" label from Spark SQL and as part of this did a cleanup of the
