Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: worker environments #505

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 23 additions & 0 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from .compatibility import Queue as pyQueue, Empty, isqueue
from .core import (read, write, connect, coerce_to_rpc, dumps,
clean_exception, loads)
from .environments import Environment
from .worker import dumps_function, dumps_task
from .utils import (All, sync, funcname, ignoring, queue_to_iterator,
tokey, log_errors, str_graph)
Expand Down Expand Up @@ -1090,6 +1091,28 @@ def get_dataset(self, name):
"""
return sync(self.loop, self._get_dataset, tokey(name))

@gen.coroutine
def _register_environment(self, name, env):
if not isinstance(env, Environment):
env = Environment(condition=env)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could also do this with keyword arguments, like the following:

def register_environment(self, name, environment=None, condition=None):
    ...

envs = {name: dumps(env)}
exceptions = yield self.scheduler.register_environments(environments=envs)
if exceptions:
raise loads(exceptions[name][0]) # only raise the first exception

def register_environment(self, name, env):
"""Register an environment with the scheduler.

Parameters
----------
name : str
The name of the environment.
env : Environment or callable
The environment to register. If a callable, will be used as the
``condition`` method of the environment.
"""
return sync(self.loop, self._register_environment, name, env)

@gen.coroutine
def _run(self, function, *args, **kwargs):
nanny = kwargs.pop('nanny', False)
Expand Down
16 changes: 16 additions & 0 deletions distributed/environments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import absolute_import, division, print_function


class Environment(object):
def __init__(self, condition=None, setup=None):
self._condition = condition
self._setup = setup
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps default to the following:

self.condition = condition or lambda: True
self.setup = setup or lambda: None

and then do away with the method definitions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then you can't subclass without overriding __init__.


def condition(self):
if self._condition:
return self._condition()
return True

def setup(self):
if self._setup:
self._setup()
159 changes: 107 additions & 52 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ def __init__(self, center=None, loop=None,
self.saturated = set()
self.occupancy = dict()

# Environment state
self.environments = {}
self.environment_workers = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I ask that we also include environment_workers in self.identity()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can imagine wanting this information in diagnostics

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


self.plugins = []
self.transition_log = deque(maxlen=config.get('transition-log-length',
100000))
Expand Down Expand Up @@ -269,6 +273,7 @@ def __init__(self, center=None, loop=None,
'get_dataset': self.get_dataset,
'publish_dataset': self.publish_dataset,
'unpublish_dataset': self.unpublish_dataset,
'register_environments': self.register_environments,
'update_data': self.update_data,
'change_worker_cores': self.change_worker_cores}

Expand Down Expand Up @@ -336,7 +341,8 @@ def identity(self, stream):
'id': str(self.id),
'workers': list(self.ncores),
'services': {key: v.port for (key, v) in self.services.items()},
'workers': dict(self.worker_info)}
'workers': dict(self.worker_info),
'environments': valmap(set, self.environment_workers)}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Msgpack may not like the sets. Recommend lists instead.

return d

def start(self, port=8786, start_queues=True):
Expand Down Expand Up @@ -502,17 +508,28 @@ def add_worker(self, stream=None, address=None, keys=(), ncores=None,
if self.ncores[address] > len(self.processing[address]):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see that you've pulled it out below. Will take a look.

Generally some history here is that for a while I pulled everything out into little small functions like this. All of the indirection soon became very hard to follow. I tried to keep everything fairly shallow after the refactor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are obviously pros and cons here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's called in 2 places, so I figured it could be its own tiny method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I can't think of a cleaner way. In that case I suppose I'm just bringing it up to bring it up. Shallowness is a design goal of sorts when convenient.

self.idle.add(address)

for key in list(self.unrunnable):
r = self.restrictions.get(key, [])
if address in r or host in r or name in r:
self.transitions({key: 'released'})
if self.environments:
self.loop.add_callback(self.register_environments,
workers=[address])
else:
# Called by `register_environments`, no need to call here
self._maybe_transition_unrunnable_release([address])

self.maybe_idle.add(address)
self.ensure_occupied()

logger.info("Register %s", str(address))
return 'OK'

def _maybe_transition_unrunnable_release(self, workers):
recommendations = {}
for key in self.unrunnable:
if key in self.restrictions:
if self.workers_set(self.restrictions[key]).intersection(workers):
recommendations[key] = 'released'
if recommendations:
self.transitions(recommendations)

def update_graph(self, client=None, tasks=None, keys=None,
dependencies=None, restrictions=None, priority=None,
loose_restrictions=None):
Expand Down Expand Up @@ -577,10 +594,9 @@ def update_graph(self, client=None, tasks=None, keys=None,
self.priority[key] = (self.generation, new_priority[key]) # prefer old

if restrictions:
restrictions = {k: set(map(self.coerce_address, v))
restrictions = {k: set(tuple(r) if isinstance(r, list) else r for r in v)
for k, v in restrictions.items()}
self.restrictions.update(restrictions)

if loose_restrictions:
self.loose_restrictions |= set(loose_restrictions)

Expand Down Expand Up @@ -711,6 +727,9 @@ def remove_worker(self, stream=None, address=None):
if address in self.saturated:
self.saturated.remove(address)

for worker_sets in self.environment_workers.values():
worker_sets.discard(address)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test inside of Scheduler.validate to check that all workers in self.environment_workers are also inself.worker_info. Don't bother with scheduler.py:validate() (which is the prominent function in Scheduler.validate)


recommendations = OrderedDict()

in_flight = set(self.processing.pop(address))
Expand Down Expand Up @@ -740,7 +759,6 @@ def remove_worker(self, stream=None, address=None):
else:
recommendations[key] = 'forgotten'


self.transitions(recommendations)

if not self.stacks:
Expand Down Expand Up @@ -1186,9 +1204,11 @@ def scatter(self, stream=None, data=None, workers=None, client=None,
"""
if not self.ncores:
raise ValueError("No workers yet found.")
workers2 = list(self.workers_set(workers))
if workers is not None:
workers = [self.coerce_address(w) for w in workers]
ncores = workers if workers is not None else self.ncores
ncores = {w: self.ncores[w] for w in workers2}
else:
ncores = self.ncores
keys, who_has, nbytes = yield scatter_to_workers(ncores, data,
report=False,
serialize=False)
Expand All @@ -1200,7 +1220,7 @@ def scatter(self, stream=None, data=None, workers=None, client=None,
n = len(ncores)
else:
n = broadcast
yield self.replicate(keys=keys, workers=workers, n=n)
yield self.replicate(keys=keys, workers=workers2, n=n)

raise gen.Return(keys)

Expand Down Expand Up @@ -1267,20 +1287,9 @@ def restart(self, environment=None):
logger.exception(e)

@gen.coroutine
def broadcast(self, stream=None, msg=None, workers=None, hosts=None,
nanny=False):
def broadcast(self, stream=None, msg=None, workers=None, nanny=False):
""" Broadcast message to workers, return all results """
if workers is None:
if hosts is None:
workers = list(self.ncores)
else:
workers = []
if hosts is not None:
for host in hosts:
if host in self.host_info:
workers.extend([host + ':' + port
for port in self.host_info[host]['ports']])
# TODO replace with worker_list
workers = self.workers_set(workers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hooray!


if nanny:
addresses = []
Expand All @@ -1301,7 +1310,7 @@ def rebalance(self, stream=None, keys=None, workers=None):
""" Rebalance keys so that each worker stores roughly equal bytes """
with log_errors():
keys = set(keys or self.who_has)
workers = set(workers or self.ncores)
workers = self.workers_set(workers)

if not keys.issubset(self.who_has):
raise Return({'status': 'missing-data',
Expand Down Expand Up @@ -1391,7 +1400,7 @@ def replicate(self, stream=None, keys=None, n=None, workers=None, branching_fact
Scheduler.rebalance
"""
with log_errors():
workers = set(self.workers_list(workers))
workers = self.workers_set(workers)
if n is None:
n = len(workers)
n = min(n, len(workers))
Expand Down Expand Up @@ -1567,14 +1576,14 @@ def feed(self, stream, function=None, setup=None, teardown=None, interval=1, **k

def get_stacks(self, stream=None, workers=None):
if workers is not None:
workers = set(map(self.coerce_address, workers))
workers = self.workers_set(workers)
return {w: list(self.stacks[w]) for w in workers}
else:
return valmap(list, self.stacks)

def get_processing(self, stream=None, workers=None):
if workers is not None:
workers = set(map(self.coerce_address, workers))
workers = self.workers_set(workers)
return {w: list(self.processing[w]) for w in workers}
else:
return valmap(list, self.processing)
Expand All @@ -1587,14 +1596,14 @@ def get_who_has(self, stream=None, keys=None):

def get_has_what(self, stream=None, workers=None):
if workers is not None:
workers = map(self.coerce_address, workers)
workers = self.workers_set(workers)
return {w: list(self.has_what.get(w, ())) for w in workers}
else:
return valmap(list, self.has_what)

def get_ncores(self, stream=None, workers=None):
if workers is not None:
workers = map(self.coerce_address, workers)
workers = self.workers_set(workers)
return {w: self.ncores.get(w, None) for w in workers}
else:
return self.ncores
Expand Down Expand Up @@ -1635,6 +1644,38 @@ def get_dataset(self, stream, name=None, client=None):
else:
raise KeyError("Dataset '%s' not found" % name)

@gen.coroutine
def register_environments(self, stream=None, environments=None, workers=None):
if environments is None:
environments = self.environments
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the importance of this case?

else:
for name, env in environments.items():
if name in self.environments and self.environments[name] != env:
raise KeyError("Environment %s already exists" % name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps only err if the value is different?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

self.environments[name] = env
self.environment_workers[name] = set()

if workers is None:
workers = list(self.worker_info)

results = yield [self.rpc(addr=worker).register_environments(environments=environments)
for worker in workers]

exceptions = {}
for worker, result in zip(workers, results):
for name, val in result.items():
if val is True:
self.environment_workers[name].add(worker)
elif isinstance(val, str):
if name not in exceptions:
exceptions[name] = [val]
else:
exceptions[name].append(val)

self._maybe_transition_unrunnable_release(workers)

raise Return(exceptions)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to return environment counts back to the client here. I expect people will want feedback on how many gpus machines they have and such or if their condition was ever satisfied.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's doable. Since these can change as workers are added, perhaps making environment_workers accessible on the client might be nice as well (would probably call it environments and make it a property that returns a dictionary).


def change_worker_cores(self, stream=None, worker=None, diff=0):
""" Add or remove cores from a worker

Expand Down Expand Up @@ -1722,10 +1763,15 @@ def transition_waiting_ready(self, key):
del self.waiting[key]

if self.dependencies.get(key, None) or key in self.restrictions:
if key in self.restrictions:
restrictions = self.workers_set(self.restrictions[key])
loose = key in self.loose_restrictions
else:
loose = restrictions = None
new_worker = decide_worker(self.dependencies, self.stacks,
self.processing, self.who_has, self.has_what,
self.restrictions, self.loose_restrictions, self.nbytes,
key)
self.processing, self.who_has,
self.has_what, restrictions, loose,
self.nbytes, key)
if not new_worker:
self.unrunnable.add(key)
self.task_state[key] = 'no-worker'
Expand Down Expand Up @@ -2651,23 +2697,32 @@ def coerce_address(self, addr):
addr = '%s:%d' % (ip, port)
return addr

def workers_list(self, workers):
"""
List of qualifying workers
def workers_set(self, workers):
"""Set of qualifying workers.

Takes a list of worker addresses or hostnames.
Returns a list of all worker addresses that match
Takes a list of worker addresses, aliases, ip's, hostnames, or
environment names. Returns a list of all worker addresses that match
"""
if workers is None:
return list(self.ncores)
return set(self.ncores)

out = set()
for w in workers:
if ':' in w:
out.add(w)
if isinstance(w, list):
w = tuple(w)
if w in self.environment_workers:
out.update(self.environment_workers[w])
else:
out.update({ww for ww in self.ncores if w in ww}) # TODO: quadratic
return list(out)
try:
w = self.coerce_address(w)
except Exception:
continue
if ':' in w:
out.add(w)
else:
out.update('%s:%s' % (w, port)
for port in self.host_info[w].get('ports', ()))
return out
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I can see how the hostname environment-name ambiguity will start to hurt here


def start_ipython(self, stream=None):
"""Start an IPython kernel
Expand All @@ -2685,7 +2740,7 @@ def start_ipython(self, stream=None):


def decide_worker(dependencies, stacks, processing, who_has, has_what, restrictions,
loose_restrictions, nbytes, key):
loose, nbytes, key):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a good move. I would expect a lot of test_scheduler.py tests to get angry with you though :)

""" Decide which worker should take task

>>> dependencies = {'c': {'b'}, 'b': {'a'}}
Expand All @@ -2694,8 +2749,8 @@ def decide_worker(dependencies, stacks, processing, who_has, has_what, restricti
>>> who_has = {'a': {'alice:8000'}}
>>> has_what = {'alice:8000': {'a'}}
>>> nbytes = {'a': 100}
>>> restrictions = {}
>>> loose_restrictions = set()
>>> restrictions = None
>>> loose = None

We choose the worker that has the data on which 'b' depends (alice has 'a')

Expand Down Expand Up @@ -2739,15 +2794,15 @@ def decide_worker(dependencies, stacks, processing, who_has, has_what, restricti
for w in who_has[dep]])
if not workers:
workers = stacks
if key in restrictions:
r = restrictions[key]
workers = {w for w in workers if w in r or w.split(':')[0] in r} # TODO: nonlinear
if restrictions is not None:
workers = restrictions.intersection(workers)
if not workers:
workers = {w for w in stacks if w in r or w.split(':')[0] in r}
workers = restrictions.intersection(stacks)
if not workers:
if key in loose_restrictions:
if loose:
return decide_worker(dependencies, stacks, processing,
who_has, has_what, {}, set(), nbytes, key)
who_has, has_what, {}, False,
nbytes, key)
else:
return None
if not workers or not stacks:
Expand Down