Skip to content

Commit

Permalink
Merge a06e313 into ea54ebb
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Apr 12, 2021
2 parents ea54ebb + a06e313 commit d40322e
Show file tree
Hide file tree
Showing 18 changed files with 134 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.ray
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ EXPOSE 8084
EXPOSE 9000


CMD ["dlg", "daemon", "-vv", "--no-nm"]
CMD ["dlg", "daemon", "-vv"]
25 changes: 17 additions & 8 deletions daliuge-engine/dlg/apps/archiving.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ class NgasArchivingApp(ExternalStoreApp):
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

ngasSrv = dlg_string_param('NGAS Server', 'localhost')
ngasPort = dlg_int_param('NGAS Port', 7777)
ngasConnectTimeout = dlg_float_param('Connect Timeout', 2.)
ngasTimeout = dlg_float_param('Timeout', 2.)
ngasSrv = dlg_string_param('ngasSrv', 'localhost')
ngasPort = dlg_int_param('ngasPort', 7777)
ngasMime = dlg_string_param('ngasMime', 'application/octet-stream')
ngasTimeout = dlg_int_param('ngasTimeout', 2)
ngasConnectTimeout = dlg_int_param('ngasConnectTimeout', 2)

def initialize(self, **kwargs):
super(NgasArchivingApp, self).initialize(**kwargs)
Expand All @@ -92,12 +93,20 @@ def store(self, inDrop):
if isinstance(inDrop, ContainerDROP):
raise Exception("ContainerDROPs are not supported as inputs for this application")

size = -1 if inDrop.size is None else inDrop.size
logger.debug("Content-length %s", size)
if inDrop.size is None or inDrop.size < 0:
logger.error(
"NGAS requires content-length to be know, but the given input does not provide a size.")
size = None
else:
size = inDrop.size
logger.debug("Content-length %s", size)
try:
ngasIO = NgasIO(self.ngasSrv, inDrop.uid, self.ngasPort, self.ngasConnectTimeout, self.ngasTimeout, size)
ngasIO = NgasIO(self.ngasSrv, inDrop.uid, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, size, mimeType=self.ngasMime)
except ImportError:
ngasIO = NgasLiteIO(self.ngasSrv, inDrop.uid, self.ngasPort, self.ngasConnectTimeout, self.ngasTimeout, size)
logger.warning("NgasIO library not available, falling back to NgasLiteIO.")
ngasIO = NgasLiteIO(self.ngasSrv, inDrop.uid, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, size, mimeType=self.ngasMime)

ngasIO.open(OpenMode.OPEN_WRITE)

Expand Down
30 changes: 19 additions & 11 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,19 +300,19 @@ def getmembers(object, predicate=None):
for attr_name, obj in getmembers(self, lambda a: not(inspect.isfunction(a) or isinstance(a, property))):
if isinstance(obj, dlg_float_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None:
if value is not None and value is not '':
value = float(value)
elif isinstance(obj, dlg_bool_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None:
if value is not None and value is not '':
value = bool(value)
elif isinstance(obj, dlg_int_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None:
if value is not None and value is not '':
value = int(value)
elif isinstance(obj, dlg_string_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None:
if value is not None and value is not '':
value = str(value)
elif isinstance(obj, dlg_list_param):
value = kwargs.get(attr_name, obj.default_value)
Expand Down Expand Up @@ -430,7 +430,7 @@ def read(self, descriptor, count=4096, **kwargs):

def _checkStateAndDescriptor(self, descriptor):
if self.status != DROPStates.COMPLETED:
raise Exception("%r is in state %s (!=COMPLETED), cannot be read" % (self.status,))
raise Exception("%r is in state %s (!=COMPLETED), cannot be read" % (self, self.status,))
if descriptor is None:
raise ValueError("Illegal empty descriptor given")
if descriptor not in self._rios:
Expand Down Expand Up @@ -1149,25 +1149,33 @@ class NgasDROP(AbstractDROP):
'''
ngasSrv = dlg_string_param('ngasSrv', 'localhost')
ngasPort = dlg_int_param('ngasPort', 7777)
ngasFileId = dlg_string_param('ngasFileId', None)
ngasTimeout = dlg_int_param('ngasTimeout', 2)
ngasConnectTimeout = dlg_int_param('ngasConnectTimeout', 2)
ngasMime = dlg_string_param('ngasMime', 'application/octet-stream')
len = dlg_int_param('len', -1)

def initialize(self, **kwargs):
pass
if self.len == -1:
self.len = self._size
if self.ngasFileId:
self.fileId = self.ngasFileId
else:
self.fileId = self.uid

def getIO(self):
try:
ngasIO = NgasIO(self.ngasSrv, self.uid, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len)
ngasIO = NgasIO(self.ngasSrv, self.fileId, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
except ImportError:
ngasIO = NgasLiteIO(self.ngasSrv, self.uid, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len)
logger.warning('NgasIO not available, using NgasLiteIO instead')
ngasIO = NgasLiteIO(self.ngasSrv, self.fileId, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
return ngasIO

@property
def dataURL(self):
return "ngas://%s:%d/%s" % (self.ngasSrv, self.ngasPort, self.uid)
return "ngas://%s:%d/%s" % (self.ngasSrv, self.ngasPort, self.fileId)


class InMemoryDROP(AbstractDROP):
Expand Down
6 changes: 5 additions & 1 deletion daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,16 @@ def copyDropContents(source, target, bufsize=4096):
'''
Manually copies data from one DROP into another, in bufsize steps
'''
logger.debug("Copying from %r to %r" % (source, target))
desc = source.open()
read = source.read
buf = read(desc, bufsize)
logger.debug("Read %d bytes from %r" % (len(buf), source))
while buf:
target.write(buf)
logger.debug("Wrote %d bytes to %r" % (len(buf), target))
buf = read(desc, bufsize)
logger.debug("Read %d bytes from %r" % (len(buf), source))
source.close(desc)

def getUpstreamObjects(drop):
Expand Down Expand Up @@ -388,4 +392,4 @@ def replace_dataurl_placeholders(cmd, inputs, outputs):

# Easing the transition from single- to multi-package
get_leaves = common.get_leaves
get_roots = common.get_roots
get_roots = common.get_roots
52 changes: 39 additions & 13 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from six import BytesIO
import six.moves.urllib.parse as urlparse # @UnresolvedImport
import six.moves.urllib.request as urlrequest

from . import ngaslite

Expand Down Expand Up @@ -243,7 +244,7 @@ class NgasIO(DataIO):
in a file on the local filesystem and then move it to the NGAS destination
'''

def __init__(self, hostname, fileId, port = 7777, ngasConnectTimeout=2, ngasTimeout=2, length=-1):
def __init__(self, hostname, fileId, port = 7777, ngasConnectTimeout=2, ngasTimeout=2, length=-1, mimeType='application/octet-stream'):

# Check that we actually have the NGAMS client libraries
try:
Expand All @@ -259,6 +260,7 @@ def __init__(self, hostname, fileId, port = 7777, ngasConnectTimeout=2, ngasTime
self._ngasTimeout = ngasTimeout
self._fileId = fileId
self._length = length
self._mimeType = mimeType

def _getClient(self):
from ngamsPClient import ngamsPClient # @UnresolvedImport
Expand All @@ -270,7 +272,7 @@ def _open(self, **kwargs):
# request with data. Thus the only way we can currently archive data
# into NGAS is by accumulating it all on our side and finally
# sending it over.
self._buf = ''
self._buf = b''
self._writtenDataSize = 0
return self._getClient()

Expand All @@ -279,7 +281,7 @@ def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
reply, msg, _, _ = client._httpPost(
client.getHost(), client.getPort(), 'QARCHIVE',
'application/octet-stream', dataRef=self._buf,
self._mimeType, dataRef=self._buf,
pars=[['filename',self._fileId]], dataSource='BUFFER',
dataSize=self._writtenDataSize)
self._buf = None
Expand Down Expand Up @@ -319,39 +321,62 @@ class NgasLiteIO(DataIO):
that this class will throw an error if its `exists` method is invoked.
'''

def __init__(self, hostname, fileId, port = 7777, ngasConnectTimeout=2, ngasTimeout=2, length=-1):
def __init__(self, hostname, fileId, port=7777, ngasConnectTimeout=2, ngasTimeout=2, length=-1, \
mimeType='application/octet-stream'):
super(NgasLiteIO, self).__init__()
self._ngasSrv = hostname
self._ngasPort = port
self._ngasConnectTimeout = ngasConnectTimeout
self._ngasTimeout = ngasTimeout
self._fileId = fileId
self._length = length
self._mimeType = mimeType

def _is_length_unknown(self):
return self._length is None or self._length < 0

def _getClient(self):
from ngamsPClient import ngamsPClient # @UnresolvedImport
return ngamsPClient.ngamsPClient(self._ngasSrv, self._ngasPort, self._ngasTimeout)
return ngaslite.open(
self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout)

def _open(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
return ngaslite.beingArchive(self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout, length=self._length)
return ngaslite.retrieve(self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout)
if self._is_length_unknown():
# NGAS does not support HTTP chunked writes and thus it requires the Content-length
# of the whole fileObject to be known and sent up-front as part of the header. Thus
# is size is not provided all data will be buffered IN MEMORY and only sent to NGAS
# when finishArchive is called.
self._buf = b''
self._writtenDataSize = 0
return self._getClient()
else:
return self._getClient()

def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
conn = self._desc
if self._is_length_unknown():
# If length wasn't known up-front we first send Content-Length and then the buffer here.
conn.putheader('Content-Length', len(self._buf))
conn.endheaders()
conn.send(self._buf)
self._buf = None
ngaslite.finishArchive(conn, self._fileId)
conn.close()
else:
response = self._desc
response.close()

def _read(self, count, **kwargs):
self._desc.read(count)
def _read(self, count=4096, **kwargs):
return self._desc.read(count)

def _write(self, data, **kwargs):
self._desc.send(data)
return len(data)
if self._is_length_unknown():
self._buf += data
return len(self._buf)
else:
self._desc.send(data)
return len(data)

def exists(self):
raise NotImplementedError("This method is not supported by this class")
Expand All @@ -378,10 +403,11 @@ def IOForURL(url):
networkLocation = url.netloc
if ':' in networkLocation:
hostname, port = networkLocation.split(':')
port = int(port)
else:
hostname = networkLocation
port = 7777
fileId = url.path
fileId = url.path[1:] # remove the trailing slash
try:
io = NgasIO(hostname, fileId, port)
except:
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/manager/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def addCommonOptions(parser, defaultPort):
parser.add_option( "--cwd", action="store_true",
dest="cwd", help="Short for '-w .'", default=False)
parser.add_option("-w", "--work-dir",
help="Working directory, defaults to '/' in daemon mode, '.' in interactive mode", default=None)
help="Working directory, defaults to '/' in daemon mode, '.' in interactive mode", default=utils.getDlgWorkDir())
parser.add_option("-s", "--stop", action="store_true",
dest="stop", help="Stop an instance running as daemon", default=False)
parser.add_option( "--status", action="store_true",
Expand Down Expand Up @@ -332,4 +332,4 @@ def dlgReplay(parser, args):
options.dmAcronym = 'RP'
options.restType = ReplayManagerServer

start(options, parser)
start(options, parser)
33 changes: 22 additions & 11 deletions daliuge-engine/dlg/ngaslite.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,17 @@
'''

import six.moves.http_client as httplib # @UnresolvedImport
import six.moves.urllib.request as urlrequest
import logging

logger = logging.getLogger(__name__)


def open(host, fileId, port=7777, timeout=None):
url = 'http://%s:%d/RETRIEVE?file_id=%s' % (host, port, fileId)
logger.debug("Issuing RETRIEVE request: %s" % (url))
conn = urlrequest.urlopen(url)
return conn

def retrieve(host, fileId, port=7777, timeout=None):
"""
Expand All @@ -38,14 +48,14 @@ def retrieve(host, fileId, port=7777, timeout=None):
This method returns a file-like object that supports the `read` operation,
and over which `close` must be invoked once no more data is read from it.
"""
conn = httplib.HTTPConnection(host, port, timeout=timeout)
conn.request('GET', '/RETRIEVE?file_id=' + fileId)
response = conn.getresponse()
if response.status != httplib.OK:
raise Exception("Error while RETRIEVE-ing %s from %s:%d: %d %s" % (fileId, host, port, response.status, response.msg))
return response
url = 'http://%s:%d/RETRIEVE?file_id=%s' % (host, port, fileId)
logger.debug("Issuing RETRIEVE request: %s" % (url))
conn = urlrequest.urlopen(url)
if conn.status != httplib.OK:
raise Exception("Error while RETRIEVE-ing %s from %s:%d: %d %s" % (fileId, host, port, conn.status, conn.msg))
return conn

def beingArchive(host, fileId, port=7777, timeout=0, length=-1):
def beginArchive(host, fileId, port=7777, timeout=0, length=-1, mimeType='application/octet-stream'):
"""
Opens a connecting to the NGAS server located at `host`:`port` and sends out
the request for archiving the given `fileId`.
Expand All @@ -57,10 +67,11 @@ def beingArchive(host, fileId, port=7777, timeout=0, length=-1):
"""
conn = httplib.HTTPConnection(host, port, timeout=timeout)
conn.putrequest('POST', '/QARCHIVE?filename=' + fileId)
conn.putheader('Content-Type', 'application/octet-stream')
if length != -1:
conn.putheader('Content-Type', mimeType)
if length is not None and length >= 0:
conn.putheader('Content-Length', length)
conn.endheaders()
# defer endheaders NGAS requires Content-Length
conn.endheaders()
return conn

def finishArchive(conn, fileId):
Expand All @@ -69,4 +80,4 @@ def finishArchive(conn, fileId):
"""
response = conn.getresponse()
if response.status != httplib.OK:
raise Exception("Error while QARCHIVE-ing %s to %s:%d: %d %s" % (fileId, conn.host, conn.port, response.status, response.msg))
raise Exception("Error while QARCHIVE-ing %s to %s:%d: %d %s" % (fileId, conn.host, conn.port, response.status, response.msg))
10 changes: 9 additions & 1 deletion daliuge-engine/dlg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ def getDlgLogsDir():
"""
return os.path.join(getDlgDir(), 'logs')

def getDlgWorkDir():
"""
Returns the location of the directory used by the DALiuGE framework to store
its logs. If `createIfMissing` is True, the directory will be created if it
currently doesn't exist
"""
return os.path.join(getDlgDir(), 'workspace')

def createDirIfMissing(path):
"""
Creates the given directory if it doesn't exist
Expand Down Expand Up @@ -433,4 +441,4 @@ def wait(self):
write_to = common.write_to

JSONStream = common.JSONStream
ZlibCompressedStream = common.ZlibCompressedStream
ZlibCompressedStream = common.ZlibCompressedStream
5 changes: 4 additions & 1 deletion daliuge-engine/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ FROM icrar/daliuge-common:ray
# copy sources and virtualenv
COPY --from=0 /daliuge/. /daliuge/.
COPY --from=0 /home/ray/dlg /home/ray/dlg
RUN sudo mkdir -p /var/dlg_home/workspace && sudo chown -R ray:users /var/dlg_home

EXPOSE 5555
EXPOSE 6666
Expand All @@ -25,5 +26,7 @@ EXPOSE 9000
# enable the virtualenv path from daliuge-common
ENV VIRTUAL_ENV=/home/ray/dlg
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
ENV DLG_ROOT="/var/dlg_home"
RUN sudo apt-get update && sudo apt-get install -y curl

CMD ["dlg", "daemon", "-vv", "--no-nm"]
CMD ["dlg", "daemon", "-vv"]
4 changes: 3 additions & 1 deletion daliuge-engine/run_engine.sh
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
docker run --rm -td --name daliuge-engine -v /var/run/docker.sock:/var/run/docker.sock -p 5555:5555 -p 6666:6666 -p 8000:8000 -p 8001:8001 -p 8002:8002 -p 9000:9000 icrar/daliuge-engine:ray
docker run --rm -td --name daliuge-engine -v /var/run/docker.sock:/var/run/docker.sock -p 5555:5555 -p 6666:6666 -p 8000:8000 -p 8001:8001 -p 8002:8002 -p 9000:9000 icrar/daliuge-engine:ray
sleep 2
./start_local_managers.sh
Loading

0 comments on commit d40322e

Please sign in to comment.