Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sentry_sdk/_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
from datetime import datetime, timezone
from typing import TYPE_CHECKING, TypeVar, Generic
import weakref

from sentry_sdk.utils import format_timestamp
from sentry_sdk.envelope import Envelope, Item, PayloadRef
Expand Down Expand Up @@ -38,6 +39,26 @@ def __init__(
self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None

# See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
if hasattr(os, "register_at_fork"):
weak_reset = weakref.WeakMethod(self._reset_thread_state)

def _reset_in_child() -> None:
method = weak_reset()
if method is not None:
method()

os.register_at_fork(after_in_child=_reset_in_child)

def _reset_thread_state(self) -> None:
    """Put the batcher back into a pristine, thread-less state.

    Registered in ``__init__`` (through a weak reference) as an
    ``os.register_at_fork`` ``after_in_child`` hook. Every piece of state
    is rebound to a brand-new object instead of being mutated in place,
    so nothing inherited from the parent process (in particular a lock
    that was held at fork time) is reused.
    """
    # Fresh synchronisation primitives: the inherited ones may be in an
    # arbitrary state (locked lock, set event).
    self._lock = threading.Lock()
    self._flush_event = threading.Event()
    self._active = threading.local()

    # No flusher thread is known yet.
    self._flusher = self._flusher_pid = None

    # Start with an empty buffer and mark the batcher as running.
    self._buffer = []
    self._running = True
Comment thread
ericapisani marked this conversation as resolved.
Comment thread
sentry[bot] marked this conversation as resolved.

def _ensure_thread(self) -> bool:
"""For forking processes we might need to restart this thread.
This ensures that our process actually has that thread running.
Expand Down
28 changes: 27 additions & 1 deletion sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import threading
from collections import defaultdict
from datetime import datetime, timezone
import os
import threading
from typing import TYPE_CHECKING
import weakref

from sentry_sdk._batcher import Batcher
from sentry_sdk.envelope import Envelope, Item, PayloadRef
Expand Down Expand Up @@ -50,6 +52,30 @@ def __init__(
self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None

# See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
if hasattr(os, "register_at_fork"):
weak_reset = weakref.WeakMethod(self._reset_thread_state)

def _reset_in_child() -> None:
method = weak_reset()
if method is not None:
method()

os.register_at_fork(after_in_child=_reset_in_child)
Comment thread
alexander-alderman-webb marked this conversation as resolved.

def _reset_thread_state(self) -> None:
    """Put the span batcher back into a pristine, thread-less state.

    Registered in ``__init__`` (through a weak reference) as an
    ``os.register_at_fork`` ``after_in_child`` hook. Every piece of state
    is rebound to a brand-new object instead of being mutated in place,
    so nothing inherited from the parent process (in particular a lock
    that was held at fork time) is reused.
    """
    # Fresh synchronisation primitives: the inherited ones may be in an
    # arbitrary state (locked lock, set event).
    self._lock = threading.Lock()
    self._flush_event = threading.Event()
    self._active = threading.local()

    # No flusher thread is known yet.
    self._flusher = self._flusher_pid = None

    # Empty per-key span buffers and size counters (missing keys default
    # to an empty list and to 0 respectively).
    self._span_buffer = defaultdict(list)
    self._running_size = defaultdict(int)
    self._running = True

def add(self, span: "StreamedSpan") -> None:
# Bail out if the current thread is already executing batcher code.
# This prevents deadlocks when code running inside the batcher (e.g.
Expand Down
58 changes: 58 additions & 0 deletions tests/test_logs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import os
import sys
import time
from typing import List, Any, Mapping, Union
Expand Down Expand Up @@ -819,3 +820,60 @@ def add_to_envelope_with_reentrant_add(envelope):
assert reentrant_add_called
# If the re-entrancy guard didn't work, this test would hang and it'd
# eventually be timed out by pytest-timeout


@pytest.mark.skipif(
    sys.platform == "win32"
    or not hasattr(os, "fork")
    or not hasattr(os, "register_at_fork"),
    reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_log_batcher_lock_reset_in_child_after_fork(sentry_init):
    """Regression test for the LogBatcher fork-deadlock fix.

    If os.fork() runs while another thread holds LogBatcher._lock, the
    child inherits the lock locked. The holding thread does not exist in
    the child, so the lock can never be released and _ensure_thread
    deadlocks forever. The after-fork hook must replace the lock with a
    fresh one in the child and reset
    _flusher / _flusher_pid / _buffer / _active / _flush_event.

    The child reports its verdict through its exit status (0 = every
    check passed, 1 = a check failed or raised).
    """
    sentry_init(enable_logs=True)
    batcher = sentry_sdk.get_client().log_batcher
    assert batcher is not None

    # Simulate "some thread holds the lock at fork time".
    original_lock = batcher._lock
    original_lock.acquire()

    # Dirty every piece of state the hook is expected to reset.
    batcher._buffer.append(object())
    batcher._active.flag = True
    batcher._flush_event.set()
    batcher._running = False

    pid = os.fork()
    if pid == 0:
        # Child process. It must *always* leave through os._exit: if a
        # check below raised and the exception escaped, the forked copy
        # of pytest would carry on running the rest of the session.
        exit_code = 1
        try:
            checks = (
                # lock replaced ...
                batcher._lock is not original_lock,
                # ... and the replacement is not held
                batcher._lock.acquire(blocking=False),
                batcher._flusher is None and batcher._flusher_pid is None,
                len(batcher._buffer) == 0,
                not getattr(batcher._active, "flag", False),
                not batcher._flush_event.is_set(),
                batcher._running is True,
            )
            if all(checks):
                exit_code = 0
        finally:
            os._exit(exit_code)

    original_lock.release()
    _, status = os.waitpid(pid, 0)
    assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
59 changes: 59 additions & 0 deletions tests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
import sys
from typing import List
from unittest import mock

import pytest

import sentry_sdk
from sentry_sdk import get_client
Expand Down Expand Up @@ -512,3 +515,59 @@ def before_send_metric(metric, _):
)

get_client().flush()


@pytest.mark.skipif(
    sys.platform == "win32"
    or not hasattr(os, "fork")
    or not hasattr(os, "register_at_fork"),
    reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init):
    """Regression test for the MetricsBatcher fork-deadlock fix.

    If os.fork() runs while another thread holds MetricsBatcher._lock,
    the child inherits the lock locked. The holding thread does not
    exist in the child, so the lock can never be released and
    _ensure_thread deadlocks forever. The after-fork hook must replace
    the lock with a fresh one in the child and reset
    _flusher / _flusher_pid / _buffer / _active / _flush_event.

    The child reports its verdict through its exit status (0 = every
    check passed, 1 = a check failed or raised).
    """
    sentry_init()
    batcher = sentry_sdk.get_client().metrics_batcher
    assert batcher is not None

    # Simulate "some thread holds the lock at fork time".
    original_lock = batcher._lock
    original_lock.acquire()

    # Dirty every piece of state the hook is expected to reset.
    batcher._buffer.append(object())
    batcher._active.flag = True
    batcher._flush_event.set()
    batcher._running = False

    pid = os.fork()
    if pid == 0:
        # Child process. It must *always* leave through os._exit: if a
        # check below raised and the exception escaped, the forked copy
        # of pytest would carry on running the rest of the session.
        exit_code = 1
        try:
            checks = (
                # lock replaced ...
                batcher._lock is not original_lock,
                # ... and the replacement is not held
                batcher._lock.acquire(blocking=False),
                batcher._flusher is None and batcher._flusher_pid is None,
                len(batcher._buffer) == 0,
                not getattr(batcher._active, "flag", False),
                not batcher._flush_event.is_set(),
                batcher._running is True,
            )
            if all(checks):
                exit_code = 0
        finally:
            os._exit(exit_code)

    original_lock.release()
    _, status = os.waitpid(pid, 0)
    assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
65 changes: 65 additions & 0 deletions tests/tracing/test_span_streaming.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import os
import re
import sys
import time
Expand Down Expand Up @@ -1554,3 +1555,67 @@ def test_transport_format(sentry_init, capture_envelopes):
}
]
}


@pytest.mark.skipif(
    sys.platform == "win32"
    or not hasattr(os, "fork")
    or not hasattr(os, "register_at_fork"),
    reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_span_batcher_lock_reset_in_child_after_fork(sentry_init):
    """Regression test for the SpanBatcher fork-deadlock fix.

    If os.fork() runs while another thread holds SpanBatcher._lock, the
    child inherits the lock locked. The holding thread does not exist in
    the child, so the lock can never be released and _ensure_thread
    deadlocks forever. The after-fork hook must replace the lock with a
    fresh one in the child and reset
    _flusher / _flusher_pid / _span_buffer / _running_size / _active /
    _flush_event.

    The child reports its verdict through its exit status (0 = every
    check passed, 1 = a check failed or raised).
    """
    sentry_init(
        traces_sample_rate=1.0,
        _experiments={"trace_lifecycle": "stream"},
    )
    batcher = sentry_sdk.get_client().span_batcher
    assert batcher is not None

    # Simulate "some thread holds the lock at fork time".
    original_lock = batcher._lock
    original_lock.acquire()

    # Dirty every piece of state the hook is expected to reset.
    batcher._span_buffer["test-trace-id"].append(object())
    batcher._running_size["test-trace-id"] = 42
    batcher._active.flag = True
    batcher._flush_event.set()
    batcher._running = False

    pid = os.fork()
    if pid == 0:
        # Child process. It must *always* leave through os._exit: if a
        # check below raised and the exception escaped, the forked copy
        # of pytest would carry on running the rest of the session.
        exit_code = 1
        try:
            checks = (
                # lock replaced ...
                batcher._lock is not original_lock,
                # ... and the replacement is not held
                batcher._lock.acquire(blocking=False),
                batcher._flusher is None and batcher._flusher_pid is None,
                len(batcher._span_buffer) == 0,
                len(batcher._running_size) == 0,
                not getattr(batcher._active, "flag", False),
                not batcher._flush_event.is_set(),
                batcher._running is True,
            )
            if all(checks):
                exit_code = 0
        finally:
            os._exit(exit_code)

    original_lock.release()
    _, status = os.waitpid(pid, 0)
    assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
Loading