diff --git a/fluent/sender.py b/fluent/sender.py index 424e641..4b901a3 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -166,17 +166,34 @@ def _send_internal(self, bytes_): return False + def _check_recv_side(self): + try: + self.socket.settimeout(0.0) + try: + recvd = self.socket.recv(4096, socket.MSG_DONTWAIT) + except socket.error as recv_e: + if recv_e.errno != errno.EWOULDBLOCK: + raise + return + + if recvd == b'': + raise socket.error(errno.EPIPE, "Broken pipe") + finally: + self.socket.settimeout(self.timeout) + def _send_data(self, bytes_): # reconnect if possible self._reconnect() # send message bytes_to_send = len(bytes_) bytes_sent = 0 + self._check_recv_side() while bytes_sent < bytes_to_send: sent = self.socket.send(bytes_[bytes_sent:]) if sent == 0: raise socket.error(errno.EPIPE, "Broken pipe") bytes_sent += sent + self._check_recv_side() def _reconnect(self): if not self.socket: diff --git a/setup.py b/setup.py index a8f52fb..d00b340 100755 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ setup( name='fluent-logger', - version='0.9.0', + version='0.9.9', description=desc, long_description=open(README).read(), package_dir={'fluent': 'fluent'}, diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index 3994e7b..8c4024d 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -2,7 +2,6 @@ import logging import sys -import time import unittest import fluent.asynchandler diff --git a/tests/test_sender.py b/tests/test_sender.py index 2af3907..f1f3f98 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -218,8 +218,29 @@ def test_broken_conn(self): self.assertTrue(sender.socket) class FakeSocket: + def __init__(self): + self.to = 123 + self.send_side_effects = [3, 0, 9] + self.send_idx = 0 + self.recv_side_effects = [socket.error(errno.EWOULDBLOCK, "Blah"), + b"this data is going to be ignored", + b"", + socket.error(errno.EWOULDBLOCK, "Blah"), + socket.error(errno.EWOULDBLOCK, "Blah"), + socket.error(errno.EACCES, "This error will never happen"), + ] + self.recv_idx = 0 + def send(self, bytes_): - return 0 + try: + v = self.send_side_effects[self.send_idx] + if isinstance(v, Exception): + raise v + if isinstance(v, type) and issubclass(v, Exception): + raise v() + return v + finally: + self.send_idx += 1 def shutdown(self, mode): pass @@ -227,11 +248,46 @@ def shutdown(self, mode): def close(self): pass + def settimeout(self, to): + self.to = to + + def gettimeout(self): + return self.to + + def recv(self, bufsize, flags): + try: + v = self.recv_side_effects[self.recv_idx] + if isinstance(v, Exception): + raise v + if isinstance(v, type) and issubclass(v, Exception): + raise v() + return v + finally: + self.recv_idx += 1 + old_sock = self._sender.socket - self._sender.socket = FakeSocket() + sock = FakeSocket() + try: + self._sender.socket = sock + sender.last_error = None + self.assertTrue(sender._send_internal(b"456")) + self.assertFalse(sender.last_error) + + self._sender.socket = sock + sender.last_error = None + self.assertFalse(sender._send_internal(b"456")) + self.assertEqual(sender.last_error.errno, errno.EPIPE) + + self._sender.socket = sock + sender.last_error = None + self.assertFalse(sender._send_internal(b"456")) + self.assertEqual(sender.last_error.errno, errno.EPIPE) + + self._sender.socket = sock + sender.last_error = None self.assertFalse(sender._send_internal(b"456")) - self.assertTrue(sender.last_error.errno, errno.EPIPE) + self.assertEqual(sender.last_error.errno, errno.EACCES) finally: self._sender.socket = old_sock