[SPARK-1687] [PySpark] pickable namedtuple #1623

Closed
wants to merge 9 commits into apache:master from davies:namedtuple

Conversation

davies
Contributor

@davies davies commented Jul 28, 2014

Add a hook to replace the original namedtuple with a picklable one, so that namedtuples can be used in RDDs.

PS: pyspark should be imported BEFORE "from collections import namedtuple"
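
For context, here is a minimal sketch of the kind of hook described above. It illustrates the general technique (patching each generated class with a __reduce__ so plain pickle/cPickle can rebuild it from its name and fields), not the exact code in this PR:

# Sketch only: make namedtuple classes picklable by attaching a __reduce__.
from collections import namedtuple as _old_namedtuple

def _restore(name, fields, values):
    # Recreate the namedtuple class from its name/fields, then the instance.
    cls = _old_namedtuple(name, fields)
    return cls(*values)

def _hack_namedtuple(cls):
    # Attach a __reduce__ so pickle/cPickle knows how to serialize instances.
    name, fields = cls.__name__, cls._fields
    def __reduce__(self):
        return (_restore, (name, fields, tuple(self)))
    cls.__reduce__ = __reduce__
    return cls

def namedtuple(*args, **kwargs):
    # Replacement hook: every class created through it is patched on creation.
    return _hack_namedtuple(_old_namedtuple(*args, **kwargs))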

@SparkQA

SparkQA commented Jul 28, 2014

QA tests have started for PR 1623. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17315/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA results for PR 1623:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17315/consoleFull

@JoshRosen
Contributor

Is there a way to do this that doesn't require PySpark to be imported before the namedtuples are created? Can you directly replace the __reduce__ method on the namedtuple class? Alternatively, maybe you can register a new __reduce__ method using copy_reg. Cloudpickle doesn't use copy_reg.pickle() directly, so the actual fix might be lower-level.
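
For illustration, a hedged sketch of the copy_reg route mentioned above (Python 2 module name; it is called copyreg in Python 3). This is not what the PR ended up doing, and each namedtuple class would need its own registration:

import copy_reg
from collections import namedtuple

Person = namedtuple("Person", "id firstName lastName")

def _rebuild_person(values):
    return Person(*values)

def _reduce_person(obj):
    # Tell pickle/cPickle how to serialize Person instances.
    return (_rebuild_person, (tuple(obj),))

copy_reg.pickle(Person, _reduce_person)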

Do not need import pyspark before using namedtuple
@SparkQA

SparkQA commented Jul 29, 2014

QA tests have started for PR 1623. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17375/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA results for PR 1623:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(PickleSerializer):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17375/consoleFull

hack_namedtuple(o)

def dump_stream(self, iterator, stream):
    self._hack_namedtuple()
Contributor

I was going to suggest that maybe we should have a boolean flag that tests whether we've already hacked namedtuple, but maybe we don't need it: _hack_namedtuple() is idempotent and that might be premature optimization, since here we only pay the hack cost once per stream.

@JoshRosen
Contributor

I came up with a contrived example that doesn't work. Try running the following with ./bin/pyspark:

from collections import namedtuple
Person = namedtuple("Person", 'id firstName lastName')
jon = Person(1, "Jon", "Doe")
from pyspark import SparkContext
sc = SparkContext("local")
sc.textFile("/usr/share/dict/words").map(lambda x: jon).first()

This results in a pickling error.

The problem here is that _hack_namedtuple() is registered too late. What if you made it into a classmethod in PickleSerializer and called it from SparkContext.__init__()?

@davies
Contributor Author

davies commented Aug 2, 2014

It works on my Mac, have you applied the patch? It should be registered before dumps.

@JoshRosen
Contributor

Did you run that exact file with PySpark? The important bits are that namedtuple is imported and an instance is created before any PySpark imports, that we launch a job which tries to serialize a namedtuple in its function closure, and that this serialization takes place before the hack is registered (hence my use of textFile instead of parallelize).

On Aug 1, 2014, at 8:45 PM, Davies Liu notifications@github.com wrote:

It works on my Mac, have you applied the patch? It should be registered before dumps.



@davies
Contributor Author

davies commented Aug 2, 2014

I see, CloudPickle also needs this hack.

@SparkQA

SparkQA commented Aug 2, 2014

QA tests have started for PR 1623. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17746/consoleFull

@SparkQA

SparkQA commented Aug 2, 2014

QA tests have started for PR 1623. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17747/consoleFull

@JoshRosen
Contributor

Your latest commit improves things, but I still think the static method approach would be better, since that way we wouldn't wind up calling _hack_namedtuple() so often. Is there a reason why that doesn't work?

@davies
Contributor Author

davies commented Aug 2, 2014

Users may call namedtuple to create classes at any time, so this hack has to be delayed until pickling, which means we have to check many times.

@JoshRosen
Contributor

Calling _hack_namedtuple() should set up pickling for any namedtuple subclasses defined up to that point. It looks like we re-assign to collections.namedtuple, but by then it's already too late since the user might have a reference to the original, non-wrapped namedtuple class. Is there any easy way to patch the namedtuple object itself so that it injects the hack? I think that would solve the problem, since any new namedtuple classes would automatically receive the hack and any old ones would be handled by our search through __main__.
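
A hedged sketch of the "search through __main__" part of that idea, reusing the per-class patcher from the earlier sketch (again, an illustration rather than the PR's exact code):

import sys
from collections import namedtuple as _old_namedtuple

def _restore(name, fields, values):
    return _old_namedtuple(name, fields)(*values)

def _hack_namedtuple(cls):
    name, fields = cls.__name__, cls._fields
    cls.__reduce__ = lambda self: (_restore, (name, fields, tuple(self)))
    return cls

def _hijack_existing_namedtuples():
    # Patch namedtuple classes that were already defined in __main__ before
    # the hook was installed.
    main = sys.modules.get("__main__")
    if main is None:
        return
    for obj in vars(main).values():
        # Heuristic: namedtuple classes are tuple subclasses with _fields.
        if isinstance(obj, type) and issubclass(obj, tuple) and hasattr(obj, "_fields"):
            _hack_namedtuple(obj)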

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1623:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(PickleSerializer):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17746/consoleFull

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1623:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(PickleSerializer):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17747/consoleFull

@davies
Contributor Author

davies commented Aug 2, 2014

@JoshRosen Good point, I have managed to replace all references to namedtuple with the new one, so this hijack only needs to happen once.

Because it's only related to pickle serialization, it is called at module level.

@SparkQA

SparkQA commented Aug 2, 2014

QA tests have started for PR 1623. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17753/consoleFull

@JoshRosen
Contributor

Just to cover all possible cases, are there any thread-safety issues here? Will we be in trouble if a user creates a new namedtuple instance while _hack_namedtuple() is running? That seems like an extremely unlikely scenario, though.

@davies
Contributor Author

davies commented Aug 2, 2014

Because of the GIL, Python threads will not run concurrently in most cases. Also, this patch replaces namedtuple first and then patches the existing classes, so the process can be interrupted without problems.

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1623:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(PickleSerializer):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17753/consoleFull

@SparkQA

SparkQA commented Aug 2, 2014

QA tests have started for PR 1623. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17759/consoleFull

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1623:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(PickleSerializer):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17759/consoleFull

@JoshRosen
Contributor

This looks okay, but I still wonder whether there's a simpler approach. Have you looked at how dill handles namedtuples?

@davies
Contributor Author

davies commented Aug 4, 2014

It's easy to extend pickle to support namedtuple; cloudpickle and dill have done it this way, but they are slow. We want to use cPickle for datasets, so it should be fast by default. I haven't found a way to extend cPickle, do you have any ideas?
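
For what it's worth, cPickle does pick up a __reduce__ defined on the class itself, which is what the class-patching approach relies on; no pickle subclass is needed. A quick, hedged check (Python 2 style, matching the other snippets in this thread):

import cPickle
from collections import namedtuple as _old_namedtuple

def _restore(name, fields, values):
    return _old_namedtuple(name, fields)(*values)

Person = _old_namedtuple("Person", "id firstName lastName")
# Patch the class so cPickle serializes instances via _restore.
Person.__reduce__ = lambda self: (_restore, ("Person", Person._fields, tuple(self)))

data = cPickle.dumps(Person(1, "Jon", "Doe"), 2)
print cPickle.loads(data)   # Person(id=1, firstName='Jon', lastName='Doe')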

@JoshRosen
Contributor

Here's another (contrived) example that breaks:

from collections import namedtuple as nt
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer

sc = SparkContext("local")
p = PickleSerializer()

Person = nt("Person", 'id firstName lastName')
jon = Person(1, "Jon", "Doe")
sc.textFile("/usr/share/dict/words").map(lambda x: jon).first()

It looks like the problem here is that line 306 assumes that old references will be named namedtuple, which isn't true if I import it under a different name.

@davies
Contributor Author

davies commented Aug 4, 2014

Yes, it's easy to break it.

Having a solution that works in 99% of cases is better than no solution, or a much slower solution that works in 100% of cases.

@davies
Contributor Author

davies commented Aug 4, 2014

This feature is not a blocker, because we prefer to use Row() instead of namedtuple for inferSchema().

If users really want to use namedtuple or customized classes in __main__, they could use cloudpickle (see the sketch below).
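
For reference, a hedged sketch of that workaround: PySpark's SparkContext takes a serializer argument, so data serialization can be routed through cloudpickle instead of cPickle (treat the exact call as an assumption; the trade-off, as noted above, is that cloudpickle is slower):

from pyspark import SparkContext
from pyspark.serializers import CloudPickleSerializer

# Use cloudpickle-based serialization for data instead of the default cPickle.
sc = SparkContext("local", "namedtuple-demo", serializer=CloudPickleSerializer())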

@JoshRosen
Contributor

I found another technique that may be more robust to namedtuple being accessible under different names. We can replace namedtuple's code object at runtime in order to interpose on calls to it:

import types
def copy_func(f, name=None):  # See http://stackoverflow.com/a/6528148/590203
    return types.FunctionType(f.func_code, f.func_globals, name or f.func_name,
            f.func_defaults, f.func_closure)

from collections import namedtuple
namedtuple._old_namedtuple = copy_func(namedtuple)
def wrapped(*args, **kwargs):
    print "Called the wrapped function!"
    return namedtuple._old_namedtuple(*args, **kwargs)
namedtuple.func_code = wrapped.func_code

print namedtuple("Person", "name age")

This prints

Called the wrapped function!
<class 'collections.Person'>

@SparkQA

SparkQA commented Aug 4, 2014

QA tests have started for PR 1623. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17862/consoleFull

@SparkQA

SparkQA commented Aug 4, 2014

QA tests have started for PR 1623. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17867/consoleFull

@JoshRosen
Contributor

I've merged this into master and branch-1.1. Thanks!

(I tested this locally)

@SparkQA

SparkQA commented Aug 4, 2014

QA results for PR 1623:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(PickleSerializer):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17862/consoleFull

@SparkQA

SparkQA commented Aug 4, 2014

QA results for PR 1623:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17867/consoleFull

asfgit pushed a commit that referenced this pull request Aug 4, 2014
Add an hook to replace original namedtuple with an pickable one, then namedtuple could be used in RDDs.

PS: pyspark should be import BEFORE "from collections import namedtuple"

Author: Davies Liu <davies.liu@gmail.com>

Closes #1623 from davies/namedtuple and squashes the following commits:

045dad8 [Davies Liu] remove unrelated code changes
4132f32 [Davies Liu] address comment
55b1c1a [Davies Liu] fix tests
61f86eb [Davies Liu] replace all the reference of namedtuple to new hacked one
98df6c6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into namedtuple
f7b1bde [Davies Liu] add hack for CloudPickleSerializer
0c5c849 [Davies Liu] Merge branch 'master' of github.com:apache/spark into namedtuple
21991e6 [Davies Liu] hack namedtuple in __main__ module, make it picklable.
93b03b8 [Davies Liu] pickable namedtuple

(cherry picked from commit 59f84a9)
Signed-off-by: Josh Rosen <joshrosen@apache.org>
@asfgit asfgit closed this in 59f84a9 Aug 4, 2014
@JoshRosen
Contributor

Whoops, I broke the build by merging this! I should have just waited for Jenkins to finish. Sorry if this inconvenienced anyone; I won't make this mistake again. Davies has a fix in #1771 that I'll get merged.

xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014