Skip to content

Commit

Permalink
Merge 3d640f7 into 9d8ce20
Browse files Browse the repository at this point in the history
  • Loading branch information
davepallot committed Jul 15, 2021
2 parents 9d8ce20 + 3d640f7 commit daf5488
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 9 deletions.
3 changes: 3 additions & 0 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ def trigger_drops(self, sessionId, drop_uids):
def shutdown_node_manager(self):
self._GET('/shutdown')

def get_log_file(self, sessionId):
return self._request(f'/sessions/{sessionId}/logs', 'GET')

class CompositeManagerClient(BaseDROPManagerClient):

def nodes(self):
Expand Down
13 changes: 8 additions & 5 deletions daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ def _post_json(self, url, content, compress=False):
return json.load(ret) if ret else None

def _GET(self, url):
return self._request(url, 'GET')
stream, _ = self._request(url, 'GET')
return stream

def _POST(self, url, content=None, content_type=None, compress=False):
headers = {}
Expand All @@ -149,10 +150,12 @@ def _POST(self, url, content=None, content_type=None, compress=False):
if not hasattr(content, 'read'):
content = io.BytesIO(content)
content = common.ZlibCompressedStream(content)
return self._request(url, 'POST', content, headers)
stream, _ = self._request(url, 'POST', content, headers)
return stream

def _DELETE(self, url):
return self._request(url, 'DELETE')
stream, _ = self._request(url, 'DELETE')
return stream

def _request(self, url, method, content=None, headers={}):

Expand Down Expand Up @@ -197,5 +200,5 @@ def _request(self, url, method, content=None, headers={}):
raise ex

if not self._resp.length:
return None
return codecs.getreader('utf-8')(self._resp)
return None, None
return codecs.getreader('utf-8')(self._resp), self._resp
3 changes: 3 additions & 0 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ def getGraph(self, sessionId):
self._check_session_id(sessionId)
return self._sessions[sessionId].getGraph()

def getLogDir(self):
return self.logdir

def deploySession(self, sessionId, completedDrops=[]):
self._check_session_id(sessionId)
session = self._sessions[sessionId]
Expand Down
76 changes: 73 additions & 3 deletions daliuge-engine/dlg/manager/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,29 @@
Data Managers (DROPManager and DataIslandManager) to the outside world.
"""

import io
import os
import cgi
import functools
import json
import logging
import threading
import tarfile

import bottle
import pkg_resources

from bottle import static_file

from . import constants
from .client import NodeManagerClient
from .client import NodeManagerClient, DataIslandManagerClient
from .. import utils
from ..exceptions import InvalidGraphException, InvalidSessionState, \
DaliugeException, NoSessionException, SessionAlreadyExistsException, \
InvalidDropException, InvalidRelationshipException, SubManagerException
from ..restserver import RestServer
from ..restutils import RestClient, RestClientException

from .session import generateLogFileName

logger = logging.getLogger(__name__)

Expand All @@ -54,6 +60,13 @@ def daliuge_aware(func):
def fwrapper(*args, **kwargs):
try:
res = func(*args, **kwargs)

if isinstance(res, bytes):
return res

if isinstance(res, bottle.HTTPResponse):
return res

if res is not None:
bottle.response.content_type = 'application/json'
return json.dumps(res)
Expand Down Expand Up @@ -120,6 +133,7 @@ def __init__(self, dm, maxreqsize=10):
app.get( '/api/sessions', callback=self.getSessions)
app.get( '/api/sessions/<sessionId>', callback=self.getSessionInformation)
app.delete('/api/sessions/<sessionId>', callback=self.destroySession)
app.get( '/api/sessions/<sessionId>/logs', callback=self.getLogFile)
app.get( '/api/sessions/<sessionId>/status', callback=self.getSessionStatus)
app.post( '/api/sessions/<sessionId>/deploy', callback=self.deploySession)
app.post( '/api/sessions/<sessionId>/cancel', callback=self.cancelSession)
Expand Down Expand Up @@ -269,6 +283,14 @@ def getNMStatus(self):
# future
return {'sessions': self.sessions()}

@daliuge_aware
def getLogFile(self, sessionId):
logdir = self.dm.getLogDir()
logfile = generateLogFileName(logdir, sessionId)
if not os.path.isfile(logfile):
raise NoSessionException(sessionId, 'Log file not found.')
return static_file(os.path.basename(logfile), root=logdir, download=os.path.basename(logfile))

@daliuge_aware
def linkGraphParts(self, sessionId):
params = bottle.request.params
Expand Down Expand Up @@ -333,6 +355,9 @@ def getCMStatus(self):
def getCMNodes(self):
return self.dm.nodes

def getAllCMNodes(self):
return self.dm.nodes

@daliuge_aware
def addCMNode(self, node):
self.dm.add_node(node)
Expand All @@ -348,6 +373,44 @@ def getNodeSessions(self, node):
with NodeManagerClient(host=node) as dm:
return dm.sessions()

def _tarfile_write(self, tar, headers, stream):
file_header = headers.getheader('Content-Disposition')
length = headers.getheader('Content-Length')
_, params = cgi.parse_header(file_header)
filename = params['filename']
info = tarfile.TarInfo(filename)
info.size = int(length)

content = []
while True:
buffer = stream.read()
if not buffer:
break
content.append(buffer)

tar.addfile(info, io.BytesIO(initial_bytes=''.join(content).encode()))

@daliuge_aware
def getLogFile(self, sessionId):
fh = io.BytesIO()
with tarfile.open(fileobj=fh, mode='w:gz') as tar:
for node in self.getAllCMNodes():
with NodeManagerClient(host=node) as dm:
try:
stream, resp = dm.get_log_file(sessionId)
self._tarfile_write(tar, resp, stream)
except NoSessionException:
pass


data = fh.getvalue()
size = len(data)
bottle.response.set_header('Content-type', 'application/x-tar')
bottle.response['Content-Disposition'] = f'attachment; ' \
f'filename=dlg_{sessionId}.tar'
bottle.response['Content-Length'] = size
return data

@daliuge_aware
def getNodeSessionInformation(self, node, sessionId):
if node not in self.dm.nodes:
Expand Down Expand Up @@ -404,4 +467,11 @@ def initializeSpecifics(self, app):
def createDataIsland(self, host):
with RestClient(host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10) as c:
c._post_json('/managers/dataisland', bottle.request.body.read())
self.dm.addDmHost(host)
self.dm.addDmHost(host)

def getAllCMNodes(self):
nodes = []
for node in self.dm.dmHosts:
with DataIslandManagerClient(host=node) as dm:
nodes += dm.nodes()
return nodes
10 changes: 9 additions & 1 deletion daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import logging
import threading
import time
import socket

from . import constants
from .. import droputils
Expand Down Expand Up @@ -67,6 +68,13 @@ def handleEvent(self, evt):

track_current_session = utils.object_tracking('session')


def generateLogFileName(logdir, sessionId):
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
return f'{logdir}/dlg_{ip}_{sessionId}.log'


class Session(object):
"""
A DROP graph execution.
Expand Down Expand Up @@ -114,7 +122,7 @@ def filter(self, record):
logdir = utils.getDlgLogsDir()
if self._nm is not None:
logdir = self._nm.logdir
logfile = '%s/dlg_%s' % (logdir, self.sessionId,)
logfile = generateLogFileName(logdir, self.sessionId)
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setFormatter(fmt)
self.file_handler.addFilter(SessionFilter(self.sessionId))
Expand Down

0 comments on commit daf5488

Please sign in to comment.