fix(processing): Separate second query of percentage comparison #73099

Open · wants to merge 24 commits into base: master
Changes shown from 6 of 24 commits.
33 changes: 15 additions & 18 deletions src/sentry/rules/conditions/event_frequency.py
@@ -256,15 +256,16 @@ def disable_consistent_snuba_mode(
         return option_override_cm

     def get_comparison_start_end(
-        self, interval: timedelta, duration: timedelta
+        self, end: datetime, duration: timedelta, interval: timedelta | None = None
     ) -> tuple[datetime, datetime]:
         """
         Calculate the start and end times for the query. `interval` is only used for
         EventFrequencyPercentCondition, as the '5 minutes' in "The issue affects more
         than 100 percent of sessions in 5 minutes"; otherwise `end` stays at the
         passed-in current time. `duration` is the time frame in which the condition is
         measuring counts, e.g. the '10 minutes' in "The issue is seen more than 100
         times in 10 minutes".
         """
-        end = timezone.now() - interval
+        if interval:
+            end = end - interval
         start = end - duration
         return (start, end)
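For illustration, a minimal standalone sketch of the new window math (hypothetical values; this mirrors the signature above without the class context):

```python
from datetime import datetime, timedelta, timezone

def get_comparison_start_end(end, duration, interval=None):
    # Shift `end` back by `interval` only for the percent-comparison window,
    # then take a `duration`-long window ending there.
    if interval:
        end = end - interval
    return (end - duration, end)

now = datetime(2024, 6, 25, 12, 0, tzinfo=timezone.utc)
# Count path: the last 10 minutes ending at `now` -> (11:50, 12:00).
print(get_comparison_start_end(now, timedelta(minutes=10)))
# Percent path: the same 10-minute window shifted back 1 hour -> (10:50, 11:00).
print(get_comparison_start_end(now, timedelta(minutes=10), timedelta(hours=1)))
```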

@@ -276,14 +277,17 @@ def get_rate(
         environment_id: int,
         comparison_type: str,
     ) -> int:
-        start, end = self.get_comparison_start_end(timedelta(), duration)
+        current_time = timezone.now()
+        start, end = self.get_comparison_start_end(current_time, duration)
         with self.disable_consistent_snuba_mode(duration):
             result = self.query(event, start, end, environment_id=environment_id)
         if comparison_type == ComparisonType.PERCENT:
             # TODO: Figure out if there's a way we can do this less frequently. All queries are
             # automatically cached for 10s. We could consider trying to cache this and the main
             # query for 20s to reduce the load.
-            start, end = self.get_comparison_start_end(comparison_interval, duration)
+            start, end = self.get_comparison_start_end(
+                current_time, duration, comparison_interval
+            )
             comparison_result = self.query(event, start, end, environment_id=environment_id)
             result = percent_increase(result, comparison_result)

Review comment (Member) on the get_comparison_start_end call above:
RE: comments above
1 - let's make these kwargs
2 - let's pre-calculate the end here
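For context on the comparison math, a small worked example. The `percent_increase` helper is imported from this module; its exact formula is assumed here (relative increase floored at zero), so treat the stand-in below as a sketch rather than the canonical implementation:

```python
# Hypothetical stand-in for percent_increase, with assumed semantics.
def percent_increase(result: int, comparison_result: int) -> int:
    if comparison_result <= 0:
        return 0  # assumed convention when there is no comparison data
    return int(max(0, (result - comparison_result) / comparison_result * 100))

assert percent_increase(150, 100) == 50  # 150 events now vs. 100 before: +50%
assert percent_increase(80, 100) == 0    # decreases floor at 0
assert percent_increase(25, 0) == 0      # empty comparison window
```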

@@ -292,31 +296,24 @@ def get_rate_bulk(
     def get_rate_bulk(
         self,
         duration: timedelta,
-        comparison_interval: timedelta,
         group_ids: set[int],
         environment_id: int,
         comparison_type: str,
+        current_time: datetime,
+        comparison_interval: timedelta | None = None,
     ) -> dict[int, int]:
-        start, end = self.get_comparison_start_end(timedelta(), duration)
+        if comparison_type == ComparisonType.COUNT:
+            start, end = self.get_comparison_start_end(current_time, duration)
+        elif comparison_type == ComparisonType.PERCENT:
+            start, end = self.get_comparison_start_end(current_time, duration, comparison_interval)

         with self.disable_consistent_snuba_mode(duration):
             result = self.batch_query(
                 group_ids=group_ids,
                 start=start,
                 end=end,
                 environment_id=environment_id,
             )
-        if comparison_type == ComparisonType.PERCENT:
-            start, comparison_end = self.get_comparison_start_end(comparison_interval, duration)
-            comparison_result = self.batch_query(
-                group_ids=group_ids,
-                start=start,
-                end=comparison_end,
-                environment_id=environment_id,
-            )
-            result = {
-                group_id: percent_increase(result[group_id], comparison_result[group_id])
-                for group_id in group_ids
-            }
         return result

     def get_snuba_query_result(
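A hedged sketch of a call site under the new keyword-argument style suggested in review (the instance and values are illustrative, not taken from this PR):

```python
from datetime import datetime, timedelta, timezone

# `condition_inst` stands in for an instantiated frequency condition class.
result = condition_inst.get_rate_bulk(
    duration=timedelta(minutes=10),
    group_ids={1, 2, 3},
    environment_id=1,
    comparison_type=ComparisonType.PERCENT,
    current_time=datetime.now(tz=timezone.utc),
    comparison_interval=timedelta(hours=1),  # only used on the percent path
)
# result maps group_id -> event count for the window selected above; the
# percent_increase math now happens later, in delayed_processing.
```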
165 changes: 110 additions & 55 deletions src/sentry/rules/processing/delayed_processing.py
@@ -18,6 +18,7 @@
     BaseEventFrequencyCondition,
     ComparisonType,
     EventFrequencyConditionData,
+    percent_increase,
 )
 from sentry.rules.processing.processor import (
     PROJECT_ID_BUFFER_LIST_KEY,
@@ -38,17 +39,24 @@
 EVENT_LIMIT = 100


-class UniqueCondition(NamedTuple):
+class UniqueConditionQuery(NamedTuple):
     """
     Represents all the data that uniquely identifies a condition class and its
     single respective Snuba query that must be made. Multiple instances of the
     same condition class can share the single query.
     """

     cls_id: str
     interval: str
     environment_id: int
+    comparisonInterval: str | None = None

     def __repr__(self):
         return f"id: {self.cls_id},\ninterval: {self.interval},\nenv id: {self.environment_id}"


 class DataAndGroups(NamedTuple):
-    data: EventFrequencyConditionData | None
+    data: EventFrequencyConditionData
     group_ids: set[int]

     def __repr__(self):
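For concreteness, the two flavors of key this NamedTuple now produces (field values are illustrative):

```python
# First (current-window) query for a condition: comparisonInterval stays None.
q1 = UniqueConditionQuery(
    cls_id="sentry.rules.conditions.event_frequency.EventFrequencyCondition",
    interval="1h",
    environment_id=3,
)
# Second query for a percent condition: same fields plus the shifted window.
q2 = q1._replace(comparisonInterval="1d")
assert q1 != q2  # distinct keys, so they map to two separate Snuba queries
```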
@@ -91,42 +99,60 @@ def get_rules_to_slow_conditions(
     return rules_to_slow_conditions


-def get_condition_groups(
+def _generate_unique_queries(
+    condition_data: EventFrequencyConditionData, environment_id: int
+) -> list[UniqueConditionQuery]:
+    """
+    Returns a list of all unique condition queries that must be made for the
+    given condition instance.
+    Count comparison conditions will only have one unique query, while percent
+    comparison conditions will have two unique queries.
+    """
+    unique_queries = [
+        UniqueConditionQuery(condition_data["id"], condition_data["interval"], environment_id)
+    ]
+    if condition_data.get("comparisonType") == ComparisonType.PERCENT:
+        # We will later compare the first query's results against the second query's
+        # to calculate a percentage for percent comparison conditions.
+        comparison_interval = condition_data.get("comparisonInterval", "5")
+        unique_queries.append(unique_queries[0]._replace(comparisonInterval=comparison_interval))
+    return unique_queries
+
+
+def get_condition_query_groups(
     alert_rules: list[Rule], rules_to_groups: DefaultDict[int, set[int]]
-) -> dict[UniqueCondition, DataAndGroups]:
+) -> dict[UniqueConditionQuery, DataAndGroups]:
     """
-    Map unique conditions to the group IDs that need to checked for that
-    condition. We also store a pointer to that condition's JSON so we can
-    instantiate the class later
+    Map unique condition queries to the group IDs that need to be checked for
+    that query. We also store a pointer to that condition's JSON so we can
+    instantiate the class later.
     """
-    condition_groups: dict[UniqueCondition, DataAndGroups] = {}
+    condition_groups: dict[UniqueConditionQuery, DataAndGroups] = {}
     for rule in alert_rules:
         # We only want a rule's slow conditions because alert_rules are only added
         # to the buffer if we've already checked their fast conditions.
         slow_conditions = get_slow_conditions(rule)
         for condition_data in slow_conditions:
-            if condition_data:
-                unique_condition = UniqueCondition(
-                    condition_data["id"], condition_data["interval"], rule.environment_id
-                )
+            for unique_cond in _generate_unique_queries(condition_data, rule.environment_id):
                 # Add to set of group_ids if there are already group_ids
-                # that apply to the unique condition
-                if data_and_groups := condition_groups.get(unique_condition):
+                # that apply to the unique condition query.
+                if data_and_groups := condition_groups.get(unique_cond):
                     data_and_groups.group_ids.update(rules_to_groups[rule.id])
                 # Otherwise, create the tuple containing the condition data and the
-                # set of group_ids that apply to the unique condition
+                # set of group_ids that apply to the unique condition query.
                 else:
-                    condition_groups[unique_condition] = DataAndGroups(
+                    condition_groups[unique_cond] = DataAndGroups(
                         condition_data, set(rules_to_groups[rule.id])
                     )
     return condition_groups


 def get_condition_group_results(
-    condition_groups: dict[UniqueCondition, DataAndGroups],
+    condition_groups: dict[UniqueConditionQuery, DataAndGroups],
     project: Project,
-) -> dict[UniqueCondition, dict[int, int]] | None:
-    condition_group_results: dict[UniqueCondition, dict[int, int]] = {}
+) -> dict[UniqueConditionQuery, dict[int, int]] | None:
+    condition_group_results: dict[UniqueConditionQuery, dict[int, int]] = {}
+    current_time = datetime.now(tz=timezone.utc)
     for unique_condition, (condition_data, group_ids) in condition_groups.items():
         condition_cls = rules.get(unique_condition.cls_id)
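As a sketch of the helper above, the queries generated for one percent condition (the dict literal is illustrative EventFrequencyConditionData, not taken from this PR):

```python
condition_data = {
    "id": "sentry.rules.conditions.event_frequency.EventFrequencyCondition",
    "interval": "1h",
    "value": "100",
    "comparisonType": "percent",
    "comparisonInterval": "1d",
}
queries = _generate_unique_queries(condition_data, environment_id=3)
assert len(queries) == 2
assert queries[0].comparisonInterval is None   # current window
assert queries[1].comparisonInterval == "1d"   # shifted comparison window
# A plain count condition would yield only queries[0].
```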

@@ -142,26 +168,60 @@ def get_condition_group_results(
             return None

         _, duration = condition_inst.intervals[unique_condition.interval]
-        comparison_interval = condition_inst.intervals[unique_condition.interval][1]
-        comparison_type = (
-            condition_data.get("comparisonType", ComparisonType.COUNT)
-            if condition_data
-            else ComparisonType.COUNT
-        )
+        comparison_type = condition_data.get("comparisonType", ComparisonType.COUNT)
+        if comparison_type == ComparisonType.PERCENT:
+            comparison_interval = condition_inst.intervals[
+                condition_data.get("comparisonInterval", "5")
+            ][1]
+        else:
+            comparison_interval = None

         result = safe_execute(
             condition_inst.get_rate_bulk,
-            duration,
-            comparison_interval,
-            group_ids,
-            unique_condition.environment_id,
-            comparison_type,
+            duration=duration,
+            group_ids=group_ids,
+            environment_id=unique_condition.environment_id,
+            comparison_type=comparison_type,
+            current_time=current_time,
+            comparison_interval=comparison_interval,
         )
         condition_group_results[unique_condition] = result or {}
     return condition_group_results


+def _passes_comparison(
+    condition_group_results: dict[UniqueConditionQuery, dict[int, int]],
+    condition_data: EventFrequencyConditionData,
+    group_id: int,
+    environment_id: int,
+) -> bool:
+    """
+    Checks whether a specific condition instance has passed.
+    """
+    unique_queries = _generate_unique_queries(condition_data, environment_id)
+    try:
+        query_values = [
+            condition_group_results[unique_query][group_id] for unique_query in unique_queries
+        ]
+    except KeyError as e:
+        logger.exception(
+            "delayed_processing.missing_query_results",
+            extra={"exception": e, "group_id": group_id},
+        )
+        return False
+
+    calculated_value = query_values[0]
+    # If there's a second query, we must have a percent comparison condition.
+    if len(query_values) == 2:
+        calculated_value = percent_increase(calculated_value, query_values[1])
+
+    target_value = float(condition_data["value"])
+    return calculated_value > target_value

Review thread on the `float(condition_data["value"])` line above:

Member: Previously we converted this to a string then float, not sure if that's important though.

schew2381 (Member, Author) commented Jun 25, 2024: @ceorourke do you remember why we originally did that here?

Member: I think it was just to match https://github.com/getsentry/sentry/blob/master/src/sentry/rules/conditions/event_frequency.py - what's the data type initially? A float?

schew2381 (Member, Author): It's a string but I think we can just convert that directly, like here.
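Putting the pieces together, a hedged walk-through of the percent path with invented numbers (including the direct str-to-float conversion discussed in the thread above):

```python
# Results for group 42 from the two unique queries of a percent condition:
query_values = [150, 100]  # current window, then comparison window

calculated_value = query_values[0]
if len(query_values) == 2:
    # Under the percent_increase semantics sketched earlier: +50%.
    calculated_value = percent_increase(calculated_value, query_values[1])

target_value = float("30")  # condition_data["value"] is a string like "30"
assert calculated_value > target_value  # 50 > 30.0, so the condition passes
```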


 def get_rules_to_fire(
-    condition_group_results: dict[UniqueCondition, dict[int, int]],
+    condition_group_results: dict[UniqueConditionQuery, dict[int, int]],
     rules_to_slow_conditions: DefaultDict[Rule, list[EventFrequencyConditionData]],
     rules_to_groups: DefaultDict[int, set[int]],
 ) -> DefaultDict[Rule, set[int]]:
@@ -171,23 +231,17 @@
         for group_id in rules_to_groups[alert_rule.id]:
             conditions_matched = 0
             for slow_condition in slow_conditions:
-                unique_condition = UniqueCondition(
-                    str(slow_condition.get("id")),
-                    str(slow_condition.get("interval")),
-                    alert_rule.environment_id,
-                )
-                results = condition_group_results.get(unique_condition, {})
-                if results:
-                    target_value = float(str(slow_condition.get("value")))
-                    if results[group_id] > target_value:
-                        if action_match == "any":
-                            rules_to_fire[alert_rule].add(group_id)
-                            break
-                        conditions_matched += 1
-                    else:
-                        if action_match == "all":
-                            # We failed to match all conditions for this group, skip
-                            break
+                if _passes_comparison(
+                    condition_group_results, slow_condition, group_id, alert_rule.environment_id
+                ):
+                    if action_match == "any":
+                        rules_to_fire[alert_rule].add(group_id)
+                        break
+                    conditions_matched += 1
+                else:
+                    if action_match == "all":
+                        # We failed to match all conditions for this group, skip
+                        break
             if action_match == "all" and conditions_matched == len(slow_conditions):
                 rules_to_fire[alert_rule].add(group_id)
     return rules_to_fire
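The any/all bookkeeping can be simulated in isolation; a minimal sketch with a hypothetical helper (not part of the PR):

```python
def fires(action_match: str, condition_outcomes: list[bool]) -> bool:
    # condition_outcomes[i] plays the role of _passes_comparison for one slow
    # condition; this mirrors the loop structure above for a single group.
    conditions_matched = 0
    for passed in condition_outcomes:
        if passed:
            if action_match == "any":
                return True
            conditions_matched += 1
        elif action_match == "all":
            return False
    return action_match == "all" and conditions_matched == len(condition_outcomes)

assert fires("any", [False, True]) is True   # one passing condition is enough
assert fires("all", [True, False]) is False  # every condition must pass
assert fires("all", [True, True]) is True
```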
@@ -355,18 +409,19 @@ def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
     )
     alert_rules = [rule for rule in alert_rules_qs if rule.id not in snoozed_rules]

-    # STEP 4: Create a map of unique conditions to a tuple containing the JSON
-    # information needed to instantiate that condition class and the group_ids that
-    # must be checked for that condition. We don't query per rule condition because
-    # condition of the same class, interval, and environment can share a single scan.
-    condition_groups = get_condition_groups(alert_rules, rules_to_groups)
+    # STEP 4: Create a map of unique condition queries to a tuple containing the
+    # JSON information needed to instantiate that condition class and the
+    # group_ids that must be checked for that condition.
+    # We don't query per rule condition because conditions of the same class,
+    # interval, environment, and comparisonInterval can share a single scan.
+    condition_groups = get_condition_query_groups(alert_rules, rules_to_groups)
     logger.info(
         "delayed_processing.condition_groups",
         extra={"condition_groups": condition_groups, "project_id": project_id},
     )

-    # Step 5: Instantiate each unique condition, and evaluate the relevant
-    # group_ids that apply for that condition
+    # Step 5: Instantiate the condition that we can apply to each unique condition
+    # query, and evaluate the relevant group_ids that apply for that query.
     with metrics.timer("delayed_processing.get_condition_group_results.duration"):
         condition_group_results = get_condition_group_results(condition_groups, project)
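To illustrate the sharing that STEP 4 describes, a sketch of how two rules with identical slow conditions collapse onto one query key (IDs invented; `condition_data` as in the earlier sketch):

```python
shared_key = UniqueConditionQuery(
    "sentry.rules.conditions.event_frequency.EventFrequencyCondition", "1h", 3
)
condition_groups = {shared_key: DataAndGroups(condition_data, {42})}
# A second rule with the same class, interval, environment (and, for percent
# conditions, comparisonInterval) reuses the key and unions its groups in:
condition_groups[shared_key].group_ids.update({43, 44})
assert condition_groups[shared_key].group_ids == {42, 43, 44}  # one Snuba scan
```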
