[SPARK-21985][PySpark] PairDeserializer is broken for double-zipped RDDs #19226
Conversation
Test build #81746 has finished for PR 19226 at commit
Test build #81748 has finished for PR 19226 at commit
So quick testing locally suggests the exception type may have changed from the explicit ValueError to a Py4J error.
It's actually this one that is failing: https://github.com/aray/spark/blob/0d64a6d11237383c2a6ea21275dc9daa5cc8d634/python/pyspark/tests.py#L964
Yup, that's part of
But that does mean an exception isn't being raised at all, so it's probably not the exception type then. Hmm.
@holdenk I'm not going to be able to solve this tonight (short of just removing the failing test).
Sure, no worries. I think we should keep the test for now, and we can hope this goes into RC2 (I assume something will be missing from RC1, or I'll screw up its packaging in some way). Otherwise the fix can go out in 2.2.1 if somehow RC1 magically passes :)
python/pyspark/serializers.py (outdated)

```diff
@@ -343,9 +346,6 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
-            if len(key_batch) != len(val_batch):
-                raise ValueError("Can not deserialize PairRDD with different number of items"
-                                 " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))
             # for correctness with repeated cartesian/zip this must be returned as one batch
             yield zip(key_batch, val_batch)
```
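For context, here is a minimal plain-Python illustration (not Spark code) of why the length check above broke for double-zipped RDDs: when the key and value serializers are themselves `PairDeserializer`s, each batch arrives as a `zip` object, which in Python 3 is a lazy iterator with no `__len__`.

```python
# Sketch of the failure mode: a batch produced by a nested PairDeserializer
# is a zip object, and zip objects (Python 3) have no __len__, so calling
# len() on them raises TypeError instead of returning a batch size.
key_batch = zip([1, 2, 3], ["a", "b", "c"])  # what a nested pair deserializer yields

try:
    len(key_batch)
except TypeError:
    print("len() is not supported on a zip object")
```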
How about returning this batch as a list (and as described in the doc)?
python/pyspark/tests.py (outdated)

```diff
@@ -644,6 +644,18 @@ def test_cartesian_chaining(self):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )
 
+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize(range(10), 2)
```
This test case already passes without this change, doesn't it?
python/pyspark/serializers.py (outdated)

```diff
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = list(key_batch)
+            val_batch = list(val_batch)
```
Should we fix the doc in `Serializer._load_stream_without_unbatching` to say it returns an iterator of iterables?
fixed in 66477f8
Ah, I should have been clearer. Actually, I meant that if `Serializer._load_stream_without_unbatching` worked as documented, returning an iterator of deserialized batches (lists), everything should have worked fine. So I think the reverse is actually more correct, because `PairDeserializer` and `CartesianDeserializer` do not follow this.

I am okay with the current change too, but I believe the reverse is better because I think we could prevent such issues in the future and make things simpler. WDYT @aray and @holdenk?
Test build #81760 has finished for PR 19226 at commit
Test build #81785 has finished for PR 19226 at commit
python/pyspark/tests.py (outdated)

```diff
@@ -644,6 +644,18 @@ def test_cartesian_chaining(self):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )
 
+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize('abc')
```
I'd set an explicit number of partitions, because `zip` reserializes the RDD depending on it.
Test build #81810 has finished for PR 19226 at commit
Test build #81811 has finished for PR 19226 at commit
OK, this way should resolve the issue too. I assume you strongly prefer this way and I am okay with it. LGTM. Let me leave it to @holdenk.
python/pyspark/serializers.py (outdated)

```diff
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
```
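The conditional above can be read as a tiny helper. This is an illustrative sketch (the helper name is mine, not Spark's): already-materialized batches pass through untouched, so the common list case pays no copying cost, while `len()`-less iterators get converted.

```python
def materialize_batch(batch):
    # Lists (the common case) pass through unchanged, so there is no
    # per-batch copy on the normal path; only len()-less iterators such as
    # zip objects from a nested PairDeserializer are converted to lists.
    return batch if hasattr(batch, '__len__') else list(batch)

print(materialize_batch([1, 2, 3]))        # [1, 2, 3] (same list, unchanged)
print(materialize_batch(zip("ab", "cd")))  # [('a', 'c'), ('b', 'd')]
```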
Could we add a small comment that this is required because `_load_stream_without_unbatching` could return an iterator of iterators in this case?
Test build #81824 has finished for PR 19226 at commit
python/pyspark/serializers.py (outdated)

```diff
@@ -343,6 +343,9 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # the batch is an iterable, we need to check lengths so we convert to list if needed.
```
nit: For double-zipped RDDs, the batches can be iterators from another `PairDeserializer` instead of lists. We need to convert them to lists if needed.
LGTM
Test build #81856 has finished for PR 19226 at commit
## What changes were proposed in this pull request?

Fixes a bug introduced in #16121. In PairDeserializer, convert each batch of keys and values to lists (if they do not already have `__len__`) so that we can check that they are the same size. Normally they already are lists, so this should not have a performance impact, but this is needed when repeated `zip`s are done.

## How was this patch tested?

Additional unit test.

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #19226 from aray/SPARK-21985.

(cherry picked from commit 6adf67d)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
Merged to master, branch-2.2 and branch-2.1.
## What changes were proposed in this pull request?

Fixes a bug introduced in #16121.

In PairDeserializer, convert each batch of keys and values to lists (if they do not already have `__len__`) so that we can check that they are the same size. Normally they already are lists, so this should not have a performance impact, but this is needed when repeated `zip`s are done.

## How was this patch tested?

Additional unit test.
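To see the whole flow without a Spark cluster, here is a pure-Python sketch of the patched generator. Names and structure are simplified from `PairDeserializer._load_stream_without_unbatching`; this is an illustration, not Spark's actual code.

```python
def pair_batches(key_batches, val_batches):
    """Mimics the patched pair-deserializer loop over batch streams."""
    for key_batch, val_batch in zip(key_batches, val_batches):
        # The fix: materialize len()-less iterators (e.g. zip objects coming
        # from a nested pair deserializer) so the size check cannot raise
        # TypeError. Plain lists pass through unchanged.
        key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
        val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
        if len(key_batch) != len(val_batch):
            raise ValueError("mismatched batch sizes: (%d, %d)"
                             % (len(key_batch), len(val_batch)))
        # for correctness with repeated cartesian/zip, return one batch
        yield zip(key_batch, val_batch)

# Single zip: the batches are plain lists.
inner = pair_batches([[1, 2]], [["a", "b"]])
# Double zip: the key stream is now itself an iterator of zip objects,
# which is exactly the case that used to crash the length check.
outer = pair_batches(inner, [[10, 20]])
print([list(b) for b in outer])  # [[((1, 'a'), 10), ((2, 'b'), 20)]]
```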