From 3d640f78ed91a8b2bb9c9ea6728e9ed600208811 Mon Sep 17 00:00:00 2001 From: davepallot Date: Thu, 15 Jul 2021 16:21:09 +0800 Subject: [PATCH] LIU-66: Download logs from all running NM as a tar archive. --- daliuge-common/dlg/clients.py | 3 + daliuge-common/dlg/restutils.py | 13 ++-- daliuge-engine/dlg/manager/node_manager.py | 3 + daliuge-engine/dlg/manager/rest.py | 76 +++++++++++++++++++++- daliuge-engine/dlg/manager/session.py | 10 ++- 5 files changed, 96 insertions(+), 9 deletions(-) diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index d1a447ba8..09bcaf9d9 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -159,6 +159,9 @@ def trigger_drops(self, sessionId, drop_uids): def shutdown_node_manager(self): self._GET('/shutdown') + def get_log_file(self, sessionId): + return self._request(f'/sessions/{sessionId}/logs', 'GET') + class CompositeManagerClient(BaseDROPManagerClient): def nodes(self): diff --git a/daliuge-common/dlg/restutils.py b/daliuge-common/dlg/restutils.py index 809af564b..55a013ee6 100644 --- a/daliuge-common/dlg/restutils.py +++ b/daliuge-common/dlg/restutils.py @@ -136,7 +136,8 @@ def _post_json(self, url, content, compress=False): return json.load(ret) if ret else None def _GET(self, url): - return self._request(url, 'GET') + stream, _ = self._request(url, 'GET') + return stream def _POST(self, url, content=None, content_type=None, compress=False): headers = {} @@ -149,10 +150,12 @@ def _POST(self, url, content=None, content_type=None, compress=False): if not hasattr(content, 'read'): content = io.BytesIO(content) content = common.ZlibCompressedStream(content) - return self._request(url, 'POST', content, headers) + stream, _ = self._request(url, 'POST', content, headers) + return stream def _DELETE(self, url): - return self._request(url, 'DELETE') + stream, _ = self._request(url, 'DELETE') + return stream def _request(self, url, method, content=None, headers={}): @@ -197,5 +200,5 @@ def _request(self, url, method, content=None, headers={}): raise ex if not self._resp.length: - return None - return codecs.getreader('utf-8')(self._resp) \ No newline at end of file + return None, None + return codecs.getreader('utf-8')(self._resp), self._resp \ No newline at end of file diff --git a/daliuge-engine/dlg/manager/node_manager.py b/daliuge-engine/dlg/manager/node_manager.py index 51818cbd1..247790e46 100644 --- a/daliuge-engine/dlg/manager/node_manager.py +++ b/daliuge-engine/dlg/manager/node_manager.py @@ -217,6 +217,9 @@ def getGraph(self, sessionId): self._check_session_id(sessionId) return self._sessions[sessionId].getGraph() + def getLogDir(self): + return self.logdir + def deploySession(self, sessionId, completedDrops=[]): self._check_session_id(sessionId) session = self._sessions[sessionId] diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index afa61e240..d0dba1d95 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -24,23 +24,29 @@ Data Managers (DROPManager and DataIslandManager) to the outside world. """ +import io +import os +import cgi import functools import json import logging import threading +import tarfile import bottle import pkg_resources +from bottle import static_file + from . import constants -from .client import NodeManagerClient +from .client import NodeManagerClient, DataIslandManagerClient from .. import utils from ..exceptions import InvalidGraphException, InvalidSessionState, \ DaliugeException, NoSessionException, SessionAlreadyExistsException, \ InvalidDropException, InvalidRelationshipException, SubManagerException from ..restserver import RestServer from ..restutils import RestClient, RestClientException - +from .session import generateLogFileName logger = logging.getLogger(__name__) @@ -54,6 +60,13 @@ def daliuge_aware(func): def fwrapper(*args, **kwargs): try: res = func(*args, **kwargs) + + if isinstance(res, bytes): + return res + + if isinstance(res, bottle.HTTPResponse): + return res + if res is not None: bottle.response.content_type = 'application/json' return json.dumps(res) @@ -120,6 +133,7 @@ def __init__(self, dm, maxreqsize=10): app.get( '/api/sessions', callback=self.getSessions) app.get( '/api/sessions/', callback=self.getSessionInformation) app.delete('/api/sessions/', callback=self.destroySession) + app.get( '/api/sessions//logs', callback=self.getLogFile) app.get( '/api/sessions//status', callback=self.getSessionStatus) app.post( '/api/sessions//deploy', callback=self.deploySession) app.post( '/api/sessions//cancel', callback=self.cancelSession) @@ -269,6 +283,14 @@ def getNMStatus(self): # future return {'sessions': self.sessions()} + @daliuge_aware + def getLogFile(self, sessionId): + logdir = self.dm.getLogDir() + logfile = generateLogFileName(logdir, sessionId) + if not os.path.isfile(logfile): + raise NoSessionException(sessionId, 'Log file not found.') + return static_file(os.path.basename(logfile), root=logdir, download=os.path.basename(logfile)) + @daliuge_aware def linkGraphParts(self, sessionId): params = bottle.request.params @@ -333,6 +355,9 @@ def getCMStatus(self): def getCMNodes(self): return self.dm.nodes + def getAllCMNodes(self): + return self.dm.nodes + @daliuge_aware def addCMNode(self, node): self.dm.add_node(node) @@ -348,6 +373,44 @@ def getNodeSessions(self, node): with NodeManagerClient(host=node) as dm: return dm.sessions() + def _tarfile_write(self, tar, headers, stream): + file_header = headers.getheader('Content-Disposition') + length = headers.getheader('Content-Length') + _, params = cgi.parse_header(file_header) + filename = params['filename'] + info = tarfile.TarInfo(filename) + info.size = int(length) + + content = [] + while True: + buffer = stream.read() + if not buffer: + break + content.append(buffer) + + tar.addfile(info, io.BytesIO(initial_bytes=''.join(content).encode())) + + @daliuge_aware + def getLogFile(self, sessionId): + fh = io.BytesIO() + with tarfile.open(fileobj=fh, mode='w:gz') as tar: + for node in self.getAllCMNodes(): + with NodeManagerClient(host=node) as dm: + try: + stream, resp = dm.get_log_file(sessionId) + self._tarfile_write(tar, resp, stream) + except NoSessionException: + pass + + + data = fh.getvalue() + size = len(data) + bottle.response.set_header('Content-type', 'application/x-tar') + bottle.response['Content-Disposition'] = f'attachment; ' \ + f'filename=dlg_{sessionId}.tar' + bottle.response['Content-Length'] = size + return data + @daliuge_aware def getNodeSessionInformation(self, node, sessionId): if node not in self.dm.nodes: @@ -404,4 +467,11 @@ def initializeSpecifics(self, app): def createDataIsland(self, host): with RestClient(host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10) as c: c._post_json('/managers/dataisland', bottle.request.body.read()) - self.dm.addDmHost(host) \ No newline at end of file + self.dm.addDmHost(host) + + def getAllCMNodes(self): + nodes = [] + for node in self.dm.dmHosts: + with DataIslandManagerClient(host=node) as dm: + nodes += dm.nodes() + return nodes \ No newline at end of file diff --git a/daliuge-engine/dlg/manager/session.py b/daliuge-engine/dlg/manager/session.py index a5730b0b0..3b088f561 100644 --- a/daliuge-engine/dlg/manager/session.py +++ b/daliuge-engine/dlg/manager/session.py @@ -28,6 +28,7 @@ import logging import threading import time +import socket from . import constants from .. import droputils @@ -67,6 +68,13 @@ def handleEvent(self, evt): track_current_session = utils.object_tracking('session') + +def generateLogFileName(logdir, sessionId): + hostname = socket.gethostname() + ip = socket.gethostbyname(hostname) + return f'{logdir}/dlg_{ip}_{sessionId}.log' + + class Session(object): """ A DROP graph execution. @@ -114,7 +122,7 @@ def filter(self, record): logdir = utils.getDlgLogsDir() if self._nm is not None: logdir = self._nm.logdir - logfile = '%s/dlg_%s' % (logdir, self.sessionId,) + logfile = generateLogFileName(logdir, self.sessionId) self.file_handler = logging.FileHandler(logfile) self.file_handler.setFormatter(fmt) self.file_handler.addFilter(SessionFilter(self.sessionId))