Added gc on mem usage warning #1504

Merged
mrocklin merged 2 commits into dask:master from bluenote10:feature/gc_on_warn on Nov 1, 2017

Conversation

@bluenote10
Contributor

We were re-running our standard workflows with the latest master and noticed that our computations no longer run through: the workers go into an idling mode after exceeding the 80% memory threshold. This results in a situation where GC never triggers and there is no visible progress at all (with the worker logs flooded by "Memory use is high ..." messages).

We found a very simple solution to the problem: triggering GC when breaking out of the loop. This is how our workflow behaves with and without this change:

[Screenshot 2017-10-26_runwithgconwarning: memory usage of the two runs]

The first run, around 09:00, uses the change and runs fine in under an hour. The run started at ~09:30 is plain master and stalls because memory is not released appropriately.
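
In code terms, the idea is roughly the following (a minimal sketch with illustrative names, not the actual diff against distributed; the buffer, the memory probe, and the ThrottledGC stand-in are all assumptions):

import gc

class ThrottledGCStub:
    # Stand-in for distributed's ThrottledGC, only so the sketch runs.
    def collect(self):
        gc.collect()

def evict_until_below_target(fast_buffer, memory_fraction, target, throttled_gc):
    # Spill items to disk while memory use is above the target fraction.
    while memory_fraction() > target:
        if not fast_buffer:
            # Nothing left to evict: this is where the change adds a
            # (throttled) collection before giving up, so garbage held by
            # the worker is actually released instead of keeping it pinned
            # above the threshold.
            throttled_gc.collect()
            break
        fast_buffer.pop()  # evict one item (spill to disk in the real worker)

buf = [bytearray(2**20) for _ in range(4)]
evict_until_below_target(buf, lambda: 0.9, 0.8, ThrottledGCStub())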

I discussed this topic yesterday with @mrocklin at pycon.de and he wasn't sure whether @pitrou would be happy with the change. His suggestion was to open a PR and discuss what the best solution is.

cc @ogrisel

@pitrou
Member

pitrou commented Oct 26, 2017

Hmm... What I don't understand is why you don't add gc.collect() to the part that does the pausing, rather than the eviction.

@pitrou
Member

pitrou commented Oct 26, 2017

Also, two suggestions:

  1. perhaps use the throttled GC here?
  2. perhaps use a hysteresis ratio when unpausing?

@bluenote10
Contributor Author

bluenote10 commented Oct 27, 2017

The implementation uses self.gc, which is an instance of ThrottledGC. That's also why I didn't place the self.gc.collect() next to self.paused = True (I assume that is your first question): that line is only executed once, so if the throttling suppressed the collection right after the worker had generated a significant amount of garbage, we would miss the only opportunity to collect it. But an unconditional gc.collect() could be fine? I can re-run the experiment if you prefer that solution.
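
For context, the idea behind ThrottledGC is roughly the following (a minimal sketch under assumed semantics; the actual class in distributed differs in details such as logging and how the interval is measured):

import gc
import time

class ThrottledGC(object):
    # Minimal sketch: rate-limit gc.collect() so that calling collect()
    # in a tight loop triggers at most one real collection per interval.
    def __init__(self, min_interval=1.0):
        self.min_interval = min_interval  # seconds between real collections
        self.last_collect = 0.0

    def collect(self):
        now = time.time()
        if now - self.last_collect >= self.min_interval:
            gc.collect()
            self.last_collect = now

# Repeated calls are cheap; only the first one here actually collects.
throttled = ThrottledGC(min_interval=1.0)
for _ in range(100):
    throttled.collect()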

I'll have to do some research on the hysteresis idea to see what you mean.

@mrocklin
Member

I'll have to do some research on the hysteresis idea to see what you mean.

The idea here is likely that we should switch from unpaused to paused at something like 82% but only switch back from paused to unpaused around 78% (I chose a 2% difference here arbitrarily, it may not be the correct choice). This prevents rapid switching back and forth between the two states if we are very close to the 80% boundary.
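
A sketch of that pause/unpause logic, using the (arbitrary) 82%/78% thresholds from above:

PAUSE_FRACTION = 0.82    # pause when memory use rises above this
UNPAUSE_FRACTION = 0.78  # unpause only once it falls below this

def update_paused(paused, memory_fraction):
    if not paused and memory_fraction > PAUSE_FRACTION:
        return True
    if paused and memory_fraction < UNPAUSE_FRACTION:
        return False
    return paused  # inside the band: keep the current state

# Hovering right at 80% no longer flips the state back and forth:
assert update_paused(False, 0.80) is False
assert update_paused(True, 0.80) is True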

@pitrou
Member

pitrou commented Oct 27, 2017

The implementation uses self.gc, which is an instance of ThrottledGC

I stand corrected. Thank you.

But an unconditional gc.collect() could be fine?

Or perhaps a self.gc.collect() before the if not self.paused: line?
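
That is, something along these lines (an illustrative sketch of the structure, not the actual Worker code; memory_monitor and memory_pause_fraction here are assumptions):

import gc

class ThrottledGCStub:
    # Stand-in for self.gc (ThrottledGC), only so the sketch runs.
    def collect(self):
        gc.collect()

class WorkerSketch:
    memory_pause_fraction = 0.80

    def __init__(self):
        self.paused = False
        self.gc = ThrottledGCStub()

    def memory_monitor(self, memory_fraction):
        if memory_fraction > self.memory_pause_fraction:
            self.gc.collect()       # suggested placement: before the pause
            if not self.paused:     # check, so it runs on every monitor tick
                self.paused = True  # above the threshold, not only on the
                                    # pause transition

WorkerSketch().memory_monitor(0.85)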

@ogrisel
Contributor

ogrisel commented Oct 27, 2017

@bluenote10 Interesting, I could no longer get that warning with the current master. Do you think you could provide a standalone script that can trigger this behavior?

@ogrisel
Contributor

ogrisel commented Oct 27, 2017

Also I think we should rename self.gc to self._throttled_gc to make the code less ambiguous.

Also it might be interesting to move the instantiation of that attribute into the WorkerBase base class, so that the same instance is shared with the methods of the base class, assuming the additional collections suggested in #1255 are still useful.

@ogrisel
Contributor

ogrisel commented Oct 27, 2017

In any case I think that the change suggested here (adding a call to the throttled GC before breaking out when the buffer is empty) is still useful. I should have included it in my PR in the first place; I forgot to do so because my test workload no longer reached that branch of the code.

@bluenote10
Contributor Author

bluenote10 commented Oct 27, 2017

@ogrisel I made a quick attempt to get a self-contained, reproducible example -- see details below. On a first attempt it seems to trigger the problem. I'm running a cluster with 10 workers with --memory-limit set to 4 GB. The chunk sizes are 1.2 GB, so a few uncollected chunks should result in an idling worker. If you can't trigger the problem you may have to increase num_partitions so that the workers go through multiple spill-to-disk operations (each with a certain probability of producing uncollected garbage).

Details

Note: The load_columns function is roughly a copy/paste from my actual code where the problem occurs, but it is probably not crucial for reproducing the issue.

#!/usr/bin/env python

from __future__ import division, print_function
import sys
import pandas as pd

from collections import OrderedDict

import dask
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client, progress


def load_columns(lazy_dataframes):

    def extract_meta(df):
        # Local import necessary (worker scope)
        from dask.dataframe.utils import make_meta
        all_meta = {}
        colnames = []
        for column in df.columns:
            meta = make_meta(df[column])
            all_meta[column] = meta
            colnames.append(column)
        return all_meta, colnames

    extract_meta = delayed(extract_meta)
    meta_per_column, colnames = extract_meta(lazy_dataframes[0]).compute()

    def from_delayed_direct(lazy_dataframes, colname, meta):
        from dask.base import collections_to_dsk, tokenize
        import operator
        name = "dfcol-{}-{}".format(colname, tokenize(lazy_dataframes, colname))

        # Start with a dask graph that contains just the key/values of
        # loading the individual dataframe partitions
        dsk = collections_to_dsk(lazy_dataframes)

        # Now add new keys/values where:
        # - the key satisfies the Frame requirement of being (name, part_id)
        # - the value is a task running getitem for the column to extract
        for i, lazy_df in enumerate(lazy_dataframes):
            lazy_df_key = lazy_df.key
            dsk[(name, i)] = (operator.getitem, lazy_df_key, colname)

        divs = [None] * (len(lazy_dataframes) + 1)
        series = dd.Series(dsk, name, meta, divs)
        return series

    columns = [
        from_delayed_direct(lazy_dataframes, col, meta_per_column[col])
        for col in colnames
    ]

    cols_persisted = dask.persist(*columns)
    progress(cols_persisted)
    cols_persisted = OrderedDict(zip(colnames, cols_persisted))
    return cols_persisted


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Error: Please specify scheduler endpoint.")
        sys.exit(1)
    endpoint = sys.argv[1]
    client = Client(endpoint)

    def load_df(N):
        df = pd.DataFrame({"A": [0]*N, "B": [0]*N})
        return df

    N = 80000000    # => results in DataFrames of ~ 1.2 GB
    num_partitions = 20
    lazy_dataframes = [delayed(load_df)(N) for _ in range(num_partitions)]
    cols_persisted = load_columns(lazy_dataframes)
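
To run the script, pass the scheduler endpoint as its only argument (it connects via Client(endpoint)), against a cluster whose workers were started with the --memory-limit setting described above.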

@mrocklin
Member

mrocklin commented Nov 1, 2017

Conversation on this seems to have stalled. I'm comfortable merging. Are there any objections or other comments?

@bluenote10
Contributor Author

Yes, I think it is ready to merge. The only thing I left out for now is the hysteresis ratio, because I was a bit worried that picking too large a gap would prevent the worker from ever going back into the active state. This needs careful testing and is best left for another PR.

mrocklin merged commit eec13d4 into dask:master on Nov 1, 2017