Skip to content

Commit

Permalink
Fixes after tests.
Browse files: browse the repository at this point in the history
  • Loading branch information
hjoliver committed Aug 16, 2017
1 parent 4d71f6b commit 5c45b73
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 36 deletions.
5 changes: 2 additions & 3 deletions lib/cylc/gui/app_gcylc.py
Expand Up @@ -68,9 +68,8 @@
from cylc.task_state import (
TASK_STATUSES_ALL, TASK_STATUSES_RESTRICTED, TASK_STATUSES_CAN_RESET_TO,
TASK_STATUSES_WITH_JOB_SCRIPT, TASK_STATUSES_WITH_JOB_LOGS,
TASK_STATUSES_TRIGGERABLE, TASK_STATUSES_ACTIVE,
TASK_STATUS_WAITING, TASK_STATUS_HELD, TASK_STATUS_READY,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED)
TASK_STATUSES_TRIGGERABLE, TASK_STATUSES_ACTIVE, TASK_STATUS_RUNNING,
TASK_STATUS_HELD, TASK_STATUS_FAILED)
from cylc.task_state_prop import get_status_prop


Expand Down
3 changes: 2 additions & 1 deletion lib/cylc/scheduler.py
Expand Up @@ -498,7 +498,8 @@ def process_queued_task_messages(self):
if itask.identity in task_id_messages:
for priority, message in task_id_messages[itask.identity]:
self.task_events_mgr.process_message(
itask, priority, message, is_incoming=True)
itask, priority, message,
self.task_job_mgr.poll_task_jobs, is_incoming=True)

def process_command_queue(self):
"""Process queued commands."""
Expand Down
29 changes: 13 additions & 16 deletions lib/cylc/task_events_mgr.py
Expand Up @@ -47,10 +47,9 @@
from cylc.task_action_timer import TaskActionTimer
from cylc.task_message import TaskMessage
from cylc.task_state import (
TASK_STATUSES_ACTIVE, TASK_STATUS_READY, TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_RETRYING, TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_RUNNING, TASK_STATUS_RETRYING, TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED)
TASK_STATUS_READY, TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_RUNNING, TASK_STATUS_RETRYING,
TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED)
from cylc.task_outputs import (
TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED)
Expand Down Expand Up @@ -117,8 +116,6 @@ def __init__(self, suite, proc_pool, suite_db_mgr):
self.mail_footer = None
self.next_mail_time = None
self.event_timers = {}
# method supplied by task_job_mgr
self.job_poll = None

@staticmethod
def get_host_conf(itask, key, default=None, skey="remote"):
Expand Down Expand Up @@ -244,8 +241,8 @@ def process_events(self, schd_ctx):
elif ctx.ctx_type == self.HANDLER_JOB_LOGS_RETRIEVE:
self._process_job_logs_retrieval(schd_ctx, ctx, id_keys)

def process_message(self, itask, priority, message, poll_event_time=None,
is_incoming=False):
def process_message(self, itask, priority, message, poll_func,
poll_event_time=None, is_incoming=False):
"""Parse an incoming task message and update task state.
Incoming, e.g. "succeeded at <TIME>", may be from task job or polling.
Expand Down Expand Up @@ -292,38 +289,38 @@ def process_message(self, itask, priority, message, poll_event_time=None,
if (itask.state.is_greater_than(TASK_STATUS_RUNNING) and not
itask.state.confirming_with_poll):
itask.state.confirming_with_poll = True
self.job_poll(self.suite, [itask])
poll_func(self.suite, [itask])
itask.state.confirming_with_poll = False
self._process_message_started(itask, event_time)
elif message == TASK_OUTPUT_SUCCEEDED:
if (itask.state.is_greater_than(TASK_STATUS_SUCCEEDED) and not
itask.state.confirming_with_poll):
itask.state.confirming_with_poll = True
self.job_poll(self.suite, [itask])
poll_func(self.suite, [itask])
return
itask.state.confirming_with_poll = False
self._process_message_succeeded(itask, event_time)
elif message == TASK_OUTPUT_FAILED:
if (itask.state.is_greater_than(TASK_STATUS_FAILED) and not
itask.state.confirming_with_poll):
itask.state.confirming_with_poll = True
self.job_poll(self.suite, [itask])
poll_func(self.suite, [itask])
return
itask.state.confirming_with_poll = False
self._process_message_failed(itask, event_time, self.JOB_FAILED)
elif message == self.EVENT_SUBMIT_FAILED:
if (itask.state.is_greater_than(TASK_STATUS_SUBMIT_FAILED) and not
itask.state.confirming_with_poll):
itask.state.confirming_with_poll = True
self.job_poll(self.suite, [itask])
poll_func(self.suite, [itask])
return
itask.state.confirming_with_poll = False
self._process_message_submit_failed(itask, event_time)
elif message == TASK_OUTPUT_SUBMITTED:
if (itask.state.is_greater_than(TASK_STATUS_SUBMITTED) and not
itask.state.confirming_with_poll):
itask.state.confirming_with_poll = True
self.job_poll(self.suite, [itask])
poll_func(self.suite, [itask])
return
itask.state.confirming_with_poll = False
self._process_message_submitted(itask, event_time)
Expand All @@ -336,7 +333,7 @@ def process_message(self, itask, priority, message, poll_event_time=None,
if (itask.state.is_greater_than(TASK_STATUS_FAILED) and not
itask.state.confirming_with_poll):
itask.state.confirming_with_poll = True
self.job_poll(self.suite, [itask])
poll_func(self.suite, [itask])
return
itask.state.confirming_with_poll = False
self._process_message_failed(itask, event_time, self.JOB_FAILED)
Expand All @@ -345,11 +342,11 @@ def process_message(self, itask, priority, message, poll_event_time=None,
aborted_with = message[len(TaskMessage.ABORT_MESSAGE_PREFIX):]
self._db_events_insert(itask, "aborted", message)
self.suite_db_mgr.put_update_task_jobs(
itask, {"aborted_with": aborted_with})
itask, {"run_signal": aborted_with})
if (itask.state.is_greater_than(TASK_STATUS_FAILED) and not
itask.state.confirming_with_poll):
itask.state.confirming_with_poll = True
self.job_poll(self.suite, [itask])
poll_func(self.suite, [itask])
return
itask.state.confirming_with_poll = False
self._process_message_failed(itask, event_time, aborted_with)
Expand Down
41 changes: 25 additions & 16 deletions lib/cylc/task_job_mgr.py
Expand Up @@ -551,11 +551,12 @@ def _kill_task_job_callback(self, suite, itask, cmd_ctx, line):
elif itask.state.status == TASK_STATUS_SUBMITTED:
self.task_events_mgr.process_message(
itask, CRITICAL, "%s at %s" % (
self.task_events_mgr.EVENT_SUBMIT_FAILED, ctx.timestamp))
self.task_events_mgr.EVENT_SUBMIT_FAILED, ctx.timestamp),
self.poll_task_jobs)
cylc.flags.iflag = True
elif itask.state.status == TASK_STATUS_RUNNING:
self.task_events_mgr.process_message(
itask, CRITICAL, TASK_OUTPUT_FAILED)
itask, CRITICAL, TASK_OUTPUT_FAILED, self.poll_task_jobs)
cylc.flags.iflag = True
else:
log_lvl = WARNING
Expand Down Expand Up @@ -647,41 +648,46 @@ def _poll_task_job_callback(self, suite, itask, cmd_ctx, line):
if run_status == "1" and run_signal in ["ERR", "EXIT"]:
# Failed normally
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_FAILED, time_run_exit)
itask, INFO, TASK_OUTPUT_FAILED, self.poll_task_jobs,
time_run_exit)
elif run_status == "1" and batch_sys_exit_polled == "1":
# Failed by a signal, and no longer in the batch system
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_FAILED, time_run_exit)
itask, INFO, TASK_OUTPUT_FAILED, self.poll_task_jobs,
time_run_exit)
self.task_events_mgr.process_message(
itask, INFO, TaskMessage.FAIL_MESSAGE_PREFIX + run_signal,
time_run_exit)
self.poll_task_jobs, time_run_exit)
elif run_status == "1":
# The job has terminated, but is still managed by the batch system.
# Some batch systems may restart a job in this state, so don't
# mark it as failed yet.
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_STARTED, time_run)
itask, INFO, TASK_OUTPUT_STARTED, self.poll_task_jobs,
time_run)
elif run_status == "0":
# The job succeeded
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUCCEEDED, time_run_exit)
itask, INFO, TASK_OUTPUT_SUCCEEDED, self.poll_task_jobs,
time_run_exit)
elif time_run and batch_sys_exit_polled == "1":
# The job has terminated without executing the error trap
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_FAILED, "")
itask, INFO, TASK_OUTPUT_FAILED, self.poll_task_jobs, "")
elif time_run:
# The job has started, and is still managed by batch system
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_STARTED, time_run)
itask, INFO, TASK_OUTPUT_STARTED, self.poll_task_jobs, time_run)
elif batch_sys_exit_polled == "1":
# The job never ran, and is no longer in the batch system
self.task_events_mgr.process_message(
itask, INFO, self.task_events_mgr.EVENT_SUBMIT_FAILED,
time_submit_exit)
self.poll_task_jobs, time_submit_exit)
else:
# The job never ran, and is in batch system
self.task_events_mgr.process_message(
itask, INFO, TASK_STATUS_SUBMITTED, time_submit_exit)
itask, INFO, TASK_STATUS_SUBMITTED, self.poll_task_jobs,
time_submit_exit)

def _poll_task_job_message_callback(self, suite, itask, cmd_ctx, line):
"""Helper for _poll_task_jobs_callback, on message of one task job."""
Expand All @@ -695,7 +701,7 @@ def _poll_task_job_message_callback(self, suite, itask, cmd_ctx, line):
else:
ctx.ret_code = 0
self.task_events_mgr.process_message(
itask, priority, message, event_time)
itask, priority, message, self.poll_task_jobs, event_time)
self.task_events_mgr.log_task_job_activity(
ctx, suite, itask.point, itask.tdef.name)

Expand Down Expand Up @@ -762,7 +768,7 @@ def _simulation_submit_task_jobs(self, itasks):
itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = (
itask.tdef.rtconfig['job']['simulated run length'])
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUBMITTED)
itask, INFO, TASK_OUTPUT_SUBMITTED, self.poll_task_jobs)

def _submit_task_jobs_callback(self, ctx, suite, itasks):
"""Callback when submit task jobs command exits."""
Expand Down Expand Up @@ -802,11 +808,13 @@ def _submit_task_job_callback(self, suite, itask, cmd_ctx, line):
if itask.summary['submit_method_id'] and ctx.ret_code == 0:
self.task_events_mgr.process_message(
itask, INFO, '%s at %s' % (
TASK_OUTPUT_SUBMITTED, ctx.timestamp))
TASK_OUTPUT_SUBMITTED, ctx.timestamp),
self.poll_task_jobs)
else:
self.task_events_mgr.process_message(
itask, CRITICAL, '%s at %s' % (
self.task_events_mgr.EVENT_SUBMIT_FAILED, ctx.timestamp))
self.task_events_mgr.EVENT_SUBMIT_FAILED, ctx.timestamp),
self.poll_task_jobs)

def _prep_submit_task_job(self, suite, itask, dry_run):
"""Prepare a task job submission.
Expand Down Expand Up @@ -834,7 +842,8 @@ def _prep_submit_task_job(self, suite, itask, dry_run):
suite, itask.point, itask.tdef.name)
if not dry_run:
self.task_events_mgr.process_message(
itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED)
itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED,
self.poll_task_jobs)
return
itask.local_job_file_path = local_job_file_path

Expand Down

0 comments on commit 5c45b73

Please sign in to comment.