From fad318babf93618ea1bce7b2ee81e68a3bfd4a14 Mon Sep 17 00:00:00 2001 From: Arcadiy Ivanov Date: Tue, 5 Dec 2017 21:45:40 -0500 Subject: [PATCH] Almost complete rewrite of async sender and async handler Queue timeout removed as it served no purpose other than hide multiple threading issues asctime format was non-functional Many tests would silently fail and appear successful Tests now are near-instantaneous due to removal of sleep fixes #105, fixes #106 --- fluent/asynchandler.py | 14 +- fluent/asyncsender.py | 215 ++++++++------------------ fluent/handler.py | 21 ++- fluent/sender.py | 84 +++++----- tests/mockserver.py | 89 ++++++++--- tests/test_asynchandler.py | 306 ++++++++++++++++--------------------- tests/test_asyncsender.py | 220 +++++++++++++------------- tests/test_event.py | 14 +- tests/test_handler.py | 302 ++++++++++++++++++++---------------- tests/test_sender.py | 126 +++++++++++++-- 10 files changed, 748 insertions(+), 643 deletions(-) diff --git a/fluent/asynchandler.py b/fluent/asynchandler.py index 73ba371..bbba4c4 100644 --- a/fluent/asynchandler.py +++ b/fluent/asynchandler.py @@ -13,7 +13,17 @@ def getSenderClass(self): return asyncsender.FluentSender def close(self): + self.acquire() try: - self.sender.close() + try: + self.sender.close() + finally: + super(FluentHandler, self).close() finally: - super(FluentHandler, self).close() + self.release() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 3314e4d..7f8dc02 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -3,7 +3,6 @@ from __future__ import print_function import threading -import time try: from queue import Queue, Full, Empty @@ -15,12 +14,13 @@ __all__ = ["EventTime", "FluentSender"] -_global_sender = None - -DEFAULT_QUEUE_TIMEOUT = 0.05 DEFAULT_QUEUE_MAXSIZE = 100 DEFAULT_QUEUE_CIRCULAR = False +_TOMBSTONE = object() + +_global_sender = None + def _set_global_sender(sender): # pragma: no cover """ [For testing] Function to set global sender directly @@ -42,8 +42,9 @@ def close(): # pragma: no cover get_global_sender().close() -class CommunicatorThread(threading.Thread): - def __init__(self, tag, +class FluentSender(sender.FluentSender): + def __init__(self, + tag, host='localhost', port=24224, bufmax=1 * 1024 * 1024, @@ -52,76 +53,42 @@ def __init__(self, tag, buffer_overflow_handler=None, nanosecond_precision=False, msgpack_kwargs=None, - queue_timeout=DEFAULT_QUEUE_TIMEOUT, queue_maxsize=DEFAULT_QUEUE_MAXSIZE, - queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs): - super(CommunicatorThread, self).__init__(**kwargs) - self._queue = Queue(maxsize=queue_maxsize) - self._do_run = True - self._queue_timeout = queue_timeout + queue_circular=DEFAULT_QUEUE_CIRCULAR, + **kwargs): + """ + :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. + """ + super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, + verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, + nanosecond_precision=nanosecond_precision, + msgpack_kwargs=msgpack_kwargs, + **kwargs) self._queue_maxsize = queue_maxsize self._queue_circular = queue_circular - self._conn_close_lock = threading.Lock() - self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, - verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, - nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs) - def send(self, bytes_): - if self._queue_circular and self._queue.full(): - # discard oldest - try: - self._queue.get(block=False) - except Empty: # pragma: no cover - pass - try: - self._queue.put(bytes_, block=(not self._queue_circular)) - except Full: - return False - return True + self._thread_guard = threading.Event() # This ensures visibility across all variables + self._closed = False - def run(self): - while self._do_run: - try: - bytes_ = self._queue.get(block=True, timeout=self._queue_timeout) - except Empty: - continue - with self._conn_close_lock: - self._sender._send(bytes_) - - def close(self, flush=True, discard=True): - if discard: - while not self._queue.empty(): - try: - self._queue.get(block=False) - except Empty: - break - while flush and (not self._queue.empty()): - time.sleep(0.1) - self._do_run = False - self._sender.close() - - def _close(self): - with self._conn_close_lock: - self._sender._close() - - @property - def last_error(self): - return self._sender.last_error - - @last_error.setter - def last_error(self, err): - self._sender.last_error = err - - def clear_last_error(self, _thread_id=None): - self._sender.clear_last_error(_thread_id=_thread_id) - - @property - def queue_timeout(self): - return self._queue_timeout - - @queue_timeout.setter - def queue_timeout(self, value): - self._queue_timeout = value + self._queue = Queue(maxsize=queue_maxsize) + self._send_thread = threading.Thread(target=self._send_loop, + name="AsyncFluentSender %d" % id(self)) + self._send_thread.daemon = True + self._send_thread.start() + + def close(self, flush=True): + with self.lock: + if self._closed: + return + self._closed = True + if not flush: + while True: + try: + self._queue.get(block=False) + except Empty: + break + self._queue.put(_TOMBSTONE) + self._send_thread.join() @property def queue_maxsize(self): @@ -135,91 +102,35 @@ def queue_blocking(self): def queue_circular(self): return self._queue_circular - -class FluentSender(sender.FluentSender): - def __init__(self, - tag, - host='localhost', - port=24224, - bufmax=1 * 1024 * 1024, - timeout=3.0, - verbose=False, - buffer_overflow_handler=None, - nanosecond_precision=False, - msgpack_kwargs=None, - queue_timeout=DEFAULT_QUEUE_TIMEOUT, - queue_maxsize=DEFAULT_QUEUE_MAXSIZE, - queue_circular=DEFAULT_QUEUE_CIRCULAR, - **kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version. - super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, - verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, - nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs, - **kwargs) - self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, - verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, - nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs, - queue_timeout=queue_timeout, queue_maxsize=queue_maxsize, - queue_circular=queue_circular) - self._communicator.start() - def _send(self, bytes_): - return self._communicator.send(bytes_=bytes_) - - def _close(self): - # super(FluentSender, self)._close() - self._communicator._close() - - def _send_internal(self, bytes_): - assert False # pragma: no cover - - def _send_data(self, bytes_): - assert False # pragma: no cover - - # override reconnect, so we don't open a socket here (since it - # will be opened by the CommunicatorThread) - def _reconnect(self): - return - - def close(self): - self._communicator.close(flush=True) - self._communicator.join() - return super(FluentSender, self).close() - - @property - def last_error(self): - return self._communicator.last_error - - @last_error.setter - def last_error(self, err): - self._communicator.last_error = err - - def clear_last_error(self, _thread_id=None): - self._communicator.clear_last_error(_thread_id=_thread_id) + with self.lock: + if self._closed: + return False + if self._queue_circular and self._queue.full(): + # discard oldest + try: + self._queue.get(block=False) + except Empty: # pragma: no cover + pass + try: + self._queue.put(bytes_, block=(not self._queue_circular)) + except Full: # pragma: no cover + return False # this actually can't happen - @property - def queue_timeout(self): - return self._communicator.queue_timeout + return True - @queue_timeout.setter - def queue_timeout(self, value): - self._communicator.queue_timeout = value + def _send_loop(self): + send_internal = super(FluentSender, self)._send_internal - @property - def queue_maxsize(self): - return self._communicator.queue_maxsize - - @property - def queue_blocking(self): - return self._communicator.queue_blocking - - @property - def queue_circular(self): - return self._communicator.queue_circular + try: + while True: + bytes_ = self._queue.get(block=True) + if bytes_ is _TOMBSTONE: + break - def __enter__(self): - return self + send_internal(bytes_) + finally: + self._close() - def __exit__(self, typ, value, traceback): - # give time to the comm. thread to send its queued messages - time.sleep(0.2) + def __exit__(self, exc_type, exc_val, exc_tb): self.close() diff --git a/fluent/handler.py b/fluent/handler.py index 2f9a28d..b37b685 100644 --- a/fluent/handler.py +++ b/fluent/handler.py @@ -81,7 +81,7 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False def format(self, record): # Only needed for python2.6 - if sys.version_info[0:2] <= (2, 6) and self.usesTime(): + if sys.version_info[0:2] <= (2, 6) and self.usesTime(): # pragma: no cover record.asctime = self.formatTime(record, self.datefmt) # Compute attributes handled by parent class. @@ -116,8 +116,11 @@ def usesTime(self): if self._exc_attrs is not None: return super(FluentRecordFormatter, self).usesTime() else: - return any([value.find('%(asctime)') >= 0 - for value in self._fmt_dict.values()]) + if self.__style: + search = self.__style.asctime_search + else: + search = "%(asctime)" + return any([value.find(search) >= 0 for value in self._fmt_dict.values()]) def _structuring(self, data, record): """ Melds `msg` into `data`. @@ -209,7 +212,15 @@ def emit(self, record): def close(self): self.acquire() try: - self.sender._close() - logging.Handler.close(self) + try: + self.sender.close() + finally: + super(FluentHandler, self).close() finally: self.release() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() diff --git a/fluent/sender.py b/fluent/sender.py index 908283f..bcef2cc 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -2,6 +2,7 @@ from __future__ import print_function +import errno import socket import struct import threading @@ -55,8 +56,10 @@ def __init__(self, buffer_overflow_handler=None, nanosecond_precision=False, msgpack_kwargs=None, - **kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version. - + **kwargs): + """ + :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. + """ self.tag = tag self.host = host self.port = port @@ -72,12 +75,6 @@ def __init__(self, self.lock = threading.Lock() self._last_error_threadlocal = threading.local() - try: - self._reconnect() - except socket.error: - # will be retried in emit() - self._close() - def emit(self, label, data): if self.nanosecond_precision: cur_time = EventTime(time.time()) @@ -98,6 +95,18 @@ def emit_with_time(self, label, timestamp, data): "traceback": traceback.format_exc()}) return self._send(bytes_) + @property + def last_error(self): + return getattr(self._last_error_threadlocal, 'exception', None) + + @last_error.setter + def last_error(self, err): + self._last_error_threadlocal.exception = err + + def clear_last_error(self, _thread_id=None): + if hasattr(self._last_error_threadlocal, 'exception'): + delattr(self._last_error_threadlocal, 'exception') + def close(self): with self.lock: if self.pendings: @@ -142,7 +151,7 @@ def _send_internal(self, bytes_): # close socket self._close() - # clear buffer if it exceeds max bufer size + # clear buffer if it exceeds max buffer size if self.pendings and (len(self.pendings) > self.bufmax): self._call_buffer_overflow_handler(self.pendings) self.pendings = None @@ -160,22 +169,30 @@ def _send_data(self, bytes_): while bytes_sent < bytes_to_send: sent = self.socket.send(bytes_[bytes_sent:]) if sent == 0: - raise BrokenPipeError(32, 'broken pipe') + raise socket.error(errno.EPIPE, "Broken pipe") bytes_sent += sent def _reconnect(self): if not self.socket: - if self.host.startswith('unix://'): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.settimeout(self.timeout) - sock.connect(self.host[len('unix://'):]) + try: + if self.host.startswith('unix://'): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(self.timeout) + sock.connect(self.host[len('unix://'):]) + else: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(self.timeout) + # This might be controversial and may need to be removed + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.connect((self.host, self.port)) + except Exception as e: + try: + sock.close() + except Exception: # pragma: no cover + pass + raise e else: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(self.timeout) - # This might be controversial and may need to be removed - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - sock.connect((self.host, self.port)) - self.socket = sock + self.socket = sock def _call_buffer_overflow_handler(self, pending_events): try: @@ -185,26 +202,20 @@ def _call_buffer_overflow_handler(self, pending_events): # User should care any exception in handler pass - @property - def last_error(self): - return getattr(self._last_error_threadlocal, 'exception', None) - - @last_error.setter - def last_error(self, err): - self._last_error_threadlocal.exception = err - - def clear_last_error(self, _thread_id=None): - if hasattr(self._last_error_threadlocal, 'exception'): - delattr(self._last_error_threadlocal, 'exception') - def _close(self): try: sock = self.socket if sock: try: - sock.shutdown(socket.SHUT_RDWR) + try: + sock.shutdown(socket.SHUT_RDWR) + except socket.error: # pragma: no cover + pass finally: - sock.close() + try: + sock.close() + except socket.error: # pragma: no cover + pass finally: self.socket = None @@ -212,4 +223,7 @@ def __enter__(self): return self def __exit__(self, typ, value, traceback): - self.close() + try: + self.close() + except Exception as e: # pragma: no cover + self.last_error = e diff --git a/tests/mockserver.py b/tests/mockserver.py index b385d0e..426d139 100644 --- a/tests/mockserver.py +++ b/tests/mockserver.py @@ -7,7 +7,6 @@ import socket import threading -import time from msgpack import Unpacker @@ -16,39 +15,85 @@ class MockRecvServer(threading.Thread): """ Single threaded server accepts one connection and recv until EOF. """ + def __init__(self, host='localhost', port=0): + super(MockRecvServer, self).__init__() + if host.startswith('unix://'): - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self._sock.bind(host[len('unix://'):]) + self.socket_proto = socket.AF_UNIX + self.socket_type = socket.SOCK_STREAM + self.socket_addr = host[len('unix://'):] else: - self._sock = socket.socket() - self._sock.bind((host, port)) + self.socket_proto = socket.AF_INET + self.socket_type = socket.SOCK_STREAM + self.socket_addr = (host, port) + + self._sock = socket.socket(self.socket_proto, self.socket_type) + self._sock.bind(self.socket_addr) + if self.socket_proto == socket.AF_INET: self.port = self._sock.getsockname()[1] + self._sock.listen(1) self._buf = BytesIO() + self._con = None - threading.Thread.__init__(self) self.start() def run(self): sock = self._sock - con, _ = sock.accept() - while True: - data = con.recv(4096) - if not data: - break - self._buf.write(data) - con.close() - sock.close() - self._sock = None - - def wait(self): - while self._sock: - time.sleep(0.1) - - def get_recieved(self): - self.wait() + + try: + try: + con, _ = sock.accept() + except Exception: + return + self._con = con + try: + while True: + try: + data = con.recv(16384) + if not data: + break + self._buf.write(data) + except socket.error as e: + print("MockServer error: %s" % e) + break + finally: + con.close() + finally: + sock.close() + + def get_received(self): + self.join() self._buf.seek(0) # TODO: have to process string encoding properly. currently we assume # that all encoding is utf-8. return list(Unpacker(self._buf, encoding='utf-8')) + + def close(self): + + try: + self._sock.close() + except Exception: + pass + + try: + conn = socket.socket(socket.AF_INET, + socket.SOCK_STREAM) + try: + conn.connect((self.socket_addr[0], self.port)) + finally: + conn.close() + except Exception: + pass + + if self._con: + try: + self._con.close() + except Exception: + pass + + self.join() + + def __del__(self): + self.close() diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index bd03eef..3994e7b 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -1,13 +1,12 @@ -# -*- coding: utf-8 -*- +#  -*- coding: utf-8 -*- import logging import sys -import unittest import time +import unittest -import fluent.handler import fluent.asynchandler - +import fluent.handler from tests import mockserver @@ -16,32 +15,29 @@ def setUp(self): super(TestHandler, self).setUp() self._server = mockserver.MockRecvServer('localhost') self._port = self._server.port - self.handler = None + + def tearDown(self): + self._server.close() def get_handler_class(self): # return fluent.handler.FluentHandler return fluent.asynchandler.FluentHandler def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): handler = self.get_handler_class()('app.follow', port=self._port) - self.handler = handler - - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info({ - 'from': 'userA', - 'to': 'userB' - }) - - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info({ + 'from': 'userA', + 'to': 'userB' + }) data = self.get_data() eq = self.assertEqual @@ -56,21 +52,18 @@ def test_simple(self): def test_custom_fmt(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'lineno': '%(lineno)d', - 'emitted_at': '%(asctime)s', - }) - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '%(name)s', + 'lineno': '%(lineno)d', + 'emitted_at': '%(asctime)s', + }) + ) + log.addHandler(handler) + log.info({'sample': 'value'}) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -82,21 +75,18 @@ def test_custom_fmt(self): def test_custom_fmt_with_format_style(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '{name}', - 'lineno': '{lineno}', - 'emitted_at': '{asctime}', - }, style='{') - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '{name}', + 'lineno': '{lineno}', + 'emitted_at': '{asctime}', + }, style='{') + ) + log.addHandler(handler) + log.info({'sample': 'value'}) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -108,21 +98,18 @@ def test_custom_fmt_with_format_style(self): def test_custom_fmt_with_template_style(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '${name}', - 'lineno': '${lineno}', - 'emitted_at': '${asctime}', - }, style='$') - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '${name}', + 'lineno': '${lineno}', + 'emitted_at': '${asctime}', + }, style='$') + ) + log.addHandler(handler) + log.info({'sample': 'value'}) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -133,43 +120,36 @@ def test_custom_fmt_with_template_style(self): def test_custom_field_raise_exception(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }) - ) - log.addHandler(handler) - with self.assertRaises(KeyError): - log.info({'sample': 'value'}) - log.removeHandler(handler) - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '%(name)s', + 'custom_field': '%(custom_field)s' + }) + ) + log.addHandler(handler) + with self.assertRaises(KeyError): + log.info({'sample': 'value'}) + log.removeHandler(handler) def test_custom_field_fill_missing_fmt_key_is_true(self): handler = self.get_handler_class()('app.follow', port=self._port) - - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '%(name)s', + 'custom_field': '%(custom_field)s' }, - fill_missing_fmt_key=True + fill_missing_fmt_key=True + ) ) - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - log.removeHandler(handler) - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + log.addHandler(handler) + log.info({'sample': 'value'}) + log.removeHandler(handler) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -181,15 +161,12 @@ def test_custom_field_fill_missing_fmt_key_is_true(self): def test_json_encoded_message(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info('{"key": "hello world!", "param": "value"}') - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info('{"key": "hello world!", "param": "value"}') data = self.get_data() self.assertTrue('key' in data[0][2]) @@ -198,15 +175,12 @@ def test_json_encoded_message(self): def test_unstructured_message(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info('hello %s', 'world') - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info('hello %s', 'world') data = self.get_data() self.assertTrue('message' in data[0][2]) @@ -215,15 +189,12 @@ def test_unstructured_message(self): def test_unstructured_formatted_message(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info('hello world, %s', 'you!') - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info('hello world, %s', 'you!') data = self.get_data() self.assertTrue('message' in data[0][2]) @@ -232,15 +203,12 @@ def test_unstructured_formatted_message(self): def test_number_string_simple_message(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info("1") - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info("1") data = self.get_data() self.assertTrue('message' in data[0][2]) @@ -248,15 +216,12 @@ def test_number_string_simple_message(self): def test_non_string_simple_message(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info(42) - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info(42) data = self.get_data() self.assertTrue('message' in data[0][2]) @@ -264,15 +229,12 @@ def test_non_string_simple_message(self): def test_non_string_dict_message(self): handler = self.get_handler_class()('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info({42: 'root'}) - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info({42: 'root'}) data = self.get_data() # For some reason, non-string keys are ignored @@ -280,46 +242,40 @@ def test_non_string_dict_message(self): class TestHandlerWithCircularQueue(unittest.TestCase): - Q_TIMEOUT = 0.04 Q_SIZE = 3 def setUp(self): super(TestHandlerWithCircularQueue, self).setUp() self._server = mockserver.MockRecvServer('localhost') self._port = self._server.port - self.handler = None + + def tearDown(self): + self._server.close() def get_handler_class(self): # return fluent.handler.FluentHandler return fluent.asynchandler.FluentHandler def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): handler = self.get_handler_class()('app.follow', port=self._port, - queue_timeout=self.Q_TIMEOUT, queue_maxsize=self.Q_SIZE, queue_circular=True) - self.handler = handler - - self.assertEqual(self.handler.sender.queue_circular, True) - self.assertEqual(self.handler.sender.queue_maxsize, self.Q_SIZE) - - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 5, 'from': 'userA', 'to': 'userB'}) - - # wait, giving time to the communicator thread to send the messages - time.sleep(0.5) - # close the handler, to join the thread and let the test suite to terminate - handler.close() + with handler: + self.assertEqual(handler.sender.queue_circular, True) + self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) + + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 5, 'from': 'userA', 'to': 'userB'}) data = self.get_data() eq = self.assertEqual diff --git a/tests/test_asyncsender.py b/tests/test_asyncsender.py index 53add21..eb36f96 100644 --- a/tests/test_asyncsender.py +++ b/tests/test_asyncsender.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- from __future__ import print_function -import unittest + import socket +import unittest + import msgpack -import time import fluent.asyncsender from tests import mockserver @@ -48,19 +49,21 @@ def setUp(self): super(TestSender, self).setUp() self._server = mockserver.MockRecvServer('localhost') self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port) + port=self._server.port) def tearDown(self): - self._sender.close() + try: + self._sender.close() + finally: + self._server.close() def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): - sender = self._sender - sender.emit('foo', {'bar': 'baz'}) - time.sleep(0.5) - sender._close() + with self._sender as sender: + sender.emit('foo', {'bar': 'baz'}) + data = self.get_data() eq = self.assertEqual eq(1, len(data)) @@ -73,6 +76,7 @@ def test_simple(self): def test_decorator_simple(self): with self._sender as sender: sender.emit('foo', {'bar': 'baz'}) + data = self.get_data() eq = self.assertEqual eq(1, len(data)) @@ -83,11 +87,10 @@ def test_decorator_simple(self): self.assertTrue(isinstance(data[0][1], int)) def test_nanosecond(self): - sender = self._sender - sender.nanosecond_precision = True - sender.emit('foo', {'bar': 'baz'}) - time.sleep(0.5) - sender._close() + with self._sender as sender: + sender.nanosecond_precision = True + sender.emit('foo', {'bar': 'baz'}) + data = self.get_data() eq = self.assertEqual eq(1, len(data)) @@ -99,11 +102,10 @@ def test_nanosecond(self): def test_nanosecond_coerce_float(self): time_ = 1490061367.8616468906402588 - sender = self._sender - sender.nanosecond_precision = True - sender.emit_with_time('foo', time_, {'bar': 'baz'}) - time.sleep(0.5) - sender._close() + with self._sender as sender: + sender.nanosecond_precision = True + sender.emit_with_time('foo', time_, {'bar': 'baz'}) + data = self.get_data() eq = self.assertEqual eq(1, len(data)) @@ -115,9 +117,8 @@ def test_nanosecond_coerce_float(self): eq(data[0][1].data, b'X\xd0\x8873[\xb0*') def test_no_last_error_on_successful_emit(self): - sender = self._sender - sender.emit('foo', {'bar': 'baz'}) - sender._close() + with self._sender as sender: + sender.emit('foo', {'bar': 'baz'}) self.assertEqual(sender.last_error, None) @@ -134,8 +135,8 @@ def test_clear_last_error(self): self.assertEqual(self._sender.last_error, None) - @unittest.skip("This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped") - #@patch('fluent.asyncsender.socket') + @unittest.skip("This test failed with 'TypeError: catching classes that do not " + "inherit from BaseException is not allowed' so skipped") def test_connect_exception_during_sender_init(self, mock_socket): # Make the socket.socket().connect() call raise a custom exception mock_connect = mock_socket.socket.return_value.connect @@ -144,6 +145,15 @@ def test_connect_exception_during_sender_init(self, mock_socket): self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG) + def test_sender_without_flush(self): + with self._sender as sender: + sender._queue.put(fluent.asyncsender._TOMBSTONE) # This closes without closing + sender._send_thread.join() + for x in range(1, 10): + sender._queue.put(x) + sender.close(False) + self.assertIs(sender._queue.get(False), fluent.asyncsender._TOMBSTONE) + class TestSenderDefaultProperties(unittest.TestCase): def setUp(self): @@ -153,17 +163,17 @@ def setUp(self): port=self._server.port) def tearDown(self): - self._sender.close() + try: + self._sender.close() + finally: + self._server.close() def test_default_properties(self): - sender = self._sender - self.assertTrue(sender.queue_blocking) - self.assertFalse(sender.queue_circular) - self.assertTrue(isinstance(sender.queue_maxsize, int)) - self.assertTrue(sender.queue_maxsize > 0) - self.assertTrue(isinstance(sender.queue_timeout, (int, float))) - self.assertTrue(sender.queue_timeout > 0) - sender._close() + with self._sender as sender: + self.assertTrue(sender.queue_blocking) + self.assertFalse(sender.queue_circular) + self.assertTrue(isinstance(sender.queue_maxsize, int)) + self.assertTrue(sender.queue_maxsize > 0) class TestSenderWithTimeout(unittest.TestCase): @@ -175,16 +185,18 @@ def setUp(self): queue_timeout=0.04) def tearDown(self): - self._sender.close() + try: + self._sender.close() + finally: + self._server.close() def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): - sender = self._sender - sender.emit('foo', {'bar': 'baz'}) - time.sleep(0.5) - sender._close() + with self._sender as sender: + sender.emit('foo', {'bar': 'baz'}) + data = self.get_data() eq = self.assertEqual eq(1, len(data)) @@ -195,12 +207,9 @@ def test_simple(self): self.assertTrue(isinstance(data[0][1], int)) def test_simple_with_timeout_props(self): - sender = self._sender - sender.queue_timeout = 0.06 - assert sender.queue_timeout == 0.06 - sender.emit('foo', {'bar': 'baz'}) - time.sleep(0.5) - sender._close() + with self._sender as sender: + sender.emit('foo', {'bar': 'baz'}) + data = self.get_data() eq = self.assertEqual eq(1, len(data)) @@ -226,47 +235,47 @@ def setUp(self): self._server = mockserver.MockRecvServer('localhost') self._sender = fluent.asyncsender.FluentSender(tag='test', port=self._server.port, - queue_timeout=0.04, queue_maxsize=self.Q_SIZE, queue_circular=True) def tearDown(self): - self._sender.close() + try: + self._sender.close() + finally: + self._server.close() def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): - sender = self._sender - - self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE) - self.assertEqual(self._sender.queue_circular, True) - self.assertEqual(self._sender.queue_blocking, False) - - ok = sender.emit('foo1', {'bar': 'baz1'}) - self.assertTrue(ok) - ok = sender.emit('foo2', {'bar': 'baz2'}) - self.assertTrue(ok) - ok = sender.emit('foo3', {'bar': 'baz3'}) - self.assertTrue(ok) - ok = sender.emit('foo4', {'bar': 'baz4'}) - self.assertTrue(ok) - ok = sender.emit('foo5', {'bar': 'baz5'}) - self.assertTrue(ok) - time.sleep(0.5) - sender._close() + with self._sender as sender: + self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE) + self.assertEqual(self._sender.queue_circular, True) + self.assertEqual(self._sender.queue_blocking, False) + + ok = sender.emit('foo1', {'bar': 'baz1'}) + self.assertTrue(ok) + ok = sender.emit('foo2', {'bar': 'baz2'}) + self.assertTrue(ok) + ok = sender.emit('foo3', {'bar': 'baz3'}) + self.assertTrue(ok) + ok = sender.emit('foo4', {'bar': 'baz4'}) + self.assertTrue(ok) + ok = sender.emit('foo5', {'bar': 'baz5'}) + self.assertTrue(ok) + data = self.get_data() eq = self.assertEqual - eq(self.Q_SIZE, len(data)) + # with the logging interface, we can't be sure to have filled up the queue, so we can + # test only for a cautelative condition here + self.assertTrue(len(data) >= self.Q_SIZE) eq(3, len(data[0])) - eq('test.foo3', data[0][0]) - eq({'bar': 'baz3'}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) eq(3, len(data[2])) - eq('test.foo5', data[2][0]) - eq({'bar': 'baz5'}, data[2][2]) + self.assertTrue(data[2][1]) + self.assertTrue(isinstance(data[2][1], int)) class TestSenderWithTimeoutMaxSizeNonCircular(unittest.TestCase): @@ -277,34 +286,34 @@ def setUp(self): self._server = mockserver.MockRecvServer('localhost') self._sender = fluent.asyncsender.FluentSender(tag='test', port=self._server.port, - queue_timeout=0.04, queue_maxsize=self.Q_SIZE) def tearDown(self): - self._sender.close() + try: + self._sender.close() + finally: + self._server.close() def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): - sender = self._sender - - self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE) - self.assertEqual(self._sender.queue_blocking, True) - self.assertEqual(self._sender.queue_circular, False) - - ok = sender.emit('foo1', {'bar': 'baz1'}) - self.assertTrue(ok) - ok = sender.emit('foo2', {'bar': 'baz2'}) - self.assertTrue(ok) - ok = sender.emit('foo3', {'bar': 'baz3'}) - self.assertTrue(ok) - ok = sender.emit('foo4', {'bar': 'baz4'}) - self.assertTrue(ok) - ok = sender.emit('foo5', {'bar': 'baz5'}) - self.assertTrue(ok) - time.sleep(0.5) - sender._close() + with self._sender as sender: + self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE) + self.assertEqual(self._sender.queue_blocking, True) + self.assertEqual(self._sender.queue_circular, False) + + ok = sender.emit('foo1', {'bar': 'baz1'}) + self.assertTrue(ok) + ok = sender.emit('foo2', {'bar': 'baz2'}) + self.assertTrue(ok) + ok = sender.emit('foo3', {'bar': 'baz3'}) + self.assertTrue(ok) + ok = sender.emit('foo4', {'bar': 'baz4'}) + self.assertTrue(ok) + ok = sender.emit('foo5', {'bar': 'baz5'}) + self.assertTrue(ok) + data = self.get_data() eq = self.assertEqual print(data) @@ -332,24 +341,25 @@ def setUp(self): queue_maxsize=0) def tearDown(self): - self._sender.close() + try: + self._sender.close() + finally: + self._server.close() def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): - sender = self._sender + with self._sender as sender: + self.assertEqual(self._sender.queue_maxsize, 0) + self.assertEqual(self._sender.queue_blocking, True) + self.assertEqual(self._sender.queue_circular, False) - self.assertEqual(self._sender.queue_maxsize, 0) - self.assertEqual(self._sender.queue_blocking, True) - self.assertEqual(self._sender.queue_circular, False) + NUM = 1000 + for i in range(1, NUM + 1): + ok = sender.emit("foo{}".format(i), {'bar': "baz{}".format(i)}) + self.assertTrue(ok) - NUM = 1000 - for i in range(1, NUM+1): - ok = sender.emit("foo{}".format(i), {'bar': "baz{}".format(i)}) - self.assertTrue(ok) - time.sleep(0.5) - sender._close() data = self.get_data() eq = self.assertEqual eq(NUM, len(data)) @@ -360,7 +370,7 @@ def test_simple(self): self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) - el = data[NUM-1] + el = data[NUM - 1] eq(3, len(el)) eq("test.foo{}".format(NUM), el[0]) eq({'bar': "baz{}".format(NUM)}, el[2]) diff --git a/tests/test_event.py b/tests/test_event.py index 494b0f2..d341616 100644 --- a/tests/test_event.py +++ b/tests/test_event.py @@ -5,8 +5,10 @@ from fluent import event, sender from tests import mockserver + class TestException(BaseException): pass + class TestEvent(unittest.TestCase): def setUp(self): self._server = mockserver.MockRecvServer('localhost') @@ -22,7 +24,7 @@ def test_logging(self): # send event with tag app.follow event.Event('follow', { 'from': 'userA', - 'to': 'userB' + 'to': 'userB' }) def test_logging_with_timestamp(self): @@ -31,21 +33,21 @@ def test_logging_with_timestamp(self): # send event with tag app.follow, with timestamp event.Event('follow', { 'from': 'userA', - 'to': 'userB' + 'to': 'userB' }, time=int(0)) def test_no_last_error_on_successful_event(self): global_sender = sender.get_global_sender() event.Event('unfollow', { 'from': 'userC', - 'to': 'userD' + 'to': 'userD' }) self.assertEqual(global_sender.last_error, None) sender.close() - @unittest.skip("This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped") - #@patch('fluent.sender.socket') + @unittest.skip("This test failed with 'TypeError: catching classes that do not " + "inherit from BaseException is not allowed' so skipped") def test_connect_exception_during_event_send(self, mock_socket): # Make the socket.socket().connect() call raise a custom exception mock_connect = mock_socket.socket.return_value.connect @@ -58,7 +60,7 @@ def test_connect_exception_during_event_send(self, mock_socket): event.Event('unfollow', { 'from': 'userE', - 'to': 'userF' + 'to': 'userF' }) ex = global_sender.last_error diff --git a/tests/test_handler.py b/tests/test_handler.py index 4ceb84e..1678e5c 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -14,21 +14,27 @@ def setUp(self): self._server = mockserver.MockRecvServer('localhost') self._port = self._server.port + def tearDown(self): + self._server.close() + def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info({ - 'from': 'userA', - 'to': 'userB' - }) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + + log.info({ + 'from': 'userA', + 'to': 'userB' + }) + + log.removeHandler(handler) data = self.get_data() eq = self.assertEqual @@ -43,18 +49,19 @@ def test_simple(self): def test_custom_fmt(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'lineno': '%(lineno)d', - 'emitted_at': '%(asctime)s', - }) - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '%(name)s', + 'lineno': '%(lineno)d', + 'emitted_at': '%(asctime)s', + }) + ) + log.addHandler(handler) + log.info({'sample': 'value'}) + log.removeHandler(handler) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -65,14 +72,33 @@ def test_custom_fmt(self): def test_exclude_attrs(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(exclude_attrs=[]) - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(exclude_attrs=[]) + ) + log.addHandler(handler) + log.info({'sample': 'value'}) + log.removeHandler(handler) + + data = self.get_data() + self.assertTrue('name' in data[0][2]) + self.assertEqual('fluent.test', data[0][2]['name']) + self.assertTrue('lineno' in data[0][2]) + + def test_exclude_attrs_with_exclusion(self): + handler = fluent.handler.FluentHandler('app.follow', port=self._port) + + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(exclude_attrs=["funcName"]) + ) + log.addHandler(handler) + log.info({'sample': 'value'}) + log.removeHandler(handler) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -82,14 +108,15 @@ def test_exclude_attrs(self): def test_exclude_attrs_with_extra(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(exclude_attrs=[]) - ) - log.addHandler(handler) - log.info("Test with value '%s'", "test value", extra={"x": 1234}) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(exclude_attrs=[]) + ) + log.addHandler(handler) + log.info("Test with value '%s'", "test value", extra={"x": 1234}) + log.removeHandler(handler) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -102,18 +129,19 @@ def test_exclude_attrs_with_extra(self): def test_custom_fmt_with_format_style(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '{name}', - 'lineno': '{lineno}', - 'emitted_at': '{asctime}', - }, style='{') - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '{name}', + 'lineno': '{lineno}', + 'emitted_at': '{asctime}', + }, style='{') + ) + log.addHandler(handler) + log.info({'sample': 'value'}) + log.removeHandler(handler) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -125,18 +153,19 @@ def test_custom_fmt_with_format_style(self): def test_custom_fmt_with_template_style(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '${name}', - 'lineno': '${lineno}', - 'emitted_at': '${asctime}', - }, style='$') - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '${name}', + 'lineno': '${lineno}', + 'emitted_at': '${asctime}', + }, style='$') + ) + log.addHandler(handler) + log.info({'sample': 'value'}) + log.removeHandler(handler) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -147,37 +176,39 @@ def test_custom_fmt_with_template_style(self): def test_custom_field_raise_exception(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }) - ) - log.addHandler(handler) - with self.assertRaises(KeyError): - log.info({'sample': 'value'}) - log.removeHandler(handler) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '%(name)s', + 'custom_field': '%(custom_field)s' + }) + ) + log.addHandler(handler) + + with self.assertRaises(KeyError): + log.info({'sample': 'value'}) + + log.removeHandler(handler) def test_custom_field_fill_missing_fmt_key_is_true(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }, - fill_missing_fmt_key=True + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter( + fluent.handler.FluentRecordFormatter(fmt={ + 'name': '%(name)s', + 'custom_field': '%(custom_field)s' + }, + fill_missing_fmt_key=True + ) ) - ) - log.addHandler(handler) - log.info({'sample': 'value'}) - log.removeHandler(handler) - handler.close() + log.addHandler(handler) + log.info({'sample': 'value'}) + log.removeHandler(handler) data = self.get_data() self.assertTrue('name' in data[0][2]) @@ -189,12 +220,15 @@ def test_custom_field_fill_missing_fmt_key_is_true(self): def test_json_encoded_message(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info('{"key": "hello world!", "param": "value"}') - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + + log.info('{"key": "hello world!", "param": "value"}') + + log.removeHandler(handler) data = self.get_data() self.assertTrue('key' in data[0][2]) @@ -203,12 +237,15 @@ def test_json_encoded_message(self): def test_json_encoded_message_without_json(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter(format_json=False)) - log.addHandler(handler) - log.info('{"key": "hello world!", "param": "value"}') - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter(format_json=False)) + log.addHandler(handler) + + log.info('{"key": "hello world!", "param": "value"}') + + log.removeHandler(handler) data = self.get_data() self.assertTrue('key' not in data[0][2]) @@ -217,12 +254,13 @@ def test_json_encoded_message_without_json(self): def test_unstructured_message(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info('hello %s', 'world') - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info('hello %s', 'world') + log.removeHandler(handler) data = self.get_data() self.assertTrue('message' in data[0][2]) @@ -231,12 +269,13 @@ def test_unstructured_message(self): def test_unstructured_formatted_message(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info('hello world, %s', 'you!') - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info('hello world, %s', 'you!') + log.removeHandler(handler) data = self.get_data() self.assertTrue('message' in data[0][2]) @@ -245,12 +284,13 @@ def test_unstructured_formatted_message(self): def test_number_string_simple_message(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info("1") - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info("1") + log.removeHandler(handler) data = self.get_data() self.assertTrue('message' in data[0][2]) @@ -258,12 +298,13 @@ def test_number_string_simple_message(self): def test_non_string_simple_message(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info(42) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info(42) + log.removeHandler(handler) data = self.get_data() self.assertTrue('message' in data[0][2]) @@ -271,12 +312,13 @@ def test_non_string_simple_message(self): def test_non_string_dict_message(self): handler = fluent.handler.FluentHandler('app.follow', port=self._port) - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) - log.info({42: 'root'}) - handler.close() + with handler: + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + log.info({42: 'root'}) + log.removeHandler(handler) data = self.get_data() # For some reason, non-string keys are ignored diff --git a/tests/test_sender.py b/tests/test_sender.py index f3ce332..2a82f48 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -2,8 +2,12 @@ from __future__ import print_function +import errno import socket +import sys import unittest +from shutil import rmtree +from tempfile import mkdtemp import msgpack @@ -49,10 +53,13 @@ def setUp(self): port=self._server.port) def tearDown(self): - self._sender.close() + try: + self._sender.close() + finally: + self._server.close() def get_data(self): - return self._server.get_recieved() + return self._server.get_received() def test_simple(self): sender = self._sender @@ -128,17 +135,114 @@ def test_clear_last_error(self): self._sender.clear_last_error() self.assertEqual(self._sender.last_error, None) + self._sender.clear_last_error() + self.assertEqual(self._sender.last_error, None) + + def test_emit_error(self): + with self._sender as sender: + sender.emit("blah", {"a": object()}) - @unittest.skip( - "This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped") - # @patch('fluent.sender.socket') - def test_connect_exception_during_sender_init(self, mock_socket): - # Make the socket.socket().connect() call raise a custom exception - mock_connect = mock_socket.socket.return_value.connect - EXCEPTION_MSG = "a sender init socket connect() exception" - mock_connect.side_effect = socket.error(EXCEPTION_MSG) + data = self._server.get_received() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2]["message"], "Can't output to log") - self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG) + def test_verbose(self): + with self._sender as sender: + sender.verbose = True + sender.emit('foo', {'bar': 'baz'}) + # No assertions here, just making sure there are no exceptions + + def test_failure_to_connect(self): + self._server.close() + + with self._sender as sender: + sender._send_internal(b"123") + self.assertEqual(sender.pendings, b"123") + self.assertIsNone(sender.socket) + + sender._send_internal(b"456") + self.assertEqual(sender.pendings, b"123456") + self.assertIsNone(sender.socket) + + sender.pendings = None + overflows = [] + + def boh(buf): + overflows.append(buf) + + def boh_with_error(buf): + raise RuntimeError + + sender.buffer_overflow_handler = boh + + sender._send_internal(b"0" * sender.bufmax) + self.assertFalse(overflows) # No overflow + + sender._send_internal(b"1") + self.assertTrue(overflows) + self.assertEqual(overflows.pop(0), b"0" * sender.bufmax + b"1") + + sender.buffer_overflow_handler = None + sender._send_internal(b"0" * sender.bufmax) + sender._send_internal(b"1") + self.assertIsNone(sender.pendings) + + sender.buffer_overflow_handler = boh_with_error + sender._send_internal(b"0" * sender.bufmax) + sender._send_internal(b"1") + self.assertIsNone(sender.pendings) + + sender._send_internal(b"1") + self.assertFalse(overflows) # No overflow + self.assertEqual(sender.pendings, b"1") + self.assertIsNone(sender.socket) + + sender.buffer_overflow_handler = boh + sender.close() + self.assertEqual(overflows.pop(0), b"1") + + def test_broken_conn(self): + with self._sender as sender: + sender._send_internal(b"123") + self.assertIsNone(sender.pendings, b"123") + self.assertTrue(sender.socket) + + class FakeSocket: + def send(self, bytes_): + return 0 + + def shutdown(self, mode): + pass + + def close(self): + pass + + old_sock = self._sender.socket + self._sender.socket = FakeSocket() + try: + self.assertFalse(sender._send_internal(b"456")) + self.assertTrue(sender.last_error.errno, errno.EPIPE) + finally: + self._sender.socket = old_sock + + @unittest.skipIf(sys.platform == "win32", "Unix socket not supported") + def test_unix_socket(self): + self.tearDown() + tmp_dir = mkdtemp() + try: + server_file = 'unix://' + tmp_dir + "/tmp.unix" + self._server = mockserver.MockRecvServer(server_file) + self._sender = fluent.sender.FluentSender(tag='test', + host=server_file) + with self._sender as sender: + self.assertTrue(sender.emit('foo', {'bar': 'baz'})) + + data = self._server.get_received() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + + finally: + rmtree(tmp_dir, True) class TestEventTime(unittest.TestCase):