Skip to content
Permalink
Browse files

feat(service): logging elapsed time and body type change

  • Loading branch information...
hanxiao committed Sep 5, 2019
1 parent e631d39 commit a0fec684f2506b7c663b596d22648ecfa52a4026
Showing with 34 additions and 15 deletions.
  1. +8 −3 gnes/helper.py
  2. +26 −12 gnes/service/base.py
@@ -250,18 +250,23 @@ def warning(self, msg, **kwargs):


class TimeContext:
def __init__(self, msg):
def __init__(self, msg: str, logger=None):
self._msg = msg
self._logger = logger
self.duration = 0

def __enter__(self):
self.start = time.perf_counter()
print(self._msg, end=' ...\t', flush=True)
if not self._logger:
print(self._msg, end=' ...\t', flush=True)
return self

def __exit__(self, typ, value, traceback):
self.duration = time.perf_counter() - self.start
print(colored(' [%3.3f secs]' % self.duration, 'green'), flush=True)
if self._logger:
self._logger.info('%s takes %3.3f secs' % (self._msg, self.duration))
else:
print(colored(' [%3.3f secs]' % self.duration, 'green'), flush=True)


class Tokenizer:
@@ -29,7 +29,7 @@

from ..base import TrainableBase, T
from ..cli.parser import resolve_yaml_path
from ..helper import set_logger, PathImporter
from ..helper import set_logger, PathImporter, TimeContext
from ..proto import gnes_pb2, add_route, send_message, recv_message, router2str


@@ -135,6 +135,7 @@ def build_socket(ctx: 'zmq.Context', host: str, port: int, socket_type: 'SocketT
class MessageHandler:
def __init__(self, mh: 'MessageHandler' = None):
self.routes = {k: v for k, v in mh.routes.items()} if mh else {}
self.hook_fns = []
self.logger = set_logger(self.__class__.__name__)

def register(self, msg_type: Union[List, Tuple, type]):
@@ -148,7 +149,7 @@ def decorator(f):

return decorator

def serve(self, msg: 'gnes_pb2.Message'):
def get_serve_fn(self, msg: 'gnes_pb2.Message'):
def get_default_fn(m_type):
self.logger.warning('cant find handler for message type: %s, fall back to the default handler' % m_type)
f = self.routes.get(m_type, self.routes[NotImplementedError])
@@ -224,6 +225,7 @@ def __init__(self, args):
self.identity = args.identity if 'identity' in args else None
self.use_event_loop = True
self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl)
self.handler.hook_fns.extend([self._hook_warn_body_type_change, self._hook_sort_response])

def run(self):
try:
@@ -257,7 +259,17 @@ def dump(self):
else:
self.logger.info('no dumping as "read_only" set to true.')

def post_handler(self, msg: 'gnes_pb2.Message'):
def post_handler(self, msg: 'gnes_pb2.Message', *args, **kwargs):
for fn in self.handler.hook_fns:
fn(msg, *args, **kwargs)
self.logger.info('hook handler %s is done' % fn.__name__)

def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', old_body_type: str, *args, **kwargs):
new_type = msg.WhichOneof('body')
if new_type != old_body_type:
self.logger.warning('message body is changed from %s to %s' % (new_type, old_body_type))

def _hook_sort_response(self, msg: 'gnes_pb2.Message', *args, **kwargs):
if 'sorted_response' in self.args and self.args.sorted_response and msg.response.search.topk_results:
msg.response.search.topk_results.sort(key=lambda x: x.score.value,
reverse=msg.response.search.is_big_score_similar)
@@ -269,10 +281,11 @@ def post_handler(self, msg: 'gnes_pb2.Message'):

def message_handler(self, msg: 'gnes_pb2.Message', out_sck, ctrl_sck):
try:
fn = self.handler.serve(msg)
fn = self.handler.get_serve_fn(msg)
if fn:
add_route(msg.envelope, self._model.__class__.__name__)
self.logger.info('handling a message with route: %s' % router2str(msg))
self.logger.info('handling a message with route: %s using handler %s' % (router2str(msg), fn.__name__))
old_type = msg.WhichOneof('body')
if msg.request and msg.request.WhichOneof('body') and \
type(getattr(msg.request, msg.request.WhichOneof('body'))) == gnes_pb2.Request.ControlRequest:
out_sock = ctrl_sck
@@ -281,18 +294,18 @@ def message_handler(self, msg: 'gnes_pb2.Message', out_sck, ctrl_sck):
try:
# NOTE that msg is mutable object, it may be modified in fn()
ret = fn(self, msg)
self.logger.info('handler %s is done' % fn.__name__)
if ret is None:
# assume 'msg' is modified inside fn()
self.post_handler(msg)
self.post_handler(msg, old_body_type=old_type)
send_message(out_sock, msg, timeout=self.args.timeout)
elif isinstance(ret, types.GeneratorType):
for r_msg in ret:
self.post_handler(r_msg)
self.post_handler(msg, old_body_type=old_type)
send_message(out_sock, r_msg, timeout=self.args.timeout)
else:
raise ServiceError('unknown return type from the handler: %s' % fn)

self.logger.info('handler %s is done' % fn.__name__)
except BlockMessage:
pass
except EventLoopEnd:
@@ -337,10 +350,11 @@ def _run(self, ctx):
else:
self.logger.error('received message from unknown socket: %s' % socks)
if self.use_event_loop or pull_sock == ctrl_sock:
self.is_handler_done.clear()
msg = recv_message(pull_sock)
self.message_handler(msg, out_sock, ctrl_sock)
self.is_handler_done.set()
with TimeContext('handling message', self.logger):
self.is_handler_done.clear()
msg = recv_message(pull_sock)
self.message_handler(msg, out_sock, ctrl_sock)
self.is_handler_done.set()
else:
self.logger.warning(
'received a new message but since "use_event_loop=False" I will not handle it. '

0 comments on commit a0fec68

Please sign in to comment.
You can’t perform that action at this time.