From afa190844eadba1364d245c5c11e0e546e43f937 Mon Sep 17 00:00:00 2001 From: Scott Cooper Date: Wed, 13 Oct 2021 19:35:04 -0700 Subject: [PATCH 1/3] add locks to groupowners --- src/sentry/tasks/groupowner.py | 54 +++++++++++++++++++++----------- src/sentry/tasks/post_process.py | 7 ++++- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/src/sentry/tasks/groupowner.py b/src/sentry/tasks/groupowner.py index 86c0ffda31da1f..2657129ea33804 100644 --- a/src/sentry/tasks/groupowner.py +++ b/src/sentry/tasks/groupowner.py @@ -3,11 +3,13 @@ from django.utils import timezone +from sentry.app import locks from sentry.models import Commit, Project, Release from sentry.models.groupowner import GroupOwner, GroupOwnerType from sentry.tasks.base import instrumented_task from sentry.utils import metrics from sentry.utils.committers import get_event_file_committers +from sentry.utils.locking import UnableToAcquireLock from sentry.utils.sdk import set_current_event_project PREFERRED_GROUP_OWNERS = 2 @@ -66,25 +68,39 @@ def process_suspect_commits(event_id, event_platform, event_frames, group_id, pr for owner_id in sorted(owner_scores, reverse=True, key=owner_scores.get)[ :PREFERRED_GROUP_OWNERS ]: - go, created = GroupOwner.objects.update_or_create( - group_id=group_id, - type=GroupOwnerType.SUSPECT_COMMIT.value, - user_id=owner_id, - project=project, - organization_id=project.organization_id, - defaults={ - "date_added": timezone.now() - }, # Updates date of an existing owner, since we just matched them with this new event - ) - if created: - owner_count += 1 - if owner_count > PREFERRED_GROUP_OWNERS: - try: - owner = owners[0] - except IndexError: - pass - else: - owner.delete() + # Lock to prevent duplicate GroupOwners + lock = locks.get(f"groupowner:{group_id}-{owner_id}", duration=10) + try: + with lock.acquire(): + go, created = GroupOwner.objects.update_or_create( + group_id=group_id, + type=GroupOwnerType.SUSPECT_COMMIT.value, + user_id=owner_id, + project=project, + organization_id=project.organization_id, + defaults={ + "date_added": timezone.now() + }, # Updates date of an existing owner, since we just matched them with this new event + ) + if created: + owner_count += 1 + if owner_count > PREFERRED_GROUP_OWNERS: + try: + owner = owners[0] + except IndexError: + pass + else: + owner.delete() + except GroupOwner.MultipleObjectsReturned: + GroupOwner.objects.filter( + group_id=group_id, + type=GroupOwnerType.SUSPECT_COMMIT.value, + user_id=owner_id, + project=project, + organization_id=project.organization_id, + )[0].delete() + except UnableToAcquireLock: + pass except Commit.DoesNotExist: logger.info( diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index 58026b40676432..d67429e230413e 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -169,7 +169,12 @@ def handle_group_owners(project, group, owners): ) ) if new_group_owners: - GroupOwner.objects.bulk_create(new_group_owners) + lock = locks.get(f"groupowner-bulk:{group.id}", duration=10) + try: + with lock.acquire(): + GroupOwner.objects.bulk_create(new_group_owners) + except UnableToAcquireLock: + pass def update_existing_attachments(event): From 76e1c91e6268582f94d678e31e2b26f4954da1c1 Mon Sep 17 00:00:00 2001 From: Scott Cooper Date: Thu, 14 Oct 2021 16:36:11 -0700 Subject: [PATCH 2/3] wrap entire functions --- src/sentry/tasks/groupowner.py | 70 ++++++++++++++------------ src/sentry/tasks/post_process.py | 86 ++++++++++++++++---------------- 2 files changed, 83 insertions(+), 73 deletions(-) diff --git a/src/sentry/tasks/groupowner.py b/src/sentry/tasks/groupowner.py index 2657129ea33804..f89800b5fd369c 100644 --- a/src/sentry/tasks/groupowner.py +++ b/src/sentry/tasks/groupowner.py @@ -19,13 +19,9 @@ logger = logging.getLogger("tasks.groupowner") -@instrumented_task( - name="sentry.tasks.process_suspect_commits", - queue="group_owners.process_suspect_commits", - default_retry_delay=5, - max_retries=5, -) -def process_suspect_commits(event_id, event_platform, event_frames, group_id, project_id, **kwargs): +def _process_suspect_commits( + event_id, event_platform, event_frames, group_id, project_id, **kwargs +): metrics.incr("sentry.tasks.process_suspect_commits.start") set_current_event_project(project_id) @@ -68,29 +64,26 @@ def process_suspect_commits(event_id, event_platform, event_frames, group_id, pr for owner_id in sorted(owner_scores, reverse=True, key=owner_scores.get)[ :PREFERRED_GROUP_OWNERS ]: - # Lock to prevent duplicate GroupOwners - lock = locks.get(f"groupowner:{group_id}-{owner_id}", duration=10) try: - with lock.acquire(): - go, created = GroupOwner.objects.update_or_create( - group_id=group_id, - type=GroupOwnerType.SUSPECT_COMMIT.value, - user_id=owner_id, - project=project, - organization_id=project.organization_id, - defaults={ - "date_added": timezone.now() - }, # Updates date of an existing owner, since we just matched them with this new event - ) - if created: - owner_count += 1 - if owner_count > PREFERRED_GROUP_OWNERS: - try: - owner = owners[0] - except IndexError: - pass - else: - owner.delete() + go, created = GroupOwner.objects.update_or_create( + group_id=group_id, + type=GroupOwnerType.SUSPECT_COMMIT.value, + user_id=owner_id, + project=project, + organization_id=project.organization_id, + defaults={ + "date_added": timezone.now() + }, # Updates date of an existing owner, since we just matched them with this new event + ) + if created: + owner_count += 1 + if owner_count > PREFERRED_GROUP_OWNERS: + try: + owner = owners[0] + except IndexError: + pass + else: + owner.delete() except GroupOwner.MultipleObjectsReturned: GroupOwner.objects.filter( group_id=group_id, @@ -99,8 +92,6 @@ def process_suspect_commits(event_id, event_platform, event_frames, group_id, pr project=project, organization_id=project.organization_id, )[0].delete() - except UnableToAcquireLock: - pass except Commit.DoesNotExist: logger.info( @@ -112,3 +103,20 @@ def process_suspect_commits(event_id, event_platform, event_frames, group_id, pr "process_suspect_commits.skipped", extra={"event": event_id, "reason": "no_release"}, ) + + +@instrumented_task( + name="sentry.tasks.process_suspect_commits", + queue="group_owners.process_suspect_commits", + default_retry_delay=5, + max_retries=5, +) +def process_suspect_commits(event_id, event_platform, event_frames, group_id, project_id, **kwargs): + lock = locks.get(f"process-suspect-commits:{group_id}", duration=10) + try: + with lock.acquire(): + _process_suspect_commits( + event_id, event_platform, event_frames, group_id, project_id, **kwargs + ) + except UnableToAcquireLock: + pass diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index d67429e230413e..756ecb12837160 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -131,50 +131,52 @@ def handle_group_owners(project, group, owners): from sentry.models.team import Team from sentry.models.user import User - with metrics.timer("post_process.handle_group_owners"): - current_group_owners = GroupOwner.objects.filter( - group=group, type=GroupOwnerType.OWNERSHIP_RULE.value - ) - new_owners = {(type(owner), owner.id) for owner in owners} - # Owners already in the database that we'll keep - keeping_owners = set() - for owner in current_group_owners: - lookup_key = ( - (Team, owner.team_id) if owner.team_id is not None else (User, owner.user_id) - ) - if lookup_key not in new_owners: - owner.delete() - else: - keeping_owners.add(lookup_key) - - new_group_owners = [] - - for key in new_owners: - if key not in keeping_owners: - owner_type, owner_id = key - user_id = None - team_id = None - if owner_type is User: - user_id = owner_id - if owner_type is Team: - team_id = owner_id - new_group_owners.append( - GroupOwner( - group=group, - type=GroupOwnerType.OWNERSHIP_RULE.value, - user_id=user_id, - team_id=team_id, - project=project, - organization=project.organization, - ) + lock = locks.get(f"groupowner-bulk:{group.id}", duration=10) + try: + with lock.acquire(): + with metrics.timer("post_process.handle_group_owners"): + current_group_owners = GroupOwner.objects.filter( + group=group, type=GroupOwnerType.OWNERSHIP_RULE.value ) - if new_group_owners: - lock = locks.get(f"groupowner-bulk:{group.id}", duration=10) - try: - with lock.acquire(): + new_owners = {(type(owner), owner.id) for owner in owners} + # Owners already in the database that we'll keep + keeping_owners = set() + for owner in current_group_owners: + lookup_key = ( + (Team, owner.team_id) + if owner.team_id is not None + else (User, owner.user_id) + ) + if lookup_key not in new_owners: + owner.delete() + else: + keeping_owners.add(lookup_key) + + new_group_owners = [] + + for key in new_owners: + if key not in keeping_owners: + owner_type, owner_id = key + user_id = None + team_id = None + if owner_type is User: + user_id = owner_id + if owner_type is Team: + team_id = owner_id + new_group_owners.append( + GroupOwner( + group=group, + type=GroupOwnerType.OWNERSHIP_RULE.value, + user_id=user_id, + team_id=team_id, + project=project, + organization=project.organization, + ) + ) + if new_group_owners: GroupOwner.objects.bulk_create(new_group_owners) - except UnableToAcquireLock: - pass + except UnableToAcquireLock: + pass def update_existing_attachments(event): From 9567f8f7884cf973c3d472712a3edfd3136878a2 Mon Sep 17 00:00:00 2001 From: Scott Cooper Date: Tue, 19 Oct 2021 13:26:10 -0700 Subject: [PATCH 3/3] combine locks --- src/sentry/tasks/post_process.py | 77 +++++++++++++++----------------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index 756ecb12837160..53b72cffa0216e 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -133,48 +133,45 @@ def handle_group_owners(project, group, owners): lock = locks.get(f"groupowner-bulk:{group.id}", duration=10) try: - with lock.acquire(): - with metrics.timer("post_process.handle_group_owners"): - current_group_owners = GroupOwner.objects.filter( - group=group, type=GroupOwnerType.OWNERSHIP_RULE.value + with lock.acquire(), metrics.timer("post_process.handle_group_owners"): + current_group_owners = GroupOwner.objects.filter( + group=group, type=GroupOwnerType.OWNERSHIP_RULE.value + ) + new_owners = {(type(owner), owner.id) for owner in owners} + # Owners already in the database that we'll keep + keeping_owners = set() + for owner in current_group_owners: + lookup_key = ( + (Team, owner.team_id) if owner.team_id is not None else (User, owner.user_id) ) - new_owners = {(type(owner), owner.id) for owner in owners} - # Owners already in the database that we'll keep - keeping_owners = set() - for owner in current_group_owners: - lookup_key = ( - (Team, owner.team_id) - if owner.team_id is not None - else (User, owner.user_id) - ) - if lookup_key not in new_owners: - owner.delete() - else: - keeping_owners.add(lookup_key) - - new_group_owners = [] - - for key in new_owners: - if key not in keeping_owners: - owner_type, owner_id = key - user_id = None - team_id = None - if owner_type is User: - user_id = owner_id - if owner_type is Team: - team_id = owner_id - new_group_owners.append( - GroupOwner( - group=group, - type=GroupOwnerType.OWNERSHIP_RULE.value, - user_id=user_id, - team_id=team_id, - project=project, - organization=project.organization, - ) + if lookup_key not in new_owners: + owner.delete() + else: + keeping_owners.add(lookup_key) + + new_group_owners = [] + + for key in new_owners: + if key not in keeping_owners: + owner_type, owner_id = key + user_id = None + team_id = None + if owner_type is User: + user_id = owner_id + if owner_type is Team: + team_id = owner_id + new_group_owners.append( + GroupOwner( + group=group, + type=GroupOwnerType.OWNERSHIP_RULE.value, + user_id=user_id, + team_id=team_id, + project=project, + organization=project.organization, ) - if new_group_owners: - GroupOwner.objects.bulk_create(new_group_owners) + ) + if new_group_owners: + GroupOwner.objects.bulk_create(new_group_owners) except UnableToAcquireLock: pass