Update memory parsing/erroring
- All examples should be in whole units of MiB to match YARN's behavior
  (otherwise requests are rounded up to the nearest MiB).
- Tweak the memory-usage error message.
- Use MiB everywhere instead of MB.
jcrist committed Oct 30, 2018
1 parent ebf610f commit f68eaf4
Showing 4 changed files with 29 additions and 28 deletions.
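
Background on the unit change: `dask.utils.parse_bytes` treats "MB" as a
decimal unit and "MiB" as a binary one, while YARN allocates container memory
in whole MiB. A minimal sketch of the difference (values follow parse_bytes'
documented behavior):

    from dask.utils import parse_bytes

    # "MB" is decimal (10**6 bytes); "MiB" is binary (2**20 bytes)
    assert parse_bytes("512 MB") == 512 * 10**6    # 512,000,000 bytes
    assert parse_bytes("512 MiB") == 512 * 2**20   # 536,870,912 bytes

    # 512 MB is not a whole number of MiB, so YARN would round such a
    # request up; 512 MiB maps exactly onto YARN's allocation unit.
    print(512 * 10**6 / 2**20)  # ~488.28 MiB
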
2 changes: 1 addition & 1 deletion dask_yarn/cli/dask_yarn_worker.py
@@ -25,7 +25,7 @@ def start_worker(nthreads=None, memory_limit=None):
     enable_proctitle_on_children()

     if memory_limit is None:
-        memory_limit = int(skein.properties.container_resources.memory * 1e6)
+        memory_limit = int(skein.properties.container_resources.memory * 2**20)
     if nthreads is None:
         nthreads = skein.properties.container_resources.vcores
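
Why this one-line change matters: `skein.properties.container_resources.memory`
reports the container's memory grant in MiB, so multiplying by `1e6` (bytes per
MB) under-reported the worker's memory limit by roughly 4.6%. A sketch of the
arithmetic, with a hypothetical 2048 MiB container grant:

    container_memory_mib = 2048  # hypothetical container_resources.memory value

    old_limit = int(container_memory_mib * 1e6)    # 2,048,000,000 bytes
    new_limit = int(container_memory_mib * 2**20)  # 2,147,483,648 bytes (2 GiB)

    # The old conversion left ~95 MiB of the container's grant unused:
    print((new_limit - old_limit) / 2**20)  # ~94.9 MiB
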
31 changes: 16 additions & 15 deletions dask_yarn/core.py
@@ -15,15 +15,15 @@
 import weakref


-memory_warning = """
-The memory keywords takes string parameters
-that include units like "4 GiB" or "2048 MB"
-You provided: %d
-Perhaps you meant: "%d MB"
+_memory_error = """The `{0}_memory` keyword takes string parameters
+that include units like "4 GiB" or "2048 MiB"
+You provided: {1}
+Perhaps you meant: "{1} MiB"
 """

+_one_MiB = 2**20


 # exposed for testing only
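
A quick sketch of how the new template renders, assuming it is filled in the
same way the validation code below does:

    # A bare integer like 1234 most likely means the user intended MiB:
    print(_memory_error.format('worker', 1234))
    # -> The `worker_memory` keyword takes string parameters
    #    that include units like "4 GiB" or "2048 MiB"
    #    You provided: 1234
    #    Perhaps you meant: "1234 MiB"
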

@@ -69,18 +69,18 @@ def either(a, b):

     if isinstance(scheduler_memory, str):
         scheduler_memory = dask.utils.parse_bytes(scheduler_memory)
+    elif scheduler_memory < _one_MiB:
+        raise ValueError(_memory_error.format('scheduler', scheduler_memory))

     if isinstance(worker_memory, str):
         worker_memory = dask.utils.parse_bytes(worker_memory)
-
-    if scheduler_memory < 2**20:
-        raise ValueError(memory_warning % (scheduler_memory, scheduler_memory))
-    if worker_memory < 2**20:
-        raise ValueError(memory_warning % (worker_memory, worker_memory))
+    elif worker_memory < _one_MiB:
+        raise ValueError(_memory_error.format('worker', worker_memory))

     scheduler = skein.Service(instances=1,
                               resources=skein.Resources(
                                   vcores=scheduler_vcores,
-                                  memory=int(scheduler_memory / 1e6)
+                                  memory=int(scheduler_memory / _one_MiB)
                               ),
                               max_restarts=0,
                               files={'environment': environment},
@@ -90,7 +90,7 @@ def either(a, b):
     worker = skein.Service(instances=n_workers,
                            resources=skein.Resources(
                                vcores=worker_vcores,
-                               memory=int(worker_memory / 1e6)
+                               memory=int(worker_memory / _one_MiB)
                            ),
                            max_restarts=worker_restarts,
                            depends=['dask.scheduler'],
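
Both services now divide by `_one_MiB` because `skein.Resources` takes its
`memory` value in MiB (the same unit YARN allocates in), not bytes. A minimal
sketch of the round trip, assuming the input already passed through
`dask.utils.parse_bytes`:

    from dask.utils import parse_bytes

    _one_MiB = 2**20

    worker_memory = parse_bytes('2 GiB')           # 2,147,483,648 bytes
    memory_in_mib = int(worker_memory / _one_MiB)  # 2048, the value handed to skein

    # Note that int() truncates: 2 GiB plus one byte would still floor to
    # 2048 MiB here, while YARN itself rounds requests *up* to whole MiB.
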
@@ -124,7 +124,7 @@ class YarnCluster(object):
         The number of virtual cores to allocate per worker.
     worker_memory : str, optional
         The amount of memory to allocate per worker. Accepts a unit suffix
-        (e.g. '2 GiB' or '4096 MB').
+        (e.g. '2 GiB' or '4096 MiB'). Will be rounded up to the nearest MiB.
     worker_restarts : int, optional
         The maximum number of worker restarts to allow before failing the
         application. Default is unlimited.
@@ -135,7 +135,8 @@ class YarnCluster(object):
         The number of virtual cores to allocate per scheduler.
     scheduler_memory : str, optional
         The amount of memory to allocate to the scheduler. Accepts a unit
-        suffix (e.g. '2 GiB' or '4096 MB')
+        suffix (e.g. '2 GiB' or '4096 MiB'). Will be rounded up to the nearest
+        MiB.
     name : str, optional
         The application name.
     queue : str, optional
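
To make the documented units concrete, a hedged usage sketch of the keywords
described above (the environment path is a placeholder):

    from dask_yarn import YarnCluster

    # Whole-MiB strings map exactly onto YARN's allocation unit, so nothing
    # is silently rounded up behind the user's back.
    cluster = YarnCluster(environment='environment.tar.gz',  # placeholder
                          worker_vcores=1,
                          worker_memory='4096 MiB',
                          scheduler_vcores=1,
                          scheduler_memory='2 GiB')
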
20 changes: 10 additions & 10 deletions dask_yarn/tests/test_core.py
@@ -46,8 +46,8 @@ def check_is_shutdown(client, app_id, status='SUCCEEDED'):

 def test_basic(skein_client, conda_env, loop):
     with YarnCluster(environment=conda_env,
-                     worker_memory='512 MB',
-                     scheduler_memory='512 MB',
+                     worker_memory='512 MiB',
+                     scheduler_memory='512 MiB',
                      name=APPNAME,
                      skein_client=skein_client) as cluster:
         cluster.scale(2)
@@ -67,8 +67,8 @@ def test_basic(skein_client, conda_env, loop):

 def test_from_specification(skein_client, conda_env, tmpdir, loop):
     spec = _make_specification(environment=conda_env,
-                               worker_memory='512 MB',
-                               scheduler_memory='512 MB',
+                               worker_memory='512 MiB',
+                               scheduler_memory='512 MiB',
                                name=APPNAME)
     fn = os.path.join(str(tmpdir), 'spec.yaml')
     with open(fn, 'w') as f:
@@ -88,9 +88,9 @@ def test_configuration():
         'name': 'dask-yarn-tests',
         'tags': ['a', 'b', 'c'],
         'specification': None,
-        'worker': {'memory': '1234MB', 'count': 1, 'vcores': 1, 'restarts': -1,
+        'worker': {'memory': '1234 MiB', 'count': 1, 'vcores': 1, 'restarts': -1,
                    'env': {'foo': 'bar'}},
-        'scheduler': {'memory': '1234MB', 'vcores': 1}}
+        'scheduler': {'memory': '1234 MiB', 'vcores': 1}}
     }

     with dask.config.set(config):
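
The dict above exercises dask's configuration system; the same defaults could
be set programmatically. A hedged sketch reusing the key names from the test
(the environment value is a placeholder):

    import dask

    config = {'yarn': {'environment': 'env.tar.gz',  # placeholder
                       'worker': {'memory': '1234 MiB', 'count': 1,
                                  'vcores': 1, 'restarts': -1},
                       'scheduler': {'memory': '1234 MiB', 'vcores': 1}}}

    with dask.config.set(config):
        # _make_specification() called with no overrides would now pick up
        # these defaults, including the whole-MiB memory strings.
        ...
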
@@ -105,8 +105,8 @@ def test_configuration():

 def test_configuration_full_specification(conda_env, tmpdir):
     spec = _make_specification(environment=conda_env,
-                               worker_memory='512 MB',
-                               scheduler_memory='512 MB',
+                               worker_memory='512 MiB',
+                               scheduler_memory='512 MiB',
                                name=APPNAME)
     fn = os.path.join(str(tmpdir), 'spec.yaml')
     with open(fn, 'w') as f:
@@ -144,12 +144,12 @@ def test_make_specification_errors():
     with pytest.raises(ValueError) as info:
         _make_specification(environment='foo.tar.gz', worker_memory=1234)

-    assert '1234 MB' in str(info.value)
+    assert '1234 MiB' in str(info.value)

     with pytest.raises(ValueError) as info:
         _make_specification(environment='foo.tar.gz', scheduler_memory=1234)

-    assert '1234 MB' in str(info.value)
+    assert '1234 MiB' in str(info.value)


 def test_environment_relative_paths(conda_env):
4 changes: 2 additions & 2 deletions docs/source/index.rst
@@ -12,10 +12,10 @@ natively from Python.
    from dask_yarn import YarnCluster
    from dask.distributed import Client

-   # Create a cluster where each worker has two cores and eight GB of memory
+   # Create a cluster where each worker has two cores and eight GiB of memory
    cluster = YarnCluster(environment='environment.tar.gz',
                          worker_vcores=2,
-                         worker_memory="8GB")
+                         worker_memory="8GiB")

    # Scale out to ten such workers
    cluster.scale(10)
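
The reStructuredText snippet is truncated here; in these docs the example
typically continues by connecting a distributed client to the cluster. A
minimal sketch:

    # Connect a client so subsequent dask computations run on the YARN cluster
    client = Client(cluster)
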
