diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index eac374440dfccd..f87b40cdd7b80c 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -13,6 +13,7 @@ import click import sentry_sdk from django.conf import settings +from django.db import router as db_router from django.db.models import Model, QuerySet from django.utils import timezone @@ -189,11 +190,7 @@ def _cleanup( configure() - from django.db import router as db_router - - from sentry.db.deletion import BulkDeleteQuery from sentry.utils import metrics - from sentry.utils.query import RangeQuerySetWrapper start_time = None if timed: @@ -227,8 +224,6 @@ def is_filtered(model: type[Model]) -> bool: return False return model.__name__.lower() not in model_list - bulk_query_deletes = generate_bulk_query_deletes() - deletes = models_which_use_deletions_code_path() _run_specialized_cleanups(is_filtered, days, silent, models_attempted) @@ -238,8 +233,9 @@ def is_filtered(model: type[Model]) -> bool: project, organization, days, deletes ) + # This does not use the deletions code path, but rather uses the BulkDeleteQuery class + # to delete records in bulk (i.e. does not need to worry about child relations) run_bulk_query_deletes( - bulk_query_deletes, is_filtered, days, project, @@ -247,90 +243,23 @@ def is_filtered(model: type[Model]) -> bool: models_attempted, ) - debug_output("Running bulk deletes in DELETES") - for model_tp, dtfield, order_by in deletes: - debug_output(f"Removing {model_tp.__name__} for days={days} project={project or '*'}") - - if is_filtered(model_tp): - debug_output(">> Skipping %s" % model_tp.__name__) - else: - models_attempted.add(model_tp.__name__.lower()) - imp = ".".join((model_tp.__module__, model_tp.__name__)) - - q = BulkDeleteQuery( - model=model_tp, - dtfield=dtfield, - days=days, - project_id=project_id, - order_by=order_by, - ) - - for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) - - task_queue.join() - - project_deletion_query, to_delete_by_project = prepare_deletes_by_project( - project, project_id, is_filtered + run_bulk_deletes_in_deletes( + task_queue, + deletes, + is_filtered, + days, + project, + project_id, + models_attempted, ) - if project_deletion_query is not None and len(to_delete_by_project): - debug_output("Running bulk deletes in DELETES_BY_PROJECT") - for project_id_for_deletion in RangeQuerySetWrapper( - project_deletion_query.values_list("id", flat=True), - result_value_getter=lambda item: item, - ): - for model_tp, dtfield, order_by in to_delete_by_project: - models_attempted.add(model_tp.__name__.lower()) - debug_output( - f"Removing {model_tp.__name__} for days={days} project={project_id_for_deletion}" - ) - - imp = ".".join((model_tp.__module__, model_tp.__name__)) - - q = BulkDeleteQuery( - model=model_tp, - dtfield=dtfield, - days=days, - project_id=project_id_for_deletion, - order_by=order_by, - ) - - for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) - - task_queue.join() - - organization_deletion_query, to_delete_by_organization = prepare_deletes_by_organization( - organization_id, is_filtered + run_bulk_deletes_by_project( + task_queue, project, project_id, is_filtered, days, models_attempted ) - if organization_deletion_query is not None and len(to_delete_by_organization): - debug_output("Running bulk deletes in DELETES_BY_ORGANIZATION") - for organization_id_for_deletion in RangeQuerySetWrapper( - organization_deletion_query.values_list("id", flat=True), - result_value_getter=lambda item: item, - ): - for model_tp, dtfield, order_by in to_delete_by_organization: - models_attempted.add(model_tp.__name__.lower()) - debug_output( - f"Removing {model_tp.__name__} for days={days} organization={organization_id_for_deletion}" - ) - - imp = ".".join((model_tp.__module__, model_tp.__name__)) - - q = BulkDeleteQuery( - model=model_tp, - dtfield=dtfield, - days=days, - organization_id=organization_id_for_deletion, - order_by=order_by, - ) - - for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) - - task_queue.join() + run_bulk_deletes_by_organization( + task_queue, organization_id, is_filtered, days, models_attempted + ) remove_file_blobs(is_filtered, silent, models_attempted) @@ -425,6 +354,9 @@ def _start_pool(concurrency: int) -> tuple[list[Process], _WorkQueue]: def _stop_pool(pool: Sequence[Process], task_queue: _WorkQueue) -> None: + # First, ensure all queued tasks are completed + task_queue.join() + # Stop the pool for _ in pool: task_queue.put(_STOP_WORKER) @@ -599,7 +531,6 @@ def generate_bulk_query_deletes() -> list[tuple[type[Model], str, str | None]]: def run_bulk_query_deletes( - bulk_query_deletes: list[tuple[type[Model], str, str | None]], is_filtered: Callable[[type[Model]], bool], days: int, project: str | None, @@ -609,6 +540,7 @@ def run_bulk_query_deletes( from sentry.db.deletion import BulkDeleteQuery debug_output("Running bulk query deletes in bulk_query_deletes") + bulk_query_deletes = generate_bulk_query_deletes() for model_tp, dtfield, order_by in bulk_query_deletes: chunk_size = 10000 @@ -626,6 +558,129 @@ def run_bulk_query_deletes( ).execute(chunk_size=chunk_size) +def run_bulk_deletes_in_deletes( + task_queue: _WorkQueue, + deletes: list[tuple[type[Model], str, str]], + is_filtered: Callable[[type[Model]], bool], + days: int, + project: str | None, + project_id: int | None, + models_attempted: set[str], +) -> None: + from sentry.db.deletion import BulkDeleteQuery + + debug_output("Running bulk deletes in DELETES") + for model_tp, dtfield, order_by in deletes: + debug_output(f"Removing {model_tp.__name__} for days={days} project={project or '*'}") + + if is_filtered(model_tp): + debug_output(">> Skipping %s" % model_tp.__name__) + else: + models_attempted.add(model_tp.__name__.lower()) + imp = ".".join((model_tp.__module__, model_tp.__name__)) + + q = BulkDeleteQuery( + model=model_tp, + dtfield=dtfield, + days=days, + project_id=project_id, + order_by=order_by, + ) + + for chunk in q.iterator(chunk_size=100): + task_queue.put((imp, chunk)) + + # Ensure all tasks are completed before exiting + task_queue.join() + + +def run_bulk_deletes_by_project( + task_queue: _WorkQueue, + project: str | None, + project_id: int | None, + is_filtered: Callable[[type[Model]], bool], + days: int, + models_attempted: set[str], +) -> None: + from sentry.db.deletion import BulkDeleteQuery + from sentry.utils.query import RangeQuerySetWrapper + + project_deletion_query, to_delete_by_project = prepare_deletes_by_project( + project, project_id, is_filtered + ) + + if project_deletion_query is not None and len(to_delete_by_project): + debug_output("Running bulk deletes in DELETES_BY_PROJECT") + for project_id_for_deletion in RangeQuerySetWrapper( + project_deletion_query.values_list("id", flat=True), + result_value_getter=lambda item: item, + ): + for model_tp, dtfield, order_by in to_delete_by_project: + models_attempted.add(model_tp.__name__.lower()) + debug_output( + f"Removing {model_tp.__name__} for days={days} project={project_id_for_deletion}" + ) + + imp = ".".join((model_tp.__module__, model_tp.__name__)) + + q = BulkDeleteQuery( + model=model_tp, + dtfield=dtfield, + days=days, + project_id=project_id_for_deletion, + order_by=order_by, + ) + + for chunk in q.iterator(chunk_size=100): + task_queue.put((imp, chunk)) + + # Ensure all tasks are completed before exiting + task_queue.join() + + +def run_bulk_deletes_by_organization( + task_queue: _WorkQueue, + organization_id: int | None, + is_filtered: Callable[[type[Model]], bool], + days: int, + models_attempted: set[str], +) -> None: + from sentry.db.deletion import BulkDeleteQuery + from sentry.utils.query import RangeQuerySetWrapper + + organization_deletion_query, to_delete_by_organization = prepare_deletes_by_organization( + organization_id, is_filtered + ) + + if organization_deletion_query is not None and len(to_delete_by_organization): + debug_output("Running bulk deletes in DELETES_BY_ORGANIZATION") + for organization_id_for_deletion in RangeQuerySetWrapper( + organization_deletion_query.values_list("id", flat=True), + result_value_getter=lambda item: item, + ): + for model_tp, dtfield, order_by in to_delete_by_organization: + models_attempted.add(model_tp.__name__.lower()) + debug_output( + f"Removing {model_tp.__name__} for days={days} organization={organization_id_for_deletion}" + ) + + imp = ".".join((model_tp.__module__, model_tp.__name__)) + + q = BulkDeleteQuery( + model=model_tp, + dtfield=dtfield, + days=days, + organization_id=organization_id_for_deletion, + order_by=order_by, + ) + + for chunk in q.iterator(chunk_size=100): + task_queue.put((imp, chunk)) + + # Ensure all tasks are completed before exiting + task_queue.join() + + def prepare_deletes_by_project( project: str | None, project_id: int | None, is_filtered: Callable[[type[Model]], bool] ) -> tuple[QuerySet[Any] | None, list[tuple[Any, str, str]]]: