diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 7f8dc02..e140774 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -55,6 +55,7 @@ def __init__(self, msgpack_kwargs=None, queue_maxsize=DEFAULT_QUEUE_MAXSIZE, queue_circular=DEFAULT_QUEUE_CIRCULAR, + queue_overflow_handler=None, **kwargs): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. @@ -66,6 +67,10 @@ def __init__(self, **kwargs) self._queue_maxsize = queue_maxsize self._queue_circular = queue_circular + if queue_circular and queue_overflow_handler: + self._queue_overflow_handler = queue_overflow_handler + else: + self._queue_overflow_handler = self._queue_overflow_handler_default self._thread_guard = threading.Event() # This ensures visibility across all variables self._closed = False @@ -109,9 +114,11 @@ def _send(self, bytes_): if self._queue_circular and self._queue.full(): # discard oldest try: - self._queue.get(block=False) + discarded_bytes = self._queue.get(block=False) except Empty: # pragma: no cover pass + else: + self._queue_overflow_handler(discarded_bytes) try: self._queue.put(bytes_, block=(not self._queue_circular)) except Full: # pragma: no cover @@ -132,5 +139,8 @@ def _send_loop(self): finally: self._close() + def _queue_overflow_handler_default(self, discarded_bytes): + pass + def __exit__(self, exc_type, exc_val, exc_tb): self.close() diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index 52d9182..e88a041 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -4,6 +4,17 @@ import sys import unittest +try: + from unittest import mock +except ImportError: + import mock +try: + from unittest.mock import patch +except ImportError: + from mock import patch + + + import fluent.asynchandler import fluent.handler from tests import mockserver @@ -309,3 +320,68 @@ def test_simple(self): eq('userB', el[2]['to']) self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) + + +class QueueOverflowException(BaseException): + pass + + +def queue_overflow_handler(discarded_bytes): + raise QueueOverflowException(discarded_bytes) + + +class TestHandlerWithCircularQueueHandler(unittest.TestCase): + Q_SIZE = 1 + + def setUp(self): + super(TestHandlerWithCircularQueueHandler, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + self._port = self._server.port + + def tearDown(self): + self._server.close() + + def get_handler_class(self): + # return fluent.handler.FluentHandler + return fluent.asynchandler.FluentHandler + + def test_simple(self): + handler = self.get_handler_class()('app.follow', port=self._port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + queue_overflow_handler=queue_overflow_handler) + with handler: + def custom_full_queue(): + handler.sender._queue.put(b'Mock', block=True) + return True + + with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)): + self.assertEqual(handler.sender.queue_circular, True) + self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) + + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + + exc_counter = 0 + + try: + log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 + + try: + log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 + + try: + log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 + + # we can't be sure to have exception in every case due to multithreading, + # so we can test only for a cautelative condition here + print('Exception raised: {} (expected 3)'.format(exc_counter)) + assert exc_counter >= 0