Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(profiling): lock Recorder on reset #1560

Merged
merged 4 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 67 additions & 0 deletions ddtrace/profiling/_nogevent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# -*- encoding: utf-8 -*-
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol "_nogevent.py" gotta love that we have to bend over backwards to deal with gevent 🙈 🤦

I can't really think of a better name... maybe "compat" or "truth" lol.. 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

truth haha.

I had this idea of module for a few weeks, but I was not able to come with a name.

Finally, it hit me. 😆

"""This files exposes non-gevent Python original functions."""
import threading

from ddtrace.vendor import six
from ddtrace.vendor import attr


try:
import gevent.monkey
except ImportError:

def get_original(module, func):
return getattr(__import__(module), func)

def is_module_patched(module):
return False


else:
get_original = gevent.monkey.get_original
is_module_patched = gevent.monkey.is_module_patched


sleep = get_original("time", "sleep")

try:
# Python ≥ 3.8
threading_get_native_id = get_original("threading", "get_native_id")
except AttributeError:
threading_get_native_id = None

start_new_thread = get_original(six.moves._thread.__name__, "start_new_thread")
thread_get_ident = get_original(six.moves._thread.__name__, "get_ident")
Thread = get_original("threading", "Thread")
Lock = get_original("threading", "Lock")


if is_module_patched("threading"):

@attr.s
class DoubleLock(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I was skeptical of the need to lock across both coroutine and thread at first but I think I was able to work through it in my head (with my limited knowledge of gevent).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's really not that obvious. Basically, if you have a thread with coroutines, you have concurrency in that thread between coroutines so you need a gevent-lock to make sure they are excluded and scheduled correctly.
The coroutine also needs to exclude threads, so you need a thread-lock to make sure you have, in the end, one thread and one coroutine acting.

I think what I was scared of is that you can use a gevent-lock from 2 threads, but it seems to work fine if you use a thread-lock first to synchronize. Scary!

import threading
import sys
import gevent.monkey
import time

gevent.monkey.patch_all()

gl = threading.Lock()

sleep = gevent.monkey.get_original("time", "sleep")
thread = gevent.monkey.get_original("_thread", "start_new_thread")
tl = gevent.monkey.get_original("threading", "Lock")()


def lock_unlock():
    while True:
        tl.acquire()
        gl.acquire()
        print("thread got it")
        sleep(0.01)
        tl.release()
        gl.release()
        sleep(0.01)


th = thread(lock_unlock, ())

while True:
    tl.acquire()
    gl.acquire()
    print("gevent got it")
    time.sleep(0.01)
    tl.release()
    gl.release()
    sleep(0.01)

If you remove the thread-lock, it explodes.

I should write a blog post about that. 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was curious about the ordering! Thanks for adding the comment here and in the source below 😄. Blog post would be great!

"""A lock that prevent concurrency from a gevent coroutine and from a threading.Thread at the same time."""

_lock = attr.ib(factory=threading.Lock, init=False, repr=False)
_thread_lock = attr.ib(factory=Lock, init=False, repr=False)

def acquire(self):
# You cannot acquire a gevent-lock from another thread if it has been acquired already:
# make sure we exclude the gevent-lock from being acquire by another thread by using a thread-lock first.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

self._thread_lock.acquire()
self._lock.acquire()

def release(self):
self._lock.release()
self._thread_lock.release()

def __enter__(self):
self.acquire()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.release()


else:
DoubleLock = threading.Lock
31 changes: 8 additions & 23 deletions ddtrace/profiling/_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import threading

from ddtrace.profiling import _service
from ddtrace.profiling import _nogevent
from ddtrace.vendor import attr
from ddtrace.vendor import six


PERIODIC_THREAD_IDS = set()
Expand Down Expand Up @@ -70,14 +70,6 @@ def __init__(self, interval, target, name=None, on_shutdown=None):
:param on_shutdown: The function to call when the thread shuts down.
"""
super(_GeventPeriodicThread, self).__init__(interval, target, name, on_shutdown)
import gevent.monkey

self._sleep = gevent.monkey.get_original("time", "sleep")
try:
# Python ≥ 3.8
self._get_native_id = gevent.monkey.get_original("threading", "get_native_id")
except AttributeError:
self._get_native_id = None
self._tident = None

@property
Expand All @@ -86,24 +78,20 @@ def ident(self):

def start(self):
"""Start the thread."""
import gevent.monkey

start_new_thread = gevent.monkey.get_original(six.moves._thread.__name__, "start_new_thread")

self.quit = False
self.has_quit = False
threading._limbo[self] = self
try:
self._tident = start_new_thread(self.run, tuple())
self._tident = _nogevent.start_new_thread(self.run, tuple())
except Exception:
del threading._limbo[self]
if self._get_native_id:
self._native_id = self._get_native_id()
if _nogevent.threading_get_native_id:
self._native_id = _nogevent.threading_get_native_id()

def join(self, timeout=None):
# FIXME: handle the timeout argument
while not self.has_quit:
self._sleep(self.SLEEP_INTERVAL)
_nogevent.sleep(self.SLEEP_INTERVAL)

def stop(self):
"""Stop the thread."""
Expand All @@ -121,7 +109,7 @@ def run(self):
self._target()
slept = 0
while self.quit is False and slept < self.interval:
self._sleep(self.SLEEP_INTERVAL)
_nogevent.sleep(self.SLEEP_INTERVAL)
slept += self.SLEEP_INTERVAL
if self._on_shutdown is not None:
self._on_shutdown()
Expand Down Expand Up @@ -151,11 +139,8 @@ def PeriodicRealThread(*args, **kwargs):
in e.g. the gevent case, where Lock object must not be shared with the MainThread (otherwise it'd dead lock).

"""
if "gevent" in sys.modules:
import gevent.monkey

if gevent.monkey.is_module_patched("threading"):
return _GeventPeriodicThread(*args, **kwargs)
if _nogevent.is_module_patched("threading"):
return _GeventPeriodicThread(*args, **kwargs)
return PeriodicThread(*args, **kwargs)


Expand Down
3 changes: 2 additions & 1 deletion ddtrace/profiling/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
return self.stop()
self.stop()
self.join()

def start(self):
"""Start the service."""
Expand Down
21 changes: 6 additions & 15 deletions ddtrace/profiling/collector/stack.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import weakref
from ddtrace import compat
from ddtrace.profiling import _attr
from ddtrace.profiling import _periodic
from ddtrace.profiling import _nogevent
from ddtrace.profiling import collector
from ddtrace.profiling import event
from ddtrace.profiling.collector import _traceback
Expand All @@ -21,22 +22,12 @@ from ddtrace.vendor import six
_LOG = logging.getLogger(__name__)


if "gevent" in sys.modules:
try:
import gevent.monkey
except ImportError:
_LOG.error("gevent loaded but unable to import gevent.monkey")
from threading import Lock as _threading_Lock
from ddtrace.vendor.six.moves._thread import get_ident as _thread_get_ident
else:
_threading_Lock = gevent.monkey.get_original("threading", "Lock")
_thread_get_ident = gevent.monkey.get_original("thread" if six.PY2 else "_thread", "get_ident")

if _nogevent.is_module_patched("threading"):
# NOTE: bold assumption: this module is always imported by the MainThread.
# The python `threading` module makes that assumption and it's beautiful we're going to do the same.
_main_thread_id = _thread_get_ident()
# We don't have the choice has we can't access the original MainThread
_main_thread_id = _nogevent.thread_get_ident()
else:
from threading import Lock as _threading_Lock
from ddtrace.vendor.six.moves._thread import get_ident as _thread_get_ident
if six.PY2:
_main_thread_id = threading._MainThread().ident
Expand Down Expand Up @@ -388,7 +379,7 @@ class _ThreadSpanLinks(object):
# Keys is a thread_id
# Value is a set of weakrefs to spans
_thread_id_to_spans = attr.ib(factory=lambda: collections.defaultdict(set), repr=False, init=False)
_lock = attr.ib(factory=_threading_Lock, repr=False, init=False)
_lock = attr.ib(factory=_nogevent.Lock, repr=False, init=False)

def link_span(self, span):
"""Link a span to its running environment.
Expand All @@ -397,7 +388,7 @@ class _ThreadSpanLinks(object):
"""
# Since we're going to iterate over the set, make sure it's locked
with self._lock:
self._thread_id_to_spans[_thread_get_ident()].add(weakref.ref(span))
self._thread_id_to_spans[_nogevent.thread_get_ident()].add(weakref.ref(span))

def clear_threads(self, existing_thread_ids):
"""Clear the stored list of threads based on the list of existing thread ids.
Expand Down
12 changes: 8 additions & 4 deletions ddtrace/profiling/recorder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- encoding: utf-8 -*-
import collections

from ddtrace.profiling import _nogevent
from ddtrace.vendor import attr


Expand Down Expand Up @@ -30,6 +31,7 @@ class Recorder(object):
"""A dict of {event_type_class: max events} to limit the number of events to record."""

events = attr.ib(init=False, repr=False)
_events_lock = attr.ib(init=False, repr=False, factory=_nogevent.DoubleLock)

def __attrs_post_init__(self):
self._reset_events()
Expand All @@ -51,8 +53,9 @@ def push_events(self, events):
"""
if events:
event_type = events[0].__class__
q = self.events[event_type]
q.extend(events)
with self._events_lock:
q = self.events[event_type]
q.extend(events)

def _get_deque_for_event_type(self, event_type):
return collections.deque(maxlen=self.max_events.get(event_type, self.default_max_events))
Expand All @@ -68,6 +71,7 @@ def reset(self):

:return: The list of events that has been removed.
"""
events = self.events
self._reset_events()
with self._events_lock:
events = self.events
self._reset_events()
return events
60 changes: 34 additions & 26 deletions tests/profiling/collector/test_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,14 @@
import ddtrace
from ddtrace.vendor import six

from ddtrace.profiling import _nogevent
from ddtrace.profiling import recorder
from ddtrace.profiling.collector import stack

from . import test_collector

TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False)

try:
from gevent import monkey
except ImportError:
real_sleep = time.sleep
real_Thread = threading.Thread
real_Lock = threading.Lock
else:
real_sleep = monkey.get_original("time", "sleep")
real_Thread = monkey.get_original("threading", "Thread")
real_Lock = monkey.get_original("threading", "Lock")


def func1():
return func2()
Expand All @@ -45,7 +35,7 @@ def func4():


def func5():
return real_sleep(1)
return _nogevent.sleep(1)


def test_collect_truncate():
Expand Down Expand Up @@ -192,6 +182,25 @@ def test_stress_threads():
t.join()


def test_stress_threads_run_as_thread():
NB_THREADS = 40

threads = []
for i in range(NB_THREADS):
t = threading.Thread(target=_f0) # noqa: E149,F821
t.start()
threads.append(t)

r = recorder.Recorder()
s = stack.StackCollector(recorder=r)
# This mainly check nothing bad happens when we collect a lot of threads and store the result in the Recorder
with s:
time.sleep(3)
assert r.events[stack.StackSampleEvent]
for t in threads:
t.join()


@pytest.mark.skipif(not stack.FEATURES["stack-exceptions"], reason="Stack exceptions not supported")
@pytest.mark.skipif(TESTING_GEVENT, reason="Test not compatible with gevent")
def test_exception_collection_threads():
Expand Down Expand Up @@ -221,21 +230,20 @@ def test_exception_collection_threads():
def test_exception_collection():
r = recorder.Recorder()
c = stack.StackCollector(r)
c.start()
try:
raise ValueError("hello")
except Exception:
real_sleep(1)
c.stop()
with c:
try:
raise ValueError("hello")
except Exception:
_nogevent.sleep(1)

exception_events = r.events[stack.StackExceptionSampleEvent]
assert len(exception_events) >= 1
e = exception_events[0]
assert e.timestamp > 0
assert e.sampling_period > 0
assert e.thread_id == stack._thread_get_ident()
assert e.thread_id == _nogevent.thread_get_ident()
assert e.thread_name == "MainThread"
assert e.frames == [(__file__, 228, "test_exception_collection")]
assert e.frames == [(__file__, 237, "test_exception_collection")]
assert e.nframes == 1
assert e.exc_type == ValueError

Expand All @@ -255,7 +263,7 @@ def tracer_and_collector():
def test_thread_to_span_thread_isolation(tracer_and_collector):
t, c = tracer_and_collector
root = t.start_span("root")
thread_id = stack._thread_get_ident()
thread_id = _nogevent.thread_get_ident()
assert c._thread_span_links.get_active_leaf_spans_from_thread_id(thread_id) == {root}

store = {}
Expand All @@ -278,7 +286,7 @@ def start_span():
def test_thread_to_span_multiple(tracer_and_collector):
t, c = tracer_and_collector
root = t.start_span("root")
thread_id = stack._thread_get_ident()
thread_id = _nogevent.thread_get_ident()
assert c._thread_span_links.get_active_leaf_spans_from_thread_id(thread_id) == {root}
subspan = t.start_span("subtrace", child_of=root)
assert c._thread_span_links.get_active_leaf_spans_from_thread_id(thread_id) == {subspan}
Expand All @@ -297,7 +305,7 @@ def test_thread_to_child_span_multiple_unknown_thread(tracer_and_collector):
def test_thread_to_child_span_clear(tracer_and_collector):
t, c = tracer_and_collector
root = t.start_span("root")
thread_id = stack._thread_get_ident()
thread_id = _nogevent.thread_get_ident()
assert c._thread_span_links.get_active_leaf_spans_from_thread_id(thread_id) == {root}
c._thread_span_links.clear_threads(set())
assert c._thread_span_links.get_active_leaf_spans_from_thread_id(thread_id) == set()
Expand All @@ -306,7 +314,7 @@ def test_thread_to_child_span_clear(tracer_and_collector):
def test_thread_to_child_span_multiple_more_children(tracer_and_collector):
t, c = tracer_and_collector
root = t.start_span("root")
thread_id = stack._thread_get_ident()
thread_id = _nogevent.thread_get_ident()
assert c._thread_span_links.get_active_leaf_spans_from_thread_id(thread_id) == {root}
subspan = t.start_span("subtrace", child_of=root)
subsubspan = t.start_span("subsubtrace", child_of=subspan)
Expand Down Expand Up @@ -374,10 +382,10 @@ def _trace():
def test_thread_time_cache():
tt = stack._ThreadTime()

lock = real_Lock()
lock = _nogevent.Lock()
lock.acquire()

t = real_Thread(target=lock.acquire)
t = _nogevent.Thread(target=lock.acquire)
t.start()

main_thread_id = threading.current_thread().ident
Expand Down