[SPARK-16589][PYTHON] Chained cartesian produces incorrect number of records #16121
Conversation
Test build #69571 has finished for PR 16121 at commit

Test build #69573 has finished for PR 16121 at commit
It's pretty tricky to make the chained CartesianDeserializer work; maybe it's easier to have a workaround in RDD.cartesian() that adds a _reserialize() between chained cartesian (or zipped) calls. It will be less performant, but given that cartesian() is already super slow, I would not worry about it. The current patch may still be wrong in the case of a chained CartesianDeserializer and PairSerializer, for example a.cartesian(b.zip(c)) (have not verified yet).
@davies I was trying to make minimal changes to
@davies I suggested a workaround before, but I remember that @holdenk had some reservations. Moreover, it would have to be done proactively for all (?) calls. For example, SPARK-17756 seems to hit a similar problem.
Test build #69587 has finished for PR 16121 at commit
I was hesitant with the previous PR since it seemed like we didn't fully understand why we were changing what we were at the time. I can try to take a closer look at this over the next few days if it is in a good place for that to happen.
Test build #69674 has finished for PR 16121 at commit
Thanks for working on this. Just doing a quick first pass, it looks like really good work - but I'd encourage you to add a few more comments in some places (since we had this bug before, it seems the code wasn't sufficiently self-explanatory). I'll do a deeper look later this week.
```
@@ -96,7 +96,7 @@ def load_stream(self, stream):
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
```
Even though this is internal, it might make sense to have a docstring for this since we're changing its behaviour.
```diff
@@ -278,50 +278,51 @@ def __repr__(self):
         return "AutoBatchedSerializer(%s)" % self.serializer


-class CartesianDeserializer(FramedSerializer):
+class CartesianDeserializer(Serializer):

     """
     Deserializes the JavaRDD cartesian() of two PythonRDDs.
```
Maybe we should document this a bit, given that we had problems with the implementation (e.g. expand on the "Due to batching, we can't use the Java cartesian method." comment from rdd.py to explain how this is intended to function).
```python
key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
    yield product(key_batch, val_batch)
```
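The per-batch product here can be illustrated with a toy pure-Python sketch. The batch contents below are made-up stand-ins, not the actual PySpark stream framing; the point is that zipping the two batch streams in lock-step and taking the cartesian product within each aligned batch pair yields exactly the expected records:

```python
from itertools import chain, product

# Hypothetical stand-ins for the two batch streams: each serializer
# yields its items in batches, and both sides read from the same
# underlying stream, so batches must be consumed in lock-step.
key_batches = [[1, 2], [3]]        # keys arrived in batches of 2, then 1
val_batches = [['a', 'b'], ['c']]  # matching value batches

# Mirror of the loop above: zip the batch streams, take the product
# within each aligned pair, then flatten the per-batch products.
pairs = list(chain.from_iterable(
    product(kb, vb) for kb, vb in zip(key_batches, val_batches)
))
# pairs is [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'c')]
```

Consuming both streams through a single `zip` is what prevents the two iterators from pulling out of order, which was the root cause of the original bug.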
Maybe consider adding a comment here explaining why the interaction of batching & product is handled this way.
Test build #69865 has finished for PR 16121 at commit
LGTM, merging into master and the 2.1/2.0 branches, thanks!
… records

## What changes were proposed in this pull request?

Fixes a bug in the python implementation of rdd cartesian product related to batching that showed up in repeated cartesian products with seemingly random results. The root cause being multiple iterators pulling from the same stream in the wrong order because of logic that ignored batching.

`CartesianDeserializer` and `PairDeserializer` were changed to implement `_load_stream_without_unbatching` and borrow the one line implementation of `load_stream` from `BatchedSerializer`. The default implementation of `_load_stream_without_unbatching` was changed to give consistent results (always an iterable) so that it could be used without additional checks.

`PairDeserializer` no longer extends `CartesianDeserializer` as it was not really proper. If wanted, a new common super class could be added.

Both `CartesianDeserializer` and `PairDeserializer` now only extend `Serializer` (which has no `dump_stream` implementation) since they are only meant for *de*serialization.

## How was this patch tested?

Additional unit tests (sourced from #14248) plus one for testing a cartesian with zip.

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #16121 from aray/fix-cartesian.

(cherry picked from commit 3c68944)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
This PR seems to have introduced a bug, which I have reported here: Any thoughts, @aray? Can the check in question simply be removed, or is there a better solution to consider?

I'll take a look, sorry about that.
## What changes were proposed in this pull request?

(edited) Fixes a bug introduced in #16121. In PairDeserializer, convert each batch of keys and values to lists (if they do not have `__len__` already) so that we can check that they are the same size. Normally they already are lists, so this should not have a performance impact, but this is needed when repeated `zip`'s are done.

## How was this patch tested?

Additional unit test

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #19226 from aray/SPARK-21985.

(cherry picked from commit 6adf67d)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
What changes were proposed in this pull request?

Fixes a bug in the python implementation of rdd cartesian product related to batching that showed up in repeated cartesian products with seemingly random results. The root cause being multiple iterators pulling from the same stream in the wrong order because of logic that ignored batching.

`CartesianDeserializer` and `PairDeserializer` were changed to implement `_load_stream_without_unbatching` and borrow the one line implementation of `load_stream` from `BatchedSerializer`. The default implementation of `_load_stream_without_unbatching` was changed to give consistent results (always an iterable) so that it could be used without additional checks.

`PairDeserializer` no longer extends `CartesianDeserializer` as it was not really proper. If wanted, a new common super class could be added.

Both `CartesianDeserializer` and `PairDeserializer` now only extend `Serializer` (which has no `dump_stream` implementation) since they are only meant for *de*serialization.

How was this patch tested?

Additional unit tests (sourced from #14248) plus one for testing a cartesian with zip.
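As a rough illustration of the structure described above, here is a self-contained toy sketch. The `Serializer` and `CartesianDeserializer` classes below are simplified stand-ins, not the real pyspark.serializers code (which frames batches with length prefixes on a byte stream rather than iterating a plain Python iterator), but they show the two key ideas: the default `_load_stream_without_unbatching` always returns an iterable of batches, and `load_stream` is just a flatten over it.

```python
from itertools import chain, product


class Serializer(object):
    """Minimal stand-in for the base serializer (sketch only)."""

    def _load_stream_without_unbatching(self, stream):
        # Default: every deserialized item is a batch of one, so the
        # result is always an iterable of batches (consistent results,
        # no extra isinstance checks needed by callers).
        return ([item] for item in stream)


class CartesianDeserializer(Serializer):
    """Sketch of the fixed structure: deserialization only, per-batch product."""

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def _load_stream_without_unbatching(self, stream):
        # Both sub-serializers read from the same stream; zipping their
        # batch streams consumes key and value batches in lock-step.
        key_batches = self.key_ser._load_stream_without_unbatching(stream)
        val_batches = self.val_ser._load_stream_without_unbatching(stream)
        for key_batch, val_batch in zip(key_batches, val_batches):
            yield product(key_batch, val_batch)

    def load_stream(self, stream):
        # The one-line implementation borrowed from BatchedSerializer:
        # flatten the stream of batches into a stream of records.
        return chain.from_iterable(self._load_stream_without_unbatching(stream))


# Toy "stream" with interleaved key/value items; each default batch is one
# item, so the deserializer pairs them up as (key, value) records.
out = list(CartesianDeserializer(Serializer(), Serializer())
           .load_stream(iter(['a', 'x', 'b', 'y'])))
# out is [('a', 'x'), ('b', 'y')]
```

Because chained deserializers all pull batches through the same `_load_stream_without_unbatching` contract, nesting one `CartesianDeserializer` inside another no longer mixes up the read order of the shared stream.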