Skip to content

Commit

Permalink
Add initial support for Branch drops
Browse files Browse the repository at this point in the history
Branch drops are a special type of application drop that must implement
an additional condition function, returning true or false, and thus
triggering one branch of execution down the rest of the graph, or the
other. The current implementation requires these applications
to have exactly two outputs, mainly because the information of which
branch should continue executing is transmitted through the output drop
onto its corresponding consumers instead of trying to contact the
consumers directly.

To support this new functionality we require a new SKIPPED state, which
automatically propagates through the graph. Unit tests check that this
propagation works through multiple levels, both for data and application
drops, such that their respective states change correctly (which is
important for the UI to display them correctly too).

The Web UI has also been updated to display skipped branches of
execution with a different color.

Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed Jul 8, 2021
1 parent e12f29d commit fd85ba0
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 16 deletions.
15 changes: 14 additions & 1 deletion daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import numpy as np

from .. import droputils, utils
from ..drop import BarrierAppDROP, ContainerDROP
from ..drop import BarrierAppDROP, BranchAppDrop, ContainerDROP
from ..meta import dlg_float_param, dlg_string_param
from ..meta import dlg_bool_param, dlg_int_param
from ..meta import dlg_component, dlg_batch_input
Expand Down Expand Up @@ -373,3 +373,16 @@ def run(self):
for o in outs:
o.len = len(content)
o.write(content) # send content to all outputs

class SimpleBranch(BranchAppDrop, NullBarrierApp):
    """
    Minimal branch application whose condition outcome is supplied by the caller.

    The ``result`` argument (``True`` when not given) is stored during
    initialization and handed back unchanged by :meth:`condition`.
    """

    def initialize(self, **kwargs):
        """Store the ``result`` argument, then run BranchAppDrop's initialization."""
        # The attribute is set first and the parent initialization is invoked
        # explicitly through BranchAppDrop, keeping the original call order.
        self.result = self._getArg(kwargs, 'result', True)
        BranchAppDrop.initialize(self, **kwargs)

    def condition(self):
        """Return the branch outcome stored by :meth:`initialize`."""
        return self.result

    def run(self):
        """Do nothing; this application only exists to drive the branching."""
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/ddap_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DROPStates:
COMPLETED. Later, they transition through EXPIRED, eventually arriving to
DELETED.
"""
INITIALIZED, WRITING, COMPLETED, ERROR, EXPIRED, DELETED, CANCELLED = range(7)
INITIALIZED, WRITING, COMPLETED, ERROR, EXPIRED, DELETED, CANCELLED, SKIPPED = range(8)


class AppDROPStates:
Expand All @@ -54,7 +54,7 @@ class AppDROPStates:
are started. Depending on the execution result they eventually move to the
FINISHED or ERROR state.
"""
NOT_RUN, RUNNING, FINISHED, ERROR, CANCELLED = range(5)
NOT_RUN, RUNNING, FINISHED, ERROR, CANCELLED, SKIPPED = range(6)


class DROPPhases:
Expand Down
51 changes: 46 additions & 5 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ def setError(self):
Moves this DROP to the ERROR state.
'''

if self.status == DROPStates.CANCELLED:
if self.status in (DROPStates.CANCELLED, DROPStates.SKIPPED):
return

self._closeWriters()
Expand All @@ -945,7 +945,10 @@ def setCompleted(self):
status = self.status
if status == DROPStates.CANCELLED:
return
if status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
elif status == DROPStates.SKIPPED:
self._fire('dropCompleted', status=status)
return
elif status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
raise Exception("%r not in INITIALIZED or WRITING state (%s), cannot setComplete()" % (self, self.status))

self._closeWriters()
Expand All @@ -969,6 +972,11 @@ def cancel(self):
self._closeWriters()
self.status = DROPStates.CANCELLED

def skip(self):
    '''
    Moves this drop to the SKIPPED state closing any writers we opened.

    Writers are closed first through ``_closeWriters()``; only afterwards is
    ``status`` set to ``DROPStates.SKIPPED``.
    '''
    self._closeWriters()
    self.status = DROPStates.SKIPPED

@property
def node(self):
return self._node
Expand Down Expand Up @@ -1560,6 +1568,14 @@ def cancel(self):
super(AppDROP, self).cancel()
self.execStatus = AppDROPStates.CANCELLED

def skip(self):
    '''
    Moves this application drop, together with all its outputs, to SKIPPED.

    The parent class' ``skip()`` is invoked first, then ``execStatus`` is set
    to ``AppDROPStates.SKIPPED``, every drop in ``self._outputs`` is skipped
    in turn, and finally a ``producerFinished`` event carrying the current
    ``status`` and ``execStatus`` is fired.
    '''
    super().skip()
    self.execStatus = AppDROPStates.SKIPPED

    # Outputs are marked as skipped *before* the event is fired, so whoever
    # reacts to 'producerFinished' already sees them in their SKIPPED state.
    for output in self._outputs.values():
        output.skip()

    self._fire(
        'producerFinished',
        status=self.status,
        execStatus=self.execStatus,
    )


class InputFiredAppDROP(AppDROP):
"""
Expand Down Expand Up @@ -1594,6 +1610,7 @@ def initialize(self, **kwargs):
super(InputFiredAppDROP, self).initialize(**kwargs)
self._completedInputs = []
self._errorInputs = []
self._skippedInputs = []

# Error threshold must be within 0 and 100
if self.input_error_threshold < 0 or self.input_error_threshold > 100:
Expand Down Expand Up @@ -1634,13 +1651,18 @@ def dropCompleted(self, uid, drop_state):
self._errorInputs.append(uid)
elif drop_state == DROPStates.COMPLETED:
self._completedInputs.append(uid)
elif drop_state == DROPStates.SKIPPED:
self._skippedInputs.append(uid)
else:
raise Exception('Invalid DROP state in dropCompleted: %s' % drop_state)

error_len = len(self._errorInputs)
ok_len = len(self._completedInputs)
skipped_len = len(self._skippedInputs)

# We have enough inputs to proceed
if (skipped_len + error_len + ok_len) == n_eff_inputs:

if (error_len + ok_len) == n_eff_inputs:
# calculate the number of errors that have already occurred
percent_failed = math.floor((error_len/float(n_eff_inputs)) * 100)

Expand All @@ -1654,6 +1676,8 @@ def dropCompleted(self, uid, drop_state):
self.execStatus = AppDROPStates.ERROR
self.status = DROPStates.ERROR
self._notifyAppIsFinished()
elif skipped_len == n_eff_inputs:
self.skip()
else:
self.async_execute()

Expand All @@ -1668,7 +1692,7 @@ def async_execute(self):
t.start()

@track_current_drop
def execute(self):
def execute(self, _send_notifications=True):
"""
Manually trigger the execution of this application.
Expand Down Expand Up @@ -1703,7 +1727,8 @@ def execute(self):
drop_state = DROPStates.ERROR

self.status = drop_state
self._notifyAppIsFinished()
if _send_notifications:
self._notifyAppIsFinished()

def run(self):
"""
Expand All @@ -1726,6 +1751,22 @@ def initialize(self, **kwargs):
super(BarrierAppDROP, self).initialize(**kwargs)


class BranchAppDrop(BarrierAppDROP):
    """
    A special kind of application with exactly two outputs. After normal
    execution, the application decides whether a certain condition is met.
    If the condition is met, the first output is considered as COMPLETED,
    while the other is moved to SKIPPED state, and vice-versa.

    Subclasses are expected to provide a ``condition()`` method returning a
    truthy or falsy value; it is called once per execution, after the regular
    application execution has taken place.
    """

    @track_current_drop
    def execute(self, _send_notifications=True):
        """
        Execute the application, skip the output of the branch not taken and
        notify that the application has finished.

        :param _send_notifications: when false, the final "application
            finished" notification is not sent, mirroring the behaviour of the
            parent class' ``execute``. Defaults to ``True``.
        :raises InvalidDropException: if this drop doesn't have exactly two
            outputs.
        """
        n_outputs = len(self._outputs)
        if n_outputs != 2:
            raise InvalidDropException(self, f'BranchAppDrops should have exactly 2 outputs, not {n_outputs}')

        # Run the application without notifying anybody yet: one of the outputs
        # must be moved to SKIPPED before the notifications go out.
        BarrierAppDROP.execute(self, _send_notifications=False)

        # NOTE(review): the condition is evaluated regardless of how the
        # execution above ended -- confirm this is intended for failed runs.
        # A true condition skips the second output, a false one the first.
        skipped_idx = 1 if self.condition() else 0
        self.outputs[skipped_idx].skip()

        # Honour the caller's request instead of notifying unconditionally
        if _send_notifications:
            self._notifyAppIsFinished()

class PlasmaDROP(AbstractDROP):
'''
A DROP that points to data stored in a Plasma Store
Expand Down
8 changes: 5 additions & 3 deletions daliuge-engine/dlg/manager/web/session.html
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@

var status_update_handler = function(statuses)
{

// This is the order in which blocks are drawn in the progress bar,
// so we want "done" states first and "nothing happened yet" states
// towards the end
var states = ['completed', 'finished',
'running', 'writing',
'error', 'expired', 'deleted',
'cancelled',
'cancelled', 'skipped',
'not_run', 'initialized'];
var states_idx = d3.scale.ordinal().domain(states).rangePoints([0, states.length - 1]);

Expand All @@ -110,7 +112,7 @@

/* Get total and per-status counts, then normalize to 0-100% */
var total = statuses.length;
var status_counts = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
var status_counts = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
statuses.reduce(function(status_counts, s) {
var idx = states_idx(get_status_name(s));
status_counts[idx] = status_counts[idx] + 1;
Expand Down
4 changes: 4 additions & 0 deletions daliuge-engine/dlg/manager/web/static/css/session.css
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ svg {
color: #700000;
}

.node.skipped :first-child, rect.skipped {
fill: #53c4f6;
}

/* AppDROP states */
.node.not_run :first-child, rect.not_run {
fill: #ffe;
Expand Down
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/manager/web/static/js/dm.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
//

var SESSION_STATUS = ['Pristine', 'Building', 'Deploying', 'Running', 'Finished', 'Cancelled']
var STATUS_CLASSES = ['initialized', 'writing', 'completed', 'error', 'expired', 'deleted', 'cancelled']
var EXECSTATUS_CLASSES = ['not_run', 'running', 'finished', 'error', 'cancelled']
var STATUS_CLASSES = ['initialized', 'writing', 'completed', 'error', 'expired', 'deleted', 'cancelled', 'skipped']
var EXECSTATUS_CLASSES = ['not_run', 'running', 'finished', 'error', 'cancelled', 'skipped']
var TYPE_CLASSES = ['app', 'container', 'socket', 'plain']
var TYPE_SHAPES = {app:'rect', container:'parallelogram', socket:'parallelogram', plain:'parallelogram'}

Expand Down Expand Up @@ -485,7 +485,7 @@ function startGraphStatusUpdates(serverUrl, sessionId, selectedNode, delay,

var allCompleted = statuses.reduce(function(prevVal, curVal, idx, arr) {
var cur_status = get_status_name(curVal);
return prevVal && (cur_status == 'completed' || cur_status == 'finished' || cur_status == 'error' || cur_status == 'cancelled');
return prevVal && (cur_status == 'completed' || cur_status == 'finished' || cur_status == 'error' || cur_status == 'cancelled' || cur_status == 'skipped');
}, true);
if (!allCompleted) {
d3.timer(updateStates, delay);
Expand Down
126 changes: 124 additions & 2 deletions daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import random
import shutil
import sqlite3
import string
import tempfile

from dlg import droputils
Expand All @@ -35,6 +36,7 @@
DirectoryContainer, ContainerDROP, InputFiredAppDROP, RDBMSDrop
from dlg.droputils import DROPWaiterCtx
from dlg.exceptions import InvalidDropException
from dlg.apps.simple import NullBarrierApp, SimpleBranch


try:
Expand Down Expand Up @@ -780,5 +782,125 @@ def test_rdbms_drop(self):
finally:
os.unlink(dbfile)

if __name__ == '__main__':
unittest.main()
class BranchAppDropTests(unittest.TestCase):
    """Tests for the BranchAppDrop class"""

    def _simple_branch_with_outputs(self, result, uids):
        """
        Create a SimpleBranch application whose condition evaluates to
        ``result``, and attach two in-memory outputs to it. ``uids`` must
        contain three elements: the uid of the application and those of its
        first and second output, in that order.
        """
        a = SimpleBranch(uids[0], uids[0], result=result)
        b, c = (InMemoryDROP(x, x) for x in uids[1:])
        a.addOutput(b)
        a.addOutput(c)
        return a, b, c

    def _assert_drop_in_status(self, drop, status, execStatus):
        """
        Check that ``drop`` is in the given status and, for application drops
        only, also in the given execution status.
        """
        self.assertEqual(drop.status, status, f'{drop} has status {drop.status} != {status}')
        if isinstance(drop, AppDROP):
            self.assertEqual(drop.execStatus, execStatus, f'{drop} has execStatus {drop.execStatus} != {execStatus}')

    def _assert_drop_complete_or_skipped(self, drop, complete_expected):
        """Check that ``drop`` completed successfully, or otherwise that it was skipped"""
        if complete_expected:
            self._assert_drop_in_status(drop, DROPStates.COMPLETED, AppDROPStates.FINISHED)
        else:
            self._assert_drop_in_status(drop, DROPStates.SKIPPED, AppDROPStates.SKIPPED)

    def _test_single_branch_graph(self, result, levels):
        """
        Test this graph, where each level appends one drop to each branch
                ,-- true  --> B --> ...
            A ----- false --> C --> ...
        """
        a, b, c = self._simple_branch_with_outputs(result, 'abc')
        last_true = b
        last_false = c

        # all_uids is ['de', 'fg', 'hi', ....]
        all_uids = [string.ascii_lowercase[i: i + 2]
                    for i in range(3, len(string.ascii_lowercase), 2)]

        # Levels alternate between applications (even levels, consuming the
        # previous data drops) and data drops (odd levels, produced by the
        # previous applications)
        for level, uids in zip(range(levels), all_uids):
            if level % 2:
                x, y = (InMemoryDROP(uid, uid) for uid in uids)
                last_true.addOutput(x)
                last_false.addOutput(y)
            else:
                x, y = (NullBarrierApp(uid, uid) for uid in uids)
                last_true.addConsumer(x)
                last_false.addConsumer(y)
            last_true = x
            last_false = y

        with DROPWaiterCtx(self, [last_true, last_false], 2, [DROPStates.COMPLETED, DROPStates.SKIPPED]):
            a.async_execute()

        # Depending on "result", the "true" branch will be run or skipped
        self._assert_drop_complete_or_skipped(last_true, result)
        self._assert_drop_complete_or_skipped(last_false, not result)

    def test_simple_branch(self):
        """Check that simple branch event transmission works"""
        self._test_single_branch_graph(True, 0)
        self._test_single_branch_graph(False, 0)

    def test_simple_branch_one_level(self):
        """Like test_simple_branch, but events propagate downstream one level"""
        self._test_single_branch_graph(True, 1)
        self._test_single_branch_graph(False, 1)

    def test_simple_branch_two_levels(self):
        """Like test_simple_branch_one_level, but events propagate two levels"""
        self._test_single_branch_graph(True, 2)
        self._test_single_branch_graph(False, 2)

    def test_simple_branch_more_levels(self):
        """Like test_simple_branch_two_levels, but events propagate more levels"""
        for levels in (3, 4, 5, 6, 7):
            self._test_single_branch_graph(True, levels)
            self._test_single_branch_graph(False, levels)

    def _test_multi_branch_graph(self, *results):
        """
        Test this graph, each level appends a new branch to the first output
        of the previous branch.
                ,- false --> C          ,- false --> F
            A ----- true --> B --> D ----- true --> E --> ...

        One branch application is created per element of ``results``, each
        evaluating its condition to the corresponding element.
        """

        a, b, c = self._simple_branch_with_outputs(results[0], 'abc')
        all_drops = [a, b, c]
        last_first_output = b

        # all_uids is ['efg', 'hij', 'klm', ....]
        all_uids = [string.ascii_lowercase[i: i + 3]
                    for i in range(4, len(string.ascii_lowercase), 3)]

        for uids, result in zip(all_uids, results[1:]):
            x, y, z = self._simple_branch_with_outputs(result, uids)
            all_drops += [x, y, z]
            last_first_output.addConsumer(x)
            last_first_output = y

        with DROPWaiterCtx(self, all_drops, 2, [DROPStates.COMPLETED, DROPStates.SKIPPED]):
            a.async_execute()

        # TODO: Checking each individual drop depending on "results" would be a
        # tricky business, so we are skipping that check for now.

    def test_multi_branch_one_level(self):
        """Check that event transmission works with a single branch application"""
        self._test_multi_branch_graph(True)
        self._test_multi_branch_graph(False)

    def test_multi_branch_two_levels(self):
        """Like test_multi_branch_one_level, but with two chained branch applications"""
        self._test_multi_branch_graph(True, True)
        self._test_multi_branch_graph(True, False)
        self._test_multi_branch_graph(False, False)

    def test_multi_branch_more_levels(self):
        """Like test_multi_branch_two_levels, but with more chained branch applications"""
        for levels in (3, 4, 5, 6, 7):
            # One result per branch application: the first one decides whether
            # the rest of the chain runs or gets skipped altogether, the rest
            # keep following their first output so the chain reaches its end.
            downstream = [True] * (levels - 1)
            self._test_multi_branch_graph(True, *downstream)
            self._test_multi_branch_graph(False, *downstream)

0 comments on commit fd85ba0

Please sign in to comment.