diff --git a/daliuge-engine/dlg/apps/simple.py b/daliuge-engine/dlg/apps/simple.py index 8324b57fb..3d2adac75 100644 --- a/daliuge-engine/dlg/apps/simple.py +++ b/daliuge-engine/dlg/apps/simple.py @@ -28,7 +28,7 @@ import numpy as np from .. import droputils, utils -from ..drop import BarrierAppDROP, ContainerDROP +from ..drop import BarrierAppDROP, BranchAppDrop, ContainerDROP from ..meta import dlg_float_param, dlg_string_param from ..meta import dlg_bool_param, dlg_int_param from ..meta import dlg_component, dlg_batch_input @@ -373,3 +373,16 @@ def run(self): for o in outs: o.len = len(content) o.write(content) # send content to all outputs + +class SimpleBranch(BranchAppDrop, NullBarrierApp): + """Simple branch app that is told the result of its condition""" + + def initialize(self, **kwargs): + self.result = self._getArg(kwargs, 'result', True) + BranchAppDrop.initialize(self, **kwargs) + + def run(self): + pass + + def condition(self): + return self.result \ No newline at end of file diff --git a/daliuge-engine/dlg/ddap_protocol.py b/daliuge-engine/dlg/ddap_protocol.py index 749629d06..dcab9b8c6 100644 --- a/daliuge-engine/dlg/ddap_protocol.py +++ b/daliuge-engine/dlg/ddap_protocol.py @@ -44,7 +44,7 @@ class DROPStates: COMPLETED. Later, they transition through EXPIRED, eventually arriving to DELETED. """ - INITIALIZED, WRITING, COMPLETED, ERROR, EXPIRED, DELETED, CANCELLED = range(7) + INITIALIZED, WRITING, COMPLETED, ERROR, EXPIRED, DELETED, CANCELLED, SKIPPED = range(8) class AppDROPStates: @@ -54,7 +54,7 @@ class AppDROPStates: are started. Depending on the execution result they eventually move to the FINISHED or ERROR state. """ - NOT_RUN, RUNNING, FINISHED, ERROR, CANCELLED = range(5) + NOT_RUN, RUNNING, FINISHED, ERROR, CANCELLED, SKIPPED = range(6) class DROPPhases: diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 54a567710..956782a38 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -923,7 +923,7 @@ def setError(self): Moves this DROP to the ERROR state. ''' - if self.status == DROPStates.CANCELLED: + if self.status in (DROPStates.CANCELLED, DROPStates.SKIPPED): return self._closeWriters() @@ -945,7 +945,10 @@ def setCompleted(self): status = self.status if status == DROPStates.CANCELLED: return - if status not in [DROPStates.INITIALIZED, DROPStates.WRITING]: + elif status == DROPStates.SKIPPED: + self._fire('dropCompleted', status=status) + return + elif status not in [DROPStates.INITIALIZED, DROPStates.WRITING]: raise Exception("%r not in INITIALIZED or WRITING state (%s), cannot setComplete()" % (self, self.status)) self._closeWriters() @@ -969,6 +972,11 @@ def cancel(self): self._closeWriters() self.status = DROPStates.CANCELLED + def skip(self): + '''Moves this drop to the SKIPPED state closing any writers we opened''' + self._closeWriters() + self.status = DROPStates.SKIPPED + @property def node(self): return self._node @@ -1560,6 +1568,14 @@ def cancel(self): super(AppDROP, self).cancel() self.execStatus = AppDROPStates.CANCELLED + def skip(self): + '''Moves this application drop to its SKIPPED state''' + super().skip() + self.execStatus = AppDROPStates.SKIPPED + for o in self._outputs.values(): + o.skip() + self._fire('producerFinished', status=self.status, execStatus=self.execStatus) + class InputFiredAppDROP(AppDROP): """ @@ -1594,6 +1610,7 @@ def initialize(self, **kwargs): super(InputFiredAppDROP, self).initialize(**kwargs) self._completedInputs = [] self._errorInputs = [] + self._skippedInputs = [] # Error threshold must be within 0 and 100 if self.input_error_threshold < 0 or self.input_error_threshold > 100: @@ -1634,13 +1651,18 @@ def dropCompleted(self, uid, drop_state): self._errorInputs.append(uid) elif drop_state == DROPStates.COMPLETED: self._completedInputs.append(uid) + elif drop_state == DROPStates.SKIPPED: + self._skippedInputs.append(uid) else: raise Exception('Invalid DROP state in dropCompleted: %s' % drop_state) error_len = len(self._errorInputs) ok_len = len(self._completedInputs) + skipped_len = len(self._skippedInputs) + + # We have enough inputs to proceed + if (skipped_len + error_len + ok_len) == n_eff_inputs: - if (error_len + ok_len) == n_eff_inputs: # calculate the number of errors that have already occurred percent_failed = math.floor((error_len/float(n_eff_inputs)) * 100) @@ -1654,6 +1676,8 @@ def dropCompleted(self, uid, drop_state): self.execStatus = AppDROPStates.ERROR self.status = DROPStates.ERROR self._notifyAppIsFinished() + elif skipped_len == n_eff_inputs: + self.skip() else: self.async_execute() @@ -1668,7 +1692,7 @@ def async_execute(self): t.start() @track_current_drop - def execute(self): + def execute(self, _send_notifications=True): """ Manually trigger the execution of this application. @@ -1703,7 +1727,8 @@ def execute(self): drop_state = DROPStates.ERROR self.status = drop_state - self._notifyAppIsFinished() + if _send_notifications: + self._notifyAppIsFinished() def run(self): """ @@ -1726,6 +1751,22 @@ def initialize(self, **kwargs): super(BarrierAppDROP, self).initialize(**kwargs) +class BranchAppDrop(BarrierAppDROP): + """ + A special kind of application with exactly two outputs. After normal + execution, the application decides whether a certain condition is met. + If the condition is met, the first output is considered as COMPLETED, + while the other is moved to SKIPPED state, and vice-versa. + """ + + @track_current_drop + def execute(self, _send_notifications=True): + if len(self._outputs) != 2: + raise InvalidDropException(self, f'BranchAppDrops should have exactly 2 outputs, not {len(self._outputs)}') + BarrierAppDROP.execute(self, _send_notifications=False) + self.outputs[1 if self.condition() else 0].skip() + self._notifyAppIsFinished() + class PlasmaDROP(AbstractDROP): ''' A DROP that points to data stored in a Plasma Store diff --git a/daliuge-engine/dlg/manager/web/session.html b/daliuge-engine/dlg/manager/web/session.html index 19788bcd6..0f2227531 100644 --- a/daliuge-engine/dlg/manager/web/session.html +++ b/daliuge-engine/dlg/manager/web/session.html @@ -96,11 +96,13 @@ var status_update_handler = function(statuses) { - + // This is the order in which blocks are drawn in the progress bar, + // so we want "done" states first and "nothing happened yet" states + // towards the end var states = ['completed', 'finished', 'running', 'writing', 'error', 'expired', 'deleted', - 'cancelled', + 'cancelled', 'skipped', 'not_run', 'initialized']; var states_idx = d3.scale.ordinal().domain(states).rangePoints([0, states.length - 1]); @@ -110,7 +112,7 @@ /* Get total and per-status counts, then normalize to 0-100% */ var total = statuses.length; - var status_counts = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; + var status_counts = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; statuses.reduce(function(status_counts, s) { var idx = states_idx(get_status_name(s)); status_counts[idx] = status_counts[idx] + 1; diff --git a/daliuge-engine/dlg/manager/web/static/css/session.css b/daliuge-engine/dlg/manager/web/static/css/session.css index d9f767d49..60c82debf 100644 --- a/daliuge-engine/dlg/manager/web/static/css/session.css +++ b/daliuge-engine/dlg/manager/web/static/css/session.css @@ -82,6 +82,10 @@ svg { color: #700000; } +.node.skipped :first-child, rect.skipped { + fill: #53c4f6; +} + /* AppDROP states */ .node.not_run :first-child, rect.not_run { fill: #ffe; diff --git a/daliuge-engine/dlg/manager/web/static/js/dm.js b/daliuge-engine/dlg/manager/web/static/js/dm.js index 0b7089354..f510b1542 100644 --- a/daliuge-engine/dlg/manager/web/static/js/dm.js +++ b/daliuge-engine/dlg/manager/web/static/js/dm.js @@ -21,8 +21,8 @@ // var SESSION_STATUS = ['Pristine', 'Building', 'Deploying', 'Running', 'Finished', 'Cancelled'] -var STATUS_CLASSES = ['initialized', 'writing', 'completed', 'error', 'expired', 'deleted', 'cancelled'] -var EXECSTATUS_CLASSES = ['not_run', 'running', 'finished', 'error', 'cancelled'] +var STATUS_CLASSES = ['initialized', 'writing', 'completed', 'error', 'expired', 'deleted', 'cancelled', 'skipped'] +var EXECSTATUS_CLASSES = ['not_run', 'running', 'finished', 'error', 'cancelled', 'skipped'] var TYPE_CLASSES = ['app', 'container', 'socket', 'plain'] var TYPE_SHAPES = {app:'rect', container:'parallelogram', socket:'parallelogram', plain:'parallelogram'} @@ -485,7 +485,7 @@ function startGraphStatusUpdates(serverUrl, sessionId, selectedNode, delay, var allCompleted = statuses.reduce(function(prevVal, curVal, idx, arr) { var cur_status = get_status_name(curVal); - return prevVal && (cur_status == 'completed' || cur_status == 'finished' || cur_status == 'error' || cur_status == 'cancelled'); + return prevVal && (cur_status == 'completed' || cur_status == 'finished' || cur_status == 'error' || cur_status == 'cancelled' || cur_status == 'skipped'); }, true); if (!allCompleted) { d3.timer(updateStates, delay); diff --git a/daliuge-engine/test/test_drop.py b/daliuge-engine/test/test_drop.py index d0401d1b8..321541022 100644 --- a/daliuge-engine/test/test_drop.py +++ b/daliuge-engine/test/test_drop.py @@ -26,6 +26,7 @@ import random import shutil import sqlite3 +import string import tempfile from dlg import droputils @@ -35,6 +36,7 @@ DirectoryContainer, ContainerDROP, InputFiredAppDROP, RDBMSDrop from dlg.droputils import DROPWaiterCtx from dlg.exceptions import InvalidDropException +from dlg.apps.simple import NullBarrierApp, SimpleBranch try: @@ -780,5 +782,125 @@ def test_rdbms_drop(self): finally: os.unlink(dbfile) -if __name__ == '__main__': - unittest.main() +class BranchAppDropTests(unittest.TestCase): + """Tests for the BranchAppDrop class""" + + def _simple_branch_with_outputs(self, result, uids): + a = SimpleBranch(uids[0], uids[0], result=result) + b, c = (InMemoryDROP(x, x) for x in uids[1:]) + a.addOutput(b) + a.addOutput(c) + return a, b, c + + def _assert_drop_in_status(self, drop, status, execStatus): + self.assertEqual(drop.status, status, f'{drop} has status {drop.status} != {status}') + if isinstance(drop, AppDROP): + self.assertEqual(drop.execStatus, execStatus, f'{drop} has execStatus {drop.execStatus} != {execStatus}') + + def _assert_drop_complete_or_skipped(self, drop, complete_expected): + if complete_expected: + self._assert_drop_in_status(drop, DROPStates.COMPLETED, AppDROPStates.FINISHED) + else: + self._assert_drop_in_status(drop, DROPStates.SKIPPED, AppDROPStates.SKIPPED) + + def _test_single_branch_graph(self, result, levels): + """ + Test this graph, where each level appends one drop to each branch + + ,-- true --> B --> ... + A ---- false --> C --> ... + """ + a, b, c = self._simple_branch_with_outputs(result, 'abc') + last_true = b + last_false = c + all_drops = [a, b, c] + + # all_uids is ['de', 'fg', 'hi', ....] + all_uids = [string.ascii_lowercase[i: i + 2] + for i in range(3, len(string.ascii_lowercase), 2)] + + for level, uids in zip(range(levels), all_uids): + if level % 2: + x, y = (InMemoryDROP(uid, uid) for uid in uids) + last_true.addOutput(x) + last_false.addOutput(y) + else: + x, y = (NullBarrierApp(uid, uid) for uid in uids) + last_true.addConsumer(x) + last_false.addConsumer(y) + all_drops += [x, y] + last_true = x + last_false = y + + with DROPWaiterCtx(self, [last_true, last_false], 2, [DROPStates.COMPLETED, DROPStates.SKIPPED]): + a.async_execute() + + # Depending on "result", the "true" branch will be run or skipped + self._assert_drop_complete_or_skipped(last_true, result) + self._assert_drop_complete_or_skipped(last_false, not result) + + def test_simple_branch(self): + """Check that simple branch event transmission wroks""" + self._test_single_branch_graph(True, 0) + self._test_single_branch_graph(False, 0) + + def test_simple_branch_one_level(self): + """Like test_simple_branch_app, but events propagate downstream one level""" + self._test_single_branch_graph(True, 1) + self._test_single_branch_graph(False, 1) + + def test_simple_branch_two_levels(self): + """Like test_skipped_propagates, but events propagate more levels""" + self._test_single_branch_graph(True, 2) + self._test_single_branch_graph(False, 2) + + def test_simple_branch_more_levels(self): + for levels in (3, 4, 5, 6, 7): + self._test_single_branch_graph(True, levels) + self._test_single_branch_graph(False, levels) + + def _test_multi_branch_graph(self, *results): + """ + Test this graph, each level appends a new branch to the first output + of the previous branch. + + ,- false --> C ,- false --> F + A ----- true --> B --> D ----- true --> E --> ... + """ + + a, b, c = self._simple_branch_with_outputs(results[0], 'abc') + all_drops = [a, b, c] + last_first_output = b + + # all_uids is ['de', 'fg', 'hi', ....] + all_uids = [string.ascii_lowercase[i: i + 3] + for i in range(4, len(string.ascii_lowercase), 3)] + + for uids, result in zip(all_uids, results[1:]): + x, y, z = self._simple_branch_with_outputs(result, uids) + all_drops += [x, y, z] + last_first_output.addConsumer(x) + last_first_output = y + + with DROPWaiterCtx(self, all_drops, 2, [DROPStates.COMPLETED, DROPStates.SKIPPED]): + a.async_execute() + + # TODO: Checking each individual drop depending on "results" would be a + # tricky business, so we are skipping that check for now. + + def test_multi_branch_one_level(self): + """Check that simple branch event transmission wroks""" + self._test_multi_branch_graph(True) + self._test_multi_branch_graph(False) + + def test_multi_branch_two_levels(self): + """Like test_simple_branch_app, but events propagate downstream one level""" + self._test_multi_branch_graph(True, True) + self._test_multi_branch_graph(True, False) + self._test_multi_branch_graph(False, False) + + def test_multi_branch_more_levels(self): + """Like test_skipped_propagates, but events propagate more levels""" + for levels in (3, 4, 5, 6, 7): + self._test_multi_branch_graph(True, levels) + self._test_multi_branch_graph(False, levels) \ No newline at end of file