1792 - task polling logic and state reset. #2396

Merged 8 commits on Sep 13, 2017
19 changes: 8 additions & 11 deletions bin/cylc-poll
@@ -18,17 +18,10 @@

"""cylc [control] poll [OPTIONS] ARGS

Poll jobs of active tasks to verify or update their statuses.
Poll (query) task jobs to verify and update their statuses.

To poll one or more tasks, "cylc poll REG TASKID"; to poll all active
tasks: "cylc poll REG".

Note that automatic job polling can be used to track task status on task hosts
that do not allow any communication by HTTPS or ssh back to the suite host
- see site/user config file documentation.

Polling is also done automatically on restarting a suite, for any tasks that
were recorded as submitted or running when the suite went down.
Use "cylc poll REG" to poll all active tasks, or "cylc poll REG TASKID" to poll
individual tasks or families, or groups of them.
"""

import sys
@@ -51,6 +44,10 @@ def main():
('REG', 'Suite name'),
('[TASKID ...]', 'Task identifiers')])

parser.add_option(
"-s", "--succeeded", help="Allow polling of succeeded tasks.",
action="store_true", default=False, dest="poll_succ")

options, args = parser.parse_args()

suite = args.pop(0)
@@ -63,7 +60,7 @@ def main():
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
items = parser.parse_multitask_compat(options, args)
pclient.put_command('poll_tasks', items=items)
pclient.put_command('poll_tasks', items=items, poll_succ=options.poll_succ)


if __name__ == "__main__":
1 change: 0 additions & 1 deletion conf/cylc.lang
@@ -179,7 +179,6 @@
<keyword>exclude at start-up</keyword>
<keyword>exclude</keyword>
<keyword>env-script</keyword>
<keyword>enable resurrection</keyword>
<keyword>disable automatic shutdown</keyword>
<keyword>description</keyword>
<keyword>default node attributes</keyword>
1 change: 0 additions & 1 deletion conf/cylc.xml
@@ -106,7 +106,6 @@
<RegExpr attribute='Keyword' String=' exclude at start-up '/>
<RegExpr attribute='Keyword' String=' exclude '/>
<RegExpr attribute='Keyword' String=' env-script '/>
<RegExpr attribute='Keyword' String=' enable resurrection '/>
<RegExpr attribute='Keyword' String=' dummy mode suite timeout '/>
<RegExpr attribute='Keyword' String=' disable automatic shutdown '/>
<RegExpr attribute='Keyword' String=' description '/>
44 changes: 14 additions & 30 deletions doc/src/cylc-user-guide/cug.tex
@@ -6757,36 +6757,20 @@ \subsection{Handling Job Preemption}
or suspend running low priority jobs in order to make way for high
priority jobs. The preempted jobs may then be automatically restarted
by the resource manager, from the same point (if suspended) or requeued
to run again from the start (if killed). If a running cylc task gets
suspended or hard-killed
(\lstinline=kill -9 <PID>= is not a trappable signal so cylc cannot detect
task failure in this case) and then later restarted, it will just appear
to cylc as if it takes longer than normal to run. If the job is
soft-killed the signal will be trapped by the task job script and a
failure message sent, resulting in cylc putting the task into the failed
state. When the preempted task restarts and sends its started message
cylc would normally treat this as an error condition (a dead task is not
supposed to be sending messages) - a warning will be logged and the task
will remain in the failed state. However, if you know that preemption is
possible on your system you can tell cylc that affected tasks should be
resurrected from the dead, to carry on as normal if progress messages
start coming in again after a failure:

\lstset{language=suiterc}
\begin{lstlisting}
# ...
[runtime]
[[HPC]]
enable resurrection = True
[[TaskFoo]]
inherit = HPC
# ...
\end{lstlisting}

To test this in any suite, manually kill a running task then, after cylc
registers the task failed, resubmit the killed job manually by
cutting-and-pasting the original job submission command from the suite
stdout stream.
to run again from the start (if killed).

Suspended jobs will poll as still running (their job status file says they
started running, and they still appear in the resource manager queue).
Loadleveler jobs that are preempted by kill-and-requeue ("job vacation") are
automatically returned to the submitted state by Cylc. This is possible
because Loadleveler sends the SIGUSR1 signal before SIGKILL for preemption.
Other batch schedulers just send SIGTERM before SIGKILL as normal, so Cylc
cannot distinguish a preemption job kill from a normal job kill. After this the
job will poll as failed (correctly, because it was killed, and the job status
file records that). To handle this kind of preemption automatically you could
use a task failed or retry event handler that queries the batch scheduler queue
(after an appropriate delay if necessary) and then, if the job has been
requeued, uses \lstinline=cylc reset= to reset the task to the submitted state.
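
The following is a minimal sketch of such a handler (it is not part of Cylc, and
the details are assumptions to adapt to your site): it expects to be configured
as a failed or retry task event handler that is passed the suite name, the task
ID, and the batch job ID; it queries the queue with \lstinline=qstat=; and it
uses \lstinline=cylc reset= if the job has been requeued.

\lstset{language=Python}
\begin{lstlisting}
#!/usr/bin/env python
"""Hypothetical failed/retry event handler for kill-and-requeue preemption.

Assumed arguments: SUITE TASK-ID BATCH-JOB-ID (set the event handler
command template accordingly).
"""
import subprocess
import sys
import time


def job_in_queue(job_id):
    # Site-specific check; adapt to your batch scheduler (qstat, squeue, ...).
    return subprocess.call(["qstat", job_id]) == 0


def main():
    suite, task_id, job_id = sys.argv[1:4]
    # Give the resource manager time to requeue the preempted job.
    time.sleep(60)
    if job_in_queue(job_id):
        # The job was requeued, so put the task back in the submitted state.
        subprocess.check_call(
            ["cylc", "reset", "--state=submitted", suite, task_id])


if __name__ == "__main__":
    main()
\end{lstlisting}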

\subsection{Manual Task Triggering and Edit-Run}

16 changes: 0 additions & 16 deletions doc/src/cylc-user-guide/suiterc.tex
@@ -1245,22 +1245,6 @@ \subsection{[runtime]}
instances of the task will share the same workspace. Consider the effect on
cycle point offset housekeeping of work directories before doing this.

\paragraph[enable resurrection]{ [runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow enable resurrection}

If a message is received from a failed task cylc will normally treat
this as an error condition, issue a warning, and leave the task in the
``failed'' state. But if ``enable resurrection'' is switched on failed
tasks can come back from the dead: if the same task job script is
executed again cylc will put the task back into the running state and
continue as normal when the started message is received. This can be
used to handle HPC-style job preemption wherein a resource manager may
kill a running task and reschedule it to run again later, to make way
for a job with higher immediate priority. See also~\ref{PreemptionHPC}
\begin{myitemize}
\item {\em type:} boolean
\item {\em default:} False
\end{myitemize}

\paragraph[{[[[}meta{]]]}]{[runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow [[[meta]]]}

Section containing metadata items for this task or family namespace. Several items
2 changes: 1 addition & 1 deletion lib/cylc/cfgspec/suite.py
@@ -340,7 +340,6 @@ def _coerce_parameter_list(value, keys, _):
'script': vdr(vtype='string', default=""),
'post-script': vdr(vtype='string', default=""),
'extra log files': vdr(vtype='string_list', default=[]),
'enable resurrection': vdr(vtype='boolean', default=False),
'work sub-directory': vdr(vtype='string'),
'meta': {
'title': vdr(vtype='string', default=""),
@@ -548,6 +547,7 @@ def upg(cfg, descr):
u.obsolete('7.2.2', ['cylc', 'simulation mode'])
u.obsolete('7.2.2', ['runtime', '__MANY__', 'dummy mode'])
u.obsolete('7.2.2', ['runtime', '__MANY__', 'simulation mode'])
u.obsolete('7.6.0', ['runtime', '__MANY__', 'enable resurrection'])
u.upgrade()


52 changes: 12 additions & 40 deletions lib/cylc/gui/app_gcylc.py
@@ -66,11 +66,10 @@
from cylc.cfgspec.gcylc import gcfg
from cylc.wallclock import get_current_time_string
from cylc.task_state import (
TASK_STATUSES_ALL, TASK_STATUSES_RESTRICTED,
TASK_STATUSES_ALL, TASK_STATUSES_RESTRICTED, TASK_STATUSES_CAN_RESET_TO,
TASK_STATUSES_WITH_JOB_SCRIPT, TASK_STATUSES_WITH_JOB_LOGS,
TASK_STATUSES_TRIGGERABLE, TASK_STATUSES_ACTIVE,
TASK_STATUS_WAITING, TASK_STATUS_HELD, TASK_STATUS_READY,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED)
TASK_STATUSES_TRIGGERABLE, TASK_STATUSES_ACTIVE, TASK_STATUS_RUNNING,
TASK_STATUS_HELD, TASK_STATUS_FAILED)
from cylc.task_state_prop import get_status_prop


@@ -1500,42 +1499,15 @@ def get_right_click_menu(self, task_ids, t_states, task_is_family=False,
# graph-view so use connect_right_click_sub_menu instead of
# item.connect

reset_ready_item = gtk.ImageMenuItem('"%s"' % TASK_STATUS_READY)
reset_img = gtk.image_new_from_stock(
gtk.STOCK_CONVERT, gtk.ICON_SIZE_MENU)
reset_ready_item.set_image(reset_img)
reset_menu.append(reset_ready_item)
self.connect_right_click_sub_menu(is_graph_view, reset_ready_item,
self.reset_task_state, task_ids,
TASK_STATUS_READY)

reset_waiting_item = gtk.ImageMenuItem('"%s"' % TASK_STATUS_WAITING)
reset_img = gtk.image_new_from_stock(
gtk.STOCK_CONVERT, gtk.ICON_SIZE_MENU)
reset_waiting_item.set_image(reset_img)
reset_menu.append(reset_waiting_item)
self.connect_right_click_sub_menu(is_graph_view, reset_waiting_item,
self.reset_task_state, task_ids,
TASK_STATUS_WAITING)

reset_succeeded_item = gtk.ImageMenuItem(
'"%s"' % TASK_STATUS_SUCCEEDED)
reset_img = gtk.image_new_from_stock(gtk.STOCK_CONVERT,
gtk.ICON_SIZE_MENU)
reset_succeeded_item.set_image(reset_img)
reset_menu.append(reset_succeeded_item)
self.connect_right_click_sub_menu(is_graph_view, reset_succeeded_item,
self.reset_task_state, task_ids,
TASK_STATUS_SUCCEEDED)

reset_failed_item = gtk.ImageMenuItem('"%s"' % TASK_STATUS_FAILED)
reset_img = gtk.image_new_from_stock(gtk.STOCK_CONVERT,
gtk.ICON_SIZE_MENU)
reset_failed_item.set_image(reset_img)
reset_menu.append(reset_failed_item)
self.connect_right_click_sub_menu(is_graph_view, reset_failed_item,
self.reset_task_state, task_ids,
TASK_STATUS_FAILED)
for status in TASK_STATUSES_CAN_RESET_TO:
reset_item = gtk.ImageMenuItem('"%s"' % status)
reset_img = gtk.image_new_from_stock(
gtk.STOCK_CONVERT, gtk.ICON_SIZE_MENU)
reset_item.set_image(reset_img)
reset_menu.append(reset_item)
self.connect_right_click_sub_menu(is_graph_view, reset_item,
self.reset_task_state, task_ids,
status)

spawn_item = gtk.ImageMenuItem('Force spawn')
img = gtk.image_new_from_stock(gtk.STOCK_ADD, gtk.ICON_SIZE_MENU)
6 changes: 4 additions & 2 deletions lib/cylc/network/httpserver.py
@@ -466,15 +466,17 @@ def ping_task(self, task_id, exists_only=False):

@cherrypy.expose
@cherrypy.tools.json_out()
def poll_tasks(self, items=None):
def poll_tasks(self, items=None, poll_succ=False):
"""Poll task jobs.

items is a list of identifiers for matching task proxies.
"""
self._check_access_priv_and_report(PRIV_FULL_CONTROL)
if items is not None and not isinstance(items, list):
items = [items]
self.schd.command_queue.put(("poll_tasks", (items,), {}))
self.schd.command_queue.put(
("poll_tasks", (items,),
{"poll_succ": poll_succ in ['True', True]}))
return (True, 'Command queued')

@cherrypy.expose
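A note on the membership test above: CherryPy delivers HTTP query parameters to
exposed methods as strings, so a client that sends poll_succ=True supplies the
string 'True' rather than a boolean, while an internal caller may pass the
boolean directly. A minimal illustration of the same coercion (a standalone
sketch, not a helper that exists in this change):

def coerce_flag(value):
    # Accept the boolean True or its string form as sent over HTTP.
    return value in ['True', True]

assert coerce_flag(True)
assert coerce_flag('True')
assert not coerce_flag('False')
assert not coerce_flag(False)
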
3 changes: 1 addition & 2 deletions lib/cylc/option_parsers.py
@@ -28,8 +28,7 @@ class CylcOptionParser(OptionParser):
"""Common options for all cylc CLI commands."""

MULTITASK_USAGE = """
A TASKID is an identifier for matching individual task proxies and/or families
of them. It can be written in these syntaxes:
TASKID is a pattern to match task proxies or task families, or groups of them:
* [CYCLE-POINT-GLOB/]TASK-NAME-GLOB[:TASK-STATE]
* [CYCLE-POINT-GLOB/]FAMILY-NAME-GLOB[:TASK-STATE]
* TASK-NAME-GLOB[.CYCLE-POINT-GLOB][:TASK-STATE]
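For readers unfamiliar with the glob forms above, here is a self-contained
illustration of matching a TASK-NAME-GLOB[.CYCLE-POINT-GLOB][:TASK-STATE]
pattern. It is a hypothetical sketch using Python's fnmatch module, not the
matching code cylc itself uses:

import fnmatch

def match_taskid(pattern, name, point, status):
    # Split off an optional :TASK-STATE, then an optional .CYCLE-POINT-GLOB.
    if ':' in pattern:
        pattern, state = pattern.rsplit(':', 1)
        if state != status:
            return False
    if '.' in pattern:
        name_glob, point_glob = pattern.split('.', 1)
    else:
        name_glob, point_glob = pattern, '*'
    return (fnmatch.fnmatch(name, name_glob) and
            fnmatch.fnmatch(point, point_glob))

# Match failed "foo*" tasks at any 2017 cycle point.
print(match_taskid('foo*.2017*:failed', 'foo1', '20170101T00Z', 'failed'))
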
16 changes: 11 additions & 5 deletions lib/cylc/scheduler.py
@@ -526,7 +526,8 @@ def process_queued_task_messages(self):
if itask.identity in task_id_messages:
for priority, message in task_id_messages[itask.identity]:
self.task_events_mgr.process_message(
itask, priority, message, is_incoming=True)
itask, priority, message,
self.task_job_mgr.poll_task_jobs, is_incoming=True)

def process_command_queue(self):
"""Process queued commands."""
@@ -743,12 +744,17 @@ def command_release_tasks(self, items):
"""Release tasks."""
return self.pool.release_tasks(items)

def command_poll_tasks(self, items=None):
"""Poll all tasks or a task/family if options are provided."""
def command_poll_tasks(self, items=None, poll_succ=False):
"""Poll pollable tasks or a task/family if options are provided.

Don't poll succeeded tasks unless poll_succ is True.

"""
if self.run_mode == 'simulation':
return
itasks, bad_items = self.pool.filter_task_proxies(items)
self.task_job_mgr.poll_task_jobs(self.suite, itasks, items is not None)
self.task_job_mgr.poll_task_jobs(self.suite, itasks,
poll_succ=poll_succ)
return len(bad_items)

def command_kill_tasks(self, items=None):
@@ -759,7 +765,7 @@ def command_kill_tasks(self, items=None):
if itask.state.status in TASK_STATUSES_ACTIVE:
itask.state.reset_state(TASK_STATUS_FAILED)
return len(bad_items)
self.task_job_mgr.kill_task_jobs(self.suite, itasks, items is not None)
self.task_job_mgr.kill_task_jobs(self.suite, itasks)
return len(bad_items)

def command_release_suite(self):