Allow using Python local to every node (#44)
Adds the ability to specify Python environments local to each node,
avoiding the need to distribute an archive with every application. Available
environment specifications include (see the usage sketch after this list):

- The path to an archived environment
- The path to a conda environment (as ``conda:///...``)
- The path to a virtual environment (as ``venv:///...``)
- The path to a python executable (as ``python:///...``)
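
A minimal usage sketch (the paths here are illustrative, not part of this commit):

    from dask_yarn import YarnCluster

    # Use a conda environment already present at the same path on every node
    cluster = YarnCluster(environment='conda:///opt/envs/my-env')

    # Or point directly at a Python executable available on every node
    cluster = YarnCluster(environment='python:///usr/bin/python3')
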
jcrist committed Dec 13, 2018
1 parent f663af6 commit f5a50e4
Showing 4 changed files with 219 additions and 34 deletions.
4 changes: 2 additions & 2 deletions dask_yarn/cli.py
@@ -145,8 +145,8 @@ def _parse_submit_kwargs(**kwargs):
help=("A comma-separated list of strings to use as "
"tags for this application.")),
arg("--environment",
help=("Path to an archived Python environment (either "
"``tar.gz`` or ``zip``).")),
help=("Path to the Python environment to use. See the docs "
"for more information")),
arg("--deploy-mode",
help=("Either 'remote' (default) or 'local'. If 'remote', the "
"scheduler and client will be deployed in a YARN "
76 changes: 59 additions & 17 deletions dask_yarn/core.py
@@ -47,6 +47,36 @@ def _get_skein_client(skein_client=None):

# exposed for testing only

def _files_and_build_cmds(environment):
parsed = urlparse(environment)
scheme = parsed.scheme

if scheme in {'conda', 'venv', 'python'}:
path = environment[len(scheme) + 3:]
files = {}
else:
# Treat archived environments the same as venvs
scheme = 'venv'
path = 'environment'
files = {'environment': environment}

if scheme == 'conda':
setup = 'conda activate %s' % path
cli = 'dask-yarn'
elif scheme == 'venv':
setup = 'source %s/bin/activate' % path
cli = 'dask-yarn'
else:
setup = ''
cli = '%s -m dask_yarn.cli' % path

def make_commands(cmd):
command = '%s %s' % (cli, cmd)
return [setup, command] if setup else [command]

return files, make_commands


def _make_specification(**kwargs):
""" Create specification to run Dask Cluster
@@ -74,9 +104,14 @@ def _make_specification(**kwargs):

environment = lookup(kwargs, 'environment', 'yarn.environment')
if environment is None:
msg = ("You must provide a path to an archived Python environment for "
"the workers.\n"
"This is commonly achieved through conda-pack or venv-pack.\n\n"
msg = ("You must provide a path to a Python environment for the workers.\n"
"This may be one of the following:\n"
"- A conda environment archived with conda-pack\n"
"- A virtual environment archived with venv-pack\n"
"- A path to a conda environment, specified as conda://...\n"
"- A path to a virtual environment, specified as venv://...\n"
"- A path to a python binary to use, specified as python://...\n"
"\n"
"See http://yarn.dask.org/environments.html for more information.")
raise ValueError(msg)

@@ -89,6 +124,8 @@

services = {}

files, build_cmds = _files_and_build_cmds(environment)

if deploy_mode == 'remote':
scheduler_vcores = lookup(kwargs, 'scheduler_vcores',
'yarn.scheduler.vcores')
@@ -103,9 +140,8 @@
memory=scheduler_memory
),
max_restarts=0,
files={'environment': environment},
commands=['source environment/bin/activate',
'dask-yarn services scheduler']
files=files,
commands=build_cmds('services scheduler')
)
worker_depends = ['dask.scheduler']
else:
@@ -119,10 +155,9 @@
),
max_restarts=worker_restarts,
depends=worker_depends,
files={'environment': environment},
files=files,
env=worker_env,
commands=['source environment/bin/activate',
'dask-yarn services worker']
commands=build_cmds('services worker')
)

spec = skein.ApplicationSpec(name=name,
@@ -136,14 +171,18 @@ def _make_submit_specification(script, args=(), **kwargs):
def _make_submit_specification(script, args=(), **kwargs):
spec = _make_specification(**kwargs)

environment = lookup(kwargs, 'environment', 'yarn.environment')
files, build_cmds = _files_and_build_cmds(environment)

if 'dask.scheduler' in spec.services:
# deploy_mode == 'remote'
client_vcores = lookup(kwargs, 'client_vcores', 'yarn.client.vcores')
client_memory = lookup(kwargs, 'client_memory', 'yarn.client.memory')
client_env = lookup(kwargs, 'client_env', 'yarn.client.env')
client_memory = parse_memory(client_memory, 'client')
environment = spec.services['dask.worker'].files['environment']

script_name = os.path.basename(script)
files[script_name] = script

spec.services['dask.client'] = skein.Service(
instances=1,
@@ -153,13 +192,10 @@ def _make_submit_specification(script, args=(), **kwargs):
),
max_restarts=0,
depends=['dask.scheduler'],
files={'environment': environment,
script_name: script},
files=files,
env=client_env,
commands=[
'source environment/bin/activate',
'dask-yarn services client %s %s' % (script_name, ' '.join(args))
]
commands=build_cmds('services client %s %s'
% (script_name, ' '.join(args)))
)
return spec

@@ -174,7 +210,13 @@ class YarnCluster(object):
Parameters
----------
environment : str, optional
Path to an archived Python environment (either ``tar.gz`` or ``zip``).
The Python environment to use. Can be one of the following:
- A path to an archived Python environment
- A path to a conda environment, specified as `conda:///...`
- A path to a virtual environment, specified as `venv:///...`
- A path to a python executable, specified as `python:///...`
Note that if not an archive, the paths specified must be valid on all
nodes in the cluster.
n_workers : int, optional
The number of workers to initially start.
worker_vcores : int, optional
90 changes: 87 additions & 3 deletions dask_yarn/tests/test_core.py
@@ -241,6 +241,88 @@ def test_make_specification_errors():
assert '1234 MiB' in str(info.value)


@pytest.mark.parametrize('env,path', [
('conda://env-name', 'env-name'),
('conda:///env/path', '/env/path')
])
def test_environment_conda(env, path):
spec = _make_submit_specification('script.py',
args=('foo', 'bar'),
deploy_mode='remote',
environment=env)
scheduler = spec.services['dask.scheduler']
assert not scheduler.files
assert scheduler.commands == ['conda activate %s' % path,
'dask-yarn services scheduler']
worker = spec.services['dask.worker']
assert not worker.files
assert worker.commands == ['conda activate %s' % path,
'dask-yarn services worker']
client = spec.services['dask.client']
assert set(client.files) == {'script.py'}
assert client.commands == ['conda activate %s' % path,
'dask-yarn services client script.py foo bar']


def test_environment_venv():
env = 'venv:///path/to/environment'
path = '/path/to/environment'
spec = _make_submit_specification('script.py',
args=('foo', 'bar'),
deploy_mode='remote',
environment=env)
scheduler = spec.services['dask.scheduler']
assert not scheduler.files
assert scheduler.commands == ['source %s/bin/activate' % path,
'dask-yarn services scheduler']
worker = spec.services['dask.worker']
assert not worker.files
assert worker.commands == ['source %s/bin/activate' % path,
'dask-yarn services worker']
client = spec.services['dask.client']
assert set(client.files) == {'script.py'}
assert client.commands == ['source %s/bin/activate' % path,
'dask-yarn services client script.py foo bar']


def test_environment_python_path():
env = "python:///path/to/python"
cmd = "/path/to/python -m dask_yarn.cli"
spec = _make_submit_specification('script.py',
args=('foo', 'bar'),
deploy_mode='remote',
environment=env)
scheduler = spec.services['dask.scheduler']
assert not scheduler.files
assert scheduler.commands == ['%s services scheduler' % cmd]
worker = spec.services['dask.worker']
assert not worker.files
assert worker.commands == ['%s services worker' % cmd]
client = spec.services['dask.client']
assert set(client.files) == {'script.py'}
assert client.commands == ['%s services client script.py foo bar' % cmd]


def test_environment_archive():
env = 'env.tar.gz'
spec = _make_submit_specification('script.py',
args=('foo', 'bar'),
deploy_mode='remote',
environment=env)
scheduler = spec.services['dask.scheduler']
assert set(scheduler.files) == {'environment'}
assert scheduler.commands == ['source environment/bin/activate',
'dask-yarn services scheduler']
worker = spec.services['dask.worker']
assert set(worker.files) == {'environment'}
assert worker.commands == ['source environment/bin/activate',
'dask-yarn services worker']
client = spec.services['dask.client']
assert set(client.files) == {'environment', 'script.py'}
assert client.commands == ['source environment/bin/activate',
'dask-yarn services client script.py foo bar']


@pytest.mark.parametrize('deploy_mode', ['local', 'remote'])
def test_make_submit_specification(deploy_mode):
spec = _make_submit_specification('../script.py',
@@ -283,7 +365,9 @@ def test_make_submit_specification(deploy_mode):
assert spec.services['dask.client'].env == {'foo': 'bar'}


def test_environment_relative_paths(conda_env):
a = _make_specification(environment=conda_env).to_dict()
b = _make_specification(environment=os.path.relpath(conda_env)).to_dict()
def test_environment_relative_paths():
relpath = 'path/to/foo.tar.gz'
abspath = os.path.abspath(relpath)
a = _make_specification(environment=relpath).to_dict()
b = _make_specification(environment=abspath).to_dict()
assert a == b
83 changes: 71 additions & 12 deletions docs/source/environments.rst
@@ -1,10 +1,23 @@
Distributing Python Environments
================================
Managing Python Environments
============================

We need to ensure that the libraries used on the Yarn cluster are the same as
what you are using locally. By default, ``dask-yarn`` handles this by
distributing a packaged python environment to the Yarn cluster as part of the
applications. This is typically handled using
what you are using locally. There are a few ways to specify this:

- The path to an archived environment (either conda_ or virtual_ environments)
- The path to a Conda_ environment (as ``conda:///...``)
- The path to a `virtual environment`_ (as ``venv:///...``)
- The path to a python executable (as ``python:///...``)

Note that when not using an archive, the provided path must be valid on all
nodes in the cluster.

Using Archived Python Environments
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The most common way to use ``dask-yarn`` is to distribute an archived Python
environment throughout the YARN cluster as part of the application. Packaging
the environment for distribution is typically handled using

- conda-pack_ for Conda_ environments
- venv-pack_ for `virtual environments`_
@@ -13,8 +26,8 @@ These environments can contain any Python packages you might need, but require
``dask-yarn`` (and its dependencies) at a minimum.


Packing Conda Environments using Conda-Pack
-------------------------------------------
Archiving Conda Environments Using Conda-Pack
---------------------------------------------

You can package a conda environment using conda-pack_.

@@ -30,8 +43,8 @@ You can package a conda environment using conda-pack_.
[########################################] | 100% Completed | 12.2s
Packing Virtual Environments using Venv-Pack
--------------------------------------------
Archiving Virtual Environments Using Venv-Pack
----------------------------------------------

You can package a virtual environment using venv-pack_. The virtual environment
can be created using either venv_ or virtualenv_. Note that the python linked
@@ -55,12 +68,21 @@ the `venv-pack documentation`_.
[########################################] | 100% Completed | 8.3s
Specifying the Environment for the Cluster
------------------------------------------
Specifying the Archived Environment
-----------------------------------

You can now start a cluster with the packaged environment by passing the
path to the constructor, e.g. ``YarnCluster(environment='my-env.tar.gz',
...)``. After startup you may want to verify that your versions match with the
...)``.

Note that if the environment is a local file, the archive will be automatically
uploaded to a temporary directory on HDFS before starting the application. If
you find yourself reusing the same environment multiple times, you may want to
upload the environment to HDFS once beforehand to avoid repeating this process
for each cluster (the environment is then specified as
``hdfs:///path/to/my-env.tar.gz``).
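
For example, a minimal sketch (the HDFS destination path here is illustrative):

.. code-block:: python

    from dask_yarn import YarnCluster

    # Archive uploaded once beforehand, e.g. with
    # ``hdfs dfs -put my-env.tar.gz /user/me/envs/``
    cluster = YarnCluster(environment='hdfs:///user/me/envs/my-env.tar.gz')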

After startup you may want to verify that your versions match with the
following:

.. code-block:: python
@@ -73,9 +95,46 @@ following:
client.get_versions(check=True) # check that versions match between all nodes
Using Python Environments Local to Each Node
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Alternatively, you can specify the path to a `conda environment`_, `virtual
environment`_, or Python executable that is already found on each node:

.. code-block:: python
from dask_yarn import YarnCluster
# Use a conda environment at /path/to/my/conda/env
cluster = YarnCluster(environment='conda:///path/to/my/conda/env')
# Use a virtual environment at /path/to/my/virtual/env
cluster = YarnCluster(environment='venv:///path/to/my/virtual/env')
# Use a Python executable at /path/to/my/python
cluster = YarnCluster(environment='python:///path/to/my/python')
As before, these environments can have any Python packages, but must include
``dask-yarn`` (and its dependencies) at a minimum. It's also *very important*
that these environments are uniform across all nodes; mismatched environments
can lead to hard to diagnose issues. To check this, you can use the
``Client.get_versions`` method:

.. code-block:: python
from dask.distributed import Client
client = Client(cluster)
client.get_versions(check=True) # check that versions match between all nodes
.. _conda-pack: https://conda.github.io/conda-pack/
.. _conda environment: http://conda.io/
.. _conda: http://conda.io/
.. _venv:
.. _virtual:
.. _virtual environment:
.. _virtual environments: https://docs.python.org/3/library/venv.html
.. _virtualenv: https://virtualenv.pypa.io/en/stable/
.. _venv-pack documentation:
