Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIU-66: Download logs from all running NM as a tar archive. #46

Merged
merged 3 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ def trigger_drops(self, sessionId, drop_uids):
def shutdown_node_manager(self):
    """Ask the remote node manager to shut itself down."""
    endpoint = '/shutdown'
    self._GET(endpoint)

def get_log_file(self, sessionId):
    """Download the node manager's log for *sessionId*.

    Returns whatever self._request yields for a GET on the session's
    logs endpoint (a (reader, response) pair per restutils._request).
    """
    url = f'/sessions/{sessionId}/logs'
    return self._request(url, 'GET')

class CompositeManagerClient(BaseDROPManagerClient):

def nodes(self):
Expand Down
13 changes: 8 additions & 5 deletions daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ def _post_json(self, url, content, compress=False):
return json.load(ret) if ret else None

def _GET(self, url):
return self._request(url, 'GET')
stream, _ = self._request(url, 'GET')
return stream

def _POST(self, url, content=None, content_type=None, compress=False):
headers = {}
Expand All @@ -149,10 +150,12 @@ def _POST(self, url, content=None, content_type=None, compress=False):
if not hasattr(content, 'read'):
content = io.BytesIO(content)
content = common.ZlibCompressedStream(content)
return self._request(url, 'POST', content, headers)
stream, _ = self._request(url, 'POST', content, headers)
return stream

def _DELETE(self, url):
return self._request(url, 'DELETE')
stream, _ = self._request(url, 'DELETE')
return stream

def _request(self, url, method, content=None, headers={}):

Expand Down Expand Up @@ -197,5 +200,5 @@ def _request(self, url, method, content=None, headers={}):
raise ex

if not self._resp.length:
return None
return codecs.getreader('utf-8')(self._resp)
return None, None
return codecs.getreader('utf-8')(self._resp), self._resp
3 changes: 3 additions & 0 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ def getGraph(self, sessionId):
self._check_session_id(sessionId)
return self._sessions[sessionId].getGraph()

def getLogDir(self):
    """Return the directory this node manager writes its log files to."""
    return self.logdir

def deploySession(self, sessionId, completedDrops=[]):
self._check_session_id(sessionId)
session = self._sessions[sessionId]
Expand Down
76 changes: 73 additions & 3 deletions daliuge-engine/dlg/manager/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,29 @@
Data Managers (DROPManager and DataIslandManager) to the outside world.
"""

import io
import os
import cgi
import functools
import json
import logging
import threading
import tarfile
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more of a personal preference, but could we have imports alphasorted in general? It would be nicer to have this checked automatically by a tool instead of me telling it, we could do that at some point.


import bottle
import pkg_resources

from bottle import static_file

from . import constants
from .client import NodeManagerClient
from .client import NodeManagerClient, DataIslandManagerClient
from .. import utils
from ..exceptions import InvalidGraphException, InvalidSessionState, \
DaliugeException, NoSessionException, SessionAlreadyExistsException, \
InvalidDropException, InvalidRelationshipException, SubManagerException
from ..restserver import RestServer
from ..restutils import RestClient, RestClientException

from .session import generateLogFileName

logger = logging.getLogger(__name__)

Expand All @@ -54,6 +60,13 @@ def daliuge_aware(func):
def fwrapper(*args, **kwargs):
try:
res = func(*args, **kwargs)

if isinstance(res, bytes):
return res

if isinstance(res, bottle.HTTPResponse):
return res
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if isinstance(res, (bytes, bottle.HTTPResponse)):


if res is not None:
bottle.response.content_type = 'application/json'
return json.dumps(res)
Expand Down Expand Up @@ -120,6 +133,7 @@ def __init__(self, dm, maxreqsize=10):
app.get( '/api/sessions', callback=self.getSessions)
app.get( '/api/sessions/<sessionId>', callback=self.getSessionInformation)
app.delete('/api/sessions/<sessionId>', callback=self.destroySession)
app.get( '/api/sessions/<sessionId>/logs', callback=self.getLogFile)
app.get( '/api/sessions/<sessionId>/status', callback=self.getSessionStatus)
app.post( '/api/sessions/<sessionId>/deploy', callback=self.deploySession)
app.post( '/api/sessions/<sessionId>/cancel', callback=self.cancelSession)
Expand Down Expand Up @@ -269,6 +283,14 @@ def getNMStatus(self):
# future
return {'sessions': self.sessions()}

@daliuge_aware
def getLogFile(self, sessionId):
    """Serve this node manager's log file for *sessionId* as a download.

    Raises NoSessionException when no log file exists for the session.
    """
    log_dir = self.dm.getLogDir()
    log_path = generateLogFileName(log_dir, sessionId)
    if not os.path.isfile(log_path):
        raise NoSessionException(sessionId, 'Log file not found.')
    name = os.path.basename(log_path)
    return static_file(name, root=log_dir, download=name)

@daliuge_aware
def linkGraphParts(self, sessionId):
params = bottle.request.params
Expand Down Expand Up @@ -333,6 +355,9 @@ def getCMStatus(self):
def getCMNodes(self):
    """Return the nodes currently known to this composite manager."""
    return self.dm.nodes

def getAllCMNodes(self):
    """Return every node reachable from this manager.

    For a data island manager this is identical to getCMNodes(); the
    master manager overrides it to aggregate nodes across islands.
    """
    return self.dm.nodes

@daliuge_aware
def addCMNode(self, node):
self.dm.add_node(node)
Expand All @@ -348,6 +373,44 @@ def getNodeSessions(self, node):
with NodeManagerClient(host=node) as dm:
return dm.sessions()

def _tarfile_write(self, tar, headers, stream):
file_header = headers.getheader('Content-Disposition')
length = headers.getheader('Content-Length')
_, params = cgi.parse_header(file_header)
filename = params['filename']
info = tarfile.TarInfo(filename)
info.size = int(length)

content = []
while True:
buffer = stream.read()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this stream.read() read all of the contents in one go? In which case there's no need for a while around it. I guess you wanted to do a buffered read here instead and accumulate into content?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because its a tcp stream there is no guarantee that a single read will get all the data unless there is code underneath this library that is accumulating?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ill leave it for now and we can review again.

if not buffer:
break
content.append(buffer)

tar.addfile(info, io.BytesIO(initial_bytes=''.join(content).encode()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmmm.... now that I read this: wouldn't it be possible to simply pass stream as the input of addfile and forget about collecting in content? If that's possible then you'll save some unnecessary memory copying.


@daliuge_aware
def getLogFile(self, sessionId):
    """Gather the session log from every node into one gzipped tar archive.

    Nodes that have no log for *sessionId* are silently skipped; the
    archive bytes are returned with download headers set on the response.
    """
    fh = io.BytesIO()
    with tarfile.open(fileobj=fh, mode='w:gz') as tar:
        # TODO: fetch the per-node logs concurrently instead of sequentially.
        for node in self.getAllCMNodes():
            with NodeManagerClient(host=node) as dm:
                try:
                    stream, resp = dm.get_log_file(sessionId)
                    self._tarfile_write(tar, resp, stream)
                except NoSessionException:
                    # This node never ran the session; nothing to archive.
                    pass

    data = fh.getvalue()
    # Use set_header consistently; header values must be strings, so the
    # length is converted explicitly instead of assigning a raw int.
    # NOTE(review): the payload is gzip-compressed ('w:gz') although the
    # advertised filename ends in plain '.tar' -- confirm before changing.
    bottle.response.set_header('Content-type', 'application/x-tar')
    bottle.response.set_header('Content-Disposition',
                               f'attachment; filename=dlg_{sessionId}.tar')
    bottle.response.set_header('Content-Length', str(len(data)))
    return data

@daliuge_aware
def getNodeSessionInformation(self, node, sessionId):
if node not in self.dm.nodes:
Expand Down Expand Up @@ -404,4 +467,11 @@ def initializeSpecifics(self, app):
def createDataIsland(self, host):
with RestClient(host=host, port=constants.DAEMON_DEFAULT_REST_PORT, timeout=10) as c:
c._post_json('/managers/dataisland', bottle.request.body.read())
self.dm.addDmHost(host)
self.dm.addDmHost(host)

def getAllCMNodes(self):
    """Aggregate the node lists of all registered data island managers."""
    all_nodes = []
    for host in self.dm.dmHosts:
        with DataIslandManagerClient(host=host) as client:
            all_nodes.extend(client.nodes())
    return all_nodes
10 changes: 9 additions & 1 deletion daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import logging
import threading
import time
import socket

from . import constants
from .. import droputils
Expand Down Expand Up @@ -67,6 +68,13 @@ def handleEvent(self, evt):

track_current_session = utils.object_tracking('session')


def generateLogFileName(logdir, sessionId):
    """Build the per-session log path: <logdir>/dlg_<local-ip>_<sessionId>.log."""
    host_ip = socket.gethostbyname(socket.gethostname())
    return f'{logdir}/dlg_{host_ip}_{sessionId}.log'


class Session(object):
"""
A DROP graph execution.
Expand Down Expand Up @@ -114,7 +122,7 @@ def filter(self, record):
logdir = utils.getDlgLogsDir()
if self._nm is not None:
logdir = self._nm.logdir
logfile = '%s/dlg_%s' % (logdir, self.sessionId,)
logfile = generateLogFileName(logdir, self.sessionId)
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setFormatter(fmt)
self.file_handler.addFilter(SessionFilter(self.sessionId))
Expand Down