Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions fluent/asynchandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@ def getSenderClass(self):
return asyncsender.FluentSender

def close(self):
self.acquire()
try:
self.sender.close()
try:
self.sender.close()
finally:
super(FluentHandler, self).close()
finally:
super(FluentHandler, self).close()
self.release()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
215 changes: 63 additions & 152 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import print_function

import threading
import time

try:
from queue import Queue, Full, Empty
Expand All @@ -15,12 +14,13 @@

__all__ = ["EventTime", "FluentSender"]

_global_sender = None

DEFAULT_QUEUE_TIMEOUT = 0.05
DEFAULT_QUEUE_MAXSIZE = 100
DEFAULT_QUEUE_CIRCULAR = False

_TOMBSTONE = object()

_global_sender = None


def _set_global_sender(sender): # pragma: no cover
""" [For testing] Function to set global sender directly
Expand All @@ -42,8 +42,9 @@ def close(): # pragma: no cover
get_global_sender().close()


class CommunicatorThread(threading.Thread):
def __init__(self, tag,
class FluentSender(sender.FluentSender):
def __init__(self,
tag,
host='localhost',
port=24224,
bufmax=1 * 1024 * 1024,
Expand All @@ -52,76 +53,42 @@ def __init__(self, tag,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs):
super(CommunicatorThread, self).__init__(**kwargs)
self._queue = Queue(maxsize=queue_maxsize)
self._do_run = True
self._queue_timeout = queue_timeout
queue_circular=DEFAULT_QUEUE_CIRCULAR,
**kwargs):
"""
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
"""
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision,
msgpack_kwargs=msgpack_kwargs,
**kwargs)
self._queue_maxsize = queue_maxsize
self._queue_circular = queue_circular
self._conn_close_lock = threading.Lock()
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)

def send(self, bytes_):
if self._queue_circular and self._queue.full():
# discard oldest
try:
self._queue.get(block=False)
except Empty: # pragma: no cover
pass
try:
self._queue.put(bytes_, block=(not self._queue_circular))
except Full:
return False
return True
self._thread_guard = threading.Event() # This ensures visibility across all variables
self._closed = False

def run(self):
while self._do_run:
try:
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
except Empty:
continue
with self._conn_close_lock:
self._sender._send(bytes_)

def close(self, flush=True, discard=True):
if discard:
while not self._queue.empty():
try:
self._queue.get(block=False)
except Empty:
break
while flush and (not self._queue.empty()):
time.sleep(0.1)
self._do_run = False
self._sender.close()

def _close(self):
with self._conn_close_lock:
self._sender._close()

@property
def last_error(self):
return self._sender.last_error

@last_error.setter
def last_error(self, err):
self._sender.last_error = err

def clear_last_error(self, _thread_id=None):
self._sender.clear_last_error(_thread_id=_thread_id)

@property
def queue_timeout(self):
return self._queue_timeout

@queue_timeout.setter
def queue_timeout(self, value):
self._queue_timeout = value
self._queue = Queue(maxsize=queue_maxsize)
self._send_thread = threading.Thread(target=self._send_loop,
name="AsyncFluentSender %d" % id(self))
self._send_thread.daemon = True
self._send_thread.start()

def close(self, flush=True):
with self.lock:
if self._closed:
return
self._closed = True
if not flush:
while True:
try:
self._queue.get(block=False)
except Empty:
break
self._queue.put(_TOMBSTONE)
self._send_thread.join()

@property
def queue_maxsize(self):
Expand All @@ -135,91 +102,35 @@ def queue_blocking(self):
def queue_circular(self):
return self._queue_circular


class FluentSender(sender.FluentSender):
def __init__(self,
tag,
host='localhost',
port=24224,
bufmax=1 * 1024 * 1024,
timeout=3.0,
verbose=False,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR,
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
**kwargs)
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
queue_timeout=queue_timeout, queue_maxsize=queue_maxsize,
queue_circular=queue_circular)
self._communicator.start()

def _send(self, bytes_):
return self._communicator.send(bytes_=bytes_)

def _close(self):
# super(FluentSender, self)._close()
self._communicator._close()

def _send_internal(self, bytes_):
assert False # pragma: no cover

def _send_data(self, bytes_):
assert False # pragma: no cover

# override reconnect, so we don't open a socket here (since it
# will be opened by the CommunicatorThread)
def _reconnect(self):
return

def close(self):
self._communicator.close(flush=True)
self._communicator.join()
return super(FluentSender, self).close()

@property
def last_error(self):
return self._communicator.last_error

@last_error.setter
def last_error(self, err):
self._communicator.last_error = err

def clear_last_error(self, _thread_id=None):
self._communicator.clear_last_error(_thread_id=_thread_id)
with self.lock:
if self._closed:
return False
if self._queue_circular and self._queue.full():
# discard oldest
try:
self._queue.get(block=False)
except Empty: # pragma: no cover
pass
try:
self._queue.put(bytes_, block=(not self._queue_circular))
except Full: # pragma: no cover
return False # this actually can't happen

@property
def queue_timeout(self):
return self._communicator.queue_timeout
return True

@queue_timeout.setter
def queue_timeout(self, value):
self._communicator.queue_timeout = value
def _send_loop(self):
send_internal = super(FluentSender, self)._send_internal

@property
def queue_maxsize(self):
return self._communicator.queue_maxsize

@property
def queue_blocking(self):
return self._communicator.queue_blocking

@property
def queue_circular(self):
return self._communicator.queue_circular
try:
while True:
bytes_ = self._queue.get(block=True)
if bytes_ is _TOMBSTONE:
break

def __enter__(self):
return self
send_internal(bytes_)
finally:
self._close()

def __exit__(self, typ, value, traceback):
# give time to the comm. thread to send its queued messages
time.sleep(0.2)
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
21 changes: 16 additions & 5 deletions fluent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False

def format(self, record):
# Only needed for python2.6
if sys.version_info[0:2] <= (2, 6) and self.usesTime():
if sys.version_info[0:2] <= (2, 6) and self.usesTime(): # pragma: no cover
record.asctime = self.formatTime(record, self.datefmt)

# Compute attributes handled by parent class.
Expand Down Expand Up @@ -116,8 +116,11 @@ def usesTime(self):
if self._exc_attrs is not None:
return super(FluentRecordFormatter, self).usesTime()
else:
return any([value.find('%(asctime)') >= 0
for value in self._fmt_dict.values()])
if self.__style:
search = self.__style.asctime_search
else:
search = "%(asctime)"
return any([value.find(search) >= 0 for value in self._fmt_dict.values()])

def _structuring(self, data, record):
""" Melds `msg` into `data`.
Expand Down Expand Up @@ -209,7 +212,15 @@ def emit(self, record):
def close(self):
self.acquire()
try:
self.sender._close()
logging.Handler.close(self)
try:
self.sender.close()
finally:
super(FluentHandler, self).close()
finally:
self.release()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
Loading