Skip to content

Commit

Permalink
Merge pull request #51 from ICRAR/master
Browse files Browse the repository at this point in the history
merge
  • Loading branch information
awicenec committed Jul 20, 2021
2 parents 2f6e318 + 90f4dd9 commit e6cf2f6
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 16 deletions.
3 changes: 3 additions & 0 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ def trigger_drops(self, sessionId, drop_uids):
def shutdown_node_manager(self):
    """Ask the node manager to shut down by issuing a GET on ``/shutdown``."""
    shutdown_url = '/shutdown'
    self._GET(shutdown_url)

def get_log_file(self, sessionId):
    """Request the log file of the session identified by ``sessionId``.

    Issues a GET on ``/sessions/<sessionId>/logs`` and hands back, unmodified,
    whatever ``self._request`` returns for it.
    """
    log_url = f'/sessions/{sessionId}/logs'
    return self._request(log_url, 'GET')

class CompositeManagerClient(BaseDROPManagerClient):

def nodes(self):
Expand Down
13 changes: 8 additions & 5 deletions daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ def _post_json(self, url, content, compress=False):
return json.load(ret) if ret else None

def _GET(self, url):
return self._request(url, 'GET')
stream, _ = self._request(url, 'GET')
return stream

def _POST(self, url, content=None, content_type=None, compress=False):
headers = {}
Expand All @@ -149,10 +150,12 @@ def _POST(self, url, content=None, content_type=None, compress=False):
if not hasattr(content, 'read'):
content = io.BytesIO(content)
content = common.ZlibCompressedStream(content)
return self._request(url, 'POST', content, headers)
stream, _ = self._request(url, 'POST', content, headers)
return stream

def _DELETE(self, url):
return self._request(url, 'DELETE')
stream, _ = self._request(url, 'DELETE')
return stream

def _request(self, url, method, content=None, headers={}):

Expand Down Expand Up @@ -197,5 +200,5 @@ def _request(self, url, method, content=None, headers={}):
raise ex

if not self._resp.length:
return None
return codecs.getreader('utf-8')(self._resp)
return None, None
return codecs.getreader('utf-8')(self._resp), self._resp
52 changes: 51 additions & 1 deletion daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,56 @@ def run(self):
o.len = len(content)
o.write(content) # send content to all outputs

##
# @brief GenericScatterApp\n
# @details An APP that splits about any object that can be converted to a numpy array
# into as many parts as the app has outputs, provided that the initially converted numpy
# array has enough elements. The result is a sequence of numpy arrays of length
# len(outputs), one part per output. If the length of the original array is not a multiple
# of the number of outputs, the remainder is distributed one element at a time, so the
# first (length % len(outputs)) parts are one element longer than the rest.
# @par EAGLE_START
# @param gitrepo $(GIT_REPO)
# @param version $(PROJECT_VERSION)
# @param category PythonApp
# @param[in] param/appclass/dlg.apps.simple.GenericScatterApp/String/readonly
# \~English Application class\n
# @param[out] port/content
# \~English The port carrying one part of the split array.
# @par EAGLE_END


class GenericScatterApp(BarrierAppDROP):
    """
    An APP that splits an array-like object into as many parts as it has
    outputs and writes one pickled part to each output.

    The first input must hold a pickled object that ``numpy.array`` can
    convert. The parts are produced with ``numpy.array_split``, so they may
    differ in length by one element when the input does not divide evenly.
    """
    # NOTE(review): the attribute name is spelled 'compontent_meta' in the
    # original; it is kept verbatim because other code may look it up under
    # this name -- confirm against the framework before renaming.
    compontent_meta = dlg_component('GenericScatterApp', 'Scatter an array like object into numSplit parts',
                                    [dlg_batch_input('binary/*', [])],
                                    [dlg_batch_output('binary/*', [])],
                                    [dlg_streaming_input('binary/*')])

    def initialize(self, **kwargs):
        """Forward all keyword arguments to the parent class' initializer."""
        super(GenericScatterApp, self).initialize(**kwargs)

    def run(self):
        """Split the first input into ``len(self.outputs)`` parts, one per output.

        Errors raised while unpickling, converting or splitting the input
        propagate to the caller unchanged.
        """
        # split it as many times as we have outputs
        numSplit = len(self.outputs)
        # NOTE(review): pickle.loads can execute arbitrary code while
        # deserialising; this is only safe if the input drop's content is
        # trusted -- confirm for deployments that accept external data.
        inpArray = pickle.loads(droputils.allDropContents(self.inputs[0]))
        # The previous try/except wrappers only re-raised what they caught,
        # so the calls are made directly; the failure behaviour is identical.
        nObj = np.array(inpArray)
        result = np.array_split(nObj, numSplit)
        for o, part in zip(self.outputs, result):
            d = pickle.dumps(part)
            o.len = len(d)
            o.write(d)  # send this part to its output

class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

Expand All @@ -385,4 +435,4 @@ def run(self):
pass

def condition(self):
return self.result
return self.result
3 changes: 3 additions & 0 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ def getGraph(self, sessionId):
self._check_session_id(sessionId)
return self._sessions[sessionId].getGraph()

def getLogDir(self):
    """Return the log directory stored on this manager (``self.logdir``)."""
    log_directory = self.logdir
    return log_directory

def deploySession(self, sessionId, completedDrops=[]):
self._check_session_id(sessionId)
session = self._sessions[sessionId]
Expand Down
74 changes: 71 additions & 3 deletions daliuge-engine/dlg/manager/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,29 @@
Data Managers (DROPManager and DataIslandManager) to the outside world.
"""

import cgi
import functools
import io
import json
import logging
import os
import tarfile
import threading

import bottle
import pkg_resources

from bottle import static_file

from . import constants
from .client import NodeManagerClient
from .client import NodeManagerClient, DataIslandManagerClient
from .. import utils
from ..exceptions import InvalidGraphException, InvalidSessionState, \
DaliugeException, NoSessionException, SessionAlreadyExistsException, \
InvalidDropException, InvalidRelationshipException, SubManagerException
from ..restserver import RestServer
from ..restutils import RestClient, RestClientException

from .session import generateLogFileName

logger = logging.getLogger(__name__)

Expand All @@ -54,6 +60,10 @@ def daliuge_aware(func):
def fwrapper(*args, **kwargs):
try:
res = func(*args, **kwargs)

if isinstance(res, (bytes, bottle.HTTPResponse)):
return res

if res is not None:
bottle.response.content_type = 'application/json'
return json.dumps(res)
Expand Down Expand Up @@ -120,6 +130,7 @@ def __init__(self, dm, maxreqsize=10):
app.get( '/api/sessions', callback=self.getSessions)
app.get( '/api/sessions/<sessionId>', callback=self.getSessionInformation)
app.delete('/api/sessions/<sessionId>', callback=self.destroySession)
app.get( '/api/sessions/<sessionId>/logs', callback=self.getLogFile)
app.get( '/api/sessions/<sessionId>/status', callback=self.getSessionStatus)
app.post( '/api/sessions/<sessionId>/deploy', callback=self.deploySession)
app.post( '/api/sessions/<sessionId>/cancel', callback=self.cancelSession)
Expand Down Expand Up @@ -269,6 +280,14 @@ def getNMStatus(self):
# future
return {'sessions': self.sessions()}

@daliuge_aware
def getLogFile(self, sessionId):
    """Serve this node's log file for ``sessionId`` as a download.

    The file name is derived with ``generateLogFileName`` from the manager's
    log directory. A ``NoSessionException`` is raised when no such file
    exists on disk.
    """
    log_root = self.dm.getLogDir()
    log_path = generateLogFileName(log_root, sessionId)
    if os.path.isfile(log_path):
        log_name = os.path.basename(log_path)
        # Serve relative to the log directory and offer the same name for download.
        return static_file(log_name, root=log_root, download=log_name)
    raise NoSessionException(sessionId, 'Log file not found.')

@daliuge_aware
def linkGraphParts(self, sessionId):
params = bottle.request.params
Expand Down Expand Up @@ -333,6 +352,9 @@ def getCMStatus(self):
def getCMNodes(self):
return self.dm.nodes

def getAllCMNodes(self):
    """Return the nodes of the wrapped manager (``self.dm.nodes``)."""
    manager = self.dm
    return manager.nodes

@daliuge_aware
def addCMNode(self, node):
self.dm.add_node(node)
Expand All @@ -348,6 +370,45 @@ def getNodeSessions(self, node):
with NodeManagerClient(host=node) as dm:
return dm.sessions()

def _tarfile_write(self, tar, headers, stream):
file_header = headers.getheader('Content-Disposition')
length = headers.getheader('Content-Length')
_, params = cgi.parse_header(file_header)
filename = params['filename']
info = tarfile.TarInfo(filename)
info.size = int(length)

content = []
while True:
buffer = stream.read()
if not buffer:
break
content.append(buffer)

tar.addfile(info, io.BytesIO(initial_bytes=''.join(content).encode()))


@daliuge_aware
def getLogFile(self, sessionId):
    """Bundle the log files of ``sessionId`` from all nodes into one archive.

    Every node returned by ``self.getAllCMNodes()`` is asked for its log
    file, and each answer is added to an in-memory gzip-compressed tar
    archive. Nodes that raise ``NoSessionException`` or return no body are
    skipped. The archive bytes are returned and the download headers are set
    on ``bottle.response``.
    """
    fh = io.BytesIO()
    with tarfile.open(fileobj=fh, mode='w:gz') as tar:
        for node in self.getAllCMNodes():
            with NodeManagerClient(host=node) as dm:
                try:
                    stream, resp = dm.get_log_file(sessionId)
                    if stream is None or resp is None:
                        # Empty response body: there is nothing to archive for this node.
                        continue
                    self._tarfile_write(tar, resp, stream)
                except NoSessionException:
                    # Best effort: a node without this session contributes no file.
                    pass

    # Read the buffer only after the archive is closed so that the gzip
    # stream is complete.
    data = fh.getvalue()
    # The archive is gzip-compressed ('w:gz'), so it is announced as such
    # instead of as a plain tar file.
    bottle.response.set_header('Content-Type', 'application/gzip')
    bottle.response.set_header('Content-Disposition',
                               f'attachment; filename=dlg_{sessionId}.tar.gz')
    bottle.response.set_header('Content-Length', len(data))
    return data

@daliuge_aware
def getNodeSessionInformation(self, node, sessionId):
if node not in self.dm.nodes:
Expand Down Expand Up @@ -404,4 +465,11 @@ def initializeSpecifics(self, app):
def createDataIsland(self, host):
with RestClient(host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10) as c:
c._post_json('/managers/dataisland', bottle.request.body.read())
self.dm.addDmHost(host)
self.dm.addDmHost(host)

def getAllCMNodes(self):
    """Return the nodes of every host in ``self.dm.dmHosts`` as one flat list.

    Each host is queried through a ``DataIslandManagerClient`` and the
    results are concatenated in host order.
    """
    all_nodes = []
    for island_host in self.dm.dmHosts:
        with DataIslandManagerClient(host=island_host) as island:
            all_nodes.extend(island.nodes())
    return all_nodes
10 changes: 9 additions & 1 deletion daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import logging
import threading
import time
import socket

from . import constants
from .. import droputils
Expand Down Expand Up @@ -67,6 +68,13 @@ def handleEvent(self, evt):

track_current_session = utils.object_tracking('session')


def generateLogFileName(logdir, sessionId):
    """Build the path of the log file for ``sessionId`` inside ``logdir``.

    The file name embeds the IP address that this machine's host name
    resolves to: ``<logdir>/dlg_<ip>_<sessionId>.log``.
    """
    ip = socket.gethostbyname(socket.gethostname())
    return f'{logdir}/dlg_{ip}_{sessionId}.log'


class Session(object):
"""
A DROP graph execution.
Expand Down Expand Up @@ -114,7 +122,7 @@ def filter(self, record):
logdir = utils.getDlgLogsDir()
if self._nm is not None:
logdir = self._nm.logdir
logfile = '%s/dlg_%s' % (logdir, self.sessionId,)
logfile = generateLogFileName(logdir, self.sessionId)
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setFormatter(fmt)
self.file_handler.addFilter(SessionFilter(self.sessionId))
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/manager/web/static/css/progressBar.css
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@

.node.error :first-child, rect.error {
fill: #e44f33;
}
}
20 changes: 18 additions & 2 deletions daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import os
import pickle
import unittest
from numpy import random, mean, array
from numpy import random, mean, array, concatenate


from dlg import droputils
from dlg.droputils import DROPWaiterCtx
from dlg.apps.simple import SleepApp, CopyApp, SleepAndCopyApp
from dlg.apps.simple import GenericScatterApp, SleepApp, CopyApp, SleepAndCopyApp
from dlg.apps.simple import RandomArrayApp, AverageArraysApp, HelloWorldApp
from dlg.ddap_protocol import DROPStates
from dlg.drop import NullDROP, InMemoryDROP, FileDROP, NgasDROP
Expand Down Expand Up @@ -149,6 +149,22 @@ def test_ngasio(self):
self._test_graph_runs((nd_in,b,i,nd_out),nd_in, nd_out, timeout=4)
self.assertEqual(b"Hello World", droputils.allDropContents(i))

def test_genericScatter(self):
    """GenericScatterApp splits its input across two outputs without losing data."""
    data_in = random.randint(0, 100, size=100)
    b = InMemoryDROP('b', 'b')
    b.write(pickle.dumps(data_in))
    s = GenericScatterApp('s', 's')
    s.addInput(b)
    o1 = InMemoryDROP('o1', 'o1')
    o2 = InMemoryDROP('o2', 'o2')
    for x in o1, o2:
        s.addOutput(x)
    self._test_graph_runs((b, s, o1, o2), b, (o1, o2), timeout=4)

    data1 = pickle.loads(droputils.allDropContents(o1))
    data2 = pickle.loads(droputils.allDropContents(o2))
    data_out = concatenate([data1, data2])
    # Compare the contents element by element. Calling .all() on each array
    # first would reduce both sides to one boolean and hide any difference.
    self.assertEqual(data_in.tolist(), data_out.tolist())



Expand Down
9 changes: 9 additions & 0 deletions daliuge-engine/test/manager/test_mm.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from dlg.manager.composite_manager import MasterManager
from dlg.manager.session import SessionStates
from dlg.testutils import ManagerStarter
from dlg.exceptions import NoSessionException
from test.manager import testutils


Expand Down Expand Up @@ -272,3 +273,11 @@ def test_fullRound(self):
testutils.delete(self, '/sessions/%s' % (sessionId), restPort)
sessions = testutils.get(self, '/sessions', restPort)
self.assertEqual(0, len(sessions))

# Check log should not exist
resp, _ = testutils._get('/sessions/not_exist/logs', 8000)
self.assertEqual(resp.status, 404)

# Check logs exist and there is content
resp, _ = testutils._get('/sessions/{sessionId}/logs', restPort)
self.assertGreater(int(resp.getheader('Content-Length')), 0)
8 changes: 5 additions & 3 deletions daliuge-engine/test/manager/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
from dlg import utils
import codecs


def get(test, url, port):
def _get(url, port):
conn = http.client.HTTPConnection('localhost', port, timeout=3)
conn.request('GET', '/api' + url)
res = conn.getresponse()
return conn.getresponse(), conn

def get(test, url, port):
res, conn = _get(url, port)
test.assertEqual(http.HTTPStatus.OK, res.status)
jsonRes = json.load(codecs.getreader('utf-8')(res))
res.close()
Expand Down

0 comments on commit e6cf2f6

Please sign in to comment.