Commit

Use Cluster superclass in distributed.deploy to remove functionality (#51)

* Use Cluster superclass in distributed.deploy to remove functionality

This generalizes the widget and adaptive-scaling work done in dask-kubernetes and moves it upstream to the Cluster superclass in distributed

* install distributed from PyPI
mrocklin committed Mar 10, 2018
1 parent d5d87ca commit a6fc441
Showing 5 changed files with 12 additions and 92 deletions.
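
Since KubeCluster now subclasses distributed.deploy.Cluster, the notebook widget, _ipython_display_, and adapt() come from the superclass instead of being defined in this package. A minimal sketch of the unchanged user-facing behaviour, assuming a worker-template.yaml and distributed >= 1.21.3 (the file name follows the example in the docstring removed below):

    from dask_kubernetes import KubeCluster

    # Build the cluster from a pod template, as in the removed adapt() docstring.
    cluster = KubeCluster.from_yaml('worker-template.yaml')

    # adapt() is now inherited from distributed.deploy.Cluster; it attaches an
    # Adaptive policy so worker pods scale with scheduler load.
    cluster.adapt()

    # In a notebook, displaying the cluster renders the widget that the
    # superclass provides (dashboard link plus scaling controls).
    cluster
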
2 changes: 1 addition & 1 deletion ci/environment-3.6.yml
@@ -4,11 +4,11 @@ channels:
dependencies:
- flake8
- ipywidgets
- distributed=1.21.1
- dask=0.17.1
- nomkl
- pytest
- python=3.6
- pyyaml
- pip:
- kubernetes==4
- distributed==1.21.3
81 changes: 2 additions & 79 deletions dask_kubernetes/core.py
@@ -14,7 +14,7 @@
from tornado import gen
from tornado.ioloop import IOLoop

from distributed.deploy import LocalCluster
from distributed.deploy import LocalCluster, Cluster
import kubernetes

from .config import config
@@ -23,7 +23,7 @@
logger = logging.getLogger(__name__)


class KubeCluster(object):
class KubeCluster(Cluster):
""" Launch a Dask cluster on Kubernetes
This starts a local Dask scheduler and then dynamically launches
@@ -181,8 +181,6 @@ def __init__(

finalize(self, _cleanup_pods, self.namespace, self.pod_template.metadata.labels)

self._cached_widget = None

if n_workers:
self.scale(n_workers)

@@ -257,50 +255,6 @@ def namespace(self):
def name(self):
return self.pod_template.metadata.generate_name

def _widget(self):
""" Create IPython widget for display within a notebook """
if self._cached_widget:
return self._cached_widget
import ipywidgets
layout = ipywidgets.Layout(width='150px')

elements = []
if 'bokeh' in self.scheduler.services:
if 'diagnostics-link' in config:
template = config['diagnostics-link']
else:
template = 'http://{host}:{port}/status'

host = self.scheduler.address.split('://')[1].split(':')[0]
port = self.scheduler.services['bokeh'].port
link = template.format(host=host, port=port, **os.environ)
link = ipywidgets.HTML('<b>Dashboard:</b> <a href="%s" target="_blank">%s</a>' %
(link, link))
elements.append(link)

n_workers = ipywidgets.IntText(0, description='Requested', layout=layout)
actual = ipywidgets.Text('0', description='Actual', layout=layout)
button = ipywidgets.Button(description='Scale', layout=layout)
elements.extend([n_workers, actual, button])
box = ipywidgets.VBox(elements)
self._cached_widget = box

def cb(b):
n = n_workers.value
self.scale(n)

button.on_click(cb)

worker_ref = ref(actual)
scheduler_ref = ref(self.scheduler)

IOLoop.current().add_callback(_update_worker_label, worker_ref,
scheduler_ref)
return box

def _ipython_display_(self, **kwargs):
return self._widget()._ipython_display_(**kwargs)

@property
def scheduler(self):
return self.cluster.scheduler
@@ -444,19 +398,6 @@ def __exit__(self, type, value, traceback):
_cleanup_pods(self.namespace, self.pod_template.metadata.labels)
self.cluster.__exit__(type, value, traceback)

def adapt(self):
""" Have cluster dynamically allocate workers based on load
http://distributed.readthedocs.io/en/latest/adaptive.html
Examples
--------
>>> cluster = KubeCluster.from_yaml('worker-template.yaml')
>>> cluster.adapt()
"""
from distributed.deploy import Adaptive
return Adaptive(self.scheduler, self)


def _cleanup_pods(namespace, labels):
""" Remove all pods with these labels in this namespace """
@@ -481,24 +422,6 @@ def format_labels(labels):
return ''


@gen.coroutine
def _update_worker_label(worker_ref, scheduler_ref):
""" Periodically check the scheduler's workers and update widget
See Also
--------
KubeCluster._widget
"""
while True:
worker = worker_ref()
scheduler = scheduler_ref()
if worker and scheduler:
worker.value = str(len(scheduler.workers))
else:
return
yield gen.sleep(0.5)


def _namespace_default():
"""
Get current namespace if running in a k8s cluster
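
For orientation, the methods deleted above are not lost: the Cluster superclass supplies equivalent widget and adapt wiring, and it only needs the hooks that KubeCluster already exposes. A rough sketch of that contract, with method names taken from this file and from distributed.deploy of that era (an illustration, not the exact upstream interface):

    from distributed.deploy import Cluster

    class MinimalCluster(Cluster):
        """Illustrative subclass showing what the inherited machinery relies on."""

        @property
        def scheduler(self):
            # Both the inherited widget and Adaptive talk to this scheduler.
            ...

        def scale(self, n):
            # Called by the widget's "Scale" button (the removed cb() did the same).
            ...

        def scale_up(self, n):
            # Called by Adaptive when the scheduler needs more workers.
            ...

        def scale_down(self, workers):
            # Called by Adaptive when workers are idle and can be retired.
            ...
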
14 changes: 4 additions & 10 deletions dask_kubernetes/tests/test_core.py
@@ -5,6 +5,7 @@
import yaml

import pytest
from distributed.config import set_config
from dask_kubernetes import KubeCluster, make_pod_spec, config
from dask.distributed import Client, wait
from distributed.utils_test import loop # noqa: F401
@@ -91,9 +92,7 @@ def test_ipython_display(cluster):
assert cluster._cached_widget is box

start = time()
workers = [child for child in box.children
if child.description == 'Actual'][0]
while workers.value == 0:
while "<td>1</td>" not in str(box): # one worker in a table
assert time() < start + 10
sleep(0.5)

@@ -109,18 +108,13 @@ def test_dask_worker_name_env_variable(pod_spec, loop, ns):

def test_diagnostics_link_env_variable(pod_spec, loop, ns):
pytest.importorskip('bokeh')
config['diagnostics-link'] = 'foo-{USER}-{port}'
try:
with set_config(**{'diagnostics-link': 'foo-{USER}-{port}'}):
with KubeCluster(pod_spec, loop=loop, namespace=ns) as cluster:
port = cluster.scheduler.services['bokeh'].port
cluster._ipython_display_()
box = cluster._cached_widget

link = box.children[0]
assert 'foo-' + getpass.getuser() + '-' + str(port) in link.value
assert 'href' in link.value
finally:
del config['diagnostics-link']
assert 'foo-' + getpass.getuser() + '-' + str(port) in str(box)


def test_namespace(pod_spec, loop, ns):
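
The test change above leans on distributed's configuration helper: in the 1.21 series, distributed.config.set_config is a context manager that overrides configuration keys for the duration of a block and restores them afterwards, replacing the manual try/finally around the module-level config dict. A small sketch of the pattern, reusing the 'diagnostics-link' key from the test:

    from distributed.config import set_config

    # Temporarily override a config value; the previous value (or its absence)
    # is restored when the block exits, even on error.
    with set_config(**{'diagnostics-link': 'foo-{USER}-{port}'}):
        ...  # code under test sees the overridden 'diagnostics-link'
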
6 changes: 4 additions & 2 deletions doc/source/index.rst
@@ -81,13 +81,15 @@ There are a few special environment variables that affect dask-kubernetes behavior

cluster = KubeCluster() # reads provided yaml file

2. ``DASK_KUBERNETES_DIAGNOSTICS_LINK``: a Python pre-formatted string that shows
2. ``DASK_DIAGNOSTICS_LINK``: a Python pre-formatted string that shows
the location of Dask's dashboard. This string will receive values for
``host``, ``port``, and all environment variables. This is useful when
using dask-kubernetes with JupyterHub and nbserverproxy to route the dashboard
link to a proxied address as follows::

export DASK_KUBERNETES_DIAGNOSTICS_LINK="{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"
export DASK_DIAGNOSTICS_LINK="{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"

This is inherited from general Dask behavior.

3. ``DASK_KUBERNETES_WORKER_NAME``: a Python pre-formatted string to use
when naming dask worker pods. This string will receive values for ``user``,
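
For context on the renamed variable: the diagnostics-link template is expanded with host, port, and the process environment, mirroring the template.format(host=host, port=port, **os.environ) call in the _widget code removed above (the equivalent expansion now happens upstream). A sketch of that expansion with placeholder host and port values; reading the template straight from os.environ here is a simplification of distributed's DASK_* config loading:

    import os

    # Fall back to the default template used by the removed widget code.
    template = os.environ.get('DASK_DIAGNOSTICS_LINK',
                              'http://{host}:{port}/status')

    # host/port are placeholders; the real values come from the scheduler.
    # **os.environ lets templates such as
    # '{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status' resolve, provided that
    # variable is actually set in the environment.
    link = template.format(host='scheduler-host', port=8787, **os.environ)
    print(link)
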
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,2 +1,3 @@
dask[distributed]
distributed>=1.21.3
kubernetes==4
