Polling logic overhauled.
* Allow reset to 'submitted' or 'running'.
* Allow polling of succeeded or failed tasks (succeeded excluded by default).
* Poll to confirm if a message implies a state reversal.
* Remove 'enable resurrection': all tasks can now return from the failed state.
* Document how to handle job preemption in light of these changes.
hjoliver committed Aug 16, 2017
Commit cc9514f (parent: 976f75b)
Showing 20 changed files with 276 additions and 206 deletions.
bin/cylc-poll (5 additions, 1 deletion)
@@ -51,6 +51,10 @@ def main():
             ('REG', 'Suite name'),
             ('[TASKID ...]', 'Task identifiers')])
 
+    parser.add_option(
+        "-s", "--succeeded", help="Allow polling of succeeded tasks.",
+        action="store_true", default=False, dest="poll_all")
+
     options, args = parser.parse_args()
 
     suite = args.pop(0)
@@ -63,7 +67,7 @@ def main():
         options.comms_timeout, my_uuid=options.set_uuid,
         print_uuid=options.print_uuid)
     items = parser.parse_multitask_compat(options, args)
-    pclient.put_command('poll_tasks', items=items)
+    pclient.put_command('poll_tasks', items=items, poll_all=options.poll_all)
 
 
 if __name__ == "__main__":
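Note: the daemon receives this as a 'poll_tasks' command. A minimal sketch of the equivalent programmatic call follows; the put_command line mirrors the hunk above, but the client import path and constructor arguments are assumptions (the construction code is truncated in this view):

    # Assumed import path and constructor; only put_command(...) is shown in
    # the diff above.
    from cylc.network.https.suite_command_client import SuiteCommandClient

    pclient = SuiteCommandClient('my.suite')
    # Poll every pollable task, including succeeded ones.
    pclient.put_command('poll_tasks', items=None, poll_all=True)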
conf/cylc.lang (0 additions, 1 deletion)
@@ -179,7 +179,6 @@
 <keyword>exclude at start-up</keyword>
 <keyword>exclude</keyword>
 <keyword>env-script</keyword>
-<keyword>enable resurrection</keyword>
 <keyword>disable automatic shutdown</keyword>
 <keyword>description</keyword>
 <keyword>default node attributes</keyword>
conf/cylc.xml (0 additions, 1 deletion)
@@ -106,7 +106,6 @@
 <RegExpr attribute='Keyword' String=' exclude at start-up '/>
 <RegExpr attribute='Keyword' String=' exclude '/>
 <RegExpr attribute='Keyword' String=' env-script '/>
-<RegExpr attribute='Keyword' String=' enable resurrection '/>
 <RegExpr attribute='Keyword' String=' dummy mode suite timeout '/>
 <RegExpr attribute='Keyword' String=' disable automatic shutdown '/>
 <RegExpr attribute='Keyword' String=' description '/>
doc/src/cylc-user-guide/cug.tex (14 additions, 30 deletions)
@@ -6742,36 +6742,20 @@ \subsection{Handling Job Preemption}
 or suspend running low priority jobs in order to make way for high
 priority jobs. The preempted jobs may then be automatically restarted
 by the resource manager, from the same point (if suspended) or requeued
-to run again from the start (if killed). If a running cylc task gets
-suspended or hard-killed
-(\lstinline=kill -9 <PID>= is not a trappable signal so cylc cannot detect
-task failure in this case) and then later restarted, it will just appear
-to cylc as if it takes longer than normal to run. If the job is
-soft-killed the signal will be trapped by the task job script and a
-failure message sent, resulting in cylc putting the task into the failed
-state. When the preempted task restarts and sends its started message
-cylc would normally treat this as an error condition (a dead task is not
-supposed to be sending messages) - a warning will be logged and the task
-will remain in the failed state. However, if you know that preemption is
-possible on your system you can tell cylc that affected tasks should be
-resurrected from the dead, to carry on as normal if progress messages
-start coming in again after a failure:
-\lstset{language=suiterc}
-\begin{lstlisting}
-# ...
-[runtime]
-    [[HPC]]
-        enable resurrection = True
-    [[TaskFoo]]
-        inherit = HPC
-# ...
-\end{lstlisting}
-To test this in any suite, manually kill a running task then, after cylc
-registers the task failed, resubmit the killed job manually by
-cutting-and-pasting the original job submission command from the suite
-stdout stream.
+to run again from the start (if killed).
+Suspended jobs will poll as still running (their job status file says they
+started running, and they still appear in the resource manager queue).
+Loadleveler jobs that are preempted by kill-and-requeue ("job vacation") are
+automatically returned to the submitted state by Cylc. This is possible
+because Loadleveler sends the SIGUSR1 signal before SIGKILL for preemption.
+Other batch schedulers just send SIGTERM before SIGKILL as normal, so Cylc
+cannot distinguish a preemption job kill from a normal job kill. After this the
+job will poll as failed (correctly, because it was killed, and the job status
+file records that). To handle this kind of preemption automatically you could
+use a task failed or retry event handler that queries the batch scheduler queue
+(after an appropriate delay if necessary) and then, if the job has been
+requeued, uses \lstinline=cylc reset= to reset the task to the submitted state.
 
 \subsection{Manual Task Triggering and Edit-Run}
 
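A hedged sketch of the event-handler approach recommended above, assuming cylc 7 task event handlers are called as '<handler> <event> <suite> <task-id> <message>' and that 'cylc reset --state=submitted' is the reset invocation; the queue query and the mapping from task ID to batch job name are site-specific:

    #!/usr/bin/env python
    """Sketch: task-failed event handler for kill-and-requeue preemption."""
    import subprocess
    import sys
    import time


    def main():
        event, suite, task_id = sys.argv[1:4]
        if event != 'failed':
            return
        # Give the resource manager time to requeue the preempted job.
        time.sleep(60)
        # Site-specific: map the cylc task ID to the batch job name and query
        # the queue (SGE-style 'qstat -j' assumed; exit 0 means "queued").
        job_name = task_id
        if subprocess.call(['qstat', '-j', job_name]) == 0:
            # The job is back in the batch queue: reset the task so cylc
            # resumes polling it as a submitted task.
            subprocess.check_call(
                ['cylc', 'reset', '--state=submitted', suite, task_id])


    if __name__ == '__main__':
        main()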
doc/src/cylc-user-guide/suiterc.tex (0 additions, 16 deletions)
@@ -1210,22 +1210,6 @@ \subsection{[runtime]}
 instances of the task will share the same workspace. Consider the effect on
 cycle point offset housekeeping of work directories before doing this.
 
-\paragraph[enable resurrection]{ [runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow enable resurrection}
-
-If a message is received from a failed task cylc will normally treat
-this as an error condition, issue a warning, and leave the task in the
-``failed'' state. But if ``enable resurrection'' is switched on failed
-tasks can come back from the dead: if the same task job script is
-executed again cylc will put the task back into the running state and
-continue as normal when the started message is received. This can be
-used to handle HPC-style job preemption wherein a resource manager may
-kill a running task and reschedule it to run again later, to make way
-for a job with higher immediate priority. See also~\ref{PreemptionHPC}
-\begin{myitemize}
-\item {\em type:} boolean
-\item {\em default:} False
-\end{myitemize}
-
 \paragraph[{[[[}meta{]]]}]{[runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow [[[meta]]]}
 
 Section containing metadata items for this task or family namespace. Several items
lib/cylc/cfgspec/suite.py (1 addition, 1 deletion)
@@ -336,7 +336,6 @@ def _coerce_parameter_list(value, keys, _):
         'script': vdr(vtype='string', default=""),
         'post-script': vdr(vtype='string', default=""),
         'extra log files': vdr(vtype='string_list', default=[]),
-        'enable resurrection': vdr(vtype='boolean', default=False),
         'work sub-directory': vdr(vtype='string'),
         'meta': {
             'title': vdr(vtype='string', default=""),
@@ -531,6 +530,7 @@ def upg(cfg, descr):
     u.obsolete('7.2.2', ['cylc', 'simulation mode'])
     u.obsolete('7.2.2', ['runtime', '__MANY__', 'dummy mode'])
     u.obsolete('7.2.2', ['runtime', '__MANY__', 'simulation mode'])
+    u.obsolete('7.5.0', ['runtime', '__MANY__', 'enable resurrection'])
     u.upgrade()
 
 
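The new u.obsolete entry means pre-7.5.0 suites that still set the deleted item load with a warning rather than a parse error. A simplified, self-contained sketch of the obsolete-item pattern (not cylc's actual upgrader, which also handles the '__MANY__' wildcard used above):

    def flag_obsolete(cfg, version, keys):
        """Warn about and drop a config item made obsolete at `version`."""
        section = cfg
        for key in keys[:-1]:
            section = section.get(key, {})
        if keys[-1] in section:
            print('WARNING - obsolete config item (since %s): %s' % (
                version, ' -> '.join(keys)))
            del section[keys[-1]]


    # Example: a pre-7.5.0 suite that still sets the removed item.
    cfg = {'runtime': {'foo': {'enable resurrection': True}}}
    flag_obsolete(cfg, '7.5.0', ['runtime', 'foo', 'enable resurrection'])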
lib/cylc/gui/app_gcylc.py (12 additions, 40 deletions)
@@ -66,11 +66,10 @@
 from cylc.cfgspec.gcylc import gcfg
 from cylc.wallclock import get_current_time_string
 from cylc.task_state import (
-    TASK_STATUSES_ALL, TASK_STATUSES_RESTRICTED,
+    TASK_STATUSES_ALL, TASK_STATUSES_RESTRICTED, TASK_STATUSES_CAN_RESET_TO,
     TASK_STATUSES_WITH_JOB_SCRIPT, TASK_STATUSES_WITH_JOB_LOGS,
-    TASK_STATUSES_TRIGGERABLE, TASK_STATUSES_ACTIVE,
-    TASK_STATUS_WAITING, TASK_STATUS_HELD, TASK_STATUS_READY,
-    TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED)
+    TASK_STATUSES_TRIGGERABLE, TASK_STATUSES_ACTIVE, TASK_STATUS_RUNNING,
+    TASK_STATUS_HELD, TASK_STATUS_FAILED)
 from cylc.task_state_prop import get_status_prop
 
 
@@ -1500,42 +1499,15 @@ def get_right_click_menu(self, task_ids, t_states, task_is_family=False,
         # graph-view so use connect_right_click_sub_menu instead of
         # item.connect
 
-        reset_ready_item = gtk.ImageMenuItem('"%s"' % TASK_STATUS_READY)
-        reset_img = gtk.image_new_from_stock(
-            gtk.STOCK_CONVERT, gtk.ICON_SIZE_MENU)
-        reset_ready_item.set_image(reset_img)
-        reset_menu.append(reset_ready_item)
-        self.connect_right_click_sub_menu(is_graph_view, reset_ready_item,
-                                          self.reset_task_state, task_ids,
-                                          TASK_STATUS_READY)
-
-        reset_waiting_item = gtk.ImageMenuItem('"%s"' % TASK_STATUS_WAITING)
-        reset_img = gtk.image_new_from_stock(
-            gtk.STOCK_CONVERT, gtk.ICON_SIZE_MENU)
-        reset_waiting_item.set_image(reset_img)
-        reset_menu.append(reset_waiting_item)
-        self.connect_right_click_sub_menu(is_graph_view, reset_waiting_item,
-                                          self.reset_task_state, task_ids,
-                                          TASK_STATUS_WAITING)
-
-        reset_succeeded_item = gtk.ImageMenuItem(
-            '"%s"' % TASK_STATUS_SUCCEEDED)
-        reset_img = gtk.image_new_from_stock(gtk.STOCK_CONVERT,
-                                             gtk.ICON_SIZE_MENU)
-        reset_succeeded_item.set_image(reset_img)
-        reset_menu.append(reset_succeeded_item)
-        self.connect_right_click_sub_menu(is_graph_view, reset_succeeded_item,
-                                          self.reset_task_state, task_ids,
-                                          TASK_STATUS_SUCCEEDED)
-
-        reset_failed_item = gtk.ImageMenuItem('"%s"' % TASK_STATUS_FAILED)
-        reset_img = gtk.image_new_from_stock(gtk.STOCK_CONVERT,
-                                             gtk.ICON_SIZE_MENU)
-        reset_failed_item.set_image(reset_img)
-        reset_menu.append(reset_failed_item)
-        self.connect_right_click_sub_menu(is_graph_view, reset_failed_item,
-                                          self.reset_task_state, task_ids,
-                                          TASK_STATUS_FAILED)
+        for status in TASK_STATUSES_CAN_RESET_TO:
+            reset_item = gtk.ImageMenuItem('"%s"' % status)
+            reset_img = gtk.image_new_from_stock(
+                gtk.STOCK_CONVERT, gtk.ICON_SIZE_MENU)
+            reset_item.set_image(reset_img)
+            reset_menu.append(reset_item)
+            self.connect_right_click_sub_menu(is_graph_view, reset_item,
+                                              self.reset_task_state, task_ids,
+                                              status)
 
         spawn_item = gtk.ImageMenuItem('Force spawn')
         img = gtk.image_new_from_stock(gtk.STOCK_ADD, gtk.ICON_SIZE_MENU)
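The four hand-rolled reset menu items collapse into one loop over the new TASK_STATUSES_CAN_RESET_TO constant, so the GUI menu tracks whatever the task-state module allows. The constant is defined in lib/cylc/task_state.py, which is not shown on this page; a plausible reconstruction from the commit message ("allow reset to 'submitted' or 'running'") and the four removed items:

    # Hypothetical contents, inferred as described above; the authoritative
    # tuple lives in lib/cylc/task_state.py.
    TASK_STATUSES_CAN_RESET_TO = (
        'waiting', 'ready', 'submitted', 'running', 'succeeded', 'failed')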
lib/cylc/network/https/suite_command_server.py (3 additions, 2 deletions)
@@ -165,10 +165,11 @@ def reload_suite(self):
 
     @cherrypy.expose
     @cherrypy.tools.json_out()
-    def poll_tasks(self, items=None):
+    def poll_tasks(self, items=None, poll_all=False):
         if items is not None and not isinstance(items, list):
             items = [items]
-        return self._put("poll_tasks", (items,))
+        return self._put("poll_tasks", (items,),
+                         {"poll_all": poll_all in ['True', True]})
 
     @cherrypy.expose
     @cherrypy.tools.json_out()
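The 'poll_all in ['True', True]' test normalises the flag because a value sent over the HTTPS API can arrive as the string 'True' rather than a boolean (CherryPy passes query parameters through as strings). The same check in isolation:

    def coerce_flag(value):
        """Accept a real boolean or its string form from a URL query."""
        return value in ['True', True]


    assert coerce_flag(True) and coerce_flag('True')
    assert not coerce_flag(False) and not coerce_flag('False')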
lib/cylc/scheduler.py (10 additions, 4 deletions)
@@ -498,7 +498,8 @@ def process_queued_task_messages(self):
             if itask.identity in task_id_messages:
                 for priority, message in task_id_messages[itask.identity]:
                     self.task_events_mgr.process_message(
-                        itask, priority, message, is_incoming=True)
+                        itask, priority, message,
+                        self.task_job_mgr.poll_task_jobs, is_incoming=True)
 
     def process_command_queue(self):
         """Process queued commands."""
@@ -693,12 +694,17 @@ def command_release_tasks(self, items):
         """Release tasks."""
         return self.pool.release_tasks(items)
 
-    def command_poll_tasks(self, items=None):
-        """Poll all tasks or a task/family if options are provided."""
+    def command_poll_tasks(self, items=None, poll_all=False):
+        """Poll pollable tasks or a task/family if options are provided.
+
+        Don't poll succeeded tasks unless poll_all is True.
+
+        """
         if self.run_mode == 'simulation':
             return
         itasks, bad_items = self.pool.filter_task_proxies(items)
-        self.task_job_mgr.poll_task_jobs(self.suite, itasks, items is not None)
+        self.task_job_mgr.poll_task_jobs(self.suite, itasks, items is not None,
+                                         poll_all=poll_all)
         return len(bad_items)
 
     def command_kill_tasks(self, items=None):
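The docstring's "pollable" qualifier implies filtering inside poll_task_jobs; the task job manager is among the 20 changed files but is not shown on this page. A hedged sketch of the implied selection:

    # Sketch only: skip succeeded tasks unless poll_all is set; tasks with no
    # job to poll (e.g. waiting) are presumably skipped elsewhere as well.
    def select_pollable(itasks, poll_all=False):
        return [itask for itask in itasks
                if poll_all or itask.state.status != 'succeeded']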
