Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions fluent/event.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from fluent import sender
# -*- coding: utf-8 -*-

import time

from fluent import sender


class Event(object):
def __init__(self, label, data, **kwargs):
if not isinstance(data, dict) :
raise Exception("data must be dict")
s = kwargs['sender'] if ('sender' in kwargs) else sender.get_global_sender()
timestamp = kwargs['time'] if ('time' in kwargs) else int(time.time())
s.emit_with_time(label, timestamp, data)
assert isinstance(data, dict), 'data must be a dict'
sender_ = kwargs.get('sender', sender.get_global_sender())
timestamp = kwargs.get('time', int(time.time()))
sender_.emit_with_time(label, timestamp, data)
67 changes: 35 additions & 32 deletions fluent/handler.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
# -*- coding: utf-8 -*-

import logging
import os
import sys
import msgpack
import socket
import threading

try:
import json
except ImportError:
import simplejson as json
except ImportError:
import json

from fluent import sender


class FluentRecordFormatter(object):
def __init__(self):
self.hostname = socket.gethostname()

def format(self, record):
data = {
'sys_host' : self.hostname,
'sys_name' : record.name,
'sys_module' : record.module,
# 'sys_lineno' : record.lineno,
# 'sys_levelno' : record.levelno,
# 'sys_levelname' : record.levelname,
# 'sys_filename' : record.filename,
# 'sys_funcname' : record.funcName,
# 'sys_exc_info' : record.exc_info,
}
data = {'sys_host': self.hostname,
'sys_name': record.name,
'sys_module': record.module,
# 'sys_lineno': record.lineno,
# 'sys_levelno': record.levelno,
# 'sys_levelname': record.levelname,
# 'sys_filename': record.filename,
# 'sys_funcname': record.funcName,
# 'sys_exc_info': record.exc_info,
}
# if 'sys_exc_info' in data and data['sys_exc_info']:
# data['sys_exc_info'] = self.formatException(data['sys_exc_info'])

Expand All @@ -40,36 +38,41 @@ def _structuring(self, data, msg):
elif isinstance(msg, str):
try:
self._add_dic(data, json.loads(str(msg)))
except:
except (ValueError, json.JSONDecodeError):
pass

def _add_dic(self, data, dic):
for k, v in dic.items():
if isinstance(k, str) or isinstance(k, unicode):
data[str(k)] = v
@staticmethod
def _add_dic(data, dic):
for key, value in dic.items():
if isinstance(key, basestring):
data[str(key)] = value


class FluentHandler(logging.Handler):
'''
Logging Handler for fluent.
'''
def __init__(self,
tag,
host='localhost',
port=24224,
timeout=3.0,
verbose=False):
tag,
host='localhost',
port=24224,
timeout=3.0,
verbose=False):

self.tag = tag
self.sender = sender.FluentSender(tag,
host=host, port=port,
timeout=timeout, verbose=verbose)
self.fmt = FluentRecordFormatter()
logging.Handler.__init__(self)

def emit(self, record):
if record.levelno < self.level: return
data = self.fmt.format(record)
data = self.format(record)
self.sender.emit(None, data)

def _close(self):
self.sender._close()
def close(self):
self.acquire()
try:
self.sender._close()
logging.Handler.close(self)
finally:
self.release()
31 changes: 19 additions & 12 deletions fluent/sender.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
import msgpack
import socket
import threading
import time

import msgpack


_global_sender = None


def setup(tag, **kwargs):
host = kwargs.get('host', 'localhost')
port = kwargs.get('port', 24224)

global _global_sender
_global_sender = FluentSender(tag, host=host, port=port)


def get_global_sender():
return _global_sender


class FluentSender(object):
def __init__(self,
tag,
Expand All @@ -36,10 +43,10 @@ def __init__(self,
self.pendings = None
self.packer = msgpack.Packer()
self.lock = threading.Lock()

try:
self._reconnect()
except:
except Exception:
# will be retried in emit()
self._close()

Expand All @@ -48,8 +55,8 @@ def emit(self, label, data):
self.emit_with_time(label, cur_time, data)

def emit_with_time(self, label, timestamp, data):
bytes = self._make_packet(label, timestamp, data)
self._send(bytes)
bytes_ = self._make_packet(label, timestamp, data)
self._send(bytes_)

def _make_packet(self, label, timestamp, data):
if label:
Expand All @@ -61,25 +68,25 @@ def _make_packet(self, label, timestamp, data):
print(packet)
return self.packer.pack(packet)

def _send(self, bytes):
def _send(self, bytes_):
self.lock.acquire()
try:
self._send_internal(bytes)
self._send_internal(bytes_)
finally:
self.lock.release()

def _send_internal(self, bytes):
def _send_internal(self, bytes_):
# buffering
if self.pendings:
self.pendings += bytes
bytes = self.pendings
self.pendings += bytes_
bytes_ = self.pendings

try:
# reconnect if possible
self._reconnect()

# send message
self.socket.sendall(bytes)
self.socket.sendall(bytes_)

# send finished
self.pendings = None
Expand All @@ -91,7 +98,7 @@ def _send_internal(self, bytes):
# TODO: add callback handler here
self.pendings = None
else:
self.pendings = bytes
self.pendings = bytes_

def _reconnect(self):
if not self.socket:
Expand Down
2 changes: 2 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-

from tests.test_event import *
from tests.test_sender import *
from tests.test_handler import *
29 changes: 17 additions & 12 deletions tests/mockserver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import socket
import threading
import time
from msgpack import Unpacker
# -*- coding: utf-8 -*-

try:
from cStringIO import StringIO as BytesIO
except ImportError:
from io import BytesIO

import socket
import threading
import time

from msgpack import Unpacker


class MockRecvServer(threading.Thread):
"""
Single threaded server accepts one connection and recv until EOF.
Expand All @@ -21,16 +25,16 @@ def __init__(self, port):
self.start()

def run(self):
s = self._sock
s.listen(1)
con, _ = s.accept()
sock = self._sock
sock.listen(1)
con, _ = sock.accept()
while True:
d = con.recv(4096)
if not d:
data = con.recv(4096)
if not data:
break
self._buf.write(d)
self._buf.write(data)
con.close()
s.close()
sock.close()
self._sock = None

def wait(self):
Expand All @@ -40,5 +44,6 @@ def wait(self):
def get_recieved(self):
self.wait()
self._buf.seek(0)
# TODO: have to process string encoding properly. currently we assume that all encoding is utf-8.
# TODO: have to process string encoding properly. currently we assume
# that all encoding is utf-8.
return list(Unpacker(self._buf, encoding='utf-8'))
15 changes: 8 additions & 7 deletions tests/test_event.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
# -*- coding: utf-8 -*-

import unittest
import time

import fluent
from fluent import event, sender


sender.setup(server='localhost', tag='app')


class TestHandler(unittest.TestCase):
def testLogging(self):
# send event with tag app.follow
event.Event('follow', {
'from': 'userA',
'to': 'userB'
'from': 'userA',
'to': 'userB'
})

# send event with tag app.follow, with timestamp
event.Event('follow', {
'from': 'userA',
'to': 'userB'
'from': 'userA',
'to': 'userB'
}, time=int(0))

23 changes: 14 additions & 9 deletions tests/test_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import unittest
from tests import mockserver
# -*- coding: utf-8 -*-

import logging
import unittest

import fluent.handler
import msgpack

from tests import mockserver


class TestLogger(unittest.TestCase):
def setUp(self):
Expand All @@ -12,23 +16,24 @@ def setUp(self):
self._server = mockserver.MockRecvServer(port)
self._port = port
break
except IOError as e:
except IOError:
pass

def get_data(self):
return self._server.get_recieved()

def test_simple(self):
h = fluent.handler.FluentHandler('app.follow', port=self._port)
handler = fluent.handler.FluentHandler('app.follow', port=self._port)

logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
l.addHandler(h)
l.info({
log = logging.getLogger('fluent.test')
handler.setFormatter(fluent.handler.FluentRecordFormatter())
log.addHandler(handler)
log.info({
'from': 'userA',
'to': 'userB'
})
h._close()
handler.close()

data = self.get_data()
eq = self.assertEqual
Expand Down
Loading