diff --git a/.circleci/config.yml b/.circleci/config.yml
index a74f7ce9c..ebfd2fe14 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -70,7 +70,7 @@ jobs:
             - "/home/circleci/miniconda"
       - run:
           command: |
-            /home/circleci/miniconda/envs/dask-kubernetes-test/bin/py.test dask_kubernetes -s --verbose --worker-image daskdev/dask:0.17.1
+            /home/circleci/miniconda/envs/dask-kubernetes-test/bin/py.test dask_kubernetes -s --verbose --worker-image daskdev/dask:0.17.4.2018-05-18
       - run:
           command: |
             /home/circleci/miniconda/envs/dask-kubernetes-test/bin/flake8 dask-kubernetes
diff --git a/MANIFEST.in b/MANIFEST.in
index a10815176..54b70417f 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,4 +1,5 @@
 recursive-include dask_kubernetes *.py
+recursive-include dask_kubernetes *.yaml
 
 include setup.py
 include setup.cfg
diff --git a/ci/environment-3.6.yml b/ci/environment-3.6.yml
index 690f3ddb8..fa3228b1a 100644
--- a/ci/environment-3.6.yml
+++ b/ci/environment-3.6.yml
@@ -4,11 +4,13 @@ channels:
 dependencies:
   - flake8
   - ipywidgets
-  - dask=0.17.1
+  - dask=0.17.4
+  - distributed=1.21.8
   - nomkl
   - pytest
   - python=3.6
   - pyyaml
   - pip:
-    - kubernetes==4
-    - distributed==1.21.3
+    - kubernetes==6
+    - git+https://github.com/dask/dask.git@20145c9a5d12d965ab1ffe52e0c37659aec70ac2
+    - git+https://github.com/dask/distributed.git@ed2adf554fc818ec8d4d5a6eac393dcd033e023c
diff --git a/dask_kubernetes/__init__.py b/dask_kubernetes/__init__.py
index bdebb4075..cb4692501 100644
--- a/dask_kubernetes/__init__.py
+++ b/dask_kubernetes/__init__.py
@@ -1,3 +1,4 @@
+from . import config
 from .core import KubeCluster
 from .objects import make_pod_spec, make_pod_from_dict
 
diff --git a/dask_kubernetes/config.py b/dask_kubernetes/config.py
new file mode 100644
index 000000000..14c5bbd7f
--- /dev/null
+++ b/dask_kubernetes/config.py
@@ -0,0 +1,15 @@
+from __future__ import print_function, division, absolute_import
+
+import os
+
+import dask
+import yaml
+
+
+fn = os.path.join(os.path.dirname(__file__), 'kubernetes.yaml')
+dask.config.ensure_file(source=fn)
+
+with open(fn) as f:
+    defaults = yaml.safe_load(f)
+
+dask.config.update_defaults(defaults)
diff --git a/dask_kubernetes/core.py b/dask_kubernetes/core.py
index 8bddfeed9..0c6b1033c 100644
--- a/dask_kubernetes/core.py
+++ b/dask_kubernetes/core.py
@@ -13,11 +13,11 @@
 except ImportError:
     yaml = False
 
-from tornado import gen
+import dask
 from distributed.deploy import LocalCluster, Cluster
-from distributed.config import config
 from distributed.comm.utils import offload
 import kubernetes
+from tornado import gen
 
 from .objects import make_pod_from_dict, clean_pod_template
 
@@ -128,22 +128,33 @@ def __init__(
             pod_template=None,
             name=None,
             namespace=None,
-            n_workers=0,
-            host='0.0.0.0',
-            port=0,
+            n_workers=None,
+            host=None,
+            port=None,
             env=None,
             **kwargs
     ):
-        if pod_template is None:
-            if 'kubernetes-worker-template-path' in config:
-                import yaml
-                with open(config['kubernetes-worker-template-path']) as f:
-                    d = yaml.safe_load(f)
-                pod_template = make_pod_from_dict(d)
-            else:
-                msg = ("Worker pod specification not provided. See KubeCluster "
-                       "docstring for ways to specify workers")
-                raise ValueError(msg)
+        name = name or dask.config.get('kubernetes.name')
+        namespace = namespace or dask.config.get('kubernetes.namespace')
+        n_workers = n_workers if n_workers is not None else dask.config.get('kubernetes.count.start')
+        host = host or dask.config.get('kubernetes.host')
+        port = port if port is not None else dask.config.get('kubernetes.port')
+        env = env if env is not None else dask.config.get('kubernetes.env')
+
+        if not pod_template and dask.config.get('kubernetes.worker-template', None):
+            d = dask.config.get('kubernetes.worker-template')
+            pod_template = make_pod_from_dict(d)
+
+        if not pod_template and dask.config.get('kubernetes.worker-template-path', None):
+            import yaml
+            with open(dask.config.get('kubernetes.worker-template-path')) as f:
+                d = yaml.safe_load(f)
+            pod_template = make_pod_from_dict(d)
+
+        if not pod_template:
+            msg = ("Worker pod specification not provided. See KubeCluster "
+                   "docstring for ways to specify workers")
+            raise ValueError(msg)
 
         self.cluster = LocalCluster(ip=host or socket.gethostname(),
                                     scheduler_port=port,
@@ -158,10 +169,9 @@ def __init__(
         if namespace is None:
             namespace = _namespace_default()
 
-        if name is None:
-            worker_name = config.get('kubernetes-worker-name', 'dask-{user}-{uuid}')
-            name = worker_name.format(user=escape(getpass.getuser()),
-                                      uuid=str(uuid.uuid4())[:10], **os.environ)
+        name = name.format(user=escape(getpass.getuser()),
+                           uuid=str(uuid.uuid4())[:10],
+                           **os.environ)
 
         self.pod_template = clean_pod_template(pod_template)
         # Default labels that can't be overwritten
@@ -225,15 +235,15 @@ def from_yaml(cls, yaml_path, **kwargs):
 
             kind: Pod
             metadata:
-              labels:
-                foo: bar
-                baz: quux
+                labels:
+                    foo: bar
+                    baz: quux
             spec:
-              containers:
-              - image: daskdev/dask:latest
-                name: dask-worker
-                args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
-              restartPolicy: Never
+                containers:
+                  - image: daskdev/dask:latest
+                    name: dask-worker
+                    args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
+                restartPolicy: Never
 
         Examples
         --------
@@ -381,6 +391,11 @@ def scale_up(self, n, pods=None, **kwargs):
         --------
         >>> cluster.scale_up(20)  # ask for twenty workers
         """
+        maximum = dask.config.get('kubernetes.count.max')
+        if maximum is not None and maximum < n:
+            logger.info("Tried to scale beyond maximum number of workers %d > %d",
+                        n, maximum)
+            n = maximum
         pods = pods or self._cleanup_succeeded_pods(self.pods())
         to_create = n - len(pods)
         new_pods = []
diff --git a/dask_kubernetes/kubernetes.yaml b/dask_kubernetes/kubernetes.yaml
new file mode 100644
index 000000000..d974a4ce6
--- /dev/null
+++ b/dask_kubernetes/kubernetes.yaml
@@ -0,0 +1,38 @@
+kubernetes:
+  name: "dask-{user}-{uuid}"
+  namespace: null
+  count:
+    start: 0
+    max: null
+  host: '0.0.0.0'
+  port: 0
+  env: {}
+
+  worker-template-path: null
+
+  worker-template: {}
+  #   kind: Pod
+  #   metadata:
+  #     labels:
+  #       foo: bar
+  #       baz: quux
+  #   spec:
+  #     restartPolicy: Never
+  #     containers:
+  #       - image: daskdev/dask:latest
+  #         args:
+  #           - dask-worker
+  #           - --nthreads
+  #           - '2'
+  #           - --no-bokeh
+  #           - --memory-limit
+  #           - 6GB
+  #           - --death-timeout
+  #           - '60'
+  #         resources:
+  #           limits:
+  #             cpu: "1.75"
+  #             memory: 6G
+  #           requests:
+  #             cpu: "1.75"
+  #             memory: 6G
diff --git a/dask_kubernetes/tests/test_core.py b/dask_kubernetes/tests/test_core.py
index 94784d040..6fd2c5ed2 100644
--- a/dask_kubernetes/tests/test_core.py
+++ b/dask_kubernetes/tests/test_core.py
@@ -4,11 +4,11 @@
 import uuid
 import yaml
 
+import dask
 import pytest
-from distributed.config import set_config
 from dask_kubernetes import KubeCluster, make_pod_spec
 from dask.distributed import Client, wait
-from distributed.utils_test import loop  # noqa: F401
+from distributed.utils_test import loop, captured_logger  # noqa: F401
 from distributed.utils import tmpfile
 import kubernetes
 from random import random
@@ -102,14 +102,14 @@ def test_ipython_display(cluster):
 
 
 def test_dask_worker_name_env_variable(pod_spec, loop, ns):
-    with set_config(**{'kubernetes-worker-name': 'foo-{USER}-{uuid}'}):
+    with dask.config.set({'kubernetes.name': 'foo-{USER}-{uuid}'}):
         with KubeCluster(pod_spec, loop=loop, namespace=ns) as cluster:
             assert 'foo-' + getpass.getuser() in cluster.name
 
 
 def test_diagnostics_link_env_variable(pod_spec, loop, ns):
     pytest.importorskip('bokeh')
-    with set_config(**{'diagnostics-link': 'foo-{USER}-{port}'}):
+    with dask.config.set({'distributed.dashboard.link': 'foo-{USER}-{port}'}):
         with KubeCluster(pod_spec, loop=loop, namespace=ns) as cluster:
             port = cluster.scheduler.services['bokeh'].port
             cluster._ipython_display_()
@@ -262,8 +262,8 @@ def test_bad_args(loop):
 
     assert 'KubeCluster.from_yaml' in str(info.value)
 
-    with pytest.raises(TypeError) as info:
-        KubeCluster({})
+    with pytest.raises((ValueError, TypeError)) as info:
+        KubeCluster({'kind': 'Pod'})
 
     assert 'KubeCluster.from_dict' in str(info.value)
 
@@ -434,7 +434,7 @@ def test_automatic_startup(image_name, loop, ns):
 
     with tmpfile(extension='yaml') as fn:
         with open(fn, mode='w') as f:
             yaml.dump(test_yaml, f)
-        with set_config(**{'kubernetes-worker-template-path': fn}):
+        with dask.config.set({'kubernetes.worker-template-path': fn}):
            with KubeCluster(loop=loop, namespace=ns) as cluster:
                assert cluster.pod_template.metadata.labels['foo'] == 'bar'
@@ -452,7 +452,24 @@ def test_escape_username(pod_spec, loop, ns):
 
     try:
         with KubeCluster(pod_spec, loop=loop, namespace=ns) as cluster:
-            assert 'foo' in cluster.name 
+            assert 'foo' in cluster.name
             assert '!' not in cluster.name
     finally:
         os.environ['LOGNAME'] = old_logname
+
+
+def test_maximum(cluster):
+    with dask.config.set({'kubernetes.count.max': 1}):
+        with captured_logger('dask_kubernetes') as logger:
+            cluster.scale(10)
+
+            start = time()
+            while len(cluster.scheduler.workers) <= 0:
+                sleep(0.1)
+                assert time() < start + 60
+
+            sleep(0.5)
+            assert len(cluster.scheduler.workers) == 1
+
+    result = logger.getvalue()
+    assert "scale beyond maximum number of workers" in result.lower()
diff --git a/requirements.txt b/requirements.txt
index fd817eaca..ec9c28222 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
 dask[distributed]
 distributed>=1.21.3
-kubernetes==4
+kubernetes>=4
diff --git a/setup.py b/setup.py
index 1f7364d4c..d17b07f46 100755
--- a/setup.py
+++ b/setup.py
@@ -11,6 +11,7 @@
       keywords='dask,kubernetes,distributed',
       license='BSD',
       packages=find_packages(),
+      include_package_data=True,
       long_description=(open('README.rst').read() if exists('README.rst') else ''),
       zip_safe=False,
       install_requires=list(open('requirements.txt').read().strip().split('\n')),
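Usage sketch: with this patch, KubeCluster resolves every argument it is not given from the kubernetes.* keys of dask.config, which are seeded from the bundled dask_kubernetes/kubernetes.yaml on import, and scale_up is capped by kubernetes.count.max. The snippet below is illustrative only; the worker-template.yaml path and the override values are hypothetical and not part of this diff:

    import dask
    from dask_kubernetes import KubeCluster  # import registers the kubernetes.yaml defaults

    # Override a few of the defaults shipped in dask_kubernetes/kubernetes.yaml.
    # 'worker-template.yaml' is a hypothetical worker pod spec on disk.
    dask.config.set({'kubernetes.name': 'dask-demo-{uuid}',
                     'kubernetes.count.max': 10,
                     'kubernetes.worker-template-path': 'worker-template.yaml'})

    cluster = KubeCluster()  # no arguments: template, name, host, and port come from dask.config
    cluster.scale_up(20)     # logs "Tried to scale beyond maximum number of workers" and caps n at 10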