HH-29729 gelf bulk handler added

1 parent debbd80   commit bce2a92af17b8a2e3f407522a882eafcd71a37d3   @katraev committed with curlup
Showing with 69 additions and 44 deletions.
  1. +67 −12 frontik/frontik_logging.py
  2. +1 −31 frontik/handler.py
  3. +1 −1 frontik/handler_whc_limit.py
79 frontik/frontik_logging.py
@@ -1,11 +1,18 @@
# -*- coding: utf-8 -*-
from collections import namedtuple
+import copy
+
import logging
from logging.handlers import SysLogHandler
+import traceback
import weakref
import time
import tornado.options
from lxml.builder import E
+try:
+    from graypy.handler import GELFHandler, LAN_CHUNK
+except ImportError:
+    tornado.options.options.graylog = False
log = logging.getLogger('frontik.handler')
@@ -33,6 +40,33 @@ def __get_logfile_name(self):
        logfile_parts.insert(1, 'monik')
        return '.'.join(logfile_parts)
+class BulkGELFHandler(GELFHandler):
@curlup added a note: crashes if the import fails
+
+    def handler_bulk(self, records_list, stages=None, status_code=None, exception=None, **kw):
+
+        if records_list != []:
+            first_record = records_list[0]
+        else:
+            return
+        record_for_gelf = copy.deepcopy(first_record)
+        record_for_gelf.message = "{0} {1} {2} \n".format(record_for_gelf.asctime, record_for_gelf.levelname, record_for_gelf.message)
+
+        record_for_gelf.exc_info = exception
+        for record in records_list[1:]:
+            if record.levelno > record_for_gelf.levelno:
+                record_for_gelf.levelno = record.levelno
+                record_for_gelf.lineno = record.lineno
+                record_for_gelf.filename = record.filename
+            if record.exc_info is not None:
+                record_for_gelf.exc_info = traceback.format_exc(record.exc_info)
+            record_for_gelf.message += " {0} {1} {2} \n".format(record.asctime, record.levelname, record.message)
+        if stages is not None:
+            for stage_name, stage_time in stages:
+                setattr(record_for_gelf, stage_name, str(int(stage_time)))
+
+        record_for_gelf.code = status_code
+        GELFHandler.handle(self, record_for_gelf)
+
class MaxLenSysLogHandler(SysLogHandler):
"""
@@ -64,14 +98,39 @@ class PageLogger(logging.LoggerAdapter):
    def __init__(self, handler, logger_name, page):
-        class Logger4Adapter(logging.Logger):
+        class PerRequestLogBufferHandler(logging.Logger):
+            """
+            Handler for storing all LogRecords for current request in a buffer until finish
+            """
+            def __init__(self, name, level=logging.NOTSET):
+                logging.Logger.__init__(self, name, level)
+                self.records_list = []
+                self.bulk_handlers = []
+
            def handle(self, record):
                logging.Logger.handle(self, record)
                log.handle(record)
+                if len(self.bulk_handlers) > 0:
+                    self.records_list.append(self.process_record(record))
+
+            def process_record(self, record):
+                return record
+
+            def add_bulk_handler(self, bulk_handler):
+                self.bulk_handlers.append(bulk_handler)
+
+            def get_records_list(self):
+                return self.records_list
+
+
+            # def flush(self, *a, **kw):
+            def flush(self, **kw):
+                for handler in self.bulk_handlers:
+                    handler.handler_bulk(self.records_list, **kw)
        self.handler_ref = weakref.ref(handler)
        self.handler_started = self.handler_ref().handler_started
-        logging.LoggerAdapter.__init__(self, Logger4Adapter('frontik.handler'),
+        logging.LoggerAdapter.__init__(self, PerRequestLogBufferHandler('frontik.handler'),
                                       dict(request_id=logger_name, page=page, handler=self.handler_ref().__module__))
        self._time = self.handler_started
@@ -81,6 +140,9 @@ def handle(self, record):
        self.warn = self.warning
        self.addHandler = self.logger.addHandler
+        if tornado.options.options.graylog:
+            self.logger.add_bulk_handler(BulkGELFHandler(tornado.options.options.graylog_host,
+                                                         tornado.options.options.graylog_port, LAN_CHUNK, False))
    def stage_tag(self, stage_name):
        zero_time = self.handler_started
        self._stage_tag(PageLogger.Stage(stage_name, self._time - zero_time, time.time() - self._time))
@@ -111,6 +173,9 @@ def process(self, msg, kwargs):
kwargs["extra"] = self.extra
return msg, kwargs
+ def request_finish_hook(self, exception = None):
+ self.logger.flush(status_code=self.handler_ref()._status_code, stages=self.stages, exception=exception)
+
def bootstrap_all_logging():
server_log = logging.getLogger("frontik.server")
@@ -123,16 +188,6 @@ def bootstrap_all_logging():
        syslog_handler.setFormatter(logging.Formatter(tornado.options.options.logformat))
        logging.getLogger().addHandler(syslog_handler)
-    if tornado.options.options.graylog:
-        try:
-            from graypy import GELFHandler, WAN_CHUNK
-            graylog_handler = GELFHandler(tornado.options.options.graylog_host,
-                                          tornado.options.options.graylog_port, WAN_CHUNK, False)
-
-            logging.getLogger().addHandler(graylog_handler)
-        except ImportError:
-            server_log.error('Graylog option is on, but can not import graypy and start graylog logging!')
-
    if tornado.options.options.logfile is not None:
        logging.getLogger().addHandler(MonikInfoLoggingHandler())
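
As the inline note above points out, the new try/except only forces tornado.options.options.graylog to False; the module-level class BulkGELFHandler(GELFHandler) statement still needs the GELFHandler name, so importing frontik_logging raises NameError whenever graypy is missing. A minimal sketch of one possible guard, assuming a no-op stand-in class is acceptable (the stub GELFHandler and the LAN_CHUNK placeholder below are illustrative, not part of this commit):

import logging

import tornado.options

try:
    from graypy.handler import GELFHandler, LAN_CHUNK
except ImportError:
    # Mirror the commit: turn the graylog option off when graypy is absent,
    # but also provide stand-ins so the class definition below still works.
    tornado.options.options.graylog = False
    LAN_CHUNK = None  # placeholder; only needed so the name exists

    class GELFHandler(logging.Handler):  # hypothetical no-op fallback
        def emit(self, record):
            pass


class BulkGELFHandler(GELFHandler):
    # body unchanged from the diff above
    pass

Because the graylog option is forced off in the fallback branch, BulkGELFHandler is never instantiated without graypy; the stub only exists so the module keeps importing cleanly.
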
32 frontik/handler.py
@@ -271,6 +271,7 @@ def flush(self, include_footers=False):
        tornado.web.RequestHandler.flush(self, include_footers=False)
+        self.log.request_finish_hook()
    def get_page(self):
        """ This function must be overridden in a subclass and
@@ -338,37 +339,6 @@ def get_url(self, url, data = None, headers = None, connect_timeout = 0.5, reque
        return placeholder
-    def get_url_retry(self, url, data = None, headers = None, retry_count = 3, retry_delay = 0.1, connect_timeout = 0.5, request_timeout = 2, callback = None, request_types = None):
-        placeholder = future.Placeholder()
-
-        request = frontik.util.make_get_request(url,
-                                                {} if data is None else data,
-                                                {} if headers is None else headers,
-                                                connect_timeout,
-                                                request_timeout)
-
-        def step1(retry_count, response):
-            if response.error and retry_count > 0:
-                self.log.warn('failed to get %s; retries left = %s; retrying', response.effective_url, retry_count)
-                # TODO use handler-specific ioloop
-                if retry_delay > 0:
-                    tornado.ioloop.IOLoop.instance().add_timeout(time.time() + retry_delay,
-                        self.finish_group.add(self.async_callback(partial(step2, retry_count))))
-                else:
-                    step2(retry_count)
-            else:
-                if response.error and retry_count == 0:
-                    self.log.warn('failed to get %s; no more retries left; give up retrying', response.effective_url)
-
-                self._fetch_request_response(placeholder, callback, request, response, request_types = request_types)
-
-        def step2(retry_count):
-            self.http_client.fetch(request, self.finish_group.add(self.async_callback(partial(step1, retry_count - 1))))
-
-        self.http_client.fetch(request, self.finish_group.add(self.async_callback(partial(step1, retry_count - 1))))
-
-        return placeholder
-
    def post_url(self, url, data = '',
                 headers = None,
                 files = None,
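
The one-line addition to PageHandler.flush above is what drives the new logging path: every response flush now calls PageLogger.request_finish_hook, which flushes the buffered records to the bulk handlers together with the response status code, the timing stages and any exception, and BulkGELFHandler.handler_bulk merges them into a single record. A stripped-down, self-contained illustration of that buffer-then-merge flow using only the standard logging module (BulkEchoHandler and PerRequestLogBuffer are invented names for this sketch, not frontik or graypy classes):

# Standalone sketch of the per-request bulk logging idea from this commit.
import logging


class BulkEchoHandler(object):
    """Stand-in for BulkGELFHandler: merges the buffered records and prints them."""
    def handler_bulk(self, records_list, stages=None, status_code=None, exception=None, **kw):
        if not records_list:
            return
        worst = max(record.levelno for record in records_list)
        text = "\n".join("{0} {1}".format(r.levelname, r.getMessage()) for r in records_list)
        print("bulk record: level={0} code={1}\n{2}".format(
            logging.getLevelName(worst), status_code, text))


class PerRequestLogBuffer(logging.Logger):
    """Simplified analogue of PerRequestLogBufferHandler from the diff."""
    def __init__(self, name):
        logging.Logger.__init__(self, name)
        self.records_list = []
        self.bulk_handlers = []

    def handle(self, record):
        logging.Logger.handle(self, record)  # records are still emitted live
        self.records_list.append(record)     # and buffered for the bulk flush

    def flush(self, **kw):
        for handler in self.bulk_handlers:
            handler.handler_bulk(self.records_list, **kw)


logger = PerRequestLogBuffer('demo')
logger.addHandler(logging.StreamHandler())
logger.bulk_handlers.append(BulkEchoHandler())

logger.warning('backend call took 900 ms')
logger.info('page rendered')
logger.flush(status_code=200)  # roughly what request_finish_hook triggers on flush

Running this prints both records as they happen and then one merged summary, which is the shape of the per-request record the commit sends to Graylog.
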
2 frontik/handler_whc_limit.py
@@ -17,7 +17,7 @@ def __init__(self, handler):
                                      self.handler.request.uri,
                                      working_handlers_count)
        else:
-            self.handler.log.warn('dropping %s %s; too many workers (%s)',
+            self.handler.log.warn('dropping %s %s; too many handlers (%s)',
                                   self.handler.request.method,
                                   self.handler.request.uri,
                                   working_handlers_count)
