diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 1c81b7dbeec663..b2f36dc8324618 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -25,6 +25,7 @@ logger = logging.getLogger(__name__) TRANSACTION_PREFIX = "cleanup" +DELETES_BY_PROJECT_CHUNK_SIZE = 100 if TYPE_CHECKING: from sentry.db.models.base import BaseModel @@ -85,28 +86,15 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: # Configure within each Process import logging - from sentry.utils.imports import import_string - logger = logging.getLogger("sentry.cleanup") from sentry.runner import configure configure() - from sentry import deletions, models, options, similarity + from sentry import options from sentry.utils import metrics - skip_child_relations_models = [ - # Handled by other parts of cleanup - models.EventAttachment, - models.UserReport, - models.Group, - models.GroupEmailThread, - models.GroupRuleStatus, - # Handled by TTL - similarity, - ] - while True: j = task_queue.get() if j == _STOP_WORKER: @@ -124,21 +112,7 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: with sentry_sdk.start_transaction( op="cleanup", name=f"{TRANSACTION_PREFIX}.multiprocess_worker" ): - model = import_string(model_name) - task = deletions.get( - model=model, - query={"id__in": chunk}, - skip_models=skip_child_relations_models, - transaction_id=uuid4().hex, - ) - - while True: - debug_output(f"Processing chunk of {len(chunk)} {model_name} objects") - metrics.incr( - "cleanup.chunk_processed", tags={"model": model_name}, amount=len(chunk) - ) - if not task.chunk(apply_filter=True): - break + task_execution(model_name, chunk) except Exception: metrics.incr( "cleanup.error", @@ -154,6 +128,37 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: task_queue.task_done() +def task_execution(model_name: str, chunk: tuple[int, ...]) -> None: + from sentry import deletions, models, similarity + from sentry.utils import metrics + from sentry.utils.imports import import_string + + skip_child_relations_models = [ + # Handled by other parts of cleanup + models.EventAttachment, + models.UserReport, + models.Group, + models.GroupEmailThread, + models.GroupRuleStatus, + # Handled by TTL + similarity, + ] + + model = import_string(model_name) + task = deletions.get( + model=model, + query={"id__in": chunk}, + skip_models=skip_child_relations_models, + transaction_id=uuid4().hex, + ) + + while True: + debug_output(f"Processing chunk of {len(chunk)} {model_name} objects") + metrics.incr("cleanup.chunk_processed", tags={"model": model_name}, amount=len(chunk)) + if not task.chunk(apply_filter=True): + break + + @click.command() @click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.") @click.option("--project", help="Limit truncation to only entries from project.") @@ -760,7 +765,7 @@ def run_bulk_deletes_by_project( order_by=order_by, ) - for chunk in q.iterator(chunk_size=100): + for chunk in q.iterator(chunk_size=DELETES_BY_PROJECT_CHUNK_SIZE): task_queue.put((imp, chunk)) except Exception: capture_exception( diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index 0a8e6c6cdebc4c..8e9dfc3358b05b 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -1,12 +1,35 @@ from __future__ import annotations +from unittest.mock import patch + from sentry.constants import ObjectStatus -from sentry.runner.commands.cleanup import prepare_deletes_by_project +from sentry.models.group import Group +from sentry.runner.commands.cleanup import ( + prepare_deletes_by_project, + run_bulk_deletes_by_project, + task_execution, +) from sentry.silo.base import SiloMode from sentry.testutils.cases import TestCase +from sentry.testutils.helpers.datetime import before_now from sentry.testutils.silo import assume_test_silo_mode +class SynchronousTaskQueue: + """Mock task queue that partially implements the _WorkQueue protocol but executes tasks synchronously.""" + + def __init__(self) -> None: + # You can use this to inspect the calls to the queue. + self.put_calls: list[tuple[str, tuple[int, ...]]] = [] + + def put(self, item: tuple[str, tuple[int, ...]]) -> None: + self.put_calls.append(item) + task_execution(item[0], item[1]) + + def join(self) -> None: + pass + + class PrepareDeletesByProjectTest(TestCase): def test_no_filters(self) -> None: """Test that without filters, all active projects are included.""" @@ -104,3 +127,47 @@ def test_region_silo_mode_returns_projects(self) -> None: assert sorted(project_ids) == [project1.id, project2.id] # Should have model tuples to delete assert len(to_delete) > 0 + + +class RunBulkQueryDeletesByProjectTest(TestCase): + def test_run_bulk_query_deletes_by_project(self) -> None: + """Test that the function runs bulk query deletes by project as expected.""" + days = 30 + # Creating the groups in out of order to test that the chunks are created in the correct order. + self.create_group(last_seen=before_now(days=days + 4)) + self.create_group() + self.create_group(last_seen=before_now(days=days + 2)) + self.create_group(last_seen=before_now(days=days + 3)) + + assert Group.objects.count() == 4 + assert Group.objects.filter(last_seen__lt=before_now(days=days)).count() == 3 + ids = list( + Group.objects.filter(last_seen__lt=before_now(days=days)).values_list("id", flat=True) + ) + + with ( + assume_test_silo_mode(SiloMode.REGION), + patch("sentry.runner.commands.cleanup.DELETES_BY_PROJECT_CHUNK_SIZE", 2), + ): + task_queue = SynchronousTaskQueue() + + models_attempted: set[str] = set() + run_bulk_deletes_by_project( + task_queue=task_queue, # type: ignore[arg-type] # It partially implements the queue protocol + project_id=None, + start_from_project_id=None, + is_filtered=lambda model: False, + days=days, + models_attempted=models_attempted, + ) + assert models_attempted == {"group", "projectdebugfile"} + + assert len(task_queue.put_calls) == 2 + # Verify we deleted all expected groups (order may vary due to non-unique last_seen) + all_deleted_ids: set[int] = set() + for call in task_queue.put_calls: + assert call[0] == "sentry.models.group.Group" + all_deleted_ids.update(call[1]) + assert all_deleted_ids == set(ids) + + assert Group.objects.all().count() == 1