Skip to content

Commit

Permalink
Merge df89922 into 3cf8254
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Aug 26, 2021
2 parents 3cf8254 + df89922 commit 859356b
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 110 deletions.
42 changes: 30 additions & 12 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,20 @@ class Categories:
DOCKER = 'Docker'
DYNLIB_PROC_APP = 'DynlibProcApp'

SERVICE = 'Service'

COMMENT = 'Comment'
DESCRIPTION = 'Description'

STORAGE_TYPES = {
Categories.MEMORY,
Categories.FILE,
Categories.NGAS,
Categories.NULL,
Categories.JSON,
Categories.PLASMA,
Categories.PLASMAFLIGHT
}
Categories.MEMORY,
Categories.FILE,
Categories.NGAS,
Categories.NULL,
Categories.JSON,
Categories.PLASMA,
Categories.PLASMAFLIGHT
}
APP_DROP_TYPES = [
Categories.COMPONENT,
Categories.PYTHON_APP,
Expand All @@ -75,8 +77,18 @@ class Categories:
Categories.DYNLIB_APP,
Categories.DOCKER,
Categories.DYNLIB_PROC_APP,
Categories.SERVICE,
]


class DropType:
PLAIN = 'plain'
SOCKET = 'socket'
APP = 'app' # Application drop that terminates onces executed
SERVICE_APP = 'serviceapp' # App drop that runs continously
CONTAINER = 'container' # Drop that contains other drops


def b2s(b, enc='utf8'):
"Converts bytes into a string"
return b.decode(enc)
Expand Down Expand Up @@ -139,12 +151,12 @@ def get_roots(pg_spec):
oid = dropspec['oid']
all_oids.add(oid)

if dropspec["type"] in ('app', 'socket'):
if dropspec["type"] in (DropType.APP, DropType.SOCKET):
if dropspec.get('inputs', None) or dropspec.get('streamingInputs', None):
nonroots.add(oid)
if dropspec.get('outputs', None):
nonroots |= set(dropspec['outputs'])
elif dropspec["type"] == 'plain':
elif dropspec["type"] == DropType.PLAIN:
if dropspec.get('producers', None):
nonroots.add(oid)
if dropspec.get('consumers', None):
Expand All @@ -170,14 +182,20 @@ def get_leaves(pg_spec):
oid = dropspec['oid']
all_oids.add(oid)

if dropspec["type"] == 'app':
if dropspec["type"] == DropType.APP:
if dropspec.get('outputs', None):
nonleaves.add(oid)
if dropspec.get('streamingInputs', None):
nonleaves |= set(dropspec['streamingInputs'])
if dropspec.get('inputs', None):
nonleaves |= set(dropspec['inputs'])
elif dropspec["type"] == 'plain':
if dropspec["type"] == DropType.SERVICE_APP:
nonleaves.add(oid) # services are never leaves
if dropspec.get('streamingInputs', None):
nonleaves |= set(dropspec['streamingInputs'])
if dropspec.get('inputs', None):
nonleaves |= set(dropspec['inputs'])
elif dropspec["type"] == DropType.PLAIN:
if dropspec.get('producers', None):
nonleaves |= set(dropspec['producers'])
if dropspec.get('consumers', None) or dropspec.get('streamingConsumers', None):
Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/build_engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ case "$1" in
echo "Build finished!"
exit 1 ;;
"dev")
C_TAG="master"
[[ ! -z $2 ]] && C_TAG=$2
#C_TAG="master"
#[[ ! -z $2 ]] && C_TAG=$2
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-engine development version using daliuge-common:${C_TAG}"
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile .
echo "Building daliuge-engine development version using daliuge-common:${VCS_TAG}"
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1;;
"casa")
Expand Down
59 changes: 40 additions & 19 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import os
import threading
import time
from overrides import overrides

from configobj import ConfigObj
import docker
Expand Down Expand Up @@ -369,28 +370,25 @@ def run(self):

c = DockerApp._get_client()

# Remove the container unless it's specified that we should keep it
# (used below)
def rm(container):
if self._removeContainer:
container.remove()

# Create container
container = c.containers.create(
self.container = c.containers.create(
self._image,
cmd,
volumes=binds,
ports=portMappings,
user=user,
environment=env,
working_dir=self.workdir
working_dir=self.workdir,
init=True,
auto_remove=self._removeContainer
)
self._containerId = cId = container.id
self._containerId = cId = self.container.id
logger.info("Created container %s for %r", cId, self)
logger.debug(f"autoremove container {self._removeContainer}")

# Start it
start = time.time()
container.start()
self.container.start()
logger.info("Started container %s", cId)

# Figure out the container's IP and save it
Expand All @@ -400,10 +398,14 @@ def rm(container):
logger.debug("Docker inspection: %r", inspection)
self.containerIp = inspection['NetworkSettings']['IPAddress']

# Capture output
stdout = self.container.logs(stream=False, stdout=True, stderr=False)
stderr = self.container.logs(stream=False, stdout=False, stderr=True)

# Wait until it finishes
# In docker-py < 3 the .wait() method returns the exit code directly
# In docker-py >= 3 the .wait() method returns a dictionary with the API response
x = container.wait()
x = self.container.wait()
if isinstance(x, dict) and 'StatusCode' in x:
self._exitCode = x['StatusCode']
else:
Expand All @@ -413,20 +415,39 @@ def rm(container):
logger.info("Container %s finished in %.2f [s] with exit code %d", cId, (end-start), self._exitCode)

if self._exitCode == 0 and logger.isEnabledFor(logging.DEBUG):
stdout = container.logs(stream=False, stdout=True, stderr=False)
stderr = container.logs(stream=False, stdout=False, stderr=True)
logger.debug("Container %s finished successfully, output follows.\n==STDOUT==\n%s==STDERR==\n%s", cId, stdout, stderr)
elif self._exitCode != 0:
stdout = container.logs(stream=False, stdout=True, stderr=False)
stderr = container.logs(stream=False, stdout=False, stderr=True)
msg = "Container %s didn't finish successfully (exit code %d)" % (cId, self._exitCode)
logger.error(msg + ", output follows.\n==STDOUT==\n%s==STDERR==\n%s", stdout, stderr)
rm(container)
raise Exception(msg)

rm(container)
if self._exitCode == 137 or self._exitCode == 139 or self._exitCode == 143:
# termination via SIGKILL, SIGSEGV, and SIGTERM is expected for some services
logger.warning(msg + ", output follows.\n==STDOUT==\n%s==STDERR==\n%s", stdout, stderr)
else:
logger.error(msg + ", output follows.\n==STDOUT==\n%s==STDERR==\n%s", stdout, stderr)
raise Exception(msg)

c.api.close()

@overrides
def setCompleted(self):
super(BarrierAppDROP, self).setCompleted()
self.container.stop()

@overrides
def setError(self):
super(BarrierAppDROP, self).setError()
self.container.stop()

@overrides
def cancel(self):
super(BarrierAppDROP, self).cancel()
self.container.stop()

@overrides
def skip(self):
super(BarrierAppDROP, self).skip()
self.container.stop()

@classmethod
def _get_client(cls):
return docker.from_env(version='auto', **cls._kwargs_from_env())
Expand Down
8 changes: 8 additions & 0 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ def __init__(self, oid, uid, **kwargs):
self._oid = str(oid)
self._uid = str(uid)

# The physical graph drop type. This is determined
# by the drop category when generating the drop spec
self._type = self._getArg(kwargs, 'type', None)

# The Session owning this drop, if any
# In most real-world situations this attribute will be set, but in
# general it cannot be assumed it will (e.g., unit tests create drops
Expand Down Expand Up @@ -603,6 +607,10 @@ def uid(self):
"""
return self._uid

@property
def type(self):
return self._type

@property
def executionMode(self):
"""
Expand Down
18 changes: 10 additions & 8 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import traceback

from .ddap_protocol import DROPStates
from .drop import AppDROP
from .drop import AppDROP, AbstractDROP
from .apps.dockerapp import DockerApp
from .io import IOForURL, OpenMode
from . import common
from .common import DropType


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -150,7 +152,7 @@ def getUpstreamObjects(drop):
def getDownstreamObjects(drop):
"""
Returns a list of all direct "downstream" DROPs for the given
DROP. An DROP A is "downstream" with respect to DROP B if
DROP. A DROP A is "downstream" with respect to DROP B if
any of the following conditions are true:
* A is an output of B (therefore B is an AppDROP)
* A is a normal or streaming consumer of B (and A is therefore an AppDROP)
Expand All @@ -166,15 +168,15 @@ def getDownstreamObjects(drop):
downObjs += drop.streamingConsumers
return downObjs

def getLeafNodes(nodes):
def getLeafNodes(drops):
"""
Returns a list of all the "leaf nodes" of the graph pointed by `nodes`.
`nodes` is either a single DROP, or a list of DROPs.
Returns a list of all the "leaf nodes" of the graph pointed by `drops`.
`drops` is either a single DROP, or a list of DROPs.
"""
nodes = listify(nodes)
return [drop for drop,_ in breadFirstTraverse(nodes) if not getDownstreamObjects(drop)]
drops = listify(drops)
return [drop for drop,_ in breadFirstTraverse(drops) if not getDownstreamObjects(drop) and drop.type != DropType.SERVICE_APP]

def depthFirstTraverse(node, visited = []):
def depthFirstTraverse(node: AbstractDROP, visited = []):
"""
Depth-first iterator for a DROP graph.
Expand Down
18 changes: 10 additions & 8 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
LINKTYPE_1TON_APPEND_METHOD, NullDROP, PlasmaDROP, PlasmaFlightDROP
from .exceptions import InvalidGraphException
from .json_drop import JsonDROP
from .common import Categories
from .common import Categories, DropType


STORAGE_TYPES = {
Expand Down Expand Up @@ -281,8 +281,8 @@ def _createContainer(dropSpec, dryRun=False, session=None):
kwargs = _getKwargs(dropSpec)

# if no 'container' is specified, we default to ContainerDROP
if 'container' in dropSpec:
containerTypeName = dropSpec['container']
if DropType.CONTAINER in dropSpec:
containerTypeName = dropSpec[DropType.CONTAINER]
parts = containerTypeName.split('.')

# Support old "dfms..." package names (pre-Oct2017)
Expand Down Expand Up @@ -312,7 +312,7 @@ def _createApp(dropSpec, dryRun=False, session=None):
kwargs = _getKwargs(dropSpec)
del kwargs['app']

appName = dropSpec['app']
appName = dropSpec[DropType.APP]
parts = appName.split('.')

# Support old "dfms..." package names (pre-Oct2017)
Expand Down Expand Up @@ -344,9 +344,11 @@ def _getKwargs(dropSpec):
del kwargs['uid']
return kwargs


__CREATION_FUNCTIONS = {
'plain': _createPlain,
'container': _createContainer,
'app': _createApp,
'socket': _createSocket
DropType.PLAIN: _createPlain,
DropType.CONTAINER: _createContainer,
DropType.APP: _createApp,
DropType.SERVICE_APP: _createApp,
DropType.SOCKET: _createSocket
}
4 changes: 4 additions & 0 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ def add_node_subscriptions(self, relationships):
def finish(self):
self.status = SessionStates.FINISHED
logger.info("Session %s finished", self._sessionId)
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
downStreamDrops[:] = [dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)]
if drop.status not in (DROPStates.ERROR, DROPStates.COMPLETED, DROPStates.CANCELLED):
drop.setCompleted()

def getGraphStatus(self):
if self.status not in (SessionStates.RUNNING, SessionStates.FINISHED, SessionStates.CANCELLED):
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def run(self):
"lockfile",
# 0.10.6 builds correctly with old (<=3.10) Linux kernels
"netifaces>=0.10.6",
"overrides",
"paramiko",
"psutil",
"python-daemon",
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ def checkDropStates(aStatus, dStatus, eStatus, lastByte):
if lastByte is not None:
self.assertEqual(lastByte, b._lastByte)

checkDropStates(DROPStates.INITIALIZED , DROPStates.INITIALIZED, DROPStates.INITIALIZED, None)
checkDropStates(DROPStates.INITIALIZED, DROPStates.INITIALIZED, DROPStates.INITIALIZED, None)
a.write(b'abcde')
checkDropStates(DROPStates.WRITING, DROPStates.WRITING, DROPStates.INITIALIZED, b'e')
a.write(b'fghij')
Expand Down
8 changes: 4 additions & 4 deletions daliuge-translator/build_translator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ case "$1" in
echo "Build finished!"
exit 1 ;;
"dev")
C_TAG="master"
[[ ! -z "$2" ]] && C_TAG=$2
#C_TAG="master"
#[[ ! -z "$2" ]] && C_TAG=$2
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-translator development version using daliuge-common:${C_TAG}"
echo "Building daliuge-translator development version using daliuge-common:${VCS_TAG}"
# The complete casa and arrow installation is only required for the Plasma streaming
# and should not go much further.
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-translator:${VCS_TAG} -f docker/Dockerfile .
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-translator:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1;;
"casa")
Expand Down
Loading

0 comments on commit 859356b

Please sign in to comment.