Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions fluent/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ def __init__(self,
self.pendings = None
self.lock = threading.Lock()

self._last_error_threadlocal = threading.local()

try:
self._reconnect()
except Exception:
except Exception as e:
# remember latest error
self.last_error = e

# will be retried in emit()
self._close()

Expand Down Expand Up @@ -81,15 +86,15 @@ def _send_internal(self, bytes_):
bytes_ = self.pendings

try:
# reconnect if possible
# connect/reconnect if necessary
self._reconnect()

# send message
self.socket.sendall(bytes_)
except Exception as e:
# remember latest error
self.last_error = e

# send finished
self.pendings = None
except Exception:
# close socket
self._close()
# clear buffer if it exceeds max bufer size
Expand All @@ -98,6 +103,9 @@ def _send_internal(self, bytes_):
self.pendings = None
else:
self.pendings = bytes_
else:
# send finished
self.pendings = None

def _reconnect(self):
if not self.socket:
Expand All @@ -111,6 +119,18 @@ def _reconnect(self):
sock.connect((self.host, self.port))
self.socket = sock

@property
def last_error(self):
return getattr(self._last_error_threadlocal, 'exception', None)

@last_error.setter
def last_error(self, err):
self._last_error_threadlocal.exception = err

def clear_last_error(self, _thread_id=None):
if hasattr(self._last_error_threadlocal, 'exception'):
delattr(self._last_error_threadlocal, 'exception')

def _close(self):
if self.socket:
self.socket.close()
Expand Down
38 changes: 37 additions & 1 deletion tests/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,58 @@

import unittest

from fluent import event, sender
from mock import patch

from fluent import event, sender

sender.setup(server='localhost', tag='app')


class TestEvent(unittest.TestCase):
def test_logging(self):
# XXX: This tests succeeds even if the fluentd connection failed

# send event with tag app.follow
event.Event('follow', {
'from': 'userA',
'to': 'userB'
})

def test_logging_with_timestamp(self):
# XXX: This tests succeeds even if the fluentd connection failed

# send event with tag app.follow, with timestamp
event.Event('follow', {
'from': 'userA',
'to': 'userB'
}, time=int(0))

def test_no_last_error_on_successful_event(self):
global_sender = sender.get_global_sender()
event.Event('unfollow', {
'from': 'userC',
'to': 'userD'
})

# This test will fail unless you have a working connection to fluentd
self.assertEqual(global_sender.last_error, None)

@patch('fluent.sender.socket')
def test_connect_exception_during_event_send(self, mock_socket):
# Make the socket.socket().connect() call raise a custom exception
mock_connect = mock_socket.socket.return_value.connect
EXCEPTION_MSG = "a event send socket connect() exception"
mock_connect.side_effect = Exception(EXCEPTION_MSG)

# Force the socket to reconnect while trying to emit the event
global_sender = sender.get_global_sender()
global_sender._close()

event.Event('unfollow', {
'from': 'userE',
'to': 'userF'
})

ex = global_sender.last_error
self.assertEqual(ex.message, EXCEPTION_MSG)
global_sender.clear_last_error()
56 changes: 52 additions & 4 deletions tests/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from __future__ import print_function
import unittest

from mock import patch

import fluent.sender

from tests import mockserver
Expand All @@ -15,10 +17,20 @@ def setUp(self):
try:
self._server = mockserver.MockRecvServer('localhost', port)
break
except IOError as exc:
print(exc)
# except IOError as exc:
# print(exc)
except IOError:
pass
self.server_port = port
self._sender = fluent.sender.FluentSender(tag='test', port=port)

def tearDown(self):
# Make sure that the mock server thread terminates after each test
sender = self._sender
sender.emit('foo', {'bar': 'baz'})
sender._close()
self.get_data()

def get_data(self):
return self._server.get_recieved()

Expand All @@ -32,5 +44,41 @@ def test_simple(self):
eq(3, len(data[0]))
eq('test.foo', data[0][0])
eq({'bar': 'baz'}, data[0][2])
self.assertTrue(data[0][1])
self.assertTrue(isinstance(data[0][1], int))
self.assert_(data[0][1])
self.assert_(isinstance(data[0][1], int))

def test_no_last_error_on_successful_emit(self):
sender = self._sender
sender.emit('foo', {'bar': 'baz'})
sender._close()

self.assertEqual(sender.last_error, None)

def test_last_error_property(self):
sender = self._sender
EXCEPTION_MSG = "custom exception for testing last_error property"

sender.last_error = Exception(EXCEPTION_MSG)

self.assertEqual(sender.last_error.message, EXCEPTION_MSG)

def test_clear_last_error(self):
sender = self._sender
EXCEPTION_MSG = "custom exception for testing clear_last_error"
sender.last_error = Exception(EXCEPTION_MSG)

sender.clear_last_error()

self.assertEqual(sender.last_error, None)

@patch('fluent.sender.socket')
def test_connect_exception_during_sender_init(self, mock_socket):
# Make the socket.socket().connect() call raise a custom exception
mock_connect = mock_socket.socket.return_value.connect
EXCEPTION_MSG = "a sender init socket connect() exception"
mock_connect.side_effect = Exception(EXCEPTION_MSG)

sender = fluent.sender.FluentSender(tag='test', port=self.server_port)

ex = sender.last_error
self.assertEqual(ex.message, EXCEPTION_MSG)