Environments and multi-task persistent state #85

Open
broxtronix opened this issue Jan 23, 2016 · 28 comments

@broxtronix
Contributor

I frequently use GPU clusters, and for these workloads I find that I need to do some relatively expensive setup or teardown operations to establish a valid GPU computing context on each worker before I start the distributed processing of my data. There are various ways to "trick" distributed workers into maintaining a stateful execution environment across multiple jobs, probably the best of which is to use a global variable in the Python interpreter itself. However, this solution (and the others I'm aware of) does not provide particularly fine-grained control over the distributed execution environment. It would be very useful if distributed could provide some way to explicitly manage execution "environments" in which jobs could be explicitly run.

I'm imagining something along these lines. I would call

my_gpu_env = executor.environment(my_setup_function, my_teardown_function)

which would cause each worker to run my_setup_function() and then return a reference to its copy of the new environment. The scheduler would record these and send back a single reference to the collection of environments on the workers (my_gpu_env in this example). Subsequent calls to run distributed functions could specify the environment in which they would like to be run:

executor.map(some_function_using_the_gpu, data, environment=my_gpu_env)

The scheduler could keep track of the setup() and teardown() functions associated with each environment. Then, if a new worker comes online and is asked to run a function in an environment that it has not yet set up, it could request the necessary initialization routine from the scheduler and run that first before running any jobs.

This is a somewhat rough sketch of what would be desirable here, and I'm curious to start a discussion to see whether there are other users out there who might also want a feature like this. In particular, are there others using distributed to manage a cluster of GPU nodes? How do you manage a cluster-wide execution context?
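
For reference, the global-variable trick I mentioned looks roughly like this; init_gpu_context and run_on_gpu are placeholders for whatever expensive setup and computation apply, not real APIs:

# Workaround sketch: cache an expensive per-worker context in a module-level
# global so that the setup runs at most once per worker process.
_gpu_context = None

def get_gpu_context():
    global _gpu_context
    if _gpu_context is None:
        _gpu_context = init_gpu_context()   # placeholder for the expensive setup
    return _gpu_context

def some_function_using_the_gpu(chunk):
    ctx = get_gpu_context()                 # reuses the cached context on this worker
    return run_on_gpu(ctx, chunk)           # placeholder for the actual GPU computation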

@mrocklin
Member

cc @seibert

@sklam

sklam commented Mar 22, 2016

I am working on getting the CUDA support in numba to work with distributed. Numba lazily initializes the CUDA driver, which solves the setup issue. However, there are times when I want to execute some teardown function so that the CUDA profiler works properly. Some way to control environment setup/teardown would be useful.

@Kobzol
Contributor

Kobzol commented Apr 16, 2016

I'd want something similar. You mentioned that you can trick distributed into keeping data on the workers and not destroying it. How can I do that? By keeping a reference to a future containing the data?
I would ideally like to do something like this:
e.save(data) # send data to all workers and keep it there
and then repeatedly (but not all at once, so a simple map doesn't suffice) this:
e.submit(fn, data, other_args, ...) # compute the function on the remote data, without resending it to the workers
Is there a way to do this in distributed?

@mrocklin
Member

Under normal operation you would just scatter the data to the network:

[future] = e.scatter([data])
x = e.submit(func, future, other_args)

And then you would trust the system to share the data around as needed to perform computations. If you really want to enforce that the data go everywhere you would add the broadcast=True keyword argument.

[future] = e.scatter([data], broadcast=True)

But this is generally slower than just trusting the network to move the data around as necessary. You'll probably want to update from master to get the benefits of worker-to-worker work stealing from #229 .

@Kobzol
Contributor

Kobzol commented Apr 16, 2016

I have a specific scenario where I basically want to distribute a resource that can't be easily divided, so I just want to distribute it at the beginning to everyone and then tell the workers to take a chunk from it for each task.
Will scatter ensure that the data won't be deleted? How long will it last on the workers?
I tried scatter before already, but it gives me an error (it works without broadcast).
I'm using a version of distributed downloaded via pip.
The object that I'm trying to scatter is a custom Python object that implements the iterator interface.

e.scatter({"data": object}, broadcast=True)
File "/usr/local/lib/python2.7/dist-packages/distributed/worker.py", line 440, in update_data
    data = valmap(loads, data)
  File "/usr/local/lib/python2.7/dist-packages/toolz/dicttoolz.py", line 84, in valmap
    rv.update(zip(iterkeys(d), map(func, itervalues(d))))
AttributeError: 'list' object has no attribute 'iterkeys'```

@mrocklin
Member

The scheduler will keep all data pointed to by a Future in memory. It will stay on the cluster for as long as you hold on to the future. When the future gets garbage collected from your local process, the scheduler will clean up the remote data.

Scatter not supporting dictionaries during broadcast was a bug. It has been fixed in #230 . Thanks for reporting! Generally people hand scatter lists rather than dictionaries; there is rarely a need for users to specify keys directly.

Sending a custom Python object is fine. Sending an iterator probably isn't. The object will have to be cloud-pickleable and it's assumed that your functions won't mutate it. So rather than sending an iterator I would send an iterable.
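
As a small sketch of that lifetime behaviour (process is a placeholder function, and e is the executor as above):

[data_future] = e.scatter([list(my_iterable)])   # send a concrete list, not an iterator
result_future = e.submit(process, data_future)   # the data stays on the cluster while we hold data_future
result = result_future.result()

del data_future, result_future                   # once the futures are released, the scheduler may free the remote data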

@Kobzol
Contributor

Kobzol commented Apr 16, 2016

Thanks, I will try it with the master version. I'm sending an iterator, but it doesn't matter how the workers modify it; it's basically immutable from their point of view (they can select a specific index from it, so it's more like a list anyway).

I also tried something like this instead of the scatter:

dask = {
   "data": data,
   "task1": (fn, data, arg1),
   "task2": (fn, data, arg2),
   ...
   "result": ["task1", "task2", ...]
}
result = e.get(dask, "result")

After "profiling" it with wireshark, I noticed that for every task the whole data object was sent over the network. I though that the workers (I used four workers and hundreds of tasks) would just get their own local copy of data and then operate on it. Or do they assume that the data changes? Is there any way to mark the data immutable so that it isn't sent for every task?

@mrocklin
Member

If you want to use graphs explicitly you would need to modify your graph to point to the "data" key rather than to the object itself.

Before

dask = {
   "data": data,
   "task1": (fn, data, arg1),
   "task2": (fn, data, arg2),
   ...
   "result": ["task1", "task2", ...]
}

After

dask = {
   "data": data,
   "task1": (fn, "data", arg1),
   "task2": (fn, "data", arg2),
   ...
   "result": ["task1", "task2", ...]
}

But really I would just do the following:

[data_future] = e.scatter([data])
tasks = [e.submit(fn, data_future, arg) for arg in args]
results = e.gather(tasks)

@Kobzol
Contributor

Kobzol commented May 2, 2016

When I do this:

[data_future] = e.scatter([data])
tasks = [e.submit(fn, data_future, arg) for arg in args]
results = e.gather(tasks)

the result returned from e.gather(tasks) is None.
I checked the tasks and they were all finished, except for a few that ended with this error:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 779, in start
    old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
ValueError: set_wakeup_fd only works in main thread
distributed.worker - WARNING -  Compute Failed
Function: set_and_compute
args:     (Map, 448, 16)
kwargs:   {}

If I launch the exact same code, but without the scatter (passing data directly into submit), it works.
Do you have any idea what might be causing the error?
I'm using Nannies to launch the workers; it happens both when starting the scheduler and workers from the API and from the CLI.

@mrocklin
Member

mrocklin commented May 2, 2016

This seems like a separate issue? Perhaps you should raise a new issue.

Additionally it would be useful to have more information about your data and function. If you can, it'd be great to have an example that I can run locally to reproduce the error.

@Kobzol
Contributor

Kobzol commented May 2, 2016

I'll investigate further, and if I can put together an MWE and the issue persists, I will create a separate issue. Thanks.

@mrocklin
Member

mrocklin commented Sep 2, 2016

OK, here is a proposal:

An environment has a few components:

  1. A unique name, like gpu
  2. A bit of state that lives in some global attached to the worker and that we make accessible to tasks
  3. A method to determine if a worker is part of the environment
  4. A setup function
  5. A teardown function to run when the worker closes.

Examples

A simple case might be that we want all nodes with a decent amount of available memory:

def has_high_memory():
    import psutil
    return psutil.virtual_memory().total > 30e9

e.register_environment(name='high-memory', condition=has_high_memory)

future = e.submit(func, *args, workers='high-memory')

Example 2 Database access

def has_database_access():
    try:
        connect_to_database()
        return True
    except:
        return False

def setup():
    conn = connect_to_database()
    return conn

def teardown(state):
    state.disconnect()

e.register_environment('database', condition=has_database_access, setup=setup, teardown=teardown)

def get_data(query):
    conn = get_environment_state('database')
    ...

e.submit(get_data, query, workers='database')

I imagine that this could also be a good place for a class.
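
As a rough sketch only (none of these names exist in distributed today), such a class might bundle the pieces like this:

class Environment(object):
    """Hypothetical container for the five components listed above."""

    def __init__(self, name, condition, setup=None, teardown=None):
        self.name = name            # 1. unique name, like 'gpu'
        self.condition = condition  # 3. predicate run on each worker process
        self.setup = setup          # 4. returns per-worker state, if any
        self.teardown = teardown    # 5. called with that state when the worker closes

    def install(self, worker):
        # Run the condition and, if it passes, the setup on one worker,
        # stashing the resulting state (2.) somewhere tasks can reach it.
        if not self.condition():
            return False
        if self.setup is not None:
            state = getattr(worker, 'environment_state', {})
            state[self.name] = self.setup()
            worker.environment_state = state
        return True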

How this would work

We would run these functions whenever a worker connected and keep workers within a group based on whether or not they passed the check. This allows a decent amount of control on the user's part to choose what kinds of nodes they're looking for. The condition does run everywhere, though, so it should be fairly fast and unlikely to crash things.

Questions

  • Does this satisfy everyone's needs?
  • Are there callbacks that I'm missing?

cc @sklam @broxtronix

@mrocklin
Member

mrocklin commented Sep 2, 2016

This could probably start with just the conditions, and add in setup/teardown later. I know many people who would appreciate having worker groups determined by running a predicate on every worker. I don't know as many people who need worker-level setup/teardown. Broxtronix is the exception, but I expect that he has moved on.

@sklam

sklam commented Sep 6, 2016

Is the condition executed by every worker-thread?

@mrocklin
Member

mrocklin commented Sep 6, 2016

As presented here it would be evaluated by every worker process. For the purposes of deciding where something can run, all threads in a worker process are considered equivalent.

@sklam

sklam commented Sep 6, 2016

So, if I run dask-worker --nprocs=2 --nthreads=3, the condition would run twice?

@mrocklin
Member

mrocklin commented Sep 6, 2016

Correct

@sklam

sklam commented Sep 6, 2016

Would setup/teardown be different? It would be nice if they were executed at the per-thread level. For instance, every thread could get a different GPU in a multi-GPU context. Would it also be good for every thread to get a different DB connection?

Anyway, I like the proposal. We can start experimenting with just the condition fn.
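
For instance (a rough sketch only, assuming numba's CUDA bindings; nothing here is an existing distributed hook), each worker thread could lazily bind itself to a device the first time it runs a task:

import itertools
import threading

from numba import cuda

_next_gpu = itertools.count()   # hands out 0, 1, 2, ... across worker threads
_local = threading.local()      # per-thread slot for the chosen device

def gpu_task(data):
    if not hasattr(_local, 'device'):
        _local.device = next(_next_gpu) % len(cuda.gpus)  # wrap around if threads > GPUs
        cuda.select_device(_local.device)                 # pin this thread to its own GPU
    # ... launch the CUDA kernel on data using the bound device ...
    return data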

@mrocklin
Member

mrocklin commented Sep 6, 2016

It is less convenient, though still doable, to add setup per thread, mostly because threads can, to a certain extent, come and go. I would want to do this after setup-per-process, which would come after condition-per-process.

@kszucs
Contributor

kszucs commented Sep 9, 2016

Whether a worker can connect to a database can be evaluated at startup time. Mesos uses attributes for this purpose: --attributes='rack:abc;zone:west;os:centos5;level:10;keys:[1000-1500]' when a mesos-slave starts. For conditions there are roles; see http://mesos.apache.org/documentation/latest/attributes-resources/ .

Marathon defines constraints for task placement.

I can imagine a syntax like dask.callbacks for contextual operations, with an additional constraints method. BTW, do callbacks work with distributed?

@gminorcoles

gminorcoles commented Sep 27, 2016

I have a chunk of data that should live on each worker between task executions. That chunk of data, which is a dictionary of pandas DataFrames, is operated on by each task, but the operations vary depending on the other task parameters. I am trying the scatter() trick you outlined above, but running this on a single machine with 36 locally spawned workers for testing, copying the data between workers is essentially interminable. It has been taking 25 minutes to scatter this data chunk, which I think is about 980MB.

I am new to Dask, but I am pretty sure that the ability to pin this data to each worker once, instead of recomputing it every time my task function runs, would be a big advantage.

@mrocklin
Member

mrocklin commented Oct 3, 2016

@gminorcoles your question is not related to this issue. Consider using the broadcast= keyword to scatter, described in the API docs here: http://distributed.readthedocs.io/en/latest/api.html#distributed.client.Client.scatter

If you have further questions or comments please raise on StackOverflow or a separate GitHub issue.

@thompson42

@mrocklin - has the solution you proposed for "Example 2 Database access", namely:

e.register_environment('database', condition=has_database_access, setup=setup, teardown=teardown)

e.submit(get_data, query, workers='database')

been implemented or pursued at this stage?

The reason I ask is that I am looking at integrating the Cassandra Python driver into dask.distributed at the worker level. The database connection pool will only form a connection to the local Cassandra instance and needs to stay persistently available and accessible to the worker.

Or is there another way that people are efficiently handling database connections in dask.distributed?

@mrocklin
Member

@thompson42 Environments were implemented in #505 but never merged, so the short answer to your question is "no". However, there is renewed activity on rebuilding the workers here: #704, which might swallow the environment work in the near future.

@thompson42

@mrocklin - wow, quick response, thanks.

So is there currently a way to handle database connections other than creating and destroying them on each job?

@mrocklin
Member

That would be the simplest way, yes. You could also use globals if you do it cleverly, perhaps by attaching a value to a module.

def select(query):
    import dask
    try:
        conn = dask.conn                            # reuse a connection cached on the module
    except AttributeError:
        conn = dask.conn = connect('my-database')   # first call on this worker: connect and cache

    results = conn.select(query)
    return results

There is also the distributed.worker.thread_state global, to which you can attach things on a per-thread basis, which might be a bit safer.
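
For example, a per-thread variant of the same caching trick might look like this (connect is a placeholder, as above):

from distributed.worker import thread_state

def select(query):
    conn = getattr(thread_state, 'conn', None)           # each worker thread caches its own connection
    if conn is None:
        conn = thread_state.conn = connect('my-database')    # placeholder connect(), as above
    return conn.select(query)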

@olly-writes-code

Any idea what happened to this? Was this solved in a later release or is it still an open problem?

@olly-writes-code

@thompson42 - how did you solve this for your use case?
