Skip to content

Commit

Permalink
tests(profiling): Add tests for thread schedulers (#1683)
Browse files Browse the repository at this point in the history
* tests(profiling): Add tests for thread schedulers
  • Loading branch information
Zylphrex committed Oct 20, 2022
1 parent d2547ea commit 1c651c6
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 47 deletions.
93 changes: 50 additions & 43 deletions sentry_sdk/profiler.py
Expand Up @@ -111,17 +111,16 @@ def setup_profiler(options):
# To buffer samples for `buffer_secs` at `frequency` Hz, we need
# a capcity of `buffer_secs * frequency`.
_sample_buffer = SampleBuffer(capacity=buffer_secs * frequency)
_sampler = _init_sample_stack_fn(_sample_buffer)

profiler_mode = options["_experiments"].get("profiler_mode", SigprofScheduler.mode)
if profiler_mode == SigprofScheduler.mode:
_scheduler = SigprofScheduler(sampler=_sampler, frequency=frequency)
_scheduler = SigprofScheduler(sample_buffer=_sample_buffer, frequency=frequency)
elif profiler_mode == SigalrmScheduler.mode:
_scheduler = SigalrmScheduler(sampler=_sampler, frequency=frequency)
_scheduler = SigalrmScheduler(sample_buffer=_sample_buffer, frequency=frequency)
elif profiler_mode == SleepScheduler.mode:
_scheduler = SleepScheduler(sampler=_sampler, frequency=frequency)
_scheduler = SleepScheduler(sample_buffer=_sample_buffer, frequency=frequency)
elif profiler_mode == EventScheduler.mode:
_scheduler = EventScheduler(sampler=_sampler, frequency=frequency)
_scheduler = EventScheduler(sample_buffer=_sample_buffer, frequency=frequency)
else:
raise ValueError("Unknown profiler mode: {}".format(profiler_mode))
_scheduler.setup()
Expand All @@ -142,29 +141,6 @@ def teardown_profiler():
_scheduler = None


def _init_sample_stack_fn(buffer):
# type: (SampleBuffer) -> Callable[..., None]

def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""

buffer.write(
(
nanosecond_time(),
[
(tid, extract_stack(frame))
for tid, frame in sys._current_frames().items()
],
)
)

return _sample_stack


# We want to impose a stack depth limit so that samples aren't too large.
MAX_STACK_DEPTH = 128

Expand Down Expand Up @@ -242,8 +218,14 @@ def get_frame_name(frame):


class Profile(object):
def __init__(self, transaction, hub=None):
# type: (sentry_sdk.tracing.Transaction, Optional[sentry_sdk.Hub]) -> None
def __init__(
self,
scheduler, # type: Scheduler
transaction, # type: sentry_sdk.tracing.Transaction
hub=None, # type: Optional[sentry_sdk.Hub]
):
# type: (...) -> None
self.scheduler = scheduler
self.transaction = transaction
self.hub = hub
self._start_ns = None # type: Optional[int]
Expand All @@ -253,27 +235,26 @@ def __init__(self, transaction, hub=None):

def __enter__(self):
# type: () -> None
assert _scheduler is not None
self._start_ns = nanosecond_time()
_scheduler.start_profiling()
self.scheduler.start_profiling()

def __exit__(self, ty, value, tb):
# type: (Optional[Any], Optional[Any], Optional[Any]) -> None
assert _scheduler is not None
_scheduler.stop_profiling()
self.scheduler.stop_profiling()
self._stop_ns = nanosecond_time()

def to_json(self, event_opt):
# type: (Any) -> Dict[str, Any]
assert _sample_buffer is not None
assert self._start_ns is not None
assert self._stop_ns is not None

return {
"environment": event_opt.get("environment"),
"event_id": uuid.uuid4().hex,
"platform": "python",
"profile": _sample_buffer.slice_profile(self._start_ns, self._stop_ns),
"profile": self.scheduler.sample_buffer.slice_profile(
self._start_ns, self._stop_ns
),
"release": event_opt.get("release", ""),
"timestamp": event_opt["timestamp"],
"version": "1",
Expand Down Expand Up @@ -406,13 +387,36 @@ def slice_profile(self, start_ns, stop_ns):
"thread_metadata": thread_metadata,
}

def make_sampler(self):
# type: () -> Callable[..., None]

def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""

self.write(
(
nanosecond_time(),
[
(tid, extract_stack(frame))
for tid, frame in sys._current_frames().items()
],
)
)

return _sample_stack


class Scheduler(object):
mode = "unknown"

def __init__(self, sampler, frequency):
# type: (Callable[..., None], int) -> None
self.sampler = sampler
def __init__(self, sample_buffer, frequency):
# type: (SampleBuffer, int) -> None
self.sample_buffer = sample_buffer
self.sampler = sample_buffer.make_sampler()
self._lock = threading.Lock()
self._count = 0
self._interval = 1.0 / frequency
Expand Down Expand Up @@ -447,9 +451,11 @@ class ThreadScheduler(Scheduler):
mode = "thread"
name = None # type: Optional[str]

def __init__(self, sampler, frequency):
# type: (Callable[..., None], int) -> None
super(ThreadScheduler, self).__init__(sampler=sampler, frequency=frequency)
def __init__(self, sample_buffer, frequency):
# type: (SampleBuffer, int) -> None
super(ThreadScheduler, self).__init__(
sample_buffer=sample_buffer, frequency=frequency
)
self.stop_events = Queue()

def setup(self):
Expand Down Expand Up @@ -716,7 +722,8 @@ def start_profiling(transaction, hub=None):

# if profiling was not enabled, this should be a noop
if _should_profile(transaction, hub):
with Profile(transaction, hub=hub):
assert _scheduler is not None
with Profile(_scheduler, transaction, hub=hub):
yield
else:
yield
80 changes: 76 additions & 4 deletions tests/test_profiler.py
Expand Up @@ -7,6 +7,7 @@
import pytest

from sentry_sdk.profiler import (
EventScheduler,
RawFrameData,
SampleBuffer,
SleepScheduler,
Expand Down Expand Up @@ -187,12 +188,83 @@ def get_scheduler_threads(scheduler):
return [thread for thread in threading.enumerate() if thread.name == scheduler.name]


class DummySampleBuffer(SampleBuffer):
def __init__(self, capacity, sample_data=None):
super(DummySampleBuffer, self).__init__(capacity)
self.sample_data = [] if sample_data is None else sample_data

def make_sampler(self):
def _sample_stack(*args, **kwargs):
print("writing", self.sample_data[0])
self.write(self.sample_data.pop(0))

return _sample_stack


@minimum_python_33
@pytest.mark.parametrize(
("scheduler_class",),
[
pytest.param(SleepScheduler, id="sleep scheduler"),
pytest.param(EventScheduler, id="event scheduler"),
],
)
def test_thread_scheduler_takes_first_samples(scheduler_class):
sample_buffer = DummySampleBuffer(
capacity=1, sample_data=[(0, [(0, [RawFrameData("name", "file", 1)])])]
)
scheduler = scheduler_class(sample_buffer=sample_buffer, frequency=1000)
assert scheduler.start_profiling()
# immediately stopping means by the time the sampling thread will exit
# before it samples at the end of the first iteration
assert scheduler.stop_profiling()
time.sleep(0.002)
assert len(get_scheduler_threads(scheduler)) == 0

# there should be exactly 1 sample because we always sample once immediately
profile = sample_buffer.slice_profile(0, 1)
assert len(profile["samples"]) == 1


@minimum_python_33
def test_sleep_scheduler_single_background_thread():
def sampler():
pass
@pytest.mark.parametrize(
("scheduler_class",),
[
pytest.param(SleepScheduler, id="sleep scheduler"),
pytest.param(EventScheduler, id="event scheduler"),
],
)
def test_thread_scheduler_takes_more_samples(scheduler_class):
sample_buffer = DummySampleBuffer(
capacity=10,
sample_data=[(i, [(0, [RawFrameData("name", "file", 1)])]) for i in range(3)],
)
scheduler = scheduler_class(sample_buffer=sample_buffer, frequency=1000)
assert scheduler.start_profiling()
# waiting a little before stopping the scheduler means the profiling
# thread will get a chance to take a few samples before exiting
time.sleep(0.002)
assert scheduler.stop_profiling()
time.sleep(0.002)
assert len(get_scheduler_threads(scheduler)) == 0

# there should be more than 1 sample because we always sample once immediately
# plus any samples take afterwards
profile = sample_buffer.slice_profile(0, 3)
assert len(profile["samples"]) > 1

scheduler = SleepScheduler(sampler=sampler, frequency=1000)

@minimum_python_33
@pytest.mark.parametrize(
("scheduler_class",),
[
pytest.param(SleepScheduler, id="sleep scheduler"),
pytest.param(EventScheduler, id="event scheduler"),
],
)
def test_thread_scheduler_single_background_thread(scheduler_class):
sample_buffer = SampleBuffer(1)
scheduler = scheduler_class(sample_buffer=sample_buffer, frequency=1000)

assert scheduler.start_profiling()

Expand Down

0 comments on commit 1c651c6

Please sign in to comment.