Skip to content

Commit

Permalink
Merge pull request #107 from arcivanov/issue_105
Browse files Browse the repository at this point in the history
Almost complete rewrite of async sender and async handler
  • Loading branch information
repeatedly committed Dec 9, 2017
2 parents f357a2d + fad318b commit b7254b9
Show file tree
Hide file tree
Showing 10 changed files with 748 additions and 643 deletions.
14 changes: 12 additions & 2 deletions fluent/asynchandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@ def getSenderClass(self):
return asyncsender.FluentSender

def close(self):
self.acquire()
try:
self.sender.close()
try:
self.sender.close()
finally:
super(FluentHandler, self).close()
finally:
super(FluentHandler, self).close()
self.release()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
215 changes: 63 additions & 152 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import print_function

import threading
import time

try:
from queue import Queue, Full, Empty
Expand All @@ -15,12 +14,13 @@

__all__ = ["EventTime", "FluentSender"]

_global_sender = None

DEFAULT_QUEUE_TIMEOUT = 0.05
DEFAULT_QUEUE_MAXSIZE = 100
DEFAULT_QUEUE_CIRCULAR = False

_TOMBSTONE = object()

_global_sender = None


def _set_global_sender(sender): # pragma: no cover
""" [For testing] Function to set global sender directly
Expand All @@ -42,8 +42,9 @@ def close(): # pragma: no cover
get_global_sender().close()


class CommunicatorThread(threading.Thread):
def __init__(self, tag,
class FluentSender(sender.FluentSender):
def __init__(self,
tag,
host='localhost',
port=24224,
bufmax=1 * 1024 * 1024,
Expand All @@ -52,76 +53,42 @@ def __init__(self, tag,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs):
super(CommunicatorThread, self).__init__(**kwargs)
self._queue = Queue(maxsize=queue_maxsize)
self._do_run = True
self._queue_timeout = queue_timeout
queue_circular=DEFAULT_QUEUE_CIRCULAR,
**kwargs):
"""
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
"""
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision,
msgpack_kwargs=msgpack_kwargs,
**kwargs)
self._queue_maxsize = queue_maxsize
self._queue_circular = queue_circular
self._conn_close_lock = threading.Lock()
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)

def send(self, bytes_):
if self._queue_circular and self._queue.full():
# discard oldest
try:
self._queue.get(block=False)
except Empty: # pragma: no cover
pass
try:
self._queue.put(bytes_, block=(not self._queue_circular))
except Full:
return False
return True
self._thread_guard = threading.Event() # This ensures visibility across all variables
self._closed = False

def run(self):
while self._do_run:
try:
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
except Empty:
continue
with self._conn_close_lock:
self._sender._send(bytes_)

def close(self, flush=True, discard=True):
if discard:
while not self._queue.empty():
try:
self._queue.get(block=False)
except Empty:
break
while flush and (not self._queue.empty()):
time.sleep(0.1)
self._do_run = False
self._sender.close()

def _close(self):
with self._conn_close_lock:
self._sender._close()

@property
def last_error(self):
return self._sender.last_error

@last_error.setter
def last_error(self, err):
self._sender.last_error = err

def clear_last_error(self, _thread_id=None):
self._sender.clear_last_error(_thread_id=_thread_id)

@property
def queue_timeout(self):
return self._queue_timeout

@queue_timeout.setter
def queue_timeout(self, value):
self._queue_timeout = value
self._queue = Queue(maxsize=queue_maxsize)
self._send_thread = threading.Thread(target=self._send_loop,
name="AsyncFluentSender %d" % id(self))
self._send_thread.daemon = True
self._send_thread.start()

def close(self, flush=True):
with self.lock:
if self._closed:
return
self._closed = True
if not flush:
while True:
try:
self._queue.get(block=False)
except Empty:
break
self._queue.put(_TOMBSTONE)
self._send_thread.join()

@property
def queue_maxsize(self):
Expand All @@ -135,91 +102,35 @@ def queue_blocking(self):
def queue_circular(self):
return self._queue_circular


class FluentSender(sender.FluentSender):
def __init__(self,
tag,
host='localhost',
port=24224,
bufmax=1 * 1024 * 1024,
timeout=3.0,
verbose=False,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
queue_circular=DEFAULT_QUEUE_CIRCULAR,
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
**kwargs)
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
queue_timeout=queue_timeout, queue_maxsize=queue_maxsize,
queue_circular=queue_circular)
self._communicator.start()

def _send(self, bytes_):
return self._communicator.send(bytes_=bytes_)

def _close(self):
# super(FluentSender, self)._close()
self._communicator._close()

def _send_internal(self, bytes_):
assert False # pragma: no cover

def _send_data(self, bytes_):
assert False # pragma: no cover

# override reconnect, so we don't open a socket here (since it
# will be opened by the CommunicatorThread)
def _reconnect(self):
return

def close(self):
self._communicator.close(flush=True)
self._communicator.join()
return super(FluentSender, self).close()

@property
def last_error(self):
return self._communicator.last_error

@last_error.setter
def last_error(self, err):
self._communicator.last_error = err

def clear_last_error(self, _thread_id=None):
self._communicator.clear_last_error(_thread_id=_thread_id)
with self.lock:
if self._closed:
return False
if self._queue_circular and self._queue.full():
# discard oldest
try:
self._queue.get(block=False)
except Empty: # pragma: no cover
pass
try:
self._queue.put(bytes_, block=(not self._queue_circular))
except Full: # pragma: no cover
return False # this actually can't happen

@property
def queue_timeout(self):
return self._communicator.queue_timeout
return True

@queue_timeout.setter
def queue_timeout(self, value):
self._communicator.queue_timeout = value
def _send_loop(self):
send_internal = super(FluentSender, self)._send_internal

@property
def queue_maxsize(self):
return self._communicator.queue_maxsize

@property
def queue_blocking(self):
return self._communicator.queue_blocking

@property
def queue_circular(self):
return self._communicator.queue_circular
try:
while True:
bytes_ = self._queue.get(block=True)
if bytes_ is _TOMBSTONE:
break

def __enter__(self):
return self
send_internal(bytes_)
finally:
self._close()

def __exit__(self, typ, value, traceback):
# give time to the comm. thread to send its queued messages
time.sleep(0.2)
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
21 changes: 16 additions & 5 deletions fluent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False

def format(self, record):
# Only needed for python2.6
if sys.version_info[0:2] <= (2, 6) and self.usesTime():
if sys.version_info[0:2] <= (2, 6) and self.usesTime(): # pragma: no cover
record.asctime = self.formatTime(record, self.datefmt)

# Compute attributes handled by parent class.
Expand Down Expand Up @@ -116,8 +116,11 @@ def usesTime(self):
if self._exc_attrs is not None:
return super(FluentRecordFormatter, self).usesTime()
else:
return any([value.find('%(asctime)') >= 0
for value in self._fmt_dict.values()])
if self.__style:
search = self.__style.asctime_search
else:
search = "%(asctime)"
return any([value.find(search) >= 0 for value in self._fmt_dict.values()])

def _structuring(self, data, record):
""" Melds `msg` into `data`.
Expand Down Expand Up @@ -209,7 +212,15 @@ def emit(self, record):
def close(self):
self.acquire()
try:
self.sender._close()
logging.Handler.close(self)
try:
self.sender.close()
finally:
super(FluentHandler, self).close()
finally:
self.release()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

0 comments on commit b7254b9

Please sign in to comment.