Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 74 additions & 22 deletions sentry_sdk/integrations/huey.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.traces import SegmentSource, SpanStatus, StreamedSpan
from sentry_sdk.tracing import (
BAGGAGE_HEADER_NAME,
SENTRY_TRACE_HEADER_NAME,
TransactionSource,
)
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
SENSITIVE_DATA_SUBSTITUTE,
_register_control_flow_exception,
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
Expand Down Expand Up @@ -53,14 +56,17 @@ class HueyIntegration(Integration):
def setup_once() -> None:
patch_enqueue()
patch_execute()
_register_control_flow_exception(
[CancelExecution, RetryTask, TaskLockedException]
)


def patch_enqueue() -> None:
old_enqueue = Huey.enqueue

@ensure_integration_enabled(HueyIntegration, old_enqueue)
def _sentry_enqueue(
self: "Huey", item: "Union[Task, HueyGroup, HueyChord]"
self: "Huey", item: "Any"
) -> "Optional[Union[Result, ResultGroup]]":
if HueyChord is not None and isinstance(item, HueyChord):
span_name = "Huey Chord"
Expand All @@ -69,16 +75,31 @@ def _sentry_enqueue(
else:
span_name = item.name

with sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_HUEY,
name=span_name,
origin=HueyIntegration.origin,
):
if (
not isinstance(item, PeriodicTask)
and not (HueyGroup is not None and isinstance(item, HueyGroup))
and not (HueyChord is not None and isinstance(item, HueyChord))
):
is_span_streaming_enabled = has_span_streaming_enabled(
sentry_sdk.get_client().options
)

span_ctx = None
if is_span_streaming_enabled:
span_ctx = sentry_sdk.traces.start_span(
name=span_name,
attributes={
"sentry.op": OP.QUEUE_SUBMIT_HUEY,
"sentry.origin": HueyIntegration.origin,
},
)
else:
span_ctx = sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_HUEY,
name=span_name,
origin=HueyIntegration.origin,
)

no_headers_types = (PeriodicTask,) + tuple(
t for t in [HueyGroup, HueyChord] if t is not None
)
with span_ctx:
if not isinstance(item, no_headers_types):
# Attach trace propagation data to task kwargs. We do
# not do this for periodic tasks, as these don't
# really have an originating transaction.
Expand Down Expand Up @@ -124,12 +145,22 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]":

def _capture_exception(exc_info: "ExcInfo") -> None:
scope = sentry_sdk.get_current_scope()
is_span_streaming_enabled = has_span_streaming_enabled(
sentry_sdk.get_client().options
)

if exc_info[0] in HUEY_CONTROL_FLOW_EXCEPTIONS:
scope.transaction.set_status(SPANSTATUS.ABORTED)
if not is_span_streaming_enabled:
scope.transaction.set_status(SPANSTATUS.ABORTED)
elif type(scope._span) is StreamedSpan:
scope._span._segment.status = SpanStatus.OK
return

Comment thread
sentry[bot] marked this conversation as resolved.
scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
if not is_span_streaming_enabled:
scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
elif type(scope._span) is StreamedSpan:
scope._span._segment.status = SpanStatus.ERROR
Comment thread
cursor[bot] marked this conversation as resolved.

event, hint = event_from_exception(
exc_info,
client_options=sentry_sdk.get_client().options,
Expand Down Expand Up @@ -167,21 +198,42 @@ def _sentry_execute(
scope.add_event_processor(_make_event_processor(task))

sentry_headers = task.kwargs.pop("sentry_headers", None)

transaction = continue_trace(
sentry_headers or {},
name=task.name,
op=OP.QUEUE_TASK_HUEY,
source=TransactionSource.TASK,
origin=HueyIntegration.origin,
is_span_streaming_enabled = has_span_streaming_enabled(
sentry_sdk.get_client().options
)
transaction.set_status(SPANSTATUS.OK)

if is_span_streaming_enabled:
headers = sentry_headers or {}
sentry_sdk.traces.continue_trace(headers)
span_ctx = sentry_sdk.traces.start_span(
name=task.name,
attributes={
"sentry.op": OP.QUEUE_TASK_HUEY,
"sentry.origin": HueyIntegration.origin,
"sentry.span.source": SegmentSource.TASK,
"messaging.message.id": task.id,
"messaging.message.system": "huey",
"messaging.message.retry.count": (task.default_retries or 0)
- task.retries,
},
parent_span=None,
)
Comment thread
ericapisani marked this conversation as resolved.
else:
transaction = continue_trace(
sentry_headers or {},
name=task.name,
op=OP.QUEUE_TASK_HUEY,
source=TransactionSource.TASK,
origin=HueyIntegration.origin,
)
transaction.set_status(SPANSTATUS.OK)
span_ctx = sentry_sdk.start_transaction(transaction)

if not getattr(task, "_sentry_is_patched", False):
task.execute = _wrap_task_execute(task.execute)
task._sentry_is_patched = True

with sentry_sdk.start_transaction(transaction):
with span_ctx:
return old_execute(self, task, timestamp)

Huey._execute = _sentry_execute
11 changes: 8 additions & 3 deletions sentry_sdk/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@

# These exceptions won't set the span status to error if they occur. Use
# register_control_flow_exception to add to this list
_control_flow_exception_classes: "list[type]" = []
_control_flow_exception_classes: "set[type]" = set()


def is_internal_task() -> bool:
Expand Down Expand Up @@ -1982,8 +1982,13 @@ def get_current_thread_meta(
return None, None


def _register_control_flow_exception(exc_type: type) -> None:
_control_flow_exception_classes.append(exc_type)
def _register_control_flow_exception(
exc_type: "Union[type, list[type], tuple[type], set[type]]",
) -> None:
if isinstance(exc_type, (list, tuple, set)):
_control_flow_exception_classes.update(exc_type)
else:
_control_flow_exception_classes.add(exc_type)


def should_be_treated_as_error(ty: "Any", value: "Any") -> bool:
Expand Down
Loading
Loading