Skip to content

Commit

Permalink
Simplify default config_name and job_cls mechanism (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
lesteve committed Nov 21, 2019
1 parent af37667 commit 45fc714
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 89 deletions.
83 changes: 52 additions & 31 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,43 +143,43 @@ def __init__(

super().__init__()

default_config_name = self.default_config_name()
if config_name is None:
config_name = getattr(type(self), "config_name")
if config_name is None:
raise ValueError(
"Looks like you are trying to create a class that inherits from dask_jobqueue.core.Job. "
"If that is the case, you need to:\n"
"- set the 'config_name' class variable to a non-None value\n"
"- create a section in jobqueue.yaml with the value of 'config_name'\n"
"If that is not the case, please open an issue in https://github.com/dask/dask-jobqueue/issues."
)
config_name = default_config_name
self.config_name = config_name

if job_name is None:
job_name = dask.config.get("jobqueue.%s.name" % config_name)
job_name = dask.config.get("jobqueue.%s.name" % self.config_name)
if cores is None:
cores = dask.config.get("jobqueue.%s.cores" % config_name)
cores = dask.config.get("jobqueue.%s.cores" % self.config_name)
if memory is None:
memory = dask.config.get("jobqueue.%s.memory" % config_name)
memory = dask.config.get("jobqueue.%s.memory" % self.config_name)
if processes is None:
processes = dask.config.get("jobqueue.%s.processes" % config_name)
processes = dask.config.get("jobqueue.%s.processes" % self.config_name)
if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % config_name)
interface = dask.config.get("jobqueue.%s.interface" % self.config_name)
if death_timeout is None:
death_timeout = dask.config.get("jobqueue.%s.death-timeout" % config_name)
death_timeout = dask.config.get(
"jobqueue.%s.death-timeout" % self.config_name
)
if local_directory is None:
local_directory = dask.config.get(
"jobqueue.%s.local-directory" % config_name
"jobqueue.%s.local-directory" % self.config_name
)
if extra is None:
extra = dask.config.get("jobqueue.%s.extra" % config_name)
extra = dask.config.get("jobqueue.%s.extra" % self.config_name)
if env_extra is None:
env_extra = dask.config.get("jobqueue.%s.env-extra" % config_name)
env_extra = dask.config.get("jobqueue.%s.env-extra" % self.config_name)
if header_skip is None:
header_skip = dask.config.get("jobqueue.%s.header-skip" % config_name, ())
header_skip = dask.config.get(
"jobqueue.%s.header-skip" % self.config_name, ()
)
if log_directory is None:
log_directory = dask.config.get("jobqueue.%s.log-directory" % config_name)
log_directory = dask.config.get(
"jobqueue.%s.log-directory" % self.config_name
)
if shebang is None:
shebang = dask.config.get("jobqueue.%s.shebang" % config_name)
shebang = dask.config.get("jobqueue.%s.shebang" % self.config_name)

if cores is None or memory is None:
job_class_name = self.__class__.__name__
Expand All @@ -191,7 +191,7 @@ def __init__(
)
)

# This attribute should be overridden
# This attribute should be set in the derived class
self.job_header = None

if interface:
Expand Down Expand Up @@ -239,6 +239,18 @@ def __init__(
if not os.path.exists(self.log_directory):
os.makedirs(self.log_directory)

@classmethod
def default_config_name(cls):
    """Return the ``config_name`` class variable defined on ``cls``.

    The value is looked up with ``getattr`` so a class that does not define
    the attribute at all is handled the same way as one that sets it to
    ``None``.

    Returns
    -------
    The value of ``cls.config_name`` when it is not ``None``.

    Raises
    ------
    ValueError
        If ``cls`` has no ``config_name`` attribute, or the attribute is
        ``None``.
    """
    name = getattr(cls, "config_name", None)
    if name is not None:
        return name

    # Build the full message first; the class object is interpolated once
    # into the first sentence so the user can see which class is at fault.
    message = (
        "The class {} is required to have 'config_name' class variable.\n"
        "If you have created this class, please add a 'config_name' class variable.\n"
        "If not this may be a bug, feel free to create an issue at: "
        "https://github.com/dask/dask-jobqueue/issues/new"
    ).format(cls)
    raise ValueError(message)

def job_script(self):
""" Construct a job submission script """
header = "\n".join(
Expand Down Expand Up @@ -392,8 +404,6 @@ class JobQueueCluster(SpecCluster):
cluster_parameters=cluster_parameters
)

job_cls = None

def __init__(
self,
n_workers=0,
Expand All @@ -414,18 +424,29 @@ def __init__(
**kwargs
):
self.status = "created"

default_job_cls = getattr(type(self), "job_cls", None)
self.job_cls = default_job_cls
if job_cls is not None:
self.job_cls = job_cls

if self.job_cls is None:
raise ValueError(
"You must provide a Job type like PBSJob, SLURMJob, "
"or SGEJob with the job_cls= argument."
"You need to specify a Job type. Two cases:\n"
"- you are inheriting from JobQueueCluster (most likely): you need to add a 'job_cls' class variable "
"in your JobQueueCluster-derived class {}\n"
"- you are using JobQueueCluster directly (less likely, only useful for tests): "
"please explicitly pass a Job type through the 'job_cls' parameter.".format(
type(self)
)
)

if config_name:
if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % config_name)
default_config_name = self.job_cls.default_config_name()
if config_name is None:
config_name = default_config_name

if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % config_name)

scheduler = {
"cls": Scheduler, # Use local scheduler for now
Expand All @@ -437,8 +458,8 @@ def __init__(
"security": security,
},
}
if config_name:
kwargs["config_name"] = config_name

kwargs["config_name"] = config_name
kwargs["interface"] = interface
kwargs["protocol"] = protocol
kwargs["security"] = security
Expand Down
19 changes: 10 additions & 9 deletions dask_jobqueue/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,29 @@ class HTCondorJob(Job):
# Python (can't find its libs), so we have to go through the shell.
executable = "/bin/sh"

def __init__(
self, *args, disk=None, job_extra=None, config_name="htcondor", **kwargs
):
config_name = "htcondor"

def __init__(self, *args, disk=None, job_extra=None, config_name=None, **kwargs):
super().__init__(*args, config_name=config_name, **kwargs)

if disk is None:
disk = dask.config.get("jobqueue.%s.disk" % config_name)
disk = dask.config.get("jobqueue.%s.disk" % self.config_name)
if disk is None:
raise ValueError(
"You must specify how much disk to use per job like ``disk='1 GB'``"
)
self.worker_disk = parse_bytes(disk)
if job_extra is None:
self.job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name, {})
self.job_extra = dask.config.get(
"jobqueue.%s.job-extra" % self.config_name, {}
)
else:
self.job_extra = job_extra

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

env_extra = kwargs.get("env_extra", None)
if env_extra is None:
env_extra = dask.config.get(
"jobqueue.%s.env-extra" % config_name, default=[]
"jobqueue.%s.env-extra" % self.config_name, default=[]
)
self.env_dict = self.env_lines_to_dict(env_extra)
self.env_dict["JOB_ID"] = "$F(MY.JobId)"
Expand Down
2 changes: 1 addition & 1 deletion dask_jobqueue/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
resource_spec=None,
walltime=None,
job_extra=None,
config_name="local",
config_name=None,
**kwargs
):
# Instantiate args and parameters from parent abstract class
Expand Down
24 changes: 12 additions & 12 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
class LSFJob(Job):
submit_command = "bsub"
cancel_command = "bkill"
config_name = "lsf"

def __init__(
self,
Expand All @@ -28,34 +29,33 @@ def __init__(
walltime=None,
job_extra=None,
lsf_units=None,
config_name="lsf",
config_name=None,
use_stdin=None,
**kwargs
):
super().__init__(*args, config_name=config_name, **kwargs)

if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % config_name)
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if project is None:
project = dask.config.get("jobqueue.%s.project" % config_name)
project = dask.config.get("jobqueue.%s.project" % self.config_name)
if ncpus is None:
ncpus = dask.config.get("jobqueue.%s.ncpus" % config_name)
ncpus = dask.config.get("jobqueue.%s.ncpus" % self.config_name)
if mem is None:
mem = dask.config.get("jobqueue.%s.mem" % config_name)
mem = dask.config.get("jobqueue.%s.mem" % self.config_name)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)
if lsf_units is None:
lsf_units = dask.config.get("jobqueue.%s.lsf-units" % config_name)
lsf_units = dask.config.get("jobqueue.%s.lsf-units" % self.config_name)

if use_stdin is None:
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % config_name)
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % self.config_name)
if use_stdin is None:
use_stdin = lsf_version() < "10"
self.use_stdin = use_stdin

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

header_lines = []
# LSF header build
if self.name is not None:
Expand Down
19 changes: 11 additions & 8 deletions dask_jobqueue/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class OARJob(Job):
submit_command = "oarsub"
cancel_command = "oardel"
job_id_regexp = r"OAR_JOB_ID=(?P<job_id>\d+)"
config_name = "oar"

def __init__(
self,
Expand All @@ -23,21 +24,23 @@ def __init__(
resource_spec=None,
walltime=None,
job_extra=None,
config_name="oar",
config_name=None,
**kwargs
):
super().__init__(*args, config_name=config_name, **kwargs)

if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % config_name)
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if project is None:
project = dask.config.get("jobqueue.%s.project" % config_name)
project = dask.config.get("jobqueue.%s.project" % self.config_name)
if resource_spec is None:
resource_spec = dask.config.get("jobqueue.%s.resource-spec" % config_name)
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % self.config_name
)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)

super().__init__(*args, config_name=config_name, **kwargs)
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)

header_lines = []
if self.job_name is not None:
Expand Down
19 changes: 10 additions & 9 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,26 @@ def __init__(
resource_spec=None,
walltime=None,
job_extra=None,
config_name="pbs",
config_name=None,
**kwargs
):
super().__init__(*args, config_name=config_name, **kwargs)

if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % config_name)
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if resource_spec is None:
resource_spec = dask.config.get("jobqueue.%s.resource-spec" % config_name)
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % self.config_name
)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)
if project is None:
project = dask.config.get(
"jobqueue.%s.project" % config_name
"jobqueue.%s.project" % self.config_name
) or os.environ.get("PBS_ACCOUNT")

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

# Try to find a project name from environment variable
project = project or os.environ.get("PBS_ACCOUNT")

Expand Down
20 changes: 11 additions & 9 deletions dask_jobqueue/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
class SGEJob(Job):
submit_command = "qsub"
cancel_command = "qdel"
config_name = "sge"

def __init__(
self,
Expand All @@ -19,21 +20,23 @@ def __init__(
resource_spec=None,
walltime=None,
job_extra=None,
config_name="sge",
config_name=None,
**kwargs
):
super().__init__(*args, config_name=config_name, **kwargs)

if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % config_name)
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if project is None:
project = dask.config.get("jobqueue.%s.project" % config_name)
project = dask.config.get("jobqueue.%s.project" % self.config_name)
if resource_spec is None:
resource_spec = dask.config.get("jobqueue.%s.resource-spec" % config_name)
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % self.config_name
)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)

super().__init__(*args, config_name=config_name, **kwargs)
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)

header_lines = []
if self.job_name is not None:
Expand Down Expand Up @@ -114,4 +117,3 @@ class SGECluster(JobQueueCluster):
job=job_parameters, cluster=cluster_parameters
)
job_cls = SGEJob
config_name = "sge"

0 comments on commit 45fc714

Please sign in to comment.