[SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR
Add ```write.json``` and ```write.parquet``` for SparkR, and deprecate ```saveAsParquetFile```.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #10281 from yanboliang/spark-12310.
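For context, a minimal usage sketch of the API this commit introduces (the session setup and file paths below are illustrative assumptions, not part of the commit; it presumes the Spark 1.6-era SparkR entry points `sparkR.init()`/`sparkRSQL.init()`):

```r
# Illustrative sketch only: the input path and output directories are
# assumptions for demonstration, not taken from this commit.
library(SparkR)

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

df <- read.json(sqlContext, "path/to/people.json")

# New writers added by this commit; each writes a directory of part files.
write.json(df, "/tmp/people-json")
write.parquet(df, "/tmp/people-parquet")

# Deprecated alias: now emits a warning and delegates to write.parquet().
suppressWarnings(saveAsParquetFile(df, "/tmp/people-parquet2"))

# Output written this way can be read back as a DataFrame.
df2 <- read.parquet(sqlContext, "/tmp/people-parquet")
stopifnot(count(df2) == count(df))
```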
yanboliang authored and shivaram committed Dec 16, 2015
1 parent 2eb5af5 commit 22f6cd8
Showing 4 changed files with 119 additions and 56 deletions.
4 changes: 3 additions & 1 deletion R/pkg/NAMESPACE
@@ -92,7 +92,9 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
"write.df")
"write.df",
"write.json",
"write.parquet")

exportClasses("Column")

51 changes: 45 additions & 6 deletions R/pkg/R/DataFrame.R
@@ -596,30 +596,69 @@ setMethod("toJSON",
RDD(jrdd, serializedMode = "string")
})

#' saveAsParquetFile
#' write.json
#'
#' Save the contents of a DataFrame as a JSON file (one object per line). Files written out
#' with this method can be read back in as a DataFrame using read.json().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname write.json
#' @name write.json
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' write.json(df, "/tmp/sparkr-tmp/")
#'}
setMethod("write.json",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "json", path))
})

#' write.parquet
#'
#' Save the contents of a DataFrame as a Parquet file, preserving the schema. Files written out
#' with this method can be read back in as a DataFrame using parquetFile().
#' with this method can be read back in as a DataFrame using read.parquet().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname saveAsParquetFile
#' @name saveAsParquetFile
#' @rdname write.parquet
#' @name write.parquet
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
#' write.parquet(df, "/tmp/sparkr-tmp1/")
#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
#'}
setMethod("write.parquet",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "parquet", path))
})

#' @rdname write.parquet
#' @name saveAsParquetFile
#' @export
setMethod("saveAsParquetFile",
signature(x = "DataFrame", path = "character"),
function(x, path) {
invisible(callJMethod(x@sdf, "saveAsParquetFile", path))
.Deprecated("write.parquet")
write.parquet(x, path)
})

#' Distinct
16 changes: 12 additions & 4 deletions R/pkg/R/generics.R
@@ -519,10 +519,6 @@ setGeneric("sample_frac",
#' @export
setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") })

#' @rdname saveAsParquetFile
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname saveAsTable
#' @export
setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
@@ -541,6 +537,18 @@ setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
#' @export
setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })

#' @rdname write.json
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })

#' @rdname write.parquet
#' @export
setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })

#' @rdname write.parquet
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
104 changes: 59 additions & 45 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
})

test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
test_that("read/write json files", {
# Test read.df
df <- read.df(sqlContext, jsonPath, "json")
expect_is(df, "DataFrame")
expect_equal(count(df), 3)

# Test read.df with a user defined schema
schema <- structType(structField("name", type = "string"),
structField("age", type = "double"))

df1 <- read.df(sqlContext, jsonPath, "json", schema)
expect_is(df1, "DataFrame")
expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

# Test loadDF
df2 <- loadDF(sqlContext, jsonPath, "json", schema)
expect_is(df2, "DataFrame")
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))

# Test read.json
df <- read.json(sqlContext, jsonPath)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
# read.json()/jsonFile() works with multiple input paths

# Test write.df
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
write.df(df, jsonPath2, "json", mode="overwrite")
jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))

# Test write.json
jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json")
write.json(df, jsonPath3)

# Test read.json()/jsonFile() works with multiple input paths
jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3))
expect_is(jsonDF1, "DataFrame")
expect_equal(count(jsonDF1), 6)
# Suppress warnings because jsonFile is deprecated
jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3)))
expect_is(jsonDF2, "DataFrame")
expect_equal(count(jsonDF2), 6)

unlink(jsonPath2)
unlink(jsonPath3)
})

test_that("jsonRDD() on a RDD with json string", {
@@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", {
expect_equal(count(sql(sqlContext, "select * from table1")), 2)
expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob")
dropTempTable(sqlContext, "table1")

unlink(jsonPath2)
unlink(parquetPath2)
})

test_that("table() returns a new DataFrame", {
@@ -848,33 +878,6 @@ test_that("column calculation", {
expect_equal(count(df2), 3)
})

test_that("read.df() from json file", {
df <- read.df(sqlContext, jsonPath, "json")
expect_is(df, "DataFrame")
expect_equal(count(df), 3)

# Check if we can apply a user defined schema
schema <- structType(structField("name", type = "string"),
structField("age", type = "double"))

df1 <- read.df(sqlContext, jsonPath, "json", schema)
expect_is(df1, "DataFrame")
expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

# Run the same with loadDF
df2 <- loadDF(sqlContext, jsonPath, "json", schema)
expect_is(df2, "DataFrame")
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
})

test_that("write.df() as parquet file", {
df <- read.df(sqlContext, jsonPath, "json")
write.df(df, parquetPath, "parquet", mode="overwrite")
df2 <- read.df(sqlContext, parquetPath, "parquet")
expect_is(df2, "DataFrame")
expect_equal(count(df2), 3)
})

test_that("test HiveContext", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
@@ -895,6 +898,8 @@ test_that("test HiveContext", {
df3 <- sql(hiveCtx, "select * from json2")
expect_is(df3, "DataFrame")
expect_equal(count(df3), 3)

unlink(jsonPath2)
})

test_that("column operators", {
@@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", {
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))

unlink(jsonPath2)
unlink(jsonPath3)
})

test_that("toJSON() returns an RDD of the correct values", {
@@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {

# Test base::intersect is working
expect_equal(length(intersect(1:20, 3:23)), 18)

unlink(jsonPath2)
})

test_that("withColumn() and withColumnRenamed()", {
@@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", {
detach(airquality)
})

test_that("write.df() on DataFrame and works with read.parquet", {
df <- read.json(sqlContext, jsonPath)
test_that("read/write Parquet files", {
df <- read.df(sqlContext, jsonPath, "json")
# Test write.df and read.df
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- read.parquet(sqlContext, parquetPath)
expect_is(parquetDF, "DataFrame")
expect_equal(count(df), count(parquetDF))
})
df2 <- read.df(sqlContext, parquetPath, "parquet")
expect_is(df2, "DataFrame")
expect_equal(count(df2), 3)

test_that("read.parquet()/parquetFile() works with multiple input paths", {
df <- read.json(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
# Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode="overwrite")
parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
write.parquet(df, parquetPath2)
parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
suppressWarnings(saveAsParquetFile(df, parquetPath3))
parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3))
expect_is(parquetDF, "DataFrame")
expect_equal(count(parquetDF), count(df) * 2)
parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2))
parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3))
expect_is(parquetDF2, "DataFrame")
expect_equal(count(parquetDF2), count(df) * 2)

# Test if varargs works with variables
saveMode <- "overwrite"
mergeSchema <- "true"
parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema)

unlink(parquetPath2)
unlink(parquetPath3)
unlink(parquetPath4)
})

test_that("describe() and summarize() on a DataFrame", {
