From 1893f1c1e76747894efad287a4ac850b62e98e93 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 6 Nov 2018 10:38:37 +0800 Subject: [PATCH 01/19] [POC] Enables Arrow optimization from R DataFrame to Spark DataFrame --- R/pkg/R/SQLContext.R | 104 +++++++++++++----- R/pkg/tests/fulltests/test_sparkSQL.R | 16 +++ .../apache/spark/sql/internal/SQLConf.scala | 5 +- .../org/apache/spark/sql/api/r/SQLUtils.scala | 22 ++++ 4 files changed, 120 insertions(+), 27 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index afcdd6faa849d..91f6514658e72 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -147,6 +147,30 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + stopifnot(require("arrow", quietly = TRUE)) + stopifnot(require("withr", quietly = TRUE)) + numPartitions <- if (!is.null(numPartitions)) { + numToInt(numPartitions) + } else { + 1 + } + fileName <- tempfile() + chunk <- as.integer(nrow(rdf) / numPartitions) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) + stream_writer <- NULL + for (rdf_slice in rdf_slices) { + batch <- arrow::record_batch(rdf_slice) + if (is.null(stream_writer)) { + stream <- arrow:::close_on_exit(arrow::file_output_stream(fileName)) + schema <- batch$schema() + stream_writer <- arrow:::close_on_exit(arrow::record_batch_stream_writer(stream, schema)) + } + arrow::write_record_batch(batch, stream_writer) + } + fileName +} + #' Create a SparkDataFrame #' #' Converts R data.frame or list into SparkDataFrame. @@ -172,15 +196,16 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + conf <- callJMethod(sparkSession, "conf") + arrowEnabled <- tolower(callJMethod(conf, "get", "spark.sql.execution.arrow.enabled")) == "true" + shouldUseArrow <- is.data.frame(data) && arrowEnabled if (is.data.frame(data)) { - # Convert data into a list of rows. Each row is a list. - # get the names of columns, they will be put into RDD if (is.null(schema)) { schema <- names(data) } + # Convert data into a list of rows. Each row is a list. # get rid of factor type cleanCols <- function(x) { if (is.factor(x)) { @@ -189,33 +214,57 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) - - # check if all columns have supported type - lapply(data, getInternalType) - - # convert to rows args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) - data <- do.call(mapply, append(args, data)) + if (arrowEnabled) { + stopifnot(length(data) > 0) + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( + jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), + finally = { + file.remove(fileName) + }) + row <- do.call(mapply, append(args, head(data, 1)))[[1]] + } else { + # drop factors and wrap lists + data <- setNames(as.list(data), NULL) + + # check if all columns have supported type + lapply(data, getInternalType) + + # convert to rows + data <- do.call(mapply, append(args, data)) + if (length(data) > 0) { + row <- data[[1]] + } + } } - if (is.list(data)) { - sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - if (!is.null(numPartitions)) { - rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions)) + if (shouldUseArrow) { + rdd <- jrddInArrow + } else { + if (is.list(data)) { + sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + if (!is.null(numPartitions)) { + rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions)) + } else { + rdd <- parallelize(sc, data, numSlices = 1) + } + } else if (inherits(data, "RDD")) { + rdd <- data } else { - rdd <- parallelize(sc, data, numSlices = 1) + stop(paste("unexpected type:", class(data))) } - } else if (inherits(data, "RDD")) { - rdd <- data - } else { - stop(paste("unexpected type:", class(data))) } if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) { - row <- firstRDD(rdd) + if (!exists("row")) { + row <- firstRDD(rdd) + } names <- if (is.null(schema)) { names(row) } else { @@ -246,10 +295,15 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, stopifnot(class(schema) == "structType") - jrdd <- getJRDD(lapply(rdd, function(x) x), "row") - srdd <- callJMethod(jrdd, "rdd") - sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF", - srdd, schema$jobj, sparkSession) + if (shouldUseArrow) { + sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "toDataFrame", rdd, schema$jobj, sparkSession) + } else { + jrdd <- getJRDD(lapply(rdd, function(x) x), "row") + srdd <- callJMethod(jrdd, "rdd") + sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF", + srdd, schema$jobj, sparkSession) + } dataFrame(sdf) } diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 88f2286219525..89f661376821b 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -307,6 +307,22 @@ test_that("create DataFrame from RDD", { unsetHiveContext() }) +test_that("createDataFrame Arrow optimization", { + skip_if_not_installed("arrow") + skip_if_not_installed("withr") + expected <- collect(createDataFrame(mtcars)) + conf <- callJMethod(sparkSession, "conf") + arrowEnabled <- callJMethod(conf, "get", "spark.sql.execution.arrow.enabled") + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true") + tryCatch({ + expect_equal(collect(createDataFrame(mtcars)), expected) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.shuffle.partitions", arrowEnabled) + }) +}) + test_that("read/write csv as DataFrame", { if (windows_with_hadoop()) { csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ebc8c3705ea28..fb0af550e6ed6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1284,8 +1284,9 @@ object SQLConf { val ARROW_EXECUTION_ENABLED = buildConf("spark.sql.execution.arrow.enabled") .doc("When true, make use of Apache Arrow for columnar data transfers. Currently available " + - "for use with pyspark.sql.DataFrame.toPandas, and " + - "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame. " + + "for use with pyspark.sql.DataFrame.toPandas, " + + "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame," + + "and createDataFrame when its input is a R DataFrame. " + "The following data types are unsupported: " + "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.") .booleanConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index f5d8d4ea0a4da..7bc6b07082649 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericRowWithSchema} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.types._ @@ -237,4 +238,25 @@ private[sql] object SQLUtils extends Logging { def createArrayType(column: Column): ArrayType = { new ArrayType(ExprUtils.evalTypeExpr(column.expr), true) } + + /** + * R callable function to read a file in Arrow stream format and create a [[RDD]] + * using each serialized ArrowRecordBatch as a partition. + */ + def readArrowStreamFromFile( + sparkSession: SparkSession, + filename: String): JavaRDD[Array[Byte]] = { + ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename) + } + + /** + * R callable function to read a file in Arrow stream format and create a [[DataFrame]] + * from an RDD. + */ + def toDataFrame( + arrowBatchRDD: JavaRDD[Array[Byte]], + schema: StructType, + sparkSession: SparkSession): DataFrame = { + ArrowConverters.toDataFrame(arrowBatchRDD, schema.json, sparkSession.sqlContext) + } } From 39fa2519d28a7917c6fa888e2c8b848a8d63f362 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 6 Nov 2018 17:56:00 +0800 Subject: [PATCH 02/19] Set spark.sql.execution.arrow.enabled back in test --- R/pkg/tests/fulltests/test_sparkSQL.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 89f661376821b..fe4ea53720e1b 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -319,7 +319,7 @@ test_that("createDataFrame Arrow optimization", { }, finally = { # Resetting the conf back to default value - callJMethod(conf, "set", "spark.sql.shuffle.partitions", arrowEnabled) + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled) }) }) From be11ce9dea90921b22028732c4ef9564f0a3af71 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 6 Nov 2018 18:30:59 +0800 Subject: [PATCH 03/19] documentation build and nits --- R/pkg/R/SQLContext.R | 17 +++++++++-------- .../org/apache/spark/sql/api/r/SQLUtils.scala | 4 ++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 91f6514658e72..7d8f0b06bd584 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -199,6 +199,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, conf <- callJMethod(sparkSession, "conf") arrowEnabled <- tolower(callJMethod(conf, "get", "spark.sql.execution.arrow.enabled")) == "true" shouldUseArrow <- is.data.frame(data) && arrowEnabled + firstRow <- NULL if (is.data.frame(data)) { # get the names of columns, they will be put into RDD if (is.null(schema)) { @@ -228,7 +229,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, finally = { file.remove(fileName) }) - row <- do.call(mapply, append(args, head(data, 1)))[[1]] + firstRow <- do.call(mapply, append(args, head(data, 1)))[[1]] } else { # drop factors and wrap lists data <- setNames(as.list(data), NULL) @@ -239,7 +240,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, # convert to rows data <- do.call(mapply, append(args, data)) if (length(data) > 0) { - row <- data[[1]] + firstRow <- data[[1]] } } } @@ -262,16 +263,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, } if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) { - if (!exists("row")) { - row <- firstRDD(rdd) + if (is.null(firstRow)) { + firstRow <- firstRDD(rdd) } names <- if (is.null(schema)) { - names(row) + names(firstRow) } else { as.list(schema) } if (is.null(names)) { - names <- lapply(1:length(row), function(x) { + names <- lapply(1:length(firstRow), function(x) { paste("_", as.character(x), sep = "") }) } @@ -286,8 +287,8 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, nn }) - types <- lapply(row, infer_type) - fields <- lapply(1:length(row), function(i) { + types <- lapply(firstRow, infer_type) + fields <- lapply(1:length(firstRow), function(i) { structField(names[[i]], types[[i]], TRUE) }) schema <- do.call(structType, fields) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 7bc6b07082649..8d38da59c590b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -240,7 +240,7 @@ private[sql] object SQLUtils extends Logging { } /** - * R callable function to read a file in Arrow stream format and create a [[RDD]] + * R callable function to read a file in Arrow stream format and create a `RDD` * using each serialized ArrowRecordBatch as a partition. */ def readArrowStreamFromFile( @@ -250,7 +250,7 @@ private[sql] object SQLUtils extends Logging { } /** - * R callable function to read a file in Arrow stream format and create a [[DataFrame]] + * R callable function to read a file in Arrow stream format and create a `DataFrame` * from an RDD. */ def toDataFrame( From 7f73c89612d8516b5e5a420d1f4b6d69b028bdde Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 6 Nov 2018 18:34:48 +0800 Subject: [PATCH 04/19] make the diff smaller --- R/pkg/R/SQLContext.R | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 7d8f0b06bd584..3c07e5cf05779 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -247,19 +247,17 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, if (shouldUseArrow) { rdd <- jrddInArrow - } else { - if (is.list(data)) { - sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - if (!is.null(numPartitions)) { - rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions)) - } else { - rdd <- parallelize(sc, data, numSlices = 1) - } - } else if (inherits(data, "RDD")) { - rdd <- data + } else if (is.list(data)) { + sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + if (!is.null(numPartitions)) { + rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions)) } else { - stop(paste("unexpected type:", class(data))) + rdd <- parallelize(sc, data, numSlices = 1) } + } else if (inherits(data, "RDD")) { + rdd <- data + } else { + stop(paste("unexpected type:", class(data))) } if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) { From b800073ac6d8ac55124fcc8837533a0df7fd3441 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 8 Nov 2018 18:27:52 +0800 Subject: [PATCH 05/19] Address comments --- R/pkg/R/SQLContext.R | 53 ++++++++++++++++++--------- R/pkg/tests/fulltests/test_sparkSQL.R | 2 +- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 3c07e5cf05779..30d3952bc5560 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -148,23 +148,29 @@ getDefaultSqlSource <- function() { } writeToTempFileInArrow <- function(rdf, numPartitions) { - stopifnot(require("arrow", quietly = TRUE)) - stopifnot(require("withr", quietly = TRUE)) + if (!require("arrow", quietly = TRUE)) { + stop("'arrow' package should be installed.") + } + if (!require("withr", quietly = TRUE)) { + stop("'withr' package should be installed.") + } + numPartitions <- if (!is.null(numPartitions)) { numToInt(numPartitions) } else { 1 } - fileName <- tempfile() + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") chunk <- as.integer(nrow(rdf) / numPartitions) rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) stream_writer <- NULL for (rdf_slice in rdf_slices) { batch <- arrow::record_batch(rdf_slice) if (is.null(stream_writer)) { - stream <- arrow:::close_on_exit(arrow::file_output_stream(fileName)) + # We should avoid internal calls 'close_on_exit' but looks there's no exposed API for it. + stream <- arrow:::close_on_exit(arrow::file_output_stream(fileName)) # nolint schema <- batch$schema() - stream_writer <- arrow:::close_on_exit(arrow::record_batch_stream_writer(stream, schema)) + stream_writer <- arrow:::close_on_exit(arrow::record_batch_stream_writer(stream, schema)) # nolint } arrow::write_record_batch(batch, stream_writer) } @@ -196,9 +202,8 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - conf <- callJMethod(sparkSession, "conf") - arrowEnabled <- tolower(callJMethod(conf, "get", "spark.sql.execution.arrow.enabled")) == "true" - shouldUseArrow <- is.data.frame(data) && arrowEnabled + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" + shouldUseArrow <- FALSE firstRow <- NULL if (is.data.frame(data)) { # get the names of columns, they will be put into RDD @@ -206,7 +211,6 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, schema <- names(data) } - # Convert data into a list of rows. Each row is a list. # get rid of factor type cleanCols <- function(x) { if (is.factor(x)) { @@ -219,18 +223,31 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) if (arrowEnabled) { - stopifnot(length(data) > 0) - fileName <- writeToTempFileInArrow(data, numPartitions) - tryCatch( - jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", - "readArrowStreamFromFile", - sparkSession, - fileName), + shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( + jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), finally = { file.remove(fileName) + }) + firstRow <- do.call(mapply, append(args, head(data, 1)))[[1]] + TRUE + }, + error = function(e) { + message(paste0("createDataFrame attempted Arrow optimization because ", + "'spark.sql.execution.arrow.enabled' is set to true; however, ", + "failed, attempting non-optimization. Reason: ", + e)) + return(FALSE) }) - firstRow <- do.call(mapply, append(args, head(data, 1)))[[1]] - } else { + } + + if (!shouldUseArrow) { + # Convert data into a list of rows. Each row is a list. # drop factors and wrap lists data <- setNames(as.list(data), NULL) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index fe4ea53720e1b..fb4383bb83a7e 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -311,8 +311,8 @@ test_that("createDataFrame Arrow optimization", { skip_if_not_installed("arrow") skip_if_not_installed("withr") expected <- collect(createDataFrame(mtcars)) + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] conf <- callJMethod(sparkSession, "conf") - arrowEnabled <- callJMethod(conf, "get", "spark.sql.execution.arrow.enabled") callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true") tryCatch({ expect_equal(collect(createDataFrame(mtcars)), expected) From 2ca39899d969f18acdb152e69e4fb6c71e237813 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 8 Nov 2018 21:35:01 +0800 Subject: [PATCH 06/19] Fix CRAN check --- R/pkg/DESCRIPTION | 4 +++- R/pkg/R/SQLContext.R | 57 +++++++++++++++++++++++++------------------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 736da46eaa8d3..fd50b8dea1c84 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -22,7 +22,9 @@ Suggests: rmarkdown, testthat, e1071, - survival + survival, + arrow, + withr Collate: 'schema.R' 'generics.R' diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 30d3952bc5560..c9b0d019e274b 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -148,33 +148,40 @@ getDefaultSqlSource <- function() { } writeToTempFileInArrow <- function(rdf, numPartitions) { - if (!require("arrow", quietly = TRUE)) { - stop("'arrow' package should be installed.") - } - if (!require("withr", quietly = TRUE)) { - stop("'withr' package should be installed.") - } - - numPartitions <- if (!is.null(numPartitions)) { - numToInt(numPartitions) - } else { - 1 - } - fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") - chunk <- as.integer(nrow(rdf) / numPartitions) - rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) - stream_writer <- NULL - for (rdf_slice in rdf_slices) { - batch <- arrow::record_batch(rdf_slice) - if (is.null(stream_writer)) { - # We should avoid internal calls 'close_on_exit' but looks there's no exposed API for it. - stream <- arrow:::close_on_exit(arrow::file_output_stream(fileName)) # nolint - schema <- batch$schema() - stream_writer <- arrow:::close_on_exit(arrow::record_batch_stream_writer(stream, schema)) # nolint + if (requireNamespace("arrow", quietly = TRUE)) { + # Currently arrow requires withr; otherwise, write APIs don't work. + # Direct 'require' is not recommended by CRAN. Here's a workaround. + require1 <- require + if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { + numToInt(numPartitions) + } else { + 1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(nrow(rdf) / numPartitions) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) + stream_writer <- NULL + for (rdf_slice in rdf_slices) { + batch <- arrow::record_batch(rdf_slice) + if (is.null(stream_writer)) { + # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks + # there's no exposed API for it. Here's a workaround but ideally this should + # be removed. + close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE) + stream <- close_on_exit(arrow::file_output_stream(fileName)) + schema <- batch$schema() + stream_writer <- close_on_exit(arrow::record_batch_stream_writer(stream, schema)) + } + arrow::write_record_batch(batch, stream_writer) + } + return(fileName) + } else { + stop("'withr' package should be installed.") } - arrow::write_record_batch(batch, stream_writer) + } else { + stop("'arrow' package should be installed.") } - fileName } #' Create a SparkDataFrame From 7b506f8198e52f2d44126b66fa3829c88edb9794 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 9 Nov 2018 01:08:36 +0800 Subject: [PATCH 07/19] Type specification --- R/pkg/R/SQLContext.R | 21 +++++++++++++++--- R/pkg/tests/fulltests/test_sparkSQL.R | 22 +++++++++++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 2 +- 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index c9b0d019e274b..5b273cf94c709 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -159,7 +159,7 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { 1 } fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") - chunk <- as.integer(nrow(rdf) / numPartitions) + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) stream_writer <- NULL for (rdf_slice in rdf_slices) { @@ -232,6 +232,22 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, if (arrowEnabled) { shouldUseArrow <- tryCatch({ stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) { + stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { + stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { + if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") + } + } + firstRow <- do.call(mapply, append(args, dataHead))[[1]] fileName <- writeToTempFileInArrow(data, numPartitions) tryCatch( jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", @@ -241,11 +257,10 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, finally = { file.remove(fileName) }) - firstRow <- do.call(mapply, append(args, head(data, 1)))[[1]] TRUE }, error = function(e) { - message(paste0("createDataFrame attempted Arrow optimization because ", + message(paste0("WARN: createDataFrame attempted Arrow optimization because ", "'spark.sql.execution.arrow.enabled' is set to true; however, ", "failed, attempting non-optimization. Reason: ", e)) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index fb4383bb83a7e..b931dbd626027 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -323,6 +323,28 @@ test_that("createDataFrame Arrow optimization", { }) }) +test_that("createDataFrame Arrow optimization - type specification", { + skip_if_not_installed("arrow") + skip_if_not_installed("withr") + rdf <- data.frame(list(list(a=1, + b="a", + c=TRUE, + d=1.1, + e=1L, + g=as.Date("1990-02-24")))) + expected <- collect(createDataFrame(rdf)) + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] + conf <- callJMethod(sparkSession, "conf") + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true") + tryCatch({ + expect_equal(collect(createDataFrame(rdf)), expected) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled) + }) +}) + test_that("read/write csv as DataFrame", { if (windows_with_hadoop()) { csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fb0af550e6ed6..792bf30e32984 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1286,7 +1286,7 @@ object SQLConf { .doc("When true, make use of Apache Arrow for columnar data transfers. Currently available " + "for use with pyspark.sql.DataFrame.toPandas, " + "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame," + - "and createDataFrame when its input is a R DataFrame. " + + "and createDataFrame when its input is an R DataFrame. " + "The following data types are unsupported: " + "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.") .booleanConf From 8992fb8c8d20f9dc1e5fd224b56a5648655a341e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 9 Nov 2018 01:13:49 +0800 Subject: [PATCH 08/19] Workaround CRAN for now --- R/pkg/DESCRIPTION | 4 +--- R/pkg/R/SQLContext.R | 6 +++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index fd50b8dea1c84..736da46eaa8d3 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -22,9 +22,7 @@ Suggests: rmarkdown, testthat, e1071, - survival, - arrow, - withr + survival Collate: 'schema.R' 'generics.R' diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 5b273cf94c709..5546375e76eb2 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -148,7 +148,11 @@ getDefaultSqlSource <- function() { } writeToTempFileInArrow <- function(rdf, numPartitions) { - if (requireNamespace("arrow", quietly = TRUE)) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { # Currently arrow requires withr; otherwise, write APIs don't work. # Direct 'require' is not recommended by CRAN. Here's a workaround. require1 <- require From 253db939240c2a2dcc8639b0b8b3a30c68a0673a Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 9 Nov 2018 01:26:36 +0800 Subject: [PATCH 09/19] workaround for CRAN --- R/pkg/R/SQLContext.R | 16 ++++++++++++---- R/pkg/tests/fulltests/test_sparkSQL.R | 12 ++++++------ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 5546375e76eb2..10e377521d18f 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -153,6 +153,14 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { # around by avoiding direct requireNamespace. requireNamespace1 <- requireNamespace if (requireNamespace1("arrow", quietly = TRUE)) { + record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) + record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) + file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) + write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + # Currently arrow requires withr; otherwise, write APIs don't work. # Direct 'require' is not recommended by CRAN. Here's a workaround. require1 <- require @@ -167,17 +175,17 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) stream_writer <- NULL for (rdf_slice in rdf_slices) { - batch <- arrow::record_batch(rdf_slice) + batch <- record_batch(rdf_slice) if (is.null(stream_writer)) { # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks # there's no exposed API for it. Here's a workaround but ideally this should # be removed. close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE) - stream <- close_on_exit(arrow::file_output_stream(fileName)) + stream <- close_on_exit(file_output_stream(fileName)) schema <- batch$schema() - stream_writer <- close_on_exit(arrow::record_batch_stream_writer(stream, schema)) + stream_writer <- close_on_exit(record_batch_stream_writer(stream, schema)) } - arrow::write_record_batch(batch, stream_writer) + write_record_batch(batch, stream_writer) } return(fileName) } else { diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index b931dbd626027..818bc5a6f65e4 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -326,12 +326,12 @@ test_that("createDataFrame Arrow optimization", { test_that("createDataFrame Arrow optimization - type specification", { skip_if_not_installed("arrow") skip_if_not_installed("withr") - rdf <- data.frame(list(list(a=1, - b="a", - c=TRUE, - d=1.1, - e=1L, - g=as.Date("1990-02-24")))) + rdf <- data.frame(list(list(a = 1, + b = "a", + c = TRUE, + d = 1.1, + e = 1L, + g = as.Date("1990-02-24")))) expected <- collect(createDataFrame(rdf)) arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] conf <- callJMethod(sparkSession, "conf") From cbcd75d50c1e4d1677bf0e9ff59f187d3f7d0267 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 11 Nov 2018 16:42:43 +0800 Subject: [PATCH 10/19] Address comments --- R/pkg/DESCRIPTION | 3 +- R/pkg/R/SQLContext.R | 186 ++++++++++-------- R/pkg/R/context.R | 40 ++-- R/pkg/tests/fulltests/test_sparkSQL.R | 26 ++- .../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- 5 files changed, 151 insertions(+), 106 deletions(-) diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 736da46eaa8d3..929f2a7d4f1d2 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -22,7 +22,8 @@ Suggests: rmarkdown, testthat, e1071, - survival + survival, + withr Collate: 'schema.R' 'generics.R' diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 10e377521d18f..1b277073b9b9b 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -148,6 +148,17 @@ getDefaultSqlSource <- function() { } writeToTempFileInArrow <- function(rdf, numPartitions) { + defer_parent <- function(x, ...){ + # For some reasons, Arrow R API requires to load 'defer_parent', which is from 'withr' package. + # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' + # package, which CRAN complains about. + if (requireNamespace("withr", quietly = TRUE)) { + withr::defer_parent(x, ...) + } else { + stop("'withr' package should be installed.") + } + } + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works # around by avoiding direct requireNamespace. @@ -161,41 +172,64 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { write_record_batch <- get( "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) - # Currently arrow requires withr; otherwise, write APIs don't work. - # Direct 'require' is not recommended by CRAN. Here's a workaround. - require1 <- require - if (require1("withr", quietly = TRUE)) { - numPartitions <- if (!is.null(numPartitions)) { - numToInt(numPartitions) - } else { - 1 - } - fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") - chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) - rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) - stream_writer <- NULL + numPartitions <- if (!is.null(numPartitions)) { + numToInt(numPartitions) + } else { + 1 + } + + rdf_slices <- if (numPartitions > 1) { + split(rdf, makeSplits(numPartitions, nrow(rdf))) + } else { + list(rdf) + } + + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + stream_writer <- NULL + tryCatch({ for (rdf_slice in rdf_slices) { batch <- record_batch(rdf_slice) if (is.null(stream_writer)) { # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks # there's no exposed API for it. Here's a workaround but ideally this should # be removed. - close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE) - stream <- close_on_exit(file_output_stream(fileName)) + stream <- file_output_stream(fileName) schema <- batch$schema() - stream_writer <- close_on_exit(record_batch_stream_writer(stream, schema)) + stream_writer <- record_batch_stream_writer(stream, schema) } + write_record_batch(batch, stream_writer) } - return(fileName) - } else { - stop("'withr' package should be installed.") - } + }, + finally = { + if (!is.null(stream_writer)) { + stream_writer$Close() + } + }) + + return(fileName) } else { stop("'arrow' package should be installed.") } } +checkTypeRequirementForArrow <- function(dataHead, schema) { + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) { + stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { + stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { + if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") + } + } +} + #' Create a SparkDataFrame #' #' Converts R data.frame or list into SparkDataFrame. @@ -225,75 +259,63 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, shouldUseArrow <- FALSE firstRow <- NULL if (is.data.frame(data)) { - # get the names of columns, they will be put into RDD - if (is.null(schema)) { - schema <- names(data) - } + # get the names of columns, they will be put into RDD + if (is.null(schema)) { + schema <- names(data) + } - # get rid of factor type - cleanCols <- function(x) { - if (is.factor(x)) { - as.character(x) - } else { - x - } + # get rid of factor type + cleanCols <- function(x) { + if (is.factor(x)) { + as.character(x) + } else { + x } - data[] <- lapply(data, cleanCols) - - args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) - if (arrowEnabled) { - shouldUseArrow <- tryCatch({ - stopifnot(length(data) > 0) - dataHead <- head(data, 1) - # Currenty Arrow optimization does not support POSIXct and raw for now. - # Also, it does not support explicit float type set by users. It leads to - # incorrect conversion. We will fall back to the path without Arrow optimization. - if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) { - stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") - } - if (any(sapply(dataHead, is.raw))) { - stop("Arrow optimization with R DataFrame does not support raw type yet.") - } - if (inherits(schema, "structType")) { - if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { - stop("Arrow optimization with R DataFrame does not support FloatType type yet.") - } - } - firstRow <- do.call(mapply, append(args, dataHead))[[1]] - fileName <- writeToTempFileInArrow(data, numPartitions) - tryCatch( - jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", - "readArrowStreamFromFile", - sparkSession, - fileName), - finally = { - file.remove(fileName) - }) - TRUE - }, - error = function(e) { - message(paste0("WARN: createDataFrame attempted Arrow optimization because ", - "'spark.sql.execution.arrow.enabled' is set to true; however, ", - "failed, attempting non-optimization. Reason: ", - e)) - return(FALSE) + } + data[] <- lapply(data, cleanCols) + + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { + shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + checkTypeRequirementForArrow(data, schema) + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( + jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), + finally = { + file.remove(fileName) }) - } - if (!shouldUseArrow) { - # Convert data into a list of rows. Each row is a list. - # drop factors and wrap lists - data <- setNames(as.list(data), NULL) + firstRow <- do.call(mapply, append(args, dataHead))[[1]] + TRUE + }, + error = function(e) { + warning(paste0("createDataFrame attempted Arrow optimization because ", + "'spark.sql.execution.arrow.enabled' is set to true; however, ", + "failed, attempting non-optimization. Reason: ", + e)) + return(FALSE) + }) + } - # check if all columns have supported type - lapply(data, getInternalType) + if (!shouldUseArrow) { + # Convert data into a list of rows. Each row is a list. + # drop factors and wrap lists + data <- setNames(as.list(data), NULL) - # convert to rows - data <- do.call(mapply, append(args, data)) - if (length(data) > 0) { - firstRow <- data[[1]] - } + # check if all columns have supported type + lapply(data, getInternalType) + + # convert to rows + data <- do.call(mapply, append(args, data)) + if (length(data) > 0) { + firstRow <- data[[1]] } + } } if (shouldUseArrow) { diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 0207f249f9aa0..bac3efd03e8b5 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -81,6 +81,26 @@ objectFile <- function(sc, path, minPartitions = NULL) { RDD(jrdd, "byte") } +makeSplits <- function(numSerializedSlices, length) { + # Generate the slice ids to put each row + # For instance, for numSerializedSlices of 22, length of 50 + # [1] 0 0 2 2 4 4 6 6 6 9 9 11 11 13 13 15 15 15 18 18 20 20 22 22 22 + # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47 + # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced. + # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD + if (numSerializedSlices > 0) { + unlist(lapply(0: (numSerializedSlices - 1), function(x) { + # nolint start + start <- trunc((as.numeric(x) * length) / numSerializedSlices) + end <- trunc(((as.numeric(x) + 1) * length) / numSerializedSlices) + # nolint end + rep(start, end - start) + })) + } else { + 1 + } +} + #' Create an RDD from a homogeneous list or vector. #' #' This function creates an RDD from a local homogeneous list in R. The elements @@ -143,25 +163,7 @@ parallelize <- function(sc, coll, numSlices = 1) { # For large objects we make sure the size of each slice is also smaller than sizeLimit numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit))) - # Generate the slice ids to put each row - # For instance, for numSerializedSlices of 22, length of 50 - # [1] 0 0 2 2 4 4 6 6 6 9 9 11 11 13 13 15 15 15 18 18 20 20 22 22 22 - # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47 - # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced. - # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD - splits <- if (numSerializedSlices > 0) { - unlist(lapply(0: (numSerializedSlices - 1), function(x) { - # nolint start - start <- trunc((as.numeric(x) * len) / numSerializedSlices) - end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices) - # nolint end - rep(start, end - start) - })) - } else { - 1 - } - - slices <- split(coll, splits) + slices <- split(coll, makeSplits(numSerializedSlices, len)) # Serialize each slice: obtain a list of raws, or a list of lists (slices) of # 2-tuples of raws diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 818bc5a6f65e4..157858b88601b 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -310,9 +310,19 @@ test_that("create DataFrame from RDD", { test_that("createDataFrame Arrow optimization", { skip_if_not_installed("arrow") skip_if_not_installed("withr") - expected <- collect(createDataFrame(mtcars)) - arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] + conf <- callJMethod(sparkSession, "conf") + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false") + tryCatch({ + expected <- collect(createDataFrame(mtcars)) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled) + }) + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true") tryCatch({ expect_equal(collect(createDataFrame(mtcars)), expected) @@ -332,9 +342,19 @@ test_that("createDataFrame Arrow optimization - type specification", { d = 1.1, e = 1L, g = as.Date("1990-02-24")))) - expected <- collect(createDataFrame(rdf)) + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] conf <- callJMethod(sparkSession, "conf") + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false") + tryCatch({ + expected <- collect(createDataFrame(rdf)) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled) + }) + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true") tryCatch({ expect_equal(collect(createDataFrame(rdf)), expected) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 8d38da59c590b..01f278b5152e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -240,7 +240,7 @@ private[sql] object SQLUtils extends Logging { } /** - * R callable function to read a file in Arrow stream format and create a `RDD` + * R callable function to read a file in Arrow stream format and create an `RDD` * using each serialized ArrowRecordBatch as a partition. */ def readArrowStreamFromFile( From b6bfae911d447ceda18176bab25e9b9e5b761831 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 11 Nov 2018 16:59:38 +0800 Subject: [PATCH 11/19] Fix comments --- R/pkg/R/SQLContext.R | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 1b277073b9b9b..47bfaaffa38b6 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -148,10 +148,10 @@ getDefaultSqlSource <- function() { } writeToTempFileInArrow <- function(rdf, numPartitions) { - defer_parent <- function(x, ...){ - # For some reasons, Arrow R API requires to load 'defer_parent', which is from 'withr' package. - # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' - # package, which CRAN complains about. + # For some reasons, Arrow R API requires to load 'defer_parent' which is from 'withr' package. + # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' + # package, which CRAN complains about. + defer_parent <- function(x, ...) { if (requireNamespace("withr", quietly = TRUE)) { withr::defer_parent(x, ...) } else { From f72b505d064d0599ef6a18d1d5045dbc62608338 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 11 Nov 2018 21:47:17 +0800 Subject: [PATCH 12/19] Fix CRAN --- R/pkg/R/SQLContext.R | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 47bfaaffa38b6..858a38ebe30cb 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -151,9 +151,12 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { # For some reasons, Arrow R API requires to load 'defer_parent' which is from 'withr' package. # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' # package, which CRAN complains about. - defer_parent <- function(x, ...) { - if (requireNamespace("withr", quietly = TRUE)) { - withr::defer_parent(x, ...) + defer_parent <- function(x, ...) + # requireNamespace complains in CRAN in Jenkins. We should fix. + requireNamespace1 <- requireNamespace + if (requireNamespace1("withr", quietly = TRUE)) { + defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE) + defer_parent(x, ...) } else { stop("'withr' package should be installed.") } From 07e56d4fde02cf1dcbff6c45df1eb4b7e1c87c82 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 11 Nov 2018 21:55:27 +0800 Subject: [PATCH 13/19] Let's don't fix DESCRIPTON for now --- R/pkg/DESCRIPTION | 3 +-- R/pkg/R/SQLContext.R | 16 ++++++---------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 929f2a7d4f1d2..736da46eaa8d3 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -22,8 +22,7 @@ Suggests: rmarkdown, testthat, e1071, - survival, - withr + survival Collate: 'schema.R' 'generics.R' diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 858a38ebe30cb..41c0c492fed24 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -148,24 +148,20 @@ getDefaultSqlSource <- function() { } writeToTempFileInArrow <- function(rdf, numPartitions) { + requireNamespace1 <- requireNamespace + # For some reasons, Arrow R API requires to load 'defer_parent' which is from 'withr' package. # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' # package, which CRAN complains about. - defer_parent <- function(x, ...) - # requireNamespace complains in CRAN in Jenkins. We should fix. - requireNamespace1 <- requireNamespace - if (requireNamespace1("withr", quietly = TRUE)) { - defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE) - defer_parent(x, ...) - } else { - stop("'withr' package should be installed.") - } + if (requireNamespace1("withr", quietly = TRUE)) { + defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE) + } else { + stop("'withr' package should be installed.") } # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works # around by avoiding direct requireNamespace. - requireNamespace1 <- requireNamespace if (requireNamespace1("arrow", quietly = TRUE)) { record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) record_batch_stream_writer <- get( From 1d57e0a8d67acfa6f1e60fcbe4992df40e243580 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 11 Nov 2018 22:04:52 +0800 Subject: [PATCH 14/19] Ha .. also avoid R lint failure --- R/pkg/R/SQLContext.R | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 41c0c492fed24..2d58297d3ecd3 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -153,10 +153,13 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { # For some reasons, Arrow R API requires to load 'defer_parent' which is from 'withr' package. # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' # package, which CRAN complains about. + defer_parent <- function(x, ...) { if (requireNamespace1("withr", quietly = TRUE)) { - defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE) - } else { - stop("'withr' package should be installed.") + defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE) + defer_parent(x, ...) + } else { + stop("'withr' package should be installed.") + } } # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace From 168a81ec8d62dd781b1d960b84a14f967887b269 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 12 Nov 2018 11:27:52 +0800 Subject: [PATCH 15/19] Address nits --- R/pkg/R/SQLContext.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 2d58297d3ecd3..e165bbb4d0963 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -154,7 +154,7 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' # package, which CRAN complains about. defer_parent <- function(x, ...) { - if (requireNamespace1("withr", quietly = TRUE)) { + if (requireNamespace1("withr", quietly = TRUE)) { defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE) defer_parent(x, ...) } else { @@ -209,7 +209,7 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { } }) - return(fileName) + fileName } else { stop("'arrow' package should be installed.") } @@ -300,7 +300,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, "'spark.sql.execution.arrow.enabled' is set to true; however, ", "failed, attempting non-optimization. Reason: ", e)) - return(FALSE) + FALSE }) } From 767af864a328b8dfb7bbcecea331e3f84ab45149 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 13 Nov 2018 14:10:32 +0800 Subject: [PATCH 16/19] Address comments --- R/pkg/R/SQLContext.R | 23 ++++++++++--------- .../org/apache/spark/sql/api/r/SQLUtils.scala | 4 ++-- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index e165bbb4d0963..f9d0e68ffd930 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -147,7 +147,7 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } -writeToTempFileInArrow <- function(rdf, numPartitions) { +writeToFileInArrow <- function(fileName, rdf, numPartitions) { requireNamespace1 <- requireNamespace # For some reasons, Arrow R API requires to load 'defer_parent' which is from 'withr' package. @@ -186,7 +186,6 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { list(rdf) } - fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") stream_writer <- NULL tryCatch({ for (rdf_slice in rdf_slices) { @@ -209,7 +208,6 @@ writeToTempFileInArrow <- function(rdf, numPartitions) { } }) - fileName } else { stop("'arrow' package should be installed.") } @@ -258,8 +256,9 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" - shouldUseArrow <- FALSE + useArrow <- FALSE firstRow <- NULL + if (is.data.frame(data)) { # get the names of columns, they will be put into RDD if (is.null(schema)) { @@ -278,16 +277,18 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) if (arrowEnabled) { - shouldUseArrow <- tryCatch({ + useArrow <- tryCatch({ stopifnot(length(data) > 0) dataHead <- head(data, 1) checkTypeRequirementForArrow(data, schema) - fileName <- writeToTempFileInArrow(data, numPartitions) - tryCatch( + fileName <- tempfile(pattern = "sparwriteToFileInArrowk-arrow", fileext = ".tmp") + tryCatch({ + writeToFileInArrow(fileName, data, numPartitions) jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "readArrowStreamFromFile", sparkSession, - fileName), + fileName) + }, finally = { file.remove(fileName) }) @@ -304,7 +305,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, }) } - if (!shouldUseArrow) { + if (!useArrow) { # Convert data into a list of rows. Each row is a list. # drop factors and wrap lists data <- setNames(as.list(data), NULL) @@ -320,7 +321,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, } } - if (shouldUseArrow) { + if (useArrow) { rdd <- jrddInArrow } else if (is.list(data)) { sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) @@ -369,7 +370,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, stopifnot(class(schema) == "structType") - if (shouldUseArrow) { + if (useArrow) { sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toDataFrame", rdd, schema$jobj, sparkSession) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 01f278b5152e2..693be99d47495 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -250,8 +250,8 @@ private[sql] object SQLUtils extends Logging { } /** - * R callable function to read a file in Arrow stream format and create a `DataFrame` - * from an RDD. + * R callable function to create a `DataFrame` from a `JavaRDD` of serialized + * ArrowRecordBatches. */ def toDataFrame( arrowBatchRDD: JavaRDD[Array[Byte]], From 92eec4e0d63bb28a0d84d2761863f35751a3f448 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 24 Jan 2019 21:51:57 +0800 Subject: [PATCH 17/19] Match to Arrow 0.12.0 --- R/pkg/R/SQLContext.R | 48 +++++++------------ R/pkg/tests/fulltests/test_sparkSQL.R | 5 +- .../apache/spark/sql/internal/SQLConf.scala | 2 +- 3 files changed, 20 insertions(+), 35 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index f9d0e68ffd930..551dc73df41b4 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -150,29 +150,17 @@ getDefaultSqlSource <- function() { writeToFileInArrow <- function(fileName, rdf, numPartitions) { requireNamespace1 <- requireNamespace - # For some reasons, Arrow R API requires to load 'defer_parent' which is from 'withr' package. - # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' - # package, which CRAN complains about. - defer_parent <- function(x, ...) { - if (requireNamespace1("withr", quietly = TRUE)) { - defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE) - defer_parent(x, ...) - } else { - stop("'withr' package should be installed.") - } - } - - # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace - # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works - # around by avoiding direct requireNamespace. + # R API in Arrow is not yet released in CRAN (see ARROW-3204). CRAN requires to add the + # package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available + # or not. Therefore, it works around by avoiding direct requireNamespace. + # Currently, as of Arrow 0.12.0, it can be installed, for instance, by + # `Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'` if (requireNamespace1("arrow", quietly = TRUE)) { record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) - record_batch_stream_writer <- get( - "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) - file_output_stream <- get( - "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) - write_record_batch <- get( - "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + RecordBatchStreamWriter <- get( + "RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE) + FileOutputStream <- get( + "FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE) numPartitions <- if (!is.null(numPartitions)) { numToInt(numPartitions) @@ -194,17 +182,17 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) { # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks # there's no exposed API for it. Here's a workaround but ideally this should # be removed. - stream <- file_output_stream(fileName) - schema <- batch$schema() - stream_writer <- record_batch_stream_writer(stream, schema) + stream <- FileOutputStream(fileName) + schema <- batch$schema + stream_writer <- RecordBatchStreamWriter(stream, schema) } - write_record_batch(batch, stream_writer) + stream_writer$write_batch(batch) } }, finally = { if (!is.null(stream_writer)) { - stream_writer$Close() + stream_writer$close() } }) @@ -214,12 +202,9 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) { } checkTypeRequirementForArrow <- function(dataHead, schema) { - # Currenty Arrow optimization does not support POSIXct and raw for now. + # Currenty Arrow optimization does not support raw for now. # Also, it does not support explicit float type set by users. It leads to # incorrect conversion. We will fall back to the path without Arrow optimization. - if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) { - stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") - } if (any(sapply(dataHead, is.raw))) { stop("Arrow optimization with R DataFrame does not support raw type yet.") } @@ -290,7 +275,8 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, fileName) }, finally = { - file.remove(fileName) + # File might not be created. + suppressWarnings(file.remove(fileName)) }) firstRow <- do.call(mapply, append(args, dataHead))[[1]] diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 157858b88601b..93cb8903fc614 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -309,7 +309,6 @@ test_that("create DataFrame from RDD", { test_that("createDataFrame Arrow optimization", { skip_if_not_installed("arrow") - skip_if_not_installed("withr") conf <- callJMethod(sparkSession, "conf") arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] @@ -335,13 +334,13 @@ test_that("createDataFrame Arrow optimization", { test_that("createDataFrame Arrow optimization - type specification", { skip_if_not_installed("arrow") - skip_if_not_installed("withr") rdf <- data.frame(list(list(a = 1, b = "a", c = TRUE, d = 1.1, e = 1L, - g = as.Date("1990-02-24")))) + f = as.Date("1990-02-24"), + g = as.POSIXct("1990-02-24 12:34:56")))) arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] conf <- callJMethod(sparkSession, "conf") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 792bf30e32984..fa9129484c1ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1285,7 +1285,7 @@ object SQLConf { buildConf("spark.sql.execution.arrow.enabled") .doc("When true, make use of Apache Arrow for columnar data transfers. Currently available " + "for use with pyspark.sql.DataFrame.toPandas, " + - "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame," + + "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame, " + "and createDataFrame when its input is an R DataFrame. " + "The following data types are unsupported: " + "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.") From 854c9d83f1b1b41ffdac44ffd25072c2d5df9eb0 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 24 Jan 2019 22:06:43 +0800 Subject: [PATCH 18/19] Make linter happy --- R/pkg/R/SQLContext.R | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 551dc73df41b4..f5845b8bb66cc 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -150,11 +150,10 @@ getDefaultSqlSource <- function() { writeToFileInArrow <- function(fileName, rdf, numPartitions) { requireNamespace1 <- requireNamespace - # R API in Arrow is not yet released in CRAN (see ARROW-3204). CRAN requires to add the + # R API in Arrow is not yet released in CRAN. CRAN requires to add the # package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available # or not. Therefore, it works around by avoiding direct requireNamespace. - # Currently, as of Arrow 0.12.0, it can be installed, for instance, by - # `Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'` + # Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204. if (requireNamespace1("arrow", quietly = TRUE)) { record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) RecordBatchStreamWriter <- get( From 66b120b3dbf3d0e6581429ec6cf10131452b5bb7 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 24 Jan 2019 22:12:30 +0800 Subject: [PATCH 19/19] Fix comment --- R/pkg/R/SQLContext.R | 3 --- 1 file changed, 3 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index f5845b8bb66cc..2e5506a4097e4 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -178,9 +178,6 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) { for (rdf_slice in rdf_slices) { batch <- record_batch(rdf_slice) if (is.null(stream_writer)) { - # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks - # there's no exposed API for it. Here's a workaround but ideally this should - # be removed. stream <- FileOutputStream(fileName) schema <- batch$schema stream_writer <- RecordBatchStreamWriter(stream, schema)