Skip to content

Commit

Permalink
Fixes for NGAS drops to set their size when completed. That required …
Browse files Browse the repository at this point in the history
…some of the tests to be adjusted as well. Also changed all of the build and run shell scripts to return 0 for success.
  • Loading branch information
awicenec committed Oct 18, 2021
1 parent a4d31b0 commit 9f30a49
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 40 deletions.
8 changes: 4 additions & 4 deletions daliuge-common/build_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ case "$1" in
echo "Building daliuge-common version using tag ${VCS_TAG}"
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-common:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1 ;;
exit 0 ;;
"dev")
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
# export VCS_TAG="casa"
Expand All @@ -18,7 +18,7 @@ case "$1" in
# and should not go much further.
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-common:${VCS_TAG} -f docker/Dockerfile.dev .
echo "Build finished!"
exit 1;;
exit 0;;
"casa")
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
# export VCS_TAG="casa"
Expand All @@ -27,8 +27,8 @@ case "$1" in
# and thus we do that only in the engine, not in the translator
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-common:${VCS_TAG}-casa -f docker/Dockerfile.dev .
echo "Build finished!"
exit 1;;
exit 0;;
*)
echo "Usage: build_common.sh <dep|dev|casa>"
exit 1;;
exit 0;;
esac
8 changes: 4 additions & 4 deletions daliuge-engine/build_engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ case "$1" in
cp ../LICENSE dlg/manager/web/.
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1 ;;
exit 0 ;;
"dev")
C_TAG="master"
[[ ! -z $2 ]] && C_TAG=$2
Expand All @@ -23,14 +23,14 @@ case "$1" in
cp ../LICENSE dlg/manager/web/.
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile.dev .
echo "Build finished!"
exit 1;;
exit 0;;
"casa")
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-engine development version"
docker build --build-arg VCS_TAG=${VCS_TAG}-casa --no-cache -t icrar/daliuge-engine:${VCS_TAG}-casa -f docker/Dockerfile.casa .
echo "Build finished!"
exit 1;;
exit 0;;
*)
echo "Usage: build_engine.sh <dep|dev>"
exit 1;;
exit 0;;
esac
51 changes: 48 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ def open(self, **kwargs):
raise Exception("%r is in state %s (!=COMPLETED), cannot be opened for reading" % (self, self.status,))

io = self.getIO()
logger.debug("Opening drop %s" % (self.oid))
io.open(OpenMode.OPEN_READ, **kwargs)

# Save the IO object in the dictionary and return its descriptor instead
Expand Down Expand Up @@ -427,7 +428,7 @@ def _closeWriters(self):
try:
self._wio.close()
except:
pass # this will make sure that a pprevious issue does not cause the grpah to hang!
pass # this will make sure that a previous issue does not cause the graph to hang!
# raise Exception("Problem closing file!")
self._wio = None

Expand Down Expand Up @@ -1203,6 +1204,7 @@ class NgasDROP(AbstractDROP):

def initialize(self, **kwargs):
if self.len == -1:
# TODO: For writing the len field should be set to the size of the input drop
self.len = self._size
if self.ngasFileId:
self.fileId = self.ngasFileId
Expand All @@ -1212,13 +1214,56 @@ def initialize(self, **kwargs):
def getIO(self):
try:
ngasIO = NgasIO(self.ngasSrv, self.fileId, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
self.ngasConnectTimeout, self.ngasTimeout, length=self.len, mimeType=self.ngasMime)
except ImportError:
logger.warning('NgasIO not available, using NgasLiteIO instead')
ngasIO = NgasLiteIO(self.ngasSrv, self.fileId, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
self.ngasConnectTimeout, self.ngasTimeout, length=self.len, mimeType=self.ngasMime)
return ngasIO

@track_current_drop
def setCompleted(self):
    '''
    Override this method in order to get the size of the drop set once it is completed.

    A CANCELLED drop is left untouched and a SKIPPED drop only fires
    ``dropCompleted`` with its current status. Otherwise the writers are
    closed, the ``FileSize`` entry of the file status returned by the IO
    object is stored in ``_size``, the drop is moved to COMPLETED and
    ``dropCompleted`` is fired.

    :raises Exception: if the drop is neither INITIALIZED nor WRITING. Any
        error raised while obtaining the file status is propagated as well;
        in that case the drop is not moved to COMPLETED.
    '''
    # TODO: This implementation is almost a verbatim copy of the base class'
    # so we should look into merging them
    status = self.status
    if status == DROPStates.CANCELLED:
        return
    elif status == DROPStates.SKIPPED:
        self._fire('dropCompleted', status=status)
        return
    elif status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
        raise Exception("%r not in INITIALIZED or WRITING state (%s), cannot setComplete()" % (self, self.status))

    self._closeWriters()

    # Here we set the size from the file status. A failure is logged and
    # re-raised on purpose: there is no fallback size, so nothing may follow
    # the ``raise`` (the previous "set size to 0" statements were unreachable).
    logger.debug("Trying to set size of NGASDrop")
    try:
        stat = self.getIO().fileStatus()
        logger.debug("Setting size of NGASDrop %s to %s", self.fileId, stat['FileSize'])
        self._size = int(stat['FileSize'])
    except Exception:
        logger.exception("Could not determine size of NGASDrop %s", self.fileId)
        raise

    # Signal our subscribers that the show is over
    logger.debug("Moving %r to COMPLETED", self)
    self.status = DROPStates.COMPLETED
    self._fire('dropCompleted', status=DROPStates.COMPLETED)

@property
def dataURL(self):
return "ngas://%s:%d/%s" % (self.ngasSrv, self.ngasPort, self.fileId)
Expand Down
51 changes: 48 additions & 3 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ def close(self, **kwargs):
self._close()
self._mode = None

def size(self, **kwargs):
    """
    Returns the current total size of the underlying stored object. If the
    storage class does not support this it is supposed to return -1.

    The actual work is delegated to the storage-specific ``_size`` method,
    which receives the same keyword arguments.

    :param kwargs: storage-specific options, passed through to ``_size``
    :return: the value returned by this object's ``_size`` implementation
    """
    # ``_size`` is a method of this object, not a module-level function, so
    # it has to be looked up through ``self``.
    return self._size(**kwargs)

def isOpened(self):
return self._mode is not None

Expand Down Expand Up @@ -130,6 +137,9 @@ def _write(self, data, **kwargs): pass
@abstractmethod
def _close(self, **kwargs): pass

@abstractmethod
def _size(self, **kwargs): pass

class NullIO(DataIO):
"""
A DataIO that stores no data
Expand All @@ -147,6 +157,12 @@ def _write(self, data, **kwargs):
def _close(self, **kwargs):
pass

def _size(self, **kwargs):
"""
Size is always 0 for this storage class
"""
return 0

def exists(self):
return True

Expand All @@ -170,6 +186,9 @@ def _write(self, data, **kwargs):
def _close(self, **kwargs):
raise NotImplementedError()

def _size(self, **kwargs):
raise NotImplementedError()

def exists(self):
raise NotImplementedError()

Expand Down Expand Up @@ -204,6 +223,12 @@ def _close(self, **kwargs):
# If we're writing we don't close the descriptor because it's our
# self._buf, which won't be readable afterwards

def _size(self, **kwargs):
"""
Return actual size of user data rather than the whole Python object
"""
return self._buf.getbuffer().nbytes

def exists(self):
return not self._buf.closed

Expand Down Expand Up @@ -235,6 +260,9 @@ def _write(self, data, **kwargs):
def _close(self, **kwargs):
self._desc.close()

def _size(self, **kwargs):
return os.path.getsize(self._fnm)

def getFileName(self):
return self._fnm

Expand Down Expand Up @@ -316,6 +344,16 @@ def exists(self):
status = self._getClient().sendCmd('STATUS', pars=[['fileId', self._fileId]])
return status.getStatus() == ngamsLib.ngamsCore.NGAMS_SUCCESS

def fileStatus(self):
    """
    Query the NGAS server for the status of this file.

    :return: dictionary built from the XML attributes of the first file
        object of the first disk status entry in the reply
    :raises FileNotFoundError: if the reply status is not NGAMS_SUCCESS
    """
    import ngamsLib  # @UnresolvedImport
    # _getClient is a method and must be *called* to obtain the client;
    # taking an attribute of the bound method itself raises AttributeError.
    # NOTE(review): assumes the client object offers fileStatus() accepting
    # this query string -- confirm against the NGAS client API. The sibling
    # exists() uses sendCmd('STATUS', pars=[['fileId', self._fileId]]).
    status = self._getClient().fileStatus('STATUS?file_id=%s' % self._fileId)
    if status.getStatus() != ngamsLib.ngamsCore.NGAMS_SUCCESS:
        raise FileNotFoundError("NGAS reported no status for file %s" % self._fileId)
    fs = dict(status.getDiskStatusList()[0].getFileObjList()[0].genXml().attributes.items())
    return fs


def delete(self):
pass # We never delete stuff from NGAS

Expand Down Expand Up @@ -346,7 +384,7 @@ def _is_length_unknown(self):
def _getClient(self):
return ngaslite.open(
self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout,\
mode=self._mode, mimeType=self._mimeType)
mode=self._mode, mimeType=self._mimeType, length=self._length)

def _open(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
Expand All @@ -366,8 +404,11 @@ def _close(self, **kwargs):
# If length wasn't known up-front we first send Content-Length and then the buffer here.
conn.putheader('Content-Length', len(self._buf))
conn.endheaders()
logger.debug("Sending data for file %s to NGAS" % (self._fileId))
conn.send(self._buf)
self._buf = None
else:
logger.debug("Length is known, assuming data has been sent (%s, %s)" % (self.fileId, self._length))
ngaslite.finishArchive(conn, self._fileId)
conn.close()
else:
Expand All @@ -380,14 +421,18 @@ def _read(self, count=4096, **kwargs):
def _write(self, data, **kwargs):
if self._is_length_unknown():
self._buf += data
return len(self._buf)
else:
self._desc.send(data)
return len(data)
logger.debug("Wrote %s bytes" % len(data))
return len(data)

def exists(self):
raise NotImplementedError("This method is not supported by this class")

def fileStatus(self):
logger.debug("Getting status of file %s" % self._fileId)
return ngaslite.fileStatus(self._ngasSrv, self._ngasPort, self._fileId)

def delete(self):
pass # We never delete stuff from NGAS

Expand Down
33 changes: 30 additions & 3 deletions daliuge-engine/dlg/ngaslite.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,21 @@
import http.client
import logging
import urllib.request
from xml.dom.minidom import parseString

logger = logging.getLogger(__name__)


def open(host, fileId, port=7777, timeout=None, mode=1, mimeType='application/octet-stream'):
def open(host, fileId, port=7777, timeout=5, mode=1, mimeType='application/octet-stream', length=-1):
logger.debug("Opening NGAS drop %s with mode %d and length %s" % (fileId, mode, length))
if mode == 1:
return retrieve(host, fileId, port=port, timeout=timeout)
elif mode == 0:
return beginArchive(host, fileId, port=port, timeout=timeout, mimeType=mimeType, length=length)
else:
return beginArchive(host, fileId, port=port, timeout=timeout, mimeType=mimeType)
# just return the status for that file_id
stat = fileStatus(host, port, fileId, timeout=timeout)
return stat

def retrieve(host, fileId, port=7777, timeout=None):
"""
Expand All @@ -55,7 +61,7 @@ def retrieve(host, fileId, port=7777, timeout=None):
raise Exception("Error while RETRIEVE-ing %s from %s:%d: %d %s" % (fileId, host, port, conn.getcode(), conn.msg))
return conn

def beginArchive(host, fileId, port=7777, timeout=0, length=-1, mimeType='application/octet-stream'):
def beginArchive(host, fileId, port=7777, timeout=5, length=-1, mimeType='application/octet-stream'):
"""
Opens a connection to the NGAS server located at `host`:`port` and sends out
the request for archiving the given `fileId`.
Expand All @@ -73,6 +79,8 @@ def beginArchive(host, fileId, port=7777, timeout=0, length=-1, mimeType='applic
conn.putheader('Content-Length', length)
# defer endheaders NGAS requires Content-Length
conn.endheaders()
else:
logger.warning("Data size for %s unkown. Caching it first" % (fileId))
return conn

def finishArchive(conn, fileId):
Expand All @@ -82,3 +90,22 @@ def finishArchive(conn, fileId):
response = conn.getresponse()
if response.status != http.HTTPStatus.OK:
raise Exception("Error while QARCHIVE-ing %s to %s:%d: %d %s" % (fileId, conn.host, conn.port, response.status, response.msg))

def fileStatus(host, port, fileId, timeout=10):
    """
    Return the status as a dictionary for the file_id given
    TODO: This needs to use file_version as well, else it might
    return a value for a previous version.

    :param host: host name or address of the NGAS server
    :param port: port of the NGAS server (formatted as an integer)
    :param fileId: the file_id sent in the STATUS request
    :param timeout: timeout handed to ``urllib.request.urlopen``
    :return: dict of the XML attributes of the first ``FileStatus`` element
        inside the first ``DiskStatus`` of the first ``NgamsStatus`` element
    :raises FileNotFoundError: if the request fails with an HTTP error or
        the reply contains no such ``FileStatus`` element
    :raises Exception: if the reply code is not 200 (OK)
    """
    url = 'http://%s:%d/STATUS?file_id=%s' % (host, port, fileId)
    logger.debug("Issuing STATUS request: %s", url)
    try:
        conn = urllib.request.urlopen(url, timeout=timeout)
    except urllib.error.HTTPError as err:
        # keep the HTTP error as the cause so the reason is not lost
        raise FileNotFoundError("STATUS request for %s failed: %s" % (fileId, err)) from err
    # The response holds an open connection; release it as soon as the
    # payload has been read, also when the status check below raises.
    with conn:
        if conn.getcode() != http.HTTPStatus.OK:
            raise Exception("Error while getting STATUS %s from %s:%d: %d %s" % (fileId, host, port, conn.getcode(), conn.msg))
        payload = conn.read().decode()
    dom = parseString(payload)
    try:
        node = dom.getElementsByTagName('NgamsStatus')[0].getElementsByTagName('DiskStatus')[0].getElementsByTagName('FileStatus')[0]
    except IndexError as err:
        # a reply without the expected elements carries no file information
        raise FileNotFoundError("No FileStatus found for %s in STATUS reply" % fileId) from err
    stat = dict(node.attributes.items())
    logger.debug("Returning status: %s", stat)
    return stat
8 changes: 4 additions & 4 deletions daliuge-engine/run_engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ case "$1" in
echo "Running Engine deployment version in background..."
echo "docker run -td "${DOCKER_OPTS}" icrar/daliuge-engine:${VCS_TAG}"
docker run -td ${DOCKER_OPTS} icrar/daliuge-engine:${VCS_TAG}
exit 1
exit 0
fi;;
"dev")
DLG_ROOT="/tmp/.dlg"
Expand All @@ -37,7 +37,7 @@ case "$1" in
# docker run -td ${DOCKER_OPTS} icrar/dlg-engine:casa
sleep 3
./start_local_managers.sh
exit 1;;
exit 0;;
"casa")
DLG_ROOT="/tmp/.dlg"
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
Expand All @@ -52,8 +52,8 @@ case "$1" in
docker run -td ${DOCKER_OPTS} ${CONTAINER_NM}
sleep 3
./start_local_managers.sh
exit 1;;
exit 0;;
*)
echo "Usage run_engine.sh <dep|dev>"
exit 1;;
exit 0;;
esac
5 changes: 3 additions & 2 deletions daliuge-engine/test/apps/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ def test_refer_to_io_by_uid(self):
UIDs (in addition to their position in the list of inputs or outputs)
in the command-line.
"""
self._ngas_and_fs_io("echo -n '%iDataURL[a]' > %o[c]")
self._ngas_and_fs_io("echo -n '%iDataURL[HelloWorld.txt]' > %o[c]")

def _ngas_and_fs_io(self, command):
a = NgasDROP('a', 'a') # not a filesystem-related DROP, we can reference its URL in the command-line
a = NgasDROP('HelloWorld.txt', 'HelloWorld.txt') # not a filesystem-related DROP, we can reference its URL in the command-line
a.ngasSrv ='ngas.ddns.net'
b = DockerApp('b', 'b', image="ubuntu:14.04", command=command)
c = FileDROP('c', 'c')
b.addInput(a)
Expand Down
Loading

0 comments on commit 9f30a49

Please sign in to comment.