-
-
Notifications
You must be signed in to change notification settings - Fork 4.7k
fix(repos): Have repo sync batch up the work #113131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| from sentry.features.exceptions import FeatureNotRegistered | ||
| from sentry.integrations.models.organization_integration import OrganizationIntegration | ||
| from sentry.integrations.services.integration import integration_service | ||
| from sentry.integrations.services.integration.model import RpcIntegration | ||
| from sentry.integrations.services.repository.service import repository_service | ||
| from sentry.integrations.source_code_management.metrics import ( | ||
| SCMIntegrationInteractionEvent, | ||
|
|
@@ -28,13 +29,15 @@ | |
| from sentry.integrations.source_code_management.repo_audit import log_repo_change | ||
| from sentry.integrations.source_code_management.repository import RepositoryIntegration | ||
| from sentry.organizations.services.organization import organization_service | ||
| from sentry.organizations.services.organization.model import RpcOrganization | ||
| from sentry.plugins.providers.integration_repository import get_integration_repository_provider | ||
| from sentry.shared_integrations.exceptions import ApiError | ||
| from sentry.silo.base import SiloMode | ||
| from sentry.tasks.base import instrumented_task, retry | ||
| from sentry.taskworker.namespaces import integrations_control_tasks | ||
| from sentry.utils import metrics | ||
| from sentry.utils.cursored_scheduler import CursoredScheduler | ||
| from sentry.utils.iterators import chunked | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -51,6 +54,8 @@ | |
| "vsts", | ||
| ] | ||
|
|
||
| SYNC_BATCH_SIZE = 100 | ||
|
|
||
|
|
||
| def _has_feature(flag: str, org: object) -> bool: | ||
| """Check a feature flag, returning False if the flag isn't registered.""" | ||
|
|
@@ -73,43 +78,12 @@ def sync_repos_for_org(organization_integration_id: int) -> None: | |
| Sync repositories for a single OrganizationIntegration. | ||
|
|
||
| Fetches all repos from the SCM provider, diffs against Sentry's | ||
| Repository table, and creates/disables/re-enables repos as needed. | ||
| Repository table, then dispatches batched apply tasks. | ||
| """ | ||
| try: | ||
| oi = OrganizationIntegration.objects.get( | ||
| id=organization_integration_id, | ||
| status=ObjectStatus.ACTIVE, | ||
| ) | ||
| except OrganizationIntegration.DoesNotExist: | ||
| logger.info( | ||
| "sync_repos_for_org.missing_org_integration", | ||
| extra={"organization_integration_id": organization_integration_id}, | ||
| ) | ||
| ctx = _get_sync_context(organization_integration_id) | ||
| if ctx is None: | ||
| return | ||
|
|
||
| integration = integration_service.get_integration( | ||
| integration_id=oi.integration_id, status=ObjectStatus.ACTIVE | ||
| ) | ||
| if integration is None: | ||
| logger.info( | ||
| "sync_repos_for_org.missing_integration", | ||
| extra={"integration_id": oi.integration_id}, | ||
| ) | ||
| return | ||
|
|
||
| organization_id = oi.organization_id | ||
| org_context = organization_service.get_organization_by_id( | ||
| id=organization_id, include_projects=False, include_teams=False | ||
| ) | ||
| if org_context is None: | ||
| logger.info( | ||
| "sync_repos_for_org.missing_organization", | ||
| extra={"organization_id": organization_id}, | ||
| ) | ||
| return | ||
|
|
||
| rpc_org = org_context.organization | ||
| provider_key = integration.provider | ||
| integration, rpc_org, provider_key = ctx | ||
|
|
||
| if not _has_feature(f"organizations:{provider_key}-repo-auto-sync", rpc_org): | ||
| return | ||
|
|
@@ -120,10 +94,10 @@ def sync_repos_for_org(organization_integration_id: int) -> None: | |
| with SCMIntegrationInteractionEvent( | ||
| interaction_type=SCMIntegrationInteractionType.SYNC_REPOS, | ||
| integration_id=integration.id, | ||
| organization_id=organization_id, | ||
| organization_id=rpc_org.id, | ||
| provider_key=provider_key, | ||
| ).capture(): | ||
| installation = integration.get_installation(organization_id=organization_id) | ||
| installation = integration.get_installation(organization_id=rpc_org.id) | ||
| assert isinstance(installation, RepositoryIntegration) | ||
|
|
||
| try: | ||
|
|
@@ -134,15 +108,15 @@ def sync_repos_for_org(organization_integration_id: int) -> None: | |
| "sync_repos_for_org.rate_limited", | ||
| extra={ | ||
| "integration_id": integration.id, | ||
| "organization_id": organization_id, | ||
| "organization_id": rpc_org.id, | ||
| }, | ||
| ) | ||
| raise | ||
|
|
||
| provider_external_ids = {repo["external_id"] for repo in provider_repos} | ||
|
|
||
| all_repos = repository_service.get_repositories( | ||
| organization_id=organization_id, | ||
| organization_id=rpc_org.id, | ||
| integration_id=integration.id, | ||
| providers=[provider], | ||
| ) | ||
|
|
@@ -193,7 +167,7 @@ def sync_repos_for_org(organization_integration_id: int) -> None: | |
| extra={ | ||
| "provider": provider_key, | ||
| "integration_id": integration.id, | ||
| "organization_id": organization_id, | ||
| "organization_id": rpc_org.id, | ||
| "dry_run": dry_run, | ||
| "provider_total": len(provider_external_ids), | ||
| "sentry_active": len(sentry_active_ids), | ||
|
|
@@ -210,76 +184,203 @@ def sync_repos_for_org(organization_integration_id: int) -> None: | |
| if dry_run: | ||
| return | ||
|
|
||
| repo_by_external_id = {r.external_id: r for r in active_repos + disabled_repos} | ||
| # Build repo configs for new repos | ||
| new_repo_configs = [ | ||
| { | ||
| **repo, | ||
| "identifier": str(repo["identifier"]), | ||
| "integration_id": integration.id, | ||
| "installation": integration.id, | ||
| } | ||
| for repo in provider_repos | ||
| if repo["external_id"] in new_ids | ||
| ] | ||
| removed_id_list = list(removed_ids) | ||
| restored_id_list = list(restored_ids) | ||
|
|
||
| # TODO: Switch to apply_async once the tasks are deployed to all workers | ||
| for config_batch in chunked(new_repo_configs, SYNC_BATCH_SIZE): | ||
| create_repos_batch( | ||
| organization_integration_id=organization_integration_id, | ||
| repo_configs=config_batch, | ||
| ) | ||
|
|
||
| if new_ids: | ||
| integration_repo_provider = get_integration_repository_provider(integration) | ||
| repo_configs = [ | ||
| { | ||
| **repo, | ||
| "identifier": str(repo["identifier"]), | ||
| "integration_id": integration.id, | ||
| "installation": integration.id, | ||
| } | ||
| for repo in provider_repos | ||
| if repo["external_id"] in new_ids | ||
| ] | ||
| if repo_configs: | ||
| created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories( | ||
| configs=repo_configs, organization=rpc_org | ||
| if _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): | ||
| for removed_batch in chunked(removed_id_list, SYNC_BATCH_SIZE): | ||
| disable_repos_batch( | ||
| organization_integration_id=organization_integration_id, | ||
| external_ids=removed_batch, | ||
| ) | ||
|
|
||
| for repo in created_repos: | ||
| log_repo_change( | ||
| event_name="REPO_ADDED", | ||
| organization_id=organization_id, | ||
| repo=repo, | ||
| source="automatic SCM syncing", | ||
| provider=provider_key, | ||
| ) | ||
|
|
||
| for repo in reactivated_repos: | ||
| log_repo_change( | ||
| event_name="REPO_ENABLED", | ||
| organization_id=organization_id, | ||
| repo=repo, | ||
| source="automatic SCM syncing", | ||
| provider=provider_key, | ||
| ) | ||
|
|
||
| if removed_ids and _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): | ||
| repository_service.disable_repositories_by_external_ids( | ||
| organization_id=organization_id, | ||
| integration_id=integration.id, | ||
| provider=provider, | ||
| external_ids=list(removed_ids), | ||
| for restored_batch in chunked(restored_id_list, SYNC_BATCH_SIZE): | ||
| restore_repos_batch( | ||
| organization_integration_id=organization_integration_id, | ||
| external_ids=restored_batch, | ||
| ) | ||
|
|
||
|
|
||
| def _get_sync_context( | ||
| organization_integration_id: int, | ||
| ) -> tuple[RpcIntegration, RpcOrganization, str] | None: | ||
| """Shared lookup for batch tasks. Returns (integration, rpc_org, provider_key) or None.""" | ||
| try: | ||
| oi = OrganizationIntegration.objects.get( | ||
| id=organization_integration_id, | ||
| status=ObjectStatus.ACTIVE, | ||
| ) | ||
| except OrganizationIntegration.DoesNotExist: | ||
| logger.info( | ||
| "sync_repos.missing_org_integration", | ||
| extra={"organization_integration_id": organization_integration_id}, | ||
| ) | ||
| return None | ||
|
|
||
| integration = integration_service.get_integration( | ||
| integration_id=oi.integration_id, status=ObjectStatus.ACTIVE | ||
| ) | ||
| if integration is None: | ||
| logger.info( | ||
| "sync_repos.missing_integration", | ||
| extra={"integration_id": oi.integration_id}, | ||
| ) | ||
| return None | ||
|
|
||
| org_context = organization_service.get_organization_by_id( | ||
| id=oi.organization_id, include_projects=False, include_teams=False | ||
| ) | ||
| if org_context is None: | ||
| logger.info( | ||
| "sync_repos.missing_organization", | ||
| extra={"organization_id": oi.organization_id}, | ||
| ) | ||
| return None | ||
|
|
||
| return integration, org_context.organization, integration.provider | ||
|
|
||
|
|
||
| @instrumented_task( | ||
| name="sentry.integrations.source_code_management.sync_repos.create_repos_batch", | ||
| namespace=integrations_control_tasks, | ||
| retry=Retry(times=3, delay=120), | ||
| processing_deadline_duration=120, | ||
| silo_mode=SiloMode.CONTROL, | ||
| ) | ||
| @retry() | ||
| def create_repos_batch( | ||
| organization_integration_id: int, | ||
| repo_configs: list[dict[str, object]], | ||
| ) -> None: | ||
| ctx = _get_sync_context(organization_integration_id) | ||
| if ctx is None: | ||
| return | ||
| integration, rpc_org, provider_key = ctx | ||
|
|
||
| integration_repo_provider = get_integration_repository_provider(integration) | ||
| created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories( | ||
| configs=repo_configs, organization=rpc_org | ||
| ) | ||
|
|
||
| for repo in created_repos: | ||
| log_repo_change( | ||
| event_name="REPO_ADDED", | ||
| organization_id=rpc_org.id, | ||
| repo=repo, | ||
| source="automatic SCM syncing", | ||
| provider=provider_key, | ||
| ) | ||
|
|
||
| for repo in reactivated_repos: | ||
| log_repo_change( | ||
| event_name="REPO_ENABLED", | ||
| organization_id=rpc_org.id, | ||
| repo=repo, | ||
| source="automatic SCM syncing", | ||
| provider=provider_key, | ||
| ) | ||
|
|
||
|
|
||
| @instrumented_task( | ||
| name="sentry.integrations.source_code_management.sync_repos.disable_repos_batch", | ||
| namespace=integrations_control_tasks, | ||
| retry=Retry(times=3, delay=120), | ||
| processing_deadline_duration=120, | ||
| silo_mode=SiloMode.CONTROL, | ||
| ) | ||
| @retry() | ||
| def disable_repos_batch( | ||
| organization_integration_id: int, | ||
| external_ids: list[str], | ||
| ) -> None: | ||
| ctx = _get_sync_context(organization_integration_id) | ||
| if ctx is None: | ||
| return | ||
| integration, rpc_org, provider_key = ctx | ||
| provider = f"integrations:{provider_key}" | ||
|
|
||
| if not _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): | ||
| return | ||
|
|
||
| repository_service.disable_repositories_by_external_ids( | ||
| organization_id=rpc_org.id, | ||
| integration_id=integration.id, | ||
| provider=provider, | ||
| external_ids=external_ids, | ||
| ) | ||
|
|
||
| all_repos = repository_service.get_repositories( | ||
| organization_id=rpc_org.id, | ||
| integration_id=integration.id, | ||
| providers=[provider], | ||
| ) | ||
| repo_by_external_id = {r.external_id: r for r in all_repos} | ||
|
|
||
| for eid in external_ids: | ||
| removed_repo = repo_by_external_id.get(eid) | ||
| if removed_repo: | ||
| log_repo_change( | ||
| event_name="REPO_DISABLED", | ||
| organization_id=rpc_org.id, | ||
| repo=removed_repo, | ||
| source="automatic SCM syncing", | ||
| provider=provider_key, | ||
| ) | ||
|
|
||
| for eid in removed_ids: | ||
| removed_repo = repo_by_external_id.get(eid) | ||
| if removed_repo: | ||
| log_repo_change( | ||
| event_name="REPO_DISABLED", | ||
| organization_id=organization_id, | ||
| repo=removed_repo, | ||
| source="automatic SCM syncing", | ||
| provider=provider_key, | ||
| ) | ||
|
|
||
| if restored_ids: | ||
| for repo in disabled_repos: | ||
| if repo.external_id in restored_ids: | ||
| repo.status = ObjectStatus.ACTIVE | ||
| repository_service.update_repository( | ||
| organization_id=organization_id, update=repo | ||
| ) | ||
| log_repo_change( | ||
| event_name="REPO_ENABLED", | ||
| organization_id=organization_id, | ||
| repo=repo, | ||
| source="automatic SCM syncing", | ||
| provider=provider_key, | ||
| ) | ||
|
|
||
| @instrumented_task( | ||
| name="sentry.integrations.source_code_management.sync_repos.restore_repos_batch", | ||
| namespace=integrations_control_tasks, | ||
| retry=Retry(times=3, delay=120), | ||
| processing_deadline_duration=120, | ||
| silo_mode=SiloMode.CONTROL, | ||
| ) | ||
| @retry() | ||
| def restore_repos_batch( | ||
| organization_integration_id: int, | ||
| external_ids: list[str], | ||
| ) -> None: | ||
| ctx = _get_sync_context(organization_integration_id) | ||
| if ctx is None: | ||
| return | ||
| integration, rpc_org, provider_key = ctx | ||
| provider = f"integrations:{provider_key}" | ||
|
|
||
| all_repos = repository_service.get_repositories( | ||
| organization_id=rpc_org.id, | ||
| integration_id=integration.id, | ||
| providers=[provider], | ||
| ) | ||
| restore_set = set(external_ids) | ||
| for repo in all_repos: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: The refactored repo sync tasks can create duplicate repositories. Suggested Fix. Prompt for AI Agent |
||
| if repo.external_id in restore_set: | ||
| repo.status = ObjectStatus.ACTIVE | ||
| repository_service.update_repository(organization_id=rpc_org.id, update=repo) | ||
| log_repo_change( | ||
| event_name="REPO_ENABLED", | ||
| organization_id=rpc_org.id, | ||
| repo=repo, | ||
| source="automatic SCM syncing", | ||
| provider=provider_key, | ||
| ) | ||
|
|
||
|
|
||
| @instrumented_task( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
restore_repos_batch missing disabled status filter on query
Medium Severity
`restore_repos_batch` fetches all repos regardless of status, while the old code only iterated `disabled_repos` (filtered to `ObjectStatus.DISABLED`). The `get_repositories` call here doesn't pass the available `status` parameter. When these tasks are moved to async execution (the stated follow-up), a repo that transitioned from `DISABLED` to `PENDING_DELETION` between the parent task and this batch task would be incorrectly set back to `ACTIVE`, overriding the deletion. Even synchronously, this needlessly updates already-active repos and emits false `REPO_ENABLED` audit log entries if duplicate `external_id` values exist. Reviewed by Cursor Bugbot for commit a6ce51e. Configure here.