Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler, task_pool, task_proxy refactor #2157

Merged
merged 27 commits into from
May 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1a74b88
scheduler, task_pool, task_proxy refactor
matthewrmshin Feb 8, 2017
d5f5e9c
scheduler, task_pool, task_proxy refactor
matthewrmshin Feb 28, 2017
be27ea4
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 1, 2017
461515e
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 3, 2017
e2b8d87
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 6, 2017
ebe5053
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 6, 2017
59490f7
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 7, 2017
62be4f6
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 9, 2017
d1644bf
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 15, 2017
7a73fa5
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 15, 2017
f594f92
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 15, 2017
bb8971c
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 16, 2017
cf64cf1
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 17, 2017
790b290
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 20, 2017
ea39e10
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 22, 2017
c2a9895
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 23, 2017
e370853
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 24, 2017
53427bc
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 27, 2017
fc9eca5
scheduler, task_pool, task_proxy refactor
matthewrmshin Mar 28, 2017
834c85a
scheduler, task_pool, task_proxy refactor
matthewrmshin Apr 6, 2017
5e793aa
scheduler, task_pool, task_proxy refactor
matthewrmshin May 4, 2017
2a8f813
scheduler, task_pool, task_proxy refactor
matthewrmshin May 4, 2017
fc3af3e
scheduler, task_pool, task_proxy refactor
matthewrmshin May 5, 2017
53f3016
scheduler, task_pool, task_proxy refactor
matthewrmshin May 5, 2017
0a0b18f
scheduler, task_pool, task_proxy refactor
matthewrmshin May 10, 2017
362901e
scheduler, task_pool, task_proxy refactor
matthewrmshin May 11, 2017
f4dd554
scheduler, task_pool, task_proxy refactor
matthewrmshin May 11, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions bin/cylc
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ task_commands['message'] = ['message', 'task-message']
task_commands['jobs-kill'] = ['jobs-kill']
task_commands['jobs-poll'] = ['jobs-poll']
task_commands['jobs-submit'] = ['jobs-submit']
task_commands['job-submit'] = ['job-submit']

all_commands = OrderedDict()
for dct in [
Expand Down Expand Up @@ -431,7 +430,6 @@ comsum['broadcast'] = 'Change suite [runtime] settings on the fly'
comsum['jobs-kill'] = '(Internal) Kill task jobs'
comsum['jobs-poll'] = '(Internal) Retrieve status for task jobs'
comsum['jobs-submit'] = '(Internal) Submit task jobs'
comsum['job-submit'] = '(Internal) Submit a job'

# utility
comsum['cycle-point'] = 'Cycle point arithmetic and filename templating'
Expand Down
56 changes: 0 additions & 56 deletions bin/cylc-job-submit

This file was deleted.

4 changes: 2 additions & 2 deletions bin/cylc-jobs-kill
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def main():
("JOB-LOG-ROOT", "The log/job sub-directory for the suite"),
("[JOB-LOG-DIR ...]", "A point/name/submit_num sub-directory")])
args = parser.parse_args()[1]
BATCH_SYS_MANAGER.jobs_kill(args[0], args[1:])
BatchSysManager().jobs_kill(args[0], args[1:])


if __name__ == "__main__" and not remrun().execute():
from cylc.option_parsers import CylcOptionParser as COP
from cylc.batch_sys_manager import BATCH_SYS_MANAGER
from cylc.batch_sys_manager import BatchSysManager
main()
4 changes: 2 additions & 2 deletions bin/cylc-jobs-poll
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ def main():
("JOB-LOG-ROOT", "The log/job sub-directory for the suite"),
("[JOB-LOG-DIR ...]", "A point/name/submit_num sub-directory")])
args = parser.parse_args()[1]
BATCH_SYS_MANAGER.jobs_poll(args[0], args[1:])
BatchSysManager().jobs_poll(args[0], args[1:])


if __name__ == "__main__" and not remrun().execute():
from cylc.option_parsers import CylcOptionParser as COP
from cylc.batch_sys_manager import BATCH_SYS_MANAGER
from cylc.batch_sys_manager import BatchSysManager
main()
4 changes: 2 additions & 2 deletions bin/cylc-jobs-submit
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def main():
help="Is this being run on a remote job host?",
action="store_true", dest="remote_mode", default=False)
opts, args = parser.parse_args()
BATCH_SYS_MANAGER.jobs_submit(
BatchSysManager().jobs_submit(
args[0], args[1:], remote_mode=opts.remote_mode)


if __name__ == "__main__" and not remrun().execute():
from cylc.option_parsers import CylcOptionParser as COP
from cylc.batch_sys_manager import BATCH_SYS_MANAGER
from cylc.batch_sys_manager import BatchSysManager
main()
10 changes: 5 additions & 5 deletions bin/cylc-monitor
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ from cylc.network.suite_state_client import (
from cylc.wallclock import get_time_string_from_unix_time
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.task_state import (
TaskState, TASK_STATUS_RUNAHEAD, TASK_STATUSES_ORDERED,
TASK_STATUS_RUNAHEAD, TASK_STATUSES_ORDERED,
TASK_STATUSES_RESTRICTED)
from cylc.task_state_prop import get_status_prop


class SuiteMonitor(object):
Expand Down Expand Up @@ -115,7 +116,7 @@ The USER_AT_HOST argument allows suite selection by 'cylc scan' output:

legend = ''
for state in TASK_STATUSES_ORDERED:
legend += TaskState.get_status_prop(state, 'ascii_ctrl')
legend += get_status_prop(state, 'ascii_ctrl')
legend = legend.rstrip()

len_header = sum(len(s) for s in TASK_STATUSES_ORDERED)
Expand Down Expand Up @@ -183,7 +184,7 @@ The USER_AT_HOST argument allows suite selection by 'cylc scan' output:
name_list.add(name)
if point_string not in task_info:
task_info[point_string] = {}
task_info[point_string][name] = TaskState.get_status_prop(
task_info[point_string][name] = get_status_prop(
state, 'ascii_ctrl', subst=name)

# Sort the tasks in each cycle point.
Expand Down Expand Up @@ -229,8 +230,7 @@ The USER_AT_HOST argument allows suite selection by 'cylc scan' output:
state_totals[state] += 1
for state, tot in state_totals.items():
subst = " %d " % tot
summary += TaskState.get_status_prop(state,
'ascii_ctrl', subst)
summary += get_status_prop(state, 'ascii_ctrl', subst)
blit.append(summary)

# Print a divider line containing the suite status string.
Expand Down
22 changes: 18 additions & 4 deletions bin/cylc-reset
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,25 @@ def main():
"-s", "--state", metavar="STATE",
help="Reset task state to STATE, can be %s" % (
', '.join(TASK_STATUSES_CAN_RESET_TO)),
action="store", default=None, dest="state")
choices=list(TASK_STATUSES_CAN_RESET_TO),
action="store", dest="state")

parser.add_option(
"--output", "-O",
metavar="OUTPUT",
help=("Find task output by message string or trigger string, " +
"set complete or incomplete with !OUTPUT, " +
"'*' to set all complete, '!*' to set all incomplete. " +
"Can be used more than once to reset multiple task outputs."),
action="append", default=[], dest="outputs")

options, args = parser.parse_args()

suite = args.pop(0)

if not options.state and not options.outputs:
parser.error("Neither --state=STATE nor --output=OUTPUT is set")

if options.state == "spawn":
# Back compat.
sys.stderr.write(
Expand All @@ -72,8 +85,8 @@ def main():
exc.filename = cmd
raise SystemExit(exc)

if options.state not in TASK_STATUSES_CAN_RESET_TO:
parser.error("Illegal STATE value: " + options.state)
if not options.state:
options.state = ''

prompt('Reset task(s) %s in %s' % (args, suite), options.force)
pclient = SuiteCommandClient(
Expand All @@ -82,7 +95,8 @@ def main():
print_uuid=options.print_uuid)
items = parser.parse_multitask_compat(options, args)
pclient.put_command(
'reset_task_states', items=items, state=options.state)
'reset_task_states', items=items, state=options.state,
outputs=options.outputs)


if __name__ == "__main__":
Expand Down
9 changes: 5 additions & 4 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ from cylc.network.port_scan import scan_all
from cylc.option_parsers import CylcOptionParser as COP
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.owner import USER
from cylc.task_state import TaskState, TASK_STATUSES_ORDERED
from cylc.task_state import TASK_STATUSES_ORDERED
from cylc.task_state_prop import get_status_prop


NO_BOLD = False
Expand Down Expand Up @@ -201,7 +202,7 @@ def main():
if options.color:
n_states = len(TASK_STATUSES_ORDERED)
for index, state in enumerate(TASK_STATUSES_ORDERED):
state_legend += TaskState.get_status_prop(state, 'ascii_ctrl')
state_legend += get_status_prop(state, 'ascii_ctrl')
if index == n_states / 2:
state_legend += "\n"
state_legend = state_legend.rstrip()
Expand Down Expand Up @@ -305,7 +306,7 @@ def get_point_state_count_lines(state_count_totals, state_count_cycles,
for state, tot in sorted(state_count_totals.items()):
if use_color:
subst = " %d " % tot
line += TaskState.get_status_prop(state, 'ascii_ctrl', subst)
line += get_status_prop(state, 'ascii_ctrl', subst)
else:
line += '%s:%d ' % (state, tot)
yield ("", line.strip())
Expand All @@ -315,7 +316,7 @@ def get_point_state_count_lines(state_count_totals, state_count_cycles,
for state, tot in sorted(state_count_cycles[point_string].items()):
if use_color:
subst = " %d " % tot
line += TaskState.get_status_prop(state, 'ascii_ctrl', subst)
line += get_status_prop(state, 'ascii_ctrl', subst)
else:
line += '%s:%d ' % (state, tot)
yield (point_string, line.strip())
Expand Down
81 changes: 47 additions & 34 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.config import SuiteConfig
from cylc.cycling.loader import get_point
import cylc.flags
from cylc.get_task_proxy import get_task_proxy
from cylc.job_file import JobFile
from cylc.job_host import RemoteJobHostManager
from cylc.mp_pool import SuiteProcPool
from cylc.option_parsers import CylcOptionParser as COP
from cylc.suite_db_mgr import SuiteDatabaseManager
from cylc.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.task_id import TaskID
from cylc.task_job_mgr import TaskJobManager
from cylc.task_proxy import TaskProxy
from cylc.task_state import TASK_STATUS_SUBMIT_FAILED
from cylc.templatevars import load_template_vars
import cylc.version # Ensures '$CYLC_VERSION' is set.
Expand All @@ -64,7 +64,7 @@ def main():
parser = COP(
__doc__, jset=True, icp=True,
argdoc=[("REG", "Suite name"),
("TASK", "Target task (" + TaskID.SYNTAX + ")")])
("TASK [...]", "Family or task ID (%s)" % TaskID.SYNTAX)])
parser.set_defaults(sched=False, dry_run=False)
parser.add_option(
"-d", "--dry-run",
Expand All @@ -73,26 +73,37 @@ def main():
(options, args) = parser.parse_args()
if options.debug:
cylc.flags.debug = True
suite, task_id = args
if not TaskID.is_valid_id(task_id):
sys.exit("Invalid task ID " + task_id)
suiterc = SuiteSrvFilesManager().get_suite_rc(suite)
suite = args.pop(0)
for arg in args:
if not TaskID.is_valid_id(arg):
sys.exit("Invalid task ID %s" % arg)
suite_srv_mgr = SuiteSrvFilesManager()
suiterc = suite_srv_mgr.get_suite_rc(suite)
suite_dir = os.path.dirname(suiterc)
# For user-defined batch system handlers
sys.path.append(os.path.join(suite_dir, 'python'))
suite_run_dir = GLOBAL_CFG.get_derived_host_item(
suite, 'suite run directory')

# load suite config
# Load suite config and tasks
config = SuiteConfig.get_inst(
suite, suiterc,
load_template_vars(options.templatevars, options.templatevars_file),
cli_initial_point_string=options.icp)

itasks = []
for arg in args:
name_str, point_str = TaskID.split(arg)
taskdefs = config.find_taskdefs(name_str)
if not taskdefs:
sys.exit("No task found for %s" % arg)
for taskdef in taskdefs:
itasks.append(TaskProxy(
taskdef, get_point(point_str).standardise(), is_startup=True))

# Initialise job submit environment
GLOBAL_CFG.create_cylc_run_tree(suite)

RemoteJobHostManager.get_inst().single_task_mode = True
JobFile.get_inst().set_suite_env({
task_job_mgr = TaskJobManager(
suite, SuiteProcPool(), SuiteDatabaseManager(), suite_srv_mgr)
task_job_mgr.single_task_mode = True
task_job_mgr.job_file_writer.set_suite_env({
'CYLC_UTC': str(config.cfg['cylc']['UTC mode']),
'CYLC_DEBUG': str(cylc.flags.debug),
'CYLC_VERBOSE': str(cylc.flags.verbose),
Expand All @@ -104,27 +115,29 @@ def main():
config.cfg['scheduling']['final cycle point']),
})

task_name, point_string = TaskID.split(task_id)
point = get_point(point_string).standardise()
# Try to get a graphed task of the given name.
itask = get_task_proxy(task_name, point, is_startup=True)

if itask.prep_submit(dry_run=options.dry_run) is None:
sys.exit(1)
ret_code = 0
if options.dry_run:
print "JOB SCRIPT=%s" % itask.get_job_log_path(
itask.HEAD_MODE_LOCAL, tail=itask.JOB_FILE_BASE)
task_job_mgr.prep_submit_task_jobs(suite, itasks, dry_run=True)
for itask in itasks:
if itask.local_job_file_path:
print('JOB SCRIPT=%s' % itask.local_job_file_path)
else:
print >> sys.stderr, (
'Unable to prepare job file for %s' % itask.identity)
ret_code = 1
else:
proc_pool = SuiteProcPool.get_inst(pool_size=1)
itask.submit()
while proc_pool.results:
proc_pool.handle_results_async()
proc_pool.close()
proc_pool.join()
if itask.summary['submit_method_id'] is not None:
print 'Job ID:', itask.summary['submit_method_id']

sys.exit(itask.state.status == TASK_STATUS_SUBMIT_FAILED)
task_job_mgr.submit_task_jobs(suite, itasks)
while task_job_mgr.proc_pool.results:
task_job_mgr.proc_pool.handle_results_async()
task_job_mgr.proc_pool.close()
task_job_mgr.proc_pool.join()
for itask in itasks:
if itask.summary['submit_method_id'] is not None:
print('[%s] Job ID: %s' % (
itask.identity, itask.summary['submit_method_id']))
if itask.state.status == TASK_STATUS_SUBMIT_FAILED:
ret_code = 1
sys.exit(ret_code)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion bin/cylc-suite-state
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def main():
action="store", dest="msg", default=None)

SuitePoller.add_to_cmd_options(parser)
(options, args) = parser.parse_args(remove_opts=["--db", "--debug"])
(options, args) = parser.parse_args(remove_opts=["--db"])

suite = args[0]

Expand Down
5 changes: 1 addition & 4 deletions bin/cylc-trigger
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,11 @@ def main():

# Get the job filename from the suite daemon - the task cycle point may
# need standardising to the suite cycle point format.
jobfile_path, compat = info_client.get_info(
jobfile_path = info_client.get_info(
'get_task_jobfile_path', task_id=task_id)
if not jobfile_path:
sys.exit('ERROR: task not found')

if isinstance(jobfile_path, bool):
jobfile_path = compat

# Note: localhost time and file system time may be out of sync,
# so the safe way to detect whether a new file is modified
# or is to detect whether time stamp has changed or not.
Expand Down