Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIU-165 Docker Service Drops #61

Closed
wants to merge 17 commits into from
Closed
44 changes: 32 additions & 12 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@ class Categories:
DOCKER = 'Docker'
DYNLIB_PROC_APP = 'DynlibProcApp'

SERVICE = 'Service'
DOCKER_SERVICE = 'DockerService'

COMMENT = 'Comment'
DESCRIPTION = 'Description'

STORAGE_TYPES = {
Categories.MEMORY,
Categories.FILE,
Categories.NGAS,
Categories.NULL,
Categories.JSON,
Categories.PLASMA,
Categories.PLASMAFLIGHT
}
Categories.MEMORY,
Categories.FILE,
Categories.NGAS,
Categories.NULL,
Categories.JSON,
Categories.PLASMA,
Categories.PLASMAFLIGHT
}
APP_DROP_TYPES = [
Categories.COMPONENT,
Categories.PYTHON_APP,
Expand All @@ -75,8 +78,19 @@ class Categories:
Categories.DYNLIB_APP,
Categories.DOCKER,
Categories.DYNLIB_PROC_APP,
Categories.SERVICE,
Categories.DOCKER_SERVICE
]


# String constants naming the physical-graph drop types; they are compared
# against a drop spec's "type" entry.
class DropType:
PLAIN = 'plain'
SOCKET = 'socket'
APP = 'app' # Application drop that terminates once executed
SERVICE_APP = 'serviceapp' # App drop that runs continuously
CONTAINER = 'container' # Drop that contains other drops


def b2s(b, enc='utf8'):
"Converts bytes into a string"
# `b` must provide .decode(); `enc` is the codec name handed to it
# (defaults to 'utf8').
return b.decode(enc)
Expand Down Expand Up @@ -139,12 +153,12 @@ def get_roots(pg_spec):
oid = dropspec['oid']
all_oids.add(oid)

if dropspec["type"] in ('app', 'socket'):
if dropspec["type"] in (DropType.APP, DropType.SOCKET):
if dropspec.get('inputs', None) or dropspec.get('streamingInputs', None):
nonroots.add(oid)
if dropspec.get('outputs', None):
nonroots |= set(dropspec['outputs'])
elif dropspec["type"] == 'plain':
elif dropspec["type"] == DropType.PLAIN:
if dropspec.get('producers', None):
nonroots.add(oid)
if dropspec.get('consumers', None):
Expand All @@ -170,14 +184,20 @@ def get_leaves(pg_spec):
oid = dropspec['oid']
all_oids.add(oid)

if dropspec["type"] == 'app':
if dropspec["type"] == DropType.APP:
if dropspec.get('outputs', None):
nonleaves.add(oid)
if dropspec.get('streamingInputs', None):
nonleaves |= set(dropspec['streamingInputs'])
if dropspec.get('inputs', None):
nonleaves |= set(dropspec['inputs'])
elif dropspec["type"] == 'plain':
if dropspec["type"] == DropType.SERVICE_APP:
nonleaves.add(oid) # services are never leaves
if dropspec.get('streamingInputs', None):
nonleaves |= set(dropspec['streamingInputs'])
if dropspec.get('inputs', None):
nonleaves |= set(dropspec['inputs'])
elif dropspec["type"] == DropType.PLAIN:
if dropspec.get('producers', None):
nonleaves |= set(dropspec['producers'])
if dropspec.get('consumers', None) or dropspec.get('streamingConsumers', None):
Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/build_engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ case "$1" in
echo "Build finished!"
exit 1 ;;
"dev")
C_TAG="master"
[[ ! -z $2 ]] && C_TAG=$2
#C_TAG="master"
#[[ ! -z $2 ]] && C_TAG=$2
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-engine development version using daliuge-common:${C_TAG}"
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile .
echo "Building daliuge-engine development version using daliuge-common:${VCS_TAG}"
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1;;
"casa")
Expand Down
59 changes: 40 additions & 19 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import os
import threading
import time
from overrides import overrides

from configobj import ConfigObj
import docker
Expand Down Expand Up @@ -369,28 +370,25 @@ def run(self):

c = DockerApp._get_client()

# Remove the container unless it's specified that we should keep it
# (used below)
def rm(container):
if self._removeContainer:
container.remove()

# Create container
container = c.containers.create(
self.container = c.containers.create(
self._image,
cmd,
volumes=binds,
ports=portMappings,
user=user,
environment=env,
working_dir=self.workdir
working_dir=self.workdir,
init=True,
auto_remove=self._removeContainer
)
self._containerId = cId = container.id
self._containerId = cId = self.container.id
logger.info("Created container %s for %r", cId, self)
logger.debug(f"autoremove container {self._removeContainer}")

# Start it
start = time.time()
container.start()
self.container.start()
logger.info("Started container %s", cId)

# Figure out the container's IP and save it
Expand All @@ -400,10 +398,14 @@ def rm(container):
logger.debug("Docker inspection: %r", inspection)
self.containerIp = inspection['NetworkSettings']['IPAddress']

# Capture output
stdout = self.container.logs(stream=False, stdout=True, stderr=False)
stderr = self.container.logs(stream=False, stdout=False, stderr=True)

# Wait until it finishes
# In docker-py < 3 the .wait() method returns the exit code directly
# In docker-py >= 3 the .wait() method returns a dictionary with the API response
x = container.wait()
x = self.container.wait()
if isinstance(x, dict) and 'StatusCode' in x:
self._exitCode = x['StatusCode']
else:
Expand All @@ -413,20 +415,39 @@ def rm(container):
logger.info("Container %s finished in %.2f [s] with exit code %d", cId, (end-start), self._exitCode)

if self._exitCode == 0 and logger.isEnabledFor(logging.DEBUG):
stdout = container.logs(stream=False, stdout=True, stderr=False)
stderr = container.logs(stream=False, stdout=False, stderr=True)
logger.debug("Container %s finished successfully, output follows.\n==STDOUT==\n%s==STDERR==\n%s", cId, stdout, stderr)
elif self._exitCode != 0:
stdout = container.logs(stream=False, stdout=True, stderr=False)
stderr = container.logs(stream=False, stdout=False, stderr=True)
msg = "Container %s didn't finish successfully (exit code %d)" % (cId, self._exitCode)
logger.error(msg + ", output follows.\n==STDOUT==\n%s==STDERR==\n%s", stdout, stderr)
rm(container)
raise Exception(msg)

rm(container)
if self._exitCode == 137 or self._exitCode == 139 or self._exitCode == 143:
# termination via SIGKILL, SIGSEGV, and SIGTERM is expected for some services
logger.warning(msg + ", output follows.\n==STDOUT==\n%s==STDERR==\n%s", stdout, stderr)
else:
logger.error(msg + ", output follows.\n==STDOUT==\n%s==STDERR==\n%s", stdout, stderr)
raise Exception(msg)

c.api.close()

@overrides
def setCompleted(self):
    """
    Delegates to the parent ``setCompleted`` and then stops the docker
    container held in ``self.container``, if one exists.
    """
    # NOTE(review): super(BarrierAppDROP, self) starts the method lookup
    # *after* BarrierAppDROP in the MRO -- confirm that bypassing
    # BarrierAppDROP's own implementation is intended.
    super(BarrierAppDROP, self).setCompleted()
    # In the visible code `self.container` is only assigned inside run();
    # when this is called before a container was created (or it is None)
    # there is nothing to stop, so avoid raising AttributeError here.
    container = getattr(self, 'container', None)
    if container is not None:
        container.stop()

@overrides
def setError(self):
    """
    Delegates to the parent ``setError`` and then stops the docker
    container held in ``self.container``, if one exists.
    """
    # NOTE(review): super(BarrierAppDROP, self) starts the method lookup
    # *after* BarrierAppDROP in the MRO -- confirm that bypassing
    # BarrierAppDROP's own implementation is intended.
    super(BarrierAppDROP, self).setError()
    # In the visible code `self.container` is only assigned inside run();
    # when this is called before a container was created (or it is None)
    # there is nothing to stop, so avoid raising AttributeError here.
    container = getattr(self, 'container', None)
    if container is not None:
        container.stop()

@overrides
def cancel(self):
    """
    Delegates to the parent ``cancel`` and then stops the docker
    container held in ``self.container``, if one exists.
    """
    # NOTE(review): super(BarrierAppDROP, self) starts the method lookup
    # *after* BarrierAppDROP in the MRO -- confirm that bypassing
    # BarrierAppDROP's own implementation is intended.
    super(BarrierAppDROP, self).cancel()
    # A drop can be cancelled before run() ever created a container; in the
    # visible code `self.container` is only assigned inside run(), so guard
    # against the attribute being absent (or None) instead of raising.
    container = getattr(self, 'container', None)
    if container is not None:
        container.stop()

@overrides
def skip(self):
    """
    Delegates to the parent ``skip`` and then stops the docker
    container held in ``self.container``, if one exists.
    """
    # NOTE(review): super(BarrierAppDROP, self) starts the method lookup
    # *after* BarrierAppDROP in the MRO -- confirm that bypassing
    # BarrierAppDROP's own implementation is intended.
    super(BarrierAppDROP, self).skip()
    # A skipped drop has presumably never executed run(); in the visible
    # code `self.container` is only assigned inside run(), so guard against
    # the attribute being absent (or None) instead of raising.
    container = getattr(self, 'container', None)
    if container is not None:
        container.stop()

@classmethod
def _get_client(cls):
return docker.from_env(version='auto', **cls._kwargs_from_env())
Expand Down
8 changes: 8 additions & 0 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ def __init__(self, oid, uid, **kwargs):
self._oid = str(oid)
self._uid = str(uid)

# The physical graph drop type. This is determined
# by the drop category when generating the drop spec
self._type = self._getArg(kwargs, 'type', None)

# The Session owning this drop, if any
# In most real-world situations this attribute will be set, but in
# general it cannot be assumed it will (e.g., unit tests create drops
Expand Down Expand Up @@ -603,6 +607,10 @@ def uid(self):
"""
return self._uid

@property
def type(self):
# Read-only view of `_type`, the physical graph drop type that __init__
# takes from the 'type' keyword argument (None when it was not given).
return self._type

@property
def executionMode(self):
"""
Expand Down
18 changes: 10 additions & 8 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import traceback

from .ddap_protocol import DROPStates
from .drop import AppDROP
from .drop import AppDROP, AbstractDROP
from .apps.dockerapp import DockerApp
from .io import IOForURL, OpenMode
from . import common
from .common import DropType


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -150,7 +152,7 @@ def getUpstreamObjects(drop):
def getDownstreamObjects(drop):
"""
Returns a list of all direct "downstream" DROPs for the given
DROP. An DROP A is "downstream" with respect to DROP B if
DROP. A DROP A is "downstream" with respect to DROP B if
any of the following conditions are true:
* A is an output of B (therefore B is an AppDROP)
* A is a normal or streaming consumer of B (and A is therefore an AppDROP)
Expand All @@ -166,15 +168,15 @@ def getDownstreamObjects(drop):
downObjs += drop.streamingConsumers
return downObjs

def getLeafNodes(nodes):
def getLeafNodes(drops):
"""
Returns a list of all the "leaf nodes" of the graph pointed by `nodes`.
`nodes` is either a single DROP, or a list of DROPs.
Returns a list of all the "leaf nodes" of the graph pointed by `drops`.
`drops` is either a single DROP, or a list of DROPs.
"""
nodes = listify(nodes)
return [drop for drop,_ in breadFirstTraverse(nodes) if not getDownstreamObjects(drop)]
drops = listify(drops)
return [drop for drop,_ in breadFirstTraverse(drops) if not getDownstreamObjects(drop) and drop.type != DropType.SERVICE_APP]

def depthFirstTraverse(node, visited = []):
def depthFirstTraverse(node: AbstractDROP, visited = []):
"""
Depth-first iterator for a DROP graph.

Expand Down
38 changes: 31 additions & 7 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
LINKTYPE_1TON_APPEND_METHOD, NullDROP, PlasmaDROP, PlasmaFlightDROP
from .exceptions import InvalidGraphException
from .json_drop import JsonDROP
from .common import Categories
from .common import Categories, DropType


STORAGE_TYPES = {
Expand Down Expand Up @@ -281,8 +281,8 @@ def _createContainer(dropSpec, dryRun=False, session=None):
kwargs = _getKwargs(dropSpec)

# if no 'container' is specified, we default to ContainerDROP
if 'container' in dropSpec:
containerTypeName = dropSpec['container']
if DropType.CONTAINER in dropSpec:
containerTypeName = dropSpec[DropType.CONTAINER]
parts = containerTypeName.split('.')

# Support old "dfms..." package names (pre-Oct2017)
Expand Down Expand Up @@ -312,6 +312,28 @@ def _createApp(dropSpec, dryRun=False, session=None):
kwargs = _getKwargs(dropSpec)
del kwargs['app']

appName = dropSpec[DropType.APP]
parts = appName.split('.')

# Support old "dfms..." package names (pre-Oct2017)
if parts[0] == 'dfms':
parts[0] = 'dlg'

try:
module = importlib.import_module('.'.join(parts[:-1]))
appType = getattr(module, parts[-1])
except (ImportError, AttributeError):
raise InvalidGraphException("drop %s specifies non-existent application: %s" % (oid, appName,))

if dryRun:
return
return appType(oid, uid, dlg_session=session, **kwargs)

def _createServiceApp(dropSpec, dryRun=False, session=None):
oid, uid = _getIds(dropSpec)
kwargs = _getKwargs(dropSpec)
del kwargs['app']

appName = dropSpec['app']
parts = appName.split('.')

Expand Down Expand Up @@ -344,9 +366,11 @@ def _getKwargs(dropSpec):
del kwargs['uid']
return kwargs


__CREATION_FUNCTIONS = {
'plain': _createPlain,
'container': _createContainer,
'app': _createApp,
'socket': _createSocket
DropType.PLAIN: _createPlain,
DropType.CONTAINER: _createContainer,
DropType.APP: _createApp,
DropType.SERVICE_APP: _createServiceApp,
DropType.SOCKET: _createSocket
}
4 changes: 4 additions & 0 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ def add_node_subscriptions(self, relationships):
def finish(self):
# Marks the session as FINISHED, then walks the graph breadth-first from
# the session roots and calls setCompleted() on every drop that has not
# already reached ERROR, COMPLETED or CANCELLED.
self.status = SessionStates.FINISHED
logger.info("Session %s finished", self._sessionId)
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
# Slice assignment filters the yielded list in place, keeping only
# AbstractDROP instances -- presumably so the traversal does not descend
# into non-drop entries (e.g. remote proxies); TODO confirm against
# breadFirstTraverse.
downStreamDrops[:] = [dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)]
# Drops still in any other state are forced to completion.
if drop.status not in (DROPStates.ERROR, DROPStates.COMPLETED, DROPStates.CANCELLED):
drop.setCompleted()

def getGraphStatus(self):
if self.status not in (SessionStates.RUNNING, SessionStates.FINISHED, SessionStates.CANCELLED):
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def run(self):
"lockfile",
# 0.10.6 builds correctly with old (<=3.10) Linux kernels
"netifaces>=0.10.6",
"overrides",
"paramiko",
"psutil",
"python-daemon",
Expand Down
8 changes: 4 additions & 4 deletions daliuge-translator/build_translator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ case "$1" in
echo "Build finished!"
exit 1 ;;
"dev")
C_TAG="master"
[[ ! -z "$2" ]] && C_TAG=$2
#C_TAG="master"
#[[ ! -z "$2" ]] && C_TAG=$2
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-translator development version using daliuge-common:${C_TAG}"
echo "Building daliuge-translator development version using daliuge-common:${VCS_TAG}"
# The complete casa and arrow installation is only required for the Plasma streaming
# and should not go much further.
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-translator:${VCS_TAG} -f docker/Dockerfile .
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-translator:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1;;
"casa")
Expand Down
7 changes: 6 additions & 1 deletion daliuge-translator/dlg/dropmake/dm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,13 @@ def convert_construct(lgo):
for afd in node[app_fd_name]:
app_node[afd["name"]] = afd["value"]
break
if Categories.GATHER == node["category"]:

if node["category"] == Categories.GATHER:
app_node["group_start"] = 1

if node['category'] == Categories.SERVICE:
app_node['isService'] = True

new_nodes.append(app_node)

# step 2
Expand Down
Loading