diff --git a/fluent/event.py b/fluent/event.py index 62976ee..76f27ca 100644 --- a/fluent/event.py +++ b/fluent/event.py @@ -1,10 +1,13 @@ -from fluent import sender +# -*- coding: utf-8 -*- + import time +from fluent import sender + + class Event(object): def __init__(self, label, data, **kwargs): - if not isinstance(data, dict) : - raise Exception("data must be dict") - s = kwargs['sender'] if ('sender' in kwargs) else sender.get_global_sender() - timestamp = kwargs['time'] if ('time' in kwargs) else int(time.time()) - s.emit_with_time(label, timestamp, data) + assert isinstance(data, dict), 'data must be a dict' + sender_ = kwargs.get('sender', sender.get_global_sender()) + timestamp = kwargs.get('time', int(time.time())) + sender_.emit_with_time(label, timestamp, data) diff --git a/fluent/handler.py b/fluent/handler.py index 2148cb3..edcf840 100644 --- a/fluent/handler.py +++ b/fluent/handler.py @@ -1,33 +1,31 @@ +# -*- coding: utf-8 -*- + import logging -import os -import sys -import msgpack import socket -import threading try: - import json -except ImportError: import simplejson as json +except ImportError: + import json from fluent import sender + class FluentRecordFormatter(object): def __init__(self): self.hostname = socket.gethostname() def format(self, record): - data = { - 'sys_host' : self.hostname, - 'sys_name' : record.name, - 'sys_module' : record.module, - # 'sys_lineno' : record.lineno, - # 'sys_levelno' : record.levelno, - # 'sys_levelname' : record.levelname, - # 'sys_filename' : record.filename, - # 'sys_funcname' : record.funcName, - # 'sys_exc_info' : record.exc_info, - } + data = {'sys_host': self.hostname, + 'sys_name': record.name, + 'sys_module': record.module, + # 'sys_lineno': record.lineno, + # 'sys_levelno': record.levelno, + # 'sys_levelname': record.levelname, + # 'sys_filename': record.filename, + # 'sys_funcname': record.funcName, + # 'sys_exc_info': record.exc_info, + } # if 'sys_exc_info' in data and data['sys_exc_info']: # data['sys_exc_info'] = self.formatException(data['sys_exc_info']) @@ -40,36 +38,41 @@ def _structuring(self, data, msg): elif isinstance(msg, str): try: self._add_dic(data, json.loads(str(msg))) - except: + except (ValueError, json.JSONDecodeError): pass - def _add_dic(self, data, dic): - for k, v in dic.items(): - if isinstance(k, str) or isinstance(k, unicode): - data[str(k)] = v + @staticmethod + def _add_dic(data, dic): + for key, value in dic.items(): + if isinstance(key, basestring): + data[str(key)] = value + class FluentHandler(logging.Handler): ''' Logging Handler for fluent. ''' def __init__(self, - tag, - host='localhost', - port=24224, - timeout=3.0, - verbose=False): + tag, + host='localhost', + port=24224, + timeout=3.0, + verbose=False): self.tag = tag self.sender = sender.FluentSender(tag, host=host, port=port, timeout=timeout, verbose=verbose) - self.fmt = FluentRecordFormatter() logging.Handler.__init__(self) def emit(self, record): - if record.levelno < self.level: return - data = self.fmt.format(record) + data = self.format(record) self.sender.emit(None, data) - def _close(self): - self.sender._close() + def close(self): + self.acquire() + try: + self.sender._close() + logging.Handler.close(self) + finally: + self.release() \ No newline at end of file diff --git a/fluent/sender.py b/fluent/sender.py index 21b02d8..b063605 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -1,11 +1,16 @@ +# -*- coding: utf-8 -*- + from __future__ import print_function -import msgpack import socket import threading import time +import msgpack + + _global_sender = None + def setup(tag, **kwargs): host = kwargs.get('host', 'localhost') port = kwargs.get('port', 24224) @@ -13,9 +18,11 @@ def setup(tag, **kwargs): global _global_sender _global_sender = FluentSender(tag, host=host, port=port) + def get_global_sender(): return _global_sender + class FluentSender(object): def __init__(self, tag, @@ -36,10 +43,10 @@ def __init__(self, self.pendings = None self.packer = msgpack.Packer() self.lock = threading.Lock() - + try: self._reconnect() - except: + except Exception: # will be retried in emit() self._close() @@ -48,8 +55,8 @@ def emit(self, label, data): self.emit_with_time(label, cur_time, data) def emit_with_time(self, label, timestamp, data): - bytes = self._make_packet(label, timestamp, data) - self._send(bytes) + bytes_ = self._make_packet(label, timestamp, data) + self._send(bytes_) def _make_packet(self, label, timestamp, data): if label: @@ -61,25 +68,25 @@ def _make_packet(self, label, timestamp, data): print(packet) return self.packer.pack(packet) - def _send(self, bytes): + def _send(self, bytes_): self.lock.acquire() try: - self._send_internal(bytes) + self._send_internal(bytes_) finally: self.lock.release() - def _send_internal(self, bytes): + def _send_internal(self, bytes_): # buffering if self.pendings: - self.pendings += bytes - bytes = self.pendings + self.pendings += bytes_ + bytes_ = self.pendings try: # reconnect if possible self._reconnect() # send message - self.socket.sendall(bytes) + self.socket.sendall(bytes_) # send finished self.pendings = None @@ -91,7 +98,7 @@ def _send_internal(self, bytes): # TODO: add callback handler here self.pendings = None else: - self.pendings = bytes + self.pendings = bytes_ def _reconnect(self): if not self.socket: diff --git a/tests/__init__.py b/tests/__init__.py index 805a0c1..6942bc3 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + from tests.test_event import * from tests.test_sender import * from tests.test_handler import * diff --git a/tests/mockserver.py b/tests/mockserver.py index f7364cf..54bfcbe 100644 --- a/tests/mockserver.py +++ b/tests/mockserver.py @@ -1,13 +1,17 @@ -import socket -import threading -import time -from msgpack import Unpacker +# -*- coding: utf-8 -*- try: from cStringIO import StringIO as BytesIO except ImportError: from io import BytesIO +import socket +import threading +import time + +from msgpack import Unpacker + + class MockRecvServer(threading.Thread): """ Single threaded server accepts one connection and recv until EOF. @@ -21,16 +25,16 @@ def __init__(self, port): self.start() def run(self): - s = self._sock - s.listen(1) - con, _ = s.accept() + sock = self._sock + sock.listen(1) + con, _ = sock.accept() while True: - d = con.recv(4096) - if not d: + data = con.recv(4096) + if not data: break - self._buf.write(d) + self._buf.write(data) con.close() - s.close() + sock.close() self._sock = None def wait(self): @@ -40,5 +44,6 @@ def wait(self): def get_recieved(self): self.wait() self._buf.seek(0) - # TODO: have to process string encoding properly. currently we assume that all encoding is utf-8. + # TODO: have to process string encoding properly. currently we assume + # that all encoding is utf-8. return list(Unpacker(self._buf, encoding='utf-8')) diff --git a/tests/test_event.py b/tests/test_event.py index cf95612..cae9443 100644 --- a/tests/test_event.py +++ b/tests/test_event.py @@ -1,22 +1,23 @@ +# -*- coding: utf-8 -*- + import unittest -import time -import fluent from fluent import event, sender + sender.setup(server='localhost', tag='app') + class TestHandler(unittest.TestCase): def testLogging(self): # send event with tag app.follow event.Event('follow', { - 'from': 'userA', - 'to': 'userB' + 'from': 'userA', + 'to': 'userB' }) # send event with tag app.follow, with timestamp event.Event('follow', { - 'from': 'userA', - 'to': 'userB' + 'from': 'userA', + 'to': 'userB' }, time=int(0)) - diff --git a/tests/test_handler.py b/tests/test_handler.py index e1971e1..4d3b6e2 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -1,8 +1,12 @@ -import unittest -from tests import mockserver +# -*- coding: utf-8 -*- + import logging +import unittest + import fluent.handler -import msgpack + +from tests import mockserver + class TestLogger(unittest.TestCase): def setUp(self): @@ -12,23 +16,24 @@ def setUp(self): self._server = mockserver.MockRecvServer(port) self._port = port break - except IOError as e: + except IOError: pass def get_data(self): return self._server.get_recieved() def test_simple(self): - h = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler('app.follow', port=self._port) logging.basicConfig(level=logging.INFO) - l = logging.getLogger('fluent.test') - l.addHandler(h) - l.info({ + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info({ 'from': 'userA', 'to': 'userB' }) - h._close() + handler.close() data = self.get_data() eq = self.assertEqual diff --git a/tests/test_sender.py b/tests/test_sender.py index 8fb1916..420d9b3 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -1,8 +1,12 @@ +# -*- coding: utf-8 -*- + from __future__ import print_function import unittest -from tests import mockserver + import fluent.sender -import msgpack + +from tests import mockserver + class TestSender(unittest.TestCase): def setUp(self): @@ -11,13 +15,11 @@ def setUp(self): try: self._server = mockserver.MockRecvServer(port) break - except IOError as e: - print(e) - pass - self._sender = fluent.sender.FluentSender( - tag='test', - port=port, - ) + except IOError as exc: + print(exc) + self._sender = fluent.sender.FluentSender(tag='test', + port=port, + ) def get_data(self): return self._server.get_recieved() @@ -31,6 +33,6 @@ def test_simple(self): eq(1, len(data)) eq(3, len(data[0])) eq('test.foo', data[0][0]) - eq({'bar':'baz'}, data[0][2]) + eq({'bar': 'baz'}, data[0][2]) self.assert_(data[0][1]) self.assert_(isinstance(data[0][1], int))