HTCondorCluster #245

Merged
merged 12 commits on Mar 7, 2019
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
@@ -7,6 +7,7 @@
from .sge import SGECluster
from .lsf import LSFCluster
from .oar import OARCluster
from .htcondor import HTCondorCluster

from ._version import get_versions
__version__ = get_versions()['version']
212 changes: 212 additions & 0 deletions dask_jobqueue/htcondor.py
@@ -0,0 +1,212 @@
from __future__ import absolute_import, division, print_function

import logging
import re
import shlex
from collections import OrderedDict

import dask
from distributed.utils import parse_bytes

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


class HTCondorCluster(JobQueueCluster):
__doc__ = docstrings.with_indents(""" Launch Dask on an HTCondor cluster with a shared file system

Parameters
----------
disk : str
Total amount of disk per job
job_extra : dict
Extra submit file attributes for the job
%(JobQueueCluster.parameters)s

Examples
--------
>>> from dask_jobqueue.htcondor import HTCondorCluster
>>> cluster = HTCondorCluster(cores=24, memory="4GB", disk="4GB")
>>> cluster.scale(10)

>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters, which automatically launch and kill workers based on load.
HTCondor can take longer to start jobs than other batch systems, so tune the Adaptive parameters accordingly.

>>> cluster.adapt(minimum=5, startup_cost='60s')
""", 4)

_script_template = """
%(shebang)s

%(job_header)s

Environment = "%(quoted_environment)s"
Arguments = "%(quoted_arguments)s"
Executable = %(executable)s
""".lstrip()

submit_command = "condor_submit -queue 1 -file"
cancel_command = "condor_rm"
job_id_regexp = r'(?P<job_id>\d+\.\d+)'

# condor sets argv[0] of the executable to "condor_exec.exe", which confuses
# Python (can't find its libs), so we have to go through the shell.
executable = "/bin/sh"
Member:
So this means we cannot put something like dask-worker here?

Contributor Author:
I think dask-worker would work well; I just noticed that none of the other implementations were using it, so I thought it was deprecated or something.

Member:
We actually use

'%(python)s -m distributed.cli.dask_worker' % dict(python=sys.executable)

see #77

This is just to be sure to find the dask-worker script if it is not in the PATH or the Python bin directory.

But you may not be able to pass several arguments here... So maybe it's just simpler to use /bin/sh.
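For concreteness, here is a rough, self-contained sketch (not part of this diff; the scheduler address is made up) of how the /bin/sh workaround ends up looking once rendered:

import sys

# Hypothetical worker command; a real one is built from self._command_template.
command = "%s -m distributed.cli.dask_worker tcp://scheduler:8786" % sys.executable

# HTCondor copies the executable and runs it as "condor_exec.exe", which breaks
# a copied Python interpreter, so the job instead runs /bin/sh -c '<command>'.
# Under the "new" Condor argument quoting (see quote_arguments below), an
# argument containing spaces is wrapped in single quotes, so the submit file
# ends up with roughly:
#
#   Executable = /bin/sh
#   Arguments = "-c 'python -m distributed.cli.dask_worker tcp://scheduler:8786'"
#
arguments = "-c '%s'" % command  # simplified: ignores quote doubling
print('Executable = /bin/sh')
print('Arguments = "%s"' % arguments)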


def __init__(self, disk=None, job_extra=None,
config_name='htcondor', **kwargs):
if disk is None:
disk = dask.config.get("jobqueue.%s.disk" % config_name)
if disk is None:
raise ValueError("You must specify how much disk to use per job like ``disk='1 GB'``")
self.worker_disk = parse_bytes(disk)
if job_extra is None:
self.job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name, {})
else:
self.job_extra = job_extra

# Instantiate args and parameters from parent abstract class
super(HTCondorCluster, self).__init__(config_name=config_name, **kwargs)

env_extra = kwargs.get("env_extra", None)
if env_extra is None:
env_extra = dask.config.get("jobqueue.%s.env-extra" % config_name, default=[])
# env_extra is an array of export statements -- turn it into a dict
self.env_dict = {}
for env_line in env_extra:
split_env_line = shlex.split(env_line)
if split_env_line[0] == "export":
split_env_line = split_env_line[1:]
for item in split_env_line:
if '=' in item:
k, v = item.split('=', 1)
self.env_dict[k] = v
Member:
I'd make at least the above lines a function.

Could it also be moved directly into quote_environment?

Contributor Author:
I'll move it to a function but I'd like to keep it separate from quote_environment; I think turning the config lines into a dict is a separate task from turning the dict into a string, and I'd like to leave the option open to add to/edit the dict after parsing.
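A minimal sketch of what that helper could look like, assuming a name such as env_lines_to_dict (the name is hypothetical at this point in the discussion):

import shlex

def env_lines_to_dict(env_lines):
    """Turn a list of 'export VAR=value' lines (as in env-extra) into a dict."""
    env_dict = {}
    for env_line in env_lines:
        split_env_line = shlex.split(env_line)
        if split_env_line and split_env_line[0] == "export":
            split_env_line = split_env_line[1:]
        for item in split_env_line:
            if '=' in item:
                k, v = item.split('=', 1)
                env_dict[k] = v
    return env_dict

# env_lines_to_dict(['export LANG="en_US.utf8"']) -> {'LANG': 'en_US.utf8'}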

Member:
Ok, fine by me.

self.env_dict["JOB_ID"] = "$F(MY.JobId)"

self.job_header_dict = {
"MY.DaskWorkerName": '"htcondor--$F(MY.JobId)--"',
"RequestCpus": "MY.DaskWorkerCores",
"RequestMemory": "floor(MY.DaskWorkerMemory / 1048576)",
"RequestDisk": "floor(MY.DaskWorkerDisk / 1024)",
"MY.JobId": '"$(ClusterId).$(ProcId)"',
"MY.DaskWorkerCores": self.worker_cores,
"MY.DaskWorkerMemory": self.worker_memory,
"MY.DaskWorkerDisk": self.worker_disk,
}
if self.log_directory:
self.job_header_dict.update({
"LogDirectory": self.log_directory,
# $F(...) strips quotes
"Output": "$(LogDirectory)/worker-$F(MY.JobId).out",
"Error": "$(LogDirectory)/worker-$F(MY.JobId).err",
"Log": "$(LogDirectory)/worker-$(ClusterId).log",
# We kill all the workers to stop them so we need to stream their
# output+error if we ever want to see anything
"Stream_Output": True,
"Stream_Error": True,
})
if self.job_extra:
self.job_header_dict.update(self.job_extra)

self.make_job_header()

def make_job_header(self):
Member:
Is it really useful to have a function for a one-liner?

Contributor Author:
I can inline it, but in that case, I'd like to move it into job_script(). I'd like a subclass to be able to edit the job header via the job_header_dict and have those changes be reflected in the generated script.

Member:
I see your point, but this is somewhat different from what we do for the other implementations. Do you foresee there being some subclasses?

Contributor Author:
Well, I'd like to subclass it to add HTCondor file transfer support and wrapper scripts and other things needed to get rid of the shared file system dependency. As I suggested in #100, I'd put this in a separate repo so you folks wouldn't have to maintain the extra code.
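For illustration only, a hypothetical subclass along those lines might tweak job_header_dict after the parent constructor has run and then regenerate the header string, e.g.:

class FileTransferHTCondorCluster(HTCondorCluster):
    """Hypothetical subclass (not part of this PR) adding file-transfer attributes."""

    def __init__(self, **kwargs):
        super(FileTransferHTCondorCluster, self).__init__(**kwargs)
        # Adjust the already-built header dict, then rebuild the header string
        # so that job_script() picks up the extra submit attributes.
        self.job_header_dict.update({
            "Should_Transfer_Files": "YES",
            "When_To_Transfer_Output": "ON_EXIT",
        })
        self.make_job_header()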

""" The string version of the submit file attributes """
self.job_header = "\n".join("%s = %s" % (k, v) for k, v in self.job_header_dict.items())

def job_script(self):
""" Construct a job submission script """
quoted_arguments = quote_arguments(["-c", self._command_template])
quoted_environment = quote_environment(self.env_dict)
return self._script_template % {
'shebang': self.shebang,
'job_header': self.job_header,
'quoted_environment': quoted_environment,
'quoted_arguments': quoted_arguments,
'executable': self.executable,
}

def _job_id_from_submit_output(self, out):
cluster_id_regexp = r"submitted to cluster (\d+)"
match = re.search(cluster_id_regexp, out)
if match is None:
msg = ("Could not parse cluster id from submission command output.\n"
"Cluster id regexp is {!r}\n"
"Submission command output is:\n{}".format(cluster_id_regexp, out))
raise ValueError(msg)
return "%s.0" % match.group(1)


def _double_up_quotes(instr):
return instr.replace("'", "''").replace('"', '""')


def quote_arguments(args):
"""Quote a string or list of strings using the Condor submit file "new" argument quoting rules.

Returns
-------
str
The arguments in a quoted form.

Warnings
--------
You will need to surround the result in double-quotes before using it in
the Arguments attribute.

Examples
--------
>>> quote_arguments(["3", "simple", "arguments"])
'3 simple arguments'
>>> quote_arguments(["one", "two with spaces", "three"])
'one \'two with spaces\' three'
>>> quote_arguments(["one", "\"two\"", "spacy 'quoted' argument"])
'one ""two"" \'spacey \'\'quoted\'\' argument\''
"""
if isinstance(args, str):
args_list = [args]
else:
args_list = args

quoted_args = []
for a in args_list:
qa = _double_up_quotes(a)
if ' ' in qa or "'" in qa:
qa = "'" + qa + "'"
quoted_args.append(qa)
return " ".join(quoted_args)


def quote_environment(env):
"""Quote a dict of strings using the Condor submit file "new" environment quoting rules.

Returns
-------
str
The environment in quoted form.

Warnings
--------
You will need to surround the result in double-quotes before using it in
the Environment attribute.

Examples
--------
>>> quote_environment(OrderedDict([("one", 1), ("two", '"2"'), ("three", "spacey 'quoted' value")]))
'one=1 two=""2"" three=\'spacey \'\'quoted\'\' value\''
"""
if not isinstance(env, dict):
raise TypeError("env must be a dict")

entries = []
for k, v in env.items():
qv = _double_up_quotes(str(v))
if ' ' in qv or "'" in qv:
qv = "'" + qv + "'"
entries.append("%s=%s" % (k, qv))

return " ".join(entries)
21 changes: 21 additions & 0 deletions dask_jobqueue/jobqueue.yaml
@@ -138,3 +138,24 @@ jobqueue:
mem: null
job-extra: []
log-directory: null

htcondor:
name: dask-worker

# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR

# HTCondor Resource Manager options
disk: null # Total amount of disk per job
extra: []
env-extra: []
job-extra: {} # Extra submit attributes
log-directory: null
shebang: "#!/usr/bin/env condor_submit"

93 changes: 93 additions & 0 deletions dask_jobqueue/tests/test_htcondor.py
@@ -0,0 +1,93 @@
from __future__ import absolute_import, division, print_function

import sys
from time import sleep, time

import pytest
from distributed import Client

import dask

from dask_jobqueue import HTCondorCluster

QUEUE_WAIT = 30 # seconds


def test_header():
with HTCondorCluster(cores=1, memory='100MB', disk='100MB') as cluster:
job_header = cluster.job_header
assert 'RequestCpus = MY.DaskWorkerCores' in job_header
assert 'RequestDisk = floor(MY.DaskWorkerDisk / 1024)' in job_header
assert 'RequestMemory = floor(MY.DaskWorkerMemory / 1048576)' in job_header
assert 'MY.DaskWorkerCores = 1' in job_header
assert 'MY.DaskWorkerDisk = 100000000' in job_header
assert 'MY.DaskWorkerMemory = 100000000' in job_header
assert 'MY.JobId = "$(ClusterId).$(ProcId)"' in job_header


def test_job_script():
with HTCondorCluster(cores=4, processes=2, memory='100MB', disk='100MB',
env_extra=['export LANG="en_US.utf8"',
'export LC_ALL="en_US.utf8"'],
job_extra={'+Extra': "True"}) as cluster:
job_script = cluster.job_script()
assert 'LANG=en_US.utf8' in job_script
assert 'LC_ALL=en_US.utf8' in job_script
assert 'JOB_ID=$F(MY.JobId)' in job_script
assert 'export' not in job_script
assert '+Extra = True' in job_script

assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
assert '--memory-limit 50.00MB' in job_script
assert '--nthreads 2' in job_script
assert '--nprocs 2' in job_script


@pytest.mark.env("htcondor")
def test_basic(loop):
with HTCondorCluster(cores=1, memory='100MB', disk='100MB', loop=loop) as cluster:
with Client(cluster) as client:

cluster.scale(2)

start = time()
while not (cluster.pending_jobs or cluster.running_jobs):
sleep(0.100)
assert time() < start + QUEUE_WAIT

future = client.submit(lambda x: x + 1, 10)
assert future.result(QUEUE_WAIT) == 11
assert cluster.running_jobs

workers = list(client.scheduler_info()['workers'].values())
w = workers[0]
assert w['memory_limit'] == 1e8
assert w['ncores'] == 1

cluster.scale(0)

start = time()
while cluster.running_jobs:
sleep(0.100)
assert time() < start + QUEUE_WAIT


def test_config_name_htcondor_takes_custom_config():
conf = {'cores': 1,
'memory': '120 MB',
'disk': '120 MB',
'job-extra': [],
'name': 'myname',
'processes': 1,
'interface': None,
'death-timeout': None,
'extra': [],
'env-extra': [],
'log-directory': None,
'shebang': "#!/usr/bin/env condor_submit",
'local-directory': "/tmp",
}

with dask.config.set({'jobqueue.htcondor-config-name': conf}):
with HTCondorCluster(config_name='htcondor-config-name') as cluster:
assert cluster.name == 'myname'