diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index e8d0da2dd..1f58fe550 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -16,6 +16,7 @@ import threading import time from threading import Lock, RLock +import weakref try: import queue @@ -93,6 +94,35 @@ ] ) + "\n" +Stop = object() + +SUPPORTS_FORKING = hasattr(os, "register_at_fork") and not os.environ.get("DD_DOGSTATSD_DISABLE_FORK_SUPPORT", None) +TRACK_INSTANCES = not os.environ.get("DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING", None) + +_instances = weakref.WeakSet() # type: weakref.WeakSet + + +def pre_fork(): + """Prepare all client instances for a process fork. + + If SUPPORTS_FORKING is true, this will be called automatically before os.fork(). + """ + for c in _instances: + c.pre_fork() + + +def post_fork(): + """Restore all client instances after a fork. + + If SUPPORTS_FORKING is true, this will be called automatically after os.fork(). + """ + for c in _instances: + c.post_fork() + + +if SUPPORTS_FORKING: + os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # type: ignore + # pylint: disable=useless-object-inheritance,too-many-instance-attributes # pylint: disable=too-many-arguments,too-many-locals @@ -125,6 +155,7 @@ def __init__( disable_background_sender=True, # type: bool sender_queue_size=0, # type: int sender_queue_timeout=0, # type: Optional[float] + track_instance=True, # type: bool ): # type: (...) -> None """ Initialize a DogStatsd object. @@ -172,6 +203,13 @@ def __init__( for origin detection. :type DD_ORIGIN_DETECTION_ENABLED: boolean + :envvar DD_DOGSTATSD_DISABLE_FORK_SUPPORT: Don't install global fork hooks with os.register_at_fork. + Global fork hooks then need to be called manually before and after calling os.fork. + :type DD_DOGSTATSD_DISABLE_FORK_SUPPORT: boolean + + :envvar DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING: Don't register instances of this class with global fork hooks. 
+ :type DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING: boolean + :param host: the host of the DogStatsd server. :type host: string @@ -268,7 +306,7 @@ def __init__( :param disable_background_sender: Use a background thread to communicate with the dogstatsd server. Optional. When enabled, a background thread will be used to send metric payloads to the Agent. - Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent. + Applications should call stop() before exiting to make sure all pending payloads are sent. Default: True. :type disable_background_sender: boolean @@ -283,6 +321,11 @@ def __init__( If set to zero drop the packet immediately if the queue is full. Default: 0 (no wait) :type sender_queue_timeout: float + + :param track_instance: Keep track of this instance and automatically handle cleanup when os.fork() is called, + if supported. + Default: True. + :type track_instance: boolean """ self._socket_lock = Lock() @@ -387,10 +430,8 @@ def __init__( self._reset_buffer() - # This lock is used for all cases where buffering functionality is - # being toggled (by `open_buffer()`, `close_buffer()`, or - # `self._disable_buffering` calls). - self._buffering_toggle_lock = RLock() + # This lock is used for all cases where client configuration is being changed: buffering, sender mode. + self._config_lock = RLock() # If buffering is disabled, we bypass the buffer function. self._send = self._send_to_buffer @@ -399,17 +440,27 @@ def __init__( self._send = self._send_to_server log.debug("Statsd buffering is disabled") + # Indicates if the process is about to fork, so we shouldn't start any new threads yet. + self._forking = False + # Start the flush thread if buffering is enabled and the interval is above # a reasonable range. This both prevents thrashing and allow us to use "0.0" # as a value for disabling the automatic flush timer as well. 
self._flush_interval = flush_interval self._flush_thread_stop = threading.Event() + self._flush_thread = None self._start_flush_thread(self._flush_interval) self._queue = None + self._sender_thread = None + self._sender_enabled = False + if not disable_background_sender: self.enable_background_sender(sender_queue_size, sender_queue_timeout) + if TRACK_INSTANCES and track_instance: + _instances.add(self) + @property def socket_path(self): return self._socket_path @@ -430,32 +481,25 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): Use a background thread to communicate with the dogstatsd server. When enabled, a background thread will be used to send metric payloads to the Agent. - Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent. - - This method is not thread safe and should not be called concurrently with other methods on the current object. - Normally, this should be called shortly after process initialization (for example from a post-fork hook in a - forking server). + Applications should call stop() before exiting to make sure all pending payloads are sent. - :param sender_queue_size: Set the maximum number of packets to queue for the sender. Optional - How may packets to queue before blocking or dropping the packet if the packet queue is already full. - Default: 0 (unlimited). - :type sender_queue_size: integer + Compatible with os.fork() starting with Python 3.7. On earlier versions, compatible if applications + arrange to call pre_fork() and post_fork() module functions around calls to os.fork(). - :param sender_queue_timeout: Set timeout for packet queue operations, in seconds. Optional. - How long the application thread is willing to wait for the queue clear up before dropping the metric packet. - If set to None, wait forever. - If set to zero drop the packet immediately if the queue is full. 
- Default: 0 (no wait) - :type sender_queue_timeout: float + :param sender_queue_size: Set the maximum number of packets to queue for the sender. + How many packets to queue before blocking or dropping the packet if the packet queue is already full. + Default: 0 (unlimited). + :type sender_queue_size: integer, optional + :param sender_queue_timeout: Set timeout for packet queue operations, in seconds. + How long the application thread is willing to wait for the queue to clear up before dropping the metric packet. + If set to None, wait forever. If set to zero drop the packet immediately if the queue is full. + Default: 0 (no wait). + :type sender_queue_timeout: float, optional """ - # Avoid a race on _queue with the background buffer flush thread that reads _queue. - with self._buffer_lock: - if self._queue is not None: - return - - self._queue = queue.Queue(sender_queue_size) - self._start_sender_thread() + with self._config_lock: + self._sender_enabled = True + self._sender_queue_size = sender_queue_size if sender_queue_timeout is None: self._queue_blocking = True self._queue_timeout = None @@ -463,6 +507,17 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): self._queue_blocking = sender_queue_timeout > 0 self._queue_timeout = max(0, sender_queue_timeout) + self._start_sender_thread() + + def disable_background_sender(self): + """Disable background sender mode. + + This call will block until all previously queued payloads are sent. 
+ """ + with self._config_lock: + self._sender_enabled = False + self._stop_sender_thread() + def disable_telemetry(self): self._telemetry = False @@ -475,6 +530,12 @@ def _start_flush_thread(self, flush_interval): log.debug("Statsd periodic buffer flush is disabled") return + if self._forking: + return + + if self._flush_thread is not None: + return + def _flush_thread_loop(self, flush_interval): while not self._flush_thread_stop.is_set(): time.sleep(flush_interval) @@ -496,7 +557,6 @@ def _flush_thread_loop(self, flush_interval): # Note: Invocations of this method should be thread-safe def _stop_flush_thread(self): if not self._flush_thread: - log.warning("No statsd flush thread to stop") return try: @@ -525,12 +585,12 @@ def __exit__(self, exc_type, value, traceback): @property def disable_buffering(self): - with self._buffering_toggle_lock: + with self._config_lock: return self._disable_buffering @disable_buffering.setter def disable_buffering(self, is_disabled): - with self._buffering_toggle_lock: + with self._config_lock: # If the toggle didn't change anything, this method is a noop if self._disable_buffering == is_disabled: return @@ -672,7 +732,7 @@ def open_buffer(self, max_buffer_size=None): Note: This method must be called before close_buffer() matching invocation. """ - self._buffering_toggle_lock.acquire() + self._config_lock.acquire() # XXX Remove if `disable_buffering` default is changed to False self._send = self._send_to_buffer @@ -696,7 +756,7 @@ def close_buffer(self): if self._disable_buffering: self._send = self._send_to_server - self._buffering_toggle_lock.release() + self._config_lock.release() def _reset_buffer(self): with self._buffer_lock: @@ -986,13 +1046,17 @@ def _is_telemetry_flush_time(self): self._last_flush_time + self._telemetry_flush_interval < time.time() def _send_to_server(self, packet): + # Skip the lock if the queue is None. There is no race with enable_background_sender. 
if self._queue is not None: - try: - self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout) - except queue.Full: - self.packets_dropped_queue += 1 - self.bytes_dropped_queue += 1 - return + # Prevent a race with disable_background_sender. + with self._buffer_lock: + if self._queue is not None: + try: + self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout) + except queue.Full: + self.packets_dropped_queue += 1 + self.bytes_dropped_queue += 1 + return self._xmit_packet_with_telemetry(packet + '\n') @@ -1231,18 +1295,42 @@ def _set_container_id(self, container_id, origin_detection_enabled): self._container_id = None def _start_sender_thread(self): + if not self._sender_enabled or self._forking: + return + + if self._queue is not None: + return + + self._queue = queue.Queue(self._sender_queue_size) + log.debug("Starting background sender thread") self._sender_thread = threading.Thread( name="{}_sender_thread".format(self.__class__.__name__), target=self._sender_main_loop, + args=(self._queue,) ) self._sender_thread.daemon = True self._sender_thread.start() - def _sender_main_loop(self): + def _stop_sender_thread(self): + # Lock ensures that nothing gets added to the queue after we disable it. + with self._buffer_lock: + if not self._queue: + return + self._queue.put(Stop) + self._queue = None + + self._sender_thread.join() + self._sender_thread = None + + def _sender_main_loop(self, queue): while True: - self._xmit_packet_with_telemetry(self._queue.get()) - self._queue.task_done() + item = queue.get() + if item is Stop: + queue.task_done() + return + self._xmit_packet_with_telemetry(item) + queue.task_done() def wait_for_pending(self): """ @@ -1250,8 +1338,61 @@ def wait_for_pending(self): """ self.flush() - if self._queue is not None: - self._queue.join() + + # Avoid race with disable_background_sender. We don't need a + # lock, just copy the value so it doesn't change between the + # check and join later. 
+ queue = self._queue + + if queue is not None: + queue.join() + + def pre_fork(self): + """Prepare client for a process fork. + + Flush any pending payloads, stop all background threads and + close the connection. + + Once this function returns, the client should not be used until + post_fork() is called. + """ + log.debug("[%d] pre_fork for %s", os.getpid(), self) + + self._forking = True + + with self._config_lock: + self._stop_flush_thread() + self._stop_sender_thread() + self.close_socket() + + def post_fork(self): + """Restore the client state after a fork.""" + + log.debug("[%d] post_fork for %s", os.getpid(), self) + + with self._socket_lock: + if self.socket or self.telemetry_socket: + log.warning("Open socket detected after fork. Call pre_fork() before os.fork().") + self.close_socket() + + self._forking = False + + with self._config_lock: + self._start_flush_thread(self._flush_interval) + self._start_sender_thread() + + def stop(self): + """Stop the client. + + Disable buffering, background sender and flush any pending payloads to the server. + + Client remains usable after this method, but sending metrics may block if socket_timeout is enabled. + """ + + self.disable_background_sender() + self.disable_buffering = True + self.flush() + self.close_socket() statsd = DogStatsd() diff --git a/doc/source/index.rst b/doc/source/index.rst index a251f0946..c8a64cb4a 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -188,6 +188,15 @@ Usage >>> initialize(statsd_host="localhost", statsd_port=8125) >>> statsd.increment("home.page.hits") +.. data:: datadog.dogstatsd.base.SUPPORTS_FORKING + + Indicates whether the Python runtime supports os.register_at_fork(). When + true, buffering and background sender can be safely used in applications + that use os.fork(). + +.. autofunction:: datadog.dogstatsd.base.pre_fork +.. 
autofunction:: datadog.dogstatsd.base.post_fork + Get in Touch ============ diff --git a/tests/integration/dogstatsd/test_statsd_fork.py b/tests/integration/dogstatsd/test_statsd_fork.py new file mode 100644 index 000000000..5b19f37bb --- /dev/null +++ b/tests/integration/dogstatsd/test_statsd_fork.py @@ -0,0 +1,43 @@ +import os +import itertools +import socket + +import pytest + +from datadog.dogstatsd.base import DogStatsd, SUPPORTS_FORKING + + +@pytest.mark.parametrize( + "disable_background_sender, disable_buffering", + list(itertools.product([True, False], [True, False])), +) +def test_register_at_fork(disable_background_sender, disable_buffering): + if not SUPPORTS_FORKING: + pytest.skip("os.register_at_fork is required for this test") + + statsd = DogStatsd( + telemetry_min_flush_interval=0, + disable_background_sender=disable_background_sender, + disable_buffering=disable_buffering, + ) + + tracker = {} + + def track(method): + def inner(*args, **kwargs): + method(*args, **kwargs) + tracker[method] = True + + return inner + + statsd.pre_fork = track(statsd.pre_fork) + statsd.post_fork = track(statsd.post_fork) + + pid = os.fork() + if pid == 0: + os._exit(0) + + assert pid > 0 + os.waitpid(pid, 0) + + assert len(tracker) == 2 diff --git a/tests/integration/dogstatsd/test_statsd_sender.py b/tests/integration/dogstatsd/test_statsd_sender.py index 9f4cbb930..222897643 100644 --- a/tests/integration/dogstatsd/test_statsd_sender.py +++ b/tests/integration/dogstatsd/test_statsd_sender.py @@ -7,10 +7,10 @@ from datadog.dogstatsd.base import DogStatsd @pytest.mark.parametrize( - "disable_background_sender, disable_buffering, wait_for_pending, socket_timeout", - list(itertools.product([True, False], [True, False], [True, False], [0, 1])), + "disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop", + list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])), ) -def 
test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout): +def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop): # Test basic sender operation with an assortment of options foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0) statsd = DogStatsd( @@ -35,6 +35,9 @@ def reader_thread(): if wait_for_pending: statsd.wait_for_pending() + if stop: + statsd.stop() + t.join(timeout=10) assert not t.is_alive() @@ -46,3 +49,37 @@ def test_set_socket_timeout(): statsd.close_socket() assert statsd.get_socket().gettimeout() == 1 + +@pytest.mark.parametrize( + "disable_background_sender, disable_buffering", + list(itertools.product([True, False], [True, False])), +) +def test_fork_hooks(disable_background_sender, disable_buffering): + statsd = DogStatsd( + telemetry_min_flush_interval=0, + disable_background_sender=disable_background_sender, + disable_buffering=disable_buffering, + ) + + foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0) + statsd.socket = foo + + statsd.increment("test.metric") + + assert disable_buffering or statsd._flush_thread.is_alive() + assert disable_background_sender or statsd._sender_thread.is_alive() + + statsd.pre_fork() + + assert statsd._flush_thread is None + assert statsd._sender_thread is None + assert statsd._queue is None or statsd._queue.empty() + assert len(statsd._buffer) == 0 + + statsd.post_fork() + + assert disable_buffering or statsd._flush_thread.is_alive() + assert disable_background_sender or statsd._sender_thread.is_alive() + + foo.close() + bar.close()