[SPARK-2790] [PySpark] fix zip with serializers which have different batch sizes. #1894
Conversation
QA tests have started for PR 1894. This patch merges cleanly.
QA results for PR 1894:
The failure is not related to this PR; how do I re-test this?
Jenkins, retest this please.
QA tests have started for PR 1894. This patch merges cleanly.
QA results for PR 1894:
Sorry to drop the ball on reviewing this. On StackOverflow, someone has reported an issue where zip() silently returns less output than it should: http://stackoverflow.com/questions/25364380/why-does-zip-truncate-the-data-in-pyspark Any thoughts on what's happening there? My hunch is that the PythonRDDs have the same number of partitions, same number of batches, and same batch sizes, but a different grouping of objects across batched elements. As long as you're working on zip() issues, could you take a look at this and maybe port their test case?
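One way the silent truncation could arise (a hypothetical reconstruction of the StackOverflow scenario, not the actual data): both sides hold the same four items, but the serializers grouped them into batches of different sizes, and zipping batch-by-batch quietly drops the stragglers:

```python
# Four items on each side, but grouped into batches of different sizes
key_batches = [[1, 2, 3], [4]]
val_batches = [["a", "b"], ["c", "d"]]

# Zipping per batch, as a pair deserializer effectively does, loses data:
# zip() stops at the shorter batch each time, so item 3 and value "d"
# vanish without any error.
pairs = [p for kb, vb in zip(key_batches, val_batches) for p in zip(kb, vb)]
print(pairs)  # [(1, 'a'), (2, 'b'), (4, 'c')] -- one pair short, and wrong
```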
The builtin zip() stops at the shortest of its inputs and silently drops the extra items. I will look deeper into it. Davies
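The builtin-zip behavior Davies refers to can be seen with plain Python lists (a minimal illustration, not code from the PR):

```python
a = [1, 2, 3, 4]
b = ["x", "y"]

# zip() stops at the shorter input; items 3 and 4 are dropped silently
print(list(zip(a, b)))  # [(1, 'x'), (2, 'y')]
```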
Conflicts: python/pyspark/tests.py
@JoshRosen In the issue reported on StackOverflow, the two RDDs have different numbers of elements, but PairDeserializer did not check the lengths of the key and value batches, so part of the data was silently dropped by zip(). This PR fixes that issue, and I have added unit tests for these cases.
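The length check Davies describes — verifying that each batch of keys matches its batch of values — can be sketched like this (a simplified stand-in for PairDeserializer, with a hypothetical function name and error message, not the PR's exact code):

```python
def deserialize_pair_batch(keys, vals):
    # Simplified sketch: a pair deserializer zips a batch of keys with
    # the matching batch of values. Raising on a length mismatch is
    # safer than letting zip() silently drop the extra items.
    if len(keys) != len(vals):
        raise ValueError(
            "Can not deserialize pair RDD: batches have different "
            "numbers of items (%d, %d)" % (len(keys), len(vals)))
    return list(zip(keys, vals))
```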
My impression from the StackOverflow example was that both RDDs had the same number of items, since
They have the same total number of items in the RDDs, but a different number of items in some of the batches. Davies
It seems like it could be really confusing to users if we fail on a zip() where the RDDs have the same total number of items and same number of partitions; is there any cheap way to detect this case and re-balance / serialize or to otherwise work around this limitation? If not, it's still better to fail loudly with an error than to return a wrong answer, but it would be nice if we supported this case.
I have not found a way to solve this problem; we cannot control exactly how many items will be placed in each partition. In this PR, it raises an exception instead. We have also documented this limitation in the API doc. For this use case, zipWithIndex() is what the user wanted.
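The zipWithIndex() workaround can be sketched with plain lists (my reading of the suggestion, not code from this PR; in PySpark one would call zipWithIndex() on both RDDs and join on the index):

```python
def zip_by_index(left, right):
    # Pair each element with its position, then join on the position.
    # Unlike zip() on mismatched partitions, this pairs items by their
    # global index regardless of how they are grouped into batches.
    li = dict(enumerate(left))
    ri = dict(enumerate(right))
    common = sorted(li.keys() & ri.keys())
    return [(li[i], ri[i]) for i in common]

print(zip_by_index([1, 2, 3], ["a", "b"]))  # [(1, 'a'), (2, 'b')]
```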
QA tests have started for PR 1894 at commit
QA tests have finished for PR 1894 at commit
This looks great. I think it's better to crash early and loudly rather than to silently return a bad result, so I'm going to merge it into
…batch sizes.

If two RDDs have different batch size in serializers, then it will try to re-serialize the one with smaller batch size, then call RDD.zip() in Spark.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1894 from davies/zip and squashes the following commits:

c4652ea [Davies Liu] add more test cases
6d05fc8 [Davies Liu] Merge branch 'master' into zip
813b1e4 [Davies Liu] add more tests for failed cases
a4aafda [Davies Liu] fix zip with serializers which have different batch sizes.

(cherry picked from commit d7e80c2)
Signed-off-by: Josh Rosen <joshrosen@apache.org>
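The re-serialization step the commit message describes — rewriting the stream that used the smaller batch size so both sides line up — amounts to re-batching the data. A plain-Python sketch of the idea (an illustration, not the PR's actual implementation):

```python
def rebatch(batches, target_size):
    # Flatten the batched stream and regroup it into batches of
    # target_size, so two streams serialized with different batch
    # sizes line up batch-for-batch before zipping.
    flat = [item for batch in batches for item in batch]
    return [flat[i:i + target_size] for i in range(0, len(flat), target_size)]

print(rebatch([[1, 2, 3], [4, 5]], 2))  # [[1, 2], [3, 4], [5]]
```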