From 3d640f78ed91a8b2bb9c9ea6728e9ed600208811 Mon Sep 17 00:00:00 2001 From: davepallot Date: Thu, 15 Jul 2021 16:21:09 +0800 Subject: [PATCH 1/4] LIU-66: Download logs from all running NM as a tar archive. --- daliuge-common/dlg/clients.py | 3 + daliuge-common/dlg/restutils.py | 13 ++-- daliuge-engine/dlg/manager/node_manager.py | 3 + daliuge-engine/dlg/manager/rest.py | 76 +++++++++++++++++++++- daliuge-engine/dlg/manager/session.py | 10 ++- 5 files changed, 96 insertions(+), 9 deletions(-) diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index d1a447ba8..09bcaf9d9 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -159,6 +159,9 @@ def trigger_drops(self, sessionId, drop_uids): def shutdown_node_manager(self): self._GET('/shutdown') + def get_log_file(self, sessionId): + return self._request(f'/sessions/{sessionId}/logs', 'GET') + class CompositeManagerClient(BaseDROPManagerClient): def nodes(self): diff --git a/daliuge-common/dlg/restutils.py b/daliuge-common/dlg/restutils.py index 809af564b..55a013ee6 100644 --- a/daliuge-common/dlg/restutils.py +++ b/daliuge-common/dlg/restutils.py @@ -136,7 +136,8 @@ def _post_json(self, url, content, compress=False): return json.load(ret) if ret else None def _GET(self, url): - return self._request(url, 'GET') + stream, _ = self._request(url, 'GET') + return stream def _POST(self, url, content=None, content_type=None, compress=False): headers = {} @@ -149,10 +150,12 @@ def _POST(self, url, content=None, content_type=None, compress=False): if not hasattr(content, 'read'): content = io.BytesIO(content) content = common.ZlibCompressedStream(content) - return self._request(url, 'POST', content, headers) + stream, _ = self._request(url, 'POST', content, headers) + return stream def _DELETE(self, url): - return self._request(url, 'DELETE') + stream, _ = self._request(url, 'DELETE') + return stream def _request(self, url, method, content=None, headers={}): @@ -197,5 +200,5 @@ 
def _request(self, url, method, content=None, headers={}): raise ex if not self._resp.length: - return None - return codecs.getreader('utf-8')(self._resp) \ No newline at end of file + return None, None + return codecs.getreader('utf-8')(self._resp), self._resp \ No newline at end of file diff --git a/daliuge-engine/dlg/manager/node_manager.py b/daliuge-engine/dlg/manager/node_manager.py index 51818cbd1..247790e46 100644 --- a/daliuge-engine/dlg/manager/node_manager.py +++ b/daliuge-engine/dlg/manager/node_manager.py @@ -217,6 +217,9 @@ def getGraph(self, sessionId): self._check_session_id(sessionId) return self._sessions[sessionId].getGraph() + def getLogDir(self): + return self.logdir + def deploySession(self, sessionId, completedDrops=[]): self._check_session_id(sessionId) session = self._sessions[sessionId] diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index afa61e240..d0dba1d95 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -24,23 +24,29 @@ Data Managers (DROPManager and DataIslandManager) to the outside world. """ +import io +import os +import cgi import functools import json import logging import threading +import tarfile import bottle import pkg_resources +from bottle import static_file + from . import constants -from .client import NodeManagerClient +from .client import NodeManagerClient, DataIslandManagerClient from .. 
import utils from ..exceptions import InvalidGraphException, InvalidSessionState, \ DaliugeException, NoSessionException, SessionAlreadyExistsException, \ InvalidDropException, InvalidRelationshipException, SubManagerException from ..restserver import RestServer from ..restutils import RestClient, RestClientException - +from .session import generateLogFileName logger = logging.getLogger(__name__) @@ -54,6 +60,13 @@ def daliuge_aware(func): def fwrapper(*args, **kwargs): try: res = func(*args, **kwargs) + + if isinstance(res, bytes): + return res + + if isinstance(res, bottle.HTTPResponse): + return res + if res is not None: bottle.response.content_type = 'application/json' return json.dumps(res) @@ -120,6 +133,7 @@ def __init__(self, dm, maxreqsize=10): app.get( '/api/sessions', callback=self.getSessions) app.get( '/api/sessions/', callback=self.getSessionInformation) app.delete('/api/sessions/', callback=self.destroySession) + app.get( '/api/sessions//logs', callback=self.getLogFile) app.get( '/api/sessions//status', callback=self.getSessionStatus) app.post( '/api/sessions//deploy', callback=self.deploySession) app.post( '/api/sessions//cancel', callback=self.cancelSession) @@ -269,6 +283,14 @@ def getNMStatus(self): # future return {'sessions': self.sessions()} + @daliuge_aware + def getLogFile(self, sessionId): + logdir = self.dm.getLogDir() + logfile = generateLogFileName(logdir, sessionId) + if not os.path.isfile(logfile): + raise NoSessionException(sessionId, 'Log file not found.') + return static_file(os.path.basename(logfile), root=logdir, download=os.path.basename(logfile)) + @daliuge_aware def linkGraphParts(self, sessionId): params = bottle.request.params @@ -333,6 +355,9 @@ def getCMStatus(self): def getCMNodes(self): return self.dm.nodes + def getAllCMNodes(self): + return self.dm.nodes + @daliuge_aware def addCMNode(self, node): self.dm.add_node(node) @@ -348,6 +373,44 @@ def getNodeSessions(self, node): with NodeManagerClient(host=node) as dm: return 
dm.sessions() + def _tarfile_write(self, tar, headers, stream): + file_header = headers.getheader('Content-Disposition') + length = headers.getheader('Content-Length') + _, params = cgi.parse_header(file_header) + filename = params['filename'] + info = tarfile.TarInfo(filename) + info.size = int(length) + + content = [] + while True: + buffer = stream.read() + if not buffer: + break + content.append(buffer) + + tar.addfile(info, io.BytesIO(initial_bytes=''.join(content).encode())) + + @daliuge_aware + def getLogFile(self, sessionId): + fh = io.BytesIO() + with tarfile.open(fileobj=fh, mode='w:gz') as tar: + for node in self.getAllCMNodes(): + with NodeManagerClient(host=node) as dm: + try: + stream, resp = dm.get_log_file(sessionId) + self._tarfile_write(tar, resp, stream) + except NoSessionException: + pass + + + data = fh.getvalue() + size = len(data) + bottle.response.set_header('Content-type', 'application/x-tar') + bottle.response['Content-Disposition'] = f'attachment; ' \ + f'filename=dlg_{sessionId}.tar' + bottle.response['Content-Length'] = size + return data + @daliuge_aware def getNodeSessionInformation(self, node, sessionId): if node not in self.dm.nodes: @@ -404,4 +467,11 @@ def initializeSpecifics(self, app): def createDataIsland(self, host): with RestClient(host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10) as c: c._post_json('/managers/dataisland', bottle.request.body.read()) - self.dm.addDmHost(host) \ No newline at end of file + self.dm.addDmHost(host) + + def getAllCMNodes(self): + nodes = [] + for node in self.dm.dmHosts: + with DataIslandManagerClient(host=node) as dm: + nodes += dm.nodes() + return nodes \ No newline at end of file diff --git a/daliuge-engine/dlg/manager/session.py b/daliuge-engine/dlg/manager/session.py index a5730b0b0..3b088f561 100644 --- a/daliuge-engine/dlg/manager/session.py +++ b/daliuge-engine/dlg/manager/session.py @@ -28,6 +28,7 @@ import logging import threading import time +import socket from . 
import constants from .. import droputils @@ -67,6 +68,13 @@ def handleEvent(self, evt): track_current_session = utils.object_tracking('session') + +def generateLogFileName(logdir, sessionId): + hostname = socket.gethostname() + ip = socket.gethostbyname(hostname) + return f'{logdir}/dlg_{ip}_{sessionId}.log' + + class Session(object): """ A DROP graph execution. @@ -114,7 +122,7 @@ def filter(self, record): logdir = utils.getDlgLogsDir() if self._nm is not None: logdir = self._nm.logdir - logfile = '%s/dlg_%s' % (logdir, self.sessionId,) + logfile = generateLogFileName(logdir, self.sessionId) self.file_handler = logging.FileHandler(logfile) self.file_handler.setFormatter(fmt) self.file_handler.addFilter(SessionFilter(self.sessionId)) From 7359e3fe8ef0f0da60ab0d31f88c53efb0933a39 Mon Sep 17 00:00:00 2001 From: davepallot Date: Mon, 19 Jul 2021 11:54:07 +0800 Subject: [PATCH 2/4] LIU-66: reorder. --- daliuge-engine/dlg/manager/rest.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index d0dba1d95..aba8e1c9f 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -24,14 +24,14 @@ Data Managers (DROPManager and DataIslandManager) to the outside world. 
""" -import io -import os import cgi import functools +import io import json import logging -import threading +import os import tarfile +import threading import bottle import pkg_resources @@ -61,10 +61,7 @@ def fwrapper(*args, **kwargs): try: res = func(*args, **kwargs) - if isinstance(res, bytes): - return res - - if isinstance(res, bottle.HTTPResponse): + if isinstance(res, (bytes, bottle.HTTPResponse)): return res if res is not None: @@ -390,6 +387,7 @@ def _tarfile_write(self, tar, headers, stream): tar.addfile(info, io.BytesIO(initial_bytes=''.join(content).encode())) + @daliuge_aware def getLogFile(self, sessionId): fh = io.BytesIO() From 59221e36185814d062c5417ed374ae77c13d7ff5 Mon Sep 17 00:00:00 2001 From: davepallot Date: Mon, 19 Jul 2021 12:41:51 +0800 Subject: [PATCH 3/4] LIU-66: Add test for logs. --- daliuge-engine/test/manager/test_mm.py | 9 +++++++++ daliuge-engine/test/manager/testutils.py | 8 +++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/daliuge-engine/test/manager/test_mm.py b/daliuge-engine/test/manager/test_mm.py index 0b2a263cc..acc5c3b0d 100644 --- a/daliuge-engine/test/manager/test_mm.py +++ b/daliuge-engine/test/manager/test_mm.py @@ -34,6 +34,7 @@ from dlg.manager.composite_manager import MasterManager from dlg.manager.session import SessionStates from dlg.testutils import ManagerStarter +from dlg.exceptions import NoSessionException from test.manager import testutils @@ -272,3 +273,11 @@ def test_fullRound(self): testutils.delete(self, '/sessions/%s' % (sessionId), restPort) sessions = testutils.get(self, '/sessions', restPort) self.assertEqual(0, len(sessions)) + + # Check log should not exist + resp, _ = testutils._get('/sessions/not_exist/logs', 8000) + self.assertEqual(resp.status, 404) + + # Check logs exist and there is content + resp, _ = testutils._get('/sessions/{sessionId}/logs', restPort) + self.assertGreater(int(resp.getheader('Content-Length')), 0) diff --git 
a/daliuge-engine/test/manager/testutils.py b/daliuge-engine/test/manager/testutils.py index 64d5f0469..8da70bc6b 100644 --- a/daliuge-engine/test/manager/testutils.py +++ b/daliuge-engine/test/manager/testutils.py @@ -25,11 +25,13 @@ from dlg import utils import codecs - -def get(test, url, port): +def _get(url, port): conn = http.client.HTTPConnection('localhost', port, timeout=3) conn.request('GET', '/api' + url) - res = conn.getresponse() + return conn.getresponse(), conn + +def get(test, url, port): + res, conn = _get(url, port) test.assertEqual(http.HTTPStatus.OK, res.status) jsonRes = json.load(codecs.getreader('utf-8')(res)) res.close() From 503e058091e0fe0e3fc10ca2564f296257dd7041 Mon Sep 17 00:00:00 2001 From: awicenec Date: Mon, 19 Jul 2021 20:56:28 +0800 Subject: [PATCH 4/4] changes and tests required for GenericScatterApp --- daliuge-engine/dlg/apps/simple.py | 52 ++++++++++++++++++- .../manager/web/static/css/progressBar.css | 3 +- daliuge-engine/test/apps/test_simple.py | 20 ++++++- 3 files changed, 70 insertions(+), 5 deletions(-) diff --git a/daliuge-engine/dlg/apps/simple.py b/daliuge-engine/dlg/apps/simple.py index 3d2adac75..1af47ef1f 100644 --- a/daliuge-engine/dlg/apps/simple.py +++ b/daliuge-engine/dlg/apps/simple.py @@ -374,6 +374,56 @@ def run(self): o.len = len(content) o.write(content) # send content to all outputs +## +# @brief GenericSplitApp\n +# @details An APP that splits about any object that can be converted to a numpy array +# into as many parts as the app has outputs, provided that the initially converted numpy +# array has enough elements. The return will be a numpy array of arrays, where the first +# axis is of length len(outputs). The modulo remainder of the length of the original array and +# the number of outputs will be distributed across the first len(outputs)-1 elements of the +# resulting array. 
+# @par EAGLE_START +# @param gitrepo $(GIT_REPO) +# @param version $(PROJECT_VERSION) +# @param category PythonApp +# @param[in] param/appclass/dlg.apps.simple.GenericSplitApp/String/readonly +# \~English Application class\n +# @param[out] port/content +# \~English The port carrying one pickled part of the split array. +# @par EAGLE_END + + +class GenericScatterApp(BarrierAppDROP): + """ + An APP that splits an object that has a len attribute into parts and + returns a numpy array of arrays, where the first axis is of length numSplit. + """ + compontent_meta = dlg_component('GenericScatterApp', 'Scatter an array like object into numSplit parts', + [dlg_batch_input('binary/*', [])], + [dlg_batch_output('binary/*', [])], + [dlg_streaming_input('binary/*')]) + + def initialize(self, **kwargs): + super(GenericScatterApp, self).initialize(**kwargs) + + def run(self): + # split it as many times as we have outputs + numSplit = len(self.outputs) + inpArray = pickle.loads(droputils.allDropContents(self.inputs[0])) + try: # just checking whether the object is some object that can be used as an array + nObj = np.array(inpArray) + except: + raise + try: + result = np.array_split(nObj, numSplit) + except IndexError as err: + raise err + for i in range(numSplit): + o = self.outputs[i] + d = pickle.dumps(result[i]) + o.len = len(d) + o.write(d) # send the i-th pickled part to the i-th output + class SimpleBranch(BranchAppDrop, NullBarrierApp): """Simple branch app that is told the result of its condition""" @@ -385,4 +435,4 @@ def run(self): pass def condition(self): - return self.result \ No newline at end of file + return self.result diff --git a/daliuge-engine/dlg/manager/web/static/css/progressBar.css b/daliuge-engine/dlg/manager/web/static/css/progressBar.css index 09b18cb78..ce62aefc8 100644 --- a/daliuge-engine/dlg/manager/web/static/css/progressBar.css +++ b/daliuge-engine/dlg/manager/web/static/css/progressBar.css @@ -2,7 +2,6 @@ .node rect, .node polygon { stroke-width: 2.0px; stroke: #bbb; - height:100%; 
} /* DROP states */ @@ -45,4 +44,4 @@ .node.error :first-child, rect.error { fill: #e44f33; - } \ No newline at end of file + } diff --git a/daliuge-engine/test/apps/test_simple.py b/daliuge-engine/test/apps/test_simple.py index c577083d2..4b9907d0a 100644 --- a/daliuge-engine/test/apps/test_simple.py +++ b/daliuge-engine/test/apps/test_simple.py @@ -22,12 +22,12 @@ import os import pickle import unittest -from numpy import random, mean, array +from numpy import random, mean, array, concatenate from dlg import droputils from dlg.droputils import DROPWaiterCtx -from dlg.apps.simple import SleepApp, CopyApp, SleepAndCopyApp +from dlg.apps.simple import GenericScatterApp, SleepApp, CopyApp, SleepAndCopyApp from dlg.apps.simple import RandomArrayApp, AverageArraysApp, HelloWorldApp from dlg.ddap_protocol import DROPStates from dlg.drop import NullDROP, InMemoryDROP, FileDROP, NgasDROP @@ -149,6 +149,22 @@ def test_ngasio(self): self._test_graph_runs((nd_in,b,i,nd_out),nd_in, nd_out, timeout=4) self.assertEqual(b"Hello World", droputils.allDropContents(i)) + def test_genericScatter(self): + data_in = random.randint(0, 100, size=100) + b = InMemoryDROP('b', 'b') + b.write(pickle.dumps(data_in)) + s = GenericScatterApp('s', 's') + s.addInput(b) + o1 = InMemoryDROP('o1', 'o1') + o2 = InMemoryDROP('o2', 'o2') + for x in o1, o2: + s.addOutput(x) + self._test_graph_runs((b, s, o1, o2), b, (o1, o2), timeout=4) + + data1 = pickle.loads(droputils.allDropContents(o1)) + data2 = pickle.loads(droputils.allDropContents(o2)) + data_out = concatenate([data1, data2]) + self.assertEqual(data_in.all(), data_out.all())