Skip to content

Commit

Permalink
scheduler, task_pool, task_proxy refactor
Browse files Browse the repository at this point in the history
`cylc submit` can now submit multiple tasks and families.
  • Loading branch information
matthewrmshin committed Mar 24, 2017
1 parent 51704e2 commit 1e06098
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 57 deletions.
56 changes: 35 additions & 21 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def main():
parser = COP(
__doc__, jset=True, icp=True,
argdoc=[("REG", "Suite name"),
("TASK", "Target task (" + TaskID.SYNTAX + ")")])
("TASK [...]", "Family or task ID (%s)" % TaskID.SYNTAX)])
parser.set_defaults(sched=False, dry_run=False)
parser.add_option(
"-d", "--dry-run",
Expand All @@ -73,23 +73,33 @@ def main():
(options, args) = parser.parse_args()
if options.debug:
cylc.flags.debug = True
suite, task_id = args
if not TaskID.is_valid_id(task_id):
sys.exit("Invalid task ID " + task_id)
suite = args.pop(0)
for arg in args:
if not TaskID.is_valid_id(arg):
sys.exit("Invalid task ID %s" % arg)
suite_srv_mgr = SuiteSrvFilesManager()
suiterc = suite_srv_mgr.get_suite_rc(suite)
suite_dir = os.path.dirname(suiterc)
# For user-defined batch system handlers
sys.path.append(os.path.join(suite_dir, 'python'))

# load suite config
# Load suite config and tasks
config = SuiteConfig.get_inst(
suite, suiterc,
load_template_vars(options.templatevars, options.templatevars_file),
cli_initial_point_string=options.icp)

itasks = []
for arg in args:
name_str, point_str = TaskID.split(arg)
taskdefs = config.find_taskdefs(name_str)
if not taskdefs:
sys.exit("No task found for %s" % arg)
for taskdef in taskdefs:
itasks.append(TaskProxy(
taskdef, get_point(point_str).standardise(), is_startup=True))

# Initialise job submit environment
GLOBAL_CFG.create_cylc_run_tree(suite)

task_job_mgr = TaskJobManager(
suite, SuiteProcPool(), SuiteDatabaseManager(), suite_srv_mgr)
task_job_mgr.single_task_mode = True
Expand All @@ -105,26 +115,30 @@ def main():
config.cfg['scheduling']['final cycle point']),
})

task_name, point_str = TaskID.split(task_id)
itask = TaskProxy(
config.get_taskdef(task_name),
get_point(point_str).standardise(),
is_startup=True)
ret_code = 0
if options.dry_run:
if not task_job_mgr.prep_submit_task_jobs(
suite, [itask], dry_run=True):
sys.exit(1)
print('JOB SCRIPT=%s' % itask.local_job_file_path)
task_job_mgr.prep_submit_task_jobs(suite, itasks, dry_run=True)
ret_code = 0
for itask in itasks:
if itask.local_job_file_path:
print('JOB SCRIPT=%s' % itask.local_job_file_path)
else:
print >> sys.stderr, (
'Unable to prepare job file for %s' % itask.identity)
ret_code = 1
else:
task_job_mgr.submit_task_jobs(suite, [itask])
task_job_mgr.submit_task_jobs(suite, itasks)
while task_job_mgr.proc_pool.results:
task_job_mgr.proc_pool.handle_results_async()
task_job_mgr.proc_pool.close()
task_job_mgr.proc_pool.join()
if itask.summary['submit_method_id'] is not None:
print('Job ID: %s' % itask.summary['submit_method_id'])

sys.exit(itask.state.status == TASK_STATUS_SUBMIT_FAILED)
for itask in itasks:
if itask.summary['submit_method_id'] is not None:
print('[%s] Job ID: %s' % (
itask.identity, itask.summary['submit_method_id']))
if itask.state.status == TASK_STATUS_SUBMIT_FAILED:
ret_code = 1
sys.exit(ret_code)


if __name__ == "__main__":
Expand Down
32 changes: 32 additions & 0 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@


from copy import deepcopy, copy
from fnmatch import fnmatchcase
import os
import re
import traceback
Expand Down Expand Up @@ -1845,6 +1846,37 @@ def _proc_triggers(self, triggers, original, section, seq):
self.generate_triggers(
expr, lefts, right, seq, suicide, base_interval)

def find_taskdefs(self, name):
"""Find TaskDef objects in family "name" or matching "name".
Return a list of TaskDef objects which:
* have names that glob matches "name".
* are in a family that glob matches "name".
"""
ret = []
if name in self.taskdefs:
# Match a task name
ret.append(self.taskdefs[name])
else:
fams = self.get_first_parent_descendants()
# Match a family name
if name in fams:
for member in fams[name]:
if member in self.taskdefs:
ret.append(self.taskdefs[member])
else:
# Glob match task names
for key, taskdef in self.taskdefs.items():
if fnmatchcase(key, name):
ret.append(taskdef)
# Glob match family names
for key, members in fams.items():
if fnmatchcase(key, name):
for member in members:
if member in self.taskdefs:
ret.append(self.taskdefs[member])
return ret

def get_taskdef(self, name, orig_expr=None):
"""Return an instance of TaskDef for task name."""
if name not in self.taskdefs:
Expand Down
49 changes: 14 additions & 35 deletions lib/cylc/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ def assign_queues(self):
def insert_tasks(self, items, stop_point_str, no_check=False):
"""Insert tasks."""
n_warnings = 0
names = self.config.get_task_name_list()
fams = self.config.runtime['first-parent descendants']
task_ids = []
task_items = []
for item in items:
point_str, name_str, _ = self._parse_task_item(item)
if point_str is None:
Expand All @@ -130,30 +128,13 @@ def insert_tasks(self, items, stop_point_str, no_check=False):
self.ERR_PREFIX_TASKID_MATCH + ("%s (%s)" % (item, exc)))
n_warnings += 1
continue
i_names = []
if name_str in names:
i_names.append(name_str)
elif name_str in fams:
for name in fams[name_str]:
if name in names:
i_names.append(name)
else:
for name in names:
if fnmatchcase(name, name_str):
i_names.append(name)
for fam, fam_names in fams.items():
if not fnmatchcase(fam, name_str):
continue
for name in fam_names:
if name in names:
i_names.append(name)
if i_names:
for name in i_names:
task_ids.append((name, point_str))
else:
taskdefs = self.config.find_taskdefs(name_str)
if not taskdefs:
LOG.warning(self.ERR_PREFIX_TASKID_MATCH + item)
n_warnings += 1
continue
for taskdef in taskdefs:
task_items.append([(taskdef.name, point_str), taskdef])
if stop_point_str is None:
stop_point = None
else:
Expand All @@ -167,30 +148,28 @@ def insert_tasks(self, items, stop_point_str, no_check=False):
return n_warnings
task_states_data = (
self.suite_db_mgr.pri_dao.select_task_states_by_task_ids(
["submit_num"], task_ids))
for name_str, point_str in task_ids:
["submit_num"], [task_item[0] for task_item in task_items]))
for key, taskdef in task_items:
# TODO - insertion of start-up tasks? (startup=False assumed here)

# Check that the cycle point is on one of the tasks sequences.
on_sequence = False
point_str = key[1]
point = get_point(point_str)
if not no_check: # Check if cycle point is on the tasks sequence.
for sequence in self.config.taskdefs[name_str].sequences:
for sequence in taskdef.sequences:
if sequence.is_on_sequence(point):
on_sequence = True
break
if not on_sequence:
else:
LOG.warning("%s%s, %s" % (
self.ERR_PREFIX_TASK_NOT_ON_SEQUENCE, name_str,
self.ERR_PREFIX_TASK_NOT_ON_SEQUENCE, taskdef.name,
point_str))
continue

submit_num = None
if (name_str, point_str) in task_states_data:
submit_num = task_states_data[(name_str, point_str)].get(
"submit_num")
if key in task_states_data:
submit_num = task_states_data[key].get("submit_num")
self.add_to_runahead_pool(TaskProxy(
self.config.get_taskdef(name_str), get_point(point_str),
taskdef, get_point(point_str),
stop_point=stop_point, submit_num=submit_num))
return n_warnings

Expand Down
2 changes: 1 addition & 1 deletion tests/cylc-submit/00-bg.t
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ else
poll ! grep -q 'CYLC_BATCH_SYS_JOB_ID=' "${ST_FILE}" 2>/dev/null
JOB_ID=$(awk -F= '$1 == "CYLC_BATCH_SYS_JOB_ID" {print $2}' "${ST_FILE}")
fi
grep_ok "Job ID: ${JOB_ID}" "${TEST_NAME_BASE}.stdout"
contains_ok "${TEST_NAME_BASE}.stdout" <<<"[foo.1] Job ID: ${JOB_ID}"
cmp_ok "${TEST_NAME_BASE}.stderr" <'/dev/null'
#-------------------------------------------------------------------------------
if [[ -n "${SSH}" ]]; then
Expand Down
57 changes: 57 additions & 0 deletions tests/cylc-submit/11-multi.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2017 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test "cylc submit" multiple tasks + families.
CYLC_TEST_IS_GENERIC=false
. "$(dirname "$0")/test_header"

set_test_number 4

init_suite "${TEST_NAME_BASE}" <<'__SUITE_RC__'
[cylc]
UTC mode = True
cycle point format = %Y
[scheduling]
initial cycle point = 2020
final cycle point = 2021
[[dependencies]]
[[[P1Y]]]
graph = FOO & bar
[runtime]
[[FOO]]
script = echo "${CYLC_TASK_ID}"
[[foo1, foo2, foo3]]
inherit = FOO
[[bar]]
script = echo "${CYLC_TASK_ID}"
__SUITE_RC__

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"
run_ok "${TEST_NAME_BASE}" cylc submit "${SUITE_NAME}" 'FOO.2020' 'bar.2021'
for TASK_ID in 'foo1.2020' 'foo2.2020' 'foo3.2020' 'bar.2021'; do
POINT="${TASK_ID#*.}"
NAME="${TASK_ID%.*}"
ST_FILE="${SUITE_RUN_DIR}/log/job/${POINT}/${NAME}/01/job.status"
JOB_ID="$(awk -F= '$1 == "CYLC_BATCH_SYS_JOB_ID" {print $2}' "${ST_FILE}")"
echo "[${TASK_ID}] Job ID: ${JOB_ID}"
done >'expected.out'
contains_ok "${TEST_NAME_BASE}.stdout" 'expected.out'
cmp_ok "${TEST_NAME_BASE}.stderr" <'/dev/null'

purge_suite "${SUITE_NAME}"
exit

0 comments on commit 1e06098

Please sign in to comment.