
[SPARK-12717][PYTHON] Adding thread-safe broadcast pickle registry #18695

Conversation

BryanCutler
Member

@BryanCutler BryanCutler commented Jul 20, 2017

What changes were proposed in this pull request?

When using PySpark broadcast variables in a multi-threaded environment, `SparkContext._pickled_broadcast_vars` becomes a shared resource. A race condition can occur when broadcast variables pickled by one thread are added to the shared `_pickled_broadcast_vars` and end up in the Python command built by another thread. This PR introduces a thread-safe pickle registry backed by thread-local storage, so that when a Python command is pickled (which pickles each broadcast variable and adds it to the registry), each thread has its own view of the registry from which to retrieve and clear the broadcast variables it used.
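For illustration, a minimal sketch of a thread-local pickle registry along these lines (a sketch based on this description, not necessarily the exact code merged in this PR):

```python
import threading


class BroadcastPickleRegistry(threading.local):
    """Thread-local registry of broadcast variables pickled by the current thread."""

    def __init__(self):
        # __init__ runs once for each thread that touches the registry, so every
        # thread gets its own independent set; setdefault guards re-initialization.
        self.__dict__.setdefault("_registry", set())

    def __iter__(self):
        for bcast in self._registry:
            yield bcast

    def add(self, bcast):
        self._registry.add(bcast)

    def clear(self):
        self._registry.clear()
```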

How was this patch tested?

Added a unit test that reproduces this race condition using another thread.

@SparkQA

SparkQA commented Jul 20, 2017

Test build #79809 has finished for PR 18695 at commit b703f83.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class BroadcastPickleRegistry(object):

@BryanCutler
Member Author

ping @holdenk @davies, does this fix look ok to you? Thanks!

        self._registry = set()
        self._lock = lock

    @property
Member

Would you mind if I ask why this one should be a property?

Member Author

Sure @HyukjinKwon, it's not really necessary. It's just there to say that the lock should not be changed once this class is instantiated, i.e. to keep it "private" but still allow it to be acquired. Maybe it's overkill here because this is not a widely used class and it has a very specific use. I could remove it if that makes things easier.
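For illustration, a hedged sketch of what the earlier lock-based registry with a read-only `lock` property might have looked like (names inferred from the diff fragments above, not the exact code from that revision):

```python
import threading


class BroadcastPickleRegistry(object):
    """Sketch of the earlier lock-based revision: callers acquire .lock around pickling."""

    def __init__(self, lock):
        self._registry = set()
        self._lock = lock

    @property
    def lock(self):
        # Exposed read-only: the lock can be acquired by callers but not
        # reassigned after the registry is constructed.
        return self._lock

    def add(self, bcast):
        self._registry.add(bcast)

    def clear(self):
        self._registry.clear()
```

A caller would then wrap command pickling in `with registry.lock:`, which is exactly the serialization point that the later thread-local version removes.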

@@ -195,7 +195,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         # This allows other code to determine which Broadcast instances have
         # been pickled, so it can determine which Java broadcast objects to
         # send.
-        self._pickled_broadcast_vars = set()
+        self._pickled_broadcast_registry = BroadcastPickleRegistry(self._lock)
Member

@viirya viirya Jul 26, 2017

Instead of using a lock, how about using thread-local data? Then we don't block other threads when pickling.

@BryanCutler
Member Author

Thanks @viirya, that was a good idea! I updated the PR to use a thread-local object to store the pickled vars.
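As a quick standalone illustration of why `threading.local` gives each thread its own view (a toy example, independent of the Spark code):

```python
import threading


class Registry(threading.local):
    def __init__(self):
        self.items = set()  # re-initialized independently for each thread


registry = Registry()


def worker(name):
    registry.items.add(name)
    # Each thread sees only what it added itself.
    print(name, sorted(registry.items))


threads = [threading.Thread(target=worker, args=("t%d" % i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```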

@SparkQA

SparkQA commented Jul 26, 2017

Test build #79974 has finished for PR 18695 at commit 54e8357.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Jul 28, 2017

The change LGTM. Will it be hard to add a reliable test for this?

@BryanCutler
Member Author

Yeah, I think I can add a simple test for this. I'll give it a try.

@SparkQA

SparkQA commented Jul 31, 2017

Test build #80098 has finished for PR 18695 at commit 0f444f6.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 1, 2017

Test build #80100 has finished for PR 18695 at commit 6710f13.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

@viirya @HyukjinKwon, I added a test for this, although maybe it doesn't look as straightforward as I was thinking :) Could you take a look and see if it makes sense? Thanks!

def process_vars(sc):
    broadcast_vars = [x for x in sc._pickled_broadcast_vars]
    num_pickled = len(broadcast_vars)
    sc._pickled_broadcast_vars.clear()
Member

Shall we check if pickled vars are actually cleared?

Member Author

yeah, that would be good
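For example, the check could look roughly like this (a hedged sketch; the actual test helper in the PR may differ):

```python
def process_vars(sc):
    broadcast_vars = [x for x in sc._pickled_broadcast_vars]
    num_pickled = len(broadcast_vars)
    sc._pickled_broadcast_vars.clear()
    # Verify this thread's view of the registry really was cleared.
    assert len([x for x in sc._pickled_broadcast_vars]) == 0
    return num_pickled
```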

@viirya
Member

viirya commented Aug 1, 2017

LGTM except for one minor comment.

@@ -139,6 +140,24 @@ def __reduce__(self):
         return _from_id, (self._jbroadcast.id(),)


+class BroadcastPickleRegistry(threading.local):
Member

Hm.. actually, I prefer the locking way from before. I guess it wouldn't make a big performance difference due to the GIL, and the simple lock was easier to read...

Member

BTW, I am okay with the current way too.

Member

I'm okay with both ways. :)

My only concern with the previous locking approach is that we held the lock while dumping the command. I'm not sure whether dumping a big command can take a long time, in which case we would prevent other threads from preparing their commands.

Member

Yeah, this solves the issue either way and looks safe with respect to your concern. I will probably make a follow-up after testing it later.

Member Author

Using the lock made it a little more obvious what is going on, but it's better not to hold a lock while pickling a large command, like @viirya said. Also, this way doesn't require changing any of the pickling code, so I prefer it too.

@SparkQA

SparkQA commented Aug 1, 2017

Test build #80104 has finished for PR 18695 at commit d4d1fed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

One last question to check that I read this correctly: the problem is basically around `_prepare_for_python_RDD` in rdd.py, because pickling the command adds the pickled broadcast variables to the registry, and that registry needs to be thread-safe?
Let's fix up the description to mention the thread-local approach. LGTM too.

@HyukjinKwon
Member

Will merge this one after a few days if there are no more comments from @holdenk and @davies.

@BryanCutler
Member Author

Thanks @HyukjinKwon and @viirya! I updated the description. To sum up this issue: `_prepare_for_python_RDD` pickles a command, which in turn pickles any broadcast variables that are part of that command. When a broadcast var is pickled, its `__reduce__` function adds it to a common registry. After pickling, `_prepare_for_python_RDD` gets all broadcast vars that were just pickled and uses them for that command. If multiple threads write to the same pickle registry, broadcast variables for different commands can be pickled together and added to the registry, and the first thread to retrieve them from the registry will get them all. Making the pickle registry thread-local gives each thread its own view to write to and read from, so it's no longer a shared resource.
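To make the race concrete, here is a small standalone simulation of the old shared-registry behavior (a toy sketch, not Spark code): each thread registers its own "broadcast var" into one shared set and then drains it, so a thread can pick up vars that belong to another thread's command.

```python
import threading

shared_registry = set()  # old behavior: one registry shared by all threads
drained = {}


def prepare_command(name):
    # Pickling the command registers this thread's broadcast var.
    shared_registry.add(name + "_broadcast")
    # Draining the shared registry may also pick up vars added by other
    # threads in the meantime (race-dependent).
    drained[name] = set(shared_registry)
    shared_registry.clear()


threads = [threading.Thread(target=prepare_command, args=("t%d" % i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(drained)  # one thread may have grabbed both broadcast vars
```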

@HyukjinKwon
Member

Thanks for the clarification. LGTM

@asfgit asfgit closed this in 77cc0d6 Aug 1, 2017
@HyukjinKwon
Member

HyukjinKwon commented Aug 1, 2017

Thanks. Merged to master.

@HyukjinKwon
Member

@holdenk, BTW, it looks like I am facing the same issue you met before. It seems I can't trigger the Jenkins build with "ok to test". Do you maybe know whom I should ask, or what steps I should take?

@felixcheung
Member

@HyukjinKwon that needs to be added separately by someone who has access to Jenkins as an admin.

@HyukjinKwon
Member

Hmm.. I see. Thanks.

jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018

Author: Bryan Cutler <cutlerb@gmail.com>

Closes apache#18695 from BryanCutler/pyspark-bcast-threadsafe-SPARK-12717.