diff --git a/src/sentry/processing/realtime_metrics/base.py b/src/sentry/processing/realtime_metrics/base.py index 5e2b15f48d0f91..1d0c5404257eab 100644 --- a/src/sentry/processing/realtime_metrics/base.py +++ b/src/sentry/processing/realtime_metrics/base.py @@ -1,10 +1,19 @@ +from typing import Set + from sentry.utils.services import Service class RealtimeMetricsStore(Service): # type: ignore """A service for storing metrics about incoming requests within a given time window.""" - __all__ = ("increment_project_event_counter", "increment_project_duration_counter", "validate") + __all__ = ( + "increment_project_event_counter", + "increment_project_duration_counter", + "validate", + "get_lpq_projects", + "add_project_to_lpq", + "remove_projects_from_lpq", + ) def increment_project_event_counter(self, project_id: int, timestamp: int) -> None: """Increment the event counter for the given project_id. @@ -26,3 +35,31 @@ def increment_project_duration_counter( the time of the event in seconds since the UNIX epoch and "duration" the processing time in seconds. """ pass + + def get_lpq_projects(self) -> Set[int]: + """ + Fetches the list of projects that are currently using the low priority queue. + + Returns a list of project IDs. + """ + pass + + def add_project_to_lpq(self, project_id: int) -> None: + """ + Moves a project to the low priority queue. + + This forces all symbolication events triggered by the specified project to be redirected to + the low priority queue, unless the project is manually excluded from the low priority queue + via the `store.symbolicate-event-lpq-never` kill switch. + """ + pass + + def remove_projects_from_lpq(self, project_ids: Set[int]) -> None: + """ + Removes projects from the low priority queue. + + This restores all specified projects back to the regular queue, unless they have been + manually forced into the low priority queue via the `store.symbolicate-event-lpq-always` + kill switch. + """ + pass diff --git a/src/sentry/processing/realtime_metrics/redis.py b/src/sentry/processing/realtime_metrics/redis.py index 03edfd52f8f0ef..24a21c3e61e607 100644 --- a/src/sentry/processing/realtime_metrics/redis.py +++ b/src/sentry/processing/realtime_metrics/redis.py @@ -1,10 +1,14 @@ import datetime +from typing import Set from sentry.exceptions import InvalidConfiguration from sentry.utils import redis from . import base +# redis key for entry storing current list of LPQ members +LPQ_MEMBERS_KEY = "store.symbolicate-event-lpq-selected" + class RedisRealtimeMetricsStore(base.RealtimeMetricsStore): """An implementation of RealtimeMetricsStore based on a Redis backend.""" @@ -80,3 +84,41 @@ def increment_project_duration_counter( pipeline.hincrby(key, duration, 1) pipeline.pexpire(key, self._histogram_ttl) pipeline.execute() + + def get_lpq_projects(self) -> Set[int]: + """ + Fetches the list of projects that are currently using the low priority queue. + + Returns a list of project IDs. + """ + return {int(project_id) for project_id in self.cluster.smembers(LPQ_MEMBERS_KEY)} + + def add_project_to_lpq(self, project_id: int) -> None: + """ + Assigns a project to the low priority queue. + + This registers an intent to redirect all symbolication events triggered by the specified + project to be redirected to the low priority queue. + + This may throw an exception if there is some sort of issue registering the project with the + queue. + """ + + # This returns 0 if project_id was already in the set, 1 if it was added, and throws an + # exception if there's a problem so it's fine if we just ignore the return value of this as + # the project is always added if this successfully completes. + self.cluster.sadd(LPQ_MEMBERS_KEY, project_id) + + def remove_projects_from_lpq(self, project_ids: Set[int]) -> None: + """ + Removes projects from the low priority queue. + + This registers an intent to restore all specified projects back to the regular queue. + + This may throw an exception if there is some sort of issue deregistering the projects from + the queue. + """ + if len(project_ids) == 0: + return + + self.cluster.srem(LPQ_MEMBERS_KEY, *project_ids) diff --git a/tests/sentry/processing/realtime_metrics/test_redis.py b/tests/sentry/processing/realtime_metrics/test_redis.py index fe510ddafe29ef..20107b885abb9f 100644 --- a/tests/sentry/processing/realtime_metrics/test_redis.py +++ b/tests/sentry/processing/realtime_metrics/test_redis.py @@ -112,3 +112,142 @@ def test_increment_project_duration_counter_different_buckets( assert redis_cluster.hget("symbolicate_event_low_priority:histogram:10:17:1140", "20") == "1" assert redis_cluster.hget("symbolicate_event_low_priority:histogram:10:17:1150", "40") == "1" + + +def test_get_lpq_projects_unset(store: RedisRealtimeMetricsStore) -> None: + in_lpq = store.get_lpq_projects() + assert in_lpq == set() + + +def test_get_lpq_projects_empty( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.srem("store.symbolicate-event-lpq-selected", 1) + + in_lpq = store.get_lpq_projects() + assert in_lpq == set() + + +def test_get_lpq_projects_filled( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + in_lpq = store.get_lpq_projects() + assert in_lpq == {1} + + +def test_add_project_to_lpq_unset( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + store.add_project_to_lpq(1) + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1"} + + +def test_add_project_to_lpq_empty( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.srem("store.symbolicate-event-lpq-selected", 1) + + store.add_project_to_lpq(1) + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1"} + + +def test_add_project_to_lpq_dupe( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + + store.add_project_to_lpq(1) + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1"} + + +def test_add_project_to_lpq_filled( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 11) + + store.add_project_to_lpq(1) + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1", "11"} + + +def test_remove_projects_from_lpq_unset( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + store.remove_projects_from_lpq({1}) + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == set() + + +def test_remove_projects_from_lpq_empty( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.srem("store.symbolicate-event-lpq-selected", 1) + + store.remove_projects_from_lpq({1}) + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == set() + + +def test_remove_projects_from_lpq_only_member( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + + store.remove_projects_from_lpq({1}) + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == set() + + +def test_remove_projects_from_lpq_nonmember( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 11) + + store.remove_projects_from_lpq({1}) + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == {"11"} + + +def test_remove_projects_from_lpq_subset( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 11) + + store.remove_projects_from_lpq({1}) + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == {"11"} + + +def test_remove_projects_from_lpq_all_members( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 11) + + store.remove_projects_from_lpq({1, 11}) + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == set() + + +def test_remove_projects_from_lpq_no_members( + store: RedisRealtimeMetricsStore, redis_cluster: redis._RedisCluster +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + + store.remove_projects_from_lpq({}) + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == {"1"}