Merged
2 changes: 1 addition & 1 deletion migrations_lockfile.txt
@@ -10,7 +10,7 @@ auth: 0008_alter_user_username_max_length
contenttypes: 0002_remove_content_type_name
jira_ac: 0001_initial
nodestore: 0001_initial
sentry: 0116_backfill_debug_file_checksum
sentry: 0117_dummy-activityupdate
sessions: 0001_initial
sites: 0002_alter_domain_unique
social_auth: 0001_initial
7 changes: 6 additions & 1 deletion src/sentry/api/endpoints/group_reprocessing.py
@@ -34,5 +34,10 @@ def post(self, request, group):
else:
max_events = None

reprocess_group.delay(project_id=group.project_id, group_id=group.id, max_events=max_events)
reprocess_group.delay(
project_id=group.project_id,
group_id=group.id,
max_events=max_events,
acting_user_id=getattr(request.user, "id", None),
)
return self.respond(status=200)
65 changes: 37 additions & 28 deletions src/sentry/deletions/defaults/group.py
@@ -3,12 +3,34 @@
import os
from sentry import eventstore, nodestore
from sentry.eventstore.models import Event
from sentry.models import EventAttachment, UserReport
from sentry.reprocessing2 import delete_unprocessed_events
from sentry import models

from ..base import BaseDeletionTask, BaseRelation, ModelDeletionTask, ModelRelation


GROUP_RELATED_MODELS = (
Member:
nit: Since this is now used outside of deletions, can we move this to a better place, for instance somewhere in sentry.models? This could even be on Group itself.

Member Author:
Despite the name this is still related to group deletion and the order of items is specific to deletion. I was also considering duplicating the list like group merge/unmerge does. Let me know what you prefer.

Member:
You're right, I'll leave this up to you. I found it interesting that this is being imported from reprocessing to copy over information rather than delete it, that's all.

# prioritize GroupHash
models.GroupHash,
models.GroupAssignee,
models.GroupCommitResolution,
models.GroupLink,
models.GroupBookmark,
models.GroupMeta,
models.GroupEnvironment,
models.GroupRelease,
models.GroupRedirect,
models.GroupResolution,
models.GroupRuleStatus,
models.GroupSeen,
models.GroupShare,
models.GroupSnooze,
models.GroupEmailThread,
models.GroupSubscription,
models.UserReport,
models.EventAttachment,
)
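For orientation on the review thread above, a condensed sketch of the list's two consumers, simplified from the diffs later in this PR; the function and parameter names here are illustrative, not part of the change:

from sentry import models
from sentry.deletions.base import ModelRelation
from sentry.deletions.defaults.group import GROUP_RELATED_MODELS


def deletion_relations(group):
    # Deletion: each model becomes a child relation keyed by group_id and is
    # processed in list order (GroupHash first, so new events stop landing here).
    return [ModelRelation(m, {"group_id": group.id}) for m in GROUP_RELATED_MODELS]


def migrate_related_rows(old_group_id, new_group_id):
    # Reprocessing: the same tables are moved to the successor group instead of
    # being deleted (Activity is added on top, see reprocessing2.py below).
    for model in GROUP_RELATED_MODELS + (models.Activity,):
        model.objects.filter(group_id=old_group_id).update(group_id=new_group_id)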


class EventDataDeletionTask(BaseDeletionTask):
"""
Deletes nodestore data, EventAttachment and UserReports for group
@@ -53,46 +75,33 @@ def chunk(self):
node_ids = [Event.generate_node_id(self.project_id, event.event_id) for event in events]
nodestore.delete_multi(node_ids)

from sentry.reprocessing2 import delete_unprocessed_events

delete_unprocessed_events(events)

# Remove EventAttachment and UserReport
# Remove EventAttachment and UserReport *again* as those may not have a
# group ID, therefore there may be dangling ones after "regular" model
# deletion.
event_ids = [event.event_id for event in events]
EventAttachment.objects.filter(event_id__in=event_ids, project_id=self.project_id).delete()
UserReport.objects.filter(event_id__in=event_ids, project_id=self.project_id).delete()
models.EventAttachment.objects.filter(
event_id__in=event_ids, project_id=self.project_id
).delete()
models.UserReport.objects.filter(
event_id__in=event_ids, project_id=self.project_id
).delete()

return True


class GroupDeletionTask(ModelDeletionTask):
def get_child_relations(self, instance):
from sentry import models

relations = []

model_list = (
# prioritize GroupHash
models.GroupHash,
models.GroupAssignee,
models.GroupCommitResolution,
models.GroupLink,
models.GroupBookmark,
models.GroupMeta,
models.GroupEnvironment,
models.GroupRelease,
models.GroupRedirect,
models.GroupResolution,
models.GroupRuleStatus,
models.GroupSeen,
models.GroupShare,
models.GroupSnooze,
models.GroupEmailThread,
models.GroupSubscription,
models.UserReport,
models.EventAttachment,
relations.extend(
[ModelRelation(m, {"group_id": instance.id}) for m in GROUP_RELATED_MODELS]
)

relations.extend([ModelRelation(m, {"group_id": instance.id}) for m in model_list])

# Skip EventDataDeletionTask if this is being called from cleanup.py
if not os.environ.get("_SENTRY_CLEANUP"):
relations.extend(
38 changes: 38 additions & 0 deletions src/sentry/migrations/0117_dummy-activityupdate.py
@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.28 on 2020-10-27 18:48
from __future__ import unicode_literals

from django.db import migrations
import sentry.db.models.fields.bounded


class Migration(migrations.Migration):
# This flag is used to mark that a migration shouldn't be automatically run in
# production. We set this to True for operations that we think are risky and want
# someone from ops to run manually and monitor.
# General advice is that if in doubt, mark your migration as `is_dangerous`.
# Some things you should always mark as dangerous:
# - Large data migrations. Typically we want these to be run manually by ops so that
# they can be monitored. Since data migrations will now hold a transaction open
# this is even more important.
# - Adding columns to highly active tables, even ones that are NULL.
is_dangerous = False

# This flag is used to decide whether to run this migration in a transaction or not.
# By default we prefer to run in a transaction, but for migrations where you want
# to `CREATE INDEX CONCURRENTLY` this needs to be set to False. Typically you'll
# want to create an index concurrently when adding one to an existing table.
atomic = False


dependencies = [
('sentry', '0116_backfill_debug_file_checksum'),
]

operations = [
migrations.AlterField(
model_name='activity',
name='type',
field=sentry.db.models.fields.bounded.BoundedPositiveIntegerField(choices=[(1, 'set_resolved'), (15, 'set_resolved_by_age'), (13, 'set_resolved_in_release'), (16, 'set_resolved_in_commit'), (21, 'set_resolved_in_pull_request'), (2, 'set_unresolved'), (3, 'set_ignored'), (4, 'set_public'), (5, 'set_private'), (6, 'set_regression'), (7, 'create_issue'), (8, 'note'), (9, 'first_seen'), (10, 'release'), (11, 'assigned'), (12, 'unassigned'), (14, 'merge'), (17, 'deploy'), (18, 'new_processing_issues'), (19, 'unmerge_source'), (20, 'unmerge_destination'), (22, 'reprocess')]),
),
]
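The comments in the migration above note that atomic must be False when an index is created concurrently. A minimal hypothetical example of such a migration; table, column, and index names are made up, and the dependency is shown only to keep the sketch self-contained:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import migrations


class Migration(migrations.Migration):
    # A large index build on an active table is arguably worth flagging for ops.
    is_dangerous = True

    # CREATE INDEX CONCURRENTLY is rejected inside a transaction, so the
    # migration must opt out of the default transactional behaviour.
    atomic = False

    dependencies = [
        ("sentry", "0117_dummy-activityupdate"),
    ]

    operations = [
        migrations.RunSQL(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS example_group_idx "
            "ON example_table (example_column);",
            reverse_sql="DROP INDEX IF EXISTS example_group_idx;",
            hints={"tables": ["example_table"]},
        ),
    ]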
3 changes: 3 additions & 0 deletions src/sentry/models/activity.py
@@ -40,6 +40,7 @@ class Activity(Model):
UNMERGE_SOURCE = 19
UNMERGE_DESTINATION = 20
SET_RESOLVED_IN_PULL_REQUEST = 21
REPROCESS = 22

TYPE = (
# (TYPE, verb-slug)
@@ -64,6 +65,8 @@
(NEW_PROCESSING_ISSUES, u"new_processing_issues"),
(UNMERGE_SOURCE, u"unmerge_source"),
(UNMERGE_DESTINATION, u"unmerge_destination"),
# The user has reprocessed the group, so events may have moved to new groups
(REPROCESS, u"reprocess"),
)

project = FlexibleForeignKey("sentry.Project")
54 changes: 45 additions & 9 deletions src/sentry/reprocessing2.py
@@ -4,22 +4,27 @@
import hashlib
import logging
import sentry_sdk
import six

from django.conf import settings

from sentry import nodestore, features, eventstore
from sentry.attachments import CachedAttachment, attachment_cache
from sentry.models import EventAttachment
from sentry import models
from sentry.utils import snuba
from sentry.utils.cache import cache_key_for_event
from sentry.utils.redis import redis_clusters
from sentry.eventstore.processing import event_processing_store
from sentry.deletions.defaults.group import GROUP_RELATED_MODELS

logger = logging.getLogger("sentry.reprocessing")

_REDIS_SYNC_TTL = 3600


GROUP_MODELS_TO_MIGRATE = GROUP_RELATED_MODELS + (models.Activity,)
Member Author:
After #21611 this addition should not be necessary.



def _generate_unprocessed_event_node_id(project_id, event_id):
return hashlib.md5(
u"{}:{}:unprocessed".format(project_id, event_id).encode("utf-8")
@@ -101,7 +106,7 @@ def reprocess_event(project_id, event_id, start_time):
cache_key = event_processing_store.store(data)

# Step 2: Copy attachments into attachment cache
queryset = EventAttachment.objects.filter(
queryset = models.EventAttachment.objects.filter(
project_id=project_id, event_id=orig_event_id
).select_related("file")

@@ -202,16 +207,47 @@ def mark_event_reprocessed(data):
_get_sync_redis_client().decr(key)


def start_group_reprocessing(project_id, group_id, max_events=None):
from sentry.models.group import Group, GroupStatus
from sentry.models.grouphash import GroupHash
def start_group_reprocessing(project_id, group_id, max_events=None, acting_user_id=None):
from django.db import transaction

with transaction.atomic():
Member Author:
btw this might be a pretty large transaction, it touches a lot of tables at once.

Member:
Let's follow up on this please. I'm pretty sure we'll want to back this out.

Member Author:
The problem is that I would really not like to fail halfway through, because that's quite a mess to clean up. Perhaps we want some shitty kind of "clientside rollback", I think I saw that someplace else already.
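One hedged sketch of the "clientside rollback" idea floated above, assuming the same `from sentry import models` import as this file; the helper names are hypothetical. It trades the large transaction for a best-effort restore of the old row if a later step fails:

from sentry import models


def reprocess_with_manual_rollback(group_id, do_reprocess):
    # Remember the fields we mutate up front so they can be restored.
    group = models.Group.objects.get(id=group_id)
    original_status, original_short_id = group.status, group.short_id

    models.Group.objects.filter(id=group_id).update(
        status=models.GroupStatus.REPROCESSING, short_id=None
    )
    try:
        do_reprocess(group_id)  # create successor group, migrate rows, etc.
    except Exception:
        # Best-effort "client-side rollback" so the group is not left stuck
        # in the REPROCESSING state with no short_id.
        models.Group.objects.filter(id=group_id).update(
            status=original_status, short_id=original_short_id
        )
        raise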

Group.objects.filter(id=group_id).update(status=GroupStatus.REPROCESSING)
# Remove all grouphashes such that new events get sorted into a
# different group.
GroupHash.objects.filter(group_id=group_id).delete()
group = models.Group.objects.get(id=group_id)
original_status = group.status
original_short_id = group.short_id
group.status = models.GroupStatus.REPROCESSING
# satisfy unique constraint of (project_id, short_id)
# we manually tested that multiple groups with (project_id=1,
# short_id=null) can exist in postgres
group.short_id = None
group.save()

# Create a duplicate row that has the same attributes by nulling out
# the primary key and saving
group.pk = group.id = None
Member:
Might be worth having a comment here that you're tricking django into doing an INSERT.

Member Author:
done
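For readers unfamiliar with the trick being discussed: clearing an instance's primary key makes Django treat it as unsaved, so the next save() issues an INSERT and produces a copy of the row with a fresh id. A generic illustration using a hypothetical model, not Sentry code:

from django.db import models


class Example(models.Model):  # stand-in model for illustration only
    name = models.CharField(max_length=64)

    class Meta:
        app_label = "example"


def duplicate_row(instance):
    # With pk/id cleared, Django no longer knows about the existing row,
    # so save() performs an INSERT and the database assigns a new id.
    instance.pk = None
    instance.save(force_insert=True)
    return instance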

new_group = group # rename variable just to avoid confusion
del group
new_group.status = original_status
new_group.short_id = original_short_id
# this will be incremented by the events that are reprocessed
new_group.times_seen = 0
new_group.save()

for model in GROUP_MODELS_TO_MIGRATE:
model.objects.filter(group_id=group_id).update(group_id=new_group.id)

models.GroupRedirect.objects.create(
organization_id=new_group.project.organization_id,
group_id=new_group.id,
previous_group_id=group_id,
)

models.Activity.objects.create(
type=models.Activity.REPROCESS,
project=new_group.project,
ident=six.text_type(group_id),
group=new_group,
user_id=acting_user_id,
)

# Get event counts of issue (for all environments etc). This was copypasted
# and simplified from groupserializer.
@@ -181,6 +181,11 @@ class GroupActivityItem extends React.Component {
data.issues.length,
author
);
case 'reprocess':
return t(
'%(author)s reprocessed this issue, some events may have moved into new issues',
{author}
);
default:
return ''; // should never hit (?)
}
9 changes: 6 additions & 3 deletions src/sentry/tasks/reprocessing2.py
@@ -15,13 +15,16 @@
time_limit=120,
soft_time_limit=110,
)
def reprocess_group(project_id, group_id, offset=0, start_time=None, max_events=None):
def reprocess_group(
project_id, group_id, offset=0, start_time=None, max_events=None, acting_user_id=None
):
from sentry.reprocessing2 import start_group_reprocessing

start_group_reprocessing(project_id, group_id, max_events=max_events)

if start_time is None:
start_time = time.time()
start_group_reprocessing(
project_id, group_id, max_events=max_events, acting_user_id=acting_user_id
)

if max_events is not None and max_events <= 0:
events = []
33 changes: 28 additions & 5 deletions tests/sentry/tasks/test_reprocessing2.py
@@ -3,9 +3,10 @@
from time import time
import pytest
import uuid
import six

from sentry import eventstore
from sentry.models.group import Group
from sentry.models import Group, GroupAssignee, Activity
from sentry.event_manager import EventManager
from sentry.eventstore.processing import event_processing_store
from sentry.plugins.base.v2 import Plugin2
@@ -59,7 +60,6 @@ def is_enabled(self, project=None):

@pytest.mark.django_db
@pytest.mark.snuba
@pytest.mark.skip(reason="Some of these tests deadlock on CI")
@pytest.mark.parametrize("change_groups", (True, False), ids=("new_group", "same_group"))
def test_basic(
task_runner,
@@ -138,10 +138,20 @@ def get_event_by_processing_counter(n):

@pytest.mark.django_db
@pytest.mark.snuba
@pytest.mark.skip(reason="Some of these tests deadlock on CI")
def test_concurrent_events_go_into_new_group(
default_project, reset_snuba, register_event_preprocessor, process_and_save, burst_task_runner
default_project,
reset_snuba,
register_event_preprocessor,
process_and_save,
burst_task_runner,
default_user,
):
"""
Assert that both unmodified and concurrently inserted events go into "the
new group", i.e. the successor of the reprocessed (old) group that
inherited the group hashes.
"""

@register_event_preprocessor
def event_preprocessor(data):
extra = data.setdefault("extra", {})
@@ -152,6 +162,13 @@ def event_preprocessor(data):
event_id = process_and_save({"message": "hello world"})

event = eventstore.get_event_by_id(default_project.id, event_id)
original_short_id = event.group.short_id
assert original_short_id
original_group_id = event.group.id

original_assignee = GroupAssignee.objects.create(
group_id=original_group_id, project=default_project, user=default_user
)

with burst_task_runner() as burst_reprocess:
reprocess_group(default_project.id, event.group_id)
@@ -177,10 +194,16 @@ def event_preprocessor(data):
assert event2.group_id == event3.group_id
assert event.get_hashes() == event2.get_hashes() == event3.get_hashes()

group = event3.group

assert group.short_id == original_short_id
assert GroupAssignee.objects.get(group=group) == original_assignee
activity = Activity.objects.get(group=group, type=Activity.REPROCESS)
assert activity.ident == six.text_type(original_group_id)


@pytest.mark.django_db
@pytest.mark.snuba
@pytest.mark.skip(reason="Some of these tests deadlock on CI")
def test_max_events(
default_project,
reset_snuba,