Skip to content

Commit

Permalink
Fix name parameters being ignored in Cluster class (#398)
Browse files Browse the repository at this point in the history
and protocol + security not being passed through
  • Loading branch information
lesteve committed May 29, 2020
1 parent b2a5ac8 commit f199cf9
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 30 deletions.
62 changes: 47 additions & 15 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ def __init__(
memory=None,
processes=None,
nanny=True,
protocol=None,
security=None,
interface=None,
death_timeout=None,
local_directory=None,
Expand All @@ -147,7 +149,6 @@ def __init__(
python=sys.executable,
job_name=None,
config_name=None,
**kwargs
):
self.scheduler = scheduler
self.job_id = None
Expand Down Expand Up @@ -210,6 +211,18 @@ def __init__(

if interface:
extra = extra + ["--interface", interface]
if protocol:
extra = extra + ["--protocol", protocol]
if security:
worker_security_dict = security.get_tls_config_for_role("worker")
security_command_line_list = [
["--tls-" + key.replace("_", "-"), value]
for key, value in worker_security_dict.items()
# 'ciphers' parameter does not have a command-line equivalent
if key != "ciphers"
]
security_command_line = sum(security_command_line_list, [])
extra = extra + security_command_line

# Keep information on process, cores, and memory, for use in subclasses
self.worker_memory = parse_bytes(memory) if memory is not None else None
Expand Down Expand Up @@ -434,7 +447,7 @@ def __init__(
protocol="tcp://",
# Job keywords
config_name=None,
**kwargs
**job_kwargs
):
self.status = "created"

Expand Down Expand Up @@ -499,21 +512,25 @@ def __init__(
"options": scheduler_options,
}

kwargs["config_name"] = config_name
kwargs["interface"] = interface
kwargs["protocol"] = protocol
kwargs["security"] = security
self._kwargs = kwargs
worker = {"cls": self.job_cls, "options": kwargs}
if "processes" in kwargs and kwargs["processes"] > 1:
worker["group"] = ["-" + str(i) for i in range(kwargs["processes"])]
job_kwargs["config_name"] = config_name
job_kwargs["interface"] = interface
job_kwargs["protocol"] = protocol
job_kwargs["security"] = security
self._job_kwargs = job_kwargs

worker = {"cls": self.job_cls, "options": self._job_kwargs}
if "processes" in self._job_kwargs and self._job_kwargs["processes"] > 1:
worker["group"] = [
"-" + str(i) for i in range(self._job_kwargs["processes"])
]

self._dummy_job # trigger property to ensure that the job is valid

super().__init__(
scheduler=scheduler,
worker=worker,
loop=loop,
security=security,
silence_logs=silence_logs,
asynchronous=asynchronous,
name=name,
Expand All @@ -535,11 +552,26 @@ def _dummy_job(self):
address = self.scheduler.address # Have we already connected?
except AttributeError:
address = "tcp://<insert-scheduler-address-here>:8786"
return self.job_cls(
scheduler=address or "tcp://<insert-scheduler-address-here>:8786",
name="name",
**self._kwargs
)
try:
return self.job_cls(
address or "tcp://<insert-scheduler-address-here>:8786",
name="name",
**self._job_kwargs
)
except TypeError as exc:
# Very likely this error happened in the self.job_cls constructor
# because an unexpected parameter was used in the JobQueueCluster
# constructor. The next few lines builds a more user-friendly error message.
match = re.search("(unexpected keyword argument.+)", str(exc))
if not match:
raise
message_orig = match.group(1)
raise ValueError(
'Got {}. Very likely this unexpected parameter was passed in "job_kwargs" in the {} constructor:\n'
"job_kwargs={}".format(
message_orig, self.__class__.__name__, self._job_kwargs
)
) from exc

@property
def job_header(self):
Expand Down
6 changes: 3 additions & 3 deletions dask_jobqueue/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ def __init__(
disk=None,
job_extra=None,
config_name=None,
**kwargs
**base_class_kwargs
):
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **kwargs
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)

if disk is None:
Expand All @@ -60,7 +60,7 @@ def __init__(
else:
self.job_extra = job_extra

env_extra = kwargs.get("env_extra", None)
env_extra = base_class_kwargs.get("env_extra", None)
if env_extra is None:
env_extra = dask.config.get(
"jobqueue.%s.env-extra" % self.config_name, default=[]
Expand Down
4 changes: 2 additions & 2 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ def __init__(
lsf_units=None,
config_name=None,
use_stdin=None,
**kwargs
**base_class_kwargs
):
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **kwargs
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)

if queue is None:
Expand Down
4 changes: 2 additions & 2 deletions dask_jobqueue/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ def __init__(
walltime=None,
job_extra=None,
config_name=None,
**kwargs
**base_class_kwargs
):
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **kwargs
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)

if queue is None:
Expand Down
4 changes: 2 additions & 2 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ def __init__(
walltime=None,
job_extra=None,
config_name=None,
**kwargs
**base_class_kwargs
):
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **kwargs
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)

if queue is None:
Expand Down
4 changes: 2 additions & 2 deletions dask_jobqueue/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ def __init__(
walltime=None,
job_extra=None,
config_name=None,
**kwargs
**base_class_kwargs
):
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **kwargs
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)

if queue is None:
Expand Down
4 changes: 2 additions & 2 deletions dask_jobqueue/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ def __init__(
job_mem=None,
job_extra=None,
config_name=None,
**kwargs
**base_class_kwargs
):
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **kwargs
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)

if queue is None:
Expand Down
31 changes: 31 additions & 0 deletions dask_jobqueue/tests/ca.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-----BEGIN CERTIFICATE-----
MIIFazCCA1OgAwIBAgIUHfc6OUT6HK590ABdtsRzglDZsfowDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDA1MjUxNDQ5MzlaFw0zMDA1
MjMxNDQ5MzlaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggIiMA0GCSqGSIb3DQEB
AQUAA4ICDwAwggIKAoICAQCb5PcZthd/Ssv4JpsxGUww9KsA+HknTQVAWUN85bC4
BxLS8IKaE302yGBg8MK9w0z+qTOoMFNMqoo+Yqp7sDGbV/4U9qVPuJfbxddLPyQU
QenGXsyAMLEmVXCmQHXlcZCsIlY0je6c7o2bJw2oJPPktjMRNfHI+lWtzhMVOhWz
nKRMwikfaV80Vn80Jx7zY2A2kcsoGL16Y/GLp4AAhEGWtn3GFJDCCWpJM4fBeChZ
6wOxP/KnNsJZ2PeHb+yTXT347ZFRdKHB6DNDmiWk48BrPgwemKdePt+7YN1UmV88
YCl54BecgGiu3Iav0gqCHv2JIDZtyUtfBA/rPXsUmRbus0ZW9ASCnVEbt9FtHQZ+
e/BmLL6WDg58lGE4TeI+EaK1FqxbVEB3K5idrForTZvadKzf2licSpRP+wlRruhZ
k85lT50aY5tvQIPq+FnX2mQwosMln0hIPXvgqXDMNrSCzUZ9aCLL6PWx+wqLHq9p
dU/2upPM9AEiPh1M9nEXL+FdbQMB6wNbpo/L3yTfQ1pL3Z33KLDC9OfWCGymgarL
1uxeUbt6XeCB7Exb+1fT73RtjeO2QFTZrCjKfxR3vKIBA0/2qb0DfktAasekyx3U
TC2ZhT/wNP4LBP/l1Oasn47e6EPs2slZLPHZXfbdElsBGcGPDwh38RjSVq+fzm7b
bwIDAQABo1MwUTAdBgNVHQ4EFgQU+UmkXa3pmOtN4bN3LEm6Vbc2KVMwHwYDVR0j
BBgwFoAU+UmkXa3pmOtN4bN3LEm6Vbc2KVMwDwYDVR0TAQH/BAUwAwEB/zANBgkq
hkiG9w0BAQsFAAOCAgEAMNWu/3ci9SxiZ2YO/x4VBtvuhB2D0eAF71NXdJW+F2WN
ol+ut2mHyXeqJTqme6nwMkItuCk/M3/rtxJYa+i77PkRP+dv10jfOLNgc0nMTCKx
ll6AlhX3Mz4kt4/8g1h6oyee/Tau4VK9UTLzlxHiPSX4HrPR1g8jEyA5K9pwfX0p
RBLAlaStyemH/HkCX0HpG8PSJfEbmLsbJR5cOKtGAgt+mUnPjKGvuWOkohrOzFci
dMaWCULZX6PEdeJaXNerP7mR/BH3nOHSWja1zqr4t4h4e7hOXREseUrvUB9pHNkv
T2UrbbBMGElHVlHtxtaxdu/zNzL1DBxBkLjyw+SberFFQAQR5I0yCWOhrwzONzH6
XPL81igvUao5BQ7quJpy58EPqGwALAkB5QDsvH4uD7Bgwp8IYAtcRWWGL8WnuEt5
shS4yK4UXFCgUeg580+gZ7t1nH6f55/A7d/g0DdpvBBTZVYRJFpiJCnCTH6LbjI0
XXe5vDubTxjLSZemP4w+w+Oe18U2tJ6DXb2rUZGoWECnfbYfVmO2hdLcgp2nenXl
dcRANXfZT9R34wzVx/LUJA9hjOgfFF38CC2RfOZr9bb1IjLD/X4V5SrxsfgnQ4P/
/RKGyQ5dpeK/lpdgYJuqMgUffy8KDrQdMaBq8+0RdVL3nm5+tPFKcskMI2f/zyc=
-----END CERTIFICATE-----
52 changes: 52 additions & 0 deletions dask_jobqueue/tests/key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-----BEGIN PRIVATE KEY-----
MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCb5PcZthd/Ssv4
JpsxGUww9KsA+HknTQVAWUN85bC4BxLS8IKaE302yGBg8MK9w0z+qTOoMFNMqoo+
Yqp7sDGbV/4U9qVPuJfbxddLPyQUQenGXsyAMLEmVXCmQHXlcZCsIlY0je6c7o2b
Jw2oJPPktjMRNfHI+lWtzhMVOhWznKRMwikfaV80Vn80Jx7zY2A2kcsoGL16Y/GL
p4AAhEGWtn3GFJDCCWpJM4fBeChZ6wOxP/KnNsJZ2PeHb+yTXT347ZFRdKHB6DND
miWk48BrPgwemKdePt+7YN1UmV88YCl54BecgGiu3Iav0gqCHv2JIDZtyUtfBA/r
PXsUmRbus0ZW9ASCnVEbt9FtHQZ+e/BmLL6WDg58lGE4TeI+EaK1FqxbVEB3K5id
rForTZvadKzf2licSpRP+wlRruhZk85lT50aY5tvQIPq+FnX2mQwosMln0hIPXvg
qXDMNrSCzUZ9aCLL6PWx+wqLHq9pdU/2upPM9AEiPh1M9nEXL+FdbQMB6wNbpo/L
3yTfQ1pL3Z33KLDC9OfWCGymgarL1uxeUbt6XeCB7Exb+1fT73RtjeO2QFTZrCjK
fxR3vKIBA0/2qb0DfktAasekyx3UTC2ZhT/wNP4LBP/l1Oasn47e6EPs2slZLPHZ
XfbdElsBGcGPDwh38RjSVq+fzm7bbwIDAQABAoICAHFc4ddfk9yr3oEYSdg9Zitf
cA6noSpUFtKBVtM3D/fypNyhqscyDubMdVFpIqPtpkq1bewLIDfq99Z/1ytUp+4n
4YsLBJFhUYSubG26f5j/iWkIPLunLNsMXHt4+oKbv7F80qUq5O5Xhr/heUvhez3A
xIfqa2VTrQRTi4rvDyLqcIuk0VSXQnUDxUJ+hEJG7IsiH9KLkxWyIc8FQc6eXjej
gviMset3/0M15q1onCcvACNftiukZVYCsZVabXWH423mC7tpDcu897JcIK20NJOH
rjZ9mY+uNvHCcZB4a0mzP9XxgBn9QqKNmJ+4JI4UzRdvRkU1kMqKYK0Wqy8CWCqf
kw7FrRb2AO94bK3CMQ+9G0AJCLD4tlh/Xw9DIh4h2iv3G3T9GVmH0fLnF2LDKy2G
7Z0oC65s3TjeO2a9SRvlgEmf28EujjCil0RZx6Dnni171GVARohdLZaGDquj5V26
Sc2iTaHDXKcRCyntORJ7hyJt7PsCU7nj4dMLc4q35mW4Finxt0Is5w+IGIL+fKEI
0ZrTf6eBRNU4rLagkiCmhBydmJaj8jMfl7XVHXEoOT2AR0dk8XfIK/LO9d/5zTJA
y1Wx9THg5wUK2KoimQXg+qZFw06fEjlbzEl0DlZOdOrk2O1lX+cYrW0pbUH2OKOh
TGjMN3f45sCfDEvHQg35AoIBAQDPTDnv7ZiKDI5vpl6+uNnVQRwlKQ+QrD+FxrW3
52b23eVcPP7kcZ4xDeJ0T6ZdGGJ41SRKUn6gsojQ02yReSn02/BZVmL/shEweXhM
VrHmMbwOUYGwcVmp9jjSTH4VVEl42oMpc8JCrEAKAda09ZuG4+PbFOtGBskWnBNE
uhJHlgL5CRODLmy+jZvafsTLHIqfDlq18jxyPz0SsaXGcV5Lm2Ob6DPRKvp8578X
ehwpJcJMvqAmzwflgVj+Qb2K0/ceDvjajC3BMcp63gjCAojTbbHrIrXzS0ClSjUg
U8sfUaXe0wOAqsKIl+evQtNwiv/2QMmvC1jHI7miMeu7MSJtAoIBAQDAhR7uAzJc
quFtjfbwCJswzAsQ8W4YOioTLWFW2W/+nS4MfFlxFEuOeJpvYmViD67UFJrCY9bO
xXyQvTQPN1iVFAjqG3Ur40/+zFGm1uXR0rF2f0oafceKodT2+TyDcoLkVoi2h93k
PfgCsen+ajwXYNH4utHN9QoykRKESzwWtJOcY2vBWxItFmumFborv/6oJkCMXWC8
o5wjIJHXZ9q/bxvNNnENnXafbYWyKlwu6f0CXeTd4Dt4zD7Ab861/F0zP4GXlrR/
ecPhqxaS5U51VfOZp4q5Ey8HbWxVENMA9z+4dp7P+/A2WLLdyLz23tYo7wDmpqQ0
CUZ2YNeUSWvLAoIBAQCLxczm7uBEbNT5iUcm+ALA3Mb6c2YwWUP3kpVia9+sItAM
0n/XTpioYMFJRY9aBCAZczWl+1uwRNElZPk2WWkl1cqIokvcNpeKhMzNRENtgClZ
yjFU5AjeJcwIWFVHUm670zJPF+NrCzOey8CWgWidmjk/tioxLFAYM6J2W7QJmqdk
fW8vq3TdQyRMPd+5SARb3NTjC3MgYW1vlmK9nCFFf3+5VubhaUY+RBA/5zDnubL6
Bip8IGoloIJ95ZvE6Mkd9mBrE8uiEU2CbQWgsw7I6JTng58Fbb8n9BJAOt+hvW7H
AKbC8eB7M1mffcKNhtuxkdurcE4q7/ax21EkBaw9AoIBAHdA/svIxyWH5GQMkG5X
pmovupsgMmZngTCn56f4wNsjWib50B2vyK3UHzXn6Y040b8llEfduG4U/vhZeyoB
yqlt46fAonAxOphG0D1c2LeEn9EbQDfwue4yGM1zzfxOrq3qvHz05IpBqKNiueOS
wu5oVyiP8O53X327R5ETWYFnEhjJrTH0y+mJ/dy/kLcRExntuAY6wXWYk1tfDXg1
KNd0Z/BSTO12IMjY+vxGKRwWbVdN+jtGbxCA1E438//e94yLRic0f1KHhsL/S9hq
mpMsTt1bXx8NtxAOxBBdf5cVkS8eq3mCQmYnw4SGmCcEGfz5L8Gwb/6b0D379w4v
/MkCggEAEwIOqhgwUjscb1L9NTehFa3CaK8eQwkEvB7aQLhTbTUK4bigSd6f20Ju
3N/zmHhSkEm6Oq9+zrjm8QqgztrB3Zn+nCWu2n7BHFoCexg4Qov3xuKsC3MMh/oa
/xqgVgsFOgqk93YWv/6IvCHM8F3Ntk+rcUlt0M1VxXC8Ym/vRHy5VN12asI19K1M
UbZINK0bVAdEdwaB2OSx14yag9uiaXxKryZe7qTjcR2ELxksVo0nwX7h0NHA1S77
Ubq57jxR5a/wI9VA5iTUZSUkjxfEdPUEAY1eDdp+Ogqcs1Zsa8Er0JUNPxqJzmlJ
Q2ypwp0CapUz0d43a47/izrjAtXzug==
-----END PRIVATE KEY-----
49 changes: 49 additions & 0 deletions dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

import dask

from distributed.security import Security
from distributed import Client

from dask_jobqueue import (
JobQueueCluster,
PBSCluster,
Expand Down Expand Up @@ -393,3 +396,49 @@ def test_import_scheduler_options_from_config(Cluster):
scheduler_options = cluster.scheduler_spec["options"]
assert scheduler_options.get("interface") == pass_scheduler_interface
assert scheduler_options.get("port") is None


@pytest.mark.parametrize("Cluster", all_clusters)
def test_wrong_parameter_error(Cluster):
match = re.compile(
"unexpected keyword argument.+wrong_parameter.+"
"{}.+job_kwargs.+cores.+memory.+"
"wrong_parameter.+wrong_parameter_value".format(Cluster.__name__),
re.DOTALL,
)
with pytest.raises(ValueError, match=match):
create_cluster_func(
Cluster, cores=1, memory="1GB", wrong_parameter="wrong_parameter_value",
)


def test_security():
dirname = os.path.dirname(__file__)
key = os.path.join(dirname, "key.pem")
cert = os.path.join(dirname, "ca.pem")
security = Security(
tls_ca_file=cert,
tls_scheduler_key=key,
tls_scheduler_cert=cert,
tls_worker_key=key,
tls_worker_cert=cert,
tls_client_key=key,
tls_client_cert=cert,
require_encryption=True,
)

with LocalCluster(
cores=1, memory="1GB", security=security, protocol="tls"
) as cluster:
assert cluster.security == security
assert cluster.scheduler_spec["options"]["security"] == security
job_script = cluster.job_script()
assert "--tls-key {}".format(key) in job_script
assert "--tls-cert {}".format(cert) in job_script
assert "--tls-ca-file {}".format(cert) in job_script

cluster.scale(jobs=1)
with Client(cluster, security=security) as client:
future = client.submit(lambda x: x + 1, 10)
result = future.result()
assert result == 11
2 changes: 0 additions & 2 deletions dask_jobqueue/tests/test_oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ def test_header():
processes=4,
cores=8,
memory="28GB",
job_cpu=16,
job_mem="100G",
job_extra=["-t besteffort"],
) as cluster:
assert "walltime=" in cluster.job_header
Expand Down
2 changes: 2 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Development version
``threads_per_process=cores``. (:pr:`375`)
- all cluster classes: ``interface`` was ignored when set in a config file.
(:pr:`366`)
- all cluster classes: fix a bug that would allow to pass any named parameter without an error (:pr:`398`)
- all cluster classes: fix a bug where ``security`` was not correctly passed through (:pr:`398`)
- ``LSFCluster``: switch to ``use_stdin=True`` by default (:pr:`388`).
- ``LSFCluster``: add ``use_stdin`` to ``LSFCluster``. This switches between
``bsub < job_script`` and ``bsub job_script`` to launch a ``LSF`` job
Expand Down

0 comments on commit f199cf9

Please sign in to comment.