diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index 4ba8046814..8d80b6a668 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -3,6 +3,7 @@ import threading from datetime import datetime, timezone from typing import TYPE_CHECKING, TypeVar, Generic +import weakref from sentry_sdk.utils import format_timestamp from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -38,6 +39,26 @@ def __init__( self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None + # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50 + if hasattr(os, "register_at_fork"): + weak_reset = weakref.WeakMethod(self._reset_thread_state) + + def _reset_in_child() -> None: + method = weak_reset() + if method is not None: + method() + + os.register_at_fork(after_in_child=_reset_in_child) + + def _reset_thread_state(self) -> None: + self._buffer = [] + self._running = True + self._lock = threading.Lock() + self._active = threading.local() + self._flush_event = threading.Event() + self._flusher = None + self._flusher_pid = None + def _ensure_thread(self) -> bool: """For forking processes we might need to restart this thread. This ensures that our process actually has that thread running. diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 78204f1a3a..9e1b96f0cf 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -1,7 +1,9 @@ -import threading from collections import defaultdict from datetime import datetime, timezone +import os +import threading from typing import TYPE_CHECKING +import weakref from sentry_sdk._batcher import Batcher from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -50,6 +52,30 @@ def __init__( self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None + # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50 + if hasattr(os, "register_at_fork"): + weak_reset = weakref.WeakMethod(self._reset_thread_state) + + def _reset_in_child() -> None: + method = weak_reset() + if method is not None: + method() + + os.register_at_fork(after_in_child=_reset_in_child) + + def _reset_thread_state(self) -> None: + self._span_buffer = defaultdict(list) + self._running_size = defaultdict(lambda: 0) + self._running = True + + self._lock = threading.Lock() + self._active = threading.local() + + self._flush_event = threading.Event() + + self._flusher = None + self._flusher_pid = None + def add(self, span: "StreamedSpan") -> None: # Bail out if the current thread is already executing batcher code. # This prevents deadlocks when code running inside the batcher (e.g. diff --git a/tests/test_logs.py b/tests/test_logs.py index 86861cfe90..cd52ec1cde 100644 --- a/tests/test_logs.py +++ b/tests/test_logs.py @@ -1,5 +1,6 @@ import json import logging +import os import sys import time from typing import List, Any, Mapping, Union @@ -819,3 +820,60 @@ def add_to_envelope_with_reentrant_add(envelope): assert reentrant_add_called # If the re-entrancy guard didn't work, this test would hang and it'd # eventually be timed out by pytest-timeout + + +@pytest.mark.skipif( + sys.platform == "win32" + or not hasattr(os, "fork") + or not hasattr(os, "register_at_fork"), + reason="requires POSIX fork and os.register_at_fork (Python 3.7+)", +) +def test_log_batcher_lock_reset_in_child_after_fork(sentry_init): + """Regression test for the LogBatcher fork-deadlock fix. + + If os.fork() runs while another thread holds LogBatcher._lock, the + child inherits the lock locked. The holding thread does not exist in + the child, so the lock can never be released and _ensure_thread + deadlocks forever. The after-fork hook must replace the lock with a + fresh one in the child and reset + _flusher / _flusher_pid / _buffer / _active / _flush_event. + """ + sentry_init(enable_logs=True) + batcher = sentry_sdk.get_client().log_batcher + assert batcher is not None + + original_lock = batcher._lock + original_lock.acquire() + + batcher._buffer.append(object()) + batcher._active.flag = True + batcher._flush_event.set() + batcher._running = False + + pid = os.fork() + if pid == 0: + replaced = batcher._lock is not original_lock + unheld = batcher._lock.acquire(blocking=False) + + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None + buffer_reset = len(batcher._buffer) == 0 + active_reset = not getattr(batcher._active, "flag", False) + + event_reset = not batcher._flush_event.is_set() + running_reset = batcher._running is True + + os._exit( + 0 + if replaced + and unheld + and flusher_reset + and buffer_reset + and active_reset + and event_reset + and running_reset + else 1 + ) + + original_lock.release() + _, status = os.waitpid(pid, 0) + assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0 diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 3ad3f6042d..93cca35897 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -1,6 +1,9 @@ +import os +import sys from typing import List from unittest import mock +import pytest import sentry_sdk from sentry_sdk import get_client @@ -512,3 +515,59 @@ def before_send_metric(metric, _): ) get_client().flush() + + +@pytest.mark.skipif( + sys.platform == "win32" + or not hasattr(os, "fork") + or not hasattr(os, "register_at_fork"), + reason="requires POSIX fork and os.register_at_fork (Python 3.7+)", +) +def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init): + """Regression test for the MetricsBatcher fork-deadlock fix. + + If os.fork() runs while another thread holds MetricsBatcher._lock, + the child inherits the lock locked. The holding thread does not + exist in the child, so the lock can never be released and + _ensure_thread deadlocks forever. The after-fork hook must replace + the lock with a fresh one in the child and reset + _flusher / _flusher_pid / _buffer / _active / _flush_event. + """ + sentry_init() + batcher = sentry_sdk.get_client().metrics_batcher + assert batcher is not None + + original_lock = batcher._lock + original_lock.acquire() + + batcher._buffer.append(object()) + batcher._active.flag = True + batcher._flush_event.set() + batcher._running = False + + pid = os.fork() + if pid == 0: + replaced = batcher._lock is not original_lock + unheld = batcher._lock.acquire(blocking=False) + + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None + buffer_reset = len(batcher._buffer) == 0 + active_reset = not getattr(batcher._active, "flag", False) + event_reset = not batcher._flush_event.is_set() + running_reset = batcher._running is True + + os._exit( + 0 + if replaced + and unheld + and flusher_reset + and buffer_reset + and active_reset + and event_reset + and running_reset + else 1 + ) + + original_lock.release() + _, status = os.waitpid(pid, 0) + assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0 diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py index 8859aa39c3..94bc2346ba 100644 --- a/tests/tracing/test_span_streaming.py +++ b/tests/tracing/test_span_streaming.py @@ -1,4 +1,5 @@ import asyncio +import os import re import sys import time @@ -1554,3 +1555,67 @@ def test_transport_format(sentry_init, capture_envelopes): } ] } + + +@pytest.mark.skipif( + sys.platform == "win32" + or not hasattr(os, "fork") + or not hasattr(os, "register_at_fork"), + reason="requires POSIX fork and os.register_at_fork (Python 3.7+)", +) +def test_span_batcher_lock_reset_in_child_after_fork(sentry_init): + """Regression test for the SpanBatcher fork-deadlock fix. + + If os.fork() runs while another thread holds SpanBatcher._lock, the + child inherits the lock locked. The holding thread does not exist in + the child, so the lock can never be released and _ensure_thread + deadlocks forever. The after-fork hook must replace the lock with a + fresh one in the child and reset + _flusher / _flusher_pid / _span_buffer / _running_size / _active / + _flush_event. + """ + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + batcher = sentry_sdk.get_client().span_batcher + assert batcher is not None + + original_lock = batcher._lock + original_lock.acquire() + + batcher._span_buffer["test-trace-id"].append(object()) + batcher._running_size["test-trace-id"] = 42 + batcher._active.flag = True + batcher._flush_event.set() + batcher._running = False + + pid = os.fork() + if pid == 0: + replaced = batcher._lock is not original_lock + unheld = batcher._lock.acquire(blocking=False) + + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None + span_buffer_reset = len(batcher._span_buffer) == 0 + running_size_reset = len(batcher._running_size) == 0 + + active_reset = not getattr(batcher._active, "flag", False) + event_reset = not batcher._flush_event.is_set() + running_reset = batcher._running is True + + os._exit( + 0 + if replaced + and unheld + and flusher_reset + and span_buffer_reset + and running_size_reset + and active_reset + and event_reset + and running_reset + else 1 + ) + + original_lock.release() + _, status = os.waitpid(pid, 0) + assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0