Skip to content

Commit

Permalink
Add S014: Use Execution Time Limit in preference to directives.
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Jun 19, 2024
1 parent c43bf80 commit 700eeec
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 13 deletions.
1 change: 1 addition & 0 deletions changes.d/6137.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New Cylc lint rule: S014: Don't use job runner specific execution time limit directives, use execution time limit.
5 changes: 3 additions & 2 deletions cylc/flow/job_runner_handlers/loadleveler.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class LoadlevelerHandler():
re.compile("^llsubmit: Processed command file through Submit Filter:")]
SUBMIT_CMD_TMPL = "llsubmit '%(job)s'"
VACATION_SIGNAL = "USR1"
ETL_DIRECTIVE = "wall_clock_limit"

def format_directives(self, job_conf):
"""Format the job directives for a job file."""
Expand All @@ -96,8 +97,8 @@ def format_directives(self, job_conf):
directives["output"] = job_file_path + ".out"
directives["error"] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("wall_clock_limit") is None):
directives["wall_clock_limit"] = "%d,%d" % (
directives.get(self.ETL_DIRECTIVE) is None):
directives[self.ETL_DIRECTIVE] = "%d,%d" % (
job_conf["execution_time_limit"] + 60,
job_conf["execution_time_limit"])
for key, value in list(job_conf["directives"].items()):
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/job_runner_handlers/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class LSFHandler():
POLL_CMD = "bjobs"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"^Job <(?P<id>\d+)>")
SUBMIT_CMD_TMPL = "bsub"
ETL_DIRECTIVE = "-W"

@classmethod
def format_directives(cls, job_conf):
Expand All @@ -82,8 +83,11 @@ def format_directives(cls, job_conf):
)
directives["-o"] = job_file_path + ".out"
directives["-e"] = job_file_path + ".err"
if job_conf["execution_time_limit"] and directives.get("-W") is None:
directives["-W"] = str(math.ceil(
if (
job_conf["execution_time_limit"]
and directives.get(cls.ETL_DIRECTIVE) is None
):
directives[cls.ETL_DIRECTIVE] = str(math.ceil(
job_conf["execution_time_limit"] / 60))
for key, value in list(job_conf["directives"].items()):
directives[key] = value
Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/job_runner_handlers/moab.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class MoabHandler:
POLL_CMD = "checkjob"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"""\A\s*(?P<id>\S+)\s*\Z""")
SUBMIT_CMD_TMPL = "msub '%(job)s'"
ETL_DIRECTIVE = "-l walltime"

def format_directives(self, job_conf):
"""Format the job directives for a job file."""
Expand All @@ -91,8 +92,9 @@ def format_directives(self, job_conf):
directives["-o"] = job_file_path + ".out"
directives["-e"] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("-l walltime") is None):
directives["-l walltime"] = "%d" % job_conf["execution_time_limit"]
directives.get(self.ETL_DIRECTIVE) is None):
directives[self.ETL_DIRECTIVE] = "%d" % job_conf[
"execution_time_limit"]
# restartable?
directives.update(job_conf["directives"])
lines = []
Expand Down
10 changes: 7 additions & 3 deletions cylc/flow/job_runner_handlers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class PBSHandler:
POLL_CANT_CONNECT_ERR = "Connection refused"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"^\s*(?P<id>\d+)", re.M)
SUBMIT_CMD_TMPL = "qsub '%(job)s'"
ETL_DIRECTIVE = "-l walltime"

def format_directives(self, job_conf):
"""Format the job directives for a job file."""
Expand All @@ -105,9 +106,12 @@ def format_directives(self, job_conf):

directives["-o"] = job_file_path + ".out"
directives["-e"] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("-l walltime") is None):
directives["-l walltime"] = "%d" % job_conf["execution_time_limit"]
if (
job_conf["execution_time_limit"]
and directives.get(self.ETL_DIRECTIVE) is None
):
directives[self.ETL_DIRECTIVE] = "%d" % job_conf[
"execution_time_limit"]
for key, value in list(job_conf["directives"].items()):
directives[key] = value
lines = []
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/job_runner_handlers/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class SGEHandler:
POLL_CMD = "qstat"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"\D+(?P<id>\d+)\D+")
SUBMIT_CMD_TMPL = "qsub '%(job)s'"
ETL_DIRECTIVE = "-l h_rt"

def format_directives(self, job_conf):
"""Format the job directives for a job file."""
Expand All @@ -88,8 +89,8 @@ def format_directives(self, job_conf):
directives['-o'] = job_file_path + ".out"
directives['-e'] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("-l h_rt") is None):
directives["-l h_rt"] = "%d:%02d:%02d" % (
directives.get(self.ETL_DIRECTIVE) is None):
directives[self.ETL_DIRECTIVE] = "%d:%02d:%02d" % (
job_conf["execution_time_limit"] / 3600,
(job_conf["execution_time_limit"] / 60) % 60,
job_conf["execution_time_limit"] % 60)
Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/job_runner_handlers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class SLURMHandler():
# Separator between het job directive sections
SEP_HETJOB = "#SBATCH hetjob"

ETL_DIRECTIVE = "--time"

@classmethod
def filter_poll_many_output(cls, out):
"""Return list of job IDs extracted from job poll stdout.
Expand All @@ -161,8 +163,8 @@ def format_directives(cls, job_conf):
directives['--output'] = job_file_path.replace('%', '%%') + ".out"
directives['--error'] = job_file_path.replace('%', '%%') + ".err"
if (job_conf["execution_time_limit"] and
directives.get("--time") is None):
directives["--time"] = "%d:%02d" % (
directives.get(cls.ETL_DIRECTIVE) is None):
directives[cls.ETL_DIRECTIVE] = "%d:%02d" % (
job_conf["execution_time_limit"] / 60,
job_conf["execution_time_limit"] % 60)
for key, value in list(job_conf['directives'].items()):
Expand Down
55 changes: 55 additions & 0 deletions cylc/flow/scripts/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
max-line-length = 130 # Max line length for linting
"""
import functools
import pkgutil
from pathlib import Path
import re
import sys
Expand Down Expand Up @@ -78,6 +79,8 @@
from cylc.flow import LOG
from cylc.flow.exceptions import CylcError
import cylc.flow.flags
from cylc.flow import job_runner_handlers
from cylc.flow.job_runner_mgr import JobRunnerManager
from cylc.flow.loggingutil import set_timestamps
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
Expand Down Expand Up @@ -162,6 +165,34 @@
}


def get_wallclock_directives():
"""Get a set of directives equivelent to execution time limit"""
job_runner_manager = JobRunnerManager()
directives = {}
for module in pkgutil.iter_modules(job_runner_handlers.__path__):
directive = getattr(
job_runner_manager._get_sys(module.name),
'ETL_DIRECTIVE',
None
)
if directive:
directives[module.name] = directive
return directives


WALLCLOCK_DIRECTIVES = get_wallclock_directives()


def check_etl_directives(line: str) -> bool:
"""Check for job runner specific directives
equivelent to exection time limit.
"""
for directive in set(WALLCLOCK_DIRECTIVES.values()):
if line.strip().startswith(directive):
return {'directive': line.strip()}
return False


def check_jinja2_no_shebang(
line: str,
file: Path,
Expand Down Expand Up @@ -533,6 +564,30 @@ def list_wrapper(line: str, check: Callable) -> Optional[Dict[str, str]]:
'S013': {
'short': 'Items should be indented in 4 space blocks.',
FUNCTION: check_indentation
},
'S014': {
'short': (
'Use ``[runtime][TASK]execution time limit``'
' rather than job runner directive: ``{directive}``.'
),
'rst': (
'Using ``[runtime][TASK]execution time limit`` is'
' recommended in preference to using job runner'
' directives because it allows Cylc to retain awareness'
' of whether the job should have finished, even if contact'
' with the target job runner\'s platform has been lost.'
' \n\nThe following directives are considered equivelent to'
' execution time limit:\n * '
)
+ '\n * '.join((
f'``{directive}`` ({job_runner})'
for job_runner, directive in WALLCLOCK_DIRECTIVES.items()
)) + (
'\n\n.. note:: Using ``execution time limit`` which'
' is automatically translated to the job runner\'s timeout'
' directive can make your workflow more portable.'
),
FUNCTION: check_etl_directives,
}
}
# Subset of deprecations which are tricky (impossible?) to scrape from the
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/scripts/test_lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@
something\t
[[bar]]
platform = $(some-script foo)
[[[dependencies]]]
-l walltime 666
[[baz]]
platform = `no backticks`
""" + (
Expand Down

0 comments on commit 700eeec

Please sign in to comment.