Skip to content

Commit

Permalink
Merge e579c75 into 6a3275a
Browse files Browse the repository at this point in the history
  • Loading branch information
davepallot committed Jul 7, 2021
2 parents 6a3275a + e579c75 commit 1d47b2d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 5 deletions.
6 changes: 4 additions & 2 deletions daliuge-engine/dlg/manager/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ def dlgNM(parser, args):
'host': options.host,
'error_listener': options.errorListener,
'event_listeners': list(filter(None, options.event_listeners.split(":"))),
'max_threads': options.max_threads}
'max_threads': options.max_threads,
'logdir': options.logdir}
options.dmAcronym = 'NM'
options.restType = NMRestServer

Expand All @@ -290,7 +291,8 @@ def dlgCompositeManager(parser, args, dmType, acronym, dmPort, dmRestServer):
# Add DIM-specific options
options.dmType = dmType
options.dmArgs = ([s for s in options.nodes.split(',') if s],)
options.dmKwargs = {'pkeyPath': options.pkeyPath, 'dmCheckTimeout': options.dmCheckTimeout}
options.dmKwargs = {'pkeyPath': options.pkeyPath,
'dmCheckTimeout': options.dmCheckTimeout}
options.dmAcronym = acronym
options.restType = dmRestServer

Expand Down
7 changes: 5 additions & 2 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ def __init__(self,
dlgPath=None,
error_listener=None,
event_listeners=[],
max_threads = 0):
max_threads=0,
logdir=utils.getDlgLogsDir()):

self._dlm = DataLifecycleManager() if useDLM else None
self._sessions = {}
self.logdir = logdir

# dlgPath contains code added by the user with possible
# DROP applications
Expand Down Expand Up @@ -472,6 +474,7 @@ class RpcMixIn(rpc.RPCClient, rpc.RPCServer): pass
class NodeManager(EventMixIn, RpcMixIn, NodeManagerBase):

def __init__(self, useDLM=True, dlgPath=None, error_listener=None, event_listeners=[], max_threads=0,
logdir=utils.getDlgLogsDir(),
host=None, rpc_port=constants.NODE_DEFAULT_RPC_PORT,
events_port=constants.NODE_DEFAULT_EVENTS_PORT):
# We "just know" that our RpcMixIn will have a create_context static
Expand All @@ -480,7 +483,7 @@ def __init__(self, useDLM=True, dlgPath=None, error_listener=None, event_listene
host = host or '127.0.0.1'
EventMixIn.__init__(self, host, events_port)
RpcMixIn.__init__(self, host, rpc_port)
NodeManagerBase.__init__(self, useDLM, dlgPath, error_listener, event_listeners, max_threads)
NodeManagerBase.__init__(self, useDLM, dlgPath, error_listener, event_listeners, max_threads, logdir)

def shutdown(self):
super(NodeManager, self).shutdown()
Expand Down
28 changes: 27 additions & 1 deletion daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import inspect
import logging
import threading
import time

from . import constants
from .. import droputils
Expand Down Expand Up @@ -97,6 +98,30 @@ def __init__(self, sessionId, nm=None):
self._nm = nm
self._dropsubs = {}

class SessionFilter(logging.Filter):
def __init__(self, sessionId):
self.sessionId = sessionId

def filter(self, record):
if hasattr(record, 'session_id'):
return record.session_id == self.sessionId
return False

fmt = '%(asctime)-15s [%(levelname)5.5s] [%(threadName)15.15s] '
fmt += '[%(session_id)10.10s] [%(drop_uid)10.10s] '
fmt += '%(name)s#%(funcName)s:%(lineno)s %(message)s'
fmt = logging.Formatter(fmt)
fmt.converter = time.gmtime

logdir = utils.getDlgLogsDir()
if self._nm is not None:
logdir = self._nm.logdir
logfile = '%s/dlg_%s' % (logdir, self.sessionId,)
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setFormatter(fmt)
self.file_handler.addFilter(SessionFilter(self.sessionId))
logging.root.addHandler(self.file_handler)

@property
def sessionId(self):
return self._sessionId
Expand Down Expand Up @@ -389,7 +414,8 @@ def getGraph(self):
return dict(self._graph)

def destroy(self):
pass
self.file_handler.close()
logging.root.removeHandler(self.file_handler)

__del__ = destroy

Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/test/manager/test_dim.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import pkg_resources

from dlg import runtime
from dlg import droputils
from dlg import utils
from dlg.common import tool, Categories
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/test/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#
import unittest

from dlg import runtime
from dlg.ddap_protocol import DROPLinkType, DROPStates, AppDROPStates
from dlg.manager.session import Session, SessionStates
from dlg.exceptions import InvalidGraphException
Expand Down

0 comments on commit 1d47b2d

Please sign in to comment.