Merge remote-tracking branch 'origin/HH-68476' into EXP-21025
HH ReleaseBot committed Apr 21, 2017
2 parents 39e26d2 + c0d0b16 commit c098f9f
Showing 7 changed files with 63 additions and 84 deletions.
19 changes: 9 additions & 10 deletions frontik/async.py
@@ -1,12 +1,12 @@
-# -*- coding: utf-8 -*-
+# coding=utf-8

 import time
 import logging

 from tornado.ioloop import IOLoop
 from tornado.concurrent import Future

-default_logger = logging.getLogger('frontik.async')
+async_logger = logging.getLogger('frontik.async')


 class AsyncGroup(object):
@@ -18,12 +18,11 @@ class AsyncGroup(object):
     would not be automatically called.
     """

-    def __init__(self, finish_cb, log=default_logger.debug, name=None, logger=None):
+    def __init__(self, finish_cb, name=None, logger=None):
         self._counter = 0
         self._finish_cb = finish_cb
         self._finish_cb_called = False
         self._aborted = False
-        self._logger = logger if logger is not None else default_logger
         self._name = name

         self._start_time = time.time()
@@ -37,12 +36,12 @@ def _message(self, message):
         return self._log_name + ': ' + message

     def abort(self):
-        self._logger.info(self._message('aborting async group'))
+        async_logger.info(self._message('aborting async group'))
         self._aborted = True

     def finish(self):
         if not self._finish_cb_called:
-            self._logger.debug(self._message('done in %.2fms'), (time.time() - self._start_time) * 1000.)
+            async_logger.debug(self._message('done in %.2fms'), (time.time() - self._start_time) * 1000.)
             self._finish_cb_called = True

             try:
@@ -66,7 +65,7 @@ def _inc(self):

     def _dec(self):
         self._counter -= 1
-        self._logger.debug(self._message('%s requests pending'), self._counter)
+        async_logger.debug(self._message('%s requests pending'), self._counter)

     def add(self, intermediate_cb):
         self._inc()
@@ -77,14 +76,14 @@ def new_cb(*args, **kwargs):
                     self._dec()
                     intermediate_cb(*args, **kwargs)
                 except Exception:
-                    self._logger.error(self._message('aborting async group due to unhandled exception in callback'))
-                    self._logger.debug(self._message('done in %.2fms'), (time.time() - self._start_time) * 1000.)
+                    async_logger.error(self._message('aborting async group due to unhandled exception in callback'))
+                    async_logger.debug(self._message('done in %.2fms'), (time.time() - self._start_time) * 1000.)
                     self._aborted = True
                     raise

                 self.try_finish()
             else:
-                self._logger.info(self._message('ignoring response because of already finished group'))
+                async_logger.info(self._message('ignoring response because of already finished group'))

         return new_cb
4 changes: 2 additions & 2 deletions frontik/handler.py
@@ -84,7 +84,7 @@ def initialize(self, logger=None, **kwargs):
     def prepare(self):
         self.active_limit = frontik.handler_active_limit.PageHandlerActiveLimit(self)
         self.debug = PageHandlerDebug(self)
-        self.finish_group = AsyncGroup(self.check_finished(self._finish_page_cb), name='finish', logger=self.log)
+        self.finish_group = AsyncGroup(self.check_finished(self._finish_page_cb), name='finish')

         self.json_producer = self.application.json.get_producer(self)
         self.json = self.json_producer.json
@@ -241,7 +241,7 @@ def check_finished(self, callback, *args, **kwargs):

         def wrapper(*args, **kwargs):
             if self._finished:
-                self.log.warn('page was already finished, {0} ignored'.format(original_callback))
+                self.log.warning('page was already finished, {0} ignored'.format(original_callback))
             else:
                 callback(*args, **kwargs)

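`Logger.warn` is an undocumented alias of `Logger.warning`, and on Python 3 it additionally emits a `DeprecationWarning`, so the rename above is the safe spelling. A standard-library-only way to observe this:

```python
import logging
import warnings

log = logging.getLogger('demo')
log.addHandler(logging.NullHandler())

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    log.warn('old spelling')      # deprecated alias
    log.warning('new spelling')   # preferred

# on Python 3 this prints the DeprecationWarning raised by log.warn
print([str(w.message) for w in caught])
```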
34 changes: 17 additions & 17 deletions frontik/handler_debug.py
@@ -2,27 +2,28 @@

 import base64
 import copy
-from datetime import datetime
 import inspect
-from io import BytesIO
 import logging
 import os
 import pprint
 import time
 import traceback
 import weakref
+from datetime import datetime
+from io import BytesIO

 import lxml.etree as etree
-from lxml.builder import E
 import simplejson as json
+from lxml.builder import E
 from tornado.escape import to_unicode, utf8
 from tornado.httpclient import HTTPResponse
 from tornado.httputil import HTTPHeaders

-from frontik.compat import basestring_type, iteritems, SimpleCookie, unicode_type, urlparse
-from frontik.loggers import BufferedHandler
 import frontik.util
 import frontik.xml_util
+from frontik.compat import basestring_type, iteritems, SimpleCookie, unicode_type, urlparse
+from frontik.loggers import BufferedHandler
+from frontik.request_context import RequestContext

 debug_log = logging.getLogger('frontik.debug')

@@ -191,15 +192,15 @@ def _format_header(key):
     ).strip()


-def _params_to_xml(url, logger=debug_log):
+def _params_to_xml(url):
     params = etree.Element('params')
     query = frontik.util.get_query_parameters(url)
     for name, values in iteritems(query):
         for value in values:
             try:
                 params.append(E.param(to_unicode(value), name=to_unicode(name)))
             except UnicodeDecodeError:
-                logger.exception('cannot decode parameter name or value')
+                debug_log.exception('cannot decode parameter name or value')
                 params.append(E.param(repr(value), name=repr(name)))
     return params

@@ -274,9 +275,6 @@ class DebugBufferedHandler(BufferedHandler):
     FIELDS = ['created', 'filename', 'funcName', 'levelname', 'levelno', 'lineno', 'module', 'msecs',
               'name', 'pathname', 'process', 'processName', 'relativeCreated', 'threadName']

-    def __init__(self):
-        super(DebugBufferedHandler, self).__init__('frontik.debug_buffered_handler')
-
     def produce_all(self):
         log_data = etree.Element('log')

@@ -369,14 +367,16 @@ def __init__(self, handler):
         if self.debug_mode.enabled:
             self.handler.require_debug_access()
             self.debug_log_handler = DebugBufferedHandler()
-            self.handler.log.addHandler(self.debug_log_handler)
-            self.handler.log.debug('debug mode is ON')
+            RequestContext.set('log_handler', self.debug_log_handler)
+
+            debug_log.debug('debug mode is ON')

         if self.debug_mode.inherited:
-            self.handler.log.debug('debug mode is inherited due to %s request header', self.DEBUG_HEADER_NAME)
+            debug_log.debug('debug mode is inherited due to %s request header', self.DEBUG_HEADER_NAME)

         if self.debug_mode.pass_debug:
-            self.handler.log.debug('%s header will be passed to all requests', self.DEBUG_HEADER_NAME)
+            debug_log.debug('%s header will be passed to all requests', self.DEBUG_HEADER_NAME)

     def get_debug_page(self, status_code, response_headers, original_response, stages_total):
         import frontik.app
@@ -403,7 +403,7 @@ def get_debug_page(self, status_code, response_headers, original_response, stages_total):

         debug_log_data.append(E.request(
             E.method(self.handler.request.method),
-            _params_to_xml(self.handler.request.uri, self.handler.log),
+            _params_to_xml(self.handler.request.uri),
             _headers_to_xml(self.handler.request.headers),
             _cookies_to_xml(self.handler.request.headers)
         ))
@@ -428,9 +428,9 @@ def get_debug_page(self, status_code, response_headers, original_response, stages_total):
                 log_document = utf8(str(transform(debug_log_data)))
                 self.handler.set_header('Content-Type', 'text/html; charset=UTF-8')
             except Exception:
-                self.handler.log.exception('XSLT debug file error')
+                debug_log.exception('XSLT debug file error')
                 try:
-                    self.handler.log.error('XSL error log entries:\n%s' % "\n".join(map(
+                    debug_log.error('XSL error log entries:\n%s' % "\n".join(map(
                         'File "{0.filename}", line {0.line}, column {0.column}\n\t{0.message}'
                         .format, transform.error_log)))
                 except Exception:
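
The `RequestContext.set('log_handler', ...)` call above is one half of the new debug-log plumbing; the other half is the `GlobalLogHandler` added to the root logger in frontik/loggers/__init__.py below. A runnable approximation of the round trip, with `RequestContext` stubbed as a plain dict (the real class is request-scoped):

```python
import logging

_request_context = {}  # stand-in for frontik.request_context.RequestContext


class BufferedHandler(logging.Handler):
    # collects records in memory, like frontik's BufferedHandler
    def __init__(self, level=logging.NOTSET):
        super(BufferedHandler, self).__init__(level)
        self.records = []

    def handle(self, record):
        self.records.append(record)


class GlobalLogHandler(logging.Handler):
    # forwards every record to whatever handler the current request registered
    def handle(self, record):
        if _request_context.get('log_handler'):
            _request_context.get('log_handler').handle(record)


logging.root.setLevel(logging.DEBUG)
logging.root.addHandler(GlobalLogHandler())

debug_log_handler = BufferedHandler()
_request_context['log_handler'] = debug_log_handler  # as PageHandlerDebug does

logging.getLogger('frontik.debug').debug('debug mode is ON')
print(len(debug_log_handler.records))  # 1 -- the record reached the buffer
```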
2 changes: 1 addition & 1 deletion frontik/http_client.py
@@ -30,7 +30,7 @@ def group(self, futures, callback=None, name=None):
         results_holder = {}
         group_callback = self.handler.finish_group.add(self.handler.check_finished(callback, results_holder))

-        async_group = AsyncGroup(group_callback, logger=self.handler.log, name=name)
+        async_group = AsyncGroup(group_callback, name=name)

         def future_callback(name, future):
             results_holder[name] = future.result()
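
The only change here is dropping the `logger` argument; the surrounding `group()` machinery is unchanged. For readers outside the codebase, a framework-free sketch of that fan-in pattern (`MiniAsyncGroup` is a hypothetical simplification, not frontik's class):

```python
from functools import partial


class MiniAsyncGroup(object):
    # counts wrapped callbacks and fires finish_cb when the last one runs
    def __init__(self, finish_cb, name=None):
        self._counter = 0
        self._finish_cb = finish_cb

    def add(self, cb):
        self._counter += 1

        def wrapper(*args, **kwargs):
            self._counter -= 1
            cb(*args, **kwargs)
            if self._counter == 0:
                self._finish_cb()

        return wrapper


results_holder = {}


def on_result(name, result):
    results_holder[name] = result


def on_group_done():
    print('all results: %s' % results_holder)


group = MiniAsyncGroup(on_group_done, name='demo')
callbacks = {name: group.add(partial(on_result, name)) for name in ('a', 'b')}

# in frontik the IOLoop would invoke these when each future resolves
callbacks['a'](1)
callbacks['b'](2)  # fires on_group_done: all results: {'a': 1, 'b': 2}
```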
25 changes: 22 additions & 3 deletions frontik/loggers/__init__.py
@@ -8,6 +8,7 @@
 from tornado.options import options

 from frontik.loggers import sentry
+from frontik.request_context import RequestContext

 """Contains a list of all available third-party loggers, that can be used in the request handler.
@@ -25,9 +26,17 @@
 ROOT_LOGGER = logging.root


-class BufferedHandler(logging.Logger):
-    def __init__(self, name, level=logging.NOTSET):
-        super(BufferedHandler, self).__init__(name, level)
+class ContextFilter(logging.Filter):
+    def filter(self, record):
+        handler_name = RequestContext.get('handler_name')
+        request_id = RequestContext.get('request_id')
+        record.name = '.'.join(filter(None, [record.name, handler_name, request_id]))
+        return True
+
+
+class BufferedHandler(logging.Handler):
+    def __init__(self, level=logging.NOTSET):
+        super(BufferedHandler, self).__init__(level)
         self.records = []

     def handle(self, record):
@@ -37,6 +46,12 @@ def produce_all(self):
         raise NotImplementedError()  # pragma: no cover


+class GlobalLogHandler(logging.Handler):
+    def handle(self, record):
+        if RequestContext.get('log_handler'):
+            RequestContext.get('log_handler').handle(record)
+
+
 def bootstrap_app_loggers(app):
     return [logger.bootstrap_logger(app) for logger in LOGGERS if logger is not None]

@@ -46,6 +61,7 @@ def bootstrap_core_logging():

     handlers = []
     level = getattr(logging, options.loglevel.upper())
+    context_filter = ContextFilter()
     ROOT_LOGGER.setLevel(logging.NOTSET)

     if options.logfile:
@@ -83,7 +99,10 @@

     for handler in handlers:
         handler.setLevel(level)
+        handler.addFilter(context_filter)
         ROOT_LOGGER.addHandler(handler)

+    ROOT_LOGGER.addHandler(GlobalLogHandler())
+
     if not ROOT_LOGGER.handlers:
         ROOT_LOGGER.addHandler(logging.NullHandler())
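
What the new `ContextFilter` buys: each record's `name` gets the current handler name and request id appended, so the shared loggers can still produce per-request-traceable lines. A self-contained demo, again with a plain dict standing in for `RequestContext`:

```python
import logging

_request_context = {'handler_name': 'pages.index', 'request_id': 'f3a81c'}


class ContextFilter(logging.Filter):
    def filter(self, record):
        # the same record.name rewriting as in the diff above
        handler_name = _request_context.get('handler_name')
        request_id = _request_context.get('request_id')
        record.name = '.'.join(filter(None, [record.name, handler_name, request_id]))
        return True


handler = logging.StreamHandler()
handler.addFilter(ContextFilter())
handler.setFormatter(logging.Formatter('%(name)s: %(message)s'))

log = logging.getLogger('frontik.handler')
log.addHandler(handler)
log.warning('finished')  # prints: frontik.handler.pages.index.f3a81c: finished
```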
26 changes: 2 additions & 24 deletions frontik/loggers/request.py
@@ -9,38 +9,16 @@
 logger = logging.getLogger('frontik.handler')


-class ContextFilter(logging.Filter):
-    def filter(self, record):
-        handler_name = RequestContext.get('handler_name')
-        request_id = RequestContext.get('request_id')
-        record.name = '.'.join(filter(None, [record.name, handler_name, request_id]))
-        return True
-
-
-logger.addFilter(ContextFilter())
-
-
-class ProxyLogger(logging.Logger):
-    """
-    Proxies everything to "frontik.handler" logger, but allows to add additional per-request handlers
-    """
-
-    def handle(self, record):
-        logger.handle(record)
-        if self.handlers:
-            super(ProxyLogger, self).handle(record)
-
-
 class RequestLogger(logging.LoggerAdapter):

     Stage = namedtuple('Stage', ('name', 'delta', 'start_delta'))

-    def __init__(self, request, request_id=None):
+    def __init__(self, request):
         self._page_handler_name = None
         self._last_stage_time = self._start_time = request._start_time
         self.stages = []

-        super(RequestLogger, self).__init__(ProxyLogger('frontik.handler'), {})
+        super(RequestLogger, self).__init__(logger, {})

         # backcompatibility with logger
         self.warn = self.warning
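
With `ProxyLogger` gone, `RequestLogger` becomes a plain `logging.LoggerAdapter` over the shared `frontik.handler` logger; per-request handlers now attach through `RequestContext` plus `GlobalLogHandler` instead of a per-request `Logger` subclass. The adapter shape in isolation (heavily simplified; the real class also tracks request stages):

```python
import logging

logger = logging.getLogger('frontik.handler')


class RequestLogger(logging.LoggerAdapter):
    def __init__(self):
        # one shared logger underneath instead of a ProxyLogger instance
        super(RequestLogger, self).__init__(logger, {})

        # backwards-compatible alias, as in the diff
        self.warn = self.warning


request_logger = RequestLogger()
request_logger.warn('goes through the shared logger')
```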
37 changes: 10 additions & 27 deletions tests/test_asyncgroup.py
@@ -1,25 +1,16 @@
 # coding=utf-8

+import logging
 import unittest
 from functools import partial

 from tornado.concurrent import Future
+from tornado.testing import ExpectLog

-from frontik.async import AsyncGroup
+from frontik.async import async_logger, AsyncGroup


-class LoggerMock(object):
-    def __init__(self):
-        self.log = []
-
-    def debug(self, msg, *args):
-        self.log.append(msg % args)
-
-    def error(self, msg, *args):
-        self.log.append(msg % args)
-
-    def info(self, msg, *args):
-        self.log.append(msg % args)
+logging.root.setLevel(logging.NOTSET)


 class TestAsyncGroup(unittest.TestCase):
@@ -85,8 +76,6 @@ def test_finish(self):
         self.assertEqual(f.result(), True)

     def test_exception_in_first(self):
-        logger = LoggerMock()
-
         def callback1():
             raise Exception('callback1 error')

@@ -96,41 +85,35 @@ def callback2():
         def finish_callback():
             self.fail('finish_callback should not be called')

-        ag = AsyncGroup(finish_callback, logger=logger, name='test_group')
+        ag = AsyncGroup(finish_callback, name='test_group')
         cb1 = ag.add(callback1)
         cb2 = ag.add(callback2)

         self.assertRaises(Exception, cb1)
         self.assertEqual(ag._finish_cb_called, False)
         self.assertEqual(ag._aborted, True)

-        cb2()
+        with ExpectLog(async_logger, '.*test_group group: ignoring response because of already finished group'):
+            cb2()

-        self.assertEqual(logger.log[-1], 'test_group group: ignoring response because of already finished group')
         self.assertEqual(ag._finish_cb_called, False)
         self.assertEqual(ag._aborted, True)

     def test_exception_in_last(self):
-        logger = LoggerMock()
-
         def callback2():
             raise Exception('callback1 error')

         def finish_callback():
             self.fail('finish_callback should not be called')

-        ag = AsyncGroup(finish_callback, logger=logger, name='test_group')
+        ag = AsyncGroup(finish_callback, name='test_group')
         cb1 = ag.add(lambda: None)
         cb2 = ag.add(callback2)

         cb1()

-        self.assertRaises(Exception, cb2)
-
-        self.assertEqual(
-            logger.log[-2],
-            'test_group group: aborting async group due to unhandled exception in callback'
-        )
+        with ExpectLog(async_logger, '.*test_group group: aborting async group due to unhandled exception in callback'):
+            self.assertRaises(Exception, cb2)

         self.assertEqual(ag._finish_cb_called, False)
         self.assertEqual(ag._aborted, True)
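
The tests now assert on real log output with Tornado's `ExpectLog` context manager instead of the hand-rolled `LoggerMock`: it attaches a filter to the given logger and fails the test if no record matching the regex was emitted inside the block. Standalone usage (assuming Tornado 4.x):

```python
import logging
import unittest

from tornado.testing import ExpectLog

# mirror the diff: lower the root level so DEBUG/INFO records are emitted too
logging.root.setLevel(logging.NOTSET)

demo_logger = logging.getLogger('demo')


class ExpectLogExample(unittest.TestCase):
    def test_warning_is_logged(self):
        with ExpectLog(demo_logger, '.*something went wrong'):
            demo_logger.warning('something went wrong')


if __name__ == '__main__':
    unittest.main()
```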
