From 36ba6e5f5782d25c0bb353f6f2a206c79f5e1f15 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Wed, 13 Dec 2023 14:17:06 +0100 Subject: [PATCH 01/19] Add forking helpers --- datadog/dogstatsd/base.py | 68 ++++++++++++++++++- .../integration/dogstatsd/test_statsd_fork.py | 43 ++++++++++++ .../dogstatsd/test_statsd_sender.py | 34 ++++++++++ 3 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 tests/integration/dogstatsd/test_statsd_fork.py diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index ebb22618e..192818a2c 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -16,6 +16,7 @@ import threading import time from threading import Lock, RLock +import weakref try: import queue @@ -93,12 +94,29 @@ ] ) + "\n" +Stop = object() + +SUPPORTS_FORKING = hasattr(os, "register_at_fork") + +_instances = weakref.WeakSet() + +def pre_fork(): + for c in _instances: + c.pre_fork() + +def post_fork(): + for c in _instances: + c.post_fork() + +if SUPPORTS_FORKING: + os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # pylint: disable=useless-object-inheritance,too-many-instance-attributes # pylint: disable=too-many-arguments,too-many-locals class DogStatsd(object): OK, WARNING, CRITICAL, UNKNOWN = (0, 1, 2, 3) + def __init__( self, host=DEFAULT_HOST, # type: Text @@ -408,12 +426,18 @@ def __init__( # as a value for disabling the automatic flush timer as well. self._flush_interval = flush_interval self._flush_thread_stop = threading.Event() + self._flush_thread = None self._start_flush_thread(self._flush_interval) self._queue = None + self._sender_thread = None + if not disable_background_sender: self.enable_background_sender(sender_queue_size, sender_queue_timeout) + if SUPPORTS_FORKING: + _instances.add(self) + @property def socket_path(self): return self._socket_path @@ -1237,9 +1261,18 @@ def _start_sender_thread(self): self._sender_thread.daemon = True self._sender_thread.start() + def _stop_sender_thread(self): + self._queue.put(Stop) + self._sender_thread.join() + self._sender_thread = None + def _sender_main_loop(self): while True: - self._xmit_packet_with_telemetry(self._queue.get()) + item = self._queue.get() + if item is Stop: + self._queue.task_done() + return + self._xmit_packet_with_telemetry(item) self._queue.task_done() def wait_for_pending(self): @@ -1251,5 +1284,38 @@ def wait_for_pending(self): if self._queue is not None: self._queue.join() + def pre_fork(self): + """Prepare client for a process fork. + + Flush any pending payloads, stop all background threads and + close the connection. Once the function returns. + + The client should not be used from this point until + post_fork() is called. + """ + log.debug("[%d] pre_fork for %s", os.getpid(), self) + + if self._flush_thread is not None: + self._stop_flush_thread() + + if self._sender_thread is not None: + self._stop_sender_thread() + + self.close_socket() + + def post_fork(self): + """Restore the client state after a fork.""" + + log.debug("[%d] post_fork for %s", os.getpid(), self) + + with self._socket_lock: + if self.socket or self.telemetry_socket: + log.warning("Open socket detected after fork. Call pre_fork() before os.fork().") + self.close_socket() + + self._start_flush_thread(self._flush_interval) + if self._queue: + self._start_sender_thread() + statsd = DogStatsd() diff --git a/tests/integration/dogstatsd/test_statsd_fork.py b/tests/integration/dogstatsd/test_statsd_fork.py new file mode 100644 index 000000000..5b19f37bb --- /dev/null +++ b/tests/integration/dogstatsd/test_statsd_fork.py @@ -0,0 +1,43 @@ +import os +import itertools +import socket + +import pytest + +from datadog.dogstatsd.base import DogStatsd, SUPPORTS_FORKING + + +@pytest.mark.parametrize( + "disable_background_sender, disable_buffering", + list(itertools.product([True, False], [True, False])), +) +def test_register_at_fork(disable_background_sender, disable_buffering): + if not SUPPORTS_FORKING: + pytest.skip("os.register_at_fork is required for this test") + + statsd = DogStatsd( + telemetry_min_flush_interval=0, + disable_background_sender=disable_background_sender, + disable_buffering=disable_buffering, + ) + + tracker = {} + + def track(method): + def inner(*args, **kwargs): + method(*args, **kwargs) + tracker[method] = True + + return inner + + statsd.pre_fork = track(statsd.pre_fork) + statsd.post_fork = track(statsd.post_fork) + + pid = os.fork() + if pid == 0: + os._exit(0) + + assert pid > 0 + os.waitpid(pid, 0) + + assert len(tracker) == 2 diff --git a/tests/integration/dogstatsd/test_statsd_sender.py b/tests/integration/dogstatsd/test_statsd_sender.py index 9f4cbb930..95a822bc9 100644 --- a/tests/integration/dogstatsd/test_statsd_sender.py +++ b/tests/integration/dogstatsd/test_statsd_sender.py @@ -46,3 +46,37 @@ def test_set_socket_timeout(): statsd.close_socket() assert statsd.get_socket().gettimeout() == 1 + +@pytest.mark.parametrize( + "disable_background_sender, disable_buffering", + list(itertools.product([True, False], [True, False])), +) +def test_fork_hooks(disable_background_sender, disable_buffering): + statsd = DogStatsd( + telemetry_min_flush_interval=0, + disable_background_sender=disable_background_sender, + disable_buffering=disable_buffering, + ) + + foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0) + statsd.socket = foo + + statsd.increment("test.metric") + + assert disable_buffering or statsd._flush_thread.is_alive() + assert disable_background_sender or statsd._sender_thread.is_alive() + + statsd.pre_fork() + + assert statsd._flush_thread is None + assert statsd._sender_thread is None + assert statsd._queue is None or statsd._queue.empty() + assert len(statsd._buffer) == 0 + + statsd.post_fork() + + assert disable_buffering or statsd._flush_thread.is_alive() + assert disable_background_sender or statsd._sender_thread.is_alive() + + foo.close() + bar.close() From 3b61696c93e2047282a06ca98f11af31bbe540dd Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Wed, 13 Dec 2023 20:10:10 +0100 Subject: [PATCH 02/19] Add disable_background_sender method. --- datadog/dogstatsd/base.py | 115 ++++++++++++++++++++++++++------------ 1 file changed, 80 insertions(+), 35 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 192818a2c..6ee1447c7 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -431,6 +431,10 @@ def __init__( self._queue = None self._sender_thread = None + self._sender_enabled = False + self._sender_lock = RLock() + # Indicates if the process is about to fork, so we shouldn't start any new threads yet. + self._forking = False if not disable_background_sender: self.enable_background_sender(sender_queue_size, sender_queue_timeout) @@ -471,13 +475,9 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): :type sender_queue_timeout: float """ - # Avoid a race on _queue with the background buffer flush thread that reads _queue. - with self._buffer_lock: - if self._queue is not None: - return - - self._queue = queue.Queue(sender_queue_size) - self._start_sender_thread() + with self._sender_lock: + self._sender_enabled = True + self._sender_queue_size = sender_queue_size if sender_queue_timeout is None: self._queue_blocking = True self._queue_timeout = None @@ -485,6 +485,17 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): self._queue_blocking = sender_queue_timeout > 0 self._queue_timeout = max(0, sender_queue_timeout) + self._start_sender_thread() + + def disable_background_sender(self): + """Disable background sender mode. + + This call will block until all previously queued payloads are sent. + """ + with self._sender_lock: + self._sender_enabled = False + self._stop_sender_thread() + def disable_telemetry(self): self._telemetry = False @@ -1008,13 +1019,15 @@ def _is_telemetry_flush_time(self): self._last_flush_time + self._telemetry_flush_interval < time.time() def _send_to_server(self, packet): - if self._queue is not None: - try: - self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout) - except queue.Full: - self.packets_dropped_queue += 1 - self.bytes_dropped_queue += 1 - return + # Prevent a race with disable_background_sender + with self._buffer_lock: + if self._queue is not None: + try: + self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout) + except queue.Full: + self.packets_dropped_queue += 1 + self.bytes_dropped_queue += 1 + return self._xmit_packet_with_telemetry(packet + '\n') @@ -1253,27 +1266,51 @@ def _set_container_id(self, container_id, origin_detection_enabled): self._container_id = None def _start_sender_thread(self): - log.debug("Starting background sender thread") - self._sender_thread = threading.Thread( - name="{}_sender_thread".format(self.__class__.__name__), - target=self._sender_main_loop, - ) - self._sender_thread.daemon = True - self._sender_thread.start() + # This method should be reentrant and idempotent. + + # Prevent races with disable_background_sender and post_fork. + with self._sender_lock: + if not self._sender_enabled or self._forking: + return + + # Avoid race on _queue with _send_to_server. + with self._buffer_lock: + if self._queue is not None: + return + self._queue = queue.Queue(self._sender_queue_size) + + log.debug("Starting background sender thread") + self._sender_thread = threading.Thread( + name="{}_sender_thread".format(self.__class__.__name__), + target=self._sender_main_loop, + args=(self._queue,) + ) + self._sender_thread.daemon = True + self._sender_thread.start() def _stop_sender_thread(self): - self._queue.put(Stop) - self._sender_thread.join() - self._sender_thread = None + # This method should be reentrant and idempotent. + + # Avoid race with _start_sender_thread on _sender_thread. + with self._sender_lock: + # Lock ensures that nothing gets added to the queue after we disable it. + with self._buffer_lock: + if not self._queue: + return + self._queue.put(Stop) + self._queue = None + + self._sender_thread.join() + self._sender_thread = None - def _sender_main_loop(self): + def _sender_main_loop(self, queue): while True: - item = self._queue.get() + item = queue.get() if item is Stop: - self._queue.task_done() + queue.task_done() return self._xmit_packet_with_telemetry(item) - self._queue.task_done() + queue.task_done() def wait_for_pending(self): """ @@ -1281,8 +1318,15 @@ def wait_for_pending(self): """ self.flush() - if self._queue is not None: - self._queue.join() + + queue = None + # Avoid race with disable_background_sender. + with self._buffer_lock: + queue = self._queue + + # Do join outside of the lock so we don't block other threads from sending metrics. + if queue is not None: + queue.join() def pre_fork(self): """Prepare client for a process fork. @@ -1295,12 +1339,12 @@ def pre_fork(self): """ log.debug("[%d] pre_fork for %s", os.getpid(), self) + self._forking = True + if self._flush_thread is not None: self._stop_flush_thread() - if self._sender_thread is not None: - self._stop_sender_thread() - + self._stop_sender_thread() self.close_socket() def post_fork(self): @@ -1313,9 +1357,10 @@ def post_fork(self): log.warning("Open socket detected after fork. Call pre_fork() before os.fork().") self.close_socket() + self._forking = False + self._start_flush_thread(self._flush_interval) - if self._queue: - self._start_sender_thread() + self._start_sender_thread() statsd = DogStatsd() From 22de4ef06fda24885b3f54705c4c575c74322f9b Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Wed, 13 Dec 2023 20:10:44 +0100 Subject: [PATCH 03/19] Add stop method. --- datadog/dogstatsd/base.py | 13 +++++++++++++ tests/integration/dogstatsd/test_statsd_sender.py | 9 ++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 6ee1447c7..56b764e77 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -1362,5 +1362,18 @@ def post_fork(self): self._start_flush_thread(self._flush_interval) self._start_sender_thread() + def stop(self): + """Stop the client. + + Disable buffering, background sender and flush any pending payloads to the server. + + Client remains usable after this method, but sending metrics may block if socket_timeout is enabled. + """ + + self.disable_background_sender() + self.disable_buffering = True + self.flush() + self.close_socket() + statsd = DogStatsd() diff --git a/tests/integration/dogstatsd/test_statsd_sender.py b/tests/integration/dogstatsd/test_statsd_sender.py index 95a822bc9..222897643 100644 --- a/tests/integration/dogstatsd/test_statsd_sender.py +++ b/tests/integration/dogstatsd/test_statsd_sender.py @@ -7,10 +7,10 @@ from datadog.dogstatsd.base import DogStatsd @pytest.mark.parametrize( - "disable_background_sender, disable_buffering, wait_for_pending, socket_timeout", - list(itertools.product([True, False], [True, False], [True, False], [0, 1])), + "disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop", + list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])), ) -def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout): +def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop): # Test basic sender operation with an assortment of options foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0) statsd = DogStatsd( @@ -35,6 +35,9 @@ def reader_thread(): if wait_for_pending: statsd.wait_for_pending() + if stop: + statsd.stop() + t.join(timeout=10) assert not t.is_alive() From 139993b00e54ef1c4fe4554d67c07c39c9197cec Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 13:20:46 +0100 Subject: [PATCH 04/19] Add opt-out --- datadog/dogstatsd/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 56b764e77..de1d6095d 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -96,7 +96,7 @@ Stop = object() -SUPPORTS_FORKING = hasattr(os, "register_at_fork") +SUPPORTS_FORKING = hasattr(os, "register_at_fork") and not os.environ.get("DD_DOGSTATSD_DISABLE_FORK_SUPPORT", None) _instances = weakref.WeakSet() From 63f53a153fa11bfef4a0d5b401977530101d738d Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 13:38:41 +0100 Subject: [PATCH 05/19] Simplify locking Instead of separate locks for buffering and sender modes configuration, use the same lock. --- datadog/dogstatsd/base.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index de1d6095d..7ff1ae0c0 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -409,10 +409,8 @@ def __init__( self._reset_buffer() - # This lock is used for all cases where buffering functionality is - # being toggled (by `open_buffer()`, `close_buffer()`, or - # `self._disable_buffering` calls). - self._buffering_toggle_lock = RLock() + # This lock is used for all cases where client configuration is being changed: buffering, sender mode. + self._config_lock = RLock() # If buffering is disabled, we bypass the buffer function. self._send = self._send_to_buffer @@ -432,7 +430,6 @@ def __init__( self._queue = None self._sender_thread = None self._sender_enabled = False - self._sender_lock = RLock() # Indicates if the process is about to fork, so we shouldn't start any new threads yet. self._forking = False @@ -475,7 +472,7 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): :type sender_queue_timeout: float """ - with self._sender_lock: + with self._config_lock: self._sender_enabled = True self._sender_queue_size = sender_queue_size if sender_queue_timeout is None: @@ -492,7 +489,7 @@ def disable_background_sender(self): This call will block until all previously queued payloads are sent. """ - with self._sender_lock: + with self._config_lock: self._sender_enabled = False self._stop_sender_thread() @@ -558,12 +555,12 @@ def __exit__(self, exc_type, value, traceback): @property def disable_buffering(self): - with self._buffering_toggle_lock: + with self._config_lock: return self._disable_buffering @disable_buffering.setter def disable_buffering(self, is_disabled): - with self._buffering_toggle_lock: + with self._config_lock: # If the toggle didn't change anything, this method is a noop if self._disable_buffering == is_disabled: return @@ -705,7 +702,7 @@ def open_buffer(self, max_buffer_size=None): Note: This method must be called before close_buffer() matching invocation. """ - self._buffering_toggle_lock.acquire() + self._config_lock.acquire() # XXX Remove if `disable_buffering` default is changed to False self._send = self._send_to_buffer @@ -729,7 +726,7 @@ def close_buffer(self): if self._disable_buffering: self._send = self._send_to_server - self._buffering_toggle_lock.release() + self._config_lock.release() def _reset_buffer(self): with self._buffer_lock: @@ -1269,7 +1266,7 @@ def _start_sender_thread(self): # This method should be reentrant and idempotent. # Prevent races with disable_background_sender and post_fork. - with self._sender_lock: + with self._config_lock: if not self._sender_enabled or self._forking: return @@ -1292,7 +1289,7 @@ def _stop_sender_thread(self): # This method should be reentrant and idempotent. # Avoid race with _start_sender_thread on _sender_thread. - with self._sender_lock: + with self._config_lock: # Lock ensures that nothing gets added to the queue after we disable it. with self._buffer_lock: if not self._queue: From 60a9f4664c495e4a5e02d10fa15e40d8d70a8919 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 13:44:21 +0100 Subject: [PATCH 06/19] Move locks out of _start/_stop_sender_thread methods --- datadog/dogstatsd/base.py | 65 ++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 7ff1ae0c0..b792e50e4 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -505,6 +505,9 @@ def _start_flush_thread(self, flush_interval): log.debug("Statsd periodic buffer flush is disabled") return + if self._flush_thread is not None: + return + def _flush_thread_loop(self, flush_interval): while not self._flush_thread_stop.is_set(): time.sleep(flush_interval) @@ -1263,42 +1266,34 @@ def _set_container_id(self, container_id, origin_detection_enabled): self._container_id = None def _start_sender_thread(self): - # This method should be reentrant and idempotent. + if not self._sender_enabled or self._forking: + return - # Prevent races with disable_background_sender and post_fork. - with self._config_lock: - if not self._sender_enabled or self._forking: + # Avoid race on _queue with _send_to_server. + with self._buffer_lock: + if self._queue is not None: return + self._queue = queue.Queue(self._sender_queue_size) - # Avoid race on _queue with _send_to_server. - with self._buffer_lock: - if self._queue is not None: - return - self._queue = queue.Queue(self._sender_queue_size) - - log.debug("Starting background sender thread") - self._sender_thread = threading.Thread( - name="{}_sender_thread".format(self.__class__.__name__), - target=self._sender_main_loop, - args=(self._queue,) - ) - self._sender_thread.daemon = True - self._sender_thread.start() + log.debug("Starting background sender thread") + self._sender_thread = threading.Thread( + name="{}_sender_thread".format(self.__class__.__name__), + target=self._sender_main_loop, + args=(self._queue,) + ) + self._sender_thread.daemon = True + self._sender_thread.start() def _stop_sender_thread(self): - # This method should be reentrant and idempotent. - - # Avoid race with _start_sender_thread on _sender_thread. - with self._config_lock: - # Lock ensures that nothing gets added to the queue after we disable it. - with self._buffer_lock: - if not self._queue: - return - self._queue.put(Stop) - self._queue = None + # Lock ensures that nothing gets added to the queue after we disable it. + with self._buffer_lock: + if not self._queue: + return + self._queue.put(Stop) + self._queue = None - self._sender_thread.join() - self._sender_thread = None + self._sender_thread.join() + self._sender_thread = None def _sender_main_loop(self, queue): while True: @@ -1338,10 +1333,9 @@ def pre_fork(self): self._forking = True - if self._flush_thread is not None: + with self._config_lock: self._stop_flush_thread() - - self._stop_sender_thread() + self._stop_sender_thread() self.close_socket() def post_fork(self): @@ -1356,8 +1350,9 @@ def post_fork(self): self._forking = False - self._start_flush_thread(self._flush_interval) - self._start_sender_thread() + with self._config_lock: + self._start_flush_thread(self._flush_interval) + self._start_sender_thread() def stop(self): """Stop the client. From 1ccb2d898d4eaabc6a5a2bbdbbba71fa2fd15a96 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 13:44:44 +0100 Subject: [PATCH 07/19] Don't start flush thread if we are about to fork --- datadog/dogstatsd/base.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index b792e50e4..9d4aaa65a 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -419,6 +419,9 @@ def __init__( self._send = self._send_to_server log.debug("Statsd buffering is disabled") + # Indicates if the process is about to fork, so we shouldn't start any new threads yet. + self._forking = False + # Start the flush thread if buffering is enabled and the interval is above # a reasonable range. This both prevents thrashing and allow us to use "0.0" # as a value for disabling the automatic flush timer as well. @@ -430,8 +433,6 @@ def __init__( self._queue = None self._sender_thread = None self._sender_enabled = False - # Indicates if the process is about to fork, so we shouldn't start any new threads yet. - self._forking = False if not disable_background_sender: self.enable_background_sender(sender_queue_size, sender_queue_timeout) @@ -505,6 +506,9 @@ def _start_flush_thread(self, flush_interval): log.debug("Statsd periodic buffer flush is disabled") return + if self._forking: + return + if self._flush_thread is not None: return From 95d50488fd37c18ecfb339448d1d70f787b8741c Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 14:04:02 +0100 Subject: [PATCH 08/19] Add per-instance opt-out from tracking --- datadog/dogstatsd/base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 9d4aaa65a..e96abc7a2 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -97,6 +97,7 @@ Stop = object() SUPPORTS_FORKING = hasattr(os, "register_at_fork") and not os.environ.get("DD_DOGSTATSD_DISABLE_FORK_SUPPORT", None) +TRACK_INSTANCES = not os.environ.get("DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING", None) _instances = weakref.WeakSet() @@ -143,6 +144,7 @@ def __init__( disable_background_sender=True, # type: bool sender_queue_size=0, # type: int sender_queue_timeout=0, # type: Optional[float] + track_instance=True, # type: bool ): # type: (...) -> None """ Initialize a DogStatsd object. @@ -301,6 +303,10 @@ def __init__( If set to zero drop the packet immediately if the queue is full. Default: 0 (no wait) :type sender_queue_timeout: float + + :param track_instance: Keep track of this instance and automatically handle cleanup when os.fork() is called, if supported. + Default: True. + :type track_instance: boolean """ self._socket_lock = Lock() @@ -437,7 +443,7 @@ def __init__( if not disable_background_sender: self.enable_background_sender(sender_queue_size, sender_queue_timeout) - if SUPPORTS_FORKING: + if TRACK_INSTANCES and track_instance: _instances.add(self) @property From 6ddd73605e3b186a5ca8ee84529db63beeb8be14 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 15:25:06 +0100 Subject: [PATCH 09/19] Document global fork hooks --- datadog/dogstatsd/base.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index e96abc7a2..cda01a15e 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -102,10 +102,18 @@ _instances = weakref.WeakSet() def pre_fork(): + """Prepare all client instances for a process fork. + + If SUPPORTS_FORKING is true, this will be called automatically before os.fork(). + """ for c in _instances: c.pre_fork() def post_fork(): + """Restore all client instances after a fork. + + If SUPPORTS_FORKING is true, this will be called automatically before os.fork(). + """ for c in _instances: c.post_fork() From 9082c7c8fd69c4e74c60df394dfd4c00b56c2f03 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 15:26:35 +0100 Subject: [PATCH 10/19] Remove warning when stopping already stopped thread Now that disable_buffering and pre_fork may both try to stop the flush thread at the same time, attempt to stop an already stopped thread does not indicate a bug in the client code. --- datadog/dogstatsd/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index cda01a15e..29316e8a8 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -547,7 +547,6 @@ def _flush_thread_loop(self, flush_interval): # Note: Invocations of this method should be thread-safe def _stop_flush_thread(self): if not self._flush_thread: - log.warning("No statsd flush thread to stop") return try: From e4e15bfb1b1f7370c9a50078ce910bc8d694b82e Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 16:11:36 +0100 Subject: [PATCH 11/19] Remove some redundant locking Single field assignement is atomic according to Python FAQ, so we don't need to protect reads and writes to self._queue as such. The only place where we need a lock is when stopping the sender, to ensure that the Stop marker is the last thing to be placed in there. https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe --- datadog/dogstatsd/base.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 29316e8a8..8e2066752 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -1036,15 +1036,17 @@ def _is_telemetry_flush_time(self): self._last_flush_time + self._telemetry_flush_interval < time.time() def _send_to_server(self, packet): - # Prevent a race with disable_background_sender - with self._buffer_lock: - if self._queue is not None: - try: - self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout) - except queue.Full: - self.packets_dropped_queue += 1 - self.bytes_dropped_queue += 1 - return + # Skip the lock if the queue is None. There is no race with enable_background_sender. + if self._queue is not None: + # Prevent a race with disable_background_sender. + with self._buffer_lock: + if self._queue is not None: + try: + self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout) + except queue.Full: + self.packets_dropped_queue += 1 + self.bytes_dropped_queue += 1 + return self._xmit_packet_with_telemetry(packet + '\n') @@ -1286,11 +1288,10 @@ def _start_sender_thread(self): if not self._sender_enabled or self._forking: return - # Avoid race on _queue with _send_to_server. - with self._buffer_lock: - if self._queue is not None: - return - self._queue = queue.Queue(self._sender_queue_size) + if self._queue is not None: + return + + self._queue = queue.Queue(self._sender_queue_size) log.debug("Starting background sender thread") self._sender_thread = threading.Thread( @@ -1328,12 +1329,11 @@ def wait_for_pending(self): self.flush() - queue = None - # Avoid race with disable_background_sender. - with self._buffer_lock: - queue = self._queue + # Avoid race with disable_background_sender. We don't need a + # lock, just copy the value so it doesn't change between the + # check and join later. + queue = self._queue - # Do join outside of the lock so we don't block other threads from sending metrics. if queue is not None: queue.join() From b536a1c12caf3ac672a74abf4dbd960636672697 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 17:18:06 +0100 Subject: [PATCH 12/19] Add type annotation for mypy --- datadog/dogstatsd/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 8e2066752..66582e49a 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -99,7 +99,7 @@ SUPPORTS_FORKING = hasattr(os, "register_at_fork") and not os.environ.get("DD_DOGSTATSD_DISABLE_FORK_SUPPORT", None) TRACK_INSTANCES = not os.environ.get("DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING", None) -_instances = weakref.WeakSet() +_instances = weakref.WeakSet() # type: weakref.WeakSet def pre_fork(): """Prepare all client instances for a process fork. From 69b49f79e82b9487d89847859d5023fbb80c80b8 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 17:28:06 +0100 Subject: [PATCH 13/19] Silence type error on os.register_at_fork os.register_at_fork is not available on all python versions, notably python 2.7. While we check for the availability explicitly, mypy cannot see through that and complains. --- datadog/dogstatsd/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 66582e49a..23b64c135 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -118,7 +118,7 @@ def post_fork(): c.post_fork() if SUPPORTS_FORKING: - os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) + os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # type: ignore # pylint: disable=useless-object-inheritance,too-many-instance-attributes # pylint: disable=too-many-arguments,too-many-locals From ec8f50455c528d5be9a5075941f33ddb0b688ac5 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 14 Dec 2023 17:45:03 +0100 Subject: [PATCH 14/19] Fix formatting --- datadog/dogstatsd/base.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 23b64c135..a0e963b30 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -99,7 +99,8 @@ SUPPORTS_FORKING = hasattr(os, "register_at_fork") and not os.environ.get("DD_DOGSTATSD_DISABLE_FORK_SUPPORT", None) TRACK_INSTANCES = not os.environ.get("DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING", None) -_instances = weakref.WeakSet() # type: weakref.WeakSet +_instances = weakref.WeakSet() # type: weakref.WeakSet + def pre_fork(): """Prepare all client instances for a process fork. @@ -109,6 +110,7 @@ def pre_fork(): for c in _instances: c.pre_fork() + def post_fork(): """Restore all client instances after a fork. @@ -117,15 +119,16 @@ def post_fork(): for c in _instances: c.post_fork() + if SUPPORTS_FORKING: - os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # type: ignore + os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # type: ignore + # pylint: disable=useless-object-inheritance,too-many-instance-attributes # pylint: disable=too-many-arguments,too-many-locals class DogStatsd(object): OK, WARNING, CRITICAL, UNKNOWN = (0, 1, 2, 3) - def __init__( self, host=DEFAULT_HOST, # type: Text @@ -312,7 +315,8 @@ def __init__( Default: 0 (no wait) :type sender_queue_timeout: float - :param track_instance: Keep track of this instance and automatically handle cleanup when os.fork() is called, if supported. + :param track_instance: Keep track of this instance and automatically handle cleanup when os.fork() is called, + if supported. Default: True. :type track_instance: boolean """ From 4363dd48fe210d4ddd2a090feed3e28038c380ee Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Fri, 5 Jan 2024 17:00:27 +0100 Subject: [PATCH 15/19] Fix docstring --- datadog/dogstatsd/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index d76df0525..54aa71e0d 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -114,7 +114,7 @@ def pre_fork(): def post_fork(): """Restore all client instances after a fork. - If SUPPORTS_FORKING is true, this will be called automatically before os.fork(). + If SUPPORTS_FORKING is true, this will be called automatically after os.fork(). """ for c in _instances: c.post_fork() From f923a46aaa9815fd29f9e1a5855c4d3a47c8ceb7 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Fri, 5 Jan 2024 17:01:43 +0100 Subject: [PATCH 16/19] Update docs to reference stop() rather than wait_for_pending(). --- datadog/dogstatsd/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 54aa71e0d..1dcb64cdc 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -299,7 +299,7 @@ def __init__( :param disable_background_sender: Use a background thread to communicate with the dogstatsd server. Optional. When enabled, a background thread will be used to send metric payloads to the Agent. - Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent. + Applications should call stop() before exiting to make sure all pending payloads are sent. Default: True. :type disable_background_sender: boolean @@ -474,7 +474,7 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): Use a background thread to communicate with the dogstatsd server. When enabled, a background thread will be used to send metric payloads to the Agent. - Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent. + Applications should call stop() before exiting to make sure all pending payloads are sent. This method is not thread safe and should not be called concurrently with other methods on the current object. Normally, this should be called shortly after process initialization (for example from a post-fork hook in a From ee120f98bb0cabde46d6c5cf6ae38c535f9aaeb6 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Fri, 5 Jan 2024 17:05:21 +0100 Subject: [PATCH 17/19] Update docstring for enable_background_sender The method is now thread safe, and can be used with forking applications. --- datadog/dogstatsd/base.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 1dcb64cdc..ba90704b0 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -476,9 +476,8 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): Applications should call stop() before exiting to make sure all pending payloads are sent. - This method is not thread safe and should not be called concurrently with other methods on the current object. - Normally, this should be called shortly after process initialization (for example from a post-fork hook in a - forking server). + Compatible with os.fork() starting with Python 3.7. On earlier versions, compatible if applications + arrange to call pre_fork() and post_fork() module functions around calls to os.fork(). :param sender_queue_size: Set the maximum number of packets to queue for the sender. Optional How may packets to queue before blocking or dropping the packet if the packet queue is already full. From 9db758ac9c12c042f2ed4dbafc10565e2ce47673 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Fri, 5 Jan 2024 17:06:41 +0100 Subject: [PATCH 18/19] Fix docstring for enable_background_sender Re-format parameter descriptions so they are rendered correctly in the html docs. --- datadog/dogstatsd/base.py | 20 +++++++++----------- doc/source/index.rst | 9 +++++++++ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index ba90704b0..ee9946474 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -479,17 +479,15 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): Compatible with os.fork() starting with Python 3.7. On earlier versions, compatible if applications arrange to call pre_fork() and post_fork() module functions around calls to os.fork(). - :param sender_queue_size: Set the maximum number of packets to queue for the sender. Optional - How may packets to queue before blocking or dropping the packet if the packet queue is already full. - Default: 0 (unlimited). - :type sender_queue_size: integer - - :param sender_queue_timeout: Set timeout for packet queue operations, in seconds. Optional. - How long the application thread is willing to wait for the queue clear up before dropping the metric packet. - If set to None, wait forever. - If set to zero drop the packet immediately if the queue is full. - Default: 0 (no wait) - :type sender_queue_timeout: float + :param sender_queue_size: Set the maximum number of packets to queue for the sender. + How many packets to queue before blocking or dropping the packet if the packet queue is already full. + Default: 0 (unlimited). + :type sender_queue_size: integer, optional + :param sender_queue_timeout: Set timeout for packet queue operations, in seconds. + How long the application thread is willing to wait for the queue clear up before dropping the metric packet. + If set to None, wait forever. If set to zero drop the packet immediately if the queue is full. + Default: 0 (no wait). + :type sender_queue_timeout: float, optional """ with self._config_lock: diff --git a/doc/source/index.rst b/doc/source/index.rst index a251f0946..c8a64cb4a 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -188,6 +188,15 @@ Usage >>> initialize(statsd_host="localhost", statsd_port=8125) >>> statsd.increment("home.page.hits") +.. data:: datadog.dogstatsd.base.SUPPORTS_FORKING + + Indicates whether the Python runtime supports os.register_at_fork(). When + true, buffering and background sender can be safely used in applications + that use os.fork(). + +.. autofunction:: datadog.dogstatsd.base.pre_fork +.. autofunction:: datadog.dogstatsd.base.post_fork + Get in Touch ============ From 35df4a16ecdc9651879b17705e5456d0a8aad681 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Wed, 10 Jan 2024 17:35:47 +0100 Subject: [PATCH 19/19] Document new environment variables. --- datadog/dogstatsd/base.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index ee9946474..1f58fe550 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -203,6 +203,13 @@ def __init__( for origin detection. :type DD_ORIGIN_DETECTION_ENABLED: boolean + :envvar DD_DOGSTATSD_DISABLE_FORK_SUPPORT: Don't install global fork hooks with os.register_at_fork. + Global fork hooks then need to be called manually before and after calling os.fork. + :type DD_DOGSTATSD_DISABLE_FORK_SUPPORT: boolean + + :envvar DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING: Don't register instances of this class with global fork hooks. + :type DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING: boolean + :param host: the host of the DogStatsd server. :type host: string