Working towards best practices for packages based on dask #5452

Open
djhoese opened this issue Oct 2, 2019 · 13 comments
Labels: discussion (Discussing a topic with no specific actions yet)

@djhoese
Contributor

djhoese commented Oct 2, 2019

This topic was brought up by @mrocklin on Twitter: what best practices and guidelines should packages that depend on dask and other PyData packages follow? It came up specifically when talking about Satpy, which depends heavily on xarray DataArrays backed by dask arrays. So the question is: what kind of things could be presented/documented for people wanting to build a package on top of dask, xarray, and related libraries? Some initial thoughts and questions I had:

  1. How/when should the user be allowed to define their own chunk sizes? With the semi-recent updates for 'auto' chunking and setting chunks based on in-memory size this is easier, but it still requires users to understand some low-level implementation details of a package (see the chunking sketch after this list).
  2. How/when should the number of workers be configured by the package's user? Things like OMP_NUM_THREADS and DASK_NUM_WORKERS (or the related dask.config.set parameter).
  3. If the package wants to take advantage of a distributed client (submit, gather, etc.), what should the user be required to provide? Is it bad practice to use get_client inside the package's utilities to pick up any configured client? Should a user be required to provide a client? If one isn't provided, can/should the package create a local client? Is that too much magic happening without the user's knowledge (creating sub-processes, etc.)?
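
A minimal sketch of what the 'auto' chunking from item 1 looks like from the user's side (the 64 MiB target and the array shape are made-up example values, and da.zeros stands in for whatever array a package would actually build):

import dask
import dask.array as da

# The user sets a per-chunk memory target once; the library can then simply
# request chunks="auto" and let dask pick concrete chunk shapes.
dask.config.set({"array.chunk-size": "64MiB"})
arr = da.zeros((20_000, 20_000), dtype="float32", chunks="auto")
print(arr.chunksize)  # chosen to stay near the 64 MiB target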

What other areas could these best practices include?

CC @mraspaud (satpy core developer)

@mrocklin
Member

mrocklin commented Oct 2, 2019

How/when to allow the user to define their own chunk sizes? With semi-recent updates for 'auto' chunking and setting chunks based on in-memory size this is easier, but still requires asking users to understand some low-level implementation details of a package.

I would say that this depends on the technical sophistication of your users. Assuming that sophistication is sometimes low, you should probably expose chunking optionally, and otherwise provide guides to dask's auto-chunking if possible. Assuming that you're dealing with GeoTIFFs, it would be good to use the tile size to inform automatic chunking.

def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None):

This is done in an old PR to Xarray, which might be interesting.

https://github.com/pydata/xarray/pull/2255/files#diff-6364b203943c799516f9ecba19e1b119R326
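
For reference, a minimal sketch of calling normalize_chunks directly (the shape and byte limit are made-up values; the previous_chunks argument shown in the signature above is what would let a package align the result to, e.g., a GeoTIFF tile size):

import numpy as np
from dask.array.core import normalize_chunks

# Ask dask's auto-chunking heuristic for chunk sizes near a byte budget.
chunks = normalize_chunks(
    "auto",
    shape=(10980, 10980),
    limit=128 * 2**20,          # ~128 MiB per chunk
    dtype=np.dtype("float32"),
)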

How/when should number of workers be configured by the package's user? Things like OMP_NUM_THREADS and DASK_NUM_WORKERS (or the related dask.config.set parameter).

I think that downstream libraries should probably avoid setting this if possible. Creating the scheduler/client should be orthogonal to the libraries that create computations.
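
A sketch of what keeping that orthogonal looks like in practice: the application, not the library, picks the scheduler and worker count. Here da.random.random stands in for whatever lazy result the library returns, and this assumes the local schedulers honor the num_workers setting, as the DASK_NUM_WORKERS mention above implies:

import dask
import dask.array as da

x = da.random.random((10_000, 10_000), chunks="auto")  # stand-in for library output

# Scheduler choice and worker count live in application code/config only.
with dask.config.set(scheduler="threads", num_workers=4):
    total = x.sum().compute()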

If the package wants to take advantage of a distributed client (submit, gather, etc) what should the user be required to provide? Is it bad practice to use get_client inside the package's utilities to get any configured client? Should a user be required to provide a client? If not provided can/should a package create a local client? Is that too much magic happening without the user's knowledge (creating sub-processes, etc)?

Ideally, they don't have to use submit/gather, and can just use dask.compute/dask.persist. This makes them usable by the broadest set of Dask schedulers.

Of course, as your question poses, this may not always be possible. I think that using get_client, as you suggest, is probably best.
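
A sketch of that get_client pattern (run_tasks and its fallback logic are illustrative, not an established API): prefer an explicit client, fall back to any client the user already created, and finally fall back to plain dask.compute so the local schedulers still work.

from dask import compute

def run_tasks(tasks, client=None):
    # tasks: a list of dask.delayed objects
    if client is None:
        try:
            from distributed import get_client
            client = get_client()  # raises ValueError if no client exists
        except (ImportError, ValueError):
            client = None
    if client is not None:
        futures = client.compute(tasks)
        return list(client.gather(futures))
    return list(compute(*tasks))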

@djhoese
Contributor Author

djhoese commented Oct 3, 2019

So, to summarize your answers:

  1. It'll be a case-by-case decision on how/when chunking is specified by package users. If done correctly, the package should be able to auto-chunk in most cases using normalize_chunks, with optional overrides by the user.
  2. For worker configuration, packages should point users to the dask docs.
  3. I was thinking of non-array cases where we have utilities using futures and/or complex delayed functions. All or most of the dask-ness of the utility is hidden from the user except for "you can run this on a cluster if you pass a dask client to the client keyword argument".

@pnuu

pnuu commented Oct 3, 2019

A few more (euro) cents from another Pytroll core dev. As I'm more focused on operational satellite processing chains, there are some points I need to consider all the time, both when writing the software and when setting things up. Below are some thoughts that came to mind when I read the 3-item list @djhoese started with.

There can be several completely independent chains running (we at FMI have 10+), and some of them need to share resources. So the chains need to be constrained in CPU and/or RAM usage. The first means setting both OMP_NUM_THREADS and DASK_NUM_WORKERS, and the second both DASK_NUM_WORKERS and the chunk size. The CPU limitation is exact. The RAM limitation depends a lot on the implementation and use case. With "simple" array operations where chunks are constant, it can be controlled with some precision. With Satpy it depends on whether reprojection is done, which resampling method is used, whether the resampling uses caching, what the output is (JPG, GeoTIFF, NetCDF, ...), what the originating data is, and so on.

As Dask by default uses all the resources available, we Pytroll devs need to advise users with things like (a bit exaggerated, but not by much) "Try whether PYTROLL_CHUNK_SIZE=1024 OMP_NUM_THREADS=1 DASK_NUM_WORKERS=4 BLAS_NUM_THREADS=1 MKL_NUM_THREADS=1 script.py works better." Naturally these depend on the use case.
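
A sketch of what that advice amounts to in a chain's startup script (the variable names are the ones mentioned above; the values are examples only). The thread-count variables are read by OpenMP/BLAS at import time, so they have to be set before numpy and friends are imported:

import os

os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
os.environ.setdefault("BLAS_NUM_THREADS", "1")
os.environ.setdefault("PYTROLL_CHUNK_SIZE", "1024")

import dask
dask.config.set(num_workers=4)  # roughly equivalent to DASK_NUM_WORKERS=4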

I guess the point here is: we (the library/application devs) need a simple way to give the user control over the amount of resources the software takes.

@djhoese
Contributor Author

djhoese commented Oct 3, 2019

I guess the point here is: we (the library/application devs) need a simple way to give the user control over the amount of resources the software takes.

And any package using dask would need to come up with some sort of best practices for its own use cases. So maybe it isn't something dask can help with any more than it already does, but dask's best practices for downstream packages would need to discuss this as something people should be concerned about.

@mraspaud

mraspaud commented Oct 3, 2019

Great stuff in here, thanks for starting this issue!

I have one thing I would like to bring up; it's related to IO. In our application (satpy), we read a lot of data from disk and write at least as much to disk when we're done. However, it's not really practical for us (at my workplace) to have these data on shared storage, as the data volume is too big and would slow down the entire network, so we keep the data locally on the servers. On the other hand, we need to balance the load of our servers, so dask.distributed would be very handy, but I'm not sure how to handle the reading and writing parts as they can only be done on one of the machines. I know that workers can be assigned flags to represent the resources they have access to, but how do we set this up transparently in a library (satpy) that has to be runnable both with and without dask.distributed? And more generally, how can distributed support be added transparently to the library? (I think that last question was also raised indirectly by @djhoese.)
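
For reference, a sketch of the worker-resources mechanism mentioned above (the scheduler address and the read_local_file function are placeholders). Workers that hold the data advertise a resource on the command line, e.g.

    dask-worker scheduler-host:8786 --resources "LOCALDATA=1"

and tasks that must run where the files live request it:

from distributed import Client

client = Client("scheduler-host:8786")            # placeholder address
future = client.submit(read_local_file, "/local/path/file.nc",
                       resources={"LOCALDATA": 1})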

@mrocklin
Member

mrocklin commented Oct 3, 2019

I would encourage satpy not to impose constraints on how Dask workers are set up. I think that this is handled best on an application-by-application basis.

There can be several completely independent chains (we at FMI have 10+) running, and some of them need to share resources. So the chains need to be constrained in CPU and/or RAM usage

Can they all use the same dask cluster? If so, that would load balance for you.

but I'm not sure how to handle the reading and writing parts as they can only be done on one of the machines

Typically people use a network file system for this sort of thing. If I/O is a bottleneck then you might consider compression, nicer file formats, doing more computation per run, or something similar.

@djhoese
Contributor Author

djhoese commented Oct 7, 2019

I thought of some new ones this weekend and an update on another:

2a. A lot of scientific libraries that could benefit from dask wrap old Fortran or C code that is fast because of how it is implemented. Some also use OpenMP, and that is part of existing users' workflows. Blindly setting things like the OpenMP or BLAS number of threads could impose major performance penalties on these libraries. Suggesting that sub-library developers take advantage of projects like https://github.com/joblib/threadpoolctl may be a good idea (see the sketch below). I've never used it myself, but I discovered it when worrying about how Satpy uses the pykdtree library (OpenMP-based). If we access pykdtree from dask workers there isn't a real need for OpenMP... but what if it is faster (leading to profiling, benchmarks, etc. only to find it doesn't make a big difference)?
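
A sketch of how threadpoolctl could be used for that (not Satpy code; native_openmp_kernel is a hypothetical wrapped C/OpenMP routine):

from threadpoolctl import threadpool_limits

def run_kernel_single_threaded(block):
    # Cap OpenMP/BLAS threads while a dask task runs the native routine,
    # so dask workers and OpenMP don't oversubscribe the CPUs.
    with threadpool_limits(limits=1):
        return native_openmp_kernel(block)  # hypothetical wrapped routine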

  1. How to make a sub-library work for numpy users, dask users, and xarray users. In most cases numpy and dask arrays should work pretty well together thanks to __array_function__, but things like blockwise operations, or having to use delayed functions as workarounds for non-vectorized operations, can make interfaces complicated. Similarly, making the same interface work with xarray objects can be difficult when you want to maintain .attrs (which get lost on most operations), .dims, etc. This could result in three separate implementations of the same functionality, or even four if you want to throw a numba-jit'd version into the mix. I'm not saying this is necessarily something dask/xarray devs should be concerned about, but it can hinder libraries migrating code to be dask/xarray friendly (see the dispatch sketch after this list). Side question: can blockwise be used as a decorator?

  2. Some low-level C/Fortran libraries are used for reading/writing files and may not be multi-process or multi-thread safe. This forces the sub-library developer to worry about which dask scheduler the user is running in order to pick the most performant way of using these libraries. In Satpy's case we got around some of this by using a thread lock on da.store with rasterio (see the locking sketch after this list). I think xarray has come up with fancier ways of handling this, but I haven't had time to investigate. Best practices could either help with or lay out issues like this. Alternatively, if there were an easy way to check which scheduler is being used, or an easy way to tell users "you must create your X library jobs like this so that this entire dask task graph gets run on one threaded worker on a single node", then that could work too. It is something I've wanted to play with more in Satpy but haven't had time.
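
For item 1, a sketch of one way to serve numpy, dask, and xarray callers from a single numpy-style kernel (clip01 is purely illustrative, not an API from this thread):

import numpy as np
import dask.array as da
import xarray as xr

def _kernel(arr):
    return np.clip(arr, 0.0, 1.0)

def clip01(data):
    if isinstance(data, xr.DataArray):
        # run blockwise on the (possibly dask-backed) data and keep .attrs
        return xr.apply_ufunc(_kernel, data, dask="parallelized",
                              output_dtypes=[data.dtype], keep_attrs=True)
    if isinstance(data, da.Array):
        return da.map_blocks(_kernel, data, dtype=data.dtype)
    return _kernel(np.asarray(data))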
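
For item 2, a sketch of the locking idea: da.store accepts a lock= keyword, so writes into a non-thread-safe library are serialized. result_array and riofile are hypothetical stand-ins, and the store target needs to support numpy-style slice assignment (which is why Satpy wraps rasterio).

import threading
import dask
import dask.array as da

lock = threading.Lock()
# result_array / riofile: stand-ins for a dask array and an open output file.
da.store(result_array, riofile, lock=lock)

# dask.base.get_scheduler() returns the currently configured scheduler
# function (or None if nothing is configured), which is one way for a
# library to check what it is running under.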

@djhoese
Contributor Author

djhoese commented Oct 14, 2019

  1. Another one, based on "Remove support for Iterators and Queues" (distributed#2671): how to write tests (pytest or unittest only) that need to use a distributed Client. When trying to do this in Satpy tests I ran into issues with ports remaining open and not being able to reopen them.
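
One pattern that has worked elsewhere (a sketch, not Satpy's actual tests): a pytest fixture that builds a throwaway in-process cluster per test and closes it afterwards, which also avoids the dashboard port staying busy:

import pytest
from distributed import Client, LocalCluster

@pytest.fixture
def dask_client():
    # processes=False keeps everything in one process; dashboard disabled
    cluster = LocalCluster(n_workers=1, processes=False, dashboard_address=None)
    client = Client(cluster)
    yield client
    client.close()
    cluster.close()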

@TomAugspurger
Member

When trying to do it in Satpy tests I run in to issues with ports remaining open and not being able to reopen them.

Were you doing anything fancy, like trying to reuse an event loop or state between tests? Or were you just using the public API? I would think that things would be fine as long as you're closing the cluster down between tests. Reports otherwise would be welcome.

@djhoese
Contributor Author

djhoese commented Oct 14, 2019

No, nothing too fancy. When I originally wrote the tests I noticed that dask's own tests had some pytest fixtures for handling things so I assumed that might be necessary and just mocked the Client in my tests. I was creating a LocalCluster with 1 worker, creating a Client with that cluster, then after using them I would run .close() on both.

I should note, it passed locally just fine but on Travis it would fail.

Here's the error:

DEBUG:asyncio:Using selector: EpollSelector
/home/travis/miniconda/envs/test/lib/python3.6/site-packages/distributed/dashboard/core.py:72: UserWarning:
Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)
/home/travis/miniconda/envs/test/lib/python3.6/site-packages/distributed/dashboard/core.py:75: ResourceWarning: unclosed <socket.socket fd=30, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('0.0.0.0', 0)>
  raise
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /home/travis/miniconda/envs/test/lib/python3.6/asyncio/tasks.py:530> exception=RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.',)>
Traceback (most recent call last):
  File "/home/travis/miniconda/envs/test/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/travis/miniconda/envs/test/lib/python3.6/site-packages/distributed/nanny.py", line 243, in start
    response = await self.instantiate()
  File "/home/travis/miniconda/envs/test/lib/python3.6/site-packages/distributed/nanny.py", line 325, in instantiate
    result = await self.process.start()
  File "/home/travis/miniconda/envs/test/lib/python3.6/site-packages/distributed/nanny.py", line 490, in start
    await self.process.start()
  File "/home/travis/miniconda/envs/test/lib/python3.6/site-packages/distributed/process.py", line 33, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/home/travis/miniconda/envs/test/lib/python3.6/site-packages/distributed/process.py", line 190, in _start
    process.start()
  File "/home/travis/miniconda/envs/test/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/home/travis/miniconda/envs/test/lib/python3.6/multiprocessing/context.py", line 291, in _Popen
    return Popen(process_obj)
  File "/home/travis/miniconda/envs/test/lib/python3.6/multiprocessing/popen_forkserver.py", line 35, in __init__
    super().__init__(process_obj)
  File "/home/travis/miniconda/envs/test/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/travis/miniconda/envs/test/lib/python3.6/multiprocessing/popen_forkserver.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/home/travis/miniconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/home/travis/miniconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

No output has been received in the last 10m0s, this potentially indicates a stalled build or something wrong with the build itself.
Check the details on how to adjust your build configuration on: https://docs.travis-ci.com/user/common-build-problems/#build-times-out-because-no-output-was-received

So it is likely the way we are starting our tests (using python setup.py test to run a unittest suite) that is causing the error. If I have time to get back to it I'll file a separate GitHub issue.

@TomAugspurger
Member

When I originally wrote the tests I noticed that dask's own tests had some pytest fixtures for handling things so I assumed that might be necessary and just mocked the Client in my tests.

Those are documented at https://distributed.dask.org/en/latest/develop.html#writing-tests. If you're making assertions about the cluster, it may be helpful to use those helpers, but otherwise they may be too tricky to use.
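
For completeness, a sketch of what those documented helpers look like (based on the distributed testing docs; the exact idiom varies between versions):

from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_square(c, s, a, b):
    # c: Client, s: Scheduler, a and b: two Workers, all torn down automatically
    future = c.submit(lambda x: x ** 2, 3)
    assert (await future) == 9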

If I have time to get back to it I'll file a separate github issue.

We have one similar to this already. It's about creating a LocalCluster at the top level of a Python script. I'm not familiar with what setup.py test does, unfortunately.

@djhoese
Contributor Author

djhoese commented Oct 14, 2019

I'm not familiar with what setup.py test does unfortunately.

It's deprecated (as far as I understand), but essentially calls unittest with the test suite specified in setup.py.

@TomAugspurger
Member

Perhaps suggesting sub-library developers to take advantage of projects like https://github.com/joblib/threadpoolctl may be a good suggestion. I've never used it myself, but discovered it when worrying about how Satpy uses the pykdtree library (OpenMP-based).

Just an FYI, scikit-learn is dealing with similar issues: scikit-learn/scikit-learn#14979.

@GenevieveBuckley added the discussion label on Oct 13, 2021