From b5edb450899da7397f9b13d6b06cdec40dcb37b2 Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 7 Oct 2025 09:10:12 -0400 Subject: [PATCH 1/7] ref(cleanup): More functions This helps with the readability of the code. --- src/sentry/runner/commands/cleanup.py | 247 +++++++++++++++----------- 1 file changed, 141 insertions(+), 106 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index eac374440dfccd..c358f942a8c8ee 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -16,8 +16,10 @@ from django.db.models import Model, QuerySet from django.utils import timezone +from sentry.db.deletion import BulkDeleteQuery from sentry.runner.decorators import log_options from sentry.silo.base import SiloLimit, SiloMode +from sentry.utils.query import RangeQuerySetWrapper logger = logging.getLogger(__name__) @@ -191,9 +193,7 @@ def _cleanup( from django.db import router as db_router - from sentry.db.deletion import BulkDeleteQuery from sentry.utils import metrics - from sentry.utils.query import RangeQuerySetWrapper start_time = None if timed: @@ -227,19 +227,15 @@ def is_filtered(model: type[Model]) -> bool: return False return model.__name__.lower() not in model_list - bulk_query_deletes = generate_bulk_query_deletes() - deletes = models_which_use_deletions_code_path() + project_id, organization_id = get_project_and_organization_id(project, organization) _run_specialized_cleanups(is_filtered, days, silent, models_attempted) + _handle_non_control_deletes(project, deletes, days) - # Handle project/organization specific logic - project_id, organization_id = _handle_project_organization_cleanup( - project, organization, days, deletes - ) - + # This does not use the deletions code path, but rather uses the BulkDeleteQuery class + # to delete records in bulk (i.e. does not need to worry about child relations) run_bulk_query_deletes( - bulk_query_deletes, is_filtered, days, project, @@ -247,90 +243,23 @@ def is_filtered(model: type[Model]) -> bool: models_attempted, ) - debug_output("Running bulk deletes in DELETES") - for model_tp, dtfield, order_by in deletes: - debug_output(f"Removing {model_tp.__name__} for days={days} project={project or '*'}") - - if is_filtered(model_tp): - debug_output(">> Skipping %s" % model_tp.__name__) - else: - models_attempted.add(model_tp.__name__.lower()) - imp = ".".join((model_tp.__module__, model_tp.__name__)) - - q = BulkDeleteQuery( - model=model_tp, - dtfield=dtfield, - days=days, - project_id=project_id, - order_by=order_by, - ) - - for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) - - task_queue.join() - - project_deletion_query, to_delete_by_project = prepare_deletes_by_project( - project, project_id, is_filtered + run_bulk_deletes_in_deletes( + task_queue, + deletes, + is_filtered, + days, + project, + project_id, + models_attempted, ) - if project_deletion_query is not None and len(to_delete_by_project): - debug_output("Running bulk deletes in DELETES_BY_PROJECT") - for project_id_for_deletion in RangeQuerySetWrapper( - project_deletion_query.values_list("id", flat=True), - result_value_getter=lambda item: item, - ): - for model_tp, dtfield, order_by in to_delete_by_project: - models_attempted.add(model_tp.__name__.lower()) - debug_output( - f"Removing {model_tp.__name__} for days={days} project={project_id_for_deletion}" - ) - - imp = ".".join((model_tp.__module__, model_tp.__name__)) - - q = BulkDeleteQuery( - model=model_tp, - dtfield=dtfield, - days=days, - project_id=project_id_for_deletion, - order_by=order_by, - ) - - for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) - - task_queue.join() - - organization_deletion_query, to_delete_by_organization = prepare_deletes_by_organization( - organization_id, is_filtered + run_bulk_deletes_by_project( + task_queue, project, project_id, is_filtered, days, models_attempted ) - if organization_deletion_query is not None and len(to_delete_by_organization): - debug_output("Running bulk deletes in DELETES_BY_ORGANIZATION") - for organization_id_for_deletion in RangeQuerySetWrapper( - organization_deletion_query.values_list("id", flat=True), - result_value_getter=lambda item: item, - ): - for model_tp, dtfield, order_by in to_delete_by_organization: - models_attempted.add(model_tp.__name__.lower()) - debug_output( - f"Removing {model_tp.__name__} for days={days} organization={organization_id_for_deletion}" - ) - - imp = ".".join((model_tp.__module__, model_tp.__name__)) - - q = BulkDeleteQuery( - model=model_tp, - dtfield=dtfield, - days=days, - organization_id=organization_id_for_deletion, - order_by=order_by, - ) - - for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) - - task_queue.join() + run_bulk_deletes_by_organization( + task_queue, organization_id, is_filtered, days, models_attempted + ) remove_file_blobs(is_filtered, silent, models_attempted) @@ -374,26 +303,27 @@ def _run_specialized_cleanups( exported_data(is_filtered, silent, models_attempted) -def _handle_project_organization_cleanup( - project: str | None, - organization: str | None, - days: int, - deletes: list[tuple[type[Model], str, str]], -) -> tuple[int | None, int | None]: - """Handle project/organization specific cleanup logic.""" - project_id = None - organization_id = None +def _handle_non_control_deletes( + project: str | None, deletes: list[tuple[type[Model], str, str]], days: int +) -> None: + """Handle deletes for project/organization that are not control-limited.""" if SiloMode.get_current_mode() != SiloMode.CONTROL: if project: remove_cross_project_models(deletes) - project_id = get_project_id_or_fail(project) - elif organization: - organization_id = get_organization_id_or_fail(organization) else: remove_old_nodestore_values(days) - return project_id, organization_id + +def get_project_and_organization_id( + project: str | None, organization: str | None +) -> tuple[int | None, int | None]: + if project: + return get_project_id_or_fail(project), None + elif organization: + return None, get_organization_id_or_fail(organization) + else: + return None, None def _report_models_never_attempted( @@ -599,16 +529,14 @@ def generate_bulk_query_deletes() -> list[tuple[type[Model], str, str | None]]: def run_bulk_query_deletes( - bulk_query_deletes: list[tuple[type[Model], str, str | None]], is_filtered: Callable[[type[Model]], bool], days: int, project: str | None, project_id: int | None, models_attempted: set[str], ) -> None: - from sentry.db.deletion import BulkDeleteQuery - debug_output("Running bulk query deletes in bulk_query_deletes") + bulk_query_deletes = generate_bulk_query_deletes() for model_tp, dtfield, order_by in bulk_query_deletes: chunk_size = 10000 @@ -626,6 +554,113 @@ def run_bulk_query_deletes( ).execute(chunk_size=chunk_size) +def run_bulk_deletes_in_deletes( + task_queue: _WorkQueue, + deletes: list[tuple[type[Model], str, str]], + is_filtered: Callable[[type[Model]], bool], + days: int, + project: str | None, + project_id: int | None, + models_attempted: set[str], +) -> None: + debug_output("Running bulk deletes in DELETES") + for model_tp, dtfield, order_by in deletes: + debug_output(f"Removing {model_tp.__name__} for days={days} project={project or '*'}") + + if is_filtered(model_tp): + debug_output(">> Skipping %s" % model_tp.__name__) + else: + models_attempted.add(model_tp.__name__.lower()) + imp = ".".join((model_tp.__module__, model_tp.__name__)) + + q = BulkDeleteQuery( + model=model_tp, + dtfield=dtfield, + days=days, + project_id=project_id, + order_by=order_by, + ) + + for chunk in q.iterator(chunk_size=100): + task_queue.put((imp, chunk)) + + task_queue.join() + + +def run_bulk_deletes_by_project( + task_queue: _WorkQueue, project, project_id, is_filtered, days, models_attempted +): + project_deletion_query, to_delete_by_project = prepare_deletes_by_project( + project, project_id, is_filtered + ) + + if project_deletion_query is not None and len(to_delete_by_project): + debug_output("Running bulk deletes in DELETES_BY_PROJECT") + for project_id_for_deletion in RangeQuerySetWrapper( + project_deletion_query.values_list("id", flat=True), + result_value_getter=lambda item: item, + ): + for model_tp, dtfield, order_by in to_delete_by_project: + models_attempted.add(model_tp.__name__.lower()) + debug_output( + f"Removing {model_tp.__name__} for days={days} project={project_id_for_deletion}" + ) + + imp = ".".join((model_tp.__module__, model_tp.__name__)) + + q = BulkDeleteQuery( + model=model_tp, + dtfield=dtfield, + days=days, + project_id=project_id_for_deletion, + order_by=order_by, + ) + + for chunk in q.iterator(chunk_size=100): + task_queue.put((imp, chunk)) + + task_queue.join() + + +def run_bulk_deletes_by_organization( + task_queue: _WorkQueue, + organization_id: int | None, + is_filtered: Callable[[type[Model]], bool], + days: int, + models_attempted: set[str], +): + organization_deletion_query, to_delete_by_organization = prepare_deletes_by_organization( + organization_id, is_filtered + ) + + if organization_deletion_query is not None and len(to_delete_by_organization): + debug_output("Running bulk deletes in DELETES_BY_ORGANIZATION") + for organization_id_for_deletion in RangeQuerySetWrapper( + organization_deletion_query.values_list("id", flat=True), + result_value_getter=lambda item: item, + ): + for model_tp, dtfield, order_by in to_delete_by_organization: + models_attempted.add(model_tp.__name__.lower()) + debug_output( + f"Removing {model_tp.__name__} for days={days} organization={organization_id_for_deletion}" + ) + + imp = ".".join((model_tp.__module__, model_tp.__name__)) + + q = BulkDeleteQuery( + model=model_tp, + dtfield=dtfield, + days=days, + organization_id=organization_id_for_deletion, + order_by=order_by, + ) + + for chunk in q.iterator(chunk_size=100): + task_queue.put((imp, chunk)) + + task_queue.join() + + def prepare_deletes_by_project( project: str | None, project_id: int | None, is_filtered: Callable[[type[Model]], bool] ) -> tuple[QuerySet[Any] | None, list[tuple[Any, str, str]]]: From b8b5f0b36bbef1f61e8a25adb468821319d17f1a Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 7 Oct 2025 09:18:02 -0400 Subject: [PATCH 2/7] Reduce diff --- src/sentry/runner/commands/cleanup.py | 45 ++++++++++++++++----------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index c358f942a8c8ee..7c4deb9717086e 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -228,10 +228,13 @@ def is_filtered(model: type[Model]) -> bool: return model.__name__.lower() not in model_list deletes = models_which_use_deletions_code_path() - project_id, organization_id = get_project_and_organization_id(project, organization) _run_specialized_cleanups(is_filtered, days, silent, models_attempted) - _handle_non_control_deletes(project, deletes, days) + + # Handle project/organization specific logic + project_id, organization_id = _handle_project_organization_cleanup( + project, organization, days, deletes + ) # This does not use the deletions code path, but rather uses the BulkDeleteQuery class # to delete records in bulk (i.e. does not need to worry about child relations) @@ -303,27 +306,26 @@ def _run_specialized_cleanups( exported_data(is_filtered, silent, models_attempted) -def _handle_non_control_deletes( - project: str | None, deletes: list[tuple[type[Model], str, str]], days: int -) -> None: - """Handle deletes for project/organization that are not control-limited.""" +def _handle_project_organization_cleanup( + project: str | None, + organization: str | None, + days: int, + deletes: list[tuple[type[Model], str, str]], +) -> tuple[int | None, int | None]: + """Handle project/organization specific cleanup logic.""" + project_id = None + organization_id = None if SiloMode.get_current_mode() != SiloMode.CONTROL: if project: remove_cross_project_models(deletes) + project_id = get_project_id_or_fail(project) + elif organization: + organization_id = get_organization_id_or_fail(organization) else: remove_old_nodestore_values(days) - -def get_project_and_organization_id( - project: str | None, organization: str | None -) -> tuple[int | None, int | None]: - if project: - return get_project_id_or_fail(project), None - elif organization: - return None, get_organization_id_or_fail(organization) - else: - return None, None + return project_id, organization_id def _report_models_never_attempted( @@ -588,8 +590,13 @@ def run_bulk_deletes_in_deletes( def run_bulk_deletes_by_project( - task_queue: _WorkQueue, project, project_id, is_filtered, days, models_attempted -): + task_queue: _WorkQueue, + project: str | None, + project_id: int | None, + is_filtered: Callable[[type[Model]], bool], + days: int, + models_attempted: set[str], +) -> None: project_deletion_query, to_delete_by_project = prepare_deletes_by_project( project, project_id, is_filtered ) @@ -628,7 +635,7 @@ def run_bulk_deletes_by_organization( is_filtered: Callable[[type[Model]], bool], days: int, models_attempted: set[str], -): +) -> None: organization_deletion_query, to_delete_by_organization = prepare_deletes_by_organization( organization_id, is_filtered ) From cc9bef372b4fa861af379087c50456ebfe2a21bd Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 7 Oct 2025 09:26:25 -0400 Subject: [PATCH 3/7] Correct place to configure --- src/sentry/runner/commands/cleanup.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 7c4deb9717086e..c10030011847a1 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -13,12 +13,19 @@ import click import sentry_sdk from django.conf import settings +from django.db import router as db_router from django.db.models import Model, QuerySet from django.utils import timezone +from sentry.runner import configure + +configure() + + from sentry.db.deletion import BulkDeleteQuery from sentry.runner.decorators import log_options from sentry.silo.base import SiloLimit, SiloMode +from sentry.utils import metrics from sentry.utils.query import RangeQuerySetWrapper logger = logging.getLogger(__name__) @@ -187,13 +194,6 @@ def _cleanup( pool, task_queue = _start_pool(concurrency) try: - from sentry.runner import configure - - configure() - - from django.db import router as db_router - - from sentry.utils import metrics start_time = None if timed: From 1a793f32843bc61bd9fc88d306d35d437edfa342 Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 7 Oct 2025 09:31:51 -0400 Subject: [PATCH 4/7] Fix imports --- src/sentry/runner/commands/cleanup.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index c10030011847a1..16c1fc0bddadad 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -17,16 +17,8 @@ from django.db.models import Model, QuerySet from django.utils import timezone -from sentry.runner import configure - -configure() - - -from sentry.db.deletion import BulkDeleteQuery from sentry.runner.decorators import log_options from sentry.silo.base import SiloLimit, SiloMode -from sentry.utils import metrics -from sentry.utils.query import RangeQuerySetWrapper logger = logging.getLogger(__name__) @@ -194,6 +186,11 @@ def _cleanup( pool, task_queue = _start_pool(concurrency) try: + from sentry.runner import configure + + configure() + + from sentry.utils import metrics start_time = None if timed: @@ -537,6 +534,8 @@ def run_bulk_query_deletes( project_id: int | None, models_attempted: set[str], ) -> None: + from sentry.db.deletion import BulkDeleteQuery + debug_output("Running bulk query deletes in bulk_query_deletes") bulk_query_deletes = generate_bulk_query_deletes() for model_tp, dtfield, order_by in bulk_query_deletes: @@ -565,6 +564,8 @@ def run_bulk_deletes_in_deletes( project_id: int | None, models_attempted: set[str], ) -> None: + from sentry.db.deletion import BulkDeleteQuery + debug_output("Running bulk deletes in DELETES") for model_tp, dtfield, order_by in deletes: debug_output(f"Removing {model_tp.__name__} for days={days} project={project or '*'}") @@ -597,6 +598,9 @@ def run_bulk_deletes_by_project( days: int, models_attempted: set[str], ) -> None: + from sentry.db.deletion import BulkDeleteQuery + from sentry.utils.query import RangeQuerySetWrapper + project_deletion_query, to_delete_by_project = prepare_deletes_by_project( project, project_id, is_filtered ) @@ -636,6 +640,9 @@ def run_bulk_deletes_by_organization( days: int, models_attempted: set[str], ) -> None: + from sentry.db.deletion import BulkDeleteQuery + from sentry.utils.query import RangeQuerySetWrapper + organization_deletion_query, to_delete_by_organization = prepare_deletes_by_organization( organization_id, is_filtered ) From 4e06b1f490e22bfd687f59e9fdca9d156bee85c0 Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 7 Oct 2025 09:38:22 -0400 Subject: [PATCH 5/7] Fix indentation --- src/sentry/runner/commands/cleanup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 16c1fc0bddadad..a6ea8c971fcd74 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -672,7 +672,7 @@ def run_bulk_deletes_by_organization( for chunk in q.iterator(chunk_size=100): task_queue.put((imp, chunk)) - task_queue.join() + task_queue.join() def prepare_deletes_by_project( From d7d87845fe4715aba9eb74e016a0ec148db3a43e Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 7 Oct 2025 10:30:47 -0400 Subject: [PATCH 6/7] Place all task_queue.join() at the same level --- src/sentry/runner/commands/cleanup.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index a6ea8c971fcd74..7d05d701da9568 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -587,7 +587,8 @@ def run_bulk_deletes_in_deletes( for chunk in q.iterator(chunk_size=100): task_queue.put((imp, chunk)) - task_queue.join() + # Ensure all tasks are completed before exiting + task_queue.join() def run_bulk_deletes_by_project( @@ -630,7 +631,8 @@ def run_bulk_deletes_by_project( for chunk in q.iterator(chunk_size=100): task_queue.put((imp, chunk)) - task_queue.join() + # Ensure all tasks are completed before exiting + task_queue.join() def run_bulk_deletes_by_organization( @@ -672,7 +674,8 @@ def run_bulk_deletes_by_organization( for chunk in q.iterator(chunk_size=100): task_queue.put((imp, chunk)) - task_queue.join() + # Ensure all tasks are completed before exiting + task_queue.join() def prepare_deletes_by_project( From 3a5b49c30d6e6c63aa1ff00885bfd5b4eac8b208 Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 7 Oct 2025 10:34:49 -0400 Subject: [PATCH 7/7] Complete the task --- src/sentry/runner/commands/cleanup.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 7d05d701da9568..f87b40cdd7b80c 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -354,6 +354,9 @@ def _start_pool(concurrency: int) -> tuple[list[Process], _WorkQueue]: def _stop_pool(pool: Sequence[Process], task_queue: _WorkQueue) -> None: + # First, ensure all queued tasks are completed + task_queue.join() + # Stop the pool for _ in pool: task_queue.put(_STOP_WORKER)