From b0a31a61134a217d36d19955eeb3834f98a03b5a Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 10:48:38 -0500 Subject: [PATCH 1/9] tests(deletions): Add CustomTaskQueue to enable testing The changes includes allow replacing the task_queue with a custom class to support testing of cleanup functions. --- src/sentry/runner/commands/cleanup.py | 69 +++++++++++--------- tests/sentry/runner/commands/test_cleanup.py | 61 ++++++++++++++++- 2 files changed, 99 insertions(+), 31 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 1c81b7dbeec663..7a62e650d06eaf 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -25,6 +25,7 @@ logger = logging.getLogger(__name__) TRANSACTION_PREFIX = "cleanup" +DELETES_BY_PROJECT_CHUNK_SIZE = 100 if TYPE_CHECKING: from sentry.db.models.base import BaseModel @@ -85,28 +86,15 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: # Configure within each Process import logging - from sentry.utils.imports import import_string - logger = logging.getLogger("sentry.cleanup") from sentry.runner import configure configure() - from sentry import deletions, models, options, similarity + from sentry import options from sentry.utils import metrics - skip_child_relations_models = [ - # Handled by other parts of cleanup - models.EventAttachment, - models.UserReport, - models.Group, - models.GroupEmailThread, - models.GroupRuleStatus, - # Handled by TTL - similarity, - ] - while True: j = task_queue.get() if j == _STOP_WORKER: @@ -124,21 +112,7 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: with sentry_sdk.start_transaction( op="cleanup", name=f"{TRANSACTION_PREFIX}.multiprocess_worker" ): - model = import_string(model_name) - task = deletions.get( - model=model, - query={"id__in": chunk}, - skip_models=skip_child_relations_models, - transaction_id=uuid4().hex, - ) - - while True: - debug_output(f"Processing chunk of {len(chunk)} {model_name} objects") - metrics.incr( - "cleanup.chunk_processed", tags={"model": model_name}, amount=len(chunk) - ) - if not task.chunk(apply_filter=True): - break + task_execution(model_name, chunk) except Exception: metrics.incr( "cleanup.error", @@ -154,6 +128,41 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: task_queue.task_done() +def task_execution( + model_name: str, + chunk: tuple[int, ...], + skip_child_relations_models: Sequence[type[BaseModel]], +) -> None: + from sentry import deletions, models, similarity + from sentry.utils import metrics + from sentry.utils.imports import import_string + + skip_child_relations_models = [ + # Handled by other parts of cleanup + models.EventAttachment, + models.UserReport, + models.Group, + models.GroupEmailThread, + models.GroupRuleStatus, + # Handled by TTL + similarity, + ] + + model = import_string(model_name) + task = deletions.get( + model=model, + query={"id__in": chunk}, + skip_models=skip_child_relations_models, + transaction_id=uuid4().hex, + ) + + while True: + debug_output(f"Processing chunk of {len(chunk)} {model_name} objects") + metrics.incr("cleanup.chunk_processed", tags={"model": model_name}, amount=len(chunk)) + if not task.chunk(apply_filter=True): + break + + @click.command() @click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.") @click.option("--project", help="Limit truncation to only entries from project.") @@ -760,7 +769,7 @@ def run_bulk_deletes_by_project( order_by=order_by, ) - for chunk in q.iterator(chunk_size=100): + for chunk in q.iterator(chunk_size=DELETES_BY_PROJECT_CHUNK_SIZE): task_queue.put((imp, chunk)) except Exception: capture_exception( diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index 0a8e6c6cdebc4c..1048e0df558b1d 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -1,12 +1,34 @@ from __future__ import annotations +from unittest.mock import patch + from sentry.constants import ObjectStatus -from sentry.runner.commands.cleanup import prepare_deletes_by_project +from sentry.models.group import Group +from sentry.runner.commands.cleanup import ( + prepare_deletes_by_project, + run_bulk_deletes_by_project, + task_execution, +) from sentry.silo.base import SiloMode from sentry.testutils.cases import TestCase +from sentry.testutils.helpers.datetime import before_now from sentry.testutils.silo import assume_test_silo_mode +class CustomTaskQueue: + def __init__(self) -> None: + self.put_calls: list[tuple[str, tuple[int, ...]]] = [] + + def put(self, item: tuple[str, tuple[int, ...]]) -> None: + """Called when items are added to the queue.""" + self.put_calls.append(item) + task_execution(item[0], item[1]) + + def join(self) -> None: + """Called to wait for queue completion.""" + pass + + class PrepareDeletesByProjectTest(TestCase): def test_no_filters(self) -> None: """Test that without filters, all active projects are included.""" @@ -104,3 +126,40 @@ def test_region_silo_mode_returns_projects(self) -> None: assert sorted(project_ids) == [project1.id, project2.id] # Should have model tuples to delete assert len(to_delete) > 0 + + +class RunBulkQueryDeletesByProjectTest(TestCase): + def test_run_bulk_query_deletes_by_project(self) -> None: + """Test that the function runs bulk query deletes by project as expected.""" + days = 30 + self.create_group() + for i in range(3): + self.create_group(last_seen=before_now(days=days + i)) + + assert Group.objects.count() == 4 + assert Group.objects.filter(last_seen__lt=before_now(days=days)).count() == 3 + ids = Group.objects.filter(last_seen__lt=before_now(days=days)).values_list("id", flat=True) + + with ( + assume_test_silo_mode(SiloMode.REGION), + patch("sentry.runner.commands.cleanup.DELETES_BY_PROJECT_CHUNK_SIZE", 2), + ): + task_queue = CustomTaskQueue() + + models_attempted: set[str] = set() + run_bulk_deletes_by_project( + task_queue=task_queue, + project_id=None, + start_from_project_id=None, + is_filtered=lambda model: False, + days=days, + models_attempted=models_attempted, + ) + assert models_attempted == {"group", "projectdebugfile"} + + # XXX: This should be 1 + # assert len(task_queue.put_calls) == 2 + assert task_queue.put_calls[0] == ("sentry.models.group.Group", (ids[2], ids[1])) + # assert task_queue.put_calls[1] == ("sentry.models.group.Group", (ids[0],)) + + # assert Group.objects.all().count() == 1 From 509514fa73c8f56dc4c6aba8031c1d54441dc5b2 Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 11:19:50 -0500 Subject: [PATCH 2/9] Fix signature --- src/sentry/runner/commands/cleanup.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 7a62e650d06eaf..b2f36dc8324618 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -128,11 +128,7 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: task_queue.task_done() -def task_execution( - model_name: str, - chunk: tuple[int, ...], - skip_child_relations_models: Sequence[type[BaseModel]], -) -> None: +def task_execution(model_name: str, chunk: tuple[int, ...]) -> None: from sentry import deletions, models, similarity from sentry.utils import metrics from sentry.utils.imports import import_string From a68f8e7adec224f33a22f964f61ddeac4b63964a Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 11:54:49 -0500 Subject: [PATCH 3/9] Fix typing --- src/sentry/runner/commands/cleanup.py | 2 +- tests/sentry/runner/commands/test_cleanup.py | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index b2f36dc8324618..3fdde0082db840 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -722,7 +722,7 @@ def run_bulk_deletes_in_deletes( def run_bulk_deletes_by_project( - task_queue: _WorkQueue, + task_queue: Any, project_id: int | None, start_from_project_id: int | None, is_filtered: Callable[[type[BaseModel]], bool], diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index 1048e0df558b1d..d51071d46471d0 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -28,6 +28,10 @@ def join(self) -> None: """Called to wait for queue completion.""" pass + def task_done(self) -> None: + """Called when a task is complete.""" + pass + class PrepareDeletesByProjectTest(TestCase): def test_no_filters(self) -> None: @@ -138,7 +142,9 @@ def test_run_bulk_query_deletes_by_project(self) -> None: assert Group.objects.count() == 4 assert Group.objects.filter(last_seen__lt=before_now(days=days)).count() == 3 - ids = Group.objects.filter(last_seen__lt=before_now(days=days)).values_list("id", flat=True) + ids = list( + Group.objects.filter(last_seen__lt=before_now(days=days)).values_list("id", flat=True) + ) with ( assume_test_silo_mode(SiloMode.REGION), From 84899c8b5fa487d31dba4e14c00c940becc6118b Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 12:01:41 -0500 Subject: [PATCH 4/9] Fix typing and tests --- src/sentry/runner/commands/cleanup.py | 2 +- tests/sentry/runner/commands/test_cleanup.py | 35 ++++++++++++++------ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 3fdde0082db840..b2f36dc8324618 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -722,7 +722,7 @@ def run_bulk_deletes_in_deletes( def run_bulk_deletes_by_project( - task_queue: Any, + task_queue: _WorkQueue, project_id: int | None, start_from_project_id: int | None, is_filtered: Callable[[type[BaseModel]], bool], diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index d51071d46471d0..9fe02d709b96f8 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -1,5 +1,6 @@ from __future__ import annotations +from typing import Any from unittest.mock import patch from sentry.constants import ObjectStatus @@ -16,6 +17,8 @@ class CustomTaskQueue: + """Mock task queue that implements the _WorkQueue protocol.""" + def __init__(self) -> None: self.put_calls: list[tuple[str, tuple[int, ...]]] = [] @@ -24,12 +27,16 @@ def put(self, item: tuple[str, tuple[int, ...]]) -> None: self.put_calls.append(item) task_execution(item[0], item[1]) - def join(self) -> None: - """Called to wait for queue completion.""" - pass + def get(self, block: bool = True, timeout: float | None = None) -> Any: + """Get an item from the queue (required by Queue protocol).""" + raise NotImplementedError("get() not needed in tests") def task_done(self) -> None: - """Called when a task is complete.""" + """Signal that a task is complete (required by JoinableQueue protocol).""" + pass + + def join(self) -> None: + """Wait for all tasks to complete.""" pass @@ -142,8 +149,11 @@ def test_run_bulk_query_deletes_by_project(self) -> None: assert Group.objects.count() == 4 assert Group.objects.filter(last_seen__lt=before_now(days=days)).count() == 3 + # Fetch IDs in ascending order to match deletion order ids = list( - Group.objects.filter(last_seen__lt=before_now(days=days)).values_list("id", flat=True) + Group.objects.filter(last_seen__lt=before_now(days=days)) + .order_by("id") + .values_list("id", flat=True) ) with ( @@ -154,7 +164,7 @@ def test_run_bulk_query_deletes_by_project(self) -> None: models_attempted: set[str] = set() run_bulk_deletes_by_project( - task_queue=task_queue, + task_queue=task_queue, # type: ignore[arg-type] # CustomTaskQueue implements the queue protocol project_id=None, start_from_project_id=None, is_filtered=lambda model: False, @@ -163,9 +173,12 @@ def test_run_bulk_query_deletes_by_project(self) -> None: ) assert models_attempted == {"group", "projectdebugfile"} - # XXX: This should be 1 - # assert len(task_queue.put_calls) == 2 - assert task_queue.put_calls[0] == ("sentry.models.group.Group", (ids[2], ids[1])) - # assert task_queue.put_calls[1] == ("sentry.models.group.Group", (ids[0],)) + assert len(task_queue.put_calls) == 2 + # Verify we deleted all expected groups (order may vary due to non-unique last_seen) + all_deleted_ids = set() + for call in task_queue.put_calls: + assert call[0] == "sentry.models.group.Group" + all_deleted_ids.update(call[1]) + assert all_deleted_ids == set(ids) - # assert Group.objects.all().count() == 1 + assert Group.objects.all().count() == 1 From 558ea9d3eb5e111927e67232fa76709d001923b2 Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 12:04:47 -0500 Subject: [PATCH 5/9] Fix typing --- tests/sentry/runner/commands/test_cleanup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index 9fe02d709b96f8..33cb208ef2658e 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -175,10 +175,10 @@ def test_run_bulk_query_deletes_by_project(self) -> None: assert len(task_queue.put_calls) == 2 # Verify we deleted all expected groups (order may vary due to non-unique last_seen) - all_deleted_ids = set() + all_deleted_ids: set[tuple[int, ...]] = set() for call in task_queue.put_calls: assert call[0] == "sentry.models.group.Group" - all_deleted_ids.update(call[1]) - assert all_deleted_ids == set(ids) + all_deleted_ids.add(call[1]) + assert all_deleted_ids == set(tuple(ids)) assert Group.objects.all().count() == 1 From 736fe81ddc935d7c1e135c2f4ceda11d9ed583b0 Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 13:21:35 -0500 Subject: [PATCH 6/9] Address feedback --- tests/sentry/runner/commands/test_cleanup.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index 33cb208ef2658e..5afe3815eab8d0 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -173,12 +173,12 @@ def test_run_bulk_query_deletes_by_project(self) -> None: ) assert models_attempted == {"group", "projectdebugfile"} - assert len(task_queue.put_calls) == 2 - # Verify we deleted all expected groups (order may vary due to non-unique last_seen) - all_deleted_ids: set[tuple[int, ...]] = set() - for call in task_queue.put_calls: - assert call[0] == "sentry.models.group.Group" - all_deleted_ids.add(call[1]) - assert all_deleted_ids == set(tuple(ids)) + assert len(task_queue.put_calls) == 2 + # Verify we deleted all expected groups (order may vary due to non-unique last_seen) + all_deleted_ids: set[int] = set() + for call in task_queue.put_calls: + assert call[0] == "sentry.models.group.Group" + all_deleted_ids.update(call[1]) + assert all_deleted_ids == set(ids) assert Group.objects.all().count() == 1 From 6b1d87f8e2cea9d063fdc1f5110a50cca4e669cf Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 13:22:40 -0500 Subject: [PATCH 7/9] Remove the order by --- tests/sentry/runner/commands/test_cleanup.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index 5afe3815eab8d0..20f0aae5451148 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -149,11 +149,8 @@ def test_run_bulk_query_deletes_by_project(self) -> None: assert Group.objects.count() == 4 assert Group.objects.filter(last_seen__lt=before_now(days=days)).count() == 3 - # Fetch IDs in ascending order to match deletion order ids = list( - Group.objects.filter(last_seen__lt=before_now(days=days)) - .order_by("id") - .values_list("id", flat=True) + Group.objects.filter(last_seen__lt=before_now(days=days)).values_list("id", flat=True) ) with ( From 537e93059b7ac0e1fe650d680028dfdb7e4634ff Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 13:26:40 -0500 Subject: [PATCH 8/9] Minor changes --- tests/sentry/runner/commands/test_cleanup.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index 20f0aae5451148..0220cf79c8377c 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -1,6 +1,5 @@ from __future__ import annotations -from typing import Any from unittest.mock import patch from sentry.constants import ObjectStatus @@ -16,27 +15,18 @@ from sentry.testutils.silo import assume_test_silo_mode -class CustomTaskQueue: - """Mock task queue that implements the _WorkQueue protocol.""" +class SynchronousTaskQueue: + """Mock task queue that partially implements the _WorkQueue protocol but executes tasks synchronously.""" def __init__(self) -> None: + # You can use this to inspect the calls to the queue. self.put_calls: list[tuple[str, tuple[int, ...]]] = [] def put(self, item: tuple[str, tuple[int, ...]]) -> None: - """Called when items are added to the queue.""" self.put_calls.append(item) task_execution(item[0], item[1]) - def get(self, block: bool = True, timeout: float | None = None) -> Any: - """Get an item from the queue (required by Queue protocol).""" - raise NotImplementedError("get() not needed in tests") - - def task_done(self) -> None: - """Signal that a task is complete (required by JoinableQueue protocol).""" - pass - def join(self) -> None: - """Wait for all tasks to complete.""" pass @@ -157,11 +147,11 @@ def test_run_bulk_query_deletes_by_project(self) -> None: assume_test_silo_mode(SiloMode.REGION), patch("sentry.runner.commands.cleanup.DELETES_BY_PROJECT_CHUNK_SIZE", 2), ): - task_queue = CustomTaskQueue() + task_queue = SynchronousTaskQueue() models_attempted: set[str] = set() run_bulk_deletes_by_project( - task_queue=task_queue, # type: ignore[arg-type] # CustomTaskQueue implements the queue protocol + task_queue=task_queue, # type: ignore[arg-type] # It partially implements the queue protocol project_id=None, start_from_project_id=None, is_filtered=lambda model: False, From f47a9db708c2e476b422fe01bfe551aa2baa4b49 Mon Sep 17 00:00:00 2001 From: "Armen Zambrano G." <44410+armenzg@users.noreply.github.com> Date: Tue, 25 Nov 2025 13:28:23 -0500 Subject: [PATCH 9/9] Groups out of order --- tests/sentry/runner/commands/test_cleanup.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/sentry/runner/commands/test_cleanup.py b/tests/sentry/runner/commands/test_cleanup.py index 0220cf79c8377c..8e9dfc3358b05b 100644 --- a/tests/sentry/runner/commands/test_cleanup.py +++ b/tests/sentry/runner/commands/test_cleanup.py @@ -133,9 +133,11 @@ class RunBulkQueryDeletesByProjectTest(TestCase): def test_run_bulk_query_deletes_by_project(self) -> None: """Test that the function runs bulk query deletes by project as expected.""" days = 30 + # Creating the groups in out of order to test that the chunks are created in the correct order. + self.create_group(last_seen=before_now(days=days + 4)) self.create_group() - for i in range(3): - self.create_group(last_seen=before_now(days=days + i)) + self.create_group(last_seen=before_now(days=days + 2)) + self.create_group(last_seen=before_now(days=days + 3)) assert Group.objects.count() == 4 assert Group.objects.filter(last_seen__lt=before_now(days=days)).count() == 3