Commit 4da8ef3
perf(eventstream): Add caching layer, raise sort

Two changes:
1. Add a caching layer so that we can avoid hitting the DB for hot groups.
2. We don't need the sort in 95% of cases. Raise it out of the SQL query into Python and run it only when necessary.
1 parent 8e863ac commit 4da8ef3
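
The second change is the easier one to see in isolation. Below is a minimal, illustrative sketch of the idea (made-up IDs and a tiny threshold; this is not the committed code): when the merged set is at or under the threshold we skip ordering entirely, and only past the threshold do we sort in Python and keep the newest entries.

    from datetime import UTC, datetime, timedelta

    THRESHOLD = 3  # tiny stand-in for SIZE_THRESHOLD_FOR_CLICKHOUSE (2500)

    now = datetime.now(UTC)
    rows = {
        (101, now),
        (102, now - timedelta(days=1)),
        (103, now - timedelta(days=2)),
        (104, now - timedelta(days=3)),
    }

    if len(rows) <= THRESHOLD:
        # Common case (the ~95%): no sort at all.
        result = {group_id for group_id, _ in rows}
    else:
        # Rare case: sort newest-first and keep only the newest THRESHOLD IDs,
        # dropping the oldest since they're least likely to have data in retention.
        newest = sorted(rows, key=lambda row: row[1], reverse=True)[:THRESHOLD]
        result = {group_id for group_id, _ in newest}

    print(result)  # {101, 102, 103}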

File tree

2 files changed (+74, -37 lines)
Lines changed: 73 additions & 26 deletions

@@ -1,6 +1,9 @@
-from collections.abc import Iterable
+from collections import defaultdict
+from collections.abc import Iterable, Mapping
+from datetime import UTC, datetime, timedelta
 
 import sentry_sdk
+from django.core.cache import cache
 from django.db.models import Q, QuerySet
 
 from sentry.models.groupredirect import GroupRedirect
@@ -14,44 +17,88 @@
 SIZE_THRESHOLD_FOR_CLICKHOUSE = 2500
 
 
+def _build_group_redirect_by_group_id_cache_key(group_id: str | int) -> str:
+    return f"groupredirectsforgroupid:{group_id}"
+
+
 def _get_all_related_redirects_query(
     group_ids: set[str | int],
 ) -> QuerySet[GroupRedirect, tuple[int, int]]:
-    return (
-        GroupRedirect.objects.filter(
-            Q(group_id__in=group_ids) | Q(previous_group_id__in=group_ids)
-        ).values_list("group_id", "previous_group_id")
-        # This order returns the newest redirects first. i.e. we're implicitly dropping
-        # the oldest redirects if we have >THRESHOLD. We choose to drop the oldest
-        # because they're least likely to have data in retention.
-        # Technically id != date_added, but it's a close appx (& much faster).
-        .order_by("-id")
+    return GroupRedirect.objects.filter(
+        Q(group_id__in=group_ids) | Q(previous_group_id__in=group_ids)
+    ).values_list("date_added", "group_id", "previous_group_id", named=True)
+
+
+def _try_get_from_cache(
+    group_ids: Iterable[str | int],
+) -> tuple[set[tuple[str | int, datetime]], set[str | int]]:
+    """
+    CACHE STRUCTURE:
+    group_id ==> set[(related_group_id, date_added)]
+
+    Returns (all (group_id, date_added) pairs from cache hits, all uncached input IDs).
+    """
+    id_to_keys = {
+        group_id: _build_group_redirect_by_group_id_cache_key(group_id) for group_id in group_ids
+    }
+    # get_many's result is keyed by the cache keys it found, not by group ID.
+    cache_results: Mapping[str, set[tuple[str | int, datetime]]] = cache.get_many(
+        id_to_keys.values()
     )
 
+    cached_data = set().union(*cache_results.values())
+    uncached_group_ids = {
+        group_id for group_id, key in id_to_keys.items() if key not in cache_results
+    }
+
+    return (cached_data, uncached_group_ids)
+
 
 def get_all_merged_group_ids(
     group_ids: Iterable[str | int], threshold=SIZE_THRESHOLD_FOR_CLICKHOUSE
 ) -> set[str | int]:
     with sentry_sdk.start_span(op="get_all_merged_group_ids") as span:
-        group_id_set = set(group_ids)
-        all_related_rows = _get_all_related_redirects_query(group_id_set)
+        group_ids = set(group_ids)  # materialize; the iterable is consumed twice below
+
+        # Initialize all input IDs with a future time to ensure they aren't filtered out.
+        running_data = {
+            (group_id, datetime.now(UTC) + timedelta(minutes=1)) for group_id in group_ids
+        }
+
+        # Step 1: Try to get data from cache
+        cached_data, uncached_group_ids = _try_get_from_cache(group_ids)
+        running_data.update(cached_data)
+
+        # Step 2: Get unordered uncached data from Postgres
+        all_related_rows = _get_all_related_redirects_query(uncached_group_ids)
+        id_to_related = defaultdict(set)
 
-        threshold_breaker_set = None
+        for row in all_related_rows:
+            running_data.add((row.group_id, row.date_added))
+            running_data.add((row.previous_group_id, row.date_added))
 
-        for r in all_related_rows:
-            group_id_set.update(r)
+            id_to_related[row.group_id].add((row.previous_group_id, row.date_added))
+            id_to_related[row.previous_group_id].add((row.group_id, row.date_added))
 
-            # We only want to set the threshold_breaker the first time that we cross
-            # the threshold.
-            if threshold_breaker_set is None and len(group_id_set) >= threshold:
-                # Because we're incrementing the size of group_id_set by either one or two
-                # each iteration, it's fine if we're a bit over. That's negligible compared
-                # to the scale-of-thousands Clickhouse threshold.
-                threshold_breaker_set = group_id_set.copy()
+        # Step 3: Set cache-missed data into cache
+        cache.set_many(
+            data={
+                _build_group_redirect_by_group_id_cache_key(group_id): id_to_related[group_id]
+                for group_id in uncached_group_ids
+            },
+            timeout=300,  # 5 minutes
+        )
 
-        out = group_id_set if threshold_breaker_set is None else threshold_breaker_set
+        # Step 4: If and only if the result size exceeds the threshold, sort by
+        # date_added and return only the newest `threshold` IDs.
+        if len(running_data) <= threshold:
+            output_set = {datum[0] for datum in running_data}
+        else:
+            # Sort by datetime, decreasing, then take the first `threshold` entries.
+            newest = sorted(running_data, key=lambda datum: datum[1], reverse=True)[:threshold]
+            output_set = {datum[0] for datum in newest}
 
-        span.set_data("true_group_id_len", len(group_id_set))
-        span.set_data("returned_group_id_len", len(out))
+        span.set_data("true_group_id_len", len(running_data))
+        span.set_data("returned_group_id_len", len(output_set))
 
-        return out
+        return output_set
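
For context, here is a hypothetical call site (the group ID and the small threshold are made up; get_all_merged_group_ids and the 5-minute TTL come from the diff above):

    from sentry.services.eventstore.query_preprocessing import get_all_merged_group_ids

    # First call: a cache miss, so redirects come from Postgres and each input
    # group's related set is cached for 5 minutes.
    merged = get_all_merged_group_ids([1234])

    # Repeat lookups within the TTL are served from the cache.
    merged_again = get_all_merged_group_ids([1234])

    # The Python-side sort only runs when the merged set exceeds `threshold`,
    # in which case just the newest `threshold` IDs are returned.
    capped = get_all_merged_group_ids([1234], threshold=2)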

tests/sentry/services/eventstore/test_query_preprocessing.py

Lines changed: 1 addition & 11 deletions

@@ -1,10 +1,7 @@
 from datetime import UTC, datetime, timedelta
 
 from sentry.models.groupredirect import GroupRedirect
-from sentry.services.eventstore.query_preprocessing import (
-    _get_all_related_redirects_query,
-    get_all_merged_group_ids,
-)
+from sentry.services.eventstore.query_preprocessing import get_all_merged_group_ids
 from sentry.testutils.cases import TestCase
 from sentry.testutils.skips import requires_snuba
 
@@ -33,13 +30,6 @@ def setUp(self) -> None:
             date_added=datetime.now(UTC) - timedelta(hours=1),
         )
 
-    def test_get_all_related_groups_query(self) -> None:
-        """
-        What we want is for this to return the newest redirects first.
-        What we're technically doing is taking the redirects with the highest IDs.
-        """
-        assert _get_all_related_redirects_query({self.g1.id})[0] == (self.g1.id, self.g2.id)
-
     def test_get_all_merged_group_ids(self) -> None:
         assert get_all_merged_group_ids([self.g1.id]) == {self.g1.id, self.g2.id, self.g3.id}
         assert get_all_merged_group_ids([self.g2.id]) == {self.g1.id, self.g2.id}
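
A follow-up test could pin down the caching path. This is only a sketch, not part of the commit: the test name is invented, it assumes the test cache persists across calls within a single test, and it patches the module's Postgres helper to show that a repeat lookup is served from the cache.

    from unittest import mock

    def test_second_call_is_served_from_cache(self) -> None:
        # Prime the cache.
        assert get_all_merged_group_ids([self.g1.id]) == {self.g1.id, self.g2.id, self.g3.id}

        # Within the 5-minute TTL every input ID is a cache hit, so the
        # Postgres helper should only be asked about the (empty) miss set.
        with mock.patch(
            "sentry.services.eventstore.query_preprocessing._get_all_related_redirects_query",
            return_value=[],
        ) as mocked_query:
            assert get_all_merged_group_ids([self.g1.id]) == {self.g1.id, self.g2.id, self.g3.id}
            mocked_query.assert_called_once_with(set())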
