New attempt at #411 (adding extra argument to condor_submit) (#514)
* Add submit_command_extra and cancel_command_extra arguments

`submit_command_extra` and `cancel_command_extra` (config options
`jobqueue.X.submit-command-extra` and `jobqueue.X.cancel-command-extra`)
are lists of strings passed on as-is as extra arguments to `condor_submit`
and `condor_rm` when submitting or removing HTCondor jobs.
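A rough sketch of how the new keyword arguments might be used (the
`-verbose` and `-forcex` flags are the ones exercised by the tests in
this commit; resource values here are only placeholders):

    from dask_jobqueue import HTCondorCluster

    # Both lists are shell-quoted and appended to the respective commands.
    cluster = HTCondorCluster(
        cores=1,
        memory="100MB",
        disk="100MB",
        submit_command_extra=["-verbose"],  # condor_submit -verbose ...
        cancel_command_extra=["-forcex"],   # condor_rm -forcex ...
    )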

* Inline shell quoting for setting submit_command/cancel_command

* Missed a space

* Turn core.Job._close_job() into an instance method so we can override cancel_command in the instance

* Add hostname to htcondor test container

Some HTCondor parameters (e.g. `-name`, which specifies a schedd name)
expect to be given an FQDN (or something that looks like one) and will
fail if the value does not contain a `.`.

* Add stub test for submit_command_extra and cancel_command_extra

* Add tests for:

- htcondor with a broken submit command
- htcondor with working submit/cancel commands with extra arguments
- htcondor with a broken cancel command

* Better test for whether a remove happened

(assuming no other workers are running -- it would be even better if
I had the job IDs of the workers from the cluster.)

Also don't wait so long.  A remove is quicker than a submit.

* Got the assert backwards.

* Turn core.Job._close_job() into an instance method so we can override cancel_command in the instance

* revert changing internal close_job to instance method

* add cancel_command also to finalizer and local override

* revert to some more relaxed testing

* try different testing

* add timeout to testing

* remove broken_submit test from htcondor

* revert unrelated change

Co-authored-by: Matyas Selmeci <matyas@cs.wisc.edu>
riedel and matyasselmeci committed Oct 5, 2021
1 parent 79a9a3b commit 26fd3ae
Showing 5 changed files with 73 additions and 5 deletions.
8 changes: 4 additions & 4 deletions dask_jobqueue/core.py
@@ -328,7 +328,7 @@ async def start(self):
         out = await self._submit_job(fn)
         self.job_id = self._job_id_from_submit_output(out)

-        weakref.finalize(self, self._close_job, self.job_id)
+        weakref.finalize(self, self._close_job, self.job_id, self.cancel_command)

         logger.debug("Starting job: %s", self.job_id)
         await super().start()
@@ -356,13 +356,13 @@ def _job_id_from_submit_output(self, out):

     async def close(self):
         logger.debug("Stopping worker: %s job: %s", self.name, self.job_id)
-        self._close_job(self.job_id)
+        self._close_job(self.job_id, self.cancel_command)

     @classmethod
-    def _close_job(cls, job_id):
+    def _close_job(cls, job_id, cancel_command):
         if job_id:
             with suppress(RuntimeError):  # deleting job when job already gone
-                cls._call(shlex.split(cls.cancel_command) + [job_id])
+                cls._call(shlex.split(cancel_command) + [job_id])
         logger.debug("Closed job %s", job_id)

     @staticmethod
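Why `cancel_command` is now passed into the finalizer instead of read from
the class: `weakref.finalize` captures its arguments at registration time,
so a per-instance cancel command (one carrying extra `condor_rm` flags)
still takes effect when the job object is garbage collected. A minimal
standalone sketch of that pattern (names here are illustrative, not from
the diff):

    import weakref

    class FakeJob:
        def __init__(self, job_id, cancel_command):
            # finalize() captures its arguments now, so the per-instance
            # cancel_command is used even after the instance is collected.
            weakref.finalize(self, FakeJob._close_job, job_id, cancel_command)

        @staticmethod
        def _close_job(job_id, cancel_command):
            print("would run:", cancel_command, job_id)

    FakeJob("42", "condor_rm -forcex")  # finalizer fires at collection/exit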
28 changes: 28 additions & 0 deletions dask_jobqueue/htcondor.py
@@ -40,6 +40,8 @@ def __init__(
         disk=None,
         job_extra=None,
         config_name=None,
+        submit_command_extra=None,
+        cancel_command_extra=None,
         **base_class_kwargs
     ):
         super().__init__(
@@ -94,6 +96,28 @@ def __init__(
         if self.job_extra:
             self.job_header_dict.update(self.job_extra)

+        if submit_command_extra is None:
+            submit_command_extra = dask.config.get(
+                "jobqueue.%s.submit-command-extra" % self.config_name, []
+            )
+
+        self.submit_command = (
+            HTCondorJob.submit_command
+            + " "
+            + " ".join(shlex.quote(arg) for arg in submit_command_extra)
+        )
+
+        if cancel_command_extra is None:
+            cancel_command_extra = dask.config.get(
+                "jobqueue.%s.cancel-command-extra" % self.config_name, []
+            )
+
+        self.cancel_command = (
+            HTCondorJob.cancel_command
+            + " "
+            + " ".join(shlex.quote(arg) for arg in cancel_command_extra)
+        )
+
     def env_lines_to_dict(self, env_lines):
         """Convert an array of export statements (what we get from env-extra
         in the config) into a dict"""
Expand Down Expand Up @@ -217,6 +241,10 @@ class HTCondorCluster(JobQueueCluster):
        Total amount of disk per job
    job_extra : dict
        Extra submit file attributes for the job
+    submit_command_extra : list of str
+        Extra arguments to pass to condor_submit
+    cancel_command_extra : list of str
+        Extra arguments to pass to condor_rm
    {job}
    {cluster}
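Because each extra argument is run through `shlex.quote` before being
joined, values containing spaces or shell metacharacters survive the later
round trip through `shlex.split` in `_close_job`. A quick illustration
with hypothetical values:

    import shlex

    extra = ["-name", "schedd name with spaces"]
    cmd = "condor_rm " + " ".join(shlex.quote(arg) for arg in extra)
    # cmd == "condor_rm -name 'schedd name with spaces'"
    print(shlex.split(cmd))
    # ['condor_rm', '-name', 'schedd name with spaces']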
2 changes: 2 additions & 0 deletions dask_jobqueue/jobqueue.yaml
@@ -176,6 +176,8 @@ jobqueue:
    disk: null # Total amount of disk per job
    env-extra: []
    job-extra: {} # Extra submit attributes
+    submit-command-extra: [] # Extra condor_submit arguments
+    cancel-command-extra: [] # Extra condor_rm arguments
    log-directory: null
    shebang: "#!/usr/bin/env condor_submit"

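Instead of constructor arguments, the same behavior can be driven from
Dask configuration; a sketch using the config keys added above (flag
values are placeholders):

    import dask

    dask.config.set(
        {
            "jobqueue.htcondor.submit-command-extra": ["-verbose"],
            "jobqueue.htcondor.cancel-command-extra": ["-forcex"],
        }
    )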
2 changes: 1 addition & 1 deletion dask_jobqueue/local.py
@@ -72,7 +72,7 @@ async def _submit_job(self, script_filename):
         return str(self.process.pid)

     @classmethod
-    def _close_job(self, job_id):
+    def _close_job(self, job_id, cancel_command):
         os.kill(int(job_id), 9)
         # from distributed.utils_test import terminate_process
         # terminate_process(self.process)
38 changes: 38 additions & 0 deletions dask_jobqueue/tests/test_htcondor.py
@@ -1,3 +1,4 @@
+import re
 import sys
 from time import sleep, time

@@ -8,6 +9,7 @@
 from dask.utils import format_bytes, parse_bytes

 from dask_jobqueue import HTCondorCluster
+from dask_jobqueue.core import Job

 QUEUE_WAIT = 30  # seconds

@@ -27,6 +29,8 @@ def test_job_script():
disk="100MB",
env_extra=['export LANG="en_US.utf8"', 'export LC_ALL="en_US.utf8"'],
job_extra={"+Extra": "True"},
submit_command_extra=["-verbose"],
cancel_command_extra=["-forcex"],
) as cluster:
job_script = cluster.job_script()
assert "RequestCpus = MY.DaskWorkerCores" in job_script
@@ -40,6 +44,10 @@
assert "LC_ALL=en_US.utf8" in job_script
assert "export" not in job_script
assert "+Extra = True" in job_script
assert re.search(
r"condor_submit\s.*-verbose", cluster._dummy_job.submit_command
)
assert re.search(r"condor_rm\s.*-forcex", cluster._dummy_job.cancel_command)

assert (
"{} -m distributed.cli.dask_worker tcp://".format(sys.executable)
@@ -77,6 +85,36 @@ def test_basic(loop):
         assert time() < start + QUEUE_WAIT


+@pytest.mark.env("htcondor")
+def test_extra_args_broken_cancel(loop):
+    with HTCondorCluster(
+        cores=1,
+        memory="100MB",
+        disk="100MB",
+        loop=loop,
+        cancel_command_extra=["-name", "wrong.docker"],
+    ) as cluster:
+        with Client(cluster) as client:
+
+            cluster.scale(2)
+
+            client.wait_for_workers(2)
+            workers = Job._call(["condor_q", "-af", "jobpid"]).strip()
+            assert workers, "we got dask workers"
+
+            cluster.scale(0)
+
+            start = time()
+            while client.scheduler_info()["workers"]:
+                sleep(0.100)
+
+                workers = Job._call(["condor_q", "-af", "jobpid"]).strip()
+                assert workers, "killing workers with broken cancel_command didn't fail"
+
+                if time() > start + QUEUE_WAIT // 3:
+                    return
+
+
 def test_config_name_htcondor_takes_custom_config():
     conf = {
         "cores": 1,
