From f68eaf48c9a465a97163f8bfb8dea09524369e00 Mon Sep 17 00:00:00 2001
From: Jim Crist
Date: Tue, 30 Oct 2018 04:31:49 -0500
Subject: [PATCH] Update memory parsing/erroring

- All examples should be in even units of MiB to match with YARN behavior
  (otherwise requests will be rounded up to the nearest MiB).
- Tweak memory usage error message.
- Use MiB everywhere instead of MB
---
 dask_yarn/cli/dask_yarn_worker.py |  2 +-
 dask_yarn/core.py                 | 31 ++++++++++++++++---------------
 dask_yarn/tests/test_core.py      | 20 ++++++++++----------
 docs/source/index.rst             |  4 ++--
 4 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/dask_yarn/cli/dask_yarn_worker.py b/dask_yarn/cli/dask_yarn_worker.py
index 3fab1f9..c7b70a2 100644
--- a/dask_yarn/cli/dask_yarn_worker.py
+++ b/dask_yarn/cli/dask_yarn_worker.py
@@ -25,7 +25,7 @@ def start_worker(nthreads=None, memory_limit=None):
     enable_proctitle_on_children()
 
     if memory_limit is None:
-        memory_limit = int(skein.properties.container_resources.memory * 1e6)
+        memory_limit = int(skein.properties.container_resources.memory * 2**20)
     if nthreads is None:
         nthreads = skein.properties.container_resources.vcores
 
diff --git a/dask_yarn/core.py b/dask_yarn/core.py
index 0ed27c3..b101e3b 100644
--- a/dask_yarn/core.py
+++ b/dask_yarn/core.py
@@ -15,15 +15,15 @@
 import weakref
 
 
-memory_warning = """
+_memory_error = """The `{0}_memory` keyword takes string parameters
+that include units like "4 GiB" or "2048 MiB"
 
-The memory keywords takes string parameters
-that include units like "4 GiB" or "2048 MB"
-
-You provided: %d
-Perhaps you meant: "%d MB"
+You provided: {1}
+Perhaps you meant: "{1} MiB"
 """
 
+_one_MiB = 2**20
+
 
 # exposed for testing only
@@ -69,18 +69,18 @@ def either(a, b):
 
     if isinstance(scheduler_memory, str):
         scheduler_memory = dask.utils.parse_bytes(scheduler_memory)
+    elif scheduler_memory < _one_MiB:
+        raise ValueError(_memory_error.format('scheduler', scheduler_memory))
+
     if isinstance(worker_memory, str):
         worker_memory = dask.utils.parse_bytes(worker_memory)
-
-    if scheduler_memory < 2**20:
-        raise ValueError(memory_warning % (scheduler_memory, scheduler_memory))
-    if worker_memory < 2**20:
-        raise ValueError(memory_warning % (worker_memory, worker_memory))
+    elif worker_memory < _one_MiB:
+        raise ValueError(_memory_error.format('worker', worker_memory))
 
     scheduler = skein.Service(instances=1,
                               resources=skein.Resources(
                                   vcores=scheduler_vcores,
-                                  memory=int(scheduler_memory / 1e6)
+                                  memory=int(scheduler_memory / _one_MiB)
                               ),
                               max_restarts=0,
                               files={'environment': environment},
@@ -90,7 +90,7 @@
     worker = skein.Service(instances=n_workers,
                            resources=skein.Resources(
                                vcores=worker_vcores,
-                               memory=int(worker_memory / 1e6)
+                               memory=int(worker_memory / _one_MiB)
                            ),
                            max_restarts=worker_restarts,
                            depends=['dask.scheduler'],
@@ -124,7 +124,7 @@ class YarnCluster(object):
         The number of virtual cores to allocate per worker.
     worker_memory : str, optional
         The amount of memory to allocate per worker. Accepts a unit suffix
-        (e.g. '2 GiB' or '4096 MB').
+        (e.g. '2 GiB' or '4096 MiB'). Will be rounded up to the nearest MiB.
     worker_restarts : int, optional
         The maximum number of worker restarts to allow before failing the
        application. Default is unlimited.
@@ -135,7 +135,8 @@
         The number of virtual cores to allocate per scheduler.
     scheduler_memory : str, optional
         The amount of memory to allocate to the scheduler. Accepts a unit
-        suffix (e.g. '2 GiB' or '4096 MB')
+        suffix (e.g. '2 GiB' or '4096 MiB'). Will be rounded up to the nearest
+        MiB.
     name : str, optional
         The application name.
     queue : str, optional
diff --git a/dask_yarn/tests/test_core.py b/dask_yarn/tests/test_core.py
index 7a78efd..2106eca 100644
--- a/dask_yarn/tests/test_core.py
+++ b/dask_yarn/tests/test_core.py
@@ -46,8 +46,8 @@ def check_is_shutdown(client, app_id, status='SUCCEEDED'):
 
 def test_basic(skein_client, conda_env, loop):
     with YarnCluster(environment=conda_env,
-                     worker_memory='512 MB',
-                     scheduler_memory='512 MB',
+                     worker_memory='512 MiB',
+                     scheduler_memory='512 MiB',
                      name=APPNAME,
                      skein_client=skein_client) as cluster:
         cluster.scale(2)
@@ -67,8 +67,8 @@ def test_basic(skein_client, conda_env, loop):
 
 def test_from_specification(skein_client, conda_env, tmpdir, loop):
     spec = _make_specification(environment=conda_env,
-                               worker_memory='512 MB',
-                               scheduler_memory='512 MB',
+                               worker_memory='512 MiB',
+                               scheduler_memory='512 MiB',
                                name=APPNAME)
     fn = os.path.join(str(tmpdir), 'spec.yaml')
     with open(fn, 'w') as f:
@@ -88,9 +88,9 @@ def test_configuration():
             'name': 'dask-yarn-tests',
             'tags': ['a', 'b', 'c'],
             'specification': None,
-            'worker': {'memory': '1234MB', 'count': 1, 'vcores': 1, 'restarts': -1,
+            'worker': {'memory': '1234 MiB', 'count': 1, 'vcores': 1, 'restarts': -1,
                        'env': {'foo': 'bar'}},
-            'scheduler': {'memory': '1234MB', 'vcores': 1}}
+            'scheduler': {'memory': '1234 MiB', 'vcores': 1}}
     }
 
     with dask.config.set(config):
@@ -105,8 +105,8 @@ def test_configuration():
 
 def test_configuration_full_specification(conda_env, tmpdir):
     spec = _make_specification(environment=conda_env,
-                               worker_memory='512 MB',
-                               scheduler_memory='512 MB',
+                               worker_memory='512 MiB',
+                               scheduler_memory='512 MiB',
                                name=APPNAME)
     fn = os.path.join(str(tmpdir), 'spec.yaml')
     with open(fn, 'w') as f:
@@ -144,12 +144,12 @@ def test_make_specification_errors():
     with pytest.raises(ValueError) as info:
         _make_specification(environment='foo.tar.gz',
                             worker_memory=1234)
-    assert '1234 MB' in str(info.value)
+    assert '1234 MiB' in str(info.value)
 
     with pytest.raises(ValueError) as info:
         _make_specification(environment='foo.tar.gz',
                             scheduler_memory=1234)
-    assert '1234 MB' in str(info.value)
+    assert '1234 MiB' in str(info.value)
 
 
 def test_environment_relative_paths(conda_env):
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 1c015ac..142a122 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -12,10 +12,10 @@ natively from Python.
     from dask_yarn import YarnCluster
     from dask.distributed import Client
 
-    # Create a cluster where each worker has two cores and eight GiB of memory
+    # Create a cluster where each worker has two cores and eight GiB of memory
     cluster = YarnCluster(environment='environment.tar.gz',
                           worker_vcores=2,
-                          worker_memory="8GB")
+                          worker_memory="8GiB")
 
     # Scale out to ten such workers
     cluster.scale(10)
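
A minimal sketch (not part of the patch) of the arithmetic behind the MB -> MiB
switch. It only uses dask.utils.parse_bytes, which the patched core.py already
relies on, and assumes the rounding behavior stated in the commit message (YARN
grants memory in whole MiB, rounding fractional requests up); the loop and the
ONE_MIB name below are illustrative only.

    # Illustrative sketch only -- not part of the patch.
    import math

    from dask.utils import parse_bytes

    ONE_MIB = 2 ** 20

    for request in ['512 MB', '512 MiB']:
        nbytes = parse_bytes(request)   # '512 MB' -> 512000000, '512 MiB' -> 536870912
        in_mib = nbytes / ONE_MIB       # 488.28125 vs. 512.0
        granted = math.ceil(in_mib)     # assumed YARN behavior: round up to whole MiB
        print('%s -> %.2f MiB requested, %d MiB granted' % (request, in_mib, granted))

Whole-MiB strings such as '512 MiB' or '2048 MiB' request exactly what YARN
grants, which is why the examples, tests, and docs above avoid decimal 'MB'
units.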