From 36b23831fcfe5da985685fb1c8ac3dba17906fc5 Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Tue, 25 Nov 2025 12:00:42 -0800 Subject: [PATCH 1/6] don't fire action if there is a conflict when creating WAGS --- .../workflow_engine/processors/action.py | 57 +++++++++++++------ .../workflow_engine/processors/test_action.py | 34 ++++++++++- 2 files changed, 74 insertions(+), 17 deletions(-) diff --git a/src/sentry/workflow_engine/processors/action.py b/src/sentry/workflow_engine/processors/action.py index 3580d600831948..989b6d35593986 100644 --- a/src/sentry/workflow_engine/processors/action.py +++ b/src/sentry/workflow_engine/processors/action.py @@ -2,7 +2,7 @@ from collections import defaultdict from datetime import datetime, timedelta -from django.db import models +from django.db import connection, models from django.db.models import Case, Value, When from django.utils import timezone @@ -114,24 +114,43 @@ def process_workflow_action_group_statuses( def update_workflow_action_group_statuses( now: datetime, statuses_to_update: set[int], missing_statuses: list[WorkflowActionGroupStatus] -) -> None: - WorkflowActionGroupStatus.objects.filter( +) -> tuple[int, int, list[tuple[int, int]]]: + updated_count = WorkflowActionGroupStatus.objects.filter( id__in=statuses_to_update, date_updated__lt=now ).update(date_updated=now) - all_statuses = WorkflowActionGroupStatus.objects.bulk_create( - missing_statuses, - batch_size=1000, - ignore_conflicts=True, - ) - missing_status_pairs = [ - (status.workflow_id, status.action_id) for status in all_statuses if status.id is None + if not missing_statuses: + return updated_count, 0, [] + + # Use raw SQL: only returns successfully created rows + with connection.cursor() as cursor: + # Build values for batch insert + values_placeholders = [] + values_data = [] + for s in missing_statuses: + values_placeholders.append("(%s, %s, %s, %s, %s)") + values_data.extend([s.workflow_id, s.action_id, s.group_id, now, now]) + + sql = f""" + INSERT INTO workflow_engine_workflowactiongroupstatus + (workflow_id, action_id, group_id, date_added, date_updated) + VALUES {', '.join(values_placeholders)} + ON CONFLICT (workflow_id, action_id, group_id) DO NOTHING + RETURNING workflow_id, action_id + """ + + cursor.execute(sql, values_data) + created_rows = set(cursor.fetchall()) # Only returns newly inserted rows + + # Figure out which ones conflicted (weren't returned) + uncreated_statuses = [ + (s.workflow_id, s.action_id) + for s in missing_statuses + if (s.workflow_id, s.action_id) not in created_rows ] - if missing_status_pairs: - logger.warning( - "Failed to create WorkflowActionGroupStatus objects", - extra={"missing_status_pairs": missing_status_pairs}, - ) + + created_count = len(created_rows) + return updated_count, created_count, uncreated_statuses def get_unique_active_actions( @@ -199,7 +218,13 @@ def filter_recently_fired_workflow_actions( now=now, ) ) - update_workflow_action_group_statuses(now, statuses_to_update, missing_statuses) + _, _, uncreated_statuses = update_workflow_action_group_statuses( + now, statuses_to_update, missing_statuses + ) + + # if statuses were not created for some reason, we should not fire for them + for workflow_id, action_id in uncreated_statuses: + action_to_workflow_ids.pop(action_id, None) actions_queryset = Action.objects.filter(id__in=list(action_to_workflow_ids.keys())) diff --git a/tests/sentry/workflow_engine/processors/test_action.py b/tests/sentry/workflow_engine/processors/test_action.py index acef8e820ed4c6..8866340095083b 100644 --- a/tests/sentry/workflow_engine/processors/test_action.py +++ b/tests/sentry/workflow_engine/processors/test_action.py @@ -107,7 +107,10 @@ def test_multiple_workflows_single_action__first_fire(self) -> None: # dedupes action if both workflows will fire it assert set(triggered_actions) == {self.action} # Dedupes action so we have a single workflow_id -> environment to fire with - assert {getattr(action, "workflow_id") for action in triggered_actions} == {workflow.id} + assert getattr(triggered_actions[0], "workflow_id") in { + self.workflow.id, + workflow.id, + } # either is valid assert WorkflowActionGroupStatus.objects.filter(action=self.action).count() == 2 @@ -222,6 +225,35 @@ def test_update_workflow_action_group_statuses(self) -> None: for status in all_statuses: assert status.date_updated == timezone.now() + def test_returns_uncreated_statuses(self) -> None: + WorkflowActionGroupStatus.objects.create( + workflow=self.workflow, action=self.action, group=self.group + ) + + statuses_to_create = [ + WorkflowActionGroupStatus( + workflow=self.workflow, + action=self.action, + group=self.group, + date_updated=timezone.now(), + ) + ] + _, _, uncreated_statuses = update_workflow_action_group_statuses( + timezone.now(), {}, statuses_to_create + ) + + assert uncreated_statuses == [(self.workflow.id, self.action.id)] + + @patch("sentry.workflow_engine.processors.action.update_workflow_action_group_statuses") + def test_does_not_fire_for_uncreated_statuses(self, mock_update: MagicMock) -> None: + mock_update.return_value = (0, 0, [(self.workflow.id, self.action.id)]) + + triggered_actions = filter_recently_fired_workflow_actions( + set(DataConditionGroup.objects.all()), self.event_data + ) + + assert set(triggered_actions) == set() + class TestIsActionPermitted(BaseWorkflowTest): @patch("sentry.workflow_engine.processors.action._get_integration_features") From 636023388df2b383020ff53b820559154615e064 Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Tue, 25 Nov 2025 13:17:00 -0800 Subject: [PATCH 2/6] bot suggested change --- .../workflow_engine/processors/action.py | 20 ++++++++-------- .../workflow_engine/processors/test_action.py | 23 +++++++++++++++++-- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/sentry/workflow_engine/processors/action.py b/src/sentry/workflow_engine/processors/action.py index 989b6d35593986..6faaab8883f3fc 100644 --- a/src/sentry/workflow_engine/processors/action.py +++ b/src/sentry/workflow_engine/processors/action.py @@ -77,7 +77,7 @@ def process_workflow_action_group_statuses( Prepare the statuses to update and create. """ - action_to_workflow_ids: dict[int, int] = {} # will dedupe because there can be only 1 + updated_action_to_workflows_ids: dict[int, set[int]] = defaultdict(set) workflow_frequencies: dict[int, timedelta] = { workflow.id: workflow.config.get("frequency", 0) * timedelta(minutes=1) for workflow in workflows @@ -91,7 +91,7 @@ def process_workflow_action_group_statuses( status.workflow_id, zero_timedelta ): # we should fire the workflow for this action - action_to_workflow_ids[action_id] = status.workflow_id + updated_action_to_workflows_ids[action_id].add(status.workflow_id) statuses_to_update.add(status.id) missing_statuses: list[WorkflowActionGroupStatus] = [] @@ -107,9 +107,9 @@ def process_workflow_action_group_statuses( workflow_id=workflow_id, action_id=action_id, group=group, date_updated=now ) ) - action_to_workflow_ids[action_id] = workflow_id + updated_action_to_workflows_ids[action_id].add(workflow_id) - return action_to_workflow_ids, statuses_to_update, missing_statuses + return updated_action_to_workflows_ids, statuses_to_update, missing_statuses def update_workflow_action_group_statuses( @@ -209,7 +209,7 @@ def filter_recently_fired_workflow_actions( workflow_ids=workflow_ids, ) now = timezone.now() - action_to_workflow_ids, statuses_to_update, missing_statuses = ( + action_to_workflows_ids, statuses_to_update, missing_statuses = ( process_workflow_action_group_statuses( action_to_workflows_ids=action_to_workflows_ids, action_to_statuses=action_to_statuses, @@ -224,14 +224,16 @@ def filter_recently_fired_workflow_actions( # if statuses were not created for some reason, we should not fire for them for workflow_id, action_id in uncreated_statuses: - action_to_workflow_ids.pop(action_id, None) + action_to_workflows_ids[action_id].remove(workflow_id) + if not action_to_workflows_ids[action_id]: + action_to_workflows_ids.pop(action_id, None) - actions_queryset = Action.objects.filter(id__in=list(action_to_workflow_ids.keys())) + actions_queryset = Action.objects.filter(id__in=list(action_to_workflows_ids.keys())) # annotate actions with workflow_id they are firing for (deduped) workflow_id_cases = [ - When(id=action_id, then=Value(workflow_id)) - for action_id, workflow_id in action_to_workflow_ids.items() + When(id=action_id, then=Value(list(workflow_ids)[0])) + for action_id, workflow_ids in action_to_workflows_ids.items() ] return actions_queryset.annotate( diff --git a/tests/sentry/workflow_engine/processors/test_action.py b/tests/sentry/workflow_engine/processors/test_action.py index 8866340095083b..853337b1233e6a 100644 --- a/tests/sentry/workflow_engine/processors/test_action.py +++ b/tests/sentry/workflow_engine/processors/test_action.py @@ -194,8 +194,8 @@ def test_process_workflow_action_group_statuses(self) -> None: ) assert action_to_workflow_ids == { - self.action.id: self.workflow.id, - action.id: workflow.id, + self.action.id: {self.workflow.id}, + action.id: {workflow.id}, } assert statuses_to_update == {status_2.id} @@ -254,6 +254,25 @@ def test_does_not_fire_for_uncreated_statuses(self, mock_update: MagicMock) -> N assert set(triggered_actions) == set() + @patch("sentry.workflow_engine.processors.action.update_workflow_action_group_statuses") + def test_fires_for_non_conflicting_workflow(self, mock_update: MagicMock) -> None: + workflow = self.create_workflow(organization=self.organization, config={"frequency": 1440}) + action_group = self.create_data_condition_group(logic_type="any-short") + self.create_data_condition_group_action( + condition_group=action_group, + action=self.action, + ) # shared action + self.create_workflow_data_condition_group(workflow, action_group) + + mock_update.return_value = (0, 0, [(self.workflow.id, self.action.id)]) + + triggered_actions = filter_recently_fired_workflow_actions( + set(DataConditionGroup.objects.all()), self.event_data + ) + + assert set(triggered_actions) == {self.action} + assert getattr(triggered_actions[0], "workflow_id") == workflow.id + class TestIsActionPermitted(BaseWorkflowTest): @patch("sentry.workflow_engine.processors.action._get_integration_features") From 8a9757cfd84b2198e0291f7aeda13a2d7a92192e Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Tue, 25 Nov 2025 13:24:31 -0800 Subject: [PATCH 3/6] fix typing --- src/sentry/workflow_engine/processors/action.py | 2 +- tests/sentry/workflow_engine/processors/test_action.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/workflow_engine/processors/action.py b/src/sentry/workflow_engine/processors/action.py index 6faaab8883f3fc..d241b82ab6fb73 100644 --- a/src/sentry/workflow_engine/processors/action.py +++ b/src/sentry/workflow_engine/processors/action.py @@ -71,7 +71,7 @@ def process_workflow_action_group_statuses( workflows: BaseQuerySet[Workflow], group: Group, now: datetime, -) -> tuple[dict[int, int], set[int], list[WorkflowActionGroupStatus]]: +) -> tuple[dict[int, set[int]], set[int], list[WorkflowActionGroupStatus]]: """ Determine which workflow actions should be fired based on their statuses. Prepare the statuses to update and create. diff --git a/tests/sentry/workflow_engine/processors/test_action.py b/tests/sentry/workflow_engine/processors/test_action.py index 853337b1233e6a..bb4f35aaff3f79 100644 --- a/tests/sentry/workflow_engine/processors/test_action.py +++ b/tests/sentry/workflow_engine/processors/test_action.py @@ -239,7 +239,7 @@ def test_returns_uncreated_statuses(self) -> None: ) ] _, _, uncreated_statuses = update_workflow_action_group_statuses( - timezone.now(), {}, statuses_to_create + timezone.now(), set(), statuses_to_create ) assert uncreated_statuses == [(self.workflow.id, self.action.id)] From cefcea2abdeac33e2c81b12fb995eca2da7deb67 Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Tue, 25 Nov 2025 13:27:21 -0800 Subject: [PATCH 4/6] remove unnecessary test change --- tests/sentry/workflow_engine/processors/test_action.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/sentry/workflow_engine/processors/test_action.py b/tests/sentry/workflow_engine/processors/test_action.py index bb4f35aaff3f79..29891311d9e6e0 100644 --- a/tests/sentry/workflow_engine/processors/test_action.py +++ b/tests/sentry/workflow_engine/processors/test_action.py @@ -107,10 +107,7 @@ def test_multiple_workflows_single_action__first_fire(self) -> None: # dedupes action if both workflows will fire it assert set(triggered_actions) == {self.action} # Dedupes action so we have a single workflow_id -> environment to fire with - assert getattr(triggered_actions[0], "workflow_id") in { - self.workflow.id, - workflow.id, - } # either is valid + assert getattr(triggered_actions[0], "workflow_id") == self.workflow.id assert WorkflowActionGroupStatus.objects.filter(action=self.action).count() == 2 From 944460db79269f9fe5d074efa3ca786c2b1bee69 Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Tue, 25 Nov 2025 14:34:46 -0800 Subject: [PATCH 5/6] fixes from review --- src/sentry/workflow_engine/processors/action.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/sentry/workflow_engine/processors/action.py b/src/sentry/workflow_engine/processors/action.py index d241b82ab6fb73..53c151ad397e54 100644 --- a/src/sentry/workflow_engine/processors/action.py +++ b/src/sentry/workflow_engine/processors/action.py @@ -36,6 +36,9 @@ logger = logging.getLogger(__name__) EnqueuedAction = tuple[DataConditionGroup, list[DataCondition]] +UpdatedStatuses = int +CreatedStatuses = int +ConflictedStatuses = list[tuple[int, int]] # (workflow_id, action_id) def get_workflow_action_group_statuses( @@ -114,7 +117,7 @@ def process_workflow_action_group_statuses( def update_workflow_action_group_statuses( now: datetime, statuses_to_update: set[int], missing_statuses: list[WorkflowActionGroupStatus] -) -> tuple[int, int, list[tuple[int, int]]]: +) -> tuple[UpdatedStatuses, CreatedStatuses, ConflictedStatuses]: updated_count = WorkflowActionGroupStatus.objects.filter( id__in=statuses_to_update, date_updated__lt=now ).update(date_updated=now) @@ -123,6 +126,7 @@ def update_workflow_action_group_statuses( return updated_count, 0, [] # Use raw SQL: only returns successfully created rows + # XXX: the query does not currently include batch size limit like bulk_create does with connection.cursor() as cursor: # Build values for batch insert values_placeholders = [] @@ -143,14 +147,14 @@ def update_workflow_action_group_statuses( created_rows = set(cursor.fetchall()) # Only returns newly inserted rows # Figure out which ones conflicted (weren't returned) - uncreated_statuses = [ + conflicted_statuses = [ (s.workflow_id, s.action_id) for s in missing_statuses if (s.workflow_id, s.action_id) not in created_rows ] created_count = len(created_rows) - return updated_count, created_count, uncreated_statuses + return updated_count, created_count, conflicted_statuses def get_unique_active_actions( @@ -226,13 +230,15 @@ def filter_recently_fired_workflow_actions( for workflow_id, action_id in uncreated_statuses: action_to_workflows_ids[action_id].remove(workflow_id) if not action_to_workflows_ids[action_id]: - action_to_workflows_ids.pop(action_id, None) + action_to_workflows_ids.pop(action_id) actions_queryset = Action.objects.filter(id__in=list(action_to_workflows_ids.keys())) # annotate actions with workflow_id they are firing for (deduped) workflow_id_cases = [ - When(id=action_id, then=Value(list(workflow_ids)[0])) + When( + id=action_id, then=Value(min(list(workflow_ids))) + ) # select 1 workflow to fire for, this is arbitrary but deterministic for action_id, workflow_ids in action_to_workflows_ids.items() ] From f99c4187f5440d79d711e81159762503ab9d7d39 Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Tue, 25 Nov 2025 14:51:17 -0800 Subject: [PATCH 6/6] naming var --- src/sentry/workflow_engine/processors/action.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/workflow_engine/processors/action.py b/src/sentry/workflow_engine/processors/action.py index 53c151ad397e54..c444f0df4e07c7 100644 --- a/src/sentry/workflow_engine/processors/action.py +++ b/src/sentry/workflow_engine/processors/action.py @@ -222,12 +222,12 @@ def filter_recently_fired_workflow_actions( now=now, ) ) - _, _, uncreated_statuses = update_workflow_action_group_statuses( + _, _, conflicted_statuses = update_workflow_action_group_statuses( now, statuses_to_update, missing_statuses ) # if statuses were not created for some reason, we should not fire for them - for workflow_id, action_id in uncreated_statuses: + for workflow_id, action_id in conflicted_statuses: action_to_workflows_ids[action_id].remove(workflow_id) if not action_to_workflows_ids[action_id]: action_to_workflows_ids.pop(action_id)