Skip to content

Commit

Permalink
Merge with master
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Jul 16, 2021
2 parents 5e17344 + 9d8ce20 commit 16cba1c
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 913 deletions.
3 changes: 2 additions & 1 deletion daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class Categories:
GATHER = 'Gather'
GROUP_BY = 'GroupBy'
LOOP = 'Loop'
BRANCH = 'Branch'
VARIABLES = 'Variables'

BRANCH = 'Branch'
DATA = 'Data'
COMPONENT = 'Component'
PYTHON_APP = 'PythonApp'
Expand All @@ -67,6 +67,7 @@ class Categories:
APP_DROP_TYPES = [
Categories.COMPONENT,
Categories.PYTHON_APP,
Categories.BRANCH,
Categories.BASH_SHELL_APP,
Categories.MPI,
Categories.DYNLIB_APP,
Expand Down
6 changes: 4 additions & 2 deletions daliuge-engine/build_engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ case "$1" in
echo "Build finished!"
exit 1 ;;
"dev")
C_TAG="master"
[[ ! -z $2 ]] && C_TAG=$2
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-engine development version using daliuge-common:master"
docker build --build-arg VCS_TAG=master --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile .
echo "Building daliuge-engine development version using daliuge-common:${C_TAG}"
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1;;
"casa")
Expand Down
15 changes: 14 additions & 1 deletion daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import numpy as np

from .. import droputils, utils
from ..drop import BarrierAppDROP, ContainerDROP
from ..drop import BarrierAppDROP, BranchAppDrop, ContainerDROP
from ..meta import dlg_float_param, dlg_string_param
from ..meta import dlg_bool_param, dlg_int_param
from ..meta import dlg_component, dlg_batch_input
Expand Down Expand Up @@ -373,3 +373,16 @@ def run(self):
for o in outs:
o.len = len(content)
o.write(content) # send content to all outputs

class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

def initialize(self, **kwargs):
self.result = self._getArg(kwargs, 'result', True)
BranchAppDrop.initialize(self, **kwargs)

def run(self):
pass

def condition(self):
return self.result
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/ddap_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DROPStates:
COMPLETED. Later, they transition through EXPIRED, eventually arriving to
DELETED.
"""
INITIALIZED, WRITING, COMPLETED, ERROR, EXPIRED, DELETED, CANCELLED = range(7)
INITIALIZED, WRITING, COMPLETED, ERROR, EXPIRED, DELETED, CANCELLED, SKIPPED = range(8)


class AppDROPStates:
Expand All @@ -54,7 +54,7 @@ class AppDROPStates:
are started. Depending on the execution result they eventually move to the
FINISHED or ERROR state.
"""
NOT_RUN, RUNNING, FINISHED, ERROR, CANCELLED = range(5)
NOT_RUN, RUNNING, FINISHED, ERROR, CANCELLED, SKIPPED = range(6)


class DROPPhases:
Expand Down
51 changes: 46 additions & 5 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ def setError(self):
Moves this DROP to the ERROR state.
'''

if self.status == DROPStates.CANCELLED:
if self.status in (DROPStates.CANCELLED, DROPStates.SKIPPED):
return

self._closeWriters()
Expand All @@ -945,7 +945,10 @@ def setCompleted(self):
status = self.status
if status == DROPStates.CANCELLED:
return
if status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
elif status == DROPStates.SKIPPED:
self._fire('dropCompleted', status=status)
return
elif status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
raise Exception("%r not in INITIALIZED or WRITING state (%s), cannot setComplete()" % (self, self.status))

self._closeWriters()
Expand All @@ -969,6 +972,11 @@ def cancel(self):
self._closeWriters()
self.status = DROPStates.CANCELLED

def skip(self):
'''Moves this drop to the SKIPPED state closing any writers we opened'''
self._closeWriters()
self.status = DROPStates.SKIPPED

@property
def node(self):
return self._node
Expand Down Expand Up @@ -1560,6 +1568,14 @@ def cancel(self):
super(AppDROP, self).cancel()
self.execStatus = AppDROPStates.CANCELLED

def skip(self):
'''Moves this application drop to its SKIPPED state'''
super().skip()
self.execStatus = AppDROPStates.SKIPPED
for o in self._outputs.values():
o.skip()
self._fire('producerFinished', status=self.status, execStatus=self.execStatus)


class InputFiredAppDROP(AppDROP):
"""
Expand Down Expand Up @@ -1594,6 +1610,7 @@ def initialize(self, **kwargs):
super(InputFiredAppDROP, self).initialize(**kwargs)
self._completedInputs = []
self._errorInputs = []
self._skippedInputs = []

# Error threshold must be within 0 and 100
if self.input_error_threshold < 0 or self.input_error_threshold > 100:
Expand Down Expand Up @@ -1634,13 +1651,18 @@ def dropCompleted(self, uid, drop_state):
self._errorInputs.append(uid)
elif drop_state == DROPStates.COMPLETED:
self._completedInputs.append(uid)
elif drop_state == DROPStates.SKIPPED:
self._skippedInputs.append(uid)
else:
raise Exception('Invalid DROP state in dropCompleted: %s' % drop_state)

error_len = len(self._errorInputs)
ok_len = len(self._completedInputs)
skipped_len = len(self._skippedInputs)

# We have enough inputs to proceed
if (skipped_len + error_len + ok_len) == n_eff_inputs:

if (error_len + ok_len) == n_eff_inputs:
# calculate the number of errors that have already occurred
percent_failed = math.floor((error_len/float(n_eff_inputs)) * 100)

Expand All @@ -1654,6 +1676,8 @@ def dropCompleted(self, uid, drop_state):
self.execStatus = AppDROPStates.ERROR
self.status = DROPStates.ERROR
self._notifyAppIsFinished()
elif skipped_len == n_eff_inputs:
self.skip()
else:
self.async_execute()

Expand All @@ -1668,7 +1692,7 @@ def async_execute(self):
t.start()

@track_current_drop
def execute(self):
def execute(self, _send_notifications=True):
"""
Manually trigger the execution of this application.
Expand Down Expand Up @@ -1703,7 +1727,8 @@ def execute(self):
drop_state = DROPStates.ERROR

self.status = drop_state
self._notifyAppIsFinished()
if _send_notifications:
self._notifyAppIsFinished()

def run(self):
"""
Expand All @@ -1726,6 +1751,22 @@ def initialize(self, **kwargs):
super(BarrierAppDROP, self).initialize(**kwargs)


class BranchAppDrop(BarrierAppDROP):
"""
A special kind of application with exactly two outputs. After normal
execution, the application decides whether a certain condition is met.
If the condition is met, the first output is considered as COMPLETED,
while the other is moved to SKIPPED state, and vice-versa.
"""

@track_current_drop
def execute(self, _send_notifications=True):
if len(self._outputs) != 2:
raise InvalidDropException(self, f'BranchAppDrops should have exactly 2 outputs, not {len(self._outputs)}')
BarrierAppDROP.execute(self, _send_notifications=False)
self.outputs[1 if self.condition() else 0].skip()
self._notifyAppIsFinished()

class PlasmaDROP(AbstractDROP):
'''
A DROP that points to data stored in a Plasma Store
Expand Down
8 changes: 5 additions & 3 deletions daliuge-engine/dlg/manager/web/session.html
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@

var status_update_handler = function(statuses)
{

// This is the order in which blocks are drawn in the progress bar,
// so we want "done" states first and "nothing happened yet" states
// towards the end
var states = ['completed', 'finished',
'running', 'writing',
'error', 'expired', 'deleted',
'cancelled',
'cancelled', 'skipped',
'not_run', 'initialized'];
var states_idx = d3.scale.ordinal().domain(states).rangePoints([0, states.length - 1]);

Expand All @@ -111,7 +113,7 @@

/* Get total and per-status counts, then normalize to 0-100% */
var total = statuses.length;
var status_counts = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
var status_counts = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
statuses.reduce(function(status_counts, s) {
var idx = states_idx(get_status_name(s));
status_counts[idx] = status_counts[idx] + 1;
Expand Down
126 changes: 124 additions & 2 deletions daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import random
import shutil
import sqlite3
import string
import tempfile

from dlg import droputils
Expand All @@ -35,6 +36,7 @@
DirectoryContainer, ContainerDROP, InputFiredAppDROP, RDBMSDrop
from dlg.droputils import DROPWaiterCtx
from dlg.exceptions import InvalidDropException
from dlg.apps.simple import NullBarrierApp, SimpleBranch


try:
Expand Down Expand Up @@ -780,5 +782,125 @@ def test_rdbms_drop(self):
finally:
os.unlink(dbfile)

if __name__ == '__main__':
unittest.main()
class BranchAppDropTests(unittest.TestCase):
"""Tests for the BranchAppDrop class"""

def _simple_branch_with_outputs(self, result, uids):
a = SimpleBranch(uids[0], uids[0], result=result)
b, c = (InMemoryDROP(x, x) for x in uids[1:])
a.addOutput(b)
a.addOutput(c)
return a, b, c

def _assert_drop_in_status(self, drop, status, execStatus):
self.assertEqual(drop.status, status, f'{drop} has status {drop.status} != {status}')
if isinstance(drop, AppDROP):
self.assertEqual(drop.execStatus, execStatus, f'{drop} has execStatus {drop.execStatus} != {execStatus}')

def _assert_drop_complete_or_skipped(self, drop, complete_expected):
if complete_expected:
self._assert_drop_in_status(drop, DROPStates.COMPLETED, AppDROPStates.FINISHED)
else:
self._assert_drop_in_status(drop, DROPStates.SKIPPED, AppDROPStates.SKIPPED)

def _test_single_branch_graph(self, result, levels):
"""
Test this graph, where each level appends one drop to each branch
,-- true --> B --> ...
A ---- false --> C --> ...
"""
a, b, c = self._simple_branch_with_outputs(result, 'abc')
last_true = b
last_false = c
all_drops = [a, b, c]

# all_uids is ['de', 'fg', 'hi', ....]
all_uids = [string.ascii_lowercase[i: i + 2]
for i in range(3, len(string.ascii_lowercase), 2)]

for level, uids in zip(range(levels), all_uids):
if level % 2:
x, y = (InMemoryDROP(uid, uid) for uid in uids)
last_true.addOutput(x)
last_false.addOutput(y)
else:
x, y = (NullBarrierApp(uid, uid) for uid in uids)
last_true.addConsumer(x)
last_false.addConsumer(y)
all_drops += [x, y]
last_true = x
last_false = y

with DROPWaiterCtx(self, [last_true, last_false], 2, [DROPStates.COMPLETED, DROPStates.SKIPPED]):
a.async_execute()

# Depending on "result", the "true" branch will be run or skipped
self._assert_drop_complete_or_skipped(last_true, result)
self._assert_drop_complete_or_skipped(last_false, not result)

def test_simple_branch(self):
"""Check that simple branch event transmission wroks"""
self._test_single_branch_graph(True, 0)
self._test_single_branch_graph(False, 0)

def test_simple_branch_one_level(self):
"""Like test_simple_branch_app, but events propagate downstream one level"""
self._test_single_branch_graph(True, 1)
self._test_single_branch_graph(False, 1)

def test_simple_branch_two_levels(self):
"""Like test_skipped_propagates, but events propagate more levels"""
self._test_single_branch_graph(True, 2)
self._test_single_branch_graph(False, 2)

def test_simple_branch_more_levels(self):
for levels in (3, 4, 5, 6, 7):
self._test_single_branch_graph(True, levels)
self._test_single_branch_graph(False, levels)

def _test_multi_branch_graph(self, *results):
"""
Test this graph, each level appends a new branch to the first output
of the previous branch.
,- false --> C ,- false --> F
A ----- true --> B --> D ----- true --> E --> ...
"""

a, b, c = self._simple_branch_with_outputs(results[0], 'abc')
all_drops = [a, b, c]
last_first_output = b

# all_uids is ['de', 'fg', 'hi', ....]
all_uids = [string.ascii_lowercase[i: i + 3]
for i in range(4, len(string.ascii_lowercase), 3)]

for uids, result in zip(all_uids, results[1:]):
x, y, z = self._simple_branch_with_outputs(result, uids)
all_drops += [x, y, z]
last_first_output.addConsumer(x)
last_first_output = y

with DROPWaiterCtx(self, all_drops, 2, [DROPStates.COMPLETED, DROPStates.SKIPPED]):
a.async_execute()

# TODO: Checking each individual drop depending on "results" would be a
# tricky business, so we are skipping that check for now.

def test_multi_branch_one_level(self):
"""Check that simple branch event transmission wroks"""
self._test_multi_branch_graph(True)
self._test_multi_branch_graph(False)

def test_multi_branch_two_levels(self):
"""Like test_simple_branch_app, but events propagate downstream one level"""
self._test_multi_branch_graph(True, True)
self._test_multi_branch_graph(True, False)
self._test_multi_branch_graph(False, False)

def test_multi_branch_more_levels(self):
"""Like test_skipped_propagates, but events propagate more levels"""
for levels in (3, 4, 5, 6, 7):
self._test_multi_branch_graph(True, levels)
self._test_multi_branch_graph(False, levels)
6 changes: 4 additions & 2 deletions daliuge-translator/build_translator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ case "$1" in
echo "Build finished!"
exit 1 ;;
"dev")
C_TAG="master"
[[ ! -z "$2" ]] && C_TAG=$2
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-translator development version using daliuge-common:master"
echo "Building daliuge-translator development version using daliuge-common:${C_TAG}"
# The complete casa and arrow installation is only required for the Plasma streaming
# and should not go much further.
docker build --build-arg VCS_TAG=master --no-cache -t icrar/daliuge-translator:${VCS_TAG} -f docker/Dockerfile .
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-translator:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1;;
"casa")
Expand Down
Binary file added daliuge-translator/dlg/dropmake/.DS_Store
Binary file not shown.
Loading

0 comments on commit 16cba1c

Please sign in to comment.