Skip to content

Commit

Permalink
Merge pull request #2157 from matthewrmshin/refactor-task-proxy
Browse files Browse the repository at this point in the history
scheduler, task_pool, task_proxy refactor
  • Loading branch information
oliver-sanders committed May 11, 2017
2 parents ddfa27c + f4dd554 commit 57323f7
Show file tree
Hide file tree
Showing 83 changed files with 4,060 additions and 4,589 deletions.
2 changes: 0 additions & 2 deletions bin/cylc
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ task_commands['message'] = ['message', 'task-message']
task_commands['jobs-kill'] = ['jobs-kill']
task_commands['jobs-poll'] = ['jobs-poll']
task_commands['jobs-submit'] = ['jobs-submit']
task_commands['job-submit'] = ['job-submit']

all_commands = OrderedDict()
for dct in [
Expand Down Expand Up @@ -431,7 +430,6 @@ comsum['broadcast'] = 'Change suite [runtime] settings on the fly'
comsum['jobs-kill'] = '(Internal) Kill task jobs'
comsum['jobs-poll'] = '(Internal) Retrieve status for task jobs'
comsum['jobs-submit'] = '(Internal) Submit task jobs'
comsum['job-submit'] = '(Internal) Submit a job'

# utility
comsum['cycle-point'] = 'Cycle point arithmetic and filename templating'
Expand Down
56 changes: 0 additions & 56 deletions bin/cylc-job-submit

This file was deleted.

4 changes: 2 additions & 2 deletions bin/cylc-jobs-kill
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def main():
("JOB-LOG-ROOT", "The log/job sub-directory for the suite"),
("[JOB-LOG-DIR ...]", "A point/name/submit_num sub-directory")])
args = parser.parse_args()[1]
BATCH_SYS_MANAGER.jobs_kill(args[0], args[1:])
BatchSysManager().jobs_kill(args[0], args[1:])


if __name__ == "__main__" and not remrun().execute():
from cylc.option_parsers import CylcOptionParser as COP
from cylc.batch_sys_manager import BATCH_SYS_MANAGER
from cylc.batch_sys_manager import BatchSysManager
main()
4 changes: 2 additions & 2 deletions bin/cylc-jobs-poll
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ def main():
("JOB-LOG-ROOT", "The log/job sub-directory for the suite"),
("[JOB-LOG-DIR ...]", "A point/name/submit_num sub-directory")])
args = parser.parse_args()[1]
BATCH_SYS_MANAGER.jobs_poll(args[0], args[1:])
BatchSysManager().jobs_poll(args[0], args[1:])


if __name__ == "__main__" and not remrun().execute():
from cylc.option_parsers import CylcOptionParser as COP
from cylc.batch_sys_manager import BATCH_SYS_MANAGER
from cylc.batch_sys_manager import BatchSysManager
main()
4 changes: 2 additions & 2 deletions bin/cylc-jobs-submit
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def main():
help="Is this being run on a remote job host?",
action="store_true", dest="remote_mode", default=False)
opts, args = parser.parse_args()
BATCH_SYS_MANAGER.jobs_submit(
BatchSysManager().jobs_submit(
args[0], args[1:], remote_mode=opts.remote_mode)


if __name__ == "__main__" and not remrun().execute():
from cylc.option_parsers import CylcOptionParser as COP
from cylc.batch_sys_manager import BATCH_SYS_MANAGER
from cylc.batch_sys_manager import BatchSysManager
main()
10 changes: 5 additions & 5 deletions bin/cylc-monitor
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ from cylc.network.suite_state_client import (
from cylc.wallclock import get_time_string_from_unix_time
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.task_state import (
TaskState, TASK_STATUS_RUNAHEAD, TASK_STATUSES_ORDERED,
TASK_STATUS_RUNAHEAD, TASK_STATUSES_ORDERED,
TASK_STATUSES_RESTRICTED)
from cylc.task_state_prop import get_status_prop


class SuiteMonitor(object):
Expand Down Expand Up @@ -115,7 +116,7 @@ The USER_AT_HOST argument allows suite selection by 'cylc scan' output:

legend = ''
for state in TASK_STATUSES_ORDERED:
legend += TaskState.get_status_prop(state, 'ascii_ctrl')
legend += get_status_prop(state, 'ascii_ctrl')
legend = legend.rstrip()

len_header = sum(len(s) for s in TASK_STATUSES_ORDERED)
Expand Down Expand Up @@ -183,7 +184,7 @@ The USER_AT_HOST argument allows suite selection by 'cylc scan' output:
name_list.add(name)
if point_string not in task_info:
task_info[point_string] = {}
task_info[point_string][name] = TaskState.get_status_prop(
task_info[point_string][name] = get_status_prop(
state, 'ascii_ctrl', subst=name)

# Sort the tasks in each cycle point.
Expand Down Expand Up @@ -229,8 +230,7 @@ The USER_AT_HOST argument allows suite selection by 'cylc scan' output:
state_totals[state] += 1
for state, tot in state_totals.items():
subst = " %d " % tot
summary += TaskState.get_status_prop(state,
'ascii_ctrl', subst)
summary += get_status_prop(state, 'ascii_ctrl', subst)
blit.append(summary)

# Print a divider line containing the suite status string.
Expand Down
22 changes: 18 additions & 4 deletions bin/cylc-reset
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,25 @@ def main():
"-s", "--state", metavar="STATE",
help="Reset task state to STATE, can be %s" % (
', '.join(TASK_STATUSES_CAN_RESET_TO)),
action="store", default=None, dest="state")
choices=list(TASK_STATUSES_CAN_RESET_TO),
action="store", dest="state")

parser.add_option(
"--output", "-O",
metavar="OUTPUT",
help=("Find task output by message string or trigger string, " +
"set complete or incomplete with !OUTPUT, " +
"'*' to set all complete, '!*' to set all incomplete. " +
"Can be used more than once to reset multiple task outputs."),
action="append", default=[], dest="outputs")

options, args = parser.parse_args()

suite = args.pop(0)

if not options.state and not options.outputs:
parser.error("Neither --state=STATE nor --output=OUTPUT is set")

if options.state == "spawn":
# Back compat.
sys.stderr.write(
Expand All @@ -72,8 +85,8 @@ def main():
exc.filename = cmd
raise SystemExit(exc)

if options.state not in TASK_STATUSES_CAN_RESET_TO:
parser.error("Illegal STATE value: " + options.state)
if not options.state:
options.state = ''

prompt('Reset task(s) %s in %s' % (args, suite), options.force)
pclient = SuiteCommandClient(
Expand All @@ -82,7 +95,8 @@ def main():
print_uuid=options.print_uuid)
items = parser.parse_multitask_compat(options, args)
pclient.put_command(
'reset_task_states', items=items, state=options.state)
'reset_task_states', items=items, state=options.state,
outputs=options.outputs)


if __name__ == "__main__":
Expand Down
9 changes: 5 additions & 4 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ from cylc.network.port_scan import scan_all
from cylc.option_parsers import CylcOptionParser as COP
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.owner import USER
from cylc.task_state import TaskState, TASK_STATUSES_ORDERED
from cylc.task_state import TASK_STATUSES_ORDERED
from cylc.task_state_prop import get_status_prop


NO_BOLD = False
Expand Down Expand Up @@ -201,7 +202,7 @@ def main():
if options.color:
n_states = len(TASK_STATUSES_ORDERED)
for index, state in enumerate(TASK_STATUSES_ORDERED):
state_legend += TaskState.get_status_prop(state, 'ascii_ctrl')
state_legend += get_status_prop(state, 'ascii_ctrl')
if index == n_states / 2:
state_legend += "\n"
state_legend = state_legend.rstrip()
Expand Down Expand Up @@ -305,7 +306,7 @@ def get_point_state_count_lines(state_count_totals, state_count_cycles,
for state, tot in sorted(state_count_totals.items()):
if use_color:
subst = " %d " % tot
line += TaskState.get_status_prop(state, 'ascii_ctrl', subst)
line += get_status_prop(state, 'ascii_ctrl', subst)
else:
line += '%s:%d ' % (state, tot)
yield ("", line.strip())
Expand All @@ -315,7 +316,7 @@ def get_point_state_count_lines(state_count_totals, state_count_cycles,
for state, tot in sorted(state_count_cycles[point_string].items()):
if use_color:
subst = " %d " % tot
line += TaskState.get_status_prop(state, 'ascii_ctrl', subst)
line += get_status_prop(state, 'ascii_ctrl', subst)
else:
line += '%s:%d ' % (state, tot)
yield (point_string, line.strip())
Expand Down
81 changes: 47 additions & 34 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.config import SuiteConfig
from cylc.cycling.loader import get_point
import cylc.flags
from cylc.get_task_proxy import get_task_proxy
from cylc.job_file import JobFile
from cylc.job_host import RemoteJobHostManager
from cylc.mp_pool import SuiteProcPool
from cylc.option_parsers import CylcOptionParser as COP
from cylc.suite_db_mgr import SuiteDatabaseManager
from cylc.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.task_id import TaskID
from cylc.task_job_mgr import TaskJobManager
from cylc.task_proxy import TaskProxy
from cylc.task_state import TASK_STATUS_SUBMIT_FAILED
from cylc.templatevars import load_template_vars
import cylc.version # Ensures '$CYLC_VERSION' is set.
Expand All @@ -64,7 +64,7 @@ def main():
parser = COP(
__doc__, jset=True, icp=True,
argdoc=[("REG", "Suite name"),
("TASK", "Target task (" + TaskID.SYNTAX + ")")])
("TASK [...]", "Family or task ID (%s)" % TaskID.SYNTAX)])
parser.set_defaults(sched=False, dry_run=False)
parser.add_option(
"-d", "--dry-run",
Expand All @@ -73,26 +73,37 @@ def main():
(options, args) = parser.parse_args()
if options.debug:
cylc.flags.debug = True
suite, task_id = args
if not TaskID.is_valid_id(task_id):
sys.exit("Invalid task ID " + task_id)
suiterc = SuiteSrvFilesManager().get_suite_rc(suite)
suite = args.pop(0)
for arg in args:
if not TaskID.is_valid_id(arg):
sys.exit("Invalid task ID %s" % arg)
suite_srv_mgr = SuiteSrvFilesManager()
suiterc = suite_srv_mgr.get_suite_rc(suite)
suite_dir = os.path.dirname(suiterc)
# For user-defined batch system handlers
sys.path.append(os.path.join(suite_dir, 'python'))
suite_run_dir = GLOBAL_CFG.get_derived_host_item(
suite, 'suite run directory')

# load suite config
# Load suite config and tasks
config = SuiteConfig.get_inst(
suite, suiterc,
load_template_vars(options.templatevars, options.templatevars_file),
cli_initial_point_string=options.icp)

itasks = []
for arg in args:
name_str, point_str = TaskID.split(arg)
taskdefs = config.find_taskdefs(name_str)
if not taskdefs:
sys.exit("No task found for %s" % arg)
for taskdef in taskdefs:
itasks.append(TaskProxy(
taskdef, get_point(point_str).standardise(), is_startup=True))

# Initialise job submit environment
GLOBAL_CFG.create_cylc_run_tree(suite)

RemoteJobHostManager.get_inst().single_task_mode = True
JobFile.get_inst().set_suite_env({
task_job_mgr = TaskJobManager(
suite, SuiteProcPool(), SuiteDatabaseManager(), suite_srv_mgr)
task_job_mgr.single_task_mode = True
task_job_mgr.job_file_writer.set_suite_env({
'CYLC_UTC': str(config.cfg['cylc']['UTC mode']),
'CYLC_DEBUG': str(cylc.flags.debug),
'CYLC_VERBOSE': str(cylc.flags.verbose),
Expand All @@ -104,27 +115,29 @@ def main():
config.cfg['scheduling']['final cycle point']),
})

task_name, point_string = TaskID.split(task_id)
point = get_point(point_string).standardise()
# Try to get a graphed task of the given name.
itask = get_task_proxy(task_name, point, is_startup=True)

if itask.prep_submit(dry_run=options.dry_run) is None:
sys.exit(1)
ret_code = 0
if options.dry_run:
print "JOB SCRIPT=%s" % itask.get_job_log_path(
itask.HEAD_MODE_LOCAL, tail=itask.JOB_FILE_BASE)
task_job_mgr.prep_submit_task_jobs(suite, itasks, dry_run=True)
for itask in itasks:
if itask.local_job_file_path:
print('JOB SCRIPT=%s' % itask.local_job_file_path)
else:
print >> sys.stderr, (
'Unable to prepare job file for %s' % itask.identity)
ret_code = 1
else:
proc_pool = SuiteProcPool.get_inst(pool_size=1)
itask.submit()
while proc_pool.results:
proc_pool.handle_results_async()
proc_pool.close()
proc_pool.join()
if itask.summary['submit_method_id'] is not None:
print 'Job ID:', itask.summary['submit_method_id']

sys.exit(itask.state.status == TASK_STATUS_SUBMIT_FAILED)
task_job_mgr.submit_task_jobs(suite, itasks)
while task_job_mgr.proc_pool.results:
task_job_mgr.proc_pool.handle_results_async()
task_job_mgr.proc_pool.close()
task_job_mgr.proc_pool.join()
for itask in itasks:
if itask.summary['submit_method_id'] is not None:
print('[%s] Job ID: %s' % (
itask.identity, itask.summary['submit_method_id']))
if itask.state.status == TASK_STATUS_SUBMIT_FAILED:
ret_code = 1
sys.exit(ret_code)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion bin/cylc-suite-state
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def main():
action="store", dest="msg", default=None)

SuitePoller.add_to_cmd_options(parser)
(options, args) = parser.parse_args(remove_opts=["--db", "--debug"])
(options, args) = parser.parse_args(remove_opts=["--db"])

suite = args[0]

Expand Down
5 changes: 1 addition & 4 deletions bin/cylc-trigger
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,11 @@ def main():

# Get the job filename from the suite daemon - the task cycle point may
# need standardising to the suite cycle point format.
jobfile_path, compat = info_client.get_info(
jobfile_path = info_client.get_info(
'get_task_jobfile_path', task_id=task_id)
if not jobfile_path:
sys.exit('ERROR: task not found')

if isinstance(jobfile_path, bool):
jobfile_path = compat

# Note: localhost time and file system time may be out of sync,
# so the safe way to detect whether a new file is modified
# or is to detect whether time stamp has changed or not.
Expand Down

0 comments on commit 57323f7

Please sign in to comment.