Skip to content

Commit

Permalink
Merge ae5276d into ea54ebb
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Apr 9, 2021
2 parents ea54ebb + ae5276d commit 23cb629
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 31 deletions.
23 changes: 15 additions & 8 deletions daliuge-engine/dlg/apps/archiving.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ class NgasArchivingApp(ExternalStoreApp):
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

ngasSrv = dlg_string_param('NGAS Server', 'localhost')
ngasPort = dlg_int_param('NGAS Port', 7777)
ngasConnectTimeout = dlg_float_param('Connect Timeout', 2.)
ngasTimeout = dlg_float_param('Timeout', 2.)
ngasSrv = dlg_string_param('ngasSrv', 'localhost')
ngasPort = dlg_int_param('ngasPort', 7777)
ngasMime = dlg_string_param('ngasMime', 'application/octet-stream')
ngasTimeout = dlg_int_param('ngasTimeout', 2)
ngasConnectTimeout = dlg_int_param('ngasConnectTimeout', 2)

def initialize(self, **kwargs):
super(NgasArchivingApp, self).initialize(**kwargs)
Expand All @@ -92,12 +93,18 @@ def store(self, inDrop):
if isinstance(inDrop, ContainerDROP):
raise Exception("ContainerDROPs are not supported as inputs for this application")

size = -1 if inDrop.size is None else inDrop.size
logger.debug("Content-length %s", size)
if inDrop.size is None or inDrop.size < 0:
logger.error("NGAS requires content-length to be know, but the given input does not provide a size.")
else:
size = inDrop.size
logger.debug("Content-length %s", size)
try:
ngasIO = NgasIO(self.ngasSrv, inDrop.uid, self.ngasPort, self.ngasConnectTimeout, self.ngasTimeout, size)
ngasIO = NgasIO(self.ngasSrv, inDrop.uid, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, size, mimeType=self.ngasMime)
except ImportError:
ngasIO = NgasLiteIO(self.ngasSrv, inDrop.uid, self.ngasPort, self.ngasConnectTimeout, self.ngasTimeout, size)
logger.warning("NgasIO library not available, falling back to NgasLiteIO.")
ngasIO = NgasLiteIO(self.ngasSrv, inDrop.uid, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, size, mimeType=self.ngasMime)

ngasIO.open(OpenMode.OPEN_WRITE)

Expand Down
19 changes: 11 additions & 8 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,19 +300,19 @@ def getmembers(object, predicate=None):
for attr_name, obj in getmembers(self, lambda a: not(inspect.isfunction(a) or isinstance(a, property))):
if isinstance(obj, dlg_float_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None:
if value is not None and value is not '':
value = float(value)
elif isinstance(obj, dlg_bool_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None:
if value is not None and value is not '':
value = bool(value)
elif isinstance(obj, dlg_int_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None:
if value is not None and value is not '':
value = int(value)
elif isinstance(obj, dlg_string_param):
value = kwargs.get(attr_name, obj.default_value)
if value is not None:
if value is not None and value is not '':
value = str(value)
elif isinstance(obj, dlg_list_param):
value = kwargs.get(attr_name, obj.default_value)
Expand Down Expand Up @@ -430,7 +430,7 @@ def read(self, descriptor, count=4096, **kwargs):

def _checkStateAndDescriptor(self, descriptor):
if self.status != DROPStates.COMPLETED:
raise Exception("%r is in state %s (!=COMPLETED), cannot be read" % (self.status,))
raise Exception("%r is in state %s (!=COMPLETED), cannot be read" % (self, self.status,))
if descriptor is None:
raise ValueError("Illegal empty descriptor given")
if descriptor not in self._rios:
Expand Down Expand Up @@ -1151,18 +1151,21 @@ class NgasDROP(AbstractDROP):
ngasPort = dlg_int_param('ngasPort', 7777)
ngasTimeout = dlg_int_param('ngasTimeout', 2)
ngasConnectTimeout = dlg_int_param('ngasConnectTimeout', 2)
ngasMime = dlg_string_param('ngasMime', 'application/octet-stream')
len = dlg_int_param('len', -1)

def initialize(self, **kwargs):
pass
if self.len == -1:
self.len = self._size

def getIO(self):
try:
ngasIO = NgasIO(self.ngasSrv, self.uid, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len)
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
except ImportError:
logger.warning('NgasIO not available, using NgasLiteIO instead')
ngasIO = NgasLiteIO(self.ngasSrv, self.uid, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len)
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
return ngasIO

@property
Expand Down
41 changes: 33 additions & 8 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class NgasIO(DataIO):
in a file on the local filesystem and then move it to the NGAS destination
'''

def __init__(self, hostname, fileId, port = 7777, ngasConnectTimeout=2, ngasTimeout=2, length=-1):
def __init__(self, hostname, fileId, port = 7777, ngasConnectTimeout=2, ngasTimeout=2, length=-1, mimeType='application/octet-stream'):

# Check that we actually have the NGAMS client libraries
try:
Expand All @@ -259,6 +259,7 @@ def __init__(self, hostname, fileId, port = 7777, ngasConnectTimeout=2, ngasTime
self._ngasTimeout = ngasTimeout
self._fileId = fileId
self._length = length
self._mimeType = mimeType

def _getClient(self):
from ngamsPClient import ngamsPClient # @UnresolvedImport
Expand All @@ -270,7 +271,7 @@ def _open(self, **kwargs):
# request with data. Thus the only way we can currently archive data
# into NGAS is by accumulating it all on our side and finally
# sending it over.
self._buf = ''
self._buf = b''
self._writtenDataSize = 0
return self._getClient()

Expand All @@ -279,7 +280,7 @@ def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
reply, msg, _, _ = client._httpPost(
client.getHost(), client.getPort(), 'QARCHIVE',
'application/octet-stream', dataRef=self._buf,
self._mimeType, dataRef=self._buf,
pars=[['filename',self._fileId]], dataSource='BUFFER',
dataSize=self._writtenDataSize)
self._buf = None
Expand Down Expand Up @@ -319,27 +320,47 @@ class NgasLiteIO(DataIO):
that this class will throw an error if its `exists` method is invoked.
'''

def __init__(self, hostname, fileId, port = 7777, ngasConnectTimeout=2, ngasTimeout=2, length=-1):
def __init__(self, hostname, fileId, port=7777, ngasConnectTimeout=2, ngasTimeout=2, length=-1, \
mimeType='application/octet-stream'):
super(NgasLiteIO, self).__init__()
self._ngasSrv = hostname
self._ngasPort = port
self._ngasConnectTimeout = ngasConnectTimeout
self._ngasTimeout = ngasTimeout
self._fileId = fileId
self._length = length
self._mimeType = mimeType

def _is_length_unknown(self):
return self._length is None or self._length < 0

def _getClient(self):
from ngamsPClient import ngamsPClient # @UnresolvedImport
return ngamsPClient.ngamsPClient(self._ngasSrv, self._ngasPort, self._ngasTimeout)

def _open(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
return ngaslite.beingArchive(self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout, length=self._length)
return ngaslite.retrieve(self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout)
if self._is_length_unknown():
# NGAS does not support HTTP chunked writes and thus it requires the Content-length
# of the whole fileObject to be known and sent up-front as part of the header. Thus,
# if size is not provided, all data will be buffered IN MEMORY and only sent to NGAS
# when finishArchive is called.
self._buf = b''
self._writtenDataSize = 0
return ngaslite.beginArchive(self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout, \
length=self._length, mimeType=self._mimeType)
else:
return ngaslite.retrieve(self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout)

def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
conn = self._desc
if self._is_length_unknown():
# If length wasn't known up-front we first send Content-Length and then the buffer here.
conn.putheader('Content-Length', len(self._buf))
conn.endheaders()
conn.send(self._buf)
self._buf = None
ngaslite.finishArchive(conn, self._fileId)
conn.close()
else:
Expand All @@ -350,8 +371,12 @@ def _read(self, count, **kwargs):
self._desc.read(count)

def _write(self, data, **kwargs):
self._desc.send(data)
return len(data)
if self._is_length_unknown():
self._buf += data
return len(self._buf)
else:
self._desc.send(data)
return len(data)

def exists(self):
raise NotImplementedError("This method is not supported by this class")
Expand Down
9 changes: 5 additions & 4 deletions daliuge-engine/dlg/ngaslite.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def retrieve(host, fileId, port=7777, timeout=None):
raise Exception("Error while RETRIEVE-ing %s from %s:%d: %d %s" % (fileId, host, port, response.status, response.msg))
return response

def beingArchive(host, fileId, port=7777, timeout=0, length=-1):
def beginArchive(host, fileId, port=7777, timeout=0, length=-1, mimeType='application/octet-stream'):
"""
Opens a connection to the NGAS server located at `host`:`port` and sends out
the request for archiving the given `fileId`.
Expand All @@ -57,10 +57,11 @@ def beingArchive(host, fileId, port=7777, timeout=0, length=-1):
"""
conn = httplib.HTTPConnection(host, port, timeout=timeout)
conn.putrequest('POST', '/QARCHIVE?filename=' + fileId)
conn.putheader('Content-Type', 'application/octet-stream')
if length != -1:
conn.putheader('Content-Type', mimeType)
if length is not None and length >= 0:
conn.putheader('Content-Length', length)
conn.endheaders()
# NGAS requires Content-Length: when it is not known yet, defer endheaders so the caller can still add that header
conn.endheaders()
return conn

def finishArchive(conn, fileId):
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ FROM icrar/daliuge-common:ray
# copy sources and virtualenv
COPY --from=0 /daliuge/. /daliuge/.
COPY --from=0 /home/ray/dlg /home/ray/dlg
RUN sudo mkdir -p /var/dlg_home/workspace && sudo chown -R ray:users /var/dlg_home

EXPOSE 5555
EXPOSE 6666
Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/run_engine.sh
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
docker run --rm -td --name daliuge-engine -v /var/run/docker.sock:/var/run/docker.sock -p 5555:5555 -p 6666:6666 -p 8000:8000 -p 8001:8001 -p 8002:8002 -p 9000:9000 icrar/daliuge-engine:ray
docker run --rm -td --name daliuge-engine -v /var/run/docker.sock:/var/run/docker.sock -p 5555:5555 -p 6666:6666 -p 8000:8000 -p 8001:8001 -p 8002:8002 -p 9000:9000 icrar/daliuge-engine:ray
./start_local_managers.sh
2 changes: 2 additions & 0 deletions daliuge-engine/show_DIMlogs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# This utility script just shows the Data Island Manager log file of a locally running docker-engine
docker exec -ti daliuge-engine /bin/bash -c "tail -f /var/dlg_home/logs/dlgDIM.log"
2 changes: 2 additions & 0 deletions daliuge-engine/show_NMlogs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# This utility script just shows the Node Manager log file of a locally running docker-engine
docker exec -ti daliuge-engine /bin/bash -c "tail -f /var/dlg_home/logs/dlgNM.log"
4 changes: 2 additions & 2 deletions daliuge-engine/start_local_managers.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Script starts a node manager and a data island manager on the local node. Useful mainly for testing.

docker exec daliuge-engine bash -c 'dlg nm -vvd --no-dlm -H 0.0.0.0 -w ${HOME}'
docker exec -ti daliuge-engine bash -c 'dlg dim -N localhost -vvd -H 0.0.0.0 -w ${HOME}'
docker exec daliuge-engine bash -c 'dlg nm -vvd --no-dlm -H 0.0.0.0 -w /var/dlg_home/workspace -l /var/dlg_home/logs'
docker exec -ti daliuge-engine bash -c 'dlg dim -N localhost -vvd -H 0.0.0.0 -w /var/dlg_home/workspace -l /var/dlg_home/logs'
2 changes: 2 additions & 0 deletions daliuge-engine/stop_engine.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# This utility script stops a locally running docker-engine container
docker stop daliuge-engine

0 comments on commit 23cb629

Please sign in to comment.