From 4bf5dac8e7b0981d436cadfd1c75b5242d77bbbc Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Tue, 28 Apr 2026 11:46:55 -0400 Subject: [PATCH 1/6] fix(batcher): Reset lock and flusher in child after fork Uses the same fix introduced in #6148 to prevent deadlocks in the monitor when os.fork() is called while another thread holds the monitor's lock. If os.fork() runs while another thread holds Batcher._lock, the child inherits the lock locked but the holding thread does not exist in the child, so the lock can never be released and _ensure_thread deadlocks forever. Register an after-fork hook via os.register_at_fork that replaces _lock with a fresh lock and resets _flusher / _flusher_pid in the child. Use a weakref to the batcher so the hook does not keep the instance alive. Move shared init out of SpanBatcher into the base Batcher.__init__ so all batchers (log, metrics, span) get the fork-safety hook from a single place. Fixes PY-2391 Fixes #6149 --- sentry_sdk/_batcher.py | 19 +++++++++++++ sentry_sdk/_span_batcher.py | 14 ++-------- tests/test_logs.py | 38 ++++++++++++++++++++++++++ tests/test_metrics.py | 41 ++++++++++++++++++++++++++++ tests/tracing/test_span_streaming.py | 41 ++++++++++++++++++++++++++++ 5 files changed, 141 insertions(+), 12 deletions(-) diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index 4ba8046814..5856d32b2f 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -3,6 +3,7 @@ import threading from datetime import datetime, timezone from typing import TYPE_CHECKING, TypeVar, Generic +import weakref from sentry_sdk.utils import format_timestamp from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -38,6 +39,24 @@ def __init__( self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None + self_ref = weakref.ref(self) + + def _reset_thread_state() -> None: + batcher = self_ref() + + if batcher is not None: + batcher._flusher = None + batcher._lock = threading.Lock() + batcher._flusher_pid = None + + # Same as https://github.com/getsentry/sentry-python/issues/6148. + # If os.fork() runs while another thread holds self._lock, + # the child inherits the lock locked but the holding thread does + # not exist in the child, so the lock can never be released and + # _ensure_thread deadlocks forever. + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=_reset_thread_state) + def _ensure_thread(self) -> bool: """For forking processes we might need to restart this thread. This ensures that our process actually has that thread running. diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 78204f1a3a..dd4dfc7fd9 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -1,4 +1,3 @@ -import threading from collections import defaultdict from datetime import datetime, timezone from typing import TYPE_CHECKING @@ -8,7 +7,7 @@ from sentry_sdk.utils import format_timestamp, serialize_attribute if TYPE_CHECKING: - from typing import Any, Callable, Optional + from typing import Any, Callable from sentry_sdk.traces import StreamedSpan @@ -32,6 +31,7 @@ def __init__( capture_func: "Callable[[Envelope], None]", record_lost_func: "Callable[..., None]", ) -> None: + super().__init__(capture_func, record_lost_func) # Spans from different traces cannot be emitted in the same envelope # since the envelope contains a shared trace header. That's why we bucket # by trace_id, so that we can then send the buckets each in its own @@ -39,16 +39,6 @@ def __init__( # trace_id -> span buffer self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list) self._running_size: dict[str, int] = defaultdict(lambda: 0) - self._capture_func = capture_func - self._record_lost_func = record_lost_func - self._running = True - self._lock = threading.Lock() - self._active: "threading.local" = threading.local() - - self._flush_event: "threading.Event" = threading.Event() - - self._flusher: "Optional[threading.Thread]" = None - self._flusher_pid: "Optional[int]" = None def add(self, span: "StreamedSpan") -> None: # Bail out if the current thread is already executing batcher code. diff --git a/tests/test_logs.py b/tests/test_logs.py index 86861cfe90..16af712ad2 100644 --- a/tests/test_logs.py +++ b/tests/test_logs.py @@ -1,5 +1,6 @@ import json import logging +import os import sys import time from typing import List, Any, Mapping, Union @@ -819,3 +820,40 @@ def add_to_envelope_with_reentrant_add(envelope): assert reentrant_add_called # If the re-entrancy guard didn't work, this test would hang and it'd # eventually be timed out by pytest-timeout + + +@pytest.mark.skipif( + sys.platform == "win32" + or not hasattr(os, "fork") + or not hasattr(os, "register_at_fork"), + reason="requires POSIX fork and os.register_at_fork (Python 3.7+)", +) +def test_log_batcher_lock_reset_in_child_after_fork(sentry_init): + """Regression test for the LogBatcher fork-deadlock fix. + + If os.fork() runs while another thread holds LogBatcher._lock, the + child inherits the lock locked. The holding thread does not exist in + the child, so the lock can never be released and _ensure_thread + deadlocks forever. The after-fork hook must replace the lock with a + fresh one in the child and reset _flusher / _flusher_pid. + """ + sentry_init(enable_logs=True) + batcher = sentry_sdk.get_client().log_batcher + assert batcher is not None + + original_lock = batcher._lock + original_lock.acquire() + pid = os.fork() + if pid == 0: + # Child: was the lock object replaced and is the new one not + # held? Without the fix, _lock is `original_lock` inherited + # locked, so `replaced` is False. blocking=False guarantees the + # child can't hang on a regression. + replaced = batcher._lock is not original_lock + unheld = batcher._lock.acquire(blocking=False) + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None + os._exit(0 if replaced and unheld and flusher_reset else 1) + + original_lock.release() + _, status = os.waitpid(pid, 0) + assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0 diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 3ad3f6042d..df535fb9b6 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -1,6 +1,9 @@ +import os +import sys from typing import List from unittest import mock +import pytest import sentry_sdk from sentry_sdk import get_client @@ -512,3 +515,41 @@ def before_send_metric(metric, _): ) get_client().flush() + + +@pytest.mark.skipif( + sys.platform == "win32" + or not hasattr(os, "fork") + or not hasattr(os, "register_at_fork"), + reason="requires POSIX fork and os.register_at_fork (Python 3.7+)", +) +def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init): + """Regression test for the MetricsBatcher fork-deadlock fix. + + If os.fork() runs while another thread holds MetricsBatcher._lock, + the child inherits the lock locked. The holding thread does not + exist in the child, so the lock can never be released and + _ensure_thread deadlocks forever. The after-fork hook must replace + the lock with a fresh one in the child and reset + _flusher / _flusher_pid. + """ + sentry_init() + batcher = sentry_sdk.get_client().metrics_batcher + assert batcher is not None + + original_lock = batcher._lock + original_lock.acquire() + pid = os.fork() + if pid == 0: + # Child: was the lock object replaced and is the new one not + # held? Without the fix, _lock is `original_lock` inherited + # locked, so `replaced` is False. blocking=False guarantees the + # child can't hang on a regression. + replaced = batcher._lock is not original_lock + unheld = batcher._lock.acquire(blocking=False) + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None + os._exit(0 if replaced and unheld and flusher_reset else 1) + + original_lock.release() + _, status = os.waitpid(pid, 0) + assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0 diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py index 849c0a5fb5..bfddeb4c79 100644 --- a/tests/tracing/test_span_streaming.py +++ b/tests/tracing/test_span_streaming.py @@ -1,4 +1,5 @@ import asyncio +import os import re import sys import time @@ -1593,3 +1594,43 @@ def test_transport_format(sentry_init, capture_envelopes): assert "value" in value assert "type" in value assert value["type"] in ("string", "boolean", "integer", "double", "array") + + +@pytest.mark.skipif( + sys.platform == "win32" + or not hasattr(os, "fork") + or not hasattr(os, "register_at_fork"), + reason="requires POSIX fork and os.register_at_fork (Python 3.7+)", +) +def test_span_batcher_lock_reset_in_child_after_fork(sentry_init): + """Regression test for the SpanBatcher fork-deadlock fix. + + If os.fork() runs while another thread holds SpanBatcher._lock, the + child inherits the lock locked. The holding thread does not exist in + the child, so the lock can never be released and _ensure_thread + deadlocks forever. The after-fork hook must replace the lock with a + fresh one in the child and reset _flusher / _flusher_pid. + """ + sentry_init( + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + batcher = sentry_sdk.get_client().span_batcher + assert batcher is not None + + original_lock = batcher._lock + original_lock.acquire() + pid = os.fork() + if pid == 0: + # Child: was the lock object replaced and is the new one not + # held? Without the fix, _lock is `original_lock` inherited + # locked, so `replaced` is False. blocking=False guarantees the + # child can't hang on a regression. + replaced = batcher._lock is not original_lock + unheld = batcher._lock.acquire(blocking=False) + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None + os._exit(0 if replaced and unheld and flusher_reset else 1) + + original_lock.release() + _, status = os.waitpid(pid, 0) + assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0 From b5073b221833a96ddbf9034698ef9c4e86bd2b40 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Wed, 29 Apr 2026 09:18:20 -0400 Subject: [PATCH 2/6] Revert the changes to the span batcher - this needs to be a separate class from batcher but that will not be done in this set of changes. Update batcher deadlock fix to use weakmethod, to be consistent with the approach taken in monitor.py --- sentry_sdk/_batcher.py | 27 +++++++++++++-------------- sentry_sdk/_span_batcher.py | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index 5856d32b2f..cff85a8b3a 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -38,24 +38,23 @@ def __init__( self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None + self._reset_thread_state() - self_ref = weakref.ref(self) + # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50 + if hasattr(os, "register_at_fork"): + weak_reset = weakref.WeakMethod(self._reset_thread_state) - def _reset_thread_state() -> None: - batcher = self_ref() + def _reset_in_child() -> None: + method = weak_reset() + if method is not None: + method() - if batcher is not None: - batcher._flusher = None - batcher._lock = threading.Lock() - batcher._flusher_pid = None + os.register_at_fork(after_in_child=_reset_in_child) - # Same as https://github.com/getsentry/sentry-python/issues/6148. - # If os.fork() runs while another thread holds self._lock, - # the child inherits the lock locked but the holding thread does - # not exist in the child, so the lock can never be released and - # _ensure_thread deadlocks forever. - if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=_reset_thread_state) + def _reset_thread_state(self) -> None: + self._flusher = None + self._lock = threading.Lock() + self._flusher_pid = None def _ensure_thread(self) -> bool: """For forking processes we might need to restart this thread. diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index dd4dfc7fd9..e7050a865b 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -1,6 +1,9 @@ from collections import defaultdict from datetime import datetime, timezone -from typing import TYPE_CHECKING +import os +import threading +from typing import TYPE_CHECKING, Optional +import weakref from sentry_sdk._batcher import Batcher from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -31,7 +34,6 @@ def __init__( capture_func: "Callable[[Envelope], None]", record_lost_func: "Callable[..., None]", ) -> None: - super().__init__(capture_func, record_lost_func) # Spans from different traces cannot be emitted in the same envelope # since the envelope contains a shared trace header. That's why we bucket # by trace_id, so that we can then send the buckets each in its own @@ -39,6 +41,34 @@ def __init__( # trace_id -> span buffer self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list) self._running_size: dict[str, int] = defaultdict(lambda: 0) + self._capture_func = capture_func + self._record_lost_func = record_lost_func + self._running = True + self._lock = threading.Lock() + self._active: "threading.local" = threading.local() + + self._flush_event: "threading.Event" = threading.Event() + + self._flusher: "Optional[threading.Thread]" = None + self._flusher_pid: "Optional[int]" = None + + self._reset_thread_state() + + # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50 + if hasattr(os, "register_at_fork"): + weak_reset = weakref.WeakMethod(self._reset_thread_state) + + def _reset_in_child() -> None: + method = weak_reset() + if method is not None: + method() + + os.register_at_fork(after_in_child=_reset_in_child) + + def _reset_thread_state(self) -> None: + self._flusher = None + self._lock = threading.Lock() + self._flusher_pid = None def add(self, span: "StreamedSpan") -> None: # Bail out if the current thread is already executing batcher code. From 5c66f82df5ff99d629db2a4423513027bed67930 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Wed, 29 Apr 2026 09:24:01 -0400 Subject: [PATCH 3/6] Small cleanup --- sentry_sdk/_span_batcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index e7050a865b..623f13cedd 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone import os import threading -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING import weakref from sentry_sdk._batcher import Batcher @@ -10,7 +10,7 @@ from sentry_sdk.utils import format_timestamp, serialize_attribute if TYPE_CHECKING: - from typing import Any, Callable + from typing import Any, Callable, Optional from sentry_sdk.traces import StreamedSpan From ba1ef0f9ad0b4b423439ba7d780f2558f95a2015 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Wed, 29 Apr 2026 09:51:17 -0400 Subject: [PATCH 4/6] cleanups --- sentry_sdk/_batcher.py | 1 - sentry_sdk/_span_batcher.py | 7 ------- 2 files changed, 8 deletions(-) diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index cff85a8b3a..41625a1919 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -38,7 +38,6 @@ def __init__( self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None - self._reset_thread_state() # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50 if hasattr(os, "register_at_fork"): diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 623f13cedd..639efe9700 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -52,8 +52,6 @@ def __init__( self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None - self._reset_thread_state() - # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50 if hasattr(os, "register_at_fork"): weak_reset = weakref.WeakMethod(self._reset_thread_state) @@ -65,11 +63,6 @@ def _reset_in_child() -> None: os.register_at_fork(after_in_child=_reset_in_child) - def _reset_thread_state(self) -> None: - self._flusher = None - self._lock = threading.Lock() - self._flusher_pid = None - def add(self, span: "StreamedSpan") -> None: # Bail out if the current thread is already executing batcher code. # This prevents deadlocks when code running inside the batcher (e.g. From 17b3e0a6840d72f20fd32eb6a78da86fa7e56efd Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Wed, 29 Apr 2026 13:19:17 -0400 Subject: [PATCH 5/6] Address code review comments --- sentry_sdk/_batcher.py | 5 ++++- sentry_sdk/_span_batcher.py | 12 ++++++++++ tests/test_logs.py | 20 +++++++++++++++-- tests/test_metrics.py | 27 ++++++++++++++++++----- tests/tracing/test_span_streaming.py | 33 +++++++++++++++++++++++----- 5 files changed, 82 insertions(+), 15 deletions(-) diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index 41625a1919..3e0b4af95d 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -51,8 +51,11 @@ def _reset_in_child() -> None: os.register_at_fork(after_in_child=_reset_in_child) def _reset_thread_state(self) -> None: - self._flusher = None + self._buffer = [] self._lock = threading.Lock() + self._active = threading.local() + self._flush_event = threading.Event() + self._flusher = None self._flusher_pid = None def _ensure_thread(self) -> bool: diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 639efe9700..ef9299de6b 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -63,6 +63,18 @@ def _reset_in_child() -> None: os.register_at_fork(after_in_child=_reset_in_child) + def _reset_thread_state(self) -> None: + self._span_buffer = defaultdict(list) + self._running_size = defaultdict(lambda: 0) + + self._lock = threading.Lock() + self._active = threading.local() + + self._flush_event = threading.Event() + + self._flusher = None + self._flusher_pid = None + def add(self, span: "StreamedSpan") -> None: # Bail out if the current thread is already executing batcher code. # This prevents deadlocks when code running inside the batcher (e.g. diff --git a/tests/test_logs.py b/tests/test_logs.py index 16af712ad2..bfc6fd62ac 100644 --- a/tests/test_logs.py +++ b/tests/test_logs.py @@ -835,7 +835,8 @@ def test_log_batcher_lock_reset_in_child_after_fork(sentry_init): child inherits the lock locked. The holding thread does not exist in the child, so the lock can never be released and _ensure_thread deadlocks forever. The after-fork hook must replace the lock with a - fresh one in the child and reset _flusher / _flusher_pid. + fresh one in the child and reset + _flusher / _flusher_pid / _buffer / _active / _flush_event. """ sentry_init(enable_logs=True) batcher = sentry_sdk.get_client().log_batcher @@ -843,6 +844,9 @@ def test_log_batcher_lock_reset_in_child_after_fork(sentry_init): original_lock = batcher._lock original_lock.acquire() + batcher._buffer.append(object()) + batcher._active.flag = True + batcher._flush_event.set() pid = os.fork() if pid == 0: # Child: was the lock object replaced and is the new one not @@ -852,7 +856,19 @@ def test_log_batcher_lock_reset_in_child_after_fork(sentry_init): replaced = batcher._lock is not original_lock unheld = batcher._lock.acquire(blocking=False) flusher_reset = batcher._flusher is None and batcher._flusher_pid is None - os._exit(0 if replaced and unheld and flusher_reset else 1) + buffer_reset = len(batcher._buffer) == 0 + active_reset = not getattr(batcher._active, "flag", False) + event_reset = not batcher._flush_event.is_set() + os._exit( + 0 + if replaced + and unheld + and flusher_reset + and buffer_reset + and active_reset + and event_reset + else 1 + ) original_lock.release() _, status = os.waitpid(pid, 0) diff --git a/tests/test_metrics.py b/tests/test_metrics.py index df535fb9b6..2e127a2e34 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -531,7 +531,7 @@ def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init): exist in the child, so the lock can never be released and _ensure_thread deadlocks forever. The after-fork hook must replace the lock with a fresh one in the child and reset - _flusher / _flusher_pid. + _flusher / _flusher_pid / _buffer / _active / _flush_event. """ sentry_init() batcher = sentry_sdk.get_client().metrics_batcher @@ -539,16 +539,31 @@ def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init): original_lock = batcher._lock original_lock.acquire() + + batcher._buffer.append(object()) + batcher._active.flag = True + batcher._flush_event.set() + pid = os.fork() if pid == 0: - # Child: was the lock object replaced and is the new one not - # held? Without the fix, _lock is `original_lock` inherited - # locked, so `replaced` is False. blocking=False guarantees the - # child can't hang on a regression. replaced = batcher._lock is not original_lock unheld = batcher._lock.acquire(blocking=False) + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None - os._exit(0 if replaced and unheld and flusher_reset else 1) + buffer_reset = len(batcher._buffer) == 0 + active_reset = not getattr(batcher._active, "flag", False) + event_reset = not batcher._flush_event.is_set() + + os._exit( + 0 + if replaced + and unheld + and flusher_reset + and buffer_reset + and active_reset + and event_reset + else 1 + ) original_lock.release() _, status = os.waitpid(pid, 0) diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py index 0aa4960cad..efac19bca6 100644 --- a/tests/tracing/test_span_streaming.py +++ b/tests/tracing/test_span_streaming.py @@ -1570,7 +1570,9 @@ def test_span_batcher_lock_reset_in_child_after_fork(sentry_init): child inherits the lock locked. The holding thread does not exist in the child, so the lock can never be released and _ensure_thread deadlocks forever. The after-fork hook must replace the lock with a - fresh one in the child and reset _flusher / _flusher_pid. + fresh one in the child and reset + _flusher / _flusher_pid / _span_buffer / _running_size / _active / + _flush_event. """ sentry_init( traces_sample_rate=1.0, @@ -1581,16 +1583,35 @@ def test_span_batcher_lock_reset_in_child_after_fork(sentry_init): original_lock = batcher._lock original_lock.acquire() + + batcher._span_buffer["test-trace-id"].append(object()) + batcher._running_size["test-trace-id"] = 42 + batcher._active.flag = True + batcher._flush_event.set() + pid = os.fork() if pid == 0: - # Child: was the lock object replaced and is the new one not - # held? Without the fix, _lock is `original_lock` inherited - # locked, so `replaced` is False. blocking=False guarantees the - # child can't hang on a regression. replaced = batcher._lock is not original_lock unheld = batcher._lock.acquire(blocking=False) + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None - os._exit(0 if replaced and unheld and flusher_reset else 1) + span_buffer_reset = len(batcher._span_buffer) == 0 + running_size_reset = len(batcher._running_size) == 0 + + active_reset = not getattr(batcher._active, "flag", False) + event_reset = not batcher._flush_event.is_set() + + os._exit( + 0 + if replaced + and unheld + and flusher_reset + and span_buffer_reset + and running_size_reset + and active_reset + and event_reset + else 1 + ) original_lock.release() _, status = os.waitpid(pid, 0) From dc1d5017f4cabfdf99a9f55ddbda024fa8cdf0fb Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Wed, 29 Apr 2026 13:34:15 -0400 Subject: [PATCH 6/6] Add reset of the running flag --- sentry_sdk/_batcher.py | 1 + sentry_sdk/_span_batcher.py | 1 + tests/test_logs.py | 12 ++++++++---- tests/test_metrics.py | 3 +++ tests/tracing/test_span_streaming.py | 3 +++ 5 files changed, 16 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index 3e0b4af95d..8d80b6a668 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -52,6 +52,7 @@ def _reset_in_child() -> None: def _reset_thread_state(self) -> None: self._buffer = [] + self._running = True self._lock = threading.Lock() self._active = threading.local() self._flush_event = threading.Event() diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index ef9299de6b..9e1b96f0cf 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -66,6 +66,7 @@ def _reset_in_child() -> None: def _reset_thread_state(self) -> None: self._span_buffer = defaultdict(list) self._running_size = defaultdict(lambda: 0) + self._running = True self._lock = threading.Lock() self._active = threading.local() diff --git a/tests/test_logs.py b/tests/test_logs.py index bfc6fd62ac..cd52ec1cde 100644 --- a/tests/test_logs.py +++ b/tests/test_logs.py @@ -844,21 +844,24 @@ def test_log_batcher_lock_reset_in_child_after_fork(sentry_init): original_lock = batcher._lock original_lock.acquire() + batcher._buffer.append(object()) batcher._active.flag = True batcher._flush_event.set() + batcher._running = False + pid = os.fork() if pid == 0: - # Child: was the lock object replaced and is the new one not - # held? Without the fix, _lock is `original_lock` inherited - # locked, so `replaced` is False. blocking=False guarantees the - # child can't hang on a regression. replaced = batcher._lock is not original_lock unheld = batcher._lock.acquire(blocking=False) + flusher_reset = batcher._flusher is None and batcher._flusher_pid is None buffer_reset = len(batcher._buffer) == 0 active_reset = not getattr(batcher._active, "flag", False) + event_reset = not batcher._flush_event.is_set() + running_reset = batcher._running is True + os._exit( 0 if replaced @@ -867,6 +870,7 @@ def test_log_batcher_lock_reset_in_child_after_fork(sentry_init): and buffer_reset and active_reset and event_reset + and running_reset else 1 ) diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 2e127a2e34..93cca35897 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -543,6 +543,7 @@ def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init): batcher._buffer.append(object()) batcher._active.flag = True batcher._flush_event.set() + batcher._running = False pid = os.fork() if pid == 0: @@ -553,6 +554,7 @@ def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init): buffer_reset = len(batcher._buffer) == 0 active_reset = not getattr(batcher._active, "flag", False) event_reset = not batcher._flush_event.is_set() + running_reset = batcher._running is True os._exit( 0 @@ -562,6 +564,7 @@ def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init): and buffer_reset and active_reset and event_reset + and running_reset else 1 ) diff --git a/tests/tracing/test_span_streaming.py b/tests/tracing/test_span_streaming.py index efac19bca6..94bc2346ba 100644 --- a/tests/tracing/test_span_streaming.py +++ b/tests/tracing/test_span_streaming.py @@ -1588,6 +1588,7 @@ def test_span_batcher_lock_reset_in_child_after_fork(sentry_init): batcher._running_size["test-trace-id"] = 42 batcher._active.flag = True batcher._flush_event.set() + batcher._running = False pid = os.fork() if pid == 0: @@ -1600,6 +1601,7 @@ def test_span_batcher_lock_reset_in_child_after_fork(sentry_init): active_reset = not getattr(batcher._active, "flag", False) event_reset = not batcher._flush_event.is_set() + running_reset = batcher._running is True os._exit( 0 @@ -1610,6 +1612,7 @@ def test_span_batcher_lock_reset_in_child_after_fork(sentry_init): and running_size_reset and active_reset and event_reset + and running_reset else 1 )