From 5a4b1e1acf4ac7bae411dcd7305e221888ca9cf9 Mon Sep 17 00:00:00 2001
From: Markus Unterwaditzer
Date: Tue, 10 Aug 2021 13:34:48 +0200
Subject: [PATCH] fix(reprocessing): Migrate event attachments from
 "remaining" events.

---
 src/sentry/reprocessing2.py              | 10 ++++-
 src/sentry/tasks/reprocessing2.py        | 14 ++++---
 tests/sentry/tasks/test_reprocessing2.py | 47 ++++++++++++++++--------
 3 files changed, 50 insertions(+), 21 deletions(-)

diff --git a/src/sentry/reprocessing2.py b/src/sentry/reprocessing2.py
index dd832ed0d01c44..c0ab7f82a513cb 100644
--- a/src/sentry/reprocessing2.py
+++ b/src/sentry/reprocessing2.py
@@ -98,7 +98,8 @@
 
 _REDIS_SYNC_TTL = 3600 * 24
 
-# Note: Event attachments and group reports are migrated in save_event.
+# Group-related models are only a few per-group and are migrated at
+# once.
 GROUP_MODELS_TO_MIGRATE = DIRECT_GROUP_RELATED_MODELS + (models.Activity,)
 
 # If we were to move groupinbox to the new, empty group, inbox would show the
@@ -106,6 +107,13 @@
 # care of assigning GroupInbox like normally.
 GROUP_MODELS_TO_MIGRATE = tuple(x for x in GROUP_MODELS_TO_MIGRATE if x != models.GroupInbox)
 
+# Event attachments and group reports are per-event. This means that:
+#
+# 1. they are migrated as part of the processing pipeline (post-process/save-event)
+# 2. there are a lot of them per group. For remaining events, we need to chunk
+#    up those queries for them to not get too slow
+EVENT_MODELS_TO_MIGRATE = (models.EventAttachment, models.UserReport)
+
 
 class CannotReprocess(Exception):
     pass
diff --git a/src/sentry/tasks/reprocessing2.py b/src/sentry/tasks/reprocessing2.py
index d5d4e3baabca00..a7b208a6d998c6 100644
--- a/src/sentry/tasks/reprocessing2.py
+++ b/src/sentry/tasks/reprocessing2.py
@@ -4,7 +4,7 @@
 import sentry_sdk
 from django.db import transaction
 
-from sentry import eventstore, eventstream, models, nodestore
+from sentry import eventstore, eventstream, nodestore
 from sentry.eventstore.models import Event
 from sentry.tasks.base import instrumented_task, retry
 from sentry.utils.query import celery_run_batch_query
@@ -158,14 +158,13 @@ def handle_remaining_events(
     from sentry import buffer
     from sentry.models.group import Group
+    from sentry.reprocessing2 import EVENT_MODELS_TO_MIGRATE
 
     assert remaining_events in ("delete", "keep")
 
     if remaining_events == "delete":
-        models.EventAttachment.objects.filter(
-            project_id=project_id, event_id__in=event_ids
-        ).delete()
-        models.UserReport.objects.filter(project_id=project_id, event_id__in=event_ids).delete()
+        for cls in EVENT_MODELS_TO_MIGRATE:
+            cls.objects.filter(project_id=project_id, event_id__in=event_ids).delete()
 
         # Remove from nodestore
         node_ids = [Event.generate_node_id(project_id, event_id) for event_id in event_ids]
@@ -176,6 +175,11 @@
             project_id, event_ids, from_timestamp=from_timestamp, to_timestamp=to_timestamp
         )
     elif remaining_events == "keep":
+        for cls in EVENT_MODELS_TO_MIGRATE:
+            cls.objects.filter(project_id=project_id, event_id__in=event_ids).update(
+                group_id=new_group_id
+            )
+
         eventstream.replace_group_unsafe(
             project_id,
             event_ids,
diff --git a/tests/sentry/tasks/test_reprocessing2.py b/tests/sentry/tasks/test_reprocessing2.py
index a4bc695afe22c0..7cf090d284e841 100644
--- a/tests/sentry/tasks/test_reprocessing2.py
+++ b/tests/sentry/tasks/test_reprocessing2.py
@@ -26,6 +26,27 @@
 from sentry.utils.cache import cache_key_for_event
 
 
+def _create_event_attachment(evt, type):
+    file = File.objects.create(name="foo", type=type)
+    file.putfile(BytesIO(b"hello world"))
+    EventAttachment.objects.create(
+        event_id=evt.event_id,
+        group_id=evt.group_id,
+        project_id=evt.project_id,
+        file_id=file.id,
+        type=file.type,
+        name="foo",
+    )
+
+
+def _create_user_report(evt):
+    UserReport.objects.create(
+        project_id=evt.project_id,
+        event_id=evt.event_id,
+        name="User",
+    )
+
+
 @pytest.fixture(autouse=True)
 def reprocessing_feature(monkeypatch):
     monkeypatch.setattr("sentry.tasks.reprocessing2.GROUP_REPROCESSING_CHUNK_SIZE", 1)
@@ -237,6 +258,9 @@ def event_preprocessor(data):
         event_id: eventstore.get_event_by_id(default_project.id, event_id)
         for event_id in event_ids
     }
+    for evt in old_events.values():
+        _create_user_report(evt)
+
     (group_id,) = {e.group_id for e in old_events.values()}
 
     with burst_task_runner() as burst:
@@ -257,6 +281,12 @@
         elif remaining_events == "keep":
             assert event.group_id != group_id
             assert dict(event.data) == dict(old_events[event_id].data)
+            assert (
+                UserReport.objects.get(
+                    project_id=default_project.id, event_id=event_id
+                ).group_id
+                != group_id
+            )
         else:
             raise ValueError(remaining_events)
     else:
@@ -314,22 +344,9 @@
 
     for evt in (event, event_to_delete):
         for type in ("event.attachment", "event.minidump"):
-            file = File.objects.create(name="foo", type=type)
-            file.putfile(BytesIO(b"hello world"))
-            EventAttachment.objects.create(
-                event_id=evt.event_id,
-                group_id=evt.group_id,
-                project_id=default_project.id,
-                file_id=file.id,
-                type=file.type,
-                name="foo",
-            )
+            _create_event_attachment(evt, type)
 
-        UserReport.objects.create(
-            project_id=default_project.id,
-            event_id=evt.event_id,
-            name="User",
-        )
+        _create_user_report(evt)
 
     with burst_task_runner() as burst:
         reprocess_group(default_project.id, event.group_id, max_events=1)
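
---

Note: the comment added in src/sentry/reprocessing2.py says that per-event
queries for remaining events have to be chunked so they do not get too slow.
In this patch the chunking happens upstream (handle_remaining_events receives
batched event_ids, and the test fixture shrinks GROUP_REPROCESSING_CHUNK_SIZE
to 1 to exercise that path). The sketch below shows the same pattern in
isolation. It is a minimal illustration assuming Django-style models; the
names migrate_remaining_events and _batches are hypothetical and not part of
this commit:

    def _batches(items, size):
        # Hypothetical helper: yield fixed-size slices so no single
        # event_id__in clause grows unboundedly.
        for i in range(0, len(items), size):
            yield items[i : i + size]

    def migrate_remaining_events(models_to_migrate, project_id, event_ids, new_group_id, batch_size=500):
        # Hypothetical sketch of the "keep" branch: re-point each per-event
        # row (attachments, user reports) at the new group, issuing one
        # bounded UPDATE per model and batch instead of one huge query.
        for cls in models_to_migrate:
            for batch in _batches(list(event_ids), batch_size):
                cls.objects.filter(
                    project_id=project_id, event_id__in=batch
                ).update(group_id=new_group_id)

Keeping each batch small bounds both the SQL statement size and the number of
rows touched per UPDATE, which is the stated reason EVENT_MODELS_TO_MIGRATE is
handled per-chunk rather than alongside the per-group models.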