Add Global Variable coordination object #1128

Merged (6 commits) on Jun 4, 2017

Conversation

mrocklin (Member) commented Jun 3, 2017


This creates a global `Variable` object on which clients can set and get values. Coordination happens through the central scheduler.

```python
In [1]: from distributed import Client, Variable
In [2]: client = Client()

In [3]: v = Variable('var-1')
In [4]: v.set(1)
In [5]: v.get()
Out[5]: 1

In [6]: client.scheduler.address
Out[6]: 'tcp://127.0.0.1:36535'
```

```python
In [1]: from distributed import Client, Variable
In [2]: client = Client('tcp://127.0.0.1:36535')
In [3]: v = Variable('var-1')  # same name

In [4]: v.get()
Out[4]: 1
```

This can send msgpack-serializable values or futures.

cc @MLnick this might be a better way to publish your betas than with channels.
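
For the futures case, a minimal sketch (a local cluster and an increment task stand in for real work here):

```python
In [1]: from distributed import Client, Variable
In [2]: client = Client()

In [3]: future = client.submit(lambda x: x + 1, 10)
In [4]: v = Variable('result')
In [5]: v.set(future)

In [6]: v.get().result()  # any client on the same scheduler can collect this
Out[6]: 11
```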
mrocklin merged commit aa2399c into dask:master on Jun 4, 2017
mrocklin deleted the variables branch on June 4, 2017 at 12:23
MLnick commented Jun 5, 2017

@mrocklin thanks! I will take a look at using this in the context of the dask-glm work when I get a gap.

d-diaz commented Nov 5, 2018

@mrocklin, I'm mapping a pipeline of dask.delayed functions (mostly wrappers of command-line calls that manipulate data files) over a list of inputs using dask.distributed, and I have been trying to keep track of the stages of the pipeline that fail and capture their error messages without halting the whole pipeline. I've been struggling to interpret the dask.distributed docs regarding client.scatter vs. dask.distributed.Variable to work out the best way to approach this.
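
For reference, the two serve different purposes: client.scatter ships local data out to the workers and returns futures (fixed references to that data), while Variable is a named, mutable slot coordinated through the scheduler. A minimal sketch, with illustrative names:

```python
from dask.distributed import Client, Variable

client = Client()

# scatter: copy local data onto the cluster; the future is a fixed reference
[data_future] = client.scatter([{'raw': 'data'}])

# Variable: a named slot whose contents can be replaced at any time,
# holding msgpack-serializable values or futures
v = Variable('shared-state')
v.set(data_future)
v.set({'plain': 'value'})  # later overwritten with a plain value
```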

In a nutshell, if one of my dask.delayed functions in the pipeline raises an exception, I want to log it in some type of shared object (e.g., a list or dictionary) so that I can come back afterwards and figure out what failed. The best I've come up with so far is a log_error function that gets and re-sets a global variable, as in the snippet below. I can't imagine this is an efficient way of doing this, but I couldn't figure out how to append to or set individual elements of a global variable rather than getting and re-setting the entire object each time I log an error.

```python
import dask
from dask.distributed import Variable, Client, progress

client = Client()

log = Variable('log')
log.set({})

def log_error(data_id, process_name, error_msg):
    # read-modify-write of the entire dict on every error
    temp_log = log.get()
    temp_log[data_id] = {process_name: error_msg}
    log.set(temp_log)

@dask.delayed
def step1(data_id):
    try:
        do_something(data_id)
    except Exception as e:
        log_error(data_id, 'step1', str(e))  # str(e); e.message is Python 2 only
    return data_id

@dask.delayed
def step2(data_id):
    try:
        do_something(data_id)
    except Exception as e:
        log_error(data_id, 'step2', str(e))
    return data_id

...

@dask.delayed
def all_done(*args, **kwargs):
    return

...
# constructing a custom graph
dsk = {}
for id in data_ids:
    dsk['step1-{}'.format(id)] = (step1, id)
    dsk['step2-{}'.format(id)] = (step2, 'step1-{}'.format(id))
    ...
dsk['all_done'] = (all_done, ['step1-{}'.format(id), 'step2-{}'.format(id), ...])
...

work_to_do = client.get(dsk, 'all_done', sync=False)  # sync=False returns a future
progress(work_to_do)  # this may take a while
...

log.get()  # inspect any errors captured during the pipeline
```
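
As written, log_error also has a read-modify-write race: two tasks can both get the dict, each add an entry, and the second set wins. A minimal sketch of one way to serialize the update with distributed's Lock (the lock name and the use of get_client here are illustrative, not from this thread):

```python
from dask.distributed import Lock, Variable, get_client

def log_error(data_id, process_name, error_msg):
    client = get_client()  # client of the worker executing this task
    log = Variable('log', client=client)
    # a cluster-wide lock makes the get/set below atomic with respect
    # to other tasks logging at the same time
    with Lock('log-lock', client=client):
        temp_log = log.get()
        temp_log[data_id] = {process_name: error_msg}
        log.set(temp_log)
```

This funnels all error logging through one lock, which is fine for occasional failures; if errors are frequent, returning (data_id, error) pairs from the tasks and merging them on the client avoids shared state altogether.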

Thanks for any insights or guidance you can offer.

mrocklin (Member, Author) commented Nov 16, 2018 via email
