58 changes: 30 additions & 28 deletions src/sentry/tasks/relay.py
@@ -10,6 +10,7 @@
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import relay_tasks
from sentry.utils import metrics
+from sentry.utils.exceptions import quiet_redis_noise
from sentry.utils.sdk import set_current_event_project

logger = logging.getLogger(__name__)
@@ -366,35 +367,36 @@ def _schedule_invalidate_project_config(
    else:
        check_debounce_keys["organization_id"] = org_id

-    if projectconfig_debounce_cache.invalidation.is_debounced(**check_debounce_keys):
-        # If this task is already in the queue, do not schedule another task.
-        metrics.incr(
-            "relay.projectconfig_cache.skipped",
-            tags={"reason": "debounce", "update_reason": trigger, "task": "invalidation"},
-        )
-        return
+    with quiet_redis_noise():
+        if projectconfig_debounce_cache.invalidation.is_debounced(**check_debounce_keys):
+            # If this task is already in the queue, do not schedule another task.
+            metrics.incr(
+                "relay.projectconfig_cache.skipped",
+                tags={"reason": "debounce", "update_reason": trigger, "task": "invalidation"},
+            )
+            return

-    metrics.incr(
-        "relay.projectconfig_cache.scheduled",
-        tags={
-            "update_reason": trigger,
-            "update_reason_details": trigger_details,
-            "task": "invalidation",
-        },
-    )
+        metrics.incr(
+            "relay.projectconfig_cache.scheduled",
+            tags={
+                "update_reason": trigger,
+                "update_reason_details": trigger_details,
+                "task": "invalidation",
+            },
+        )

-    invalidate_project_config.apply_async(
-        countdown=countdown,
-        kwargs={
-            "project_id": project_id,
-            "organization_id": organization_id,
-            "public_key": public_key,
-            "trigger": trigger,
-            "trigger_details": trigger_details,
-        },
-    )
+        invalidate_project_config.apply_async(
+            countdown=countdown,
+            kwargs={
+                "project_id": project_id,
+                "organization_id": organization_id,
+                "public_key": public_key,
+                "trigger": trigger,
+                "trigger_details": trigger_details,
+            },
+        )

-    # Use the original arguments to this function to set the debounce key.
-    projectconfig_debounce_cache.invalidation.debounce(
-        organization_id=organization_id, project_id=project_id, public_key=public_key
-    )
+        # Use the original arguments to this function to set the debounce key.
+        projectconfig_debounce_cache.invalidation.debounce(
+            organization_id=organization_id, project_id=project_id, public_key=public_key
+        )
src/sentry/utils/exceptions.py
@@ -1,7 +1,7 @@
import logging
from collections.abc import Generator, Mapping
from contextlib import contextmanager
-from typing import Any
+from typing import Any, Literal

import sentry_sdk

@@ -10,11 +10,41 @@

logger = logging.getLogger(__name__)

# sentry_sdk doesn't export these.
_Event = Any
_ExcInfo = Any

+SentryLevel = Literal["error", "warning", "info"]


+@contextmanager
+def set_sentry_exception_levels(
+    exception_levels: dict[type[BaseException], SentryLevel],
+) -> Generator[None]:
+    """
+    Context manager that sets up a Sentry error processor to set
+    specific exception types to configured Sentry levels.
+
+    Args:
+        exception_levels: Map of exception types to their desired Sentry levels.
+            Note that type matching is done by equality, not isinstance.
+    """
+
+    def process_error(event: _Event, exc_info: _ExcInfo) -> _Event | None:
+        exc = exc_info[1]
+        exc_type = type(exc)
+
+        # Check if this exception type should have its level overridden.
+        if exc_type in exception_levels:
+            new_level = exception_levels[exc_type]
+            event["level"] = new_level
+
+        return event
+
+    with sentry_sdk.new_scope() as scope:
+        scope.add_error_processor(process_error)
+        yield


@contextmanager
def exception_grouping_context(
@@ -65,3 +95,24 @@ def timeout_grouping_context(*refinements: str) -> Generator[None]:
{ProcessingDeadlineExceeded: "task.processing_deadline_exceeded"}, *refinements
):
yield


+@contextmanager
+def quiet_redis_noise() -> Generator[None]:
+    """
+    Context manager that sets up a Sentry error processor to quiet Redis noise.
+    Specifically, the current library versions report TimeoutError and MovedError
+    internally even when they're being appropriately handled, and it's incorrect for
+    those to be treated as errors in Sentry.
+    """
+    from redis.exceptions import TimeoutError
+    from rediscluster.exceptions import (  # type: ignore[attr-defined]
+        MovedError,
+        RedisClusterException,
+    )
+
+    with (
+        exception_grouping_context({RedisClusterException: "redis.redis_cluster_exception"}),
+        set_sentry_exception_levels({TimeoutError: "info", MovedError: "info"}),
+    ):
+        yield
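
For reference, a minimal usage sketch of the two context managers added above. The do_redis_work helper is a hypothetical stand-in for any Redis-touching call site, not part of this PR:

    from redis.exceptions import TimeoutError

    from sentry.utils.exceptions import quiet_redis_noise, set_sentry_exception_levels

    # Downgrade handled Redis noise (TimeoutError, MovedError) to "info".
    with quiet_redis_noise():
        do_redis_work()  # hypothetical helper that may surface a TimeoutError

    # Or override levels for specific exception types directly. Matching is by
    # exact type, so a subclass of TimeoutError would NOT be downgraded here.
    with set_sentry_exception_levels({TimeoutError: "warning"}):
        do_redis_work()
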
3 changes: 2 additions & 1 deletion src/sentry/workflow_engine/tasks/actions.py
@@ -10,13 +10,14 @@
from sentry.taskworker import namespaces
from sentry.taskworker.retry import Retry
from sentry.utils import metrics
+from sentry.utils.exceptions import timeout_grouping_context
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.tasks.utils import (
    build_workflow_event_data_from_activity,
    build_workflow_event_data_from_event,
)
from sentry.workflow_engine.types import WorkflowEventData
-from sentry.workflow_engine.utils import log_context, timeout_grouping_context
+from sentry.workflow_engine.utils import log_context

logger = log_context.get_logger(__name__)

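For context, timeout_grouping_context (now imported from sentry.utils.exceptions) wraps a task body so that ProcessingDeadlineExceeded events are fingerprinted as "task.processing_deadline_exceeded" plus any refinement strings. A minimal sketch, where process_action and deliver_action are hypothetical placeholders:

    from sentry.utils.exceptions import timeout_grouping_context

    def process_action() -> None:
        # Deadline overruns raised inside this block share the
        # "task.processing_deadline_exceeded" fingerprint, refined below.
        with timeout_grouping_context("workflow_engine.process_action"):
            deliver_action()  # hypothetical unit of work
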
2 changes: 1 addition & 1 deletion src/sentry/workflow_engine/tasks/workflows.py
@@ -15,6 +15,7 @@
from sentry.taskworker import namespaces
from sentry.taskworker.retry import Retry, retry_task
from sentry.utils import metrics
+from sentry.utils.exceptions import quiet_redis_noise
from sentry.utils.locking import UnableToAcquireLock
from sentry.workflow_engine.models import Detector
from sentry.workflow_engine.tasks.utils import (
@@ -23,7 +24,6 @@
)
from sentry.workflow_engine.types import WorkflowEventData
from sentry.workflow_engine.utils import log_context, scopedstats
-from sentry.workflow_engine.utils.sentry_level_utils import quiet_redis_noise

logger = log_context.get_logger(__name__)

2 changes: 0 additions & 2 deletions src/sentry/workflow_engine/utils/__init__.py
@@ -1,8 +1,6 @@
__all__ = [
    "metrics_incr",
    "MetricTags",
-    "timeout_grouping_context",
]

-from .exception_grouping import timeout_grouping_context
from .metrics import MetricTags, metrics_incr
67 changes: 0 additions & 67 deletions src/sentry/workflow_engine/utils/sentry_level_utils.py

This file was deleted.

@@ -1,10 +1,12 @@
+from collections.abc import Callable
from typing import Any
from unittest.mock import Mock, patch

from sentry.taskworker.state import CurrentTaskState
from sentry.taskworker.workerchild import ProcessingDeadlineExceeded
-from sentry.workflow_engine.utils.exception_grouping import (
+from sentry.utils.exceptions import (
    exception_grouping_context,
+    set_sentry_exception_levels,
    timeout_grouping_context,
)

@@ -34,7 +36,7 @@ def test_with_task_state_and_single_exception_mapping(self) -> None:
)

with patch(
"sentry.workflow_engine.utils.exception_grouping.current_task",
"sentry.utils.exceptions.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
@@ -85,7 +87,7 @@ def test_with_task_state_and_multiple_exception_mappings(self) -> None:
)

with patch(
"sentry.workflow_engine.utils.exception_grouping.current_task",
"sentry.utils.exceptions.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
@@ -156,7 +158,7 @@ def test_with_task_state_and_non_mapped_exception(self) -> None:
)

with patch(
"sentry.workflow_engine.utils.exception_grouping.current_task",
"sentry.utils.exceptions.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
@@ -193,10 +195,8 @@ def capture_processor(processor: Any) -> None:

def test_without_task_state(self) -> None:
"""Test that the context works when no task state is available."""
-        with patch(
-            "sentry.workflow_engine.utils.exception_grouping.current_task", return_value=None
-        ):
-            with patch("sentry.workflow_engine.utils.exception_grouping.logger") as mock_logger:
+        with patch("sentry.utils.exceptions.current_task", return_value=None):
+            with patch("sentry.utils.exceptions.logger") as mock_logger:
exception_mapping: dict[type[BaseException], str] = {
CustomError: "custom.error.fingerprint"
}
@@ -210,9 +210,7 @@ def test_without_task_state(self) -> None:
def test_context_manager_yields_correctly(self) -> None:
"""Test that the context manager yields correctly."""
executed = False
-        with patch(
-            "sentry.workflow_engine.utils.exception_grouping.current_task", return_value=None
-        ):
+        with patch("sentry.utils.exceptions.current_task", return_value=None):
exception_mapping: dict[type[BaseException], str] = {
CustomError: "custom.error.fingerprint"
}
@@ -239,7 +237,7 @@ class DerivedError(BaseError):
pass

with patch(
"sentry.workflow_engine.utils.exception_grouping.current_task",
"sentry.utils.exceptions.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
@@ -289,7 +287,7 @@ def test_empty_exception_mapping(self) -> None:
)

with patch(
"sentry.workflow_engine.utils.exception_grouping.current_task",
"sentry.utils.exceptions.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
@@ -339,7 +337,7 @@ def test_timeout_grouping_context_works_as_before(self) -> None:
)

with patch(
"sentry.workflow_engine.utils.exception_grouping.current_task",
"sentry.utils.exceptions.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
@@ -387,7 +385,7 @@ def test_timeout_grouping_context_ignores_other_exceptions(self) -> None:
)

with patch(
"sentry.workflow_engine.utils.exception_grouping.current_task",
"sentry.utils.exceptions.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
@@ -418,3 +416,34 @@ def capture_processor(processor: Any) -> None:
# Event should be unchanged
assert result == {"original": "data"}
assert "fingerprint" not in result


+class TestSetSentryExceptionLevels:
+    def test_basic_functionality_minimal_mocking(self) -> None:
+        with patch("sentry_sdk.new_scope") as mock_scope:
+            mock_scope_instance = Mock()
+            mock_scope.return_value.__enter__ = Mock(return_value=mock_scope_instance)
+            mock_scope.return_value.__exit__ = Mock(return_value=None)
+
+            # Use a single-element list to capture the processor
+            captured_processors: list[Callable[[Any, Any], Any]] = []
+            mock_scope_instance.add_error_processor = captured_processors.append
+
+            # Use the context manager with exception type as key
+            with set_sentry_exception_levels({ValueError: "warning"}):
+                pass
+
+            # Basic validation that processor was captured
+            assert len(captured_processors) == 1
+            processor = captured_processors[0]
+
+            # Test that it correctly processes a ValueError
+            event = {"level": "error", "other_data": "preserved"}
+            exc = ValueError("test error")
+            exc_info = (ValueError, exc, None)
+
+            result = processor(event, exc_info)
+
+            # Verify the level was changed and other data preserved
+            assert result["level"] == "warning"
+            assert result["other_data"] == "preserved"