Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,18 @@ or for the python logging interface:
sure the communication thread terminates and it's joined correctly. Otherwise the program won't exit, waiting for
the thread, unless forcibly killed.

Circular queue mode
~~~~~~~~~~~~~~~~~~~

In some applications it can be especially important to guarantee that the logging process won't block under *any*
circumstance, even when it's logging faster than the sending thread can handle (*backpressure*). In this case it's
possible to enable the *circular queue* mode by passing ``True`` in the ``queue_circular`` parameter of
``asyncsender.FluentSender`` or ``asynchandler.FluentHandler``. By doing so, the thread doing the logging won't block
even when the queue is full; instead, the new event is added to the queue by discarding the oldest one.

**WARNING**: setting ``queue_circular`` to ``True`` will cause loss of events if the queue fills up completely! Make
sure either that this doesn't happen, or that losing events is acceptable for your application.


Testing
-------

Expand Down
53 changes: 43 additions & 10 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
_global_sender = None

DEFAULT_QUEUE_TIMEOUT = 0.05
DEFAULT_QUEUE_MAXSIZE = 100
DEFAULT_QUEUE_CIRCULAR = False


def _set_global_sender(sender):
Expand Down Expand Up @@ -46,19 +48,29 @@ def __init__(self, tag,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
queue_timeout=DEFAULT_QUEUE_TIMEOUT, *args, **kwargs):
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs):
super(CommunicatorThread, self).__init__(**kwargs)
self._queue = Queue()
self._queue = Queue(maxsize=queue_maxsize)
self._do_run = True
self._queue_timeout = queue_timeout
self._queue_maxsize = queue_maxsize
self._queue_circular = queue_circular
self._conn_close_lock = threading.Lock()
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)

def send(self, bytes_):
    """Enqueue an already-serialized event for the communicator thread.

    In circular-queue mode (``queue_circular=True``) the call never blocks:
    if the queue is full, the oldest pending event is discarded to make
    room.  Otherwise the call blocks until a queue slot is available.

    :param bytes_: serialized event payload to enqueue.
    :return: ``True`` if the event was queued, ``False`` if it was dropped
             because the queue was full.
    """
    if self._queue_circular and self._queue.full():
        # Drop the oldest event to make room.  The consumer thread may have
        # emptied a slot concurrently, hence the Empty guard.
        try:
            self._queue.get(block=False)
        except Empty:  # pragma: no cover
            pass
    try:
        # Block only when NOT in circular mode; in circular mode a full
        # queue at this point means a concurrent producer won the race,
        # and we report the drop via the return value.
        self._queue.put(bytes_, block=(not self._queue_circular))
    except Full:
        return False
    return True
Expand Down Expand Up @@ -114,11 +126,17 @@ def queue_timeout(self):
def queue_timeout(self, value):
self._queue_timeout = value

def __enter__(self):
return self
@property
def queue_maxsize(self):
return self._queue_maxsize

def __exit__(self, typ, value, traceback):
self.close()
@property
def queue_blocking(self):
return not self._queue_circular

@property
def queue_circular(self):
return self._queue_circular


class FluentSender(sender.FluentSender):
Expand All @@ -133,6 +151,8 @@ def __init__(self,
nanosecond_precision=False,
msgpack_kwargs=None,
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR,
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
Expand All @@ -141,7 +161,8 @@ def __init__(self,
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
queue_timeout=queue_timeout)
queue_timeout=queue_timeout, queue_maxsize=queue_maxsize,
queue_circular=queue_circular)
self._communicator.start()

def _send(self, bytes_):
Expand All @@ -152,10 +173,10 @@ def _close(self):
self._communicator._close()

def _send_internal(self, bytes_):
return
assert False # pragma: no cover

def _send_data(self, bytes_):
return
assert False # pragma: no cover

# override reconnect, so we don't open a socket here (since it
# will be opened by the CommunicatorThread)
Expand Down Expand Up @@ -186,6 +207,18 @@ def queue_timeout(self):
def queue_timeout(self, value):
self._communicator.queue_timeout = value

@property
def queue_maxsize(self):
return self._communicator.queue_maxsize

@property
def queue_blocking(self):
return self._communicator.queue_blocking

@property
def queue_circular(self):
return self._communicator.queue_circular

def __enter__(self):
return self

Expand Down
10 changes: 6 additions & 4 deletions fluent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,30 +152,32 @@ def __init__(self,
verbose=False,
buffer_overflow_handler=None,
msgpack_kwargs=None,
nanosecond_precision=False):
nanosecond_precision=False,
**kwargs):

self.tag = tag
self.sender = self.getSenderInstance(tag,
host=host, port=port,
timeout=timeout, verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler,
msgpack_kwargs=msgpack_kwargs,
nanosecond_precision=nanosecond_precision)
nanosecond_precision=nanosecond_precision,
**kwargs)
logging.Handler.__init__(self)

def getSenderClass(self):
return sender.FluentSender

def getSenderInstance(self, tag, host, port, timeout, verbose,
buffer_overflow_handler, msgpack_kwargs,
nanosecond_precision):
nanosecond_precision, **kwargs):
sender_class = self.getSenderClass()
return sender_class(tag,
host=host, port=port,
timeout=timeout, verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler,
msgpack_kwargs=msgpack_kwargs,
nanosecond_precision=nanosecond_precision)
nanosecond_precision=nanosecond_precision, **kwargs)

def emit(self, record):
data = self.format(record)
Expand Down
57 changes: 57 additions & 0 deletions tests/test_asynchandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,60 @@ def test_non_string_dict_message(self):
data = self.get_data()
# For some reason, non-string keys are ignored
self.assertFalse(42 in data[0][2])


class TestHandlerWithCircularQueue(unittest.TestCase):
    """Exercise the async FluentHandler with the circular-queue mode enabled."""

    # Deliberately tiny queue so that logging 5 events overflows it.
    Q_TIMEOUT = 0.04
    Q_SIZE = 3

    def setUp(self):
        super(TestHandlerWithCircularQueue, self).setUp()
        self._server = mockserver.MockRecvServer('localhost')
        self._port = self._server.port
        self.handler = None

    def get_handler_class(self):
        # return fluent.handler.FluentHandler
        return fluent.asynchandler.FluentHandler

    def get_data(self):
        return self._server.get_recieved()

    def test_simple(self):
        handler = self.get_handler_class()('app.follow', port=self._port,
                                           queue_timeout=self.Q_TIMEOUT,
                                           queue_maxsize=self.Q_SIZE,
                                           queue_circular=True)
        self.handler = handler

        # The circular-queue settings must be forwarded down to the sender.
        self.assertEqual(self.handler.sender.queue_circular, True)
        self.assertEqual(self.handler.sender.queue_maxsize, self.Q_SIZE)

        logging.basicConfig(level=logging.INFO)
        log = logging.getLogger('fluent.test')
        handler.setFormatter(fluent.handler.FluentRecordFormatter())
        log.addHandler(handler)
        log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'})
        log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'})
        log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'})
        log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'})
        log.info({'cnt': 5, 'from': 'userA', 'to': 'userB'})

        # wait, giving time to the communicator thread to send the messages
        time.sleep(0.5)
        # close the handler, to join the thread and let the test suite to terminate
        handler.close()

        data = self.get_data()
        eq = self.assertEqual
        # with the logging interface, we can't be sure to have filled up the
        # queue, so we can only test for a conservative condition here
        self.assertTrue(len(data) >= self.Q_SIZE)

        el = data[0]
        eq(3, len(el))
        eq('app.follow', el[0])
        eq('userA', el[2]['from'])
        eq('userB', el[2]['to'])
        self.assertTrue(el[1])
        self.assertTrue(isinstance(el[1], int))
Loading