Skip to content

Commit

Permalink
Merge pull request #128 from arcivanov/verbose_trace
Browse files Browse the repository at this point in the history
Perform a non-blocking read-side check before and after send
  • Loading branch information
arcivanov committed Jan 14, 2018
2 parents 425acf5 + 149701c commit 73af98f
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 5 deletions.
17 changes: 17 additions & 0 deletions fluent/sender.py
Expand Up @@ -166,17 +166,34 @@ def _send_internal(self, bytes_):

return False

def _check_recv_side(self):
try:
self.socket.settimeout(0.0)
try:
recvd = self.socket.recv(4096, socket.MSG_DONTWAIT)
except socket.error as recv_e:
if recv_e.errno != errno.EWOULDBLOCK:
raise
return

if recvd == b'':
raise socket.error(errno.EPIPE, "Broken pipe")
finally:
self.socket.settimeout(self.timeout)

def _send_data(self, bytes_):
# reconnect if possible
self._reconnect()
# send message
bytes_to_send = len(bytes_)
bytes_sent = 0
self._check_recv_side()
while bytes_sent < bytes_to_send:
sent = self.socket.send(bytes_[bytes_sent:])
if sent == 0:
raise socket.error(errno.EPIPE, "Broken pipe")
bytes_sent += sent
self._check_recv_side()

def _reconnect(self):
if not self.socket:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -12,7 +12,7 @@

setup(
name='fluent-logger',
version='0.9.0',
version='0.9.9',
description=desc,
long_description=open(README).read(),
package_dir={'fluent': 'fluent'},
Expand Down
1 change: 0 additions & 1 deletion tests/test_asynchandler.py
Expand Up @@ -2,7 +2,6 @@

import logging
import sys
import time
import unittest

import fluent.asynchandler
Expand Down
62 changes: 59 additions & 3 deletions tests/test_sender.py
Expand Up @@ -218,20 +218,76 @@ def test_broken_conn(self):
self.assertTrue(sender.socket)

class FakeSocket:
def __init__(self):
self.to = 123
self.send_side_effects = [3, 0, 9]
self.send_idx = 0
self.recv_side_effects = [socket.error(errno.EWOULDBLOCK, "Blah"),
b"this data is going to be ignored",
b"",
socket.error(errno.EWOULDBLOCK, "Blah"),
socket.error(errno.EWOULDBLOCK, "Blah"),
socket.error(errno.EACCES, "This error will never happen"),
]
self.recv_idx = 0

def send(self, bytes_):
return 0
try:
v = self.send_side_effects[self.send_idx]
if isinstance(v, Exception):
raise v
if isinstance(v, type) and issubclass(v, Exception):
raise v()
return v
finally:
self.send_idx += 1

def shutdown(self, mode):
pass

def close(self):
pass

def settimeout(self, to):
self.to = to

def gettimeout(self):
return self.to

def recv(self, bufsize, flags):
try:
v = self.recv_side_effects[self.recv_idx]
if isinstance(v, Exception):
raise v
if isinstance(v, type) and issubclass(v, Exception):
raise v()
return v
finally:
self.recv_idx += 1

old_sock = self._sender.socket
self._sender.socket = FakeSocket()
sock = FakeSocket()

try:
self._sender.socket = sock
sender.last_error = None
self.assertTrue(sender._send_internal(b"456"))
self.assertFalse(sender.last_error)

self._sender.socket = sock
sender.last_error = None
self.assertFalse(sender._send_internal(b"456"))
self.assertEqual(sender.last_error.errno, errno.EPIPE)

self._sender.socket = sock
sender.last_error = None
self.assertFalse(sender._send_internal(b"456"))
self.assertEqual(sender.last_error.errno, errno.EPIPE)

self._sender.socket = sock
sender.last_error = None
self.assertFalse(sender._send_internal(b"456"))
self.assertTrue(sender.last_error.errno, errno.EPIPE)
self.assertEqual(sender.last_error.errno, errno.EACCES)
finally:
self._sender.socket = old_sock

Expand Down

0 comments on commit 73af98f

Please sign in to comment.