Skip to content

Commit

Permalink
Implement new POTel span processor (#3223)
Browse files Browse the repository at this point in the history
* only acts on `on_end` instead of both `on_start/on_end` as before
* store children spans in a dict mapping `span_id -> children`
* the new dict only stores otel span objects and no sentry transaction/span objects, so we avoid a bit of unnecessary memory allocation
* I'm not using our current `Transaction/Span` classes at all to build the event because when we add our APIs later, we'll need to rip these out and we also avoid having to deal with the `instrumenter` problem
* if we get a root span (without parent), we walk the dict breadth-first to find all of its descendants, package up the transaction event and send it
  * I didn't do it like JS because I think this way is better
  * they [group an array of `finished_spans`](https://github.com/getsentry/sentry-javascript/blob/7e298036a21a5658f3eb9ba184165178c48d7ef8/packages/opentelemetry/src/spanExporter.ts#L132) every time a root span ends, and I think this uses more CPU than what I did
  * and the dict, the way I use it, doesn't take more space than the array either
* if we get a span with a parent we just update the dict to find the span later
* moved the common `is_sentry_span` logic to utils
  • Loading branch information
sl0thentr0py committed Jul 9, 2024
1 parent cd3e140 commit f0c1a84
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 114 deletions.
2 changes: 2 additions & 0 deletions sentry_sdk/integrations/opentelemetry/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@

SENTRY_TRACE_KEY = create_key("sentry-trace")
SENTRY_BAGGAGE_KEY = create_key("sentry-baggage")
OTEL_SENTRY_CONTEXT = "otel"
SPAN_ORIGIN = "auto.otel"
149 changes: 143 additions & 6 deletions sentry_sdk/integrations/opentelemetry/potel_span_processor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
from opentelemetry.sdk.trace import SpanProcessor
from collections import deque, defaultdict

from opentelemetry.trace import format_trace_id, format_span_id
from opentelemetry.context import Context
from opentelemetry.sdk.trace import Span, ReadableSpan, SpanProcessor

from sentry_sdk import capture_event
from sentry_sdk.integrations.opentelemetry.utils import (
is_sentry_span,
convert_otel_timestamp,
extract_span_data,
)
from sentry_sdk.integrations.opentelemetry.consts import (
OTEL_SENTRY_CONTEXT,
SPAN_ORIGIN,
)
from sentry_sdk._types import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Optional
from opentelemetry.sdk.trace import ReadableSpan
from typing import Optional, List, Any, Deque, DefaultDict
from sentry_sdk._types import Event


class PotelSentrySpanProcessor(SpanProcessor):
Expand All @@ -22,15 +35,25 @@ def __new__(cls):

def __init__(self):
# type: () -> None
pass
self._children_spans = defaultdict(
list
) # type: DefaultDict[int, List[ReadableSpan]]

def on_start(self, span, parent_context=None):
# type: (ReadableSpan, Optional[Context]) -> None
# type: (Span, Optional[Context]) -> None
pass

def on_end(self, span):
# type: (ReadableSpan) -> None
pass
if is_sentry_span(span):
return

# TODO-neel-potel-remote only take parent if not remote
if span.parent:
self._children_spans[span.parent.span_id].append(span)
else:
# if have a root span ending, we build a transaction and send it
self._flush_root_span(span)

# TODO-neel-potel not sure we need a clear like JS
def shutdown(self):
Expand All @@ -42,3 +65,117 @@ def shutdown(self):
def force_flush(self, timeout_millis=30000):
# type: (int) -> bool
return True

def _flush_root_span(self, span):
# type: (ReadableSpan) -> None
transaction_event = self._root_span_to_transaction_event(span)
if not transaction_event:
return

spans = []
for child in self._collect_children(span):
span_json = self._span_to_json(child)
if span_json:
spans.append(span_json)
transaction_event["spans"] = spans
# TODO-neel-potel sort and cutoff max spans

capture_event(transaction_event)

def _collect_children(self, span):
# type: (ReadableSpan) -> List[ReadableSpan]
if not span.context:
return []

children = []
bfs_queue = deque() # type: Deque[int]
bfs_queue.append(span.context.span_id)

while bfs_queue:
parent_span_id = bfs_queue.popleft()
node_children = self._children_spans.pop(parent_span_id, [])
children.extend(node_children)
bfs_queue.extend(
[child.context.span_id for child in node_children if child.context]
)

return children

# we construct the event from scratch here
# and not use the current Transaction class for easier refactoring
def _root_span_to_transaction_event(self, span):
# type: (ReadableSpan) -> Optional[Event]
if not span.context:
return None
if not span.start_time:
return None
if not span.end_time:
return None

trace_id = format_trace_id(span.context.trace_id)
span_id = format_span_id(span.context.span_id)
parent_span_id = format_span_id(span.parent.span_id) if span.parent else None

(op, description, _) = extract_span_data(span)

trace_context = {
"trace_id": trace_id,
"span_id": span_id,
"origin": SPAN_ORIGIN,
"op": op,
"status": "ok", # TODO-neel-potel span status mapping
} # type: dict[str, Any]

if parent_span_id:
trace_context["parent_span_id"] = parent_span_id
if span.attributes:
trace_context["data"] = dict(span.attributes)

contexts = {"trace": trace_context}
if span.resource.attributes:
contexts[OTEL_SENTRY_CONTEXT] = {"resource": dict(span.resource.attributes)}

event = {
"type": "transaction",
"transaction": description,
# TODO-neel-potel tx source based on integration
"transaction_info": {"source": "custom"},
"contexts": contexts,
"start_timestamp": convert_otel_timestamp(span.start_time),
"timestamp": convert_otel_timestamp(span.end_time),
} # type: Event

return event

def _span_to_json(self, span):
# type: (ReadableSpan) -> Optional[dict[str, Any]]
if not span.context:
return None
if not span.start_time:
return None
if not span.end_time:
return None

trace_id = format_trace_id(span.context.trace_id)
span_id = format_span_id(span.context.span_id)
parent_span_id = format_span_id(span.parent.span_id) if span.parent else None

(op, description, _) = extract_span_data(span)

span_json = {
"trace_id": trace_id,
"span_id": span_id,
"origin": SPAN_ORIGIN,
"op": op,
"description": description,
"status": "ok", # TODO-neel-potel span status mapping
"start_timestamp": convert_otel_timestamp(span.start_time),
"timestamp": convert_otel_timestamp(span.end_time),
} # type: dict[str, Any]

if parent_span_id:
span_json["parent_span_id"] = parent_span_id
if span.attributes:
span_json["data"] = dict(span.attributes)

return span_json
118 changes: 15 additions & 103 deletions sentry_sdk/integrations/opentelemetry/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@

from opentelemetry.context import get_value
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan as OTelSpan
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import (
format_span_id,
format_trace_id,
get_current_span,
SpanKind,
)
from opentelemetry.trace.span import (
INVALID_SPAN_ID,
Expand All @@ -20,22 +18,24 @@
from sentry_sdk.integrations.opentelemetry.consts import (
SENTRY_BAGGAGE_KEY,
SENTRY_TRACE_KEY,
OTEL_SENTRY_CONTEXT,
SPAN_ORIGIN,
)
from sentry_sdk.integrations.opentelemetry.utils import (
is_sentry_span,
extract_span_data,
)
from sentry_sdk.scope import add_global_event_processor
from sentry_sdk.tracing import Transaction, Span as SentrySpan
from sentry_sdk.utils import Dsn
from sentry_sdk._types import TYPE_CHECKING

from urllib3.util import parse_url as urlparse

if TYPE_CHECKING:
from typing import Any, Optional, Union
from opentelemetry import context as context_api
from sentry_sdk._types import Event, Hint

OPEN_TELEMETRY_CONTEXT = "otel"
SPAN_MAX_TIME_OPEN_MINUTES = 10
SPAN_ORIGIN = "auto.otel"


def link_trace_context_to_error_event(event, otel_span_map):
Expand Down Expand Up @@ -117,18 +117,13 @@ def on_start(self, otel_span, parent_context=None):
if not client.dsn:
return

try:
_ = Dsn(client.dsn)
except Exception:
return

if client.options["instrumenter"] != INSTRUMENTER.OTEL:
return

if not otel_span.get_span_context().is_valid:
return

if self._is_sentry_span(otel_span):
if is_sentry_span(otel_span):
return

trace_data = self._get_trace_data(otel_span, parent_context)
Expand Down Expand Up @@ -200,7 +195,7 @@ def on_end(self, otel_span):
if isinstance(sentry_span, Transaction):
sentry_span.name = otel_span.name
sentry_span.set_context(
OPEN_TELEMETRY_CONTEXT, self._get_otel_context(otel_span)
OTEL_SENTRY_CONTEXT, self._get_otel_context(otel_span)
)
self._update_transaction_with_otel_data(sentry_span, otel_span)

Expand All @@ -223,27 +218,6 @@ def on_end(self, otel_span):

self._prune_old_spans()

def _is_sentry_span(self, otel_span):
# type: (OTelSpan) -> bool
"""
Break infinite loop:
HTTP requests to Sentry are caught by OTel and send again to Sentry.
"""
otel_span_url = None
if otel_span.attributes is not None:
otel_span_url = otel_span.attributes.get(SpanAttributes.HTTP_URL)
otel_span_url = cast("Optional[str]", otel_span_url)

dsn_url = None
client = get_client()
if client.dsn:
dsn_url = Dsn(client.dsn).netloc

if otel_span_url and dsn_url and dsn_url in otel_span_url:
return True

return False

def _get_otel_context(self, otel_span):
# type: (OTelSpan) -> dict[str, Any]
"""
Expand Down Expand Up @@ -315,81 +289,19 @@ def _update_span_with_otel_data(self, sentry_span, otel_span):
"""
sentry_span.set_data("otel.kind", otel_span.kind)

op = otel_span.name
description = otel_span.name

if otel_span.attributes is not None:
for key, val in otel_span.attributes.items():
sentry_span.set_data(key, val)

http_method = otel_span.attributes.get(SpanAttributes.HTTP_METHOD)
http_method = cast("Optional[str]", http_method)

db_query = otel_span.attributes.get(SpanAttributes.DB_SYSTEM)

if http_method:
op = "http"

if otel_span.kind == SpanKind.SERVER:
op += ".server"
elif otel_span.kind == SpanKind.CLIENT:
op += ".client"

description = http_method

peer_name = otel_span.attributes.get(SpanAttributes.NET_PEER_NAME, None)
if peer_name:
description += " {}".format(peer_name)

target = otel_span.attributes.get(SpanAttributes.HTTP_TARGET, None)
if target:
description += " {}".format(target)

if not peer_name and not target:
url = otel_span.attributes.get(SpanAttributes.HTTP_URL, None)
url = cast("Optional[str]", url)
if url:
parsed_url = urlparse(url)
url = "{}://{}{}".format(
parsed_url.scheme, parsed_url.netloc, parsed_url.path
)
description += " {}".format(url)

status_code = otel_span.attributes.get(
SpanAttributes.HTTP_STATUS_CODE, None
)
status_code = cast("Optional[int]", status_code)
if status_code:
sentry_span.set_http_status(status_code)

elif db_query:
op = "db"
statement = otel_span.attributes.get(SpanAttributes.DB_STATEMENT, None)
statement = cast("Optional[str]", statement)
if statement:
description = statement

(op, description, status_code) = extract_span_data(otel_span)
sentry_span.op = op
sentry_span.description = description
if status_code:
sentry_span.set_http_status(status_code)

def _update_transaction_with_otel_data(self, sentry_span, otel_span):
# type: (SentrySpan, OTelSpan) -> None
if otel_span.attributes is None:
return

http_method = otel_span.attributes.get(SpanAttributes.HTTP_METHOD)

if http_method:
status_code = otel_span.attributes.get(SpanAttributes.HTTP_STATUS_CODE)
status_code = cast("Optional[int]", status_code)
if status_code:
sentry_span.set_http_status(status_code)

op = "http"

if otel_span.kind == SpanKind.SERVER:
op += ".server"
elif otel_span.kind == SpanKind.CLIENT:
op += ".client"

sentry_span.op = op
(op, _, status_code) = extract_span_data(otel_span)
sentry_span.op = op
if status_code:
sentry_span.set_http_status(status_code)
Loading

0 comments on commit f0c1a84

Please sign in to comment.