Skip to content

Commit

Permalink
Merge pull request #156 from pguz/queue_overflow_handler
Browse files Browse the repository at this point in the history
Add queue overflow handler in asyncsender.
  • Loading branch information
arcivanov committed Jun 24, 2020
2 parents eb68481 + 478bd02 commit d1b81ba
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 1 deletion.
12 changes: 11 additions & 1 deletion fluent/asyncsender.py
Expand Up @@ -55,6 +55,7 @@ def __init__(self,
msgpack_kwargs=None,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR,
queue_overflow_handler=None,
**kwargs):
"""
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
Expand All @@ -66,6 +67,10 @@ def __init__(self,
**kwargs)
self._queue_maxsize = queue_maxsize
self._queue_circular = queue_circular
if queue_circular and queue_overflow_handler:
self._queue_overflow_handler = queue_overflow_handler
else:
self._queue_overflow_handler = self._queue_overflow_handler_default

self._thread_guard = threading.Event() # This ensures visibility across all variables
self._closed = False
Expand Down Expand Up @@ -109,9 +114,11 @@ def _send(self, bytes_):
if self._queue_circular and self._queue.full():
# discard oldest
try:
self._queue.get(block=False)
discarded_bytes = self._queue.get(block=False)
except Empty: # pragma: no cover
pass
else:
self._queue_overflow_handler(discarded_bytes)
try:
self._queue.put(bytes_, block=(not self._queue_circular))
except Full: # pragma: no cover
Expand All @@ -132,5 +139,8 @@ def _send_loop(self):
finally:
self._close()

def _queue_overflow_handler_default(self, discarded_bytes):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
76 changes: 76 additions & 0 deletions tests/test_asynchandler.py
Expand Up @@ -4,6 +4,17 @@
import sys
import unittest

try:
from unittest import mock
except ImportError:
import mock
try:
from unittest.mock import patch
except ImportError:
from mock import patch



import fluent.asynchandler
import fluent.handler
from tests import mockserver
Expand Down Expand Up @@ -309,3 +320,68 @@ def test_simple(self):
eq('userB', el[2]['to'])
self.assertTrue(el[1])
self.assertTrue(isinstance(el[1], int))


class QueueOverflowException(BaseException):
pass


def queue_overflow_handler(discarded_bytes):
raise QueueOverflowException(discarded_bytes)


class TestHandlerWithCircularQueueHandler(unittest.TestCase):
Q_SIZE = 1

def setUp(self):
super(TestHandlerWithCircularQueueHandler, self).setUp()
self._server = mockserver.MockRecvServer('localhost')
self._port = self._server.port

def tearDown(self):
self._server.close()

def get_handler_class(self):
# return fluent.handler.FluentHandler
return fluent.asynchandler.FluentHandler

def test_simple(self):
handler = self.get_handler_class()('app.follow', port=self._port,
queue_maxsize=self.Q_SIZE,
queue_circular=True,
queue_overflow_handler=queue_overflow_handler)
with handler:
def custom_full_queue():
handler.sender._queue.put(b'Mock', block=True)
return True

with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)):
self.assertEqual(handler.sender.queue_circular, True)
self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE)

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('fluent.test')
handler.setFormatter(fluent.handler.FluentRecordFormatter())
log.addHandler(handler)

exc_counter = 0

try:
log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'})
except QueueOverflowException:
exc_counter += 1

try:
log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'})
except QueueOverflowException:
exc_counter += 1

try:
log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'})
except QueueOverflowException:
exc_counter += 1

# we can't be sure to have exception in every case due to multithreading,
# so we can test only for a cautelative condition here
print('Exception raised: {} (expected 3)'.format(exc_counter))
assert exc_counter >= 0

0 comments on commit d1b81ba

Please sign in to comment.