Skip to content

Commit

Permalink
scheduler, task_pool, task_proxy refactor
Browse files Browse the repository at this point in the history
Task event handlers no longer hang onto their originating task proxy
objects.
  • Loading branch information
matthewrmshin committed Mar 6, 2017
1 parent 587dcb0 commit ff92dd1
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 164 deletions.
9 changes: 4 additions & 5 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,24 @@ def main():
config.cfg['scheduling']['final cycle point']),
})

task_name, point_string = TaskID.split(task_id)
task_name, point_str = TaskID.split(task_id)
itask = TaskProxy(
config.get_taskdef(task_name),
get_point(point_string).standardise(),
get_point(point_str).standardise(),
is_startup=True)
if options.dry_run:
if not task_job_mgr.prep_submit_task_jobs(
suite, [itask], dry_run=True):
sys.exit(1)
print "JOB SCRIPT=%s" % itask.get_job_log_path(
itask.HEAD_MODE_LOCAL, tail=itask.JOB_FILE_BASE)
print('JOB SCRIPT=%s' % itask.local_job_file_path)
else:
task_job_mgr.submit_task_jobs(suite, [itask])
while task_job_mgr.proc_pool.results:
task_job_mgr.proc_pool.handle_results_async()
task_job_mgr.proc_pool.close()
task_job_mgr.proc_pool.join()
if itask.summary['submit_method_id'] is not None:
print 'Job ID:', itask.summary['submit_method_id']
print('Job ID: %s' % itask.summary['submit_method_id'])

sys.exit(itask.state.status == TASK_STATUS_SUBMIT_FAILED)

Expand Down
5 changes: 1 addition & 4 deletions bin/cylc-trigger
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,11 @@ def main():

# Get the job filename from the suite daemon - the task cycle point may
# need standardising to the suite cycle point format.
jobfile_path, compat = info_client.get_info(
jobfile_path = info_client.get_info(
'get_task_jobfile_path', task_id=task_id)
if not jobfile_path:
sys.exit('ERROR: task not found')

if isinstance(jobfile_path, bool):
jobfile_path = compat

# Note: localhost time and file system time may be out of sync,
# so the safe way to detect whether a new file is modified
# or not is to detect whether its time stamp has changed.
Expand Down
24 changes: 10 additions & 14 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,12 @@ def configure(self):
'ERROR: this suite requires the %s run mode' % reqmode)

self.suite_event_handler = SuiteEventHandler(self.proc_pool)
self.task_job_mgr = TaskJobManager(self.proc_pool)
self.task_events_mgr = TaskEventsManager(
self.proc_pool,
self._get_cylc_conf("task event mail interval"),
self._get_events_conf("mail footer"))
self.task_events_mgr = TaskEventsManager(self.proc_pool)
self.task_events_mgr.mail_interval = self._get_cylc_conf(
"task event mail interval")
self.task_events_mgr.mail_footer = self._get_events_conf("mail footer")
self.task_job_mgr = TaskJobManager(
self.proc_pool, self.task_events_mgr)
if self.options.genref or self.options.reftest:
self.configure_reftest()

Expand Down Expand Up @@ -767,14 +768,9 @@ def info_ping_task(self, task_id, exists_only=False):

def info_get_task_jobfile_path(self, task_id):
"""Return task job file path."""
itask = self.pool.get_task_by_id(task_id)
if itask is None:
return False, "task not found"
path = itask.get_job_log_path(
head_mode=itask.HEAD_MODE_LOCAL, submit_num=itask.NN,
tail=itask.JOB_FILE_BASE)
# Note: 2nd value for back compat
return path, os.path.dirname(os.path.dirname(path))
name, point = TaskID.split(task_id)
return self.task_events_mgr.get_task_job_log(
self.suite, point, name, tail=self.task_job_mgr.JOB_FILE_BASE)

def info_get_suite_info(self):
"""Return a dict containing the suite title and description."""
Expand Down Expand Up @@ -1388,7 +1384,7 @@ def run(self):

self.pool.process_queued_task_messages(self.message_queue)
self.process_command_queue()
self.task_events_mgr.add_event_timers(self.pool.get_tasks())
self.task_events_mgr.event_timers_from_tasks(self.pool.get_tasks())
self.task_events_mgr.process_events(self)
has_changes = cylc.flags.iflag or self.do_update_state_summary
if has_changes:
Expand Down
113 changes: 78 additions & 35 deletions lib/cylc/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@
from cylc.mp_pool import SuiteProcContext
from cylc.suite_logging import ERR, LOG
from cylc.task_proxy import TaskProxy
from cylc.task_state import TASK_STATUS_SUCCEEDED


class TaskEventsManager(object):
"""Task events manager."""

def __init__(self, proc_pool, mail_interval, mail_footer):
NN = "NN"

def __init__(self, proc_pool):
self.proc_pool = proc_pool
self.mail_interval = mail_interval
self.mail_footer = mail_footer
self.mail_interval = 0.0
self.mail_footer = None
self.next_mail_time = None
self.event_timers = {}

def add_event_timers(self, itasks):
def event_timers_from_tasks(self, itasks):
"""Add event timers by removing them from itasks."""
for itask in itasks:
while itask.event_handler_try_timers:
Expand All @@ -49,13 +50,56 @@ def add_event_timers(self, itasks):
key1, submit_num = key
id_key = (
key1, str(itask.point), itask.tdef.name, submit_num)
self.event_timers[id_key] = (itask, timer)
self.event_timers[id_key] = timer

def get_task_job_activity_log(
        self, suite, point, name, submit_num=None):
    """Return the path of a task job's activity log.

    Convenience wrapper around get_task_job_log with the file name
    fixed to "job-activity.log".
    """
    return self.get_task_job_log(
        suite, point, name, submit_num, tail="job-activity.log")

def get_task_job_log(
        self, suite, point, name, submit_num=None, tail=None):
    """Return the path of a task job's log directory.

    If "tail" is given, it is appended as the final path component
    (e.g. a file name such as "job.out").
    """
    path = os.path.join(
        GLOBAL_CFG.get_derived_host_item(suite, "suite job log directory"),
        self.get_task_job_id(point, name, submit_num))
    if tail:
        path = os.path.join(path, tail)
    return path

def get_task_job_id(self, point, name, submit_num=None):
    """Return the relative task job ID "POINT/NAME/SUBMIT_NUM".

    The submit number is zero-padded to two digits. If "submit_num" is
    not an integer (e.g. None), the "NN" placeholder is used instead,
    matching the "NN" symlink in the job log tree.

    (Docstring fixed: this returns the job ID path fragment, not the
    full job log path — that is get_task_job_log.)
    """
    try:
        submit_num = "%02d" % submit_num
    except TypeError:
        # No usable submit number: fall back to the "NN" placeholder.
        submit_num = self.NN
    return os.path.join(str(point), name, submit_num)

def log_task_job_activity(self, ctx, suite, point, name, submit_num=NN):
    """Append an activity record for a task job to its activity log.

    Does nothing if "ctx" renders to an empty string. On write failure
    the record is not lost silently — a warning is logged. Command
    contexts are also echoed to the suite log (error level on non-zero
    return code, debug level otherwise).
    """
    ctx_str = str(ctx)
    if not ctx_str:
        return
    # Event handler contexts carry their own submit number in the key.
    if isinstance(ctx.cmd_key, tuple):  # An event handler
        submit_num = ctx.cmd_key[-1]
    job_activity_log = self.get_task_job_activity_log(
        suite, point, name, submit_num)
    try:
        with open(job_activity_log, "ab") as handle:
            handle.write(ctx_str + '\n')
    except IOError as exc:
        LOG.warning("%s: write failed\n%s" % (job_activity_log, exc))
    if ctx.cmd:
        (LOG.error if ctx.ret_code else LOG.debug)(ctx_str)

def process_events(self, schd_ctx):
"""Process task events."""
ctx_groups = {}
now = time()
for id_key, (_, timer) in self.event_timers.copy().items():
for id_key, timer in self.event_timers.copy().items():
key1, point, name, submit_num = id_key
if timer.is_waiting:
continue
Expand Down Expand Up @@ -97,12 +141,11 @@ def process_events(self, schd_ctx):
(key1, submit_num),
timer.ctx.cmd, env=os.environ, shell=True,
),
self._custom_handler_callback, [id_key])
self._custom_handler_callback, [schd_ctx, id_key])
else:
# Group together built-in event handlers, where possible
if timer.ctx not in ctx_groups:
ctx_groups[timer.ctx] = []
# "itask.submit_num" may have moved on at this point
ctx_groups[timer.ctx].append(id_key)

next_mail_time = now + self.mail_interval
Expand All @@ -114,14 +157,15 @@ def process_events(self, schd_ctx):
elif ctx.ctx_type == TaskProxy.JOB_LOGS_RETRIEVE:
self._process_job_logs_retrieval(schd_ctx, ctx, id_keys)

def _custom_handler_callback(self, ctx, id_key):
def _custom_handler_callback(self, ctx, schd_ctx, id_key):
"""Callback when a custom event handler is done."""
itask, timer = self.event_timers[id_key]
itask.command_log(ctx)
_, point, name, submit_num = id_key
self.log_task_job_activity(
ctx, schd_ctx.suite, point, name, submit_num)
if ctx.ret_code == 0:
del self.event_timers[id_key]
else:
timer.unset_waiting()
self.event_timers[id_key].unset_waiting()

def _process_event_email(self, schd_ctx, ctx, id_keys):
"""Process event notification, by email."""
Expand Down Expand Up @@ -159,9 +203,8 @@ def _process_event_email(self, schd_ctx, ctx, id_keys):
("owner", schd_ctx.owner)]:
if value:
stdin_str += "%s: %s\n" % (label, value)
mail_footer_tmpl = self.mail_footer
if mail_footer_tmpl:
stdin_str += (mail_footer_tmpl + "\n") % {
if self.mail_footer:
stdin_str += (self.mail_footer + "\n") % {
"host": schd_ctx.host,
"port": schd_ctx.port,
"owner": schd_ctx.owner,
Expand All @@ -175,22 +218,21 @@ def _process_event_email(self, schd_ctx, ctx, id_keys):
SuiteProcContext(
ctx, cmd, env=env, stdin_str=stdin_str, id_keys=id_keys,
),
self._event_email_callback)
self._event_email_callback, [schd_ctx])

def _event_email_callback(self, proc_ctx):
def _event_email_callback(self, proc_ctx, schd_ctx):
"""Call back when email notification command exits."""
for id_key in proc_ctx.cmd_kwargs["id_keys"]:
key1 = id_key[0]
submit_num = id_key[-1]
key1, point, name, submit_num = id_key
try:
itask, timer = self.event_timers[id_key]
if proc_ctx.ret_code == 0:
del self.event_timers[id_key]
log_ctx = SuiteProcContext((key1, submit_num), None)
log_ctx.ret_code = 0
itask.command_log(log_ctx)
self.log_task_job_activity(
log_ctx, schd_ctx.suite, point, name, submit_num)
else:
timer.unset_waiting()
self.event_timers[id_key].unset_waiting()
except KeyError:
if cylc.flags.debug:
ERR.debug(traceback.format_exc())
Expand Down Expand Up @@ -229,24 +271,24 @@ def _process_job_logs_retrieval(self, schd_ctx, ctx, id_keys):
schd_ctx.suite, "suite job log directory") + "/")
self.proc_pool.put_command(
SuiteProcContext(ctx, cmd, env=dict(os.environ), id_keys=id_keys),
self._job_logs_retrieval_callback)
self._job_logs_retrieval_callback, [schd_ctx])

def _job_logs_retrieval_callback(self, proc_ctx):
def _job_logs_retrieval_callback(self, proc_ctx, schd_ctx):
"""Call back when log job retrieval completes."""
for id_key in proc_ctx.cmd_kwargs["id_keys"]:
key1 = id_key[0]
submit_num = id_key[-1]
key1, point, name, submit_num = id_key
try:
itask, timer = self.event_timers[id_key]
# All completed jobs are expected to have a "job.out".
fnames = ["job.out"]
# Failed jobs are expected to have a "job.err".
if itask.state.status != TASK_STATUS_SUCCEEDED:
fnames.append("job.err")
try:
if key1[1] not in 'succeeded':
fnames.append("job.err")
except TypeError:
pass
fname_oks = {}
for fname in fnames:
fname_oks[fname] = os.path.exists(itask.get_job_log_path(
TaskProxy.HEAD_MODE_LOCAL, submit_num, fname))
fname_oks[fname] = os.path.exists(self.get_task_job_log(
schd_ctx.suite, point, name, submit_num, fname))
# All expected paths must exist to record a good attempt
log_ctx = SuiteProcContext((key1, submit_num), None)
if all(fname_oks.values()):
Expand All @@ -258,8 +300,9 @@ def _job_logs_retrieval_callback(self, proc_ctx):
for fname, exist_ok in sorted(fname_oks.items()):
if not exist_ok:
log_ctx.err += " %s" % fname
timer.unset_waiting()
itask.command_log(log_ctx)
self.event_timers[id_key].unset_waiting()
self.log_task_job_activity(
log_ctx, schd_ctx.suite, point, name, submit_num)
except KeyError:
if cylc.flags.debug:
ERR.debug(traceback.format_exc())

0 comments on commit ff92dd1

Please sign in to comment.