Update Configuration (#75)
This updates dask-kubernetes to use the new configuration system in dask/dask.
mrocklin committed May 19, 2018
1 parent a7fc09b commit c773aad
Showing 10 changed files with 131 additions and 40 deletions.
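
The recurring pattern in the diff below: reads from the old distributed config dictionary (e.g. config['kubernetes-worker-name']) become dask.config.get('kubernetes.name'), and the set_config test helper becomes dask.config.set. A minimal sketch of the new calls, assuming the package is importable; the values shown are the packaged defaults added in this commit:

import dask
import dask_kubernetes  # noqa: F401  (import registers the defaults from dask_kubernetes/kubernetes.yaml)

dask.config.get('kubernetes.name')         # "dask-{user}-{uuid}"
dask.config.get('kubernetes.count.start')  # 0

# Temporarily override a value, as the updated tests do
with dask.config.set({'kubernetes.name': 'foo-{user}-{uuid}'}):
    assert dask.config.get('kubernetes.name') == 'foo-{user}-{uuid}'
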
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -70,7 +70,7 @@ jobs:
- "/home/circleci/miniconda"
- run:
command: |
/home/circleci/miniconda/envs/dask-kubernetes-test/bin/py.test dask_kubernetes -s --verbose --worker-image daskdev/dask:0.17.1
/home/circleci/miniconda/envs/dask-kubernetes-test/bin/py.test dask_kubernetes -s --verbose --worker-image daskdev/dask:0.17.4.2018-05-18
- run:
command: |
/home/circleci/miniconda/envs/dask-kubernetes-test/bin/flake8 dask-kubernetes
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -1,4 +1,5 @@
recursive-include dask_kubernetes *.py
recursive-include dask_kubernetes *.yaml

include setup.py
include setup.cfg
8 changes: 5 additions & 3 deletions ci/environment-3.6.yml
@@ -4,11 +4,13 @@ channels:
dependencies:
- flake8
- ipywidgets
- dask=0.17.1
- dask=0.17.4
- distributed=1.21.8
- nomkl
- pytest
- python=3.6
- pyyaml
- pip:
- kubernetes==4
- distributed==1.21.3
- kubernetes==6
- git+https://github.com/dask/dask.git@20145c9a5d12d965ab1ffe52e0c37659aec70ac2
- git+https://github.com/dask/distributed.git@ed2adf554fc818ec8d4d5a6eac393dcd033e023c
1 change: 1 addition & 0 deletions dask_kubernetes/__init__.py
@@ -1,3 +1,4 @@
from . import config
from .core import KubeCluster
from .objects import make_pod_spec, make_pod_from_dict

15 changes: 15 additions & 0 deletions dask_kubernetes/config.py
@@ -0,0 +1,15 @@
from __future__ import print_function, division, absolute_import

import os

import dask
import yaml


fn = os.path.join(os.path.dirname(__file__), 'kubernetes.yaml')
dask.config.ensure_file(source=fn)

with open(fn) as f:
defaults = yaml.load(f)

dask.config.update_defaults(defaults)
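
At import time this module copies the packaged defaults into the user's dask configuration directory (dask.config.ensure_file writes a copy of kubernetes.yaml there only if none exists yet, typically under ~/.config/dask) and registers them at the lowest priority via dask.config.update_defaults, so user configuration files and explicit dask.config.set calls both take precedence. A small sketch of that precedence:

import dask
import dask_kubernetes  # noqa: F401  (runs the module above)

dask.config.get('kubernetes.count')          # {'start': 0, 'max': None}, assuming no user override
with dask.config.set({'kubernetes.count.max': 10}):
    dask.config.get('kubernetes.count.max')  # 10 -- explicit settings override the defaults
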
70 changes: 43 additions & 27 deletions dask_kubernetes/core.py
@@ -13,11 +13,11 @@
except ImportError:
yaml = False

from tornado import gen
import dask
from distributed.deploy import LocalCluster, Cluster
from distributed.config import config
from distributed.comm.utils import offload
import kubernetes
from tornado import gen

from .objects import make_pod_from_dict, clean_pod_template

@@ -128,22 +128,34 @@ def __init__(
pod_template=None,
name=None,
namespace=None,
n_workers=0,
host='0.0.0.0',
port=0,
n_workers=None,
host=None,
port=None,
env=None,
**kwargs
):
if pod_template is None:
if 'kubernetes-worker-template-path' in config:
import yaml
with open(config['kubernetes-worker-template-path']) as f:
d = yaml.safe_load(f)
pod_template = make_pod_from_dict(d)
else:
msg = ("Worker pod specification not provided. See KubeCluster "
"docstring for ways to specify workers")
raise ValueError(msg)
pod_template = pod_template or dask.config.get('kubernetes.worker-template')
name = name or dask.config.get('kubernetes.name')
namespace = namespace or dask.config.get('kubernetes.namespace')
n_workers = n_workers if n_workers is not None else dask.config.get('kubernetes.count.start')
host = host or dask.config.get('kubernetes.host')
port = port if port is not None else dask.config.get('kubernetes.port')
env = env if env is not None else dask.config.get('kubernetes.env')

if not pod_template and dask.config.get('kubernetes.worker-template', None):
d = dask.config.get('kubernetes.worker-template')
pod_template = make_pod_from_dict(d)

if not pod_template and dask.config.get('kubernetes.worker-template-path', None):
import yaml
with open(dask.config.get('kubernetes.worker-template-path')) as f:
d = yaml.safe_load(f)
pod_template = make_pod_from_dict(d)

if not pod_template:
msg = ("Worker pod specification not provided. See KubeCluster "
"docstring for ways to specify workers")
raise ValueError(msg)

self.cluster = LocalCluster(ip=host or socket.gethostname(),
scheduler_port=port,
@@ -158,10 +170,9 @@ def __init__(
if namespace is None:
namespace = _namespace_default()

if name is None:
worker_name = config.get('kubernetes-worker-name', 'dask-{user}-{uuid}')
name = worker_name.format(user=escape(getpass.getuser()),
uuid=str(uuid.uuid4())[:10], **os.environ)
name = name.format(user=escape(getpass.getuser()),
uuid=str(uuid.uuid4())[:10],
**os.environ)

self.pod_template = clean_pod_template(pod_template)
# Default labels that can't be overwritten
@@ -225,15 +236,15 @@ def from_yaml(cls, yaml_path, **kwargs):
kind: Pod
metadata:
  labels:
    foo: bar
    baz: quux
spec:
  containers:
  - image: daskdev/dask:latest
    name: dask-worker
    args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
  restartPolicy: Never

Examples
--------
@@ -381,6 +392,11 @@ def scale_up(self, n, pods=None, **kwargs):
--------
>>> cluster.scale_up(20) # ask for twenty workers
"""
maximum = dask.config.get('kubernetes.count.max')
if maximum is not None and maximum < n:
logger.info("Tried to scale beyond maximum number of workers %d > %d",
n, maximum)
n = maximum
pods = pods or self._cleanup_succeeded_pods(self.pods())
to_create = n - len(pods)
new_pods = []
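
With these fallbacks in __init__, KubeCluster() can now be constructed with no pod argument at all when a worker template is supplied through configuration, which is what the new test_automatic_startup test further down exercises. A hedged sketch of that usage; the pod spec and file name are illustrative, and actually starting workers requires access to a Kubernetes cluster:

import yaml
import dask
from dask_kubernetes import KubeCluster

worker_spec = {
    'kind': 'Pod',
    'metadata': {'labels': {'foo': 'bar'}},
    'spec': {
        'restartPolicy': 'Never',
        'containers': [{
            'name': 'dask-worker',
            'image': 'daskdev/dask:latest',
            'args': ['dask-worker', '$(DASK_SCHEDULER_ADDRESS)', '--nthreads', '1'],
        }],
    },
}

with open('worker-template.yaml', 'w') as f:
    yaml.safe_dump(worker_spec, f)

# Point the configuration at the template; KubeCluster() then needs no arguments.
with dask.config.set({'kubernetes.worker-template-path': 'worker-template.yaml'}):
    with KubeCluster() as cluster:
        cluster.scale(2)
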
38 changes: 38 additions & 0 deletions dask_kubernetes/kubernetes.yaml
@@ -0,0 +1,38 @@
kubernetes:
  name: "dask-{user}-{uuid}"
  namespace: null
  count:
    start: 0
    max: null
  host: '0.0.0.0'
  port: 0
  env: {}

  worker-template-path: null

  worker-template: {}
  # kind: Pod
  # metadata:
  #   labels:
  #     foo: bar
  #     baz: quux
  # spec:
  #   restartPolicy: Never
  #   containers:
  #   - image: daskdev/dask:latest
  #     args:
  #       - dask-worker
  #       - --nthreads
  #       - '2'
  #       - --no-bokeh
  #       - --memory-limit
  #       - 6GB
  #       - --death-timeout
  #       - '60'
  #     resources:
  #       limits:
  #         cpu: "1.75"
  #         memory: 6G
  #       requests:
  #         cpu: "1.75"
  #         memory: 6G
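
Since ensure_file places a copy of this file (commented out by default) in the user's configuration directory, any of these values can be overridden persistently by editing that copy or by adding another YAML file alongside it. An illustrative override; the worker-template-path value here is hypothetical:

# e.g. ~/.config/dask/kubernetes.yaml
kubernetes:
  count:
    start: 2
    max: 10
  worker-template-path: /home/alice/worker-template.yaml
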
33 changes: 25 additions & 8 deletions dask_kubernetes/tests/test_core.py
@@ -4,11 +4,11 @@
import uuid
import yaml

import dask
import pytest
from distributed.config import set_config
from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client, wait
from distributed.utils_test import loop # noqa: F401
from distributed.utils_test import loop, captured_logger # noqa: F401
from distributed.utils import tmpfile
import kubernetes
from random import random
@@ -102,14 +102,14 @@ def test_ipython_display(cluster):


def test_dask_worker_name_env_variable(pod_spec, loop, ns):
with set_config(**{'kubernetes-worker-name': 'foo-{USER}-{uuid}'}):
with dask.config.set({'kubernetes.name': 'foo-{USER}-{uuid}'}):
with KubeCluster(pod_spec, loop=loop, namespace=ns) as cluster:
assert 'foo-' + getpass.getuser() in cluster.name


def test_diagnostics_link_env_variable(pod_spec, loop, ns):
pytest.importorskip('bokeh')
with set_config(**{'diagnostics-link': 'foo-{USER}-{port}'}):
with dask.config.set({'distributed.dashboard.link': 'foo-{USER}-{port}'}):
with KubeCluster(pod_spec, loop=loop, namespace=ns) as cluster:
port = cluster.scheduler.services['bokeh'].port
cluster._ipython_display_()
@@ -262,8 +262,8 @@ def test_bad_args(loop):

assert 'KubeCluster.from_yaml' in str(info.value)

with pytest.raises(TypeError) as info:
KubeCluster({})
with pytest.raises((ValueError, TypeError)) as info:
KubeCluster({'kind': 'Pod'})

assert 'KubeCluster.from_dict' in str(info.value)

@@ -434,7 +434,7 @@ def test_automatic_startup(image_name, loop, ns):
with tmpfile(extension='yaml') as fn:
with open(fn, mode='w') as f:
yaml.dump(test_yaml, f)
with set_config(**{'kubernetes-worker-template-path': fn}):
with dask.config.set({'kubernetes.worker-template-path': fn}):
with KubeCluster(loop=loop, namespace=ns) as cluster:
assert cluster.pod_template.metadata.labels['foo'] == 'bar'

@@ -452,7 +452,24 @@ def test_escape_username(pod_spec, loop, ns):

try:
with KubeCluster(pod_spec, loop=loop, namespace=ns) as cluster:
assert 'foo' in cluster.name
assert 'foo' in cluster.name
assert '!' not in cluster.name
finally:
os.environ['LOGNAME'] = old_logname


def test_maximum(cluster):
with dask.config.set(**{'kubernetes.count.max': 1}):
with captured_logger('dask_kubernetes') as logger:
cluster.scale(10)

start = time()
while len(cluster.scheduler.workers) <= 0:
sleep(0.1)
assert time() < start + 60

sleep(0.5)
assert len(cluster.scheduler.workers) == 1

result = logger.getvalue()
assert "scale beyond maximum number of workers" in result.lower()
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,3 +1,3 @@
dask[distributed]
distributed>=1.21.3
kubernetes==4
kubernetes>=4
1 change: 1 addition & 0 deletions setup.py
@@ -11,6 +11,7 @@
keywords='dask,kubernetes,distributed',
license='BSD',
packages=find_packages(),
include_package_data=True,
long_description=(open('README.rst').read() if exists('README.rst') else ''),
zip_safe=False,
install_requires=list(open('requirements.txt').read().strip().split('\n')),
