Merged
8 changes: 8 additions & 0 deletions src/sentry/buffer/base.py
@@ -36,6 +36,14 @@ def get(
"""
return {col: 0 for col in columns}

def get_hash(
self, model: type[models.Model], field: dict[str, models.Model | str | int]
) -> dict[str, str]:
return {}

def get_set(self, key: str) -> list[tuple[int, datetime]]:
return []

def incr(
self,
model: type[models.Model],
27 changes: 22 additions & 5 deletions src/sentry/buffer/redis.py
@@ -273,15 +273,23 @@ def _execute_redis_operation(
getattr(pipe, operation.value)(key, *args, **kwargs)
if args:
pipe.expire(key, self.key_expire)
return pipe.execute()
return pipe.execute()[0]

def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
self._execute_redis_operation(key, RedisOperation.SORTED_SET_ADD, {value: time()})

def get_set(self, key: str) -> list[set[int]]:
return self._execute_redis_operation(
def get_set(self, key: str) -> list[tuple[int, datetime]]:
redis_set = self._execute_redis_operation(
key, RedisOperation.SORTED_SET_GET_RANGE, start=0, end=-1, withscores=True
)
decoded_set = []
for items in redis_set:
item = items[0]
if isinstance(item, bytes):
item = item.decode("utf-8")
data_and_timestamp = (int(item), items[1])
decoded_set.append(data_and_timestamp)
return decoded_set
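Illustration only, not part of the diff: with redis-py, a ZRANGE ... WITHSCORES call yields (member, score) pairs, where the member is bytes unless the client was created with decode_responses=True and the score is the float timestamp stored by push_to_sorted_set. The loop above normalizes those pairs into (int, score) tuples; a minimal sketch with invented values:

# Hypothetical raw result of the sorted-set range, and the decoded form the loop above produces.
raw = [(b"42", 1714000000.0), ("43", 1714000060.0)]
decoded = [
    (int(member.decode("utf-8")) if isinstance(member, bytes) else int(member), score)
    for member, score in raw
]
assert decoded == [(42, 1714000000.0), (43, 1714000060.0)]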

def delete_key(self, key: str, min: int, max: int) -> None:
self._execute_redis_operation(key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max)
@@ -307,9 +315,18 @@ def push_to_hash(

def get_hash(
self, model: type[models.Model], field: dict[str, models.Model | str | int]
) -> list[dict[str, str]]:
) -> dict[str, str]:
key = self._make_key(model, field)
return self._execute_redis_operation(key, RedisOperation.HASH_GET_ALL)
redis_hash = self._execute_redis_operation(key, RedisOperation.HASH_GET_ALL)
Review comment (Member):

I wonder if we should also have _execute_redis_operation only return a single value. We always either make one call in the pipeline, or a second call to set expire, but I don't know that we ever care about the result of that second call?

Might be a nicer interface to just return the value of the operation you requested.
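A stand-alone illustration of the reviewer's point (hypothetical key name and TTL; assumes a local Redis and the redis-py client, none of which is part of this PR): pipeline.execute() returns one reply per queued command, so the reply to the trailing EXPIRE can simply be dropped, which is what the return pipe.execute()[0] change above already does.

import redis

r = redis.Redis()
pipe = r.pipeline()
pipe.hgetall("rulegroup-buffer-key")       # reply 0: the hash contents the caller actually wants
pipe.expire("rulegroup-buffer-key", 3600)  # reply 1: a boolean nobody reads
replies = pipe.execute()
wanted = replies[0]                        # returning only this is the suggested interface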

decoded_hash = {}
for k, v in redis_hash.items():
if isinstance(k, bytes):
k = k.decode("utf-8")
if isinstance(v, bytes):
v = v.decode("utf-8")
decoded_hash[k] = v

return decoded_hash

def handle_pending_partitions(self, partition: int | None) -> None:
if partition is None and self.pending_partitions > 1:
13 changes: 8 additions & 5 deletions src/sentry/rules/conditions/event_frequency.py
@@ -218,9 +218,12 @@ def batch_query_hook(
"""
raise NotImplementedError

def get_option_override(self, duration: timedelta) -> contextlib.AbstractContextManager[object]:
# For conditions with interval >= 1 hour we don't need to worry about read your writes
# consistency. Disable it so that we can scale to more nodes.
def disable_consistent_snuba_mode(
self, duration: timedelta
) -> contextlib.AbstractContextManager[object]:
"""For conditions with interval >= 1 hour we don't need to worry about read your writes
consistency. Disable it so that we can scale to more nodes.
"""
option_override_cm: contextlib.AbstractContextManager[object] = contextlib.nullcontext()
if duration >= timedelta(hours=1):
option_override_cm = options_override({"consistent": False})
@@ -248,7 +251,7 @@ def get_rate(
comparison_type: str,
) -> int:
start, end = self.get_comparison_start_end(timedelta(), duration)
with self.get_option_override(duration):
with self.disable_consistent_snuba_mode(duration):
result = self.query(event, start, end, environment_id=environment_id)
if comparison_type == ComparisonType.PERCENT:
# TODO: Figure out if there's a way we can do this less frequently. All queries are
@@ -269,7 +272,7 @@ def get_rate_bulk(
comparison_type: str,
) -> dict[int, int]:
start, end = self.get_comparison_start_end(timedelta(), duration)
with self.get_option_override(duration):
with self.disable_consistent_snuba_mode(duration):
result = self.batch_query(
group_ids=group_ids,
start=start,
18 changes: 8 additions & 10 deletions src/sentry/rules/processing/delayed_processing.py
@@ -50,12 +50,11 @@ def get_slow_conditions(rule: Rule) -> list[MutableMapping[str, str]]:
return slow_conditions


def get_rules_to_groups(rulegroup_to_events: list[dict[str, str]]) -> DefaultDict[int, set[int]]:
def get_rules_to_groups(rulegroup_to_events: dict[str, str]) -> DefaultDict[int, set[int]]:
rules_to_groups: DefaultDict[int, set[int]] = defaultdict(set)
for rulegroup_to_event in rulegroup_to_events:
for rule_group in rulegroup_to_event.keys():
rule_id, group_id = rule_group.split(":")
rules_to_groups[int(rule_id)].add(int(group_id))
for rule_group in rulegroup_to_events.keys():
rule_id, group_id = rule_group.split(":")
rules_to_groups[int(rule_id)].add(int(group_id))

return rules_to_groups
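For illustration only (event IDs invented; get_rules_to_groups as defined above): the buffer hash keys are "rule_id:group_id" strings mapping to stringified event IDs, and the reworked function now consumes that dict directly rather than a list of single-entry dicts.

rulegroup_to_events = {"1:100": "9000", "1:101": "9001", "2:100": "9002"}
rules_to_groups = get_rules_to_groups(rulegroup_to_events)
assert rules_to_groups == {1: {100, 101}, 2: {100}}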

@@ -168,10 +167,9 @@ def get_rules_to_fire(
def process_delayed_alert_conditions(buffer: RedisBuffer) -> None:
with metrics.timer("delayed_processing.process_all_conditions.duration"):
project_ids = buffer.get_set(PROJECT_ID_BUFFER_LIST_KEY)

for project_id in project_ids:
with metrics.timer("delayed_processing.process_project.duration"):
apply_delayed.delay(project_id=project_id, buffer=buffer)
apply_delayed.delay(project_id=project_id)


@instrumented_task(
@@ -182,15 +180,15 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None:
time_limit=60, # 1 minute
silo_mode=SiloMode.REGION,
)
def apply_delayed(project_id: int, buffer: RedisBuffer) -> DefaultDict[Rule, set[int]] | None:
def apply_delayed(project_id: int) -> DefaultDict[Rule, set[int]] | None:
# XXX(CEO) this is a temporary return value!
"""
Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass
"""
# STEP 1: Fetch the rulegroup_to_events mapping for the project from redis
project = Project.objects.get(id=project_id)
project = Project.objects.get_from_cache(id=project_id)
buffer = RedisBuffer()
rulegroup_to_events = buffer.get_hash(model=Project, field={"project_id": project.id})

# STEP 2: Map each rule to the groups that must be checked for that rule.
rules_to_groups = get_rules_to_groups(rulegroup_to_events)

23 changes: 6 additions & 17 deletions tests/sentry/buffer/test_redis.py
@@ -257,15 +257,10 @@ def test_process_pending_partitions_none(self, process_pending, process_incr):

def group_rule_data_by_project_id(self, buffer, project_ids):
project_ids_to_rule_data = defaultdict(list)
for proj_id in project_ids[0]:
for proj_id in project_ids:
rule_group_pairs = buffer.get_hash(Project, {"project_id": proj_id[0]})
for pair in rule_group_pairs:
for k, v in pair.items():
if isinstance(k, bytes):
k = k.decode("utf-8")
if isinstance(v, bytes):
v = v.decode("utf-8")
project_ids_to_rule_data[int(proj_id[0])].append({k: v})
for k, v in rule_group_pairs.items():
project_ids_to_rule_data[int(proj_id[0])].append({k: v})
return project_ids_to_rule_data

def test_enqueue(self):
@@ -308,7 +303,6 @@ def test_enqueue(self):

project_ids = self.buf.get_set(PROJECT_ID_BUFFER_LIST_KEY)
assert project_ids

project_ids_to_rule_data = self.group_rule_data_by_project_id(self.buf, project_ids)
assert project_ids_to_rule_data[project_id][0].get(f"{rule_id}:{group_id}") == str(event_id)
assert project_ids_to_rule_data[project_id][1].get(f"{rule_id}:{group2_id}") == str(
@@ -385,7 +379,7 @@ def test_delete_batch(self):

# retrieve them
project_ids = self.buf.get_set(PROJECT_ID_BUFFER_LIST_KEY)
assert len(project_ids[0]) == 2
assert len(project_ids) == 2
rule_group_pairs = self.buf.get_hash(Project, {"project_id": project_id})
assert len(rule_group_pairs)

@@ -394,12 +388,7 @@ def test_delete_batch(self):

# retrieve again to make sure only project_id was removed
project_ids = self.buf.get_set(PROJECT_ID_BUFFER_LIST_KEY)
if isinstance(project_ids[0][0][0], bytes):
assert project_ids == [
[(bytes(str(project2_id), "utf-8"), one_minute_from_now.timestamp())]
]
else:
assert project_ids == [[(str(project2_id), one_minute_from_now.timestamp())]]
assert project_ids == [(project2_id, one_minute_from_now.timestamp())]

# delete the project_id hash
self.buf.delete_hash(
@@ -409,7 +398,7 @@ def test_delete_batch(self):
)

rule_group_pairs = self.buf.get_hash(Project, {"project_id": project_id})
assert rule_group_pairs == [{}]
assert rule_group_pairs == {}

@mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo"))
@mock.patch("sentry.buffer.base.Buffer.process")