
[SPARK-8202] [PYSPARK] fix infinite loop during external sort in PySpark #6714

Closed
wants to merge 5 commits into from

Conversation

davies
Contributor

@davies davies commented Jun 9, 2015

The batch size during external sort will grow up to a maximum of 10000, then shrink down to zero, causing an infinite loop.
Given the assumption that the items usually have a similar size, we don't need to adjust the batch size after the first spill.

cc @JoshRosen @rxin @angelini
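
For context, a simplified sketch of the kind of buffering loop being fixed (names and structure are illustrative, not the actual shuffle.py code): items are drawn from the iterator a batch at a time, memory is checked between batches, and the bug was that shrinking the batch size after a spill could drive it to zero, after which the loop never terminates.

    import itertools

    def buffer_and_spill(iterator, spill, get_used_memory, limit):
        # Illustrative sketch, not the actual shuffle.py implementation.
        batch, max_batch = 100, 10000
        chunks, current_chunk = [], []
        iterator = iter(iterator)
        while True:
            # Pull up to `batch` items before checking memory again.
            chunk = list(itertools.islice(iterator, batch))
            current_chunk.extend(chunk)
            if len(chunk) < batch:
                break  # iterator exhausted
            if get_used_memory() > limit:
                chunks.append(spill(current_chunk))
                current_chunk = []
                # The buggy version shrank `batch` here; once it reached 0,
                # islice() returned empty chunks forever and the loop never
                # ended. The fix keeps `batch` unchanged after the first spill.
            elif not chunks:
                batch = min(int(batch * 1.5), max_batch)
        return chunks, current_chunk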

@SparkQA

SparkQA commented Jun 9, 2015

Test build #34490 has finished for PR 6714 at commit e746aec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -512,9 +512,6 @@ def load(f):
f.close()
chunks.append(load(open(path, 'rb')))
current_chunk = []
gc.collect()
Contributor

Why is the call to gc.collect being removed?

Contributor Author

We no longer raise the limit via _next_limit() (which calls get_used_memory()). This line was only here to get a more accurate number for how much memory was in use, so it is not needed anymore.

@airhorns

airhorns commented Jun 9, 2015

@rxin out of curiosity, do you remember why you wanted to shrink the batch in 3134c3f? Also, @pwendell heads up, this regression is present in 1.4.0-rc4!

@kevincox
Contributor

kevincox commented Jun 9, 2015

Also, why keep the batch size once you know you are going to spill to disk? All that does is force you to draw from the iterator in batches. Once you know how big your chunk should be, you can set batch to len(current_chunk) (possibly * 0.8 or so) and draw it in a single call, since you already know you will be spilling to disk.

@davies
Contributor Author

davies commented Jun 9, 2015

@airhorns That change was made by me (my mistake), to speed up the unit test for external sorting. Since we adjust the batch size during external aggregation (where the size of objects varies from key to key), I borrowed that logic here without noticing the elif not chunks: below.

@pwendell Could we avoid blocking the 1.4 release on this one? The 1.4 release has already been delayed quite a bit and many people are waiting for it. This issue can be worked around by increasing the number of partitions during sorting (which can also improve performance by avoiding spilling).

@kevincox Is the current approach similar to what you had suggested?

@rxin
Contributor

rxin commented Jun 9, 2015

@kevinco it was not me -- I merely fixed a merge conflict and somehow git showed me as the commit author.

@SparkQA

SparkQA commented Jun 9, 2015

Test build #34515 has finished for PR 6714 at commit 5c21777.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kevincox
Contributor

kevincox commented Jun 9, 2015

@davies No, it appears that you just changed the original memory limit. I am saying that once you figure out how large the chunk can be, you should set that as the batch size, rather than continuing to allocate in chunks.

@rxin
Contributor

rxin commented Jun 10, 2015

Is there a way we can add a test for this?

@JoshRosen
Contributor

Maybe we can borrow some of the testing strategies from Tungsten's external sorters, where we're able to mock out the shuffle memory manager and are able to manually trigger spills.

@davies
Contributor Author

davies commented Jun 10, 2015

@JoshRosen I have updated the tests to reproduce the failure.

@SparkQA

SparkQA commented Jun 10, 2015

Test build #34550 has finished for PR 6714 at commit b170dfb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 10, 2015

Test build #892 timed out for PR 6714 at commit 5c21777 after a configured wait of 175m.

@davies
Contributor Author

davies commented Jun 10, 2015

@JoshRosen The last commit has passed the tests; the last two builds failed in other places.

@SparkQA

SparkQA commented Jun 10, 2015

Test build #893 timed out for PR 6714 at commit b170dfb after a configured wait of 175m.

@davies
Contributor Author

davies commented Jun 10, 2015

@JoshRosen Is there something wrong with the NewPullRequestBuilder? The two failed runs were triggered by my bookmarklet.

@SparkQA

SparkQA commented Jun 12, 2015

Test build #897 timed out for PR 6714 at commit b170dfb after a configured wait of 175m.

@JoshRosen
Contributor

Jenkins, retest this please.

@SparkQA

SparkQA commented Jun 18, 2015

Test build #35073 has finished for PR 6714 at commit b170dfb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -179,9 +179,12 @@ def test_in_memory_sort(self):
list(sorter.sorted(l, key=lambda x: -x, reverse=True)))

def test_external_sort(self):
class CustomizedSorter(ExternalSorter):
def _next_limit(self):
Contributor

I think that we should add a comment here to explain why we're mocking out this part of the code; it doesn't seem self-evident to me and I'm worried that it's going to confuse future readers of this code.

Also, do you think that it's worth adding a separate test case for this path and keeping the old test? There might be some duplication of the code which does assertions over metrics, but we possibly can factor it out into a shared method.

Contributor

It seems like the intent here is to mock get_used_memory(); I'm not super familiar with Python mocking frameworks, but if this was Java then I imagine that get_used_memory() would be a method of some context / environment object that we could mock.

Contributor Author

Yes, without the mock it would take a long time to reach the memory limit, which slows down the tests.
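
For reference, a minimal sketch of the mocking approach described in this thread, assuming pyspark.shuffle's ExternalSorter takes its memory limit in MB; the sizes and assertion below are illustrative, not the PR's exact test:

    import random
    from pyspark.shuffle import ExternalSorter

    class CustomizedSorter(ExternalSorter):
        def _next_limit(self):
            # Never raise the limit, so the sorter spills almost immediately
            # instead of having to allocate enough data to hit a real limit.
            return self.memory_limit

    data = list(range(1024))
    random.shuffle(data)
    sorter = CustomizedSorter(1)  # 1 MB limit to force external sorting
    assert list(sorter.sorted(data)) == sorted(data)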

@JoshRosen
Contributor

It looks like this is essentially reverting the changes made in the test speedup patch, so at least from that perspective it seems like a sensible fix to me.

Just to make sure that I 100% understand the logic / considerations here, it seems like the method that we're modifying is trying to buffer as many records as possible in memory until we hit a memory limit, then sort the in-memory records and spill them to disk. It's expensive to figure out how much memory we're actually using, which is why we only check our memory usage every batch items. If our objects are so big that batch of them are going to blow way past our memory limit, then we used to try to shrink batch so that we inspect the sizes more frequently. The old logic seemed wrong, though, since it would always shrink the batch size even if we managed to buffer many chunks before spilling. Instead, if we did want to adjust our batch size then I think it would make more sense to explicitly deal with the case where num_chunks = 1 because a single chunk is exceeding our memory limit.

@kevincox, to touch on your comment, I think that the reason that we don't just set batch to len(current_chunk) after spilling is that our objects might be of different sizes, so completely eliminating batching / periodic size checking greatly increases the chance that we'll exceed our memory limits well before we've checked our memory consumption. @davies, could you confirm that this is the right rationale?

@JoshRosen
Contributor

After this patch, it looks like _next_limit is only called once per ExternalSorter.sorted call. Since _next_limit is only called from there, I wonder whether we should just remove that method and inline its code at the call site in order to make things a bit easier to read.

This makes me wonder, though: is it actually safe to not allow the memory limit to rise after spilling, as it did before? Here's the comment in _next_limit:

    def _next_limit(self):
        """
        Return the next memory limit. If the memory is not released
        after spilling, it will dump the data only when the used memory
        starts to increase.
        """
        return max(self.memory_limit, get_used_memory() * 1.05)

If we no longer call _next_limit() after spilling, what will happen in cases where spilling somehow prevented memory from being freed? Does this mean that we'll spill every batch items and pay a huge cost during merge?

@davies
Contributor Author

davies commented Jun 18, 2015

@JoshRosen Consider a case where the used memory is already above the memory limit (because of a broadcasted object): it will start to spill after the first batch (batch = 100), and spilling every 100 records makes it easy to reach the maximum open files limit (1024 by default).

In order to balance memory usage and batch size (not too large or too small), I thought it was better to adjust the batch size both up and down (but that had a bug). After some experiments, I saw the memory usage still go up even when we try to shrink the batch size (because of memory fragmentation).

Finally, I'd like to switch to the simplest approach: assuming the items have similar sizes, always use the first batch size once spilling has started.

I agree that we could inline _next_limit(); currently it looks similar to the other ExternalXXX classes, so it's not that bad.
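
A rough back-of-the-envelope illustration of the open-files concern above (the partition size is hypothetical, not from the PR): each spill produces one file, and the merge phase keeps all of them open at once.

    # Hypothetical numbers, only to illustrate the concern above.
    records_per_partition = 200000
    batch = 100                                    # spill after every 100 records
    spill_files = records_per_partition // batch   # 2000 spill files
    default_open_files = 1024                      # typical ulimit -n
    print(spill_files > default_open_files)        # True: merge would hit the limit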

@JoshRosen
Contributor

Based on this latest round of comments + some offline discussion, this sounds like a reasonable fix for the 1.4.1 release, so I'm going to merge this to master and branch-1.4.

@asfgit asfgit closed this in 9b20027 Jun 18, 2015
asfgit pushed a commit that referenced this pull request Jun 18, 2015
The batch size during external sort will grow up to a maximum of 10000, then shrink down to zero, causing an infinite loop.
Given the assumption that the items usually have a similar size, we don't need to adjust the batch size after the first spill.

cc JoshRosen rxin angelini

Author: Davies Liu <davies@databricks.com>

Closes #6714 from davies/batch_size and squashes the following commits:

b170dfb [Davies Liu] update test
b9be832 [Davies Liu] Merge branch 'batch_size' of github.com:davies/spark into batch_size
6ade745 [Davies Liu] update test
5c21777 [Davies Liu] Update shuffle.py
e746aec [Davies Liu] fix batch size during sort
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015