Skip to content

Commit

Permalink
feat(profiling): Add thread data to spans (#2843)
Browse files Browse the repository at this point in the history
As per getsentry/rfc#75, this adds the thread data to the spans. This will be
needed for the continuous profiling mode in #2830.
  • Loading branch information
Zylphrex committed Mar 20, 2024
1 parent 500e087 commit 48d7767
Show file tree
Hide file tree
Showing 26 changed files with 599 additions and 367 deletions.
12 changes: 12 additions & 0 deletions sentry_sdk/consts.py
Expand Up @@ -191,6 +191,18 @@ class SPANDATA:
Example: "http.handler"
"""

THREAD_ID = "thread.id"
"""
Identifier of a thread from where the span originated. This should be a string.
Example: "7972576320"
"""

THREAD_NAME = "thread.name"
"""
Label identifying a thread from where the span originated. This should be a string.
Example: "MainThread"
"""


class OP:
CACHE_GET_ITEM = "cache.get_item"
Expand Down
70 changes: 5 additions & 65 deletions sentry_sdk/profiler.py
Expand Up @@ -42,6 +42,8 @@
from sentry_sdk.utils import (
capture_internal_exception,
filename_for_module,
get_current_thread_meta,
is_gevent,
is_valid_sample_rate,
logger,
nanosecond_time,
Expand Down Expand Up @@ -126,32 +128,16 @@


try:
from gevent import get_hub as get_gevent_hub # type: ignore
from gevent.monkey import get_original, is_module_patched # type: ignore
from gevent.monkey import get_original # type: ignore
from gevent.threadpool import ThreadPool # type: ignore

thread_sleep = get_original("time", "sleep")
except ImportError:

def get_gevent_hub():
# type: () -> Any
return None

thread_sleep = time.sleep

def is_module_patched(*args, **kwargs):
# type: (*Any, **Any) -> bool
# unable to import from gevent means no modules have been patched
return False

ThreadPool = None


def is_gevent():
# type: () -> bool
return is_module_patched("threading") or is_module_patched("_thread")


_scheduler = None # type: Optional[Scheduler]

# The default sampling frequency to use. This is set at 101 in order to
Expand Down Expand Up @@ -389,52 +375,6 @@ def get_frame_name(frame):
MAX_PROFILE_DURATION_NS = int(3e10) # 30 seconds


def get_current_thread_id(thread=None):
# type: (Optional[threading.Thread]) -> Optional[int]
"""
Try to get the id of the current thread, with various fall backs.
"""

# if a thread is specified, that takes priority
if thread is not None:
try:
thread_id = thread.ident
if thread_id is not None:
return thread_id
except AttributeError:
pass

# if the app is using gevent, we should look at the gevent hub first
# as the id there differs from what the threading module reports
if is_gevent():
gevent_hub = get_gevent_hub()
if gevent_hub is not None:
try:
# this is undocumented, so wrap it in try except to be safe
return gevent_hub.thread_ident
except AttributeError:
pass

# use the current thread's id if possible
try:
current_thread_id = threading.current_thread().ident
if current_thread_id is not None:
return current_thread_id
except AttributeError:
pass

# if we can't get the current thread id, fall back to the main thread id
try:
main_thread_id = threading.main_thread().ident
if main_thread_id is not None:
return main_thread_id
except AttributeError:
pass

# we've tried everything, time to give up
return None


class Profile(object):
def __init__(
self,
Expand All @@ -456,7 +396,7 @@ def __init__(

# Various framework integrations are capable of overwriting the active thread id.
# If it is set to `None` at the end of the profile, we fall back to the default.
self._default_active_thread_id = get_current_thread_id() or 0 # type: int
self._default_active_thread_id = get_current_thread_meta()[0] or 0 # type: int
self.active_thread_id = None # type: Optional[int]

try:
Expand All @@ -479,7 +419,7 @@ def __init__(

def update_active_thread_id(self):
# type: () -> None
self.active_thread_id = get_current_thread_id()
self.active_thread_id = get_current_thread_meta()[0]
logger.debug(
"[Profiling] updating active thread id to {tid}".format(
tid=self.active_thread_id
Expand Down
19 changes: 18 additions & 1 deletion sentry_sdk/tracing.py
Expand Up @@ -5,7 +5,12 @@

import sentry_sdk
from sentry_sdk.consts import INSTRUMENTER
from sentry_sdk.utils import is_valid_sample_rate, logger, nanosecond_time
from sentry_sdk.utils import (
get_current_thread_meta,
is_valid_sample_rate,
logger,
nanosecond_time,
)
from sentry_sdk._compat import datetime_utcnow, utc_from_timestamp, PY2
from sentry_sdk.consts import SPANDATA
from sentry_sdk._types import TYPE_CHECKING
Expand Down Expand Up @@ -172,6 +177,9 @@ def __init__(
self._span_recorder = None # type: Optional[_SpanRecorder]
self._local_aggregator = None # type: Optional[LocalAggregator]

thread_id, thread_name = get_current_thread_meta()
self.set_thread(thread_id, thread_name)

# TODO this should really live on the Transaction class rather than the Span
# class
def init_span_recorder(self, maxlen):
Expand Down Expand Up @@ -418,6 +426,15 @@ def set_status(self, value):
# type: (str) -> None
self.status = value

def set_thread(self, thread_id, thread_name):
# type: (Optional[int], Optional[str]) -> None

if thread_id is not None:
self.set_data(SPANDATA.THREAD_ID, str(thread_id))

if thread_name is not None:
self.set_data(SPANDATA.THREAD_NAME, thread_name)

def set_http_status(self, http_status):
# type: (int) -> None
self.set_tag(
Expand Down
56 changes: 56 additions & 0 deletions sentry_sdk/utils.py
Expand Up @@ -1746,9 +1746,14 @@ def now():


try:
from gevent import get_hub as get_gevent_hub
from gevent.monkey import is_module_patched
except ImportError:

def get_gevent_hub():
# type: () -> Any
return None

def is_module_patched(*args, **kwargs):
# type: (*Any, **Any) -> bool
# unable to import from gevent means no modules have been patched
Expand All @@ -1758,3 +1763,54 @@ def is_module_patched(*args, **kwargs):
def is_gevent():
# type: () -> bool
return is_module_patched("threading") or is_module_patched("_thread")


def get_current_thread_meta(thread=None):
# type: (Optional[threading.Thread]) -> Tuple[Optional[int], Optional[str]]
"""
Try to get the id of the current thread, with various fall backs.
"""

# if a thread is specified, that takes priority
if thread is not None:
try:
thread_id = thread.ident
thread_name = thread.name
if thread_id is not None:
return thread_id, thread_name
except AttributeError:
pass

# if the app is using gevent, we should look at the gevent hub first
# as the id there differs from what the threading module reports
if is_gevent():
gevent_hub = get_gevent_hub()
if gevent_hub is not None:
try:
# this is undocumented, so wrap it in try except to be safe
return gevent_hub.thread_ident, None
except AttributeError:
pass

# use the current thread's id if possible
try:
thread = threading.current_thread()
thread_id = thread.ident
thread_name = thread.name
if thread_id is not None:
return thread_id, thread_name
except AttributeError:
pass

# if we can't get the current thread id, fall back to the main thread id
try:
thread = threading.main_thread()
thread_id = thread.ident
thread_name = thread.name
if thread_id is not None:
return thread_id, thread_name
except AttributeError:
pass

# we've tried everything, time to give up
return None, None
12 changes: 12 additions & 0 deletions tests/conftest.py
Expand Up @@ -652,3 +652,15 @@ def patch_start_tracing_child(fake_transaction_is_none=False):
return_value=fake_transaction,
):
yield fake_start_child


class ApproxDict(dict):
def __eq__(self, other):
# For an ApproxDict to equal another dict, the other dict just needs to contain
# all the keys from the ApproxDict with the same values.
#
# The other dict may contain additional keys with any value.
return all(key in other and other[key] == value for key, value in self.items())

def __ne__(self, other):
return not self.__eq__(other)
21 changes: 12 additions & 9 deletions tests/integrations/aiohttp/test_aiohttp.py
Expand Up @@ -9,6 +9,7 @@

from sentry_sdk import capture_message, start_transaction
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from tests.conftest import ApproxDict

try:
from unittest import mock # python 3.3 and above
Expand Down Expand Up @@ -495,15 +496,17 @@ async def handler(request):
crumb = event["breadcrumbs"]["values"][0]
assert crumb["type"] == "http"
assert crumb["category"] == "httplib"
assert crumb["data"] == {
"url": "http://127.0.0.1:{}/".format(raw_server.port),
"http.fragment": "",
"http.method": "GET",
"http.query": "",
"http.response.status_code": 200,
"reason": "OK",
"extra": "foo",
}
assert crumb["data"] == ApproxDict(
{
"url": "http://127.0.0.1:{}/".format(raw_server.port),
"http.fragment": "",
"http.method": "GET",
"http.query": "",
"http.response.status_code": 200,
"reason": "OK",
"extra": "foo",
}
)


@pytest.mark.asyncio
Expand Down
17 changes: 10 additions & 7 deletions tests/integrations/asyncpg/test_asyncpg.py
Expand Up @@ -34,6 +34,7 @@
from sentry_sdk.consts import SPANDATA
from sentry_sdk.tracing_utils import record_sql_queries
from sentry_sdk._compat import contextmanager
from tests.conftest import ApproxDict

try:
from unittest import mock
Expand All @@ -46,13 +47,15 @@
)
CRUMBS_CONNECT = {
"category": "query",
"data": {
"db.name": PG_NAME,
"db.system": "postgresql",
"db.user": PG_USER,
"server.address": PG_HOST,
"server.port": PG_PORT,
},
"data": ApproxDict(
{
"db.name": PG_NAME,
"db.system": "postgresql",
"db.user": PG_USER,
"server.address": PG_HOST,
"server.port": PG_PORT,
}
),
"message": "connect",
"type": "default",
}
Expand Down
29 changes: 19 additions & 10 deletions tests/integrations/boto3/test_s3.py
Expand Up @@ -4,6 +4,7 @@

from sentry_sdk import Hub
from sentry_sdk.integrations.boto3 import Boto3Integration
from tests.conftest import ApproxDict
from tests.integrations.boto3.aws_mock import MockResponse
from tests.integrations.boto3 import read_fixture

Expand Down Expand Up @@ -65,12 +66,14 @@ def test_streaming(sentry_init, capture_events):
span1 = event["spans"][0]
assert span1["op"] == "http.client"
assert span1["description"] == "aws.s3.GetObject"
assert span1["data"] == {
"http.method": "GET",
"aws.request.url": "https://bucket.s3.amazonaws.com/foo.pdf",
"http.fragment": "",
"http.query": "",
}
assert span1["data"] == ApproxDict(
{
"http.method": "GET",
"aws.request.url": "https://bucket.s3.amazonaws.com/foo.pdf",
"http.fragment": "",
"http.query": "",
}
)

span2 = event["spans"][1]
assert span2["op"] == "http.client.stream"
Expand Down Expand Up @@ -123,7 +126,13 @@ def test_omit_url_data_if_parsing_fails(sentry_init, capture_events):
transaction.finish()

(event,) = events
assert event["spans"][0]["data"] == {
"http.method": "GET",
# no url data
}
assert event["spans"][0]["data"] == ApproxDict(
{
"http.method": "GET",
# no url data
}
)

assert "aws.request.url" not in event["spans"][0]["data"]
assert "http.fragment" not in event["spans"][0]["data"]
assert "http.query" not in event["spans"][0]["data"]
2 changes: 2 additions & 0 deletions tests/integrations/celery/test_celery.py
Expand Up @@ -10,6 +10,7 @@
)

from sentry_sdk._compat import text_type
from tests.conftest import ApproxDict

from celery import Celery, VERSION
from celery.bin import worker
Expand Down Expand Up @@ -218,6 +219,7 @@ def dummy_task(x, y):
assert execution_event["spans"] == []
assert submission_event["spans"] == [
{
"data": ApproxDict(),
"description": "dummy_task",
"op": "queue.submit.celery",
"parent_span_id": submission_event["contexts"]["trace"]["span_id"],
Expand Down

0 comments on commit 48d7767

Please sign in to comment.