
Massive memory (100GB) used by dask-scheduler #6833

Open
pseudotensor opened this issue Nov 13, 2020 · 31 comments

@pseudotensor commented Nov 13, 2020

Using RAPIDS 0.14 with dask 2.17.0, Python 3.6, conda, Ubuntu 16.04

I'm running xgboost using dask on GPUs. I do:

  1. convert the in-memory numpy frame to a dask distributed frame using from_array()
  2. chunk the frames so that every worker (here 3 nodes, 2 GPUs/node) has the data it needs and xgboost does not hang
  3. run a dataset of about 5M rows x 10 columns of airlines data

Notes:

  1. Every time steps 1-3 are done, it is in an isolated fork that dies at the end of the fit, so any client instances etc. are destroyed. Nothing remains on the GPU, and nothing remains in the process since it's gone. So I don't believe I need a client.close() call.
  2. Even though these forks are gone, within the code I'm always using the client as a context manager in a with statement. So again I shouldn't need a client.close() call or anything like that.

My application uses a reasonable amount of memory given that dataset. The workers use very little memory, around 2-3%.

However, the dask-scheduler is using 70% of my 128GB system! I don't understand how or why, since the scheduler shouldn't hold data as far as I understand. Perhaps the above sequence of sending the dask frame to xgboost is a problem, but it would be odd if the task graph were forced to hold data.

Even if a single graph held data, which would already be a problem, there's no way 90GB is needed to hold the data involved, so it seems old data is repeatedly being stored.

[screenshot: system monitor showing dask-scheduler memory usage]

I don't have code to share to repro since it's not easy to extract, but I'm still hoping for ideas. I will work on a repro, but any fixes/ideas would be good.

@pseudotensor (Author) commented Nov 13, 2020

maps.txt

Here is the output of sudo cat /proc/32193/maps > maps.txt

The mapping 0x7f3b29ace000-0x7f52ac000000 (size 0x1782532000) is about 94GB.

@pseudotensor (Author) commented Nov 13, 2020

Apart from this being odd and a serious issue (I can't use dask like this), is there a way to ask the scheduler for its memory usage?

@pseudotensor (Author)

I'm guessing I'm misunderstanding a user's responsibility when using dask.

  1. I assume that if I did not persist the data, then when the process closes, that data is gone

  2. I assume data is distributed, not sitting on a single node's scheduler. That would defeat the purpose of distribution.

  3. I assume from_array() is taking the numpy array and distributing that on the cluster, not just storing it within the scheduler. That would defeat the purpose of distribution as well.

@pseudotensor (Author) commented Nov 13, 2020

Here's a code fragment, but it's obvious from my description:

X is a 2D numpy array, y a 1D numpy array.

with client:
    import xgboost as xgb

    import dask.dataframe as dd
    chunksize=500000
    X = dd.from_array(X, columns=columns, chunksize=chunksize)
    y = dd.from_array(y, chunksize=chunksize)
    model = xgb.dask.DaskXGBClassifier()
    model.fit(X, y)

This is done in a fork that dies after every fit, on about 5M rows x 10 columns.

I expect the dask-scheduler memory should never be large or hold data. And certainly memory should not keep growing.

@pseudotensor (Author)

FYI, before posting I searched around and didn't find a problem of exactly this scale. Most of the Stack Overflow material was misuses or misinterpretations AFAIK.

I'm also not clear if I should post to dask or dask/distributed.

@rjzamora (Member)

This question may be a better fit for dask/distributed, but I can try to offer some thoughts...

I assume that if I did not persist the data, then when a process closes that data is gone

It is definitely better to explicitly close your client (and maybe even cancel futures). Although your client process is dying, I don't think the relevant data will be immediately deleted from the cluster.

I assume data is distributed, not sitting on a single node's scheduler. That would defeat the purpose of distribution.

The data will be distributed for processing. However, since your data is being generated with numpy, the initial data has to originate on a single process.

I assume from_array() is taking the numpy array and distributing that on the cluster, not just storing it within the scheduler. That would defeat the purpose of distribution as well.

Same answer as above - to perform a truly "data-distributed" workflow, you will need a parallel read, or to use from_delayed to allow the data to be generated in parallel. When you are using a single numpy array as the source data, it is indeed likely that the entirety of your data will be stored in the graph.

@pseudotensor (Author) commented Nov 13, 2020

This question may be a better fit for dask/distributed, but I can try to offer some thoughts...

I assume that if I did not persist the data, then when a process closes that data is gone

It is definitely better to explicitly close your client (and maybe even cancel futures). Although your client process is dying, I don't think the relevant data will be immediately deleted from the cluster.

I assume data is distributed, not sitting on a single node's scheduler. That would defeat the purpose of distribution.

The data will be distributed for processing. However, since your data is being generated with numpy, the initial data has to originate on a single process.

I assume from_array() is taking the numpy array and distributing that on the cluster, not just storing it within the scheduler. That would defeat the purpose of distribution as well.

Same answer as above - to perform a truly "data-distributed" workflow, you will need a parallel read, or to use from_delayed to allow the data to be generated in parallel. When you are using a single numpy array as the source data, it is indeed likely that the entirety of your data will be stored in the graph.

I understand that the initial numpy data is on the single server, but this is not the dask-scheduler. Also, it's a relatively small amount of data, only 200MB of numpy data.

I understand that to get a fully distributed experience I need to start with dask from the get-go, but I'm far from that if even simple uses show massive memory use by the scheduler.

I'm confused about why the data needs to be stored in the graph. Do I need to force distribution before passing the data to xgboost? If I run .compute() I just materialize the data locally, so I assume I need .scatter() or some equivalent? .persist()? I'm not sure what will force the data to be distributed and the graph completed so the scheduler is done with the data.

Is there a command that will take numpy or pandas data, bypass the single-node scheduler, and immediately distribute the data?

Also, why does the data keep accumulating? Every fit adds 200MB to the scheduler. Why are (I guess) the old graphs still there after xgboost is done and all forks related to the client are gone?

@rjzamora (Member)

Right - I understand that your data is small, and I agree that it shouldn't be taking up a lot of space in the graph. When I say "the entirety of your data will be stored in the graph", I am just saying that the very first task in the graph will include the entire array as input. If you call X.persist() after the from_array call, your data will become distributed (you are correct that compute is not what you want).

Also, why does the data keep accumulating? Every fit adds 200MB to scheduler. Why are (I guess) the old graphs still there after xgboost is done and all forks related to the client are gone?

This is where someone with better distributed knowledge may have more intuition than me (cc @quasiben ). Perhaps you can try to be explicit about cleaning up collections before you let the client die (e.g. client.cancel(X))?

@pseudotensor (Author) commented Nov 13, 2020

Will dd.from_array(X).persist() bypass the scheduler, or must it go through it?

It seems a bit odd a design if all in-memory dask distributed data operations have to go from the client process -> scheduler -> workers. The scheduler then always ends up holding all the data meant for the cluster of workers for from_pandas, from_array, etc. How can one just get the data into memory on the cluster?

I understand that read_csv and other url/file operations can lazily slice data and use that to distribute, and there the lazy thing stored is just the file name or link. Of course, read_csv does not work unless the disk is shared. And connectors that read data from the cloud require other special infrastructure.

The simplest mode should be in-memory and should be optimal by itself, IMHO. As-is, there's at least an extra copy of the full data frame on the scheduler. Factors of 2 are important! :) In reality, the numpy frame is converted into a dask frame that I think has more overhead, like pandas, but maybe not. It seems the scheduler should just record that the client holds the memory and fetch it when performing any task in the task graph, rather than storing the actual data.

At worst, for whatever reason, the frames never go away. I'll try .persist() and client.cancel(). Thanks.

@rjzamora (Member)

It seems a bit odd of a design if all in-memory dask distributed data operations have to go from the client process -> scheduler -> workers. Then the scheduler always ends up holding all the data meant for the cluster of workers for from_pandas, from_array, etc. How can one just get the data in memory on the cluster?

I see - if you are starting with a literal numpy array or pandas DataFrame, you are correct that the data will flow client process -> scheduler -> workers. There is really no way around this, since the starting point is an in-memory data structure. If you don't want to read from disk, you may be looking for something like dask.delayed for the generation of your data (where you can supply a user-defined function to generate an in-memory numpy array for each initial chunk).
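A sketch of that dask.delayed pattern for arrays (the loader function is hypothetical; in practice it would read or generate your real chunk):

```python
import numpy as np
import dask
import dask.array as da


@dask.delayed
def load_chunk(i, n):
    # Hypothetical per-chunk loader; it runs on a worker, so only this
    # lightweight call sits in the graph rather than the array bytes.
    return np.full(n, float(i))


chunks = [da.from_delayed(load_chunk(i, 100), shape=(100,), dtype="float64")
          for i in range(5)]
x = da.concatenate(chunks)
total = x.sum().compute()
print(total)  # 1000.0
```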

@pseudotensor (Author) commented Nov 13, 2020

But why can't the scheduler and task just keep track of the client holding the memory? Of course, one can't del the object provided, as that would mess things up, but one can't rm the file for read_csv either. So it's the same issue, not a new one.

Actually, del isn't a problem. As long as the client/dask machinery keeps track of the memory and holds a reference, del won't matter. So it's even easier to manage than the file case.

@pseudotensor (Author)

Looking at https://distributed.dask.org/en/latest/memory.html, I see no reason for the accumulation. When the fork goes away, the futures it held should be deleted. I wouldn't expect to have to cancel the futures myself.

However, perhaps I'm shutting down the fork uncleanly. Maybe then the futures aren't entirely removed? That is, while in-memory state is clearly gone after a fork dies, any distributed off-process state is not.
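For reference, the explicit-cleanup pattern being discussed might look like the following (in-process cluster and trivial task for illustration only):

```python
from dask.distributed import Client

# A sketch of explicit cleanup before the process exits, rather than
# relying on process death to release scheduler-side state.
with Client(processes=False, n_workers=1, threads_per_worker=1) as client:
    fut = client.submit(sum, [1, 2, 3])
    result = fut.result()
    client.cancel(fut)  # drop the key so the scheduler can release it
# leaving the with-block closes the client cleanly
print(result)  # 6
```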

@pseudotensor (Author) commented Nov 13, 2020

I added more details and repros at: dmlc/xgboost#6388 (comment)

There are some things mentioned in the issue that seem to be xgboost's fault, like excessive memory usage on CPU before dask is even used.

However, the fact that the scheduler and worker both keep growing in memory use after every completed client Python script, despite having client.cancel(X) and client.cancel(y) (and a clean exit of the script), seems to be a problem with dask.

In the context I originally posted about, only the scheduler seems to grow in memory use, but excessively so. cancels didn't help there either.

@pseudotensor (Author) commented Nov 13, 2020

Eventually the worker gets killed and restarts due to the OOM accumulated.

distributed.nanny - INFO - Worker process 29371 was killed by signal 9
distributed.nanny - WARNING - Restarting worker

This keeps happening over and over.

Every time this happens, the new worker starts again with less memory but again accumulates after each iteration of the loop. Again it gets killed, etc.

Meanwhile the scheduler continues itself to grow in memory use.

@pseudotensor (Author) commented Nov 13, 2020

Maybe related:
dask/distributed#3898
#3530
#6762

@pseudotensor (Author)

Moved the discussion to dask/distributed#4243, since this seems to be a dask.distributed problem only, perhaps.

@trivialfis (Contributor) commented Nov 18, 2020

Moving the discussion from xgboost to here (or distributed if preferred). Here is a minimal reproducible example, modified from the script provided by @pseudotensor with xgboost removed:

import pandas as pd
import dask.dataframe as dd
import distributed


def main():
    colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
    df = pd.read_csv('HIGGS.csv',
                     names=colnames)

    y = df['label']
    X = df[df.columns.difference(['label'])]
    print('X.shape', X.shape)

    from dask.distributed import Client

    for i in range(0, 100):
        with Client(scheduler_file='scheduler.json') as client:
            chunksize = 50000
            dX = dd.from_array(X.values, chunksize=chunksize)
            dy = dd.from_array(y.values, chunksize=chunksize)

            dX = client.persist(dX)
            dy = client.persist(dy)
            distributed.wait(dX)


if __name__ == '__main__':
    main()

Spec:

dask.__version__: 2.30.0
distributed.__version__: 2.30.0
Workers: 2
Memory budget for each worker: ~32 GB
Total memory on system: 64GB + 8GB swap
X.shape: (11000000, 28)
Data file size: 7.5G

I can confirm that the scheduler by itself uses significant memory (close to 44 GB).

@quasiben (Member) commented Nov 18, 2020

Can you also provide how you started the cluster? In the above, you are connecting with scheduler.json, which does not show how many workers, memory limits, etc.

EDIT:

I now see the spec lists some of the cluster setup, but having it explicitly laid out would be helpful for reproducing.

@quasiben (Member)

I think it would be good to read through dask/distributed#3032, as it outlines some problems when using from_array with data already loaded in memory. As @rjzamora has noted, the data is included in the task graph, which is going to run through the scheduler. Instead, I would suggest rewriting things with dask throughout the workflow:

ddf = dd.read_csv('/datasets/bzaitlen/GitRepos/HIGGS.csv', names=colnames)
y = ddf['label']
dy = y.to_dask_array()
dy = dy.persist()

X = ddf[ddf.columns.difference(['label'])]
dX = X.to_dask_array()
dX = dX.persist()

@trivialfis (Contributor)

@quasiben Thanks for the reply! I believe your example shows the right way of loading data. But for a small dataset like HIGGS, the in-memory size with doubles is about 2.5 GB (11000000 rows * 29 cols * 8 bytes / 1024^3). To fill 60 GB of memory there would have to be at least 24 copies of the data. That seems worth looking into?
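The back-of-envelope estimate above can be checked directly:

```python
# In-memory size of HIGGS as float64: rows * cols * 8 bytes.
nbytes = 11_000_000 * 29 * 8
gib = nbytes / 1024**3
print(round(gib, 2))  # 2.38 -> roughly the 2.5 GB quoted above
```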

@quasiben (Member)

I would expect this to actually have more memory consumption in the scheduler as you've laid it out. With 11000000 rows and a chunksize of 50000, you end up with 220 nodes in the graph, all of which will be 2.5 GB, so you should have 550GB in the scheduler. If instead you use a chunksize of 5500000, there will be 2 nodes and ~5GB on the scheduler.

If you are curious, you can look at what the scheduler is holding onto with something like:

In [32]: keys = client.run_on_scheduler(lambda dask_scheduler: list(dask_scheduler.tasks.keys()))

In [33]: client.run_on_scheduler(lambda dask_scheduler: dask_scheduler.tasks[keys[0]].nbytes)
Out[33]: 1232002128

So, I don't think there is anything wrong here per se. As said in dask/distributed#3032, there are policy decisions here, and it's not obvious what dask should do to make the experience "better". Instead, try to avoid loading data from memory, and if you have to, process those tasks quickly. So instead of persisting the array, it would be better to move onto the next computation and persist that, allowing the scheduler to release those tasks.
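A self-contained way to try the scheduler-inspection trick above (the in-process cluster and the `task_bytes` helper are written for this sketch, not a distributed API):

```python
import dask.array as da
from dask.distributed import Client, wait

client = Client(processes=False, n_workers=1, threads_per_worker=1)
x = da.ones(1_000, chunks=100).persist()
wait(x)  # make sure the chunks are actually in memory


def task_bytes(dask_scheduler):
    # Sum nbytes over the task states the scheduler currently tracks,
    # skipping tasks whose size is unknown/unset.
    return sum(ts.nbytes for ts in dask_scheduler.tasks.values()
               if ts.nbytes and ts.nbytes > 0)


held = client.run_on_scheduler(task_bytes)
print(held)  # at least 8000 bytes: 1000 float64 elements still held
client.close()
```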

@pseudotensor (Author)

But why is there accumulation if one repeats the fit in a loop? Once a task is done, why does the task graph continue to hold the arrays?

@pseudotensor (Author)

The other point I made earlier: the task graph should never contain data; it can just contain a reference to the client's data.

The same goes for files, actually. read_csv should be possible with the file on the client alone: in a multinode context with no shared disk, the client could serve as the reference for where the file lives, and any operations in the task graph could keep that as a reference. Currently the file has to be common to all nodes via a shared disk.

@rjzamora (Member) commented Nov 18, 2020

The other point I made earlier. The task graph should never contain data, it can just contain a reference to the clients data.

This seems straightforward on a local cluster, but a bit complicated when the cluster is distributed. In the case of IO, all workers have access to a shared file system with an established API for accessing the data by its path. Accessing remote data by memory reference seems less reliable. Won't this require RDMA support in both the software and hardware? Note that I don't know much about this at all :)

@pseudotensor (Author)

Ya, I'm thinking of the distributed case too. Both a file and an in-memory object could be modified after read_csv or from_array() type operations are done, so both have reliability issues, but the API can be made robust by making it clear how things should function, e.g. shallow reference vs. deep copy. In any case, there's no need to store data in the graph.

@quasiben (Member)

But why is there accumulation if one repeats the fit in a loop? Once a task is done, why does the task graph continue to hold the arrays?

In the case of a chunksize of 5500000, I don't see total memory exceeding 5.1GB.

for i in range(0, 20):
    chunksize = 5500000
    dX = dd.from_array(X.values, chunksize=chunksize)
    dy = dd.from_array(y.values, chunksize=chunksize)

    dX = dX.persist()
    dy = dy.persist()
    wait([dX, dy])

The scheduler should (and does) clean up futures which are no longer held by a worker or client. This can be made more aggressive.

Backing out for a minute: I think we've identified what is happening with the original issue. It seems like we are now moving towards a discussion of whether Dask should support a new model for handling in-memory data in from_array. If that is the case, I would suggest we close this issue and begin anew.

If I am incorrect, apologies - what remaining issues are there?

@quasiben (Member)

I was thinking about this again last night, and I think you can avoid long-term storage on the scheduler altogether with what I think you are doing.

Typically, the first response to folks who have a large numpy array in memory and want to move it into a dask collection is, naively, "don't do that". Here, the assumption is that the data is loaded directly from disk and that the dask collection (array/dataframe) should be responsible for loading it. Perhaps in your case the data is loaded from disk, some processing happens locally, and you end up with a NumPy array which you then want to combine with dask/xgboost.

In this case, you could scatter the data first, then create a dask array:

import dask.array as da

future = client.scatter(arr)
x = da.from_delayed(future, shape=arr.shape, dtype=arr.dtype)

The data will temporarily flow through the scheduler but end up on the workers. All of this is nicely summarized in a stackoverflow post:
https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array

Again, if you think there are better ways of managing in-memory data from the client, we could continue that discussion in a separate issue.
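A runnable version of the scatter snippet above, assuming an in-process cluster for illustration:

```python
import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client(processes=False, n_workers=1, threads_per_worker=1)
arr = np.arange(1_000_000, dtype="float64")

# scatter() moves the bytes to a worker up front; the graph then holds
# a future pointing at that data instead of embedding the array itself.
future = client.scatter(arr)
x = da.from_delayed(future, shape=arr.shape, dtype=arr.dtype)
total = x.sum().compute()
print(total)  # 499999500000.0
client.close()
```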

@trivialfis (Contributor)

Those are useful references. I think it might be best for xgboost to list some of these discussions in its docs.

@pseudotensor (Author) commented Nov 20, 2020

In the original xgboost script, the accumulation was ongoing and futures were never cleaned up. I would continuously hit the OOM killer over time. The client process finishing didn't matter; the scheduler/worker still persisted the data, uncleaned.

So maybe you found a way around the attempted repro by @trivialfis, but the original issue remains unexplained.

Also, chunk size shouldn't matter for the accumulation problem.

@quasiben (Member)

@pseudotensor can you link to or rewrite the reproducer? #6833 (comment) is a good start, but I'd like to have X and y explicitly stated. It's best if it can be reproduced without downloading a file (using random data or dask.datasets.timeseries()), but using HIGGS.csv or some other canonical dataset would be OK.

@martindurant (Member)

Following the introduction of high-level graphs in the scheduler, there is a good chance that this issue has been mitigated. I suggest closing.


6 participants