
[SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect() #4923

Closed
wants to merge 4 commits into from

Conversation

davies
Contributor

@davies davies commented Mar 6, 2015

Because of a circular reference between JavaObject and JavaMember, a Java object cannot be released until Python's GC kicks in. This causes a memory leak in collect(), which may consume a lot of memory in the JVM.

This PR changes the way we send collected data back into Python, from a local file to a socket, which avoids any disk IO during collect and also avoids keeping any Python referrers to the Java objects.
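The leak pattern can be illustrated with a minimal, hypothetical sketch (the classes below are stand-ins, not Py4J's real implementation): CPython's reference counting alone cannot reclaim objects that reference each other in a cycle, so the proxy lingers until the cyclic garbage collector happens to run.

```python
import gc

class JavaObjectStub:
    """Hypothetical stand-in for Py4J's JavaObject."""
    def __init__(self):
        # Each member keeps a back-reference to its owner, forming a cycle.
        self.member = JavaMemberStub(self)

class JavaMemberStub:
    """Hypothetical stand-in for Py4J's JavaMember."""
    def __init__(self, owner):
        self.owner = owner

freed = []

class Tracked(JavaObjectStub):
    def __del__(self):
        freed.append(True)

obj = Tracked()
obj = None          # drop the only external reference
assert freed == []  # refcounting alone cannot reclaim the cycle

gc.collect()        # only the cyclic collector frees it
assert freed == [True]
```

Until `gc.collect()` runs, the corresponding object on the JVM side would stay pinned as well, which is why large collect() results could accumulate in JVM memory.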

cc @JoshRosen

@SparkQA

SparkQA commented Mar 6, 2015

Test build #28324 has started for PR 4923 at commit 9517c8f.

  • This patch merges cleanly.

@JoshRosen
Contributor

It looks like we take the result of collectAsIterator and end up passing it back into Java to save it to the temporary file. Do you think that we should just skip this layer of indirection and save it to a temporary file in the JVM then return that file's path back to Python?

@SparkQA

SparkQA commented Mar 6, 2015

Test build #28324 has finished for PR 4923 at commit 9517c8f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28324/
Test FAILed.

@davies
Contributor Author

davies commented Mar 6, 2015

I had some thoughts about that, but found that by separating these two we can reuse writeToFile() for runJob(). Should we change all of them to use a socket to transfer data?

@SparkQA

SparkQA commented Mar 6, 2015

Test build #28331 has started for PR 4923 at commit ba54614.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Mar 6, 2015

Test build #28331 has finished for PR 4923 at commit ba54614.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28331/
Test FAILed.

@SparkQA

SparkQA commented Mar 6, 2015

Test build #28347 has started for PR 4923 at commit 24c92a4.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Mar 6, 2015

Test build #28347 has finished for PR 4923 at commit 24c92a4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28347/
Test PASSed.

@JoshRosen
Contributor

Now that this has been updated to collect results via a socket, it looks like we may finally be able to close https://issues.apache.org/jira/browse/SPARK-677, one of the oldest Spark issues.

@@ -341,7 +342,7 @@ private[spark] object PythonRDD extends Logging {
 /**
  * Adapter for calling SparkContext#runJob from Python.
  *
- * This method will return an iterator of an array that contains all elements in the RDD
+ * This method will serve an iterator of an array that contains all elements in the RDD
  * (effectively a collect()), but allows you to run on a certain subset of partitions,
  * or to enable local execution.
  */

Even with the updated description, this method's return type could be confusing to new readers.

It might help to add an explicit description of the return type, e.g.

@return the port number of a local socket which serves the data collected from this job.

We should also document the lifecycle of the server socket created by this method: what happens if a client does not consume the whole iterator? Is it the caller's responsibility to close the socket at that point?
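The lifecycle being asked about can be sketched in pure Python (this is an illustration of the pattern, not Spark's actual Scala implementation; all names are hypothetical): the server binds an ephemeral local port, serves exactly one client, and closes itself once the data is drained.

```python
import socket
import threading

def serve_once(payload: bytes) -> int:
    """Bind an ephemeral local port, serve `payload` to the first client,
    then close the server socket. Returns the port number."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))   # port 0: the OS picks a free port
    server.listen(1)
    port = server.getsockname()[1]

    def handler():
        conn, _ = server.accept()
        try:
            conn.sendall(payload)
        finally:
            conn.close()
            server.close()          # one-shot: no lingering listener

    threading.Thread(target=handler, daemon=True).start()
    return port

# Caller side: connect to the returned port and drain the stream.
port = serve_once(b"collected rows")
client = socket.create_connection(("127.0.0.1", port))
data = b""
while True:
    chunk = client.recv(4096)
    if not chunk:
        break
    data += chunk
client.close()
assert data == b"collected rows"
```

The open question in the review is exactly what happens in this pattern when the client disconnects early or never connects at all, and whose job it is to close the server socket in that case.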

@JoshRosen
Contributor

This looks really good to me overall. It would be great if you could update the PR description to reflect the most recent changes (collecting via a socket instead of a temporary file).

A couple of quick questions:

  • Does this have a performance impact (positive or negative)?
  • Do we need to write any additional tests? Are there any corner cases that we should explicitly address in their own tests, such as not consuming the iterator or not connecting to the socket after creating it?
  • Do you think we should backport this as-is to the maintenance branches, or should we consider a different fix there (e.g. using detach())?

@davies davies changed the title [SPARK-6194] [PySpark] fix memory leak in collect() [SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect() Mar 9, 2015
@SparkQA

SparkQA commented Mar 9, 2015

Test build #28405 has started for PR 4923 at commit d730286.

  • This patch merges cleanly.

@davies
Contributor Author

davies commented Mar 9, 2015

@JoshRosen This should have a positive performance impact: the end-to-end time of a job (collecting 50M integers) dropped from 10.6s to 9.2s.

The additional failure tests would be nice to have, but I'd like to leave them out of this PR and add them later if we really need them.

I think we should backport this as-is to maintenance branches.

@JoshRosen
Contributor

LGTM; I'll merge this as soon as Jenkins passes. Thanks!

@SparkQA

SparkQA commented Mar 9, 2015

Test build #28405 has finished for PR 4923 at commit d730286.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28405/
Test PASSed.

@asfgit asfgit closed this in 8767565 Mar 9, 2015
asfgit pushed a commit that referenced this pull request Mar 10, 2015
Because of a circular reference between JavaObject and JavaMember, a Java object cannot be released until Python's GC kicks in. This causes a memory leak in collect(), which may consume a lot of memory in the JVM.

This PR changes the way we send collected data back into Python, from a local file to a socket, which avoids any disk IO during collect and also avoids keeping any Python referrers to the Java objects.

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #4923 from davies/fix_collect and squashes the following commits:

d730286 [Davies Liu] address comments
24c92a4 [Davies Liu] fix style
ba54614 [Davies Liu] use socket to transfer data from JVM
9517c8f [Davies Liu] fix memory leak in collect()

(cherry picked from commit 8767565)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	python/pyspark/rdd.py
	python/pyspark/sql/dataframe.py
@JoshRosen
Contributor

I've merged this into master (1.4.0) and branch-1.2 (1.2.2). I'll edit the JIRAs to add a reminder to backport this to 1.1.x and 1.3.1.

@JoshRosen
Contributor

I have now cherry-picked this into branch-1.3 (1.3.1).

asfgit pushed a commit that referenced this pull request Mar 13, 2015
Because of a circular reference between JavaObject and JavaMember, a Java object cannot be released until Python's GC kicks in. This causes a memory leak in collect(), which may consume a lot of memory in the JVM.

This PR changes the way we send collected data back into Python, from a local file to a socket, which avoids any disk IO during collect and also avoids keeping any Python referrers to the Java objects.

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #4923 from davies/fix_collect and squashes the following commits:

d730286 [Davies Liu] address comments
24c92a4 [Davies Liu] fix style
ba54614 [Davies Liu] use socket to transfer data from JVM
9517c8f [Davies Liu] fix memory leak in collect()

(cherry picked from commit 8767565)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
@JoshRosen
Contributor

Note that this patch introduced a bug which can cause collect() to hang under certain rare circumstances: https://issues.apache.org/jira/browse/SPARK-6667. This bug should be fixed in #5324.
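The general shape of a guard against such a hang (a hedged illustration of the idea only; the actual fix in #5324 may differ) is a timeout on the blocking accept(), so the serving thread cannot wait forever for a client that never connects:

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
server.settimeout(0.2)   # don't block forever if no client ever connects

timed_out = False
try:
    conn, _ = server.accept()
    conn.close()
except socket.timeout:
    # No client connected in time; clean up instead of hanging the thread.
    timed_out = True
finally:
    server.close()

assert timed_out  # with no client, accept() times out rather than hanging
```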
