Skip to content

Commit

Permalink
additional changes to make ngaslite and NgasDROP read an existing fil…
Browse files Browse the repository at this point in the history
…e from NGAS
  • Loading branch information
awicenec committed Apr 12, 2021
1 parent ae5276d commit a06e313
Show file tree
Hide file tree
Showing 15 changed files with 65 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.ray
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ EXPOSE 8084
EXPOSE 9000


CMD ["dlg", "daemon", "-vv", "--no-nm"]
CMD ["dlg", "daemon", "-vv"]
4 changes: 3 additions & 1 deletion daliuge-engine/dlg/apps/archiving.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ def store(self, inDrop):
raise Exception("ContainerDROPs are not supported as inputs for this application")

if inDrop.size is None or inDrop.size < 0:
logger.error("NGAS requires content-length to be know, but the given input does not provide a size.")
logger.error(
"NGAS requires content-length to be know, but the given input does not provide a size.")
size = None
else:
size = inDrop.size
logger.debug("Content-length %s", size)
Expand Down
11 changes: 8 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,7 @@ class NgasDROP(AbstractDROP):
'''
ngasSrv = dlg_string_param('ngasSrv', 'localhost')
ngasPort = dlg_int_param('ngasPort', 7777)
ngasFileId = dlg_string_param('ngasFileId', None)
ngasTimeout = dlg_int_param('ngasTimeout', 2)
ngasConnectTimeout = dlg_int_param('ngasConnectTimeout', 2)
ngasMime = dlg_string_param('ngasMime', 'application/octet-stream')
Expand All @@ -1157,20 +1158,24 @@ class NgasDROP(AbstractDROP):
def initialize(self, **kwargs):
if self.len == -1:
self.len = self._size
if self.ngasFileId:
self.fileId = self.ngasFileId
else:
self.fileId = self.uid

def getIO(self):
try:
ngasIO = NgasIO(self.ngasSrv, self.uid, self.ngasPort,
ngasIO = NgasIO(self.ngasSrv, self.fileId, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
except ImportError:
logger.warning('NgasIO not available, using NgasLiteIO instead')
ngasIO = NgasLiteIO(self.ngasSrv, self.uid, self.ngasPort,
ngasIO = NgasLiteIO(self.ngasSrv, self.fileId, self.ngasPort,
self.ngasConnectTimeout, self.ngasTimeout, self.len, mimeType=self.ngasMime)
return ngasIO

@property
def dataURL(self):
return "ngas://%s:%d/%s" % (self.ngasSrv, self.ngasPort, self.uid)
return "ngas://%s:%d/%s" % (self.ngasSrv, self.ngasPort, self.fileId)


class InMemoryDROP(AbstractDROP):
Expand Down
6 changes: 5 additions & 1 deletion daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,16 @@ def copyDropContents(source, target, bufsize=4096):
'''
Manually copies data from one DROP into another, in bufsize steps
'''
logger.debug("Copying from %r to %r" % (source, target))
desc = source.open()
read = source.read
buf = read(desc, bufsize)
logger.debug("Read %d bytes from %r" % (len(buf), source))
while buf:
target.write(buf)
logger.debug("Wrote %d bytes to %r" % (len(buf), target))
buf = read(desc, bufsize)
logger.debug("Read %d bytes from %r" % (len(buf), source))
source.close(desc)

def getUpstreamObjects(drop):
Expand Down Expand Up @@ -388,4 +392,4 @@ def replace_dataurl_placeholders(cmd, inputs, outputs):

# Easing the transition from single- to multi-package
get_leaves = common.get_leaves
get_roots = common.get_roots
get_roots = common.get_roots
17 changes: 9 additions & 8 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from six import BytesIO
import six.moves.urllib.parse as urlparse # @UnresolvedImport
import six.moves.urllib.request as urlrequest

from . import ngaslite

Expand Down Expand Up @@ -335,8 +336,8 @@ def _is_length_unknown(self):
return self._length is None or self._length < 0

def _getClient(self):
from ngamsPClient import ngamsPClient # @UnresolvedImport
return ngamsPClient.ngamsPClient(self._ngasSrv, self._ngasPort, self._ngasTimeout)
return ngaslite.open(
self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout)

def _open(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
Expand All @@ -347,10 +348,9 @@ def _open(self, **kwargs):
# when finishArchive is called.
self._buf = b''
self._writtenDataSize = 0
return ngaslite.beginArchive(self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout, \
length=self._length, mimeType=self._mimeType)
return self._getClient()
else:
return ngaslite.retrieve(self._ngasSrv, self._fileId, port=self._ngasPort, timeout=self._ngasTimeout)
return self._getClient()

def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
Expand All @@ -367,8 +367,8 @@ def _close(self, **kwargs):
response = self._desc
response.close()

def _read(self, count, **kwargs):
self._desc.read(count)
def _read(self, count=4096, **kwargs):
return self._desc.read(count)

def _write(self, data, **kwargs):
if self._is_length_unknown():
Expand Down Expand Up @@ -403,10 +403,11 @@ def IOForURL(url):
networkLocation = url.netloc
if ':' in networkLocation:
hostname, port = networkLocation.split(':')
port = int(port)
else:
hostname = networkLocation
port = 7777
fileId = url.path
fileId = url.path[1:] # remove the trailing slash
try:
io = NgasIO(hostname, fileId, port)
except:
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/manager/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def addCommonOptions(parser, defaultPort):
parser.add_option( "--cwd", action="store_true",
dest="cwd", help="Short for '-w .'", default=False)
parser.add_option("-w", "--work-dir",
help="Working directory, defaults to '/' in daemon mode, '.' in interactive mode", default=None)
help="Working directory, defaults to '/' in daemon mode, '.' in interactive mode", default=utils.getDlgWorkDir())
parser.add_option("-s", "--stop", action="store_true",
dest="stop", help="Stop an instance running as daemon", default=False)
parser.add_option( "--status", action="store_true",
Expand Down Expand Up @@ -332,4 +332,4 @@ def dlgReplay(parser, args):
options.dmAcronym = 'RP'
options.restType = ReplayManagerServer

start(options, parser)
start(options, parser)
24 changes: 17 additions & 7 deletions daliuge-engine/dlg/ngaslite.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,17 @@
'''

import six.moves.http_client as httplib # @UnresolvedImport
import six.moves.urllib.request as urlrequest
import logging

logger = logging.getLogger(__name__)


def open(host, fileId, port=7777, timeout=None):
url = 'http://%s:%d/RETRIEVE?file_id=%s' % (host, port, fileId)
logger.debug("Issuing RETRIEVE request: %s" % (url))
conn = urlrequest.urlopen(url)
return conn

def retrieve(host, fileId, port=7777, timeout=None):
"""
Expand All @@ -38,12 +48,12 @@ def retrieve(host, fileId, port=7777, timeout=None):
This method returns a file-like object that supports the `read` operation,
and over which `close` must be invoked once no more data is read from it.
"""
conn = httplib.HTTPConnection(host, port, timeout=timeout)
conn.request('GET', '/RETRIEVE?file_id=' + fileId)
response = conn.getresponse()
if response.status != httplib.OK:
raise Exception("Error while RETRIEVE-ing %s from %s:%d: %d %s" % (fileId, host, port, response.status, response.msg))
return response
url = 'http://%s:%d/RETRIEVE?file_id=%s' % (host, port, fileId)
logger.debug("Issuing RETRIEVE request: %s" % (url))
conn = urlrequest.urlopen(url)
if conn.status != httplib.OK:
raise Exception("Error while RETRIEVE-ing %s from %s:%d: %d %s" % (fileId, host, port, conn.status, conn.msg))
return conn

def beginArchive(host, fileId, port=7777, timeout=0, length=-1, mimeType='application/octet-stream'):
"""
Expand All @@ -70,4 +80,4 @@ def finishArchive(conn, fileId):
"""
response = conn.getresponse()
if response.status != httplib.OK:
raise Exception("Error while QARCHIVE-ing %s to %s:%d: %d %s" % (fileId, conn.host, conn.port, response.status, response.msg))
raise Exception("Error while QARCHIVE-ing %s to %s:%d: %d %s" % (fileId, conn.host, conn.port, response.status, response.msg))
10 changes: 9 additions & 1 deletion daliuge-engine/dlg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ def getDlgLogsDir():
"""
return os.path.join(getDlgDir(), 'logs')

def getDlgWorkDir():
"""
Returns the location of the directory used by the DALiuGE framework to store
its logs. If `createIfMissing` is True, the directory will be created if it
currently doesn't exist
"""
return os.path.join(getDlgDir(), 'workspace')

def createDirIfMissing(path):
"""
Creates the given directory if it doesn't exist
Expand Down Expand Up @@ -433,4 +441,4 @@ def wait(self):
write_to = common.write_to

JSONStream = common.JSONStream
ZlibCompressedStream = common.ZlibCompressedStream
ZlibCompressedStream = common.ZlibCompressedStream
4 changes: 3 additions & 1 deletion daliuge-engine/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ EXPOSE 9000
# enable the virtualenv path from daliuge-common
ENV VIRTUAL_ENV=/home/ray/dlg
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
ENV DLG_ROOT="/var/dlg_home"
RUN sudo apt-get update && sudo apt-get install -y curl

CMD ["dlg", "daemon", "-vv", "--no-nm"]
CMD ["dlg", "daemon", "-vv"]
1 change: 1 addition & 0 deletions daliuge-engine/run_engine.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
docker run --rm -td --name daliuge-engine -v /var/run/docker.sock:/var/run/docker.sock -p 5555:5555 -p 6666:6666 -p 8000:8000 -p 8001:8001 -p 8002:8002 -p 9000:9000 icrar/daliuge-engine:ray
sleep 2
./start_local_managers.sh
5 changes: 2 additions & 3 deletions daliuge-engine/start_local_managers.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# Script starts a node manager and a data island manager on the local node. Useful mainly for testing.
# Script starts a data island manager on the local node. Useful mainly for testing.

docker exec daliuge-engine bash -c 'dlg nm -vvd --no-dlm -H 0.0.0.0 -w /var/dlg_home/workspace -l /var/dlg_home/logs'
docker exec -ti daliuge-engine bash -c 'dlg dim -N localhost -vvd -H 0.0.0.0 -w /var/dlg_home/workspace -l /var/dlg_home/logs'
docker exec daliuge-engine bash -c 'source /home/ray/dlg/bin/activate && curl -sd '\''{"nodes": ["localhost"]}'\'' -H "Content-Type: application/json" -X POST http://localhost:9000/managers/dataisland'
2 changes: 1 addition & 1 deletion scripts/start_dim.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# This script starts the island-manager on an already running daliuge-engine docker container.
echo "Starting the DALiuGE Data Island manager"
docker exec -ti daliuge-engine bash -c "dlg dim -d -H 0.0.0.0 -N localhost"
docker exec -ti daliuge-engine bash -c "dlg dim -d -H 0.0.0.0 -N localhost -w /var/dlg_home/workspace -l /var/dlg_home/logs"
2 changes: 1 addition & 1 deletion scripts/start_nm.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# This script starts the node-manager on an already running daliuge-engine docker container.
echo "Starting the DALiuGE Node manager in interactive mode:"
docker exec -ti daliuge-engine bash -c "dlg nm -vv -H 0.0.0.0 --dlg-path=/var/dlg_home/code"
docker exec -ti daliuge-engine bash -c "dlg nm -vv -H 0.0.0.0 --dlg-path=/var/dlg_home/code -w /var/dlg_home/workspace -l /var/dlg_home/logs"
2 changes: 1 addition & 1 deletion scripts/stop_dim.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# This script stops the island-manager on an already running daliuge-engine docker container.
echo "Starting the DALiuGE Data Island manager"
echo "Stopping the DALiuGE Data Island manager"
docker exec -ti daliuge-engine bash -c "dlg dim -s"
4 changes: 2 additions & 2 deletions scripts/stop_nm_daemon.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This script stops the node-manager on an already running daliuge-engine docker container.
# NOTE: This only applies to a node manager started in daemon mode.
echo "Starting the DALiuGE Node manager in interactive mode:"
docker exec -ti daliuge-engine bash -c "dlg nm -s"
echo "Stopping the DALiuGE Node manager in interactive mode:"
docker exec daliuge-engine bash -c "dlg nm -s"

0 comments on commit a06e313

Please sign in to comment.