diff --git a/README.rst b/README.rst index c243652..3e1a621 100644 --- a/README.rst +++ b/README.rst @@ -348,6 +348,18 @@ or for the python logging interface: sure the communication thread terminates and it's joined correctly. Otherwise the program won't exit, waiting for the thread, unless forcibly killed. +#### Circular queue mode + +In some applications it can be especially important to guarantee that the logging process won't block under *any* +circumstance, even when it's logging faster than the sending thread can handle (*backpressure*). In this case it's +possible to enable the `circular queue` mode, by passing ``True`` in the ``queue_circular`` parameter of +``asyncsender.FluentSender`` or ``asynchandler.FluentHandler``. By doing so, the thread doing the logging won't block +even when the queue is full: the new event is added to the queue by discarding the oldest one. + +**WARNING**: setting ``queue_circular`` to ``True`` will cause loss of events if the queue fills up completely! Make sure +that this doesn't happen, or that it's acceptable for your application. 
+ + Testing ------- diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 178a0ba..788be25 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -14,6 +14,8 @@ _global_sender = None DEFAULT_QUEUE_TIMEOUT = 0.05 +DEFAULT_QUEUE_MAXSIZE = 100 +DEFAULT_QUEUE_CIRCULAR = False def _set_global_sender(sender): @@ -46,19 +48,29 @@ def __init__(self, tag, buffer_overflow_handler=None, nanosecond_precision=False, msgpack_kwargs=None, - queue_timeout=DEFAULT_QUEUE_TIMEOUT, *args, **kwargs): + queue_timeout=DEFAULT_QUEUE_TIMEOUT, + queue_maxsize=DEFAULT_QUEUE_MAXSIZE, + queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs): super(CommunicatorThread, self).__init__(**kwargs) - self._queue = Queue() + self._queue = Queue(maxsize=queue_maxsize) self._do_run = True self._queue_timeout = queue_timeout + self._queue_maxsize = queue_maxsize + self._queue_circular = queue_circular self._conn_close_lock = threading.Lock() self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs) def send(self, bytes_): + if self._queue_circular and self._queue.full(): + # discard oldest + try: + self._queue.get(block=False) + except Empty: # pragma: no cover + pass try: - self._queue.put(bytes_) + self._queue.put(bytes_, block=(not self._queue_circular)) except Full: return False return True @@ -114,11 +126,17 @@ def queue_timeout(self): def queue_timeout(self, value): self._queue_timeout = value - def __enter__(self): - return self + @property + def queue_maxsize(self): + return self._queue_maxsize - def __exit__(self, typ, value, traceback): - self.close() + @property + def queue_blocking(self): + return not self._queue_circular + + @property + def queue_circular(self): + return self._queue_circular class FluentSender(sender.FluentSender): @@ -133,6 +151,8 @@ def __init__(self, 
nanosecond_precision=False, msgpack_kwargs=None, queue_timeout=DEFAULT_QUEUE_TIMEOUT, + queue_maxsize=DEFAULT_QUEUE_MAXSIZE, + queue_circular=DEFAULT_QUEUE_CIRCULAR, **kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version. super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, @@ -141,7 +161,8 @@ def __init__(self, self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs, - queue_timeout=queue_timeout) + queue_timeout=queue_timeout, queue_maxsize=queue_maxsize, + queue_circular=queue_circular) self._communicator.start() def _send(self, bytes_): @@ -152,10 +173,10 @@ def _close(self): self._communicator._close() def _send_internal(self, bytes_): - return + assert False # pragma: no cover def _send_data(self, bytes_): - return + assert False # pragma: no cover # override reconnect, so we don't open a socket here (since it # will be opened by the CommunicatorThread) @@ -186,6 +207,18 @@ def queue_timeout(self): def queue_timeout(self, value): self._communicator.queue_timeout = value + @property + def queue_maxsize(self): + return self._communicator.queue_maxsize + + @property + def queue_blocking(self): + return self._communicator.queue_blocking + + @property + def queue_circular(self): + return self._communicator.queue_circular + def __enter__(self): return self diff --git a/fluent/handler.py b/fluent/handler.py index efabff6..d2e79be 100644 --- a/fluent/handler.py +++ b/fluent/handler.py @@ -152,7 +152,8 @@ def __init__(self, verbose=False, buffer_overflow_handler=None, msgpack_kwargs=None, - nanosecond_precision=False): + nanosecond_precision=False, + **kwargs): self.tag = tag self.sender = self.getSenderInstance(tag, @@ -160,7 
+161,8 @@ def __init__(self, timeout=timeout, verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, msgpack_kwargs=msgpack_kwargs, - nanosecond_precision=nanosecond_precision) + nanosecond_precision=nanosecond_precision, + **kwargs) logging.Handler.__init__(self) def getSenderClass(self): @@ -168,14 +170,14 @@ def getSenderClass(self): def getSenderInstance(self, tag, host, port, timeout, verbose, buffer_overflow_handler, msgpack_kwargs, - nanosecond_precision): + nanosecond_precision, **kwargs): sender_class = self.getSenderClass() return sender_class(tag, host=host, port=port, timeout=timeout, verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, msgpack_kwargs=msgpack_kwargs, - nanosecond_precision=nanosecond_precision) + nanosecond_precision=nanosecond_precision, **kwargs) def emit(self, record): data = self.format(record) diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index 2725c6a..bd03eef 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -277,3 +277,60 @@ def test_non_string_dict_message(self): data = self.get_data() # For some reason, non-string keys are ignored self.assertFalse(42 in data[0][2]) + + +class TestHandlerWithCircularQueue(unittest.TestCase): + Q_TIMEOUT = 0.04 + Q_SIZE = 3 + + def setUp(self): + super(TestHandlerWithCircularQueue, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + self._port = self._server.port + self.handler = None + + def get_handler_class(self): + # return fluent.handler.FluentHandler + return fluent.asynchandler.FluentHandler + + def get_data(self): + return self._server.get_recieved() + + def test_simple(self): + handler = self.get_handler_class()('app.follow', port=self._port, + queue_timeout=self.Q_TIMEOUT, + queue_maxsize=self.Q_SIZE, + queue_circular=True) + self.handler = handler + + self.assertEqual(self.handler.sender.queue_circular, True) + self.assertEqual(self.handler.sender.queue_maxsize, self.Q_SIZE) + + 
+ logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 5, 'from': 'userA', 'to': 'userB'}) + + # wait, giving time to the communicator thread to send the messages + time.sleep(0.5) + # close the handler, to join the thread and let the test suite terminate + handler.close() + + data = self.get_data() + eq = self.assertEqual + # with the logging interface, we can't be sure the queue was filled up, so we can + # only test a conservative condition here + self.assertTrue(len(data) >= self.Q_SIZE) + + el = data[0] + eq(3, len(el)) + eq('app.follow', el[0]) + eq('userA', el[2]['from']) + eq('userB', el[2]['to']) + self.assertTrue(el[1]) + self.assertTrue(isinstance(el[1], int)) diff --git a/tests/test_asyncsender.py b/tests/test_asyncsender.py index 477a768..53add21 100644 --- a/tests/test_asyncsender.py +++ b/tests/test_asyncsender.py @@ -145,6 +145,27 @@ def test_connect_exception_during_sender_init(self, mock_socket): self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG) +class TestSenderDefaultProperties(unittest.TestCase): + def setUp(self): + super(TestSenderDefaultProperties, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + self._sender = fluent.asyncsender.FluentSender(tag='test', + port=self._server.port) + + def tearDown(self): + self._sender.close() + + def test_default_properties(self): + sender = self._sender + self.assertTrue(sender.queue_blocking) + self.assertFalse(sender.queue_circular) + self.assertTrue(isinstance(sender.queue_maxsize, int)) + self.assertTrue(sender.queue_maxsize > 0) + self.assertTrue(isinstance(sender.queue_timeout, (int, float))) + 
self.assertTrue(sender.queue_timeout > 0) + sender._close() + + class TestSenderWithTimeout(unittest.TestCase): def setUp(self): super(TestSenderWithTimeout, self).setUp() @@ -195,3 +216,151 @@ def test_event_time(self): time = fluent.asyncsender.EventTime(1490061367.8616468906402588) self.assertEqual(time.code, 0) self.assertEqual(time.data, b'X\xd0\x8873[\xb0*') + + +class TestSenderWithTimeoutAndCircular(unittest.TestCase): + Q_SIZE = 3 + + def setUp(self): + super(TestSenderWithTimeoutAndCircular, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + self._sender = fluent.asyncsender.FluentSender(tag='test', + port=self._server.port, + queue_timeout=0.04, + queue_maxsize=self.Q_SIZE, + queue_circular=True) + + def tearDown(self): + self._sender.close() + + def get_data(self): + return self._server.get_recieved() + + def test_simple(self): + sender = self._sender + + self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE) + self.assertEqual(self._sender.queue_circular, True) + self.assertEqual(self._sender.queue_blocking, False) + + ok = sender.emit('foo1', {'bar': 'baz1'}) + self.assertTrue(ok) + ok = sender.emit('foo2', {'bar': 'baz2'}) + self.assertTrue(ok) + ok = sender.emit('foo3', {'bar': 'baz3'}) + self.assertTrue(ok) + ok = sender.emit('foo4', {'bar': 'baz4'}) + self.assertTrue(ok) + ok = sender.emit('foo5', {'bar': 'baz5'}) + self.assertTrue(ok) + time.sleep(0.5) + sender._close() + data = self.get_data() + eq = self.assertEqual + eq(self.Q_SIZE, len(data)) + eq(3, len(data[0])) + eq('test.foo3', data[0][0]) + eq({'bar': 'baz3'}, data[0][2]) + self.assertTrue(data[0][1]) + self.assertTrue(isinstance(data[0][1], int)) + + eq(3, len(data[2])) + eq('test.foo5', data[2][0]) + eq({'bar': 'baz5'}, data[2][2]) + + +class TestSenderWithTimeoutMaxSizeNonCircular(unittest.TestCase): + Q_SIZE = 3 + + def setUp(self): + super(TestSenderWithTimeoutMaxSizeNonCircular, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + 
self._sender = fluent.asyncsender.FluentSender(tag='test', + port=self._server.port, + queue_timeout=0.04, + queue_maxsize=self.Q_SIZE) + + def tearDown(self): + self._sender.close() + + def get_data(self): + return self._server.get_recieved() + + def test_simple(self): + sender = self._sender + + self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE) + self.assertEqual(self._sender.queue_blocking, True) + self.assertEqual(self._sender.queue_circular, False) + + ok = sender.emit('foo1', {'bar': 'baz1'}) + self.assertTrue(ok) + ok = sender.emit('foo2', {'bar': 'baz2'}) + self.assertTrue(ok) + ok = sender.emit('foo3', {'bar': 'baz3'}) + self.assertTrue(ok) + ok = sender.emit('foo4', {'bar': 'baz4'}) + self.assertTrue(ok) + ok = sender.emit('foo5', {'bar': 'baz5'}) + self.assertTrue(ok) + time.sleep(0.5) + sender._close() + data = self.get_data() + eq = self.assertEqual + print(data) + eq(5, len(data)) + eq(3, len(data[0])) + eq('test.foo1', data[0][0]) + eq({'bar': 'baz1'}, data[0][2]) + self.assertTrue(data[0][1]) + self.assertTrue(isinstance(data[0][1], int)) + + eq(3, len(data[2])) + eq('test.foo3', data[2][0]) + eq({'bar': 'baz3'}, data[2][2]) + + +class TestSenderUnlimitedSize(unittest.TestCase): + Q_SIZE = 3 + + def setUp(self): + super(TestSenderUnlimitedSize, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + self._sender = fluent.asyncsender.FluentSender(tag='test', + port=self._server.port, + queue_timeout=0.04, + queue_maxsize=0) + + def tearDown(self): + self._sender.close() + + def get_data(self): + return self._server.get_recieved() + + def test_simple(self): + sender = self._sender + + self.assertEqual(self._sender.queue_maxsize, 0) + self.assertEqual(self._sender.queue_blocking, True) + self.assertEqual(self._sender.queue_circular, False) + + NUM = 1000 + for i in range(1, NUM+1): + ok = sender.emit("foo{}".format(i), {'bar': "baz{}".format(i)}) + self.assertTrue(ok) + time.sleep(0.5) + sender._close() + data = self.get_data() 
+ eq = self.assertEqual + eq(NUM, len(data)) + el = data[0] + eq(3, len(el)) + eq('test.foo1', el[0]) + eq({'bar': 'baz1'}, el[2]) + self.assertTrue(el[1]) + self.assertTrue(isinstance(el[1], int)) + + el = data[NUM-1] + eq(3, len(el)) + eq("test.foo{}".format(NUM), el[0]) + eq({'bar': "baz{}".format(NUM)}, el[2])