
[SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB #15375

Closed
wants to merge 14 commits

Conversation

@falaki (Contributor) commented Oct 6, 2016

What changes were proposed in this pull request?

If the R data structure that is being parallelized is larger than INT_MAX, we use files to transfer data to the JVM. The serialization protocol mimics Python pickling. This allows us to simply call PythonRDD.readRDDFromFile to create the RDD.

I tested this on my MacBook. The following code works with this patch:

intMax <- .Machine$integer.max
largeVec <- 1:intMax
rdd <- SparkR:::parallelize(sc, largeVec, 2)
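
At a high level, the new path looks roughly like this (a simplified sketch, not the exact code in the patch; the small-data branch via createRDDFromArray is an assumption about the pre-existing code and is shown only for contrast):

```R
# Sketch only: decide between the in-memory path and the new file-based path.
sizeLimit <- as.numeric(
  sparkR.conf("spark.r.maxAllocationLimit", toString(.Machine$integer.max - 10240)))
objectSize <- object.size(coll)

if (objectSize < sizeLimit) {
  # Small data: existing in-memory path (assumed; unchanged by this patch).
  jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray",
                      sc, serializedSlices)
} else {
  # Large data: spill the serialized slices to a temp file, then let the JVM
  # read it back, reusing the machinery behind PythonRDD.readRDDFromFile.
  fileName <- writeToTempFile(serializedSlices)
  jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromFile",
                      sc, fileName, as.integer(numSlices))
  file.remove(fileName)
}
```

Going through a file sidesteps the INT_MAX cap on what can be shipped to the JVM in a single in-memory transfer.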

How was this patch tested?

  • Unit tests

@SparkQA commented Oct 6, 2016

Test build #66434 has finished for PR 15375 at commit f4b1638.

  • This patch fails R style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member)

I think we need to delete the temp file?

@falaki (Contributor, Author) commented Oct 6, 2016

@felixcheung added cleanup for the temp file and a unit test. PTAL.

@SparkQA commented Oct 6, 2016

Test build #66464 has finished for PR 15375 at commit 8691569.

  • This patch fails R style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -123,19 +126,46 @@ parallelize <- function(sc, coll, numSlices = 1) {
if (numSlices > length(coll))
numSlices <- length(coll)

sizeLimit <- as.numeric(
sparkR.conf("spark.r.maxAllocationLimit", toString(.Machine$integer.max - 10240)))
objectSize <- object.size(coll)
Contributor

Since the guess of the size could easily be wrong, and writing the data to disk is not that bad anyway, should we have a much smaller default value (for example, 100M)?

Contributor Author

Done!
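
For reference, the limit that decides between the two paths stays configurable through spark.r.maxAllocationLimit. A minimal sketch of overriding it at session start, assuming the value is read as a byte count (as the default shown in the diff above suggests):

```R
library(SparkR)

# Sketch: raise the limit to ~1GB for this session so medium-sized local data
# still goes through the in-memory path. Assumes the value is a byte count.
sparkR.session(sparkConfig = list(spark.r.maxAllocationLimit = toString(1024^3)))

# Confirm the effective value.
sparkR.conf("spark.r.maxAllocationLimit")
```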

@SparkQA commented Oct 7, 2016

Test build #66467 has finished for PR 15375 at commit 8e065c1.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 7, 2016

Test build #66472 has finished for PR 15375 at commit 4aab6cf.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -126,13 +126,13 @@ parallelize <- function(sc, coll, numSlices = 1) {
if (numSlices > length(coll))
numSlices <- length(coll)

sizeLimit <- .Machine$integer.max - 10240 # Safe margin below maximum allocation limit
sizeLimit <- as.numeric(
Member

shouldn't this be as.integer(?

Contributor Author

This number is not serialized anywhere. I think as.numeric is fine.

@felixcheung (Member) Oct 7, 2016

Agreed, probably not a big deal; a user could set spark.r.maxAllocationLimit to 0.01 though, to make numSlices bigger.
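
A quick illustration of the difference, with hypothetical numbers (the exact slice computation in the merged code may differ):

```R
# as.numeric() keeps a fractional limit usable; as.integer() would truncate it.
as.numeric("0.01")   # 0.01
as.integer("0.01")   # 0

# With a tiny numeric limit, the derived slice count balloons
# (assumed ceiling() relationship, shown only to illustrate the point).
objectSize <- 1e6                          # pretend the data serializes to ~1MB
ceiling(objectSize / as.numeric("0.01"))   # 1e+08 slices
```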

@felixcheung (Member)

Odd, this is the error from appveyor:

CallerContext: Fail to set Spark caller context
java.lang.ClassNotFoundException: org.apache.hadoop.ipc.CallerContext
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.spark.util.CallerContext.setCurrentContext(Utils.scala:2485)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/10/07 03:45:58 INFO Executor: Finished task 1.

fileName <- writeToTempFile(serializedSlices)
jrdd <- callJStatic(
"org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices))
file.remove(fileName)
Member

if the JVM call throws an exception, I don't think this line will execute, perhaps wrap this in tryCatch?

Contributor Author

Good point. Done!
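
The cleanup pattern discussed here could look roughly like this (a sketch of the suggestion, not necessarily the exact code that was merged):

```R
# Sketch: make sure the temp file is removed even if the JVM call throws.
fileName <- writeToTempFile(serializedSlices)
jrdd <- tryCatch(
  callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromFile",
              sc, fileName, as.integer(numSlices)),
  finally = {
    file.remove(fileName)
  }
)
```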

@SparkQA commented Oct 7, 2016

Test build #66517 has finished for PR 15375 at commit 6ea7fb3.

  • This patch fails R style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class InterfaceStability
    • sealed trait ConsumerStrategy
    • case class SubscribeStrategy(topics: Seq[String], kafkaParams: ju.Map[String, Object])
    • case class SubscribePatternStrategy(
    • case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset
    • abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name)

@SparkQA commented Oct 7, 2016

Test build #66527 has finished for PR 15375 at commit ef989c1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 8, 2016

Test build #66528 has finished for PR 15375 at commit 9df49ea.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@falaki changed the title from "[SPARK-17790] Support for parallelizing R data.frame larger than 2GB" to "[SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB" on Oct 8, 2016
@SparkQA commented Oct 8, 2016

Test build #66565 has finished for PR 15375 at commit 766d903.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 11, 2016

Test build #66699 has finished for PR 15375 at commit 62ab47b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -87,6 +87,9 @@ objectFile <- function(sc, path, minPartitions = NULL) {
#' in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster.
#'
#' If size of serialized slices is larger than 2GB (or INT_MAX bytes), the function
@felixcheung (Member) Oct 11, 2016

shouldn't this be 200MB now?

Contributor Author

Done.

@SparkQA commented Oct 11, 2016

Test build #3324 has finished for PR 15375 at commit 62ab47b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 11, 2016

Test build #66750 has finished for PR 15375 at commit 836e874.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@falaki (Contributor, Author) commented Oct 11, 2016

@felixcheung does it look OK now?

@felixcheung (Member) commented Oct 11, 2016

This LGTM - I think we merge this to master and branch-2.0
But Spark unit tests are failing?

@falaki (Contributor, Author) commented Oct 11, 2016

Seems like a flaky test in DirectKafkaStreamSuite:

DirectKafkaStreamSuite:
- pattern based subscription *** FAILED *** (1 minute, 41 seconds)

If jenkins listens to your commands, maybe we can have it retest this?

@shivaram (Contributor)

Jenkins, retest this please

@tdas (Contributor) commented Oct 12, 2016

@falaki @felixcheung The DirectKafkaStreamSuite is a known flaky test. Nothing in this patch should affect Kafka.

@SparkQA commented Oct 12, 2016

Test build #66792 has finished for PR 15375 at commit 836e874.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member)

Everything passed, I'm merging this to master and branch-2.0.

asfgit pushed a commit that referenced this pull request Oct 12, 2016
[SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB

## What changes were proposed in this pull request?
If the R data structure that is being parallelized is larger than `INT_MAX` we use files to transfer data to JVM. The serialization protocol mimics Python pickling. This allows us to simply call `PythonRDD.readRDDFromFile` to create the RDD.

I tested this on my MacBook. Following code works with this patch:
```R
intMax <- .Machine$integer.max
largeVec <- 1:intMax
rdd <- SparkR:::parallelize(sc, largeVec, 2)
```

## How was this patch tested?
* [x] Unit tests

Author: Hossein <hossein@databricks.com>

Closes #15375 from falaki/SPARK-17790.

(cherry picked from commit 5cc503f)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
@asfgit closed this in 5cc503f Oct 12, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
[SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB