Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions src/sentry/grouping/grouptype.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,20 @@
from sentry.models.group import DEFAULT_TYPE_ID
from sentry.types.group import PriorityLevel
from sentry.workflow_engine.endpoints.validators.error_detector import ErrorDetectorValidator
from sentry.workflow_engine.handlers.detector.base import DetectorHandler
from sentry.workflow_engine.models.data_source import DataPacket
from sentry.workflow_engine.types import (
DetectorEvaluationResult,
DetectorGroupKey,
DetectorSettings,
from sentry.workflow_engine.handlers.detector.base import (
DetectorHandler,
GroupedDetectorEvaluationResult,
)
from sentry.workflow_engine.models.data_source import DataPacket
from sentry.workflow_engine.types import DetectorSettings

T = TypeVar("T")


class ErrorDetectorHandler(DetectorHandler):
def evaluate(
self, data_packet: DataPacket[T]
) -> dict[DetectorGroupKey, DetectorEvaluationResult]:
def evaluate_impl(self, data_packet: DataPacket[T]) -> GroupedDetectorEvaluationResult:
# placeholder
return {}
return GroupedDetectorEvaluationResult(result={}, tainted=False)


@dataclass(frozen=True)
Expand Down
9 changes: 8 additions & 1 deletion src/sentry/workflow_engine/handlers/detector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@
"DetectorHandler",
"DetectorOccurrence",
"DetectorStateData",
"GroupedDetectorEvaluationResult",
"StatefulDetectorHandler",
]

from .base import DataPacketEvaluationType, DataPacketType, DetectorHandler, DetectorOccurrence
from .base import (
DataPacketEvaluationType,
DataPacketType,
DetectorHandler,
DetectorOccurrence,
GroupedDetectorEvaluationResult,
)
from .stateful import DetectorStateData, StatefulDetectorHandler
28 changes: 26 additions & 2 deletions src/sentry/workflow_engine/handlers/detector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sentry.issues.grouptype import GroupType
from sentry.issues.issue_occurrence import IssueEvidence, IssueOccurrence
from sentry.types.actor import Actor
from sentry.utils import metrics
from sentry.workflow_engine.models import DataConditionGroup, DataPacket, Detector
from sentry.workflow_engine.processors.data_condition_group import ProcessedDataConditionGroup
from sentry.workflow_engine.types import (
Expand Down Expand Up @@ -77,6 +78,12 @@ def to_issue_occurrence(
)


@dataclass(frozen=True)
class GroupedDetectorEvaluationResult:
result: dict[DetectorGroupKey, DetectorEvaluationResult]
tainted: bool


# TODO - @saponifi3d - Change this class to be a pure ABC and remove the `__init__` method.
# TODO - @saponifi3d - Once the change is made, we should introduce a `BaseDetector` class to evaluate simple cases
class DetectorHandler(abc.ABC, Generic[DataPacketType, DataPacketEvaluationType]):
Expand All @@ -102,10 +109,27 @@ def __init__(self, detector: Detector):
else:
self.condition_group = None

@abc.abstractmethod
def evaluate(
self, data_packet: DataPacket[DataPacketType]
) -> dict[DetectorGroupKey, DetectorEvaluationResult] | None:
) -> dict[DetectorGroupKey, DetectorEvaluationResult]:
tags = {
"detector_type": self.detector.type,
"result": "unknown",
}
try:
value = self.evaluate_impl(data_packet)
tags["result"] = "tainted" if value.tainted else "success"
metrics.incr("workflow_engine_detector.evaluation", tags=tags, sample_rate=1.0)
return value.result
except Exception:
tags["result"] = "failure"
metrics.incr("workflow_engine_detector.evaluation", tags=tags, sample_rate=1.0)
raise

@abc.abstractmethod
def evaluate_impl(
Comment on lines +129 to +130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than changing the external interface on all of the detectors, could we just have like a _internal_evaluate or whatevs? (I'm also hoping to get some time to make a proper abstract class and then an implementing Detector class that is stateless, #TODO)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something I've debated.
PEP8 is a bit fuzzy here, but the conventional interpretation seems to be (and I think stdlib shows this) that underscore prefixed means "not part of the interface" but anything we're exposing to subclasses is part of the interface.
So, I went with that. (ChatGPT agreed when asked in neutral language).
I think it's fuzzy, so I'm happy to come back and rename.

self, data_packet: DataPacket[DataPacketType]
) -> GroupedDetectorEvaluationResult:
"""
This method is used to evaluate the data packet's value against the conditions on the detector.
"""
Expand Down
16 changes: 11 additions & 5 deletions src/sentry/workflow_engine/handlers/detector/stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
DetectorHandler,
DetectorOccurrence,
EventData,
GroupedDetectorEvaluationResult,
)
from sentry.workflow_engine.models import DataPacket, Detector, DetectorState
from sentry.workflow_engine.processors.data_condition_group import (
Expand Down Expand Up @@ -371,14 +372,16 @@ def _build_workflow_engine_evidence_data(
],
}

def evaluate(
def evaluate_impl(
self, data_packet: DataPacket[DataPacketType]
) -> dict[DetectorGroupKey, DetectorEvaluationResult]:
) -> GroupedDetectorEvaluationResult:
dedupe_value = self.extract_dedupe_value(data_packet)
group_data_values = self._extract_value_from_packet(data_packet)
state = self.state_manager.get_state_data(list(group_data_values.keys()))
results: dict[DetectorGroupKey, DetectorEvaluationResult] = {}

tainted = False

for group_key, data_value in group_data_values.items():
state_data: DetectorStateData = state[group_key]
if dedupe_value <= state_data.dedupe_value:
Expand All @@ -391,7 +394,10 @@ def evaluate(
group_data_values[group_key]
)

if condition_results is None or condition_results.logic_result is False:
if condition_results is not None and condition_results.logic_result.is_tainted():
tainted = True

if condition_results is None or condition_results.logic_result.triggered is False:
# Invalid condition result, nothing we can do
# Or if we didn't match any conditions in the evaluation
continue
Expand Down Expand Up @@ -441,7 +447,7 @@ def evaluate(
)

self.state_manager.commit_state_updates()
return results
return GroupedDetectorEvaluationResult(result=results, tainted=tainted)

def _create_resolve_message(
self,
Expand Down Expand Up @@ -624,7 +630,7 @@ def _evaluation_detector_conditions(
},
)

if condition_evaluation.logic_result:
if condition_evaluation.logic_result.triggered:
validated_condition_results: list[DetectorPriorityLevel] = [
condition_result.result
for condition_result in condition_evaluation.condition_results
Expand Down
31 changes: 16 additions & 15 deletions src/sentry/workflow_engine/models/data_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from sentry.db.models import DefaultFieldsModel, region_silo_model, sane_repr
from sentry.utils import metrics, registry
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionResult, DetectorPriorityLevel
from sentry.workflow_engine.types import ConditionError, DataConditionResult, DetectorPriorityLevel
from sentry.workflow_engine.utils import scopedstats

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -145,7 +145,7 @@ def get_snapshot(self) -> dict[str, Any]:
"condition_result": self.condition_result,
}

def get_condition_result(self) -> DataConditionResult:
def get_condition_result(self) -> DataConditionResult | ConditionError:
match self.condition_result:
case float() | bool():
return self.condition_result
Expand All @@ -159,15 +159,15 @@ def get_condition_result(self) -> DataConditionResult:
"Invalid condition result",
extra={"condition_result": self.condition_result, "id": self.id},
)
return ConditionError(msg="Invalid condition result")

return None

def _evaluate_operator(self, condition_type: Condition, value: T) -> DataConditionResult:
def _evaluate_operator(
self, condition_type: Condition, value: T
) -> DataConditionResult | ConditionError:
# If the condition is a base type, handle it directly
op = CONDITION_OPS[condition_type]
result = None
try:
result = op(cast(Any, value), self.comparison)
return op(cast(Any, value), self.comparison)
except TypeError:
logger.exception(
"Invalid comparison for data condition",
Expand All @@ -178,19 +178,20 @@ def _evaluate_operator(self, condition_type: Condition, value: T) -> DataConditi
"condition_id": self.id,
},
)

return result
return ConditionError(msg="Invalid comparison for data condition")

@scopedstats.timer()
def _evaluate_condition(self, condition_type: Condition, value: T) -> DataConditionResult:
def _evaluate_condition(
self, condition_type: Condition, value: T
) -> DataConditionResult | ConditionError:
try:
handler = condition_handler_registry.get(condition_type)
except registry.NoRegistrationExistsError:
logger.exception(
"No registration exists for condition",
extra={"type": self.type, "id": self.id},
)
return None
return ConditionError(msg="No registration exists for condition")

should_be_fast = not is_slow_condition(self)
start_time = time.time()
Expand All @@ -212,7 +213,7 @@ def _evaluate_condition(self, condition_type: Condition, value: T) -> DataCondit
"error": str(e),
},
)
return None
return ConditionError(msg=str(e))
finally:
duration = time.time() - start_time
if should_be_fast and duration >= FAST_CONDITION_TOO_SLOW_THRESHOLD.total_seconds():
Expand All @@ -230,17 +231,17 @@ def _evaluate_condition(self, condition_type: Condition, value: T) -> DataCondit

return result

def evaluate_value(self, value: T) -> DataConditionResult:
def evaluate_value(self, value: T) -> DataConditionResult | ConditionError:
try:
condition_type = Condition(self.type)
except ValueError:
logger.exception(
"Invalid condition type",
extra={"type": self.type, "id": self.id},
)
return None
return ConditionError(msg="Invalid condition type")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just raise these as errors, then handle them in the evaluate methods? just thinking it'd be nice to have a single place to be able to change the control flow, not sure how much we'd need to change in the evaluate off the top of my head to support that as a new flow tho.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an important question, and I'm not settled on it, but I leaned toward return value like this because:

  1. Mypy can tell us where it isn't being handled.
  2. These errors are theoretically already handled/reported, so a new exception type gives the wrong idea. If you don't care about the result, you can ignore it and that's fine. if you do care, you have to decide what to do for errors.


result: DataConditionResult
result: DataConditionResult | ConditionError
if condition_type in CONDITION_OPS:
result = self._evaluate_operator(condition_type, value)
else:
Expand Down
9 changes: 5 additions & 4 deletions src/sentry/workflow_engine/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from sentry.models.owner_base import OwnerModel
from sentry.workflow_engine.models.data_condition import DataCondition, is_slow_condition
from sentry.workflow_engine.models.data_condition_group import DataConditionGroup
from sentry.workflow_engine.types import WorkflowEventData
from sentry.workflow_engine.processors.data_condition_group import TriggerResult
from sentry.workflow_engine.types import ConditionError, WorkflowEventData

from .json_config import JSONConfigBase

Expand Down Expand Up @@ -93,7 +94,7 @@ def get_audit_log_data(self) -> dict[str, Any]:

def evaluate_trigger_conditions(
self, event_data: WorkflowEventData, when_data_conditions: list[DataCondition] | None = None
) -> tuple[bool, list[DataCondition]]:
) -> tuple[TriggerResult, list[DataCondition]]:
"""
Evaluate the conditions for the workflow trigger and return if the evaluation was successful.
If there aren't any workflow trigger conditions, the workflow is considered triggered.
Expand All @@ -104,7 +105,7 @@ def evaluate_trigger_conditions(
)

if self.when_condition_group_id is None:
return True, []
return TriggerResult.TRUE, []

workflow_event_data = replace(event_data, workflow_env=self.environment)
try:
Expand All @@ -116,7 +117,7 @@ def evaluate_trigger_conditions(
"DataConditionGroup does not exist",
extra={"id": self.when_condition_group_id},
)
return False, []
return TriggerResult(False, ConditionError(msg="DataConditionGroup does not exist")), []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something i was grappling about on my pr for this stuff, should we make an enum for the error messages? so we could do like ConditionErrorMessages.DOES_NOT_EXIST kind of thing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this context I'm not really expecting anyone to use the message text, it's more of a comment threaded through at runtime, so in this specific case i prefer it to be loose because we don't gain much by standardization.

group_evaluation, remaining_conditions = process_data_condition_group(
group, workflow_event_data, when_data_conditions
)
Expand Down
Loading
Loading