[SPARK-12625][SPARKR][SQL] replace R usage of Spark SQL deprecated API
rxin davies shivaram
Took save mode from my PR #10480, and moved everything to writer methods (see the sketch below). This is related to PR #10559.

- [x] it seems jsonRDD() is broken and needs investigating - this is not a public API, though; will look into it some more tonight. (fixed)

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #10584 from felixcheung/rremovedeprecated.
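
The migration pattern is the same throughout: replace a one-shot call on the deprecated Java DataFrame API with a DataFrameWriter chain built via callJMethod. As a rough sketch (sdf stands for the wrapped Java DataFrame; source, jmode, and path are whatever the caller resolved):

    # before: deprecated single call on the DataFrame itself
    # callJMethod(sdf, "save", source, jmode, options)

    # after: obtain a DataFrameWriter and configure it step by step
    write <- callJMethod(sdf, "write")             # df.write
    write <- callJMethod(write, "format", source)  # .format(source)
    write <- callJMethod(write, "mode", jmode)     # .mode(jmode)
    callJMethod(write, "save", path)               # .save(path)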
felixcheung authored and rxin committed Jan 5, 2016
1 parent b634901 commit cc4d522
Showing 6 changed files with 38 additions and 31 deletions.
33 changes: 16 additions & 17 deletions R/pkg/R/DataFrame.R
@@ -458,7 +458,10 @@ setMethod("registerTempTable",
 setMethod("insertInto",
           signature(x = "DataFrame", tableName = "character"),
           function(x, tableName, overwrite = FALSE) {
-            callJMethod(x@sdf, "insertInto", tableName, overwrite)
+            jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
+            write <- callJMethod(x@sdf, "write")
+            write <- callJMethod(write, "mode", jmode)
+            callJMethod(write, "insertInto", tableName)
           })

#' Cache
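
A note on the insertInto change above: the overwrite flag is translated to a SaveMode before it reaches the writer, so the R call now maps onto df.write.mode(mode).insertInto(tableName) on the JVM side. A usage sketch (hypothetical table name):

    # overwrite = TRUE -> SaveMode "overwrite"; FALSE -> "append" (per the ifelse above)
    insertInto(df, "people", overwrite = TRUE)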
@@ -1948,18 +1951,15 @@ setMethod("write.df",
               source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
                                     "org.apache.spark.sql.parquet")
             }
-            allModes <- c("append", "overwrite", "error", "ignore")
-            # nolint start
-            if (!(mode %in% allModes)) {
-              stop('mode should be one of "append", "overwrite", "error", "ignore"')
-            }
-            # nolint end
-            jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+            jmode <- convertToJSaveMode(mode)
             options <- varargsToEnv(...)
             if (!is.null(path)) {
               options[["path"]] <- path
             }
-            callJMethod(df@sdf, "save", source, jmode, options)
+            write <- callJMethod(df@sdf, "write")
+            write <- callJMethod(write, "format", source)
+            write <- callJMethod(write, "mode", jmode)
+            write <- callJMethod(write, "save", path)
           })

#' @rdname write.df
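
The user-facing write.df call is unchanged by the hunk above; only the JVM path moves to the writer API. A sketch, assuming a working sqlContext and R's built-in faithful dataset:

    df <- createDataFrame(sqlContext, faithful)
    # goes through write -> format -> mode -> save on the JVM side
    write.df(df, path = "/tmp/faithful.parquet", source = "parquet", mode = "overwrite")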
@@ -2013,15 +2013,14 @@ setMethod("saveAsTable",
               source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
                                     "org.apache.spark.sql.parquet")
             }
-            allModes <- c("append", "overwrite", "error", "ignore")
-            # nolint start
-            if (!(mode %in% allModes)) {
-              stop('mode should be one of "append", "overwrite", "error", "ignore"')
-            }
-            # nolint end
-            jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+            jmode <- convertToJSaveMode(mode)
             options <- varargsToEnv(...)
-            callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
+
+            write <- callJMethod(df@sdf, "write")
+            write <- callJMethod(write, "format", source)
+            write <- callJMethod(write, "mode", jmode)
+            write <- callJMethod(write, "options", options)
+            callJMethod(write, "saveAsTable", tableName)
           })

#' summary
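saveAsTable follows the same chain, with options(...) applied before the terminal saveAsTable(tableName). A usage sketch (hypothetical table name):

    # persists df as a table through the DataFrameWriter path; mode "error" fails if it exists
    saveAsTable(df, tableName = "faithful_tbl", source = "parquet", mode = "error")
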
10 changes: 5 additions & 5 deletions R/pkg/R/SQLContext.R
@@ -256,9 +256,12 @@ jsonFile <- function(sqlContext, path) {
 
 # TODO: support schema
 jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
+  .Deprecated("read.json")
   rdd <- serializeToString(rdd)
   if (is.null(schema)) {
-    sdf <- callJMethod(sqlContext, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio)
+    read <- callJMethod(sqlContext, "read")
+    # samplingRatio is deprecated
+    sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd"))
     dataFrame(sdf)
   } else {
     stop("not implemented")
@@ -289,10 +292,7 @@ read.parquet <- function(sqlContext, path) {
 # TODO: Implement saveasParquetFile and write examples for both
 parquetFile <- function(sqlContext, ...) {
   .Deprecated("read.parquet")
-  # Allow the user to have a more flexible definiton of the text file path
-  paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x)))
-  sdf <- callJMethod(sqlContext, "parquetFile", paths)
-  dataFrame(sdf)
+  read.parquet(sqlContext, unlist(list(...)))
 }

#' SQL Query
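Both deprecated entry points above now warn and delegate: jsonRDD() reads through sqlContext.read.json (dropping samplingRatio), and parquetFile() forwards to read.parquet(). What callers see, as a sketch (hypothetical path):

    df <- suppressWarnings(parquetFile(sqlContext, "/tmp/people.parquet"))  # deprecation warning
    df <- read.parquet(sqlContext, "/tmp/people.parquet")                   # preferred
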
2 changes: 1 addition & 1 deletion R/pkg/R/column.R
@@ -209,7 +209,7 @@ setMethod("cast",
 setMethod("%in%",
           signature(x = "Column"),
           function(x, table) {
-            jc <- callJMethod(x@jc, "in", as.list(table))
+            jc <- callJMethod(x@jc, "isin", as.list(table))
             return(column(jc))
           })

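The Column method behind R's %in% operator changes from "in" to "isin", the method that replaces the deprecated "in" on the Scala side; user code is unaffected. For example (df hypothetical):

    # rows whose age is 19 or 30
    filtered <- filter(df, df$age %in% c(19, 30))
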
9 changes: 9 additions & 0 deletions R/pkg/R/utils.R
@@ -641,3 +641,12 @@ assignNewEnv <- function(data) {
 splitString <- function(input) {
   Filter(nzchar, unlist(strsplit(input, ",|\\s")))
 }
+
+convertToJSaveMode <- function(mode) {
+  allModes <- c("append", "overwrite", "error", "ignore")
+  if (!(mode %in% allModes)) {
+    stop('mode should be one of "append", "overwrite", "error", "ignore"') # nolint
+  }
+  jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+  jmode
+}
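
The new helper centralizes the mode validation that write.df and saveAsTable previously duplicated. A quick sketch of its behavior:

    jmode <- convertToJSaveMode("overwrite")  # returns the SaveMode jobj from SQLUtils
    convertToJSaveMode("upsert")              # stops with the 'mode should be one of ...' error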
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -423,12 +423,12 @@ test_that("read/write json files", {
 test_that("jsonRDD() on a RDD with json string", {
   rdd <- parallelize(sc, mockLines)
   expect_equal(count(rdd), 3)
-  df <- jsonRDD(sqlContext, rdd)
+  df <- suppressWarnings(jsonRDD(sqlContext, rdd))
   expect_is(df, "DataFrame")
   expect_equal(count(df), 3)
 
   rdd2 <- flatMap(rdd, function(x) c(x, x))
-  df <- jsonRDD(sqlContext, rdd2)
+  df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
   expect_is(df, "DataFrame")
   expect_equal(count(df), 6)
 })
11 changes: 5 additions & 6 deletions dev/run-tests.py
@@ -425,13 +425,12 @@ def run_build_tests():
 
 
 def run_sparkr_tests():
-    # set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS")
+    set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS")
 
-    # if which("R"):
-    #     run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")])
-    # else:
-    #     print("Ignoring SparkR tests as R was not found in PATH")
-    pass
+    if which("R"):
+        run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")])
+    else:
+        print("Ignoring SparkR tests as R was not found in PATH")


def parse_opts():
