10 changes: 9 additions & 1 deletion src/sentry/reprocessing2.py
@@ -98,14 +98,22 @@
_REDIS_SYNC_TTL = 3600 * 24


# Note: Event attachments and group reports are migrated in save_event.
# There are only a few group-related models per group, so they are
# migrated all at once.
GROUP_MODELS_TO_MIGRATE = DIRECT_GROUP_RELATED_MODELS + (models.Activity,)

# If we were to move GroupInbox to the new, empty group, the inbox would show
# the empty, unactionable group while it is reprocessing. Let post-process
# take care of assigning GroupInbox as it normally would.
GROUP_MODELS_TO_MIGRATE = tuple(x for x in GROUP_MODELS_TO_MIGRATE if x != models.GroupInbox)

# Event attachments and group reports are per-event. This means that:
#
# 1. they are migrated as part of the processing pipeline (post-process/save-event)
# 2. there are a lot of them per group, so for the remaining events we need to
#    chunk up those queries to keep them from getting too slow
EVENT_MODELS_TO_MIGRATE = (models.EventAttachment, models.UserReport)


class CannotReprocess(Exception):
pass
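The chunking rationale in the comment above is the heart of this change: per-event rows cannot safely be touched in one unbounded query per group. A minimal sketch of the idea follows, where the `chunked` helper, the `delete_per_event_rows` name, and the batch size of 500 are illustrative assumptions rather than the pipeline's actual batching machinery:

    from itertools import islice


    def chunked(iterable, size):
        # Yield lists of at most `size` items from `iterable`.
        it = iter(iterable)
        while batch := list(islice(it, size)):
            yield batch


    def delete_per_event_rows(model_cls, project_id, event_ids, batch_size=500):
        # Per-event models such as EventAttachment and UserReport can have
        # many rows per group, so the deletes are issued in bounded batches
        # instead of one unbounded `event_id__in` query.
        for batch in chunked(event_ids, batch_size):
            model_cls.objects.filter(
                project_id=project_id, event_id__in=batch
            ).delete()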
14 changes: 9 additions & 5 deletions src/sentry/tasks/reprocessing2.py
@@ -4,7 +4,7 @@
import sentry_sdk
from django.db import transaction

from sentry import eventstore, eventstream, models, nodestore
from sentry import eventstore, eventstream, nodestore
from sentry.eventstore.models import Event
from sentry.tasks.base import instrumented_task, retry
from sentry.utils.query import celery_run_batch_query
@@ -158,14 +158,13 @@ def handle_remaining_events(

from sentry import buffer
from sentry.models.group import Group
from sentry.reprocessing2 import EVENT_MODELS_TO_MIGRATE

assert remaining_events in ("delete", "keep")

if remaining_events == "delete":
models.EventAttachment.objects.filter(
project_id=project_id, event_id__in=event_ids
).delete()
models.UserReport.objects.filter(project_id=project_id, event_id__in=event_ids).delete()
for cls in EVENT_MODELS_TO_MIGRATE:
cls.objects.filter(project_id=project_id, event_id__in=event_ids).delete()

# Remove from nodestore
node_ids = [Event.generate_node_id(project_id, event_id) for event_id in event_ids]
@@ -176,6 +175,11 @@
project_id, event_ids, from_timestamp=from_timestamp, to_timestamp=to_timestamp
)
elif remaining_events == "keep":
for cls in EVENT_MODELS_TO_MIGRATE:
cls.objects.filter(project_id=project_id, event_id__in=event_ids).update(
group_id=new_group_id
)

eventstream.replace_group_unsafe(
project_id,
event_ids,
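Folding both branches over EVENT_MODELS_TO_MIGRATE means a newly added per-event model is handled in one place instead of two hand-written query blocks. A sketch of the resulting delete/keep dispatch, where the function name and signature are hypothetical rather than the task's actual API:

    def migrate_or_delete(models_to_migrate, project_id, event_ids, new_group_id, keep):
        for cls in models_to_migrate:
            qs = cls.objects.filter(project_id=project_id, event_id__in=event_ids)
            if keep:
                # "keep": re-point the remaining rows at the freshly created
                # group so they stay visible once reprocessing finishes.
                qs.update(group_id=new_group_id)
            else:
                # "delete": drop the rows for events that will not be reprocessed.
                qs.delete()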
47 changes: 32 additions & 15 deletions tests/sentry/tasks/test_reprocessing2.py
@@ -26,6 +26,27 @@
from sentry.utils.cache import cache_key_for_event


def _create_event_attachment(evt, type):
file = File.objects.create(name="foo", type=type)
file.putfile(BytesIO(b"hello world"))
EventAttachment.objects.create(
event_id=evt.event_id,
group_id=evt.group_id,
project_id=evt.project_id,
file_id=file.id,
type=file.type,
name="foo",
)


def _create_user_report(evt):
UserReport.objects.create(
project_id=evt.project_id,
event_id=evt.event_id,
name="User",
)


@pytest.fixture(autouse=True)
def reprocessing_feature(monkeypatch):
monkeypatch.setattr("sentry.tasks.reprocessing2.GROUP_REPROCESSING_CHUNK_SIZE", 1)
@@ -237,6 +258,9 @@ def event_preprocessor(data):
event_id: eventstore.get_event_by_id(default_project.id, event_id) for event_id in event_ids
}

for evt in old_events.values():
_create_user_report(evt)

(group_id,) = {e.group_id for e in old_events.values()}

with burst_task_runner() as burst:
@@ -257,6 +281,12 @@
elif remaining_events == "keep":
assert event.group_id != group_id
assert dict(event.data) == dict(old_events[event_id].data)
assert (
UserReport.objects.get(
project_id=default_project.id, event_id=event_id
).group_id
!= group_id
)
else:
raise ValueError(remaining_events)
else:
@@ -314,22 +344,9 @@ def event_preprocessor(data):

for evt in (event, event_to_delete):
for type in ("event.attachment", "event.minidump"):
file = File.objects.create(name="foo", type=type)
file.putfile(BytesIO(b"hello world"))
EventAttachment.objects.create(
event_id=evt.event_id,
group_id=evt.group_id,
project_id=default_project.id,
file_id=file.id,
type=file.type,
name="foo",
)
_create_event_attachment(evt, type)

UserReport.objects.create(
project_id=default_project.id,
event_id=evt.event_id,
name="User",
)
_create_user_report(evt)

with burst_task_runner() as burst:
reprocess_group(default_project.id, event.group_id, max_events=1)