Patch for new distributed release (#78)
- Use nthreads instead of ncores for new distributed
- Fixup dashboard link formatting

[ci skip]
AlJohri authored and jcrist committed Jul 15, 2019
1 parent a991a88 commit 513c93d
Showing 4 changed files with 25 additions and 13 deletions.
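
For context on the first bullet: distributed 2.0 renamed the Nanny/Worker keyword `ncores` to `nthreads`, so the patch picks the keyword name once, based on the installed version, and splats it in at each call site. A minimal sketch of that pattern, using the names from the dask_yarn/core.py and dask_yarn/cli.py hunks below (the `make_nanny` helper itself is hypothetical, not part of the commit):

    from distutils.version import LooseVersion

    import distributed
    from distributed import Nanny

    # Pick the keyword name once, at import time (as dask_yarn/core.py does below).
    if LooseVersion(distributed.__version__) >= '2.0.0':
        _NTHREADS_KEYWORD = 'nthreads'
    else:
        _NTHREADS_KEYWORD = 'ncores'

    def make_nanny(scheduler_address, nthreads, memory_limit):
        # **{...} passes the thread count under whichever keyword this
        # distributed release expects.
        return Nanny(scheduler_address, memory_limit=memory_limit,
                     worker_port=0, **{_NTHREADS_KEYWORD: nthreads})
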
.travis.yml: 2 changes (1 addition & 1 deletion)
@@ -21,7 +21,7 @@ jobs:
   include:
     - env:
         - CLUSTER_CONFIG=kerberos
-        - PYTHON=3.6
+        - PYTHON=3.7
     - env:
         - CLUSTER_CONFIG=simple
         - PYTHON=2.7
continuous_integration/travis/install.sh: 4 changes (1 addition & 3 deletions)
@@ -15,14 +15,12 @@ conda create -n test-environment \
 
 source activate test-environment
 
-pip install conda-pack grpcio protobuf grpcio-tools
+pip install conda-pack skein
 
 if [[ $1 == '2.7' ]]; then
     pip install backports.weakref
 fi
 
-pip install --no-deps git+https://github.com/jcrist/skein
-
 cd ~/dask-yarn
 pip install -v --no-deps .
 
dask_yarn/cli.py: 7 changes (4 additions & 3 deletions)
@@ -20,7 +20,8 @@
 
 from . import __version__
 from .compat import urlparse
-from .core import _make_submit_specification, YarnCluster, _get_skein_client
+from .core import (_make_submit_specification, YarnCluster, _get_skein_client,
+                   _NTHREADS_KEYWORD)
 
 
 class _Formatter(argparse.HelpFormatter):
@@ -359,8 +360,8 @@ def worker(nthreads=None, memory_limit=None): # pragma: nocover
 
     loop = IOLoop.current()
 
-    worker = Nanny(scheduler, ncores=nthreads, loop=loop,
-                   memory_limit=memory_limit, worker_port=0)
+    worker = Nanny(scheduler, loop=loop, memory_limit=memory_limit,
+                   worker_port=0, **{_NTHREADS_KEYWORD: nthreads})
 
     @gen.coroutine
     def close(signalnum):
dask_yarn/core.py: 25 changes (19 additions & 6 deletions)
@@ -17,6 +17,22 @@
 from .compat import weakref, urlparse
 
 
+DISTRIBUTED_VERSION = LooseVersion(distributed.__version__)
+
+if DISTRIBUTED_VERSION >= '2.0.0':
+    _NTHREADS_KEYWORD = 'nthreads'
+else:
+    _NTHREADS_KEYWORD = 'ncores'
+
+
+try:
+    from distributed.utils import format_dashboard_link
+except ImportError:
+    def format_dashboard_link(host, port):
+        template = dask.config.get('distributed.dashboard.link')
+        return template.format(host=host, port=port, **os.environ)
+
+
 _memory_error = """Memory specification for the `{0}` take string parameters
 with units like "4 GiB" or "2048 MiB"
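
The fallback defined above mirrors what newer distributed releases ship as distributed.utils.format_dashboard_link: it fills the configured link template with the host, the port, and any environment variables (str.format ignores the unused ones). A standalone sketch of that behavior, with the template hard-coded as an assumed example value rather than read from the 'distributed.dashboard.link' config key:

    import os

    def format_dashboard_link(host, port):
        # Assumed example template; dask normally supplies this via
        # dask.config.get('distributed.dashboard.link').
        template = 'http://{host}:{port}/status'
        return template.format(host=host, port=port, **os.environ)

    print(format_dashboard_link('edge-node.example.com', 8787))
    # http://edge-node.example.com:8787/status
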
@@ -324,11 +340,8 @@ def dashboard_link(self):
         """Link to the dask dashboard. None if dashboard isn't running"""
         if self._dashboard_address is None:
             return None
-        template = dask.config.get('distributed.dashboard.link')
         dashboard = urlparse(self._dashboard_address)
-        params = dict(os.environ)
-        params.update({'host': dashboard.hostname, 'port': dashboard.port})
-        return template.format(**params)
+        return format_dashboard_link(dashboard.hostname, dashboard.port)
 
     @classmethod
     def from_specification(cls, spec, skein_client=None):
@@ -365,7 +378,7 @@ def _start_cluster(self, spec, skein_client=None):
 
         if 'dask.scheduler' not in spec.services:
             # deploy_mode == 'local'
-            if LooseVersion(distributed.__version__) >= '1.27.0':
+            if DISTRIBUTED_VERSION >= '1.27.0':
                 kwargs = {'dashboard_address': '0.0.0.0:0'}
             else:
                 kwargs = {'diagnostics_port': ('', 0)}
@@ -582,7 +595,7 @@ def _widget_status(self):
         workers = client.scheduler_info()['workers']
 
         n_workers = len(workers)
-        cores = sum(w['ncores'] for w in workers.values())
+        cores = sum(w[_NTHREADS_KEYWORD] for w in workers.values())
         memory = sum(w['memory_limit'] for w in workers.values())
 
         text = """
