[SPARK-12204][SPARKR] Implement drop method for DataFrame in SparkR. #10201

Closed · wants to merge 9 commits
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -39,6 +39,7 @@ exportMethods("arrange",
"describe",
"dim",
"distinct",
"drop",
"dropDuplicates",
"dropna",
"dtypes",
77 changes: 56 additions & 21 deletions R/pkg/R/DataFrame.R
@@ -1192,23 +1192,10 @@ setMethod("$", signature(x = "DataFrame"),
setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column" || is.null(value))
cols <- columns(x)
if (name %in% cols) {
if (is.null(value)) {
cols <- Filter(function(c) { c != name }, cols)
}
cols <- lapply(cols, function(c) {
if (c == name) {
alias(value, name)
} else {
col(c)
}
})
nx <- select(x, cols)

if (is.null(value)) {
nx <- drop(x, name)
} else {
if (is.null(value)) {
return(x)
}
nx <- withColumn(x, name, value)
}
x@sdf <- nx@sdf
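A minimal sketch of the behaviour the rewritten `$<-` delegates to, assuming an initialized `sqlContext` and a DataFrame `df` with columns `name` and `age` (mirrors the updated tests further down):

```r
df$age2 <- df$age * 2   # adds a new column "age2" via withColumn()
df$age2 <- NULL         # assigning NULL now delegates to the new drop()
columns(df)             # back to c("name", "age")
```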
@@ -1386,12 +1373,13 @@ setMethod("selectExpr",

#' WithColumn
#'
-#' Return a new DataFrame with the specified column added.
+#' Return a new DataFrame by adding a column or replacing the existing column
+#' that has the same name.
#'
#' @param x A DataFrame
-#' @param colName A string containing the name of the new column.
+#' @param colName A column name.
#' @param col A Column expression.
-#' @return A DataFrame with the new column added.
+#' @return A DataFrame with the new column added or the existing column replaced.
#' @family DataFrame functions
#' @rdname withColumn
#' @name withColumn
@@ -1404,12 +1392,16 @@ setMethod("selectExpr",
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' newDF <- withColumn(df, "newCol", df$col1 * 5)
+#' # Replace an existing column
+#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
#' }
setMethod("withColumn",
          signature(x = "DataFrame", colName = "character", col = "Column"),
          function(x, colName, col) {
-            select(x, x$"*", alias(col, colName))
+            sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc)
+            dataFrame(sdf)
          })
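A short usage sketch of the add-or-replace behaviour the new implementation exposes (assumes a DataFrame `df` with columns `name` and `age`; the arithmetic expressions are illustrative):

```r
newDF  <- withColumn(df, "age2", df$age * 2)  # "age2" is not present, so a column is added
newDF2 <- withColumn(df, "age",  df$age + 2)  # "age" already exists, so it is replaced
length(columns(newDF2))                       # still 2: c("name", "age")
```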

#' Mutate
#'
#' Return a new DataFrame with the specified columns added.
@@ -2401,4 +2393,47 @@ setMethod("str",
cat(paste0("\nDisplaying first ", ncol(localDF), " columns only."))
}
}
})

#' drop
#'
#' Returns a new DataFrame with columns dropped.
#' This is a no-op if the schema doesn't contain the given column name(s).
#'
#' @param x A SparkSQL DataFrame.
#' @param col A character vector of column names or a Column.
#' @return A DataFrame
#'
#' @family DataFrame functions
#' @rdname drop
#' @name drop
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlCtx, path)
#' drop(df, "col1")
#' drop(df, c("col1", "col2"))
#' drop(df, df$col1)
#' }
setMethod("drop",
signature(x = "DataFrame"),
function(x, col) {
stopifnot(class(col) == "character" || class(col) == "Column")

if (class(col) == "Column") {
sdf <- callJMethod(x@sdf, "drop", col@jc)
} else {
sdf <- callJMethod(x@sdf, "drop", as.list(col))
}
dataFrame(sdf)
})

# Expose base::drop
setMethod("drop",
signature(x = "ANY"),
function(x) {
base::drop(x)
})
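A minimal dispatch sketch for the new methods, assuming a DataFrame `df` with columns `name`, `age` and `age2`: because the generic is `drop(x, ...)` and the `ANY` method forwards to `base::drop`, existing R code that calls `drop` on matrices keeps working:

```r
columns(drop(df, "name"))            # single name:      c("age", "age2")
columns(drop(df, c("name", "age")))  # character vector: c("age2")
columns(drop(df, df$age))            # Column object:    c("name", "age2")
drop(1:3 %*% 2:4)                    # non-DataFrame input falls through to base::drop -> 20
```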
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -428,6 +428,10 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })

#' @rdname drop
#' @export
setGeneric("drop", function(x, ...) { standardGeneric("drop") })

#' @rdname dropduplicates
#' @export
setGeneric("dropDuplicates",
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_context.R
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform")
"summary", "transform", "drop")
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
31 changes: 26 additions & 5 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -824,11 +824,6 @@ test_that("select operators", {
df$age2 <- df$age * 2
expect_equal(columns(df), c("name", "age", "age2"))
expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
-
-  df$age2 <- NULL
-  expect_equal(columns(df), c("name", "age"))
-  df$age3 <- NULL
-  expect_equal(columns(df), c("name", "age"))
})

test_that("select with column", {
@@ -854,6 +849,27 @@
"To select multiple columns, use a character vector or list for col")
})

test_that("drop column", {
df <- select(read.json(sqlContext, jsonPath), "name", "age")
df1 <- drop(df, "name")
expect_equal(columns(df1), c("age"))

df$age2 <- df$age
df1 <- drop(df, c("name", "age"))
expect_equal(columns(df1), c("age2"))

df1 <- drop(df, df$age)
expect_equal(columns(df1), c("name", "age2"))

df$age2 <- NULL
expect_equal(columns(df), c("name", "age"))
df$age3 <- NULL
expect_equal(columns(df), c("name", "age"))

# Test to make sure base::drop is not masked
expect_equal(drop(1:3 %*% 2:4), 20)
})

test_that("subsetting", {
# read.json returns columns in random order
df <- select(read.json(sqlContext, jsonPath), "name", "age")
@@ -1462,6 +1478,11 @@ test_that("withColumn() and withColumnRenamed()", {
expect_equal(columns(newDF)[3], "newAge")
expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)

# Replace existing column
newDF <- withColumn(df, "age", df$age + 2)
expect_equal(length(columns(newDF)), 2)
expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)

newDF2 <- withColumnRenamed(df, "age", "newerAge")
expect_equal(length(columns(newDF2)), 2)
expect_equal(columns(newDF2)[1], "newerAge")
13 changes: 13 additions & 0 deletions docs/sql-programming-guide.md
@@ -2150,6 +2150,8 @@ options.
--conf spark.sql.hive.thriftServer.singleSession=true \
...
{% endhighlight %}
- Since 1.6.1, the withColumn method in SparkR supports adding a new column to a DataFrame or
  replacing an existing column of the same name.

Contributor Author: which version is appropriate here? 1.6.1 or 2.0?

Member: maybe we want to put this in the R migration guide section instead of SQL? or both?

- From Spark 1.6, LongType casts to TimestampType expect seconds instead of microseconds. This
change was made to match the behavior of Hive 1.2 for more consistent type casting to TimestampType
@@ -2183,6 +2185,7 @@ options.
users can use `REFRESH TABLE` SQL command or `HiveContext`'s `refreshTable` method
to include those new files to the table. For a DataFrame representing a JSON dataset, users need to recreate
the DataFrame and the new DataFrame will include new files.
- The DataFrame.withColumn method in PySpark supports adding a new column or replacing an existing column of the same name.

## Upgrading from Spark SQL 1.3 to 1.4

@@ -2262,6 +2265,16 @@ sqlContext.setConf("spark.sql.retainGroupColumns", "false")
</div>


#### Behavior change on DataFrame.withColumn

Prior to 1.4, DataFrame.withColumn() supported adding a column only. The column was always added
as a new column with its specified name in the result DataFrame, even if an existing column had the
same name. Since 1.4, DataFrame.withColumn() supports adding a column whose name differs from the
names of all existing columns, or replacing an existing column of the same name.

Note that this change applies only to the Scala API, not to PySpark or SparkR.


## Upgrading from Spark SQL 1.0-1.2 to 1.3

In Spark 1.3 we removed the "Alpha" label from Spark SQL and as part of this did a cleanup of the