
Added explicit worker GC (throttled) #1255

Closed
bluenote10 wants to merge 18 commits into dask:master from bluenote10:feature/explicit_gc

Conversation

@bluenote10
Contributor

This PR adds explicit garbage collections to mitigate the memory issues observed in #1015 and dask/zict#19.

The implementation is throttled so that it does not lead to excessive GC calls. As discussed in dask/zict#19, calling GC from inner scopes like put_key_in_memory or update_data would not collect the current value. The ideal solution would trigger it in the outermost scope whenever a large value has been persisted, after the reference to the value has been released. However, these places are tricky to find. Currently I'm triggering from release_key, gather, and execute, which is hopefully frequent enough.
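
For context, here is a minimal sketch of the throttling approach, reconstructed from the snippets reviewed below; exact names and defaults in the PR may differ:

```python
import gc
from time import time

class ThrottledGC(object):
    """Run gc.collect() at most once per min_interval_in_sec,
    unless a collection is explicitly forced."""

    def __init__(self, min_interval_in_sec=1.0):
        self.min_interval_in_sec = min_interval_in_sec
        self.last_collect = time()

    def collect(self, force_gc=False):
        # Force a collection after releasing a large value; otherwise
        # throttle to avoid excessive GC pauses on the event loop.
        new_time = time()
        if force_gc or new_time - self.last_collect > self.min_interval_in_sec:
            gc.collect()
            self.last_collect = new_time
```

A call site would then look like `self._throttledGC.collect(force_gc=nbytes_to_free > 10 * 2**20)`, forcing a collection only when a sizable amount of memory is expected to be freed.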

Comment thread distributed/worker.py Outdated
if key not in self.types:
    self.types[key] = type(value)

self.nbytes[key] = sizeof(value)
Contributor Author

Note: I changed this because it is probably safer to just update the nbytes value instead of relying on it never changing. For self.types this was already done that way, and the if was probably a leftover.

Member

We assume immutability. Additionally, sizeof can take some time, while type is free.

@mrocklin
Member

mrocklin commented Jul 17, 2017

Sorry for the delay here, I was away at a conference.

A couple of concerns:

  1. This puts GC on the main event loop thread. Does this matter? I wouldn't be surprised if gc.collect() held the GIL explicitly while collecting, but if it doesn't, then it would be far preferable to keep this off the main thread.
  2. Rather than make a new class for this, perhaps we can just add gc.collect (or some suitable sending of gc.collect to a thread) in a PeriodicCallback, as sketched below.
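
A minimal sketch of option 2, assuming Tornado's PeriodicCallback (which distributed's event loop is built on); the 1-second period is an illustrative choice, not a value from this PR:

```python
import gc

from tornado.ioloop import IOLoop, PeriodicCallback

# Run gc.collect() on the event loop once per second instead of
# triggering it from individual worker code paths.
pc = PeriodicCallback(gc.collect, callback_time=1000)  # milliseconds

def main():
    pc.start()
    IOLoop.current().start()

if __name__ == '__main__':
    main()
```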

@bluenote10
Contributor Author

Some thoughts:

  1. I would be very surprised if the GC does not hold the GIL during collection, i.e., it will have to briefly stop the event loop anyway.
  2. I was thinking about a periodic solution as well, but in the end it's not ideal. On the one hand, the period needs to be fairly small, because even ~1 second of heavy spill-to-disk swaps can produce large amounts of uncollected garbage, causing workers to run out of memory. On the other hand, triggering at 1-second intervals would probably have a measurable impact on CPU-bound tasks. A smart trigger would avoid both issues.

Comment thread distributed/utils.py Outdated
import re
import shutil
import socket
import time
Member

from .metrics import time

The Windows time.time function has a 1s precision limit.
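
For illustration, the suggested import is a drop-in replacement for time.time (distributed.metrics.time exists in the library and uses a higher-resolution clock on Windows):

```python
from distributed.metrics import time

start = time()
sum(range(10**6))  # some work to measure
print(time() - start)
```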

Comment thread distributed/worker.py Outdated
'key': key,
'cause': cause})
self._throttledGC.collect(
    force_gc=True if nbytes_to_free > 10 * 2**20 else False
Member

Could probably drop the ternary statement and just replace with the condition.

force_gc = nbytes_to_free > 10 * 2**20

@mrocklin
Member

I'm generally fine with the code here. Unfortunately it looks like the tests are not. This change causes a variety of tests to fail in interesting ways. I suspect some subtle multi-threading interaction.

@bluenote10
Contributor Author

I was trying to reproduce the problems locally, but the tests just work for me. There was one remaining issue regarding the usage of nbytes_to_free, but now the set of failing tests seems to have changed again (one of the failures is rather a Travis timeout, right?). Do the tests fail randomly, or is there a clear pattern? How can we solve this?

@mrocklin
Member

Yeah, welcome to the wonderful world of concurrent debugging. The virtual machines on travis-ci are very slow and so are quite good at catching subtle bugs that sneak by on faster machines. I recommend trying to run a few of the failing tests in a for loop like the following:

for i in {1..100}; do py.test distributed/tests/test_stress.py::test_stress_1 --pdb ; done

This might help you catch some of the errors on your local machine.

@mrocklin
Member

It's also entirely possible that calling gc.collect in this way is subtly dangerous and that we'll need to try another approach.

@bluenote10
Contributor Author

How can garbage collection be dangerous? As far as I can see, the explicit garbage collection happens in places where an automatic garbage collection could trigger anyway, so it's not possible to rely on GC not happening, right? Or am I missing that there are periods with gc.disable()?

I reran all the failed tests (from the Travis Python 2.7 build) in a brute-force attempt (see commands below). I couldn't get the tests to fail, except for distributed/tests/test_steal.py::test_steal_expensive_data_slow_computation. However, locally this test fails due to the buffer concatenation issue #1179, which is not the reason why it fails on Travis.

Details

Command lines for running the failed tests in brute force:

for i in {1..100}; do py.test "distributed/tests/test_client.py::test_open_close_many_workers[Worker-100-5]" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_scheduler.py::test_balance_many_workers" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_scheduler.py::test_balance_many_workers_2" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_scheduler.py::test_correct_bad_time_estimate" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_steal.py::test_worksteal_many_thieves" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_steal.py::test_dont_steal_unknown_functions" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_steal.py::test_new_worker_steals" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_steal.py::test_work_steal_no_kwargs" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_steal.py::test_balance_without_dependencies" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_steal.py::test_steal_twice" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_steal.py::test_accept_old_result_if_stolen" --pdb --runslow ; done
for i in {1..100}; do py.test "distributed/tests/test_stress.py::test_stress_1" --pdb --runslow ; done

@mrocklin
Member

How can garbage collection be dangerous?

Yeah I don't know. I suspect some subtle interaction.

@mrocklin
Member

mrocklin commented Aug 7, 2017

Any further thoughts on this @bluenote10 ?

@bluenote10
Contributor Author

I'm still puzzled why the tests fail on Travis but not locally. The logs on Travis also don't look like actual test failures; they look more like errors in the testing infrastructure. I'm trying to get more output from Travis now to see if that helps.

@mrocklin
Member

mrocklin commented Aug 7, 2017

There are a number of new failures in the tests, I suspect due to some change on travis-ci. I pushed a trivial change here to help us get a better understanding of the kinds of tests that are failing on master: #1317

Conflicts:
	distributed/worker.py
@mrocklin
Member

I've merged this into master. We'll see if problems persist.

@mrocklin
Member

Sorry, I've merged master into this. Not the other way around.

@mrocklin
Member

It looks like you've gotten tests to pass, but only by removing all of the actual functionality of the PR :/

Do you have any thoughts about how to trigger GC safely? Or other thoughts about how to achieve the same results while only relying on the standard GC operation?

@bluenote10
Contributor Author

I'm still wondering if the tests that fail with explicit GC may hint at a potential cause of the memory leaks. Also, some failing tests were related to work stealing, and I think you already spotted an issue there. I will try to systematically re-enable GC here (after merging in the work-stealing fix) and look at the pattern of failing tests again. However, I'll have to work on something else for a while, so it might take a bit until I can pick this up again.

@mrocklin
Member

OK, thank you for your continued engagement on this. Sorry it took me a while to engage myself.

Comment thread distributed/utils.py Outdated
new_time = time()
if force_gc or new_time - self.last_collect > self.min_interval_in_sec:
    gc.collect()
    self.last_collect = new_time
Contributor

@ogrisel Oct 21, 2017

As @pitrou said in another PR, GC can legitimately take several hundred milliseconds or more (maybe even seconds) on interpreters loaded with lots of small Python objects (e.g., dask.bag with nested constructs). Therefore you might want to consider a strategy that records the duration of the last gc.collect() call and the time elapsed since that call finished, and only calls gc.collect() again if less than 5% of the time is spent inside gc.collect() in aggregate, instead of using a fixed absolute value for min_interval_in_sec.

Also, using time.monotonic() instead of time.time() under Python 3.5 and later will probably spare some rare but potentially hard-to-debug issues in case of leap-second events.
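
A minimal sketch of this duty-cycle strategy (class and attribute names are hypothetical, not from the PR):

```python
import gc
from time import monotonic  # immune to leap seconds and clock adjustments

class FractionGC(object):
    """Trigger gc.collect() only while the aggregate fraction of time
    spent collecting stays below max_gc_fraction (e.g. 5%)."""

    def __init__(self, max_gc_fraction=0.05):
        self.max_gc_fraction = max_gc_fraction
        self.last_collect_end = monotonic()
        self.last_collect_duration = 0.0

    def collect(self):
        elapsed = monotonic() - self.last_collect_end
        # Skip if collecting again now would push the GC duty cycle,
        # duration / (duration + elapsed), above the budget.
        if self.last_collect_duration > self.max_gc_fraction * (
                self.last_collect_duration + elapsed):
            return
        start = monotonic()
        gc.collect()
        self.last_collect_end = monotonic()
        self.last_collect_duration = self.last_collect_end - start
```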

Contributor Author

Yes, good point. I think I'll just use your implementation, no need for two throttled GC implementations ;). I'll still have to check whether this PR is needed at all with your modification.

@mrocklin
Member

This PR has gone stale. Closing for now.

@mrocklin mrocklin closed this Apr 15, 2019