6 changes: 1 addition & 5 deletions src/sentry/api/serializers/models/rule.py
@@ -9,7 +9,6 @@
 from django.db.models import Max, Prefetch, Q, prefetch_related_objects
 from rest_framework import serializers

-from sentry import features
 from sentry.api.serializers import Serializer, register
 from sentry.constants import ObjectStatus
 from sentry.db.models.manager.base_query_set import BaseQuerySet
@@ -210,10 +209,7 @@ def get_attrs(self, item_list, user, **kwargs):
         }

         # Update lastTriggered with WorkflowFireHistory if available
-        if item_list and features.has(
-            "organizations:workflow-engine-single-process-workflows",
-            item_list[0].project.organization,
-        ):
+        if item_list:
             rule_ids = [rule.id for rule in item_list]
             workflow_rule_lookup = dict(
                 AlertRuleWorkflow.objects.filter(rule_id__in=rule_ids).values_list(
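With the flag check gone, the `lastTriggered` backfill in `get_attrs` now runs for every organization whenever `item_list` is non-empty. The `values_list` call is truncated in this diff, so the sketch below is a hedged reconstruction of the lookup-and-aggregate pattern it implies, not the file's actual code; the field ordering and the `Max("date_added")` aggregation are assumptions.

```python
# Hedged sketch of the backfill pattern implied above, not the actual file
# contents: the values_list() fields are cut off in this diff, so the
# ("workflow_id", "rule_id") ordering and the Max() aggregation are assumptions.
from datetime import datetime

from django.db.models import Max

from sentry.workflow_engine.models import AlertRuleWorkflow, WorkflowFireHistory


def workflow_last_triggered(item_list) -> dict[int, datetime]:
    """Return {rule_id: most recent workflow fire} for the given rules."""
    rule_ids = [rule.id for rule in item_list]
    # One (workflow_id -> rule_id) pair per rule that was migrated to a workflow.
    workflow_rule_lookup = dict(
        AlertRuleWorkflow.objects.filter(rule_id__in=rule_ids).values_list(
            "workflow_id", "rule_id"
        )
    )
    # Latest WorkflowFireHistory row per workflow, grouped in the database.
    last_fired = (
        WorkflowFireHistory.objects.filter(workflow_id__in=workflow_rule_lookup.keys())
        .values("workflow_id")
        .annotate(last_triggered=Max("date_added"))
    )
    return {
        workflow_rule_lookup[row["workflow_id"]]: row["last_triggered"]
        for row in last_fired
    }
```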
181 changes: 87 additions & 94 deletions src/sentry/rules/history/backends/postgres.py
@@ -9,7 +9,6 @@
 from django.db.models import Count, Max, OuterRef, Subquery
 from django.db.models.functions import TruncHour

-from sentry import features
 from sentry.api.paginator import GenericOffsetPaginator, OffsetPaginator
 from sentry.models.group import Group
 from sentry.models.rulefirehistory import RuleFireHistory
@@ -69,60 +68,57 @@ def fetch_rule_groups_paginated(
         cursor: Cursor | None = None,
         per_page: int = 25,
     ) -> CursorResult[RuleGroupHistory]:
-        if features.has(
-            "organizations:workflow-engine-single-process-workflows", rule.project.organization
-        ):
-            try:
-                alert_rule_workflow = AlertRuleWorkflow.objects.get(rule_id=rule.id)
-                workflow = alert_rule_workflow.workflow
+        try:
+            alert_rule_workflow = AlertRuleWorkflow.objects.get(rule_id=rule.id)
+            workflow = alert_rule_workflow.workflow

-                # Performs the raw SQL query with pagination
-                def data_fn(offset: int, limit: int) -> list[_Result]:
-                    query = """
-                        WITH combined_data AS (
-                            SELECT group_id, date_added, event_id
-                            FROM sentry_rulefirehistory
-                            WHERE rule_id = %s AND date_added >= %s AND date_added < %s
-                            UNION ALL
-                            SELECT group_id, date_added, event_id
-                            FROM workflow_engine_workflowfirehistory
-                            WHERE workflow_id = %s
-                            AND date_added >= %s AND date_added < %s
-                        )
-                        SELECT
-                            group_id as group,
-                            COUNT(*) as count,
-                            MAX(date_added) as last_triggered,
-                            (ARRAY_AGG(event_id ORDER BY date_added DESC))[1] as event_id
-                        FROM combined_data
-                        GROUP BY group_id
-                        ORDER BY count DESC, last_triggered DESC
-                        LIMIT %s OFFSET %s
-                    """
+            # Performs the raw SQL query with pagination
+            def data_fn(offset: int, limit: int) -> list[_Result]:
+                query = """
+                    WITH combined_data AS (
+                        SELECT group_id, date_added, event_id
+                        FROM sentry_rulefirehistory
+                        WHERE rule_id = %s AND date_added >= %s AND date_added < %s
+                        UNION ALL
+                        SELECT group_id, date_added, event_id
+                        FROM workflow_engine_workflowfirehistory
+                        WHERE workflow_id = %s
+                        AND date_added >= %s AND date_added < %s
+                    )
+                    SELECT
+                        group_id as group,
+                        COUNT(*) as count,
+                        MAX(date_added) as last_triggered,
+                        (ARRAY_AGG(event_id ORDER BY date_added DESC))[1] as event_id
+                    FROM combined_data
+                    GROUP BY group_id
+                    ORDER BY count DESC, last_triggered DESC
+                    LIMIT %s OFFSET %s
+                """

-                    with connection.cursor() as cursor:
-                        cursor.execute(
-                            query, [rule.id, start, end, workflow.id, start, end, limit, offset]
-                        )
-                        return [
-                            _Result(
-                                group=row[0],
-                                count=row[1],
-                                last_triggered=row[2],
-                                event_id=row[3],
-                            )
-                            for row in cursor.fetchall()
-                        ]
+                with connection.cursor() as cursor:
+                    cursor.execute(
+                        query, [rule.id, start, end, workflow.id, start, end, limit, offset]
+                    )
+                    return [
+                        _Result(
+                            group=row[0],
+                            count=row[1],
+                            last_triggered=row[2],
+                            event_id=row[3],
+                        )
+                        for row in cursor.fetchall()
+                    ]

-                result = GenericOffsetPaginator(data_fn=data_fn).get_result(per_page, cursor)
-                result.results = convert_results(result.results)
+            result = GenericOffsetPaginator(data_fn=data_fn).get_result(per_page, cursor)
+            result.results = convert_results(result.results)

-                return result
+            return result

-            except AlertRuleWorkflow.DoesNotExist:
-                # If no workflow is associated with this rule, just use the original behavior
-                logger.exception("No workflow associated with rule", extra={"rule_id": rule.id})
-                pass
+        except AlertRuleWorkflow.DoesNotExist:
+            # If no workflow is associated with this rule, just use the original behavior
+            logger.exception("No workflow associated with rule", extra={"rule_id": rule.id})
+            pass

         rule_filtered_history = RuleFireHistory.objects.filter(
             rule=rule,
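The rewrite above unconditionally merges `sentry_rulefirehistory` and `workflow_engine_workflowfirehistory` in a single `UNION ALL` CTE and defers paging to `GenericOffsetPaginator`, whose contract with `data_fn(offset, limit)` is simply to return one page of rows for that window. A minimal illustration of the offset arithmetic follows; the limit-plus-one look-ahead is an assumed convention for detecting a next page, not taken from the Sentry source.

```python
# Illustration of the data_fn paging contract used above. The limit+1
# look-ahead for detecting a next page is an assumption; the real
# GenericOffsetPaginator may differ in detail.
def fetch_page(data_fn, per_page: int, page_number: int):
    offset = page_number * per_page  # page 0 -> OFFSET 0, page 1 -> OFFSET 25, ...
    rows = data_fn(offset=offset, limit=per_page + 1)
    has_next = len(rows) > per_page  # an extra row means more results exist
    return rows[:per_page], has_next
```

OFFSET pagination like this only behaves predictably when the `ORDER BY` is deterministic; the query's `ORDER BY count DESC, last_triggered DESC` keeps pages largely stable, though rows tied on both keys could still reorder between requests.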
@@ -154,50 +150,47 @@ def fetch_rule_hourly_stats(

         existing_data: dict[datetime, TimeSeriesValue] = {}

-        if features.has(
-            "organizations:workflow-engine-single-process-workflows", rule.project.organization
-        ):
-            try:
-                alert_rule_workflow = AlertRuleWorkflow.objects.get(rule_id=rule.id)
-                workflow = alert_rule_workflow.workflow
+        try:
+            alert_rule_workflow = AlertRuleWorkflow.objects.get(rule_id=rule.id)
+            workflow = alert_rule_workflow.workflow

-                # Use raw SQL to combine data from both tables
-                with connection.cursor() as db_cursor:
-                    db_cursor.execute(
-                        """
-                        SELECT
-                            DATE_TRUNC('hour', date_added) as bucket,
-                            COUNT(*) as count
-                        FROM (
-                            SELECT date_added
-                            FROM sentry_rulefirehistory
-                            WHERE rule_id = %s
-                            AND date_added >= %s
-                            AND date_added < %s
+            # Use raw SQL to combine data from both tables
+            with connection.cursor() as db_cursor:
+                db_cursor.execute(
+                    """
+                    SELECT
+                        DATE_TRUNC('hour', date_added) as bucket,
+                        COUNT(*) as count
+                    FROM (
+                        SELECT date_added
+                        FROM sentry_rulefirehistory
+                        WHERE rule_id = %s
+                        AND date_added >= %s
+                        AND date_added < %s

-                            UNION ALL
+                        UNION ALL

-                            SELECT date_added
-                            FROM workflow_engine_workflowfirehistory
-                            WHERE workflow_id = %s
-                            AND date_added >= %s
-                            AND date_added < %s
-                        ) combined_data
-                        GROUP BY DATE_TRUNC('hour', date_added)
-                        ORDER BY bucket
-                        """,
-                        [rule.id, start, end, workflow.id, start, end],
-                    )
+                        SELECT date_added
+                        FROM workflow_engine_workflowfirehistory
+                        WHERE workflow_id = %s
+                        AND date_added >= %s
+                        AND date_added < %s
+                    ) combined_data
+                    GROUP BY DATE_TRUNC('hour', date_added)
+                    ORDER BY bucket
+                    """,
+                    [rule.id, start, end, workflow.id, start, end],
+                )

-                    results = db_cursor.fetchall()
+                results = db_cursor.fetchall()

-                    # Convert raw SQL results to the expected format
-                    existing_data = {row[0]: TimeSeriesValue(row[0], row[1]) for row in results}
+                # Convert raw SQL results to the expected format
+                existing_data = {row[0]: TimeSeriesValue(row[0], row[1]) for row in results}

-            except AlertRuleWorkflow.DoesNotExist:
-                # If no workflow is associated with this rule, just use the original behavior
-                logger.exception("No workflow associated with rule", extra={"rule_id": rule.id})
-                pass
+        except AlertRuleWorkflow.DoesNotExist:
+            # If no workflow is associated with this rule, just use the original behavior
+            logger.exception("No workflow associated with rule", extra={"rule_id": rule.id})
+            pass

         if not existing_data:
             qs = (
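`fetch_rule_hourly_stats` now always buckets the merged history by hour with `DATE_TRUNC`, keying each `(bucket, count)` row into `existing_data`. Hours with no fires return no rows at all, so the series has to be padded downstream; the test file below asserts 24 buckets per day, which implies gap filling roughly like the following sketch. The real padding code is outside this diff, so its shape here is an assumption.

```python
# Sketch of hour-bucket gap filling implied by the 24-bucket test below; the
# backend's actual padding code is not shown in this diff, so this helper is
# an assumption.
from datetime import datetime, timedelta

# Assumed location: TimeSeriesValue alongside RuleGroupHistory in this module.
from sentry.rules.history.base import TimeSeriesValue


def fill_hourly(existing_data: dict, start: datetime, end: datetime) -> list:
    bucket = start.replace(minute=0, second=0, microsecond=0)
    series = []
    while bucket < end:
        # Missing hours get an explicit zero-count value.
        series.append(existing_data.get(bucket, TimeSeriesValue(bucket, 0)))
        bucket += timedelta(hours=1)
    return series
```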
3 changes: 0 additions & 3 deletions tests/sentry/api/serializers/test_rule.py
@@ -14,7 +14,6 @@
 from sentry.rules.filters.tagged_event import TaggedEventFilter
 from sentry.testutils.cases import TestCase
 from sentry.testutils.helpers.datetime import before_now, freeze_time
-from sentry.testutils.helpers.features import with_feature
 from sentry.users.services.user.serial import serialize_rpc_user
 from sentry.workflow_engine.migration_helpers.issue_alert_migration import IssueAlertMigrator
 from sentry.workflow_engine.models import WorkflowDataConditionGroup, WorkflowFireHistory
@@ -36,7 +35,6 @@ def test_last_triggered_rule_only(self) -> None:
         result = serialize(rule, self.user, RuleSerializer(expand=["lastTriggered"]))
         assert result["lastTriggered"] == timezone.now()

-    @with_feature("organizations:workflow-engine-single-process-workflows")
     def test_last_triggered_with_workflow_only(self) -> None:
         rule = self.create_project_rule()

@@ -50,7 +48,6 @@ def test_last_triggered_with_workflow_only(self) -> None:
         result = serialize(rule, self.user, RuleSerializer(expand=["lastTriggered"]))
         assert result["lastTriggered"] == timezone.now()

-    @with_feature("organizations:workflow-engine-single-process-workflows")
     def test_last_triggered_with_workflow(self) -> None:
         rule = self.create_project_rule()
3 changes: 0 additions & 3 deletions tests/sentry/rules/history/backends/test_postgres.py
@@ -6,7 +6,6 @@
 from sentry.rules.history.base import RuleGroupHistory
 from sentry.testutils.cases import TestCase
 from sentry.testutils.helpers.datetime import before_now, freeze_time
-from sentry.testutils.helpers.features import with_feature
 from sentry.testutils.skips import requires_snuba
 from sentry.workflow_engine.models import AlertRuleWorkflow, WorkflowFireHistory

@@ -196,7 +195,6 @@ def test_event_id(self) -> None:
             ],
         )

-    @with_feature("organizations:workflow-engine-single-process-workflows")
     def test_combined_rule_and_workflow_history(self) -> None:
         """Test combining RuleFireHistory and WorkflowFireHistory when feature flag is enabled"""
         rule = self.create_project_rule(project=self.event.project)
@@ -335,7 +333,6 @@ def test(self) -> None:
         assert len(results) == 24
         assert [r.count for r in results[-5:]] == [0, 0, 1, 1, 0]

-    @with_feature("organizations:workflow-engine-single-process-workflows")
     def test_combined_rule_and_workflow_history(self) -> None:
         """Test combining RuleFireHistory and WorkflowFireHistory for hourly stats when feature flag is enabled"""
         rule = self.create_project_rule(project=self.event.project)