diff --git a/fluent/sender.py b/fluent/sender.py index 43ea213..9d99e3a 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -43,9 +43,14 @@ def __init__(self, self.pendings = None self.lock = threading.Lock() + self._last_error_threadlocal = threading.local() + try: self._reconnect() - except Exception: + except Exception as e: + # remember latest error + self.last_error = e + # will be retried in emit() self._close() @@ -81,15 +86,15 @@ def _send_internal(self, bytes_): bytes_ = self.pendings try: - # reconnect if possible + # connect/reconnect if necessary self._reconnect() # send message self.socket.sendall(bytes_) + except Exception as e: + # remember latest error + self.last_error = e - # send finished - self.pendings = None - except Exception: # close socket self._close() # clear buffer if it exceeds max bufer size @@ -98,6 +103,9 @@ def _send_internal(self, bytes_): self.pendings = None else: self.pendings = bytes_ + else: + # send finished + self.pendings = None def _reconnect(self): if not self.socket: @@ -111,6 +119,18 @@ def _reconnect(self): sock.connect((self.host, self.port)) self.socket = sock + @property + def last_error(self): + return getattr(self._last_error_threadlocal, 'exception', None) + + @last_error.setter + def last_error(self, err): + self._last_error_threadlocal.exception = err + + def clear_last_error(self, _thread_id=None): + if hasattr(self._last_error_threadlocal, 'exception'): + delattr(self._last_error_threadlocal, 'exception') + def _close(self): if self.socket: self.socket.close() diff --git a/tests/test_event.py b/tests/test_event.py index 8a0669b..07896de 100644 --- a/tests/test_event.py +++ b/tests/test_event.py @@ -2,22 +2,58 @@ import unittest -from fluent import event, sender +from mock import patch +from fluent import event, sender sender.setup(server='localhost', tag='app') class TestEvent(unittest.TestCase): def test_logging(self): + # XXX: This tests succeeds even if the fluentd connection failed + # send event with tag app.follow event.Event('follow', { 'from': 'userA', 'to': 'userB' }) + def test_logging_with_timestamp(self): + # XXX: This tests succeeds even if the fluentd connection failed + # send event with tag app.follow, with timestamp event.Event('follow', { 'from': 'userA', 'to': 'userB' }, time=int(0)) + + def test_no_last_error_on_successful_event(self): + global_sender = sender.get_global_sender() + event.Event('unfollow', { + 'from': 'userC', + 'to': 'userD' + }) + + # This test will fail unless you have a working connection to fluentd + self.assertEqual(global_sender.last_error, None) + + @patch('fluent.sender.socket') + def test_connect_exception_during_event_send(self, mock_socket): + # Make the socket.socket().connect() call raise a custom exception + mock_connect = mock_socket.socket.return_value.connect + EXCEPTION_MSG = "a event send socket connect() exception" + mock_connect.side_effect = Exception(EXCEPTION_MSG) + + # Force the socket to reconnect while trying to emit the event + global_sender = sender.get_global_sender() + global_sender._close() + + event.Event('unfollow', { + 'from': 'userE', + 'to': 'userF' + }) + + ex = global_sender.last_error + self.assertEqual(ex.message, EXCEPTION_MSG) + global_sender.clear_last_error() diff --git a/tests/test_sender.py b/tests/test_sender.py index e2e4335..50601ad 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -3,6 +3,8 @@ from __future__ import print_function import unittest +from mock import patch + import fluent.sender from tests import mockserver @@ -15,10 +17,20 @@ def setUp(self): try: self._server = mockserver.MockRecvServer('localhost', port) break - except IOError as exc: - print(exc) + # except IOError as exc: + # print(exc) + except IOError: + pass + self.server_port = port self._sender = fluent.sender.FluentSender(tag='test', port=port) + def tearDown(self): + # Make sure that the mock server thread terminates after each test + sender = self._sender + sender.emit('foo', {'bar': 'baz'}) + sender._close() + self.get_data() + def get_data(self): return self._server.get_recieved() @@ -32,5 +44,41 @@ def test_simple(self): eq(3, len(data[0])) eq('test.foo', data[0][0]) eq({'bar': 'baz'}, data[0][2]) - self.assertTrue(data[0][1]) - self.assertTrue(isinstance(data[0][1], int)) + self.assert_(data[0][1]) + self.assert_(isinstance(data[0][1], int)) + + def test_no_last_error_on_successful_emit(self): + sender = self._sender + sender.emit('foo', {'bar': 'baz'}) + sender._close() + + self.assertEqual(sender.last_error, None) + + def test_last_error_property(self): + sender = self._sender + EXCEPTION_MSG = "custom exception for testing last_error property" + + sender.last_error = Exception(EXCEPTION_MSG) + + self.assertEqual(sender.last_error.message, EXCEPTION_MSG) + + def test_clear_last_error(self): + sender = self._sender + EXCEPTION_MSG = "custom exception for testing clear_last_error" + sender.last_error = Exception(EXCEPTION_MSG) + + sender.clear_last_error() + + self.assertEqual(sender.last_error, None) + + @patch('fluent.sender.socket') + def test_connect_exception_during_sender_init(self, mock_socket): + # Make the socket.socket().connect() call raise a custom exception + mock_connect = mock_socket.socket.return_value.connect + EXCEPTION_MSG = "a sender init socket connect() exception" + mock_connect.side_effect = Exception(EXCEPTION_MSG) + + sender = fluent.sender.FluentSender(tag='test', port=self.server_port) + + ex = sender.last_error + self.assertEqual(ex.message, EXCEPTION_MSG)