Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve forking support #804

Merged
merged 21 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 184 additions & 43 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import threading
import time
from threading import Lock, RLock
import weakref

try:
import queue
Expand Down Expand Up @@ -93,6 +94,35 @@
]
) + "\n"

Stop = object()

SUPPORTS_FORKING = hasattr(os, "register_at_fork") and not os.environ.get("DD_DOGSTATSD_DISABLE_FORK_SUPPORT", None)
TRACK_INSTANCES = not os.environ.get("DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING", None)

_instances = weakref.WeakSet() # type: weakref.WeakSet


def pre_fork():
"""Prepare all client instances for a process fork.

If SUPPORTS_FORKING is true, this will be called automatically before os.fork().
"""
for c in _instances:
c.pre_fork()


def post_fork():
"""Restore all client instances after a fork.

If SUPPORTS_FORKING is true, this will be called automatically after os.fork().
"""
for c in _instances:
c.post_fork()


if SUPPORTS_FORKING:
os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # type: ignore


# pylint: disable=useless-object-inheritance,too-many-instance-attributes
# pylint: disable=too-many-arguments,too-many-locals
Expand Down Expand Up @@ -125,6 +155,7 @@ def __init__(
disable_background_sender=True, # type: bool
sender_queue_size=0, # type: int
sender_queue_timeout=0, # type: Optional[float]
track_instance=True, # type: bool
): # type: (...) -> None
"""
Initialize a DogStatsd object.
Expand Down Expand Up @@ -172,6 +203,13 @@ def __init__(
for origin detection.
:type DD_ORIGIN_DETECTION_ENABLED: boolean

:envvar DD_DOGSTATSD_DISABLE_FORK_SUPPORT: Don't install global fork hooks with os.register_at_fork.
Global fork hooks then need to be called manually before and after calling os.fork.
:type DD_DOGSTATSD_DISABLE_FORK_SUPPORT: boolean

:envvar DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING: Don't register instances of this class with global fork hooks.
:type DD_DOGSTATSD_DISABLE_INSTANCE_TRACKING: boolean

:param host: the host of the DogStatsd server.
:type host: string

Expand Down Expand Up @@ -268,7 +306,7 @@ def __init__(

:param disable_background_sender: Use a background thread to communicate with the dogstatsd server. Optional.
When enabled, a background thread will be used to send metric payloads to the Agent.
Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
Applications should call stop() before exiting to make sure all pending payloads are sent.
Default: True.
:type disable_background_sender: boolean

Expand All @@ -283,6 +321,11 @@ def __init__(
If set to zero drop the packet immediately if the queue is full.
Default: 0 (no wait)
:type sender_queue_timeout: float

:param track_instance: Keep track of this instance and automatically handle cleanup when os.fork() is called,
if supported.
Default: True.
:type track_instance: boolean
"""

self._socket_lock = Lock()
Expand Down Expand Up @@ -387,10 +430,8 @@ def __init__(

self._reset_buffer()

# This lock is used for all cases where buffering functionality is
# being toggled (by `open_buffer()`, `close_buffer()`, or
# `self._disable_buffering` calls).
self._buffering_toggle_lock = RLock()
# This lock is used for all cases where client configuration is being changed: buffering, sender mode.
self._config_lock = RLock()

# If buffering is disabled, we bypass the buffer function.
self._send = self._send_to_buffer
Expand All @@ -399,17 +440,27 @@ def __init__(
self._send = self._send_to_server
log.debug("Statsd buffering is disabled")

# Indicates if the process is about to fork, so we shouldn't start any new threads yet.
self._forking = False

# Start the flush thread if buffering is enabled and the interval is above
# a reasonable range. This both prevents thrashing and allow us to use "0.0"
# as a value for disabling the automatic flush timer as well.
self._flush_interval = flush_interval
self._flush_thread_stop = threading.Event()
self._flush_thread = None
self._start_flush_thread(self._flush_interval)

self._queue = None
self._sender_thread = None
self._sender_enabled = False

if not disable_background_sender:
self.enable_background_sender(sender_queue_size, sender_queue_timeout)

if TRACK_INSTANCES and track_instance:
_instances.add(self)

@property
def socket_path(self):
return self._socket_path
Expand All @@ -430,39 +481,43 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
Use a background thread to communicate with the dogstatsd server.
When enabled, a background thread will be used to send metric payloads to the Agent.

Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.

This method is not thread safe and should not be called concurrently with other methods on the current object.
Normally, this should be called shortly after process initialization (for example from a post-fork hook in a
forking server).
Applications should call stop() before exiting to make sure all pending payloads are sent.

:param sender_queue_size: Set the maximum number of packets to queue for the sender. Optional
How may packets to queue before blocking or dropping the packet if the packet queue is already full.
Default: 0 (unlimited).
:type sender_queue_size: integer
Compatible with os.fork() starting with Python 3.7. On earlier versions, compatible if applications
arrange to call pre_fork() and post_fork() module functions around calls to os.fork().

:param sender_queue_timeout: Set timeout for packet queue operations, in seconds. Optional.
How long the application thread is willing to wait for the queue clear up before dropping the metric packet.
If set to None, wait forever.
If set to zero drop the packet immediately if the queue is full.
Default: 0 (no wait)
:type sender_queue_timeout: float
:param sender_queue_size: Set the maximum number of packets to queue for the sender.
How many packets to queue before blocking or dropping the packet if the packet queue is already full.
Default: 0 (unlimited).
:type sender_queue_size: integer, optional
:param sender_queue_timeout: Set timeout for packet queue operations, in seconds.
How long the application thread is willing to wait for the queue clear up before dropping the metric packet.
If set to None, wait forever. If set to zero drop the packet immediately if the queue is full.
Default: 0 (no wait).
:type sender_queue_timeout: float, optional
"""

# Avoid a race on _queue with the background buffer flush thread that reads _queue.
with self._buffer_lock:
if self._queue is not None:
return

self._queue = queue.Queue(sender_queue_size)
self._start_sender_thread()
with self._config_lock:
self._sender_enabled = True
self._sender_queue_size = sender_queue_size
if sender_queue_timeout is None:
self._queue_blocking = True
self._queue_timeout = None
else:
self._queue_blocking = sender_queue_timeout > 0
self._queue_timeout = max(0, sender_queue_timeout)

self._start_sender_thread()

def disable_background_sender(self):
"""Disable background sender mode.

This call will block until all previously queued payloads are sent.
"""
with self._config_lock:
self._sender_enabled = False
self._stop_sender_thread()

def disable_telemetry(self):
self._telemetry = False

Expand All @@ -475,6 +530,12 @@ def _start_flush_thread(self, flush_interval):
log.debug("Statsd periodic buffer flush is disabled")
return

if self._forking:
return

if self._flush_thread is not None:
return

def _flush_thread_loop(self, flush_interval):
while not self._flush_thread_stop.is_set():
time.sleep(flush_interval)
Expand All @@ -496,7 +557,6 @@ def _flush_thread_loop(self, flush_interval):
# Note: Invocations of this method should be thread-safe
def _stop_flush_thread(self):
if not self._flush_thread:
log.warning("No statsd flush thread to stop")
return

try:
Expand Down Expand Up @@ -525,12 +585,12 @@ def __exit__(self, exc_type, value, traceback):

@property
def disable_buffering(self):
with self._buffering_toggle_lock:
with self._config_lock:
return self._disable_buffering

@disable_buffering.setter
def disable_buffering(self, is_disabled):
with self._buffering_toggle_lock:
with self._config_lock:
# If the toggle didn't change anything, this method is a noop
if self._disable_buffering == is_disabled:
return
Expand Down Expand Up @@ -672,7 +732,7 @@ def open_buffer(self, max_buffer_size=None):
Note: This method must be called before close_buffer() matching invocation.
"""

self._buffering_toggle_lock.acquire()
self._config_lock.acquire()

# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_buffer
Expand All @@ -696,7 +756,7 @@ def close_buffer(self):
if self._disable_buffering:
self._send = self._send_to_server

self._buffering_toggle_lock.release()
self._config_lock.release()

def _reset_buffer(self):
with self._buffer_lock:
Expand Down Expand Up @@ -986,13 +1046,17 @@ def _is_telemetry_flush_time(self):
self._last_flush_time + self._telemetry_flush_interval < time.time()

def _send_to_server(self, packet):
# Skip the lock if the queue is None. There is no race with enable_background_sender.
if self._queue is not None:
try:
self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout)
except queue.Full:
self.packets_dropped_queue += 1
self.bytes_dropped_queue += 1
return
# Prevent a race with disable_background_sender.
with self._buffer_lock:
if self._queue is not None:
try:
self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout)
except queue.Full:
self.packets_dropped_queue += 1
self.bytes_dropped_queue += 1
return

self._xmit_packet_with_telemetry(packet + '\n')

Expand Down Expand Up @@ -1231,27 +1295,104 @@ def _set_container_id(self, container_id, origin_detection_enabled):
self._container_id = None

def _start_sender_thread(self):
if not self._sender_enabled or self._forking:
return

if self._queue is not None:
return

self._queue = queue.Queue(self._sender_queue_size)

log.debug("Starting background sender thread")
self._sender_thread = threading.Thread(
name="{}_sender_thread".format(self.__class__.__name__),
target=self._sender_main_loop,
args=(self._queue,)
)
self._sender_thread.daemon = True
self._sender_thread.start()

def _sender_main_loop(self):
def _stop_sender_thread(self):
# Lock ensures that nothing gets added to the queue after we disable it.
with self._buffer_lock:
if not self._queue:
return
self._queue.put(Stop)
self._queue = None

self._sender_thread.join()
self._sender_thread = None

def _sender_main_loop(self, queue):
while True:
self._xmit_packet_with_telemetry(self._queue.get())
self._queue.task_done()
item = queue.get()
if item is Stop:
queue.task_done()
return
self._xmit_packet_with_telemetry(item)
queue.task_done()

def wait_for_pending(self):
"""
Flush the buffer and wait for all queued payloads to be written to the server.
"""

self.flush()
if self._queue is not None:
self._queue.join()

# Avoid race with disable_background_sender. We don't need a
# lock, just copy the value so it doesn't change between the
# check and join later.
queue = self._queue

if queue is not None:
queue.join()

def pre_fork(self):
"""Prepare client for a process fork.

Flush any pending payloads, stop all background threads and
close the connection. Once the function returns.

The client should not be used from this point until
post_fork() is called.
"""
log.debug("[%d] pre_fork for %s", os.getpid(), self)

self._forking = True

with self._config_lock:
self._stop_flush_thread()
self._stop_sender_thread()
self.close_socket()

def post_fork(self):
"""Restore the client state after a fork."""

log.debug("[%d] post_fork for %s", os.getpid(), self)

with self._socket_lock:
if self.socket or self.telemetry_socket:
log.warning("Open socket detected after fork. Call pre_fork() before os.fork().")
self.close_socket()

self._forking = False

with self._config_lock:
self._start_flush_thread(self._flush_interval)
self._start_sender_thread()

def stop(self):
"""Stop the client.

Disable buffering, background sender and flush any pending payloads to the server.

Client remains usable after this method, but sending metrics may block if socket_timeout is enabled.
"""

self.disable_background_sender()
self.disable_buffering = True
self.flush()
self.close_socket()


statsd = DogStatsd()
9 changes: 9 additions & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ Usage
>>> initialize(statsd_host="localhost", statsd_port=8125)
>>> statsd.increment("home.page.hits")

.. data:: datadog.dogstatsd.base.SUPPORTS_FORKING

Indicates whether the Python runtime supports os.register_at_fork(). When
true, buffering and background sender can be safely used in applications
that use os.fork().

.. autofunction:: datadog.dogstatsd.base.pre_fork
.. autofunction:: datadog.dogstatsd.base.post_fork


Get in Touch
============
Expand Down
Loading
Loading