Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions fluent/asynchandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from fluent import asyncsender
from fluent import handler
from fluent.handler import FluentRecordFormatter


class FluentHandler(handler.FluentHandler):
Expand All @@ -14,5 +13,7 @@ def getSenderClass(self):
return asyncsender.FluentSender

def close(self):
self.sender.close()
super(FluentHandler, self).close()
try:
self.sender.close()
finally:
super(FluentHandler, self).close()
29 changes: 13 additions & 16 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# -*- coding: utf-8 -*-

from __future__ import print_function

import threading
import time

try:
from queue import Queue, Full, Empty
except ImportError:
Expand All @@ -11,30 +13,32 @@
from fluent import sender
from fluent.sender import EventTime

__all__ = ["EventTime", "FluentSender"]

_global_sender = None

DEFAULT_QUEUE_TIMEOUT = 0.05
DEFAULT_QUEUE_MAXSIZE = 100
DEFAULT_QUEUE_CIRCULAR = False


def _set_global_sender(sender):
def _set_global_sender(sender): # pragma: no cover
""" [For testing] Function to set global sender directly
"""
global _global_sender
_global_sender = sender


def setup(tag, **kwargs):
def setup(tag, **kwargs): # pragma: no cover
global _global_sender
_global_sender = FluentSender(tag, **kwargs)


def get_global_sender():
def get_global_sender(): # pragma: no cover
return _global_sender


def close():
def close(): # pragma: no cover
get_global_sender().close()


Expand Down Expand Up @@ -81,9 +85,8 @@ def run(self):
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
except Empty:
continue
self._conn_close_lock.acquire()
self._sender._send(bytes_)
self._conn_close_lock.release()
with self._conn_close_lock:
self._sender._send(bytes_)

def close(self, flush=True, discard=True):
if discard:
Expand All @@ -98,14 +101,8 @@ def close(self, flush=True, discard=True):
self._sender.close()

def _close(self):
self._conn_close_lock.acquire()
# self._sender.lock.acquire()
try:
with self._conn_close_lock:
self._sender._close()
finally:
# self._sender.lock.release()
self._conn_close_lock.release()
pass

@property
def last_error(self):
Expand All @@ -115,7 +112,7 @@ def last_error(self):
def last_error(self, err):
self._sender.last_error = err

def clear_last_error(self, _thread_id = None):
def clear_last_error(self, _thread_id=None):
self._sender.clear_last_error(_thread_id=_thread_id)

@property
Expand Down Expand Up @@ -196,7 +193,7 @@ def last_error(self):
def last_error(self, err):
self._communicator.last_error = err

def clear_last_error(self, _thread_id = None):
def clear_last_error(self, _thread_id=None):
self._communicator.clear_last_error(_thread_id=_thread_id)

@property
Expand Down
53 changes: 30 additions & 23 deletions fluent/sender.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,35 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
import struct

import socket
import struct
import threading
import time
import traceback

import msgpack


_global_sender = None


def _set_global_sender(sender):
def _set_global_sender(sender): # pragma: no cover
""" [For testing] Function to set global sender directly
"""
global _global_sender
_global_sender = sender


def setup(tag, **kwargs):
def setup(tag, **kwargs): # pragma: no cover
global _global_sender
_global_sender = FluentSender(tag, **kwargs)


def get_global_sender():
def get_global_sender(): # pragma: no cover
return _global_sender

def close():

def close(): # pragma: no cover
get_global_sender().close()


Expand All @@ -54,7 +55,7 @@ def __init__(self,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.

self.tag = tag
self.host = host
Expand Down Expand Up @@ -98,8 +99,7 @@ def emit_with_time(self, label, timestamp, data):
return self._send(bytes_)

def close(self):
self.lock.acquire()
try:
with self.lock:
if self.pendings:
try:
self._send_data(self.pendings)
Expand All @@ -108,8 +108,6 @@ def close(self):

self._close()
self.pendings = None
finally:
self.lock.release()

def _make_packet(self, label, timestamp, data):
if label:
Expand All @@ -122,11 +120,8 @@ def _make_packet(self, label, timestamp, data):
return msgpack.packb(packet, **self.msgpack_kwargs)

def _send(self, bytes_):
self.lock.acquire()
try:
with self.lock:
return self._send_internal(bytes_)
finally:
self.lock.release()

def _send_internal(self, bytes_):
# buffering
Expand All @@ -142,7 +137,6 @@ def _send_internal(self, bytes_):

return True
except socket.error as e:
#except Exception as e:
self.last_error = e

# close socket
Expand All @@ -161,7 +155,13 @@ def _send_data(self, bytes_):
# reconnect if possible
self._reconnect()
# send message
self.socket.sendall(bytes_)
bytes_to_send = len(bytes_)
bytes_sent = 0
while bytes_sent < bytes_to_send:
sent = self.socket.send(bytes_[bytes_sent:])
if sent == 0:
raise BrokenPipeError(32, 'broken pipe')
bytes_sent += sent

def _reconnect(self):
if not self.socket:
Expand All @@ -172,6 +172,8 @@ def _reconnect(self):
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
# This might be controversial and may need to be removed
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.connect((self.host, self.port))
self.socket = sock

Expand All @@ -189,17 +191,22 @@ def last_error(self):

@last_error.setter
def last_error(self, err):
self._last_error_threadlocal.exception = err
self._last_error_threadlocal.exception = err

def clear_last_error(self, _thread_id = None):
def clear_last_error(self, _thread_id=None):
if hasattr(self._last_error_threadlocal, 'exception'):
delattr(self._last_error_threadlocal, 'exception')

def _close(self):
if self.socket:
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
self.socket = None
try:
sock = self.socket
if sock:
try:
sock.shutdown(socket.SHUT_RDWR)
finally:
sock.close()
finally:
self.socket = None

def __enter__(self):
return self
Expand Down
9 changes: 6 additions & 3 deletions tests/test_sender.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
import unittest

import socket
import unittest

import msgpack

import fluent.sender
Expand Down Expand Up @@ -127,8 +129,9 @@ def test_clear_last_error(self):

self.assertEqual(self._sender.last_error, None)

@unittest.skip("This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped")
#@patch('fluent.sender.socket')
@unittest.skip(
"This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped")
# @patch('fluent.sender.socket')
def test_connect_exception_during_sender_init(self, mock_socket):
# Make the socket.socket().connect() call raise a custom exception
mock_connect = mock_socket.socket.return_value.connect
Expand Down