[SPARK-27041][PySpark] Use imap() for python 2.x to resolve oom issue #23954
With a large partition, PySpark may exceed the executor memory limit and trigger an out-of-memory error on Python 2.7. This is because map() is used, and the Python 2.7 map() builds a full list, so it needs to read all data into memory.
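To make the difference concrete, here is a minimal sketch (not code from this PR) that counts how many inputs have been read by the time the first output is available. The helper names `eager_map`, `lazy_map`, `counting_source`, and `inputs_read_for_first_output` are made up for illustration; `eager_map` only mimics the list-building behavior of the Python 2 built-in.

```python
import sys

if sys.version_info[0] >= 3:
    lazy_map = map  # Python 3: map() already returns an iterator
else:
    from itertools import imap as lazy_map  # Python 2: iterator-based map


def eager_map(func, iterable):
    """Mimic Python 2's built-in map(): build the whole result list up front."""
    return [func(item) for item in iterable]


def counting_source(n, consumed):
    """Yield n integers, recording in `consumed` each one that has been read."""
    for i in range(n):
        consumed.append(i)
        yield i


def inputs_read_for_first_output(mapper, n=1000):
    """Return how many inputs were read once the first output is available."""
    consumed = []
    results = iter(mapper(lambda x: x * 2, counting_source(n, consumed)))
    next(results)
    return len(consumed)


eager_reads = inputs_read_for_first_output(eager_map)
lazy_reads = inputs_read_for_first_output(lazy_map)
print("eager: %d, lazy: %d" % (eager_reads, lazy_reads))
```

The eager variant reads the entire input before yielding anything, which is the pattern that would hold a whole partition in memory; the lazy variant reads one element at a time.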
python/pyspark/worker.py
Outdated
@@ -45,6 +45,8 @@

if sys.version >= '3':
    basestring = str
else:
    from itertools import imap as map # use iterator map by default
Use two spaces before the inline comment.
Should I fix it and update the pull request?
Yes, you need to fix the style for the Jenkins test to proceed.
ok to test
Test build #102975 has finished for PR 23954 at commit
Looks reasonable. Let me take a closer look soon to be doubly sure. It's quite a core path.
Test build #102977 has finished for PR 23954 at commit
@@ -45,6 +45,8 @@

if sys.version >= '3':
    basestring = str
else:
    from itertools import imap as map # use iterator map by default
It deserves a comment, at least. I think this is a relatively safe change as this is already how Python 3 works.
I can only find one usage in this file, which is the part that applies a UDF to data. Is that the source of the issue? Just checking. Surprisingly, that seems to be the only call in non-test code where it matters.
Yes, that single call is what triggers the issue.
Is this not going to potentially change the behavior of Python UDFs?
Hm, that's a good point. I don't know enough Python to be sure. @holdenk do you know how Python would work in this regard? Is it safer to push this check to the one site below and call one or the other map function? It looks like itertools doesn't have imap in Python 3.
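For reference, the call-site alternative raised here could look roughly like the sketch below. It is not what the PR does; `select_lazy_map` and `apply_udf` are hypothetical names, and the point is only that the version check stays local so the module-level name `map` is never rebound.

```python
import sys


def select_lazy_map():
    """Return an iterator-producing map for the running interpreter."""
    if sys.version_info[0] >= 3:
        return map  # Python 3: built-in map() is already lazy
    from itertools import imap  # only exists on Python 2
    return imap


def apply_udf(udf, rows):
    """Hypothetical call site: apply udf to rows without building a list."""
    lazy_map = select_lazy_map()
    return lazy_map(udf, rows)


result = apply_udf(lambda x: x + 1, iter([1, 2, 3]))
first = next(result)
rest = list(result)
```

The trade-off is a slightly noisier call site in exchange for leaving the global `map` untouched.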
Are we worrying about the case where the global map inside the pickled function is overridden by the existing global imap? That's not going to happen per https://github.com/cloudpipe/cloudpickle/pull/240.
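The underlying Python rule is that a function resolves global names through its own `__globals__`, not through the namespace of whoever calls it. The sketch below illustrates only that rule with two throwaway modules (`user_code` and `worker_like` are hypothetical names); it does not reproduce how cloudpickle rebuilds a pickled function's globals, which is what the linked PR covers.

```python
import types

# Hypothetical stand-in for user code that defines a UDF referring to map.
user_module = types.ModuleType("user_code")
exec(
    "def udf():\n"
    "    return map.__module__\n",
    user_module.__dict__,
)

# Hypothetical stand-in for a worker module that rebinds its own global map.
worker_module = types.ModuleType("worker_like")
exec(
    "import itertools\n"
    "map = itertools.chain  # any rebinding will do for this illustration\n"
    "def run(func):\n"
    "    return func()\n",
    worker_module.__dict__,
)

# The UDF is called from the worker-like module but still sees the built-in map.
seen_by_udf = worker_module.run(user_module.udf)
seen_by_worker = worker_module.map.__module__
```

Here the rebinding is visible only inside `worker_like`; the function defined in `user_code` keeps resolving `map` to the built-in.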
Shall we add a test to verify it in this PR too?
Yea, sounds good to have a test.
How can we test it here: make a UDF that checks the value of map.__module__? If it's itertools, then fail, as it would mean this import 'leaked' into the UDF, right? Otherwise it should return builtins in Python 3 or __builtin__ in Python 2.
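A rough shape for such a check, written as a plain unittest without Spark, might be the following. This is only a sketch: `map_module_seen_by_udf` and `MapNotLeakedTest` are hypothetical names, and a real test would wrap the function as a UDF and run it through the worker rather than call it directly.

```python
import unittest


def map_module_seen_by_udf():
    """Hypothetical UDF body: report which module the visible map comes from."""
    return map.__module__


class MapNotLeakedTest(unittest.TestCase):
    def test_udf_sees_builtin_map(self):
        module_name = map_module_seen_by_udf()
        # itertools here would mean the worker's imap alias leaked into the UDF.
        self.assertNotEqual(module_name, "itertools")
        self.assertIn(module_name, ("builtins", "__builtin__"))


suite = unittest.TestLoader().loadTestsFromTestCase(MapNotLeakedTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```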
Yes, I think we can test like that.
The way we pickle UDFs is a little weird, so I wouldn't be too surprised if we did end up doing something silly by accident here. In that case we can also invert the imports (e.g. alias map as imap in py3).
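The inverted form would presumably look something like this sketch, where the explicit name `imap` is defined on both versions and the built-in `map` is never shadowed. The `apply_lazily` helper is a hypothetical call site, not code from worker.py.

```python
import sys

if sys.version_info[0] >= 3:
    imap = map  # Python 3: built-in map() is already lazy
else:
    from itertools import imap  # Python 2: iterator-based map


def apply_lazily(func, rows):
    """Hypothetical call site that uses the explicit name imap."""
    return imap(func, rows)


doubled = list(apply_lazily(lambda x: x * 2, iter(range(3))))
```

With this arrangement the call site has to be changed to use `imap`, but nothing that looks up `map` can be affected by the alias.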
I think it is fine. Agreed that this should be a safe change, as this is what Python 3 does, if I understand it correctly.
@TigerYang414, are you able to add a test? If you're not, I can add it for you.
Thanks for this change, really appreciate you catching the problem :)
I'm not very familiar with the Spark test framework yet. I'd appreciate it if you could mentor me on this.
@TigerYang414 see #23954 (comment); maybe a short test that defines a UDF that checks what
It's okie. I'll add a test on @TigerYang414's branch.
Hey @TigerYang414, I opened a PR TigerYang414#1 to add a test. Please merge that into your branch after review.
Add a test for PR 23954
Test build #103356 has finished for PR 23954 at commit
Merged to master
What changes were proposed in this pull request?
With a large partition, PySpark may exceed the executor memory limit and trigger an out-of-memory error on Python 2.7.
This is because map() is used. Unlike in Python 3.x, the Python 2.7 map() generates a list and therefore needs to read all data into memory.
The proposed fix uses imap in Python 2.7, and it has been verified.
How was this patch tested?
Manual test.