Skip to content

Commit

Permalink
Add proxy-user support (#43)
Browse files Browse the repository at this point in the history
* Add proxy-user support

Add option to specify user to proxy as when creating a cluster.

* Fixup
  • Loading branch information
jcrist committed Dec 13, 2018
1 parent 764901b commit f663af6
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
4 changes: 4 additions & 0 deletions dask_yarn/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ def _parse_submit_kwargs(**kwargs):
help="Any additional arguments to forward to `script`"),
arg("--name", help="The application name"),
arg("--queue", help="The queue to deploy to"),
arg("--user",
help=("The user to submit the application on behalf of. Default "
"is the current user - submitting as a different user "
"requires proxy-user permissions.")),
arg("--tags",
help=("A comma-separated list of strings to use as "
"tags for this application.")),
Expand Down
10 changes: 9 additions & 1 deletion dask_yarn/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def _make_specification(**kwargs):
name = lookup(kwargs, 'name', 'yarn.name')
queue = lookup(kwargs, 'queue', 'yarn.queue')
tags = lookup(kwargs, 'tags', 'yarn.tags')
user = lookup(kwargs, 'user', 'yarn.user')

environment = lookup(kwargs, 'environment', 'yarn.environment')
if environment is None:
Expand Down Expand Up @@ -127,6 +128,7 @@ def _make_specification(**kwargs):
spec = skein.ApplicationSpec(name=name,
queue=queue,
tags=tags,
user=user,
services=services)
return spec

Expand Down Expand Up @@ -202,6 +204,10 @@ class YarnCluster(object):
The queue to deploy to.
tags : sequence, optional
A set of strings to use as tags for this application.
user : str, optional
The user to submit the application on behalf of. Default is the current
user - submitting as a different user requires user permissions, see
the YARN documentation for more information.
skein_client : skein.Client, optional
The ``skein.Client`` to use. If not provided, one will be started.
Expand All @@ -223,6 +229,7 @@ def __init__(self,
name=None,
queue=None,
tags=None,
user=None,
skein_client=None):

spec = _make_specification(environment=environment,
Expand All @@ -236,7 +243,8 @@ def __init__(self,
deploy_mode=deploy_mode,
name=name,
queue=queue,
tags=tags)
tags=tags,
user=user)

self._start_cluster(spec, skein_client)

Expand Down
13 changes: 8 additions & 5 deletions dask_yarn/tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import division, print_function, absolute_import

import os
import shutil
import time

import dask
Expand Down Expand Up @@ -134,10 +133,10 @@ def test_from_current(skein_client, conda_env, monkeypatch, tmpdir):
# Patch environment so it looks like a container
container_id = 'container_1526134340424_0012_01_000005'
cont_dir = tmpdir.mkdir(container_id)
shutil.copyfile(skein_client.security.cert_path,
str(cont_dir.join(".skein.crt")))
shutil.copyfile(skein_client.security.key_path,
str(cont_dir.join(".skein.pem")))
with open(str(cont_dir.join(".skein.crt")), 'wb') as fil:
fil.write(skein_client.security._get_bytes('cert'))
with open(str(cont_dir.join(".skein.pem")), 'wb') as fil:
fil.write(skein_client.security._get_bytes('key'))

for key, val in [('SKEIN_APPLICATION_ID', cluster.app_id),
('CONTAINER_ID', container_id),
Expand Down Expand Up @@ -173,6 +172,7 @@ def test_configuration():
'environment': 'myenv.tar.gz',
'queue': 'myqueue',
'name': 'dask-yarn-tests',
'user': 'alice',
'tags': ['a', 'b', 'c'],
'specification': None,
'worker': {'memory': '1234 MiB', 'count': 1, 'vcores': 1, 'restarts': -1,
Expand All @@ -183,6 +183,7 @@ def test_configuration():
with dask.config.set(config):
spec = _make_specification()
assert spec.name == 'dask-yarn-tests'
assert spec.user == 'alice'
assert spec.queue == 'myqueue'
assert spec.tags == {'a', 'b', 'c'}
assert spec.services['dask.worker'].resources.memory == 1234
Expand Down Expand Up @@ -246,10 +247,12 @@ def test_make_submit_specification(deploy_mode):
deploy_mode=deploy_mode,
environment='myenv.tar.gz',
name='test-name',
user='alice',
client_vcores=2,
client_memory='2 GiB')

assert spec.name == 'test-name'
assert spec.user == 'alice'
if deploy_mode == 'local':
assert set(spec.services) == {'dask.worker'}
assert 'environment' in spec.services['dask.worker'].files
Expand Down
2 changes: 2 additions & 0 deletions dask_yarn/yarn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ yarn:
queue: default # Yarn queue to deploy to
environment: null # Path to conda packed environment
tags: [] # List of strings to tag applications
user: '' # The user to submit the application on behalf of,
# leave as empty string for current user.

scheduler: # Specifications of scheduler container
vcores: 1
Expand Down

0 comments on commit f663af6

Please sign in to comment.