[SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame #22954

Closed · wants to merge 19 commits into base: master
6 participants

HyukjinKwon commented Nov 6, 2018

What changes were proposed in this pull request?

This PR adds Arrow optimization support for the conversion from an R DataFrame to a Spark DataFrame.
Like the PySpark side, it falls back to the non-optimized code path when it is unable to use Arrow optimization.

This can be tested as below:

$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
collect(createDataFrame(mtcars))
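The fallback behavior described above can be sketched roughly as follows. This is a hypothetical illustration of the pattern, not the actual SparkR internals; all function names here are made up:

```r
# Hypothetical sketch of the "try Arrow first, fall back on failure" pattern.
# convert_with_arrow / convert_without_arrow are illustrative stand-ins.
convert_with_arrow <- function(rdf) {
  if (!requireNamespace("arrow", quietly = TRUE)) {
    stop("arrow package is not installed")
  }
  # ... Arrow-based conversion would happen here ...
  rdf
}

convert_without_arrow <- function(rdf) {
  # ... original (slower) serialization path ...
  rdf
}

create_data_frame <- function(rdf) {
  tryCatch(
    convert_with_arrow(rdf),
    error = function(e) {
      warning("Falling back to non-Arrow conversion: ", conditionMessage(e))
      convert_without_arrow(rdf)
    }
  )
}
```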

Requirements

  • R 3.5.x
  • Arrow package 0.12+
    Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'

Note: currently, the Arrow R package is not on CRAN. Please take a look at ARROW-3204.
Note: currently, the Arrow R package does not seem to support Windows. Please take a look at ARROW-3204.

Benchmarks

Shell

sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true

R code

createDataFrame(mtcars) # Initializes
rdf <- read.csv("500000.csv")

test <- function() {
  options(digits.secs = 6) # milliseconds
  start.time <- Sys.time()
  createDataFrame(rdf)
  end.time <- Sys.time()
  time.taken <- end.time - start.time
  print(time.taken)
}

test()

Data (350 MB):

object.size(read.csv("500000.csv"))
350379504 bytes

"500000 Records" http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/

Results

Time difference of 29.9468 secs
Time difference of 3.222129 secs

The performance improvement was around 950%.
Actually, this PR improves performance by around 1200%+ because it also includes a small optimization of the regular R DataFrame -> Spark DataFrame path. See #22954 (comment)
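As a quick arithmetic check, the two timings reported above correspond to roughly a 9.3x speedup, consistent with the figure quoted:

```r
# Ratio of the two timings reported in the Results section above.
baseline   <- 29.9468    # seconds with spark.sql.execution.arrow.enabled=false
with_arrow <- 3.222129   # seconds with spark.sql.execution.arrow.enabled=true
speedup <- baseline / with_arrow
round(speedup, 2)  # roughly 9.29x
```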

Limitations:

For now, Arrow optimization with R does not support the cases where the data contains raw type, or where the user explicitly gives float type in the schema, since these produce corrupt values.
In these cases, we decided to fall back to the non-optimized code path.
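For illustration, this is what a data.frame carrying raw data can look like; this example is constructed here for clarity and is not taken from the PR's tests:

```r
# A data.frame with a list column whose elements are raw vectors - one of
# the input shapes that, per the limitation above, falls back to the
# non-Arrow conversion path.
rdf <- data.frame(id = 1:2)
rdf$payload <- list(as.raw(c(0x01, 0x02)), as.raw(0xff))
sapply(rdf$payload, class)  # each list element has class "raw"
```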

How was this patch tested?

Small test was added.

I manually forced this optimization to true for all R tests and they all passed (with a few fallback warnings).

TODOs:

  • Draft codes
  • make the tests pass
  • make the CRAN check pass
  • Performance measurement
  • Supportability investigation (for instance types)
  • Wait for Arrow 0.12.0 release
  • Fix and match it to Arrow 0.12.0

HyukjinKwon commented Nov 6, 2018

Let me leave a cc @felixcheung, @BryanCutler, @yanboliang, @shivaram FYI.


SparkQA commented Nov 6, 2018

Test build #98508 has finished for PR 22954 at commit 90011a5.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 6, 2018

Test build #98510 has finished for PR 22954 at commit 46eaeca.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 6, 2018

Test build #98512 has finished for PR 22954 at commit 614170e.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 6, 2018

Test build #98514 has finished for PR 22954 at commit b15d79c.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HyukjinKwon commented Nov 7, 2018

So far, the regression tests pass and the newly added test for the R optimization is verified locally. Let me fix the CRAN test and some nits.


felixcheung left a comment

so this is super cool. My biggest concern is that we previously changed to not write out to a file, to respect the encryption requirement, and this adds back the temp file.


HyukjinKwon commented Nov 7, 2018

Thanks, @felixcheung. I will address those comments during cleaning up.


HyukjinKwon commented Nov 7, 2018

For the encryption stuff, I will try to handle that as well (maybe as a follow-up) so that we support it even when it's enabled.


HyukjinKwon commented Nov 8, 2018

@felixcheung! The performance improvement was 955%! I described the benchmark I took in the PR description.

@@ -215,14 +278,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
}

if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
row <- firstRDD(rdd)
if (is.null(firstRow)) {
firstRow <- firstRDD(rdd)

HyukjinKwon (Author Member) commented Nov 8, 2018

Note that this PR optimizes the original code path as well: when the input is a local R DataFrame, we avoid the firstRDD operation here.

In the master branch, the benchmark shows:

Exception in thread "dispatcher-event-loop-6" java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)

If I try this with the 100000.csv (79 MB) data, it takes a while to run. To cut it short:

Current master:

Time difference of 8.502607 secs

With this PR, but without Arrow

Time difference of 5.143395 secs

With this PR, but with Arrow

Time difference of 0.6981369 secs

So, technically this PR improves performance by more than 1200%.

felixcheung (Member) commented Nov 9, 2018

I <3 4 digits!


HyukjinKwon commented Nov 8, 2018

adding @falaki and @mengxr as well.

@HyukjinKwon HyukjinKwon changed the title [DO-NOT-MERGE][POC] Enables Arrow optimization from R DataFrame to Spark DataFrame [DO-NOT-MERGE] Enables Arrow optimization from R DataFrame to Spark DataFrame Nov 8, 2018

@HyukjinKwon HyukjinKwon changed the title [DO-NOT-MERGE] Enables Arrow optimization from R DataFrame to Spark DataFrame [WIP] Enables Arrow optimization from R DataFrame to Spark DataFrame Nov 8, 2018


SparkQA commented Nov 8, 2018

Test build #98595 has finished for PR 22954 at commit 8813192.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 8, 2018

Test build #98603 has finished for PR 22954 at commit 7be15d3.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 8, 2018

Test build #98613 has started for PR 22954 at commit 2ddbd69.


@HyukjinKwon HyukjinKwon changed the title [WIP] Enables Arrow optimization from R DataFrame to Spark DataFrame [SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame Nov 8, 2018


HyukjinKwon commented Nov 8, 2018

I have finished most of the TODOs except waiting for the R API of Arrow 0.12.0 and making some changes accordingly.


SparkQA commented Nov 8, 2018

Test build #98615 has finished for PR 22954 at commit 2ba6add.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 8, 2018

Test build #98614 has finished for PR 22954 at commit 0903736.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HyukjinKwon commented Nov 8, 2018

retest this please


SparkQA commented Nov 9, 2018

Test build #98628 has finished for PR 22954 at commit 2ba6add.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon HyukjinKwon force-pushed the HyukjinKwon:r-arrow-createdataframe branch to 767af86 Jan 22, 2019

@HyukjinKwon HyukjinKwon changed the title [SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame [WIP][SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame Jan 22, 2019


felixcheung commented Jan 22, 2019

I was thinking a blog post in the Arrow project ;)


HyukjinKwon commented Jan 22, 2019

Gotcha, yeah, of course I am interested. I'll start working on that after this PR is merged.


SparkQA commented Jan 22, 2019

Test build #101513 has finished for PR 22954 at commit 767af86.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

BryanCutler commented Jan 22, 2019

retest this please


SparkQA commented Jan 22, 2019

Test build #101551 has finished for PR 22954 at commit 767af86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
if (requireNamespace1("arrow", quietly = TRUE)) {
record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
record_batch_stream_writer <- get(
"record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)

HyukjinKwon (Author Member) commented Jan 24, 2019

FWIW, the sparklyr and arrow implementations use the same trick to avoid CRAN failures.
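The runtime-lookup trick quoted above works for any package that cannot be declared in DESCRIPTION: the function is resolved with `get()` at runtime instead of being called directly, so CRAN's static checks do not see a hard dependency. Here it is demonstrated with the base `tools` package purely for illustration (the PR itself applies it to `arrow`):

```r
# Resolve a function from a namespace at runtime rather than calling it
# directly. tools::file_ext is used only as a stand-in example here.
if (requireNamespace("tools", quietly = TRUE)) {
  file_ext <- get("file_ext", envir = asNamespace("tools"), inherits = FALSE)
  file_ext("data.csv")  # "csv"
}
```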

HyukjinKwon added some commits Jan 24, 2019

@HyukjinKwon HyukjinKwon changed the title [WIP][SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame [SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame Jan 24, 2019

# package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available
# or not. Therefore, it works around by avoiding direct requireNamespace.
# Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204.
if (requireNamespace1("arrow", quietly = TRUE)) {

HyukjinKwon (Author Member) commented Jan 24, 2019

Judging from ARROW-3204, Arrow is still not on CRAN and it looks like it's going to take a few months; it looks like we can run the build via AppVeyor once it's on CRAN.

felixcheung (Member) commented Jan 25, 2019

Yes, publishing to CRAN hasn't happened yet for Arrow. install_github should be fine? It's a one-time thing.


HyukjinKwon commented Jan 24, 2019

To cut it short, I think this PR is ready to go. I reran the benchmark and updated the PR description.

Few things to mention:

  1. Arrow is not released on CRAN yet, and it looks like it's going to take a few months (see ARROW-3204). So, for now, it should be installed manually.

    • It can be installed by Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'.
    • I used macOS Mojave 10.14.2 and faced some problems I had to fix in my environment. Please contact me if you face issues while installing this. If this happens widely, I will document it somewhere.
  2. It looks like we can run the build via AppVeyor once it's on CRAN (see ARROW-3204).

  3. We should remove the workarounds that I used to avoid the CRAN check (see #22954 (comment) and #22954 (comment))

Next items (I'm going to investigate first before filing JIRAs):

  1. I'm going to take a look at whether we can do this for Spark DataFrame -> R DataFrame too.
  2. I'm also going to take a look at R native function APIs like lapply and gapply and see if we can optimize them.
  3. Before the Spark 3.0 release, I will document this. Hopefully, we can get rid of both workarounds I mentioned above, and Arrow will be on CRAN before then.

SparkQA commented Jan 24, 2019

Test build #101633 has finished for PR 22954 at commit 66b120b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 24, 2019

Test build #101630 has finished for PR 22954 at commit 92eec4e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 24, 2019

Test build #101632 has finished for PR 22954 at commit 854c9d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HyukjinKwon commented Jan 25, 2019

@felixcheung and @shivaram, are you okay with this plan #22954 (comment) ? If so, I think we can go ahead.


felixcheung commented Jan 25, 2019


HyukjinKwon commented Jan 26, 2019

Yeah, will do. Do you mind if we go ahead with this PR, @felixcheung?


felixcheung commented Jan 26, 2019


HyukjinKwon commented Jan 27, 2019

Thanks, @felixcheung.

Merged to master.

@asfgit asfgit closed this in e8982ca Jan 27, 2019


HyukjinKwon commented Feb 13, 2019

BTW, https://issues.apache.org/jira/browse/SPARK-26759 has subtasks for Arrow optimization (just FYI if anyone missed it)

stczwd added a commit to stczwd/spark that referenced this pull request Feb 18, 2019

[SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark…
… DataFrame

Closes apache#22954 from HyukjinKwon/r-arrow-createdataframe.

Lead-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>