[SPARK-7482][SparkR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala. #6007

Closed · wants to merge 3 commits
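In short, the patch renames several SparkR DataFrame methods so they match their Scala counterparts: loadDF becomes read.df, saveDF becomes write.df (both old names are kept as aliases), and sampleDF becomes sample (with sample_frac retained as an alias). A rough sketch of how the API reads after the change, assuming a running Spark setup; the file paths below are placeholders, not taken from the diff:

sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)

df <- read.df(sqlCtx, "examples/people.json", "json")    # previously loadDF(...)
s  <- sample(df, FALSE, 0.5)                              # previously sampleDF(...)
write.df(df, "people.parquet", "parquet", "overwrite")    # previously saveDF(...)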
6 changes: 4 additions & 2 deletions R/pkg/NAMESPACE
@@ -37,7 +37,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
"sampleDF",
"sample",
"sample_frac",
"saveAsParquetFile",
"saveAsTable",
@@ -53,7 +53,8 @@ exportMethods("arrange",
"unpersist",
"where",
"withColumn",
"withColumnRenamed")
"withColumnRenamed",
"write.df")

exportClasses("Column")

@@ -101,6 +102,7 @@ export("cacheTable",
"jsonFile",
"loadDF",
"parquetFile",
"read.df",
"sql",
"table",
"tableNames",
35 changes: 22 additions & 13 deletions R/pkg/R/DataFrame.R
@@ -294,8 +294,8 @@ setMethod("registerTempTable",
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- loadDF(sqlCtx, path, "parquet")
#' df2 <- loadDF(sqlCtx, path2, "parquet")
#' df <- read.df(sqlCtx, path, "parquet")
#' df2 <- read.df(sqlCtx, path2, "parquet")
#' registerTempTable(df, "table1")
#' insertInto(df2, "table1", overwrite = TRUE)
#'}
@@ -473,14 +473,14 @@ setMethod("distinct",
dataFrame(sdf)
})

#' SampleDF
#' Sample
#'
#' Return a sampled subset of this DataFrame using a random seed.
#'
#' @param x A SparkSQL DataFrame
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @rdname sampleDF
#' @rdname sample
#' @aliases sample_frac
#' @export
#' @examples
@@ -489,10 +489,10 @@ setMethod("distinct",
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' collect(sampleDF(df, FALSE, 0.5))
#' collect(sampleDF(df, TRUE, 0.5))
#' collect(sample(df, FALSE, 0.5))
#' collect(sample(df, TRUE, 0.5))
#'}
setMethod("sampleDF",
setMethod("sample",
# TODO : Figure out how to send integer as java.lang.Long to JVM so
# we can send seed as an argument through callJMethod
signature(x = "DataFrame", withReplacement = "logical",
@@ -503,13 +503,13 @@ setMethod("sampleDF",
dataFrame(sdf)
})

#' @rdname sampleDF
#' @aliases sampleDF
#' @rdname sample
#' @aliases sample
setMethod("sample_frac",
signature(x = "DataFrame", withReplacement = "logical",
fraction = "numeric"),
function(x, withReplacement, fraction) {
sampleDF(x, withReplacement, fraction)
sample(x, withReplacement, fraction)
})

#' Count
@@ -1303,17 +1303,17 @@ setMethod("except",
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
#'
#' @rdname saveAsTable
#' @rdname write.df
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' saveAsTable(df, "myfile")
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("saveDF",
setMethod("write.df",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
@@ -1334,6 +1334,15 @@ setMethod("saveDF",
callJMethod(df@sdf, "save", source, jmode, options)
})

#' @rdname write.df
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})

#' saveAsTable
#'
4 changes: 2 additions & 2 deletions R/pkg/R/RDD.R
@@ -927,7 +927,7 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
MAXINT)))))

# TODO(zongheng): investigate if this call is an in-place shuffle?
sample(samples)[1:total]
base::sample(samples)[1:total]
})

# Creates tuples of the elements in this RDD by applying a function.
@@ -996,7 +996,7 @@ setMethod("coalesce",
if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(sample(numPartitions, 1) - 1)
start <- as.integer(base::sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
pos <- (start + i) %% numPartitions
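The two call sites above are qualified as base::sample because this patch introduces an S4 generic named sample with a DataFrame-style argument list; inside the package, an unqualified sample() call would dispatch to that generic instead of base R's sampler and fail for plain vectors. A minimal standalone sketch of the masking behaviour, not part of the patch:

# Defining a generic named "sample" with SparkR-style arguments masks base::sample
# for unqualified calls in the same environment.
setGeneric("sample", function(x, withReplacement, fraction, seed) {
  standardGeneric("sample")
})

xs <- 1:10
# sample(xs)        # would now fail: no method for signature 'x = "integer"'
base::sample(xs)    # explicitly reaches the base R function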
13 changes: 10 additions & 3 deletions R/pkg/R/SQLContext.R
@@ -421,7 +421,7 @@ clearCache <- function(sqlCtx) {
#' \dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- loadDF(sqlCtx, path, "parquet")
#' df <- read.df(sqlCtx, path, "parquet")
#' registerTempTable(df, "table")
#' dropTempTable(sqlCtx, "table")
#' }
@@ -450,10 +450,10 @@ dropTempTable <- function(sqlCtx, tableName) {
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- load(sqlCtx, "path/to/file.json", source = "json")
#' df <- read.df(sqlCtx, "path/to/file.json", source = "json")
#' }

loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
@@ -462,6 +462,13 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
dataFrame(sdf)
}

#' @aliases loadDF
#' @export

loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
read.df(sqlCtx, path, source, ...)
}

#' Create an external table
#'
#' Creates an external table based on the dataset in a data source,
22 changes: 13 additions & 9 deletions R/pkg/R/generics.R
@@ -456,19 +456,19 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
#' @export
setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })

#' @rdname sampleDF
#' @rdname sample
#' @export
setGeneric("sample_frac",
setGeneric("sample",
function(x, withReplacement, fraction, seed) {
standardGeneric("sample_frac")
})
standardGeneric("sample")
})

#' @rdname sampleDF
#' @rdname sample
#' @export
setGeneric("sampleDF",
setGeneric("sample_frac",
function(x, withReplacement, fraction, seed) {
standardGeneric("sampleDF")
})
standardGeneric("sample_frac")
})

#' @rdname saveAsParquetFile
#' @export
@@ -480,7 +480,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
standardGeneric("saveAsTable")
})

#' @rdname saveAsTable
#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })

Contributor: I guess we'll also need a generic for read.df?

Contributor Author: No. read.df is not a generic function.
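To make the distinction above concrete, a minimal sketch (the body of read.df here is a placeholder, not the patch's implementation): write.df dispatches on the class of its DataFrame argument, so it is declared with setGeneric, whereas read.df takes a SQLContext and a path and can remain an ordinary function with no S4 dispatch.

# S4 generic: method dispatch happens on the class of `df`.
setGeneric("write.df", function(df, path, source, mode, ...) {
  standardGeneric("write.df")
})

# Plain function: the first argument is always a SQLContext, so no dispatch is needed.
read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
  message("would load ", path, " using data source ", source)
}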


#' @rdname write.df
#' @export
setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })

40 changes: 20 additions & 20 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -209,18 +209,18 @@ test_that("registerTempTable() results in a queryable table and sql() results in
})

test_that("insertInto() on a registered table", {
df <- loadDF(sqlCtx, jsonPath, "json")
saveDF(df, parquetPath, "parquet", "overwrite")
dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
df <- read.df(sqlCtx, jsonPath, "json")
write.df(df, parquetPath, "parquet", "overwrite")
dfParquet <- read.df(sqlCtx, parquetPath, "parquet")

lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
writeLines(lines, jsonPath2)
df2 <- loadDF(sqlCtx, jsonPath2, "json")
saveDF(df2, parquetPath2, "parquet", "overwrite")
dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
df2 <- read.df(sqlCtx, jsonPath2, "json")
write.df(df2, parquetPath2, "parquet", "overwrite")
dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet")

registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1")
@@ -421,12 +421,12 @@ test_that("distinct() on DataFrames", {
expect_true(count(uniques) == 3)
})

test_that("sampleDF on a DataFrame", {
test_that("sample on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
sampled <- sampleDF(df, FALSE, 1.0)
sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_true(inherits(sampled, "DataFrame"))
sampled2 <- sampleDF(df, FALSE, 0.1)
sampled2 <- sample(df, FALSE, 0.1)
expect_true(count(sampled2) < 3)

# Also test sample_frac
@@ -491,16 +491,16 @@ test_that("column calculation", {
expect_true(count(df2) == 3)
})

test_that("load() from json file", {
df <- loadDF(sqlCtx, jsonPath, "json")
test_that("read.df() from json file", {
df <- read.df(sqlCtx, jsonPath, "json")
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 3)
})

test_that("save() as parquet file", {
df <- loadDF(sqlCtx, jsonPath, "json")
saveDF(df, parquetPath, "parquet", mode="overwrite")
df2 <- loadDF(sqlCtx, parquetPath, "parquet")
test_that("write.df() as parquet file", {
df <- read.df(sqlCtx, jsonPath, "json")
write.df(df, parquetPath, "parquet", mode="overwrite")
df2 <- read.df(sqlCtx, parquetPath, "parquet")
expect_true(inherits(df2, "DataFrame"))
expect_true(count(df2) == 3)
})
@@ -670,7 +670,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPath2)
df2 <- loadDF(sqlCtx, jsonPath2, "json")
df2 <- read.df(sqlCtx, jsonPath2, "json")

unioned <- arrange(unionAll(df, df2), df$age)
expect_true(inherits(unioned, "DataFrame"))
@@ -712,19 +712,19 @@ test_that("mutate() and rename()", {
expect_true(columns(newDF2)[1] == "newerAge")
})

test_that("saveDF() on DataFrame and works with parquetFile", {
test_that("write.df() on DataFrame and works with parquetFile", {
df <- jsonFile(sqlCtx, jsonPath)
saveDF(df, parquetPath, "parquet", mode="overwrite")
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath)
expect_true(inherits(parquetDF, "DataFrame"))
expect_equal(count(df), count(parquetDF))
})

test_that("parquetFile works with multiple input paths", {
df <- jsonFile(sqlCtx, jsonPath)
saveDF(df, parquetPath, "parquet", mode="overwrite")
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
saveDF(df, parquetPath2, "parquet", mode="overwrite")
write.df(df, parquetPath2, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
expect_true(inherits(parquetDF, "DataFrame"))
expect_true(count(parquetDF) == count(df)*2)