Merge branch 'master' into SPARK-24063
gaborgsomogyi committed Feb 22, 2019
2 parents 96152da + 95bb012 commit 43e61ef
Showing 616 changed files with 13,566 additions and 5,558 deletions.
8 changes: 4 additions & 4 deletions R/pkg/DESCRIPTION
@@ -1,8 +1,8 @@
Package: SparkR
Type: Package
Version: 3.0.0
Title: R Front end for 'Apache Spark'
Description: Provides an R Front end for 'Apache Spark' <https://spark.apache.org>.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
email = "shivaram@cs.berkeley.edu"),
person("Xiangrui", "Meng", role = "aut",
@@ -11,8 +11,8 @@ Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
email = "felixcheung@apache.org"),
person(family = "The Apache Software Foundation", role = c("aut", "cph")))
License: Apache License (== 2.0)
URL: https://www.apache.org/ https://spark.apache.org/
BugReports: https://spark.apache.org/contributing.html
SystemRequirements: Java (== 8)
Depends:
R (>= 3.1),
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -312,8 +312,10 @@ exportMethods("%<=>%",
"lower",
"lpad",
"ltrim",
"map_concat",
"map_entries",
"map_from_arrays",
"map_from_entries",
"map_keys",
"map_values",
"max",
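For reference, a minimal usage sketch of the map column functions exported above (assuming an active SparkR session; the SQL query and the column names k, v, m are illustrative, not part of this change):

library(SparkR)
sparkR.session()
# Build a SparkDataFrame with two array columns and derive a map column from them.
df <- sql("SELECT array('a', 'b') AS k, array(1, 2) AS v")
mapped <- select(df, alias(map_from_arrays(df$k, df$v), "m"))
head(select(mapped, map_keys(mapped$m), map_values(mapped$m)))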
56 changes: 56 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -1177,11 +1177,67 @@ setMethod("dim",
setMethod("collect",
signature(x = "SparkDataFrame"),
function(x, stringsAsFactors = FALSE) {
connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
useArrow <- FALSE
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
if (arrowEnabled) {
useArrow <- tryCatch({
requireNamespace1 <- requireNamespace
if (!requireNamespace1("arrow", quietly = TRUE)) {
stop("'arrow' package should be installed.")
}
# Currently, Arrow optimization does not support the raw type.
# It also does not support the explicit float type set by users.
if (inherits(schema(x), "structType")) {
if (any(sapply(schema(x)$fields(),
function(x) x$dataType.toString() == "FloatType"))) {
stop(paste0("Arrow optimization in the conversion from Spark DataFrame to R ",
"DataFrame does not support FloatType yet."))
}
if (any(sapply(schema(x)$fields(),
function(x) x$dataType.toString() == "BinaryType"))) {
stop(paste0("Arrow optimization in the conversion from Spark DataFrame to R ",
"DataFrame does not support BinaryType yet."))
}
}
TRUE
}, error = function(e) {
warning(paste0("The conversion from Spark DataFrame to R DataFrame was attempted ",
"with Arrow optimization because ",
"'spark.sql.execution.arrow.enabled' is set to true; however, ",
"failed, attempting non-optimization. Reason: ",
e))
FALSE
})
}

dtypes <- dtypes(x)
ncol <- length(dtypes)
if (ncol <= 0) {
# empty data.frame with 0 columns and 0 rows
data.frame()
} else if (useArrow) {
requireNamespace1 <- requireNamespace
if (requireNamespace1("arrow", quietly = TRUE)) {
read_arrow <- get("read_arrow", envir = asNamespace("arrow"), inherits = FALSE)
as_tibble <- get("as_tibble", envir = asNamespace("arrow"))

portAuth <- callJMethod(x@sdf, "collectAsArrowToR")
port <- portAuth[[1]]
authSecret <- portAuth[[2]]
conn <- socketConnection(
port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
output <- tryCatch({
doServerAuth(conn, authSecret)
arrowTable <- read_arrow(readRaw(conn))
as.data.frame(as_tibble(arrowTable), stringsAsFactors = stringsAsFactors)
}, finally = {
close(conn)
})
return(output)
} else {
stop("'arrow' package should be installed.")
}
} else {
# listCols is a list of columns
listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
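A hedged usage sketch of the Arrow-backed collect() path added above (assumes the 'arrow' R package is installed and a local Spark session; otherwise collect() warns and falls back to the non-Arrow path):

library(SparkR)
sparkR.session(sparkConfig = list(spark.sql.execution.arrow.enabled = "true"))
sdf <- createDataFrame(mtcars)
# With the config above, collect() asks the JVM for an Arrow stream via
# collectAsArrowToR and deserializes it locally with the 'arrow' package.
rdf <- collect(sdf)
str(rdf)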
161 changes: 136 additions & 25 deletions R/pkg/R/SQLContext.R
@@ -147,6 +147,70 @@ getDefaultSqlSource <- function() {
l[["spark.sql.sources.default"]]
}

writeToFileInArrow <- function(fileName, rdf, numPartitions) {
requireNamespace1 <- requireNamespace

# The R API of Arrow is not released on CRAN yet. CRAN requires packages referenced via
# requireNamespace to be declared in DESCRIPTION, and then checks whether they are available.
# We work around this by avoiding a direct requireNamespace call.
# Currently, as of Arrow 0.12.0, it can be installed via install_github. See ARROW-3204.
if (requireNamespace1("arrow", quietly = TRUE)) {
record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
RecordBatchStreamWriter <- get(
"RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE)
FileOutputStream <- get(
"FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE)

numPartitions <- if (!is.null(numPartitions)) {
numToInt(numPartitions)
} else {
1
}

rdf_slices <- if (numPartitions > 1) {
split(rdf, makeSplits(numPartitions, nrow(rdf)))
} else {
list(rdf)
}

stream_writer <- NULL
tryCatch({
for (rdf_slice in rdf_slices) {
batch <- record_batch(rdf_slice)
if (is.null(stream_writer)) {
stream <- FileOutputStream(fileName)
schema <- batch$schema
stream_writer <- RecordBatchStreamWriter(stream, schema)
}

stream_writer$write_batch(batch)
}
},
finally = {
if (!is.null(stream_writer)) {
stream_writer$close()
}
})

} else {
stop("'arrow' package should be installed.")
}
}
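A small sketch of calling the helper above directly (writeToFileInArrow is an internal helper, so this is illustration only; it assumes the 'arrow' package is installed):

# Write a local data.frame as an Arrow record batch stream with two batches.
tmp <- tempfile(fileext = ".tmp")
writeToFileInArrow(tmp, mtcars, numPartitions = 2)
file.size(tmp) > 0  # TRUE once the batches have been written
unlink(tmp)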

checkTypeRequirementForArrow <- function(dataHead, schema) {
# Currently, Arrow optimization does not support the raw type.
# It also does not support the explicit float type set by users, since it leads to
# incorrect conversion. In both cases we fall back to the path without Arrow optimization.
if (any(sapply(dataHead, is.raw))) {
stop("Arrow optimization with R DataFrame does not support raw type yet.")
}
if (inherits(schema, "structType")) {
if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
}
}
}
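An illustrative check of the guard above (hypothetical local data; no Spark session needed):

# A raw column triggers the fallback error...
bad <- data.frame(id = 1:3, payload = as.raw(1:3))
tryCatch(checkTypeRequirementForArrow(bad, schema = NULL),
         error = function(e) conditionMessage(e))
# ...while a plain numeric data.frame passes the check silently.
checkTypeRequirementForArrow(mtcars, schema = NULL)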

#' Create a SparkDataFrame
#'
#' Converts R data.frame or list into SparkDataFrame.
@@ -172,36 +236,76 @@ getDefaultSqlSource <- function() {
createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
numPartitions = NULL) {
sparkSession <- getSparkSession()
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
useArrow <- FALSE
firstRow <- NULL

if (is.data.frame(data)) {
# get the names of columns, they will be put into RDD
if (is.null(schema)) {
schema <- names(data)
}

# get rid of factor type
cleanCols <- function(x) {
if (is.factor(x)) {
as.character(x)
} else {
x
}
}
data[] <- lapply(data, cleanCols)

args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
if (arrowEnabled) {
useArrow <- tryCatch({
stopifnot(length(data) > 0)
dataHead <- head(data, 1)
checkTypeRequirementForArrow(data, schema)
fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
tryCatch({
writeToFileInArrow(fileName, data, numPartitions)
jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"readArrowStreamFromFile",
sparkSession,
fileName)
},
finally = {
# File might not be created.
suppressWarnings(file.remove(fileName))
})

firstRow <- do.call(mapply, append(args, dataHead))[[1]]
TRUE
},
error = function(e) {
warning(paste0("createDataFrame attempted Arrow optimization because ",
"'spark.sql.execution.arrow.enabled' is set to true; however, ",
"failed, attempting non-optimization. Reason: ",
e))
FALSE
})
}

if (!useArrow) {
# Convert data into a list of rows. Each row is a list.
# drop factors and wrap lists
data <- setNames(as.list(data), NULL)

# check if all columns have supported type
lapply(data, getInternalType)

# convert to rows
data <- do.call(mapply, append(args, data))
if (length(data) > 0) {
firstRow <- data[[1]]
}
}
}

if (useArrow) {
rdd <- jrddInArrow
} else if (is.list(data)) {
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
if (!is.null(numPartitions)) {
rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
@@ -215,14 +319,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
}

if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
if (is.null(firstRow)) {
firstRow <- firstRDD(rdd)
}
names <- if (is.null(schema)) {
names(firstRow)
} else {
as.list(schema)
}
if (is.null(names)) {
names <- lapply(1:length(firstRow), function(x) {
paste("_", as.character(x), sep = "")
})
}
@@ -237,19 +343,24 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
nn
})

types <- lapply(firstRow, infer_type)
fields <- lapply(1:length(firstRow), function(i) {
structField(names[[i]], types[[i]], TRUE)
})
schema <- do.call(structType, fields)
}

stopifnot(class(schema) == "structType")

if (useArrow) {
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"toDataFrame", rdd, schema$jobj, sparkSession)
} else {
jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
srdd <- callJMethod(jrdd, "rdd")
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
srdd, schema$jobj, sparkSession)
}
dataFrame(sdf)
}

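For context, a hedged end-to-end sketch of the Arrow-enabled createDataFrame() path (assumes the 'arrow' package is installed and the configuration below is set; otherwise the non-Arrow fallback is used):

library(SparkR)
sparkR.session(sparkConfig = list(spark.sql.execution.arrow.enabled = "true"))
# The local data.frame is written to a temporary Arrow stream file by
# writeToFileInArrow() and handed to the JVM via readArrowStreamFromFile.
sdf <- createDataFrame(mtcars, numPartitions = 2)
printSchema(sdf)
count(sdf)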
40 changes: 21 additions & 19 deletions R/pkg/R/context.R
@@ -81,6 +81,26 @@ objectFile <- function(sc, path, minPartitions = NULL) {
RDD(jrdd, "byte")
}

makeSplits <- function(numSerializedSlices, length) {
# Generate the slice ids to put each row
# For instance, for numSerializedSlices of 22, length of 50
# [1] 0 0 2 2 4 4 6 6 6 9 9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
# [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
# Notice the slice ids that appear 3 times (i.e. 6, 15, 22) are roughly evenly spaced.
# This reimplements the calculation of the positions method in ParallelCollectionRDD.
if (numSerializedSlices > 0) {
unlist(lapply(0: (numSerializedSlices - 1), function(x) {
# nolint start
start <- trunc((as.numeric(x) * length) / numSerializedSlices)
end <- trunc(((as.numeric(x) + 1) * length) / numSerializedSlices)
# nolint end
rep(start, end - start)
}))
} else {
1
}
}
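A quick illustrative check of the helper above (plain R, no Spark needed):

splits <- makeSplits(22, 50)
length(splits)          # 50: one slice id per element
length(unique(splits))  # 22 distinct slice ids
table(splits)           # group sizes of 2 or 3, matching the comment above
# Used with split() to partition a collection into slices:
sapply(split(as.list(1:10), makeSplits(4, 10)), length)  # 2 3 2 3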

#' Create an RDD from a homogeneous list or vector.
#'
#' This function creates an RDD from a local homogeneous list in R. The elements
@@ -143,25 +163,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
# For large objects we make sure the size of each slice is also smaller than sizeLimit
numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit)))

slices <- split(coll, makeSplits(numSerializedSlices, len))

# Serialize each slice: obtain a list of raws, or a list of lists (slices) of
# 2-tuples of raws
