Skip to content

Commit

Permalink
Make xtriggers unique to sequence.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Aug 12, 2019
1 parent 1e261be commit 48a1d02
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 156 deletions.
65 changes: 13 additions & 52 deletions lib/cylc/config.py
Expand Up @@ -58,7 +58,6 @@
from cylc.graphnode import GraphNodeParser, GraphNodeError
from cylc.print_tree import print_tree
from cylc.subprocctx import SubFuncContext
from cylc.subprocpool import get_func
from cylc.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.taskdef import TaskDef, TaskDefError
from cylc.task_id import TaskID
Expand Down Expand Up @@ -155,7 +154,6 @@ def __init__(
self.xtrigger_mgr = XtriggerManager(self.suite)
else:
self.xtrigger_mgr = xtrigger_mgr
self.xtriggers = {}
self.suite_polling_tasks = {}
self._last_graph_raw_id = None
self._last_graph_raw_edges = []
Expand Down Expand Up @@ -1741,11 +1739,19 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
dependency = Dependency(expr_list, set(triggers.values()), suicide)
self.taskdefs[right].add_dependency(dependency, seq)

# Record xtrigger labels for each task name.
if right not in self.xtriggers:
self.xtriggers[right] = xtrig_labels
else:
self.xtriggers[right] = self.xtriggers[right].union(xtrig_labels)
for label in xtrig_labels:
try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
except KeyError:
if label == 'wall_clock':
# Allow predefined zero-offset wall clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
else:
raise SuiteConfigError(
"ERROR, undefined xtrigger label: %s" % label)
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
"""Get actual first cycle point for the suite
Expand Down Expand Up @@ -2086,51 +2092,6 @@ def load_graph(self):
self._proc_triggers(
parser.triggers, parser.original, seq, task_triggers)

xtcfg = self.cfg['scheduling']['xtriggers']
# Taskdefs just know xtrigger labels.
for task_name, xt_labels in self.xtriggers.items():
for label in xt_labels:
try:
xtrig = xtcfg[label]
except KeyError:
if label == 'wall_clock':
# Allow predefined zero-offset wall clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
else:
raise SuiteConfigError(
"ERROR, undefined xtrigger label: %s" % label)
if xtrig.func_name.startswith('wall_clock'):
self.xtrigger_mgr.add_clock(label, xtrig)
# Replace existing xclock if the new offset is larger.
try:
offset = get_interval(xtrig.func_kwargs['offset'])
except KeyError:
offset = 0
old_label = self.taskdefs[task_name].xclock_label
if old_label is None:
self.taskdefs[task_name].xclock_label = label
else:
old_xtrig = self.xtrigger_mgr.clockx_map[old_label]
old_offset = get_interval(
old_xtrig.func_kwargs['offset'])
if offset > old_offset:
self.taskdefs[task_name].xclock_label = label
else:
try:
if not callable(get_func(xtrig.func_name, self.fdir)):
raise SuiteConfigError(
"ERROR, "
"xtrigger function not callable: %s" %
xtrig.func_name)
except (ImportError, AttributeError):
raise SuiteConfigError(
"ERROR, "
"xtrigger function not found: %s" %
xtrig.func_name)
self.xtrigger_mgr.add_trig(label, xtrig)
self.taskdefs[task_name].xtrig_labels.add(label)

# Detect use of xtrigger names with '@' prefix (creates a task).
overlap = set(self.taskdefs.keys()).intersection(
self.cfg['scheduling']['xtriggers'].keys())
Expand Down
15 changes: 5 additions & 10 deletions lib/cylc/task_pool.py
Expand Up @@ -1263,16 +1263,13 @@ def get_task_requisites(self, items, list_prereqs=False):
else:
extras['External trigger "%s"' % trig] = 'NOT satisfied'
for label, satisfied in itask.state.xtriggers.items():
extra = 'xtrigger "%s = %s"' % (
label, self.xtrigger_mgr.get_xtrig_ctx(
itask, label).get_signature())
if satisfied:
extras['xtrigger "%s"' % label] = 'satisfied'
extras[extra] = 'satisfied'
else:
extras['xtrigger "%s"' % label] = 'NOT satisfied'
if itask.state.xclock is not None:
label, satisfied = itask.state.xclock
if satisfied:
extras['xclock "%s"' % label] = 'satisfied'
else:
extras['xclock "%s"' % label] = 'NOT satisfied'
extras[extra] = 'NOT satisfied'

outputs = []
for _, msg, is_completed in itask.state.outputs.get_all():
Expand All @@ -1289,8 +1286,6 @@ def check_xtriggers(self):
itasks = self.get_tasks()
self.xtrigger_mgr.collate(itasks)
for itask in itasks:
if itask.state.xclock is not None:
self.xtrigger_mgr.satisfy_xclock(itask)
if itask.state.xtriggers:
self.xtrigger_mgr.satisfy_xtriggers(itask, self.proc_pool)

Expand Down
29 changes: 15 additions & 14 deletions lib/cylc/task_state.py
Expand Up @@ -218,10 +218,6 @@ class TaskState(object):
List of prerequisites that will cause the task to suicide.
.time_updated (str):
Time string of latest update time.
.xclock (tuple):
A tuple (clock_label (str), is_done (boolean)) to indicate if a
clock trigger is satisfied or not. Set to `None` if the task has no
clock trigger.
.xtriggers (dict):
xtriggers as {trigger (str): satisfied (boolean), ...}.
._is_satisfied (boolean):
Expand All @@ -242,7 +238,6 @@ class TaskState(object):
"status",
"suicide_prerequisites",
"time_updated",
"xclock",
"xtriggers",
"_is_satisfied",
"_suicide_is_satisfied",
Expand Down Expand Up @@ -274,12 +269,7 @@ def __init__(self, tdef, point, status, hold_swap):

# xtriggers (represented by labels) satisfied or not
self.xtriggers = {}
for label in tdef.xtrig_labels:
self.xtriggers[label] = False
if tdef.xclock_label:
self.xclock = (tdef.xclock_label, False)
else:
self.xclock = None
self._add_xtriggers(point, tdef)

# Message outputs.
self.outputs = TaskOutputs(tdef)
Expand All @@ -301,9 +291,7 @@ def satisfy_me(self, all_task_outputs):
self._suicide_is_satisfied = None

def xtriggers_all_satisfied(self):
"""Return True if xclock and all xtriggers are satisfied."""
if self.xclock is not None and not self.xclock[1]:
return False
"""Return True if all xtriggers are satisfied."""
return all(self.xtriggers.values())

def prerequisites_are_all_satisfied(self):
Expand Down Expand Up @@ -528,3 +516,16 @@ def _add_prerequisites(self, point, tdef):
p_prev < tdef.start_point)
cpre.set_condition(tdef.name)
self.prerequisites.append(cpre)

def _add_xtriggers(self, point, tdef):
"""Add task xtriggers valid for the current sequence.
Initialize each one to unsatisfied.
"""
# Triggers for sequence_i only used if my cycle point is a
# valid member of sequence_i's sequence of cycle points.
for sequence, xtrig_labels in tdef.xtrig_labels.items():
if not sequence.is_valid(point):
continue
for xtrig_label in xtrig_labels:
self.xtriggers[xtrig_label] = False
31 changes: 20 additions & 11 deletions lib/cylc/taskdef.py
Expand Up @@ -47,8 +47,7 @@ class TaskDef(object):
"intercycle_offsets", "sequential", "is_coldstart",
"suite_polling_cfg", "clocktrigger_offset", "expiration_offset",
"namespace_hierarchy", "dependencies", "outputs", "param_var",
"external_triggers", "xtrig_labels", "xclock_label",
"name", "elapsed_times"]
"external_triggers", "xtrig_labels", "name", "elapsed_times"]

# Store the elapsed times for a maximum of 10 cycles
MAX_LEN_ELAPSED_TIMES = 10
Expand Down Expand Up @@ -78,12 +77,7 @@ def __init__(self, name, rtcfg, run_mode, start_point, spawn_ahead):
self.outputs = set()
self.param_var = {}
self.external_triggers = []
self.xtrig_labels = set()
self.xclock_label = None
# Note a task can only have one clock xtrigger - if it depends on
# several we just keep the label of the one with the largest offset
# (this is determined and set during suite config parsing, to avoid
# storing the offset here in the taskdef).
self.xtrig_labels = {} # {sequence: [labels]}

self.name = name
self.elapsed_times = deque(maxlen=self.MAX_LEN_ELAPSED_TIMES)
Expand All @@ -97,9 +91,24 @@ def add_dependency(self, dependency, sequence):
dependency applies.
"""
if sequence not in self.dependencies:
self.dependencies[sequence] = []
self.dependencies[sequence].append(dependency)
try:
self.dependencies[sequence].append(dependency)
except KeyError:
self.dependencies[sequence] = [dependency]

def add_xtrig_label(self, xtrig_label, sequence):
"""Add an xtrigger to a named sequence.
Args:
xtrig_label: The xtrigger label to add.
sequence (cylc.cycling.SequenceBase): The sequence for which this
xtrigger applies.
"""
try:
self.xtrig_labels[sequence].append(xtrig_label)
except KeyError:
self.xtrig_labels[sequence] = [xtrig_label]

def add_sequence(self, sequence):
"""Add a sequence."""
Expand Down

0 comments on commit 48a1d02

Please sign in to comment.