Skip to content

Commit

Permalink
Merge pull request #76 from ICRAR/liu-193
Browse files Browse the repository at this point in the history
Liu 193
  • Loading branch information
awicenec committed Oct 18, 2021
2 parents 5459857 + 7041d09 commit ae366b2
Show file tree
Hide file tree
Showing 16 changed files with 174 additions and 67 deletions.
8 changes: 4 additions & 4 deletions daliuge-common/build_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ case "$1" in
echo "Building daliuge-common version using tag ${VCS_TAG}"
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-common:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1 ;;
exit 0 ;;
"dev")
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
# export VCS_TAG="casa"
Expand All @@ -18,7 +18,7 @@ case "$1" in
# and should not go much further.
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-common:${VCS_TAG} -f docker/Dockerfile.dev .
echo "Build finished!"
exit 1;;
exit 0;;
"casa")
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
# export VCS_TAG="casa"
Expand All @@ -27,8 +27,8 @@ case "$1" in
# and thus we do that only in the engine, not in the translator
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-common:${VCS_TAG}-casa -f docker/Dockerfile.dev .
echo "Build finished!"
exit 1;;
exit 0;;
*)
echo "Usage: build_common.sh <dep|dev|casa>"
exit 1;;
exit 0;;
esac
8 changes: 4 additions & 4 deletions daliuge-engine/build_engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ case "$1" in
cp ../LICENSE dlg/manager/web/.
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile .
echo "Build finished!"
exit 1 ;;
exit 0 ;;
"dev")
C_TAG="master"
[[ ! -z $2 ]] && C_TAG=$2
Expand All @@ -23,14 +23,14 @@ case "$1" in
cp ../LICENSE dlg/manager/web/.
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile.dev .
echo "Build finished!"
exit 1;;
exit 0;;
"casa")
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-engine development version"
docker build --build-arg VCS_TAG=${VCS_TAG}-casa --no-cache -t icrar/daliuge-engine:${VCS_TAG}-casa -f docker/Dockerfile.casa .
echo "Build finished!"
exit 1;;
exit 0;;
*)
echo "Usage: build_engine.sh <dep|dev>"
exit 1;;
exit 0;;
esac
51 changes: 48 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ def open(self, **kwargs):
raise Exception("%r is in state %s (!=COMPLETED), cannot be opened for reading" % (self, self.status,))

io = self.getIO()
logger.debug("Opening drop %s" % (self.oid))
io.open(OpenMode.OPEN_READ, **kwargs)

# Save the IO object in the dictionary and return its descriptor instead
Expand Down Expand Up @@ -427,7 +428,7 @@ def _closeWriters(self):
try:
self._wio.close()
except:
pass # this will make sure that a pprevious issue does not cause the grpah to hang!
pass # this will make sure that a previous issue does not cause the graph to hang!
# raise Exception("Problem closing file!")
self._wio = None

Expand Down Expand Up @@ -1203,6 +1204,7 @@ class NgasDROP(AbstractDROP):

def initialize(self, **kwargs):
if self.len == -1:
# TODO: For writing the len field should be set to the size of the input drop
self.len = self._size
if self.ngasFileId:
self.fileId = self.ngasFileId
Expand All @@ -1212,13 +1214,56 @@ def initialize(self, **kwargs):
def getIO(self):
try:
ngasIO = NgasIO(self.ngasSrv, self.fileId, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
self.ngasConnectTimeout, self.ngasTimeout, length=self.len, mimeType=self.ngasMime)
except ImportError:
logger.warning('NgasIO not available, using NgasLiteIO instead')
ngasIO = NgasLiteIO(self.ngasSrv, self.fileId, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
self.ngasConnectTimeout, self.ngasTimeout, length=self.len, mimeType=self.ngasMime)
return ngasIO

@track_current_drop
def setCompleted(self):
'''
Override this method in order to get the size of the drop set once it is completed.
'''
# TODO: This implementation is almost a verbatim copy of the base class'
# so we should look into merging them
status = self.status
if status == DROPStates.CANCELLED:
return
elif status == DROPStates.SKIPPED:
self._fire('dropCompleted', status=status)
return
elif status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
raise Exception("%r not in INITIALIZED or WRITING state (%s), cannot setComplete()" % (self, self.status))

self._closeWriters()


# here we set the size. It could happen that nothing is written into
# this file, in which case we create an empty file so applications
# downstream don't fail to read
logger.debug("Trying to set size of NGASDrop")
try:
stat = self.getIO().fileStatus()
logger.debug("Setting size of NGASDrop %s to %s" % (self.fileId, stat['FileSize']))
self._size = int(stat['FileSize'])
except:
# we''ll try this again in case there is some other issue
# try:
# with open(self.path, 'wb'):
# pass
# except:
# self.status = DROPStates.ERROR
# logger.error("Path not accessible: %s" % self.path)
raise
logger.debug("Setting size of NGASDrop to %s", 0)
self._size = 0
# Signal our subscribers that the show is over
logger.debug("Moving %r to COMPLETED", self)
self.status = DROPStates.COMPLETED
self._fire('dropCompleted', status=DROPStates.COMPLETED)

@property
def dataURL(self):
return "ngas://%s:%d/%s" % (self.ngasSrv, self.ngasPort, self.fileId)
Expand Down
51 changes: 48 additions & 3 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ def close(self, **kwargs):
self._close()
self._mode = None

def size(self, **kwargs):
"""
Returns the current total size of the underlying stored object. If the
storage class does not support this it is supposed to return -1.
"""
return _size(self)

def isOpened(self):
return self._mode is not None

Expand Down Expand Up @@ -130,6 +137,9 @@ def _write(self, data, **kwargs): pass
@abstractmethod
def _close(self, **kwargs): pass

@abstractmethod
def _size(self, **kwargs): pass

class NullIO(DataIO):
"""
A DataIO that stores no data
Expand All @@ -147,6 +157,12 @@ def _write(self, data, **kwargs):
def _close(self, **kwargs):
pass

def _size(self, **kwargs):
"""
Size is always 0 for this storage class
"""
return 0

def exists(self):
return True

Expand All @@ -170,6 +186,9 @@ def _write(self, data, **kwargs):
def _close(self, **kwargs):
raise NotImplementedError()

def _size(self, **kwargs):
raise NotImplementedError()

def exists(self):
raise NotImplementedError()

Expand Down Expand Up @@ -204,6 +223,12 @@ def _close(self, **kwargs):
# If we're writing we don't close the descriptor because it's our
# self._buf, which won't be readable afterwards

def _size(self, **kwargs):
"""
Return actual size of user data rather than the whole Python object
"""
return self._buf.getbuffer().nbytes

def exists(self):
return not self._buf.closed

Expand Down Expand Up @@ -235,6 +260,9 @@ def _write(self, data, **kwargs):
def _close(self, **kwargs):
self._desc.close()

def _size(self, **kwargs):
return os.path.getsize(self._fnm)

def getFileName(self):
return self._fnm

Expand Down Expand Up @@ -316,6 +344,16 @@ def exists(self):
status = self._getClient().sendCmd('STATUS', pars=[['fileId', self._fileId]])
return status.getStatus() == ngamsLib.ngamsCore.NGAMS_SUCCESS

def fileStatus(self):
import ngamsLib # @UnresolvedImport
#status = self._getClient().sendCmd('STATUS', pars=[['fileId', self._fileId]])
status = self._getClient.fileStatus('STATUS?file_id=%s' % self._fileId)
if status.getStatus() != ngamsLib.ngamsCore.NGAMS_SUCCESS:
raise FileNotFoundError
fs = dict(status.getDiskStatusList()[0].getFileObjList()[0].genXml().attributes.items())
return fs


def delete(self):
pass # We never delete stuff from NGAS

Expand Down Expand Up @@ -346,7 +384,7 @@ def _is_length_unknown(self):
def _getClient(self):
return ngaslite.open(
self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout,\
mode=self._mode, mimeType=self._mimeType)
mode=self._mode, mimeType=self._mimeType, length=self._length)

def _open(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
Expand All @@ -366,8 +404,11 @@ def _close(self, **kwargs):
# If length wasn't known up-front we first send Content-Length and then the buffer here.
conn.putheader('Content-Length', len(self._buf))
conn.endheaders()
logger.debug("Sending data for file %s to NGAS" % (self._fileId))
conn.send(self._buf)
self._buf = None
else:
logger.debug("Length is known, assuming data has been sent (%s, %s)" % (self.fileId, self._length))
ngaslite.finishArchive(conn, self._fileId)
conn.close()
else:
Expand All @@ -380,14 +421,18 @@ def _read(self, count=4096, **kwargs):
def _write(self, data, **kwargs):
if self._is_length_unknown():
self._buf += data
return len(self._buf)
else:
self._desc.send(data)
return len(data)
logger.debug("Wrote %s bytes" % len(data))
return len(data)

def exists(self):
raise NotImplementedError("This method is not supported by this class")

def fileStatus(self):
logger.debug("Getting status of file %s" % self._fileId)
return ngaslite.fileStatus(self._ngasSrv, self._ngasPort, self._fileId)

def delete(self):
pass # We never delete stuff from NGAS

Expand Down
33 changes: 30 additions & 3 deletions daliuge-engine/dlg/ngaslite.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,21 @@
import http.client
import logging
import urllib.request
from xml.dom.minidom import parseString

logger = logging.getLogger(__name__)


def open(host, fileId, port=7777, timeout=None, mode=1, mimeType='application/octet-stream'):
def open(host, fileId, port=7777, timeout=5, mode=1, mimeType='application/octet-stream', length=-1):
logger.debug("Opening NGAS drop %s with mode %d and length %s" % (fileId, mode, length))
if mode == 1:
return retrieve(host, fileId, port=port, timeout=timeout)
elif mode == 0:
return beginArchive(host, fileId, port=port, timeout=timeout, mimeType=mimeType, length=length)
else:
return beginArchive(host, fileId, port=port, timeout=timeout, mimeType=mimeType)
# just return the status for that file_id
stat = fileStatus(host, port, fileId, timeout=timeout)
return stat

def retrieve(host, fileId, port=7777, timeout=None):
"""
Expand All @@ -55,7 +61,7 @@ def retrieve(host, fileId, port=7777, timeout=None):
raise Exception("Error while RETRIEVE-ing %s from %s:%d: %d %s" % (fileId, host, port, conn.getcode(), conn.msg))
return conn

def beginArchive(host, fileId, port=7777, timeout=0, length=-1, mimeType='application/octet-stream'):
def beginArchive(host, fileId, port=7777, timeout=5, length=-1, mimeType='application/octet-stream'):
"""
Opens a connecting to the NGAS server located at `host`:`port` and sends out
the request for archiving the given `fileId`.
Expand All @@ -73,6 +79,8 @@ def beginArchive(host, fileId, port=7777, timeout=0, length=-1, mimeType='applic
conn.putheader('Content-Length', length)
# defer endheaders NGAS requires Content-Length
conn.endheaders()
else:
logger.warning("Data size for %s unkown. Caching it first" % (fileId))
return conn

def finishArchive(conn, fileId):
Expand All @@ -82,3 +90,22 @@ def finishArchive(conn, fileId):
response = conn.getresponse()
if response.status != http.HTTPStatus.OK:
raise Exception("Error while QARCHIVE-ing %s to %s:%d: %d %s" % (fileId, conn.host, conn.port, response.status, response.msg))

def fileStatus(host, port, fileId, timeout=10):
"""
Return the status as a dictionary for the file_id given
TODO: This needs to use file_version as well, else it might
return a value for a previous version.
"""
url = 'http://%s:%d/STATUS?file_id=%s' % (host, port, fileId)
logger.debug("Issuing STATUS request: %s" % (url))
try:
conn = urllib.request.urlopen(url, timeout=timeout)
except urllib.error.HTTPError:
raise FileNotFoundError
if conn.getcode() != http.HTTPStatus.OK:
raise Exception("Error while getting STATUS %s from %s:%d: %d %s" % (fileId, host, port, conn.getcode(), conn.msg))
dom = parseString(conn.read().decode())
stat = dict(dom.getElementsByTagName('NgamsStatus')[0].getElementsByTagName('DiskStatus')[0].getElementsByTagName('FileStatus')[0].attributes.items())
logger.debug("Returning status: %s" % (stat))
return stat
10 changes: 5 additions & 5 deletions daliuge-engine/run_engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ case "$1" in
DOCKER_OPTS=${DOCKER_OPTS}"-v ${DLG_ROOT}:${DLG_ROOT} --env DLG_ROOT=${DLG_ROOT} "
echo "Running Engine deployment version in background..."
echo "docker run -td "${DOCKER_OPTS}" icrar/daliuge-engine:${VCS_TAG}"
docker run --net=host -td ${DOCKER_OPTS} icrar/daliuge-engine:${VCS_TAG}
exit 1
docker run -td ${DOCKER_OPTS} icrar/daliuge-engine:${VCS_TAG}
exit 0
fi;;
"dev")
DLG_ROOT="/tmp/.dlg"
Expand All @@ -37,7 +37,7 @@ case "$1" in
# docker run -td ${DOCKER_OPTS} icrar/dlg-engine:casa
sleep 3
./start_local_managers.sh
exit 1;;
exit 0;;
"casa")
DLG_ROOT="/tmp/.dlg"
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
Expand All @@ -52,8 +52,8 @@ case "$1" in
docker run -td ${DOCKER_OPTS} ${CONTAINER_NM}
sleep 3
./start_local_managers.sh
exit 1;;
exit 0;;
*)
echo "Usage run_engine.sh <dep|dev>"
exit 1;;
exit 0;;
esac
5 changes: 3 additions & 2 deletions daliuge-engine/test/apps/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ def test_refer_to_io_by_uid(self):
UIDs (in addition to their position in the list of inputs or outputs)
in the command-line.
"""
self._ngas_and_fs_io("echo -n '%iDataURL[a]' > %o[c]")
self._ngas_and_fs_io("echo -n '%iDataURL[HelloWorld.txt]' > %o[c]")

def _ngas_and_fs_io(self, command):
a = NgasDROP('a', 'a') # not a filesystem-related DROP, we can reference its URL in the command-line
a = NgasDROP('HelloWorld.txt', 'HelloWorld.txt') # not a filesystem-related DROP, we can reference its URL in the command-line
a.ngasSrv ='ngas.ddns.net'
b = DockerApp('b', 'b', image="ubuntu:14.04", command=command)
c = FileDROP('c', 'c')
b.addInput(a)
Expand Down
Loading

0 comments on commit ae366b2

Please sign in to comment.