feat(processing) Move redis operations in reprocessing to servicewrapper #66168

Merged
4 commits merged on Mar 5, 2024
1 change: 1 addition & 0 deletions pyproject.toml
@@ -627,6 +627,7 @@ disable_error_code = [
module = [
"sentry.buffer.base",
"sentry.buffer.redis",
"sentry.eventstore.reprocessing.redis",
"sentry.utils.redis",
"sentry.utils.redis_metrics",
"sentry.tasks.on_demand_metrics",
4 changes: 4 additions & 0 deletions src/sentry/conf/server.py
@@ -3604,6 +3604,10 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]]

SENTRY_USE_UWSGI = True

# Configure service wrapper for reprocessing2 state
SENTRY_REPROCESSING_STORE = "sentry.eventstore.reprocessing.redis.RedisReprocessingStore"
SENTRY_REPROCESSING_STORE_OPTIONS = {"cluster": "default"}
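
These defaults resolve the Redis-backed store on the default cluster. As a rough sketch (not part of this PR), a deployment could point the store at a different cluster by overriding the options in its own configuration; the cluster name below is hypothetical and must exist in the deployment's Redis cluster configuration:

# sentry.conf.py override (sketch only, not part of this PR)
SENTRY_REPROCESSING_STORE = "sentry.eventstore.reprocessing.redis.RedisReprocessingStore"
# "processing" is a hypothetical cluster name; redis_clusters.get("processing")
# will only succeed if that cluster is defined for the deployment.
SENTRY_REPROCESSING_STORE_OPTIONS = {"cluster": "processing"}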

# When copying attachments for to-be-reprocessed events into processing store,
# how large is an individual file chunk? Each chunk is stored as Redis key.
SENTRY_REPROCESSING_ATTACHMENT_CHUNK_SIZE = 2**20
13 changes: 13 additions & 0 deletions src/sentry/eventstore/reprocessing/__init__.py
@@ -0,0 +1,13 @@
from django.conf import settings

from sentry.eventstore.reprocessing.base import ReprocessingStore
from sentry.utils.services import LazyServiceWrapper

reprocessing_store = LazyServiceWrapper(
ReprocessingStore,
settings.SENTRY_REPROCESSING_STORE,
settings.SENTRY_REPROCESSING_STORE_OPTIONS,
)


__all__ = ["reprocessing_store"]
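
Since LazyServiceWrapper resolves the dotted path from settings on first use and forwards attribute access to that backend, call sites can import this singleton directly. A minimal usage sketch, assuming only the methods of the ReprocessingStore interface defined below; the calling function itself is hypothetical:

# Hypothetical call site (sketch only, not part of this PR).
from sentry.eventstore.reprocessing import reprocessing_store

def remaining_event_count(project_id: int, group_id: int) -> int:
    # Each call is forwarded to the backend named in SENTRY_REPROCESSING_STORE.
    hashes = reprocessing_store.get_old_primary_hashes(project_id, group_id)
    return reprocessing_store.event_count_for_hashes(project_id, group_id, list(hashes))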
72 changes: 72 additions & 0 deletions src/sentry/eventstore/reprocessing/base.py
@@ -0,0 +1,72 @@
from collections.abc import Sequence
from datetime import datetime
from typing import Any

from sentry.utils.services import Service


class ReprocessingStore(Service):
__all__ = (
"event_count_for_hashes",
"pop_batched_events",
"get_old_primary_hashes",
"expire_hash",
"add_hash",
"get_remaining_event_count",
"rename_key",
"mark_event_reprocessed",
"start_reprocessing",
"get_pending",
"get_progress",
)

def __init__(self, **options: Any) -> None:
pass

def event_count_for_hashes(
self, project_id: int, group_id: int, old_primary_hashes: Sequence[str]
) -> int:
raise NotImplementedError()

def pop_batched_events(
self, project_id: int, group_id: int, primary_hash: str
) -> tuple[list[str], datetime | None, datetime | None]:
raise NotImplementedError()

def get_old_primary_hashes(self, project_id: int, group_id: int) -> set[Any]:
raise NotImplementedError()

def expire_hash(
self,
project_id: int,
group_id: int,
event_id: str,
date_val: datetime,
old_primary_hash: str,
) -> None:
raise NotImplementedError()

def add_hash(self, project_id: int, group_id: int, hash: str) -> None:
raise NotImplementedError()

def get_remaining_event_count(
self, project_id: int, old_group_id: int, datetime_to_event: list[tuple[datetime, str]]
) -> int:
raise NotImplementedError()

def rename_key(self, project_id: int, old_group_id: int) -> str | None:
raise NotImplementedError()

def mark_event_reprocessed(self, group_id: int, num_events: int) -> bool:
raise NotImplementedError()

def start_reprocessing(
self, group_id: int, date_created: Any, sync_count: int, event_count: int
) -> None:
raise NotImplementedError()

def get_pending(self, group_id: int) -> Any:
raise NotImplementedError()

def get_progress(self, group_id: int) -> dict[str, Any] | None:
raise NotImplementedError()
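
Because callers depend only on this interface, the backend can be swapped without touching call sites. A deliberately minimal in-memory sketch of two of the methods, purely illustrative and not part of this PR:

# Illustrative toy backend (not part of this PR): keeps tombstone hashes in
# process memory instead of Redis to show how an alternative implementation
# would plug into the same interface.
from collections import defaultdict
from typing import Any

from sentry.eventstore.reprocessing.base import ReprocessingStore

class InMemoryReprocessingStore(ReprocessingStore):
    def __init__(self, **options: Any) -> None:
        self._hashes: dict[tuple[int, int], set[str]] = defaultdict(set)

    def add_hash(self, project_id: int, group_id: int, hash: str) -> None:
        self._hashes[(project_id, group_id)].add(hash)

    def get_old_primary_hashes(self, project_id: int, group_id: int) -> set[Any]:
        return set(self._hashes[(project_id, group_id)])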
172 changes: 172 additions & 0 deletions src/sentry/eventstore/reprocessing/redis.py
@@ -0,0 +1,172 @@
import uuid
from collections.abc import Sequence
from datetime import datetime
from typing import Any

import redis
from django.conf import settings

from sentry.utils import json
Contributor comment: I see that you're only using dumps; importing it explicitly would be better:

Suggested change, replacing
from sentry.utils import json
with
from sentry.utils.json import dumps

from sentry.utils.dates import to_datetime, to_timestamp
from sentry.utils.redis import redis_clusters

from .base import ReprocessingStore


def _get_sync_counter_key(group_id: int) -> str:
return f"re2:count:{group_id}"


def _get_info_reprocessed_key(group_id: int) -> str:
return f"re2:info:{group_id}"


def _get_old_primary_hash_subset_key(project_id: int, group_id: int, primary_hash: str) -> str:
return f"re2:tombstones:{{{project_id}:{group_id}:{primary_hash}}}"


def _get_remaining_key(project_id: int, group_id: int) -> str:
return f"re2:remaining:{{{project_id}:{group_id}}}"


class RedisReprocessingStore(ReprocessingStore):
def __init__(self, **options: dict[str, Any]) -> None:
cluster = options.pop("cluster", "default")
assert isinstance(cluster, str), "cluster option must be a string"
self.redis = redis_clusters.get(cluster)

def event_count_for_hashes(
self, project_id: int, group_id: int, old_primary_hashes: Sequence[str]
) -> int:
# Events for a group are split and bucketed by their primary hashes. If flushing is to be
# performed on a per-group basis, the event count needs to be summed up across all buckets
# belonging to a single group.
event_count = 0
for primary_hash in old_primary_hashes:
key = _get_old_primary_hash_subset_key(project_id, group_id, primary_hash)
event_count += self.redis.llen(key)
return event_count

def pop_batched_events(
self, project_id: int, group_id: int, primary_hash: str
) -> tuple[list[str], datetime | None, datetime | None]:
"""
For a redis key pointing to a list of buffered events structured like
`datetime of event;event id`, returns a list of event IDs, the
earliest datetime, and the latest datetime.
"""
event_ids_batch = []
min_datetime: datetime | None = None
max_datetime: datetime | None = None
key = _get_old_primary_hash_subset_key(project_id, group_id, primary_hash)

for row in self.redis.lrange(key, 0, -1):
datetime_raw, event_id = row.split(";")
parsed_datetime = to_datetime(float(datetime_raw))

assert parsed_datetime is not None

if min_datetime is None or parsed_datetime < min_datetime:
min_datetime = parsed_datetime
if max_datetime is None or parsed_datetime > max_datetime:
max_datetime = parsed_datetime

event_ids_batch.append(event_id)

self.redis.delete(key)

return event_ids_batch, min_datetime, max_datetime

def get_old_primary_hashes(self, project_id: int, group_id: int) -> set[Any]:
# This is a meta key that contains old primary hashes. These hashes are then
# combined with other values to construct a key that points to a list of
# tombstonable events.
primary_hash_set_key = f"re2:tombstone-primary-hashes:{project_id}:{group_id}"

return self.redis.smembers(primary_hash_set_key)

def expire_hash(
self,
project_id: int,
group_id: int,
event_id: str,
date_val: datetime,
old_primary_hash: str,
) -> None:
event_key = _get_old_primary_hash_subset_key(project_id, group_id, old_primary_hash)
self.redis.lpush(event_key, f"{to_timestamp(date_val)};{event_id}")
self.redis.expire(event_key, settings.SENTRY_REPROCESSING_TOMBSTONES_TTL)

def add_hash(self, project_id: int, group_id: int, hash: str) -> None:
primary_hash_set_key = f"re2:tombstone-primary-hashes:{project_id}:{group_id}"

self.redis.sadd(primary_hash_set_key, hash)
self.redis.expire(primary_hash_set_key, settings.SENTRY_REPROCESSING_TOMBSTONES_TTL)

def get_remaining_event_count(
self, project_id: int, old_group_id: int, datetime_to_event: list[tuple[datetime, str]]
) -> int:
# We explicitly cluster by only project_id and group_id here such that our
# RENAME command later succeeds.
key = _get_remaining_key(project_id, old_group_id)

if datetime_to_event:
llen = self.redis.lpush(
key,
*(
f"{to_timestamp(datetime)};{event_id}"
for datetime, event_id in datetime_to_event
),
)
self.redis.expire(key, settings.SENTRY_REPROCESSING_SYNC_TTL)
else:
llen = self.redis.llen(key)
return llen

def rename_key(self, project_id: int, old_group_id: int) -> str | None:
key = _get_remaining_key(project_id, old_group_id)
new_key = f"{key}:{uuid.uuid4().hex}"
try:
# Rename `key` to a new temp key that is passed to celery task. We
# use `renamenx` instead of `rename` only to detect UUID collisions.
assert self.redis.renamenx(key, new_key), "UUID collision for new_key?"

return new_key
except redis.exceptions.ResponseError:
# `key` does not exist in Redis. `ResponseError` is a bit too broad
# but it seems we'd have to do string matching on error message
# otherwise.
return None

def mark_event_reprocessed(self, group_id: int, num_events: int) -> bool:
# refresh the TTL of the metadata:
self.redis.expire(
_get_info_reprocessed_key(group_id), settings.SENTRY_REPROCESSING_SYNC_TTL
)
key = _get_sync_counter_key(group_id)
self.redis.expire(key, settings.SENTRY_REPROCESSING_SYNC_TTL)
return self.redis.decrby(key, num_events) == 0

def start_reprocessing(
self, group_id: int, date_created: Any, sync_count: int, event_count: int
) -> None:
self.redis.setex(
_get_sync_counter_key(group_id), settings.SENTRY_REPROCESSING_SYNC_TTL, sync_count
)
self.redis.setex(
_get_info_reprocessed_key(group_id),
settings.SENTRY_REPROCESSING_SYNC_TTL,
json.dumps(
{"dateCreated": date_created, "syncCount": sync_count, "totalEvents": event_count}
),
)

def get_pending(self, group_id: int) -> tuple[int | None, int]:
pending_key = _get_sync_counter_key(group_id)
pending = self.redis.get(pending_key)
ttl = self.redis.ttl(pending_key)
return pending, ttl

def get_progress(self, group_id: int) -> dict[str, Any] | None:
info = self.redis.get(_get_info_reprocessed_key(group_id))
return info
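
To make the counter protocol concrete: start_reprocessing seeds re2:count:<group_id> with the expected sync count, each mark_event_reprocessed call decrements it (refreshing the TTL), and only the call that brings the counter to exactly zero returns True. A hedged driver sketch; the orchestration function below is hypothetical and only the store methods from this file are assumed:

# Hypothetical orchestration code (sketch only, not part of this PR).
from django.utils import timezone

from sentry.eventstore.reprocessing import reprocessing_store

def reprocess_in_batches(group_id: int, batches: list[list[str]]) -> None:
    total = sum(len(batch) for batch in batches)
    # Seed the sync counter and the progress metadata.
    reprocessing_store.start_reprocessing(
        group_id,
        date_created=timezone.now().isoformat(),
        sync_count=total,
        event_count=total,
    )
    for batch in batches:
        # ... reprocess the events in `batch` ...
        # Returns True only when the DECRBY drains the counter to exactly 0.
        if reprocessing_store.mark_event_reprocessed(group_id, len(batch)):
            break  # all expected events have been synced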
3 changes: 3 additions & 0 deletions src/sentry/options/defaults.py
@@ -964,6 +964,9 @@
# Drop delete_old_primary_hash messages for a particular project.
register("reprocessing2.drop-delete-old-primary-hash", default=[], flags=FLAG_AUTOMATOR_MODIFIABLE)

# Switch to use service wrapper for reprocessing redis operations
register("reprocessing.use_store", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)

# BEGIN ABUSE QUOTAS

# Example: