-
-
Notifications
You must be signed in to change notification settings - Fork 4.5k
tests(deletions): Add CustomTaskQueue to enable testing #103974
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b0a31a6
509514f
a68f8e7
84899c8
558ea9d
736fe81
6b1d87f
537e930
f47a9db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
| TRANSACTION_PREFIX = "cleanup" | ||
| DELETES_BY_PROJECT_CHUNK_SIZE = 100 | ||
|
|
||
| if TYPE_CHECKING: | ||
| from sentry.db.models.base import BaseModel | ||
|
|
@@ -85,28 +86,15 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: | |
| # Configure within each Process | ||
| import logging | ||
|
|
||
| from sentry.utils.imports import import_string | ||
|
|
||
| logger = logging.getLogger("sentry.cleanup") | ||
|
|
||
| from sentry.runner import configure | ||
|
|
||
| configure() | ||
|
|
||
| from sentry import deletions, models, options, similarity | ||
| from sentry import options | ||
| from sentry.utils import metrics | ||
|
|
||
| skip_child_relations_models = [ | ||
| # Handled by other parts of cleanup | ||
| models.EventAttachment, | ||
| models.UserReport, | ||
| models.Group, | ||
| models.GroupEmailThread, | ||
| models.GroupRuleStatus, | ||
| # Handled by TTL | ||
| similarity, | ||
| ] | ||
|
|
||
| while True: | ||
| j = task_queue.get() | ||
| if j == _STOP_WORKER: | ||
|
|
@@ -124,21 +112,7 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: | |
| with sentry_sdk.start_transaction( | ||
| op="cleanup", name=f"{TRANSACTION_PREFIX}.multiprocess_worker" | ||
| ): | ||
| model = import_string(model_name) | ||
| task = deletions.get( | ||
| model=model, | ||
| query={"id__in": chunk}, | ||
| skip_models=skip_child_relations_models, | ||
| transaction_id=uuid4().hex, | ||
| ) | ||
|
|
||
| while True: | ||
| debug_output(f"Processing chunk of {len(chunk)} {model_name} objects") | ||
| metrics.incr( | ||
| "cleanup.chunk_processed", tags={"model": model_name}, amount=len(chunk) | ||
| ) | ||
| if not task.chunk(apply_filter=True): | ||
| break | ||
| task_execution(model_name, chunk) | ||
| except Exception: | ||
| metrics.incr( | ||
| "cleanup.error", | ||
|
|
@@ -154,6 +128,37 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: | |
| task_queue.task_done() | ||
|
|
||
|
|
||
| def task_execution(model_name: str, chunk: tuple[int, ...]) -> None: | ||
| from sentry import deletions, models, similarity | ||
| from sentry.utils import metrics | ||
| from sentry.utils.imports import import_string | ||
|
|
||
| skip_child_relations_models = [ | ||
| # Handled by other parts of cleanup | ||
| models.EventAttachment, | ||
| models.UserReport, | ||
| models.Group, | ||
| models.GroupEmailThread, | ||
| models.GroupRuleStatus, | ||
| # Handled by TTL | ||
| similarity, | ||
| ] | ||
|
|
||
| model = import_string(model_name) | ||
| task = deletions.get( | ||
| model=model, | ||
| query={"id__in": chunk}, | ||
| skip_models=skip_child_relations_models, | ||
| transaction_id=uuid4().hex, | ||
| ) | ||
|
|
||
| while True: | ||
| debug_output(f"Processing chunk of {len(chunk)} {model_name} objects") | ||
| metrics.incr("cleanup.chunk_processed", tags={"model": model_name}, amount=len(chunk)) | ||
| if not task.chunk(apply_filter=True): | ||
| break | ||
|
|
||
|
|
||
| @click.command() | ||
| @click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.") | ||
| @click.option("--project", help="Limit truncation to only entries from project.") | ||
|
|
@@ -760,7 +765,7 @@ def run_bulk_deletes_by_project( | |
| order_by=order_by, | ||
| ) | ||
|
|
||
| for chunk in q.iterator(chunk_size=100): | ||
| for chunk in q.iterator(chunk_size=DELETES_BY_PROJECT_CHUNK_SIZE): | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using the DELETES_BY_PROJECT_CHUNK_SIZE constant here will allow patching the chunk size within the test.
armenzg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| task_queue.put((imp, chunk)) | ||
| except Exception: | ||
| capture_exception( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,35 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from unittest.mock import patch | ||
|
|
||
| from sentry.constants import ObjectStatus | ||
| from sentry.runner.commands.cleanup import prepare_deletes_by_project | ||
| from sentry.models.group import Group | ||
| from sentry.runner.commands.cleanup import ( | ||
| prepare_deletes_by_project, | ||
| run_bulk_deletes_by_project, | ||
| task_execution, | ||
| ) | ||
| from sentry.silo.base import SiloMode | ||
| from sentry.testutils.cases import TestCase | ||
| from sentry.testutils.helpers.datetime import before_now | ||
| from sentry.testutils.silo import assume_test_silo_mode | ||
|
|
||
|
|
||
| class SynchronousTaskQueue: | ||
| """Mock task queue that partially implements the _WorkQueue protocol but executes tasks synchronously.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| # You can use this to inspect the calls to the queue. | ||
| self.put_calls: list[tuple[str, tuple[int, ...]]] = [] | ||
|
|
||
| def put(self, item: tuple[str, tuple[int, ...]]) -> None: | ||
| self.put_calls.append(item) | ||
| task_execution(item[0], item[1]) | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def join(self) -> None: | ||
| pass | ||
|
|
||
|
|
||
| class PrepareDeletesByProjectTest(TestCase): | ||
| def test_no_filters(self) -> None: | ||
| """Test that without filters, all active projects are included.""" | ||
|
|
@@ -104,3 +127,47 @@ def test_region_silo_mode_returns_projects(self) -> None: | |
| assert sorted(project_ids) == [project1.id, project2.id] | ||
| # Should have model tuples to delete | ||
| assert len(to_delete) > 0 | ||
|
|
||
|
|
||
| class RunBulkQueryDeletesByProjectTest(TestCase): | ||
| def test_run_bulk_query_deletes_by_project(self) -> None: | ||
| """Test that the function runs bulk query deletes by project as expected.""" | ||
| days = 30 | ||
| # Creating the groups out of order to test that the chunks are created in the correct order. | ||
| self.create_group(last_seen=before_now(days=days + 4)) | ||
| self.create_group() | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is the only group that won't be past retention. |
||
| self.create_group(last_seen=before_now(days=days + 2)) | ||
| self.create_group(last_seen=before_now(days=days + 3)) | ||
|
|
||
| assert Group.objects.count() == 4 | ||
| assert Group.objects.filter(last_seen__lt=before_now(days=days)).count() == 3 | ||
| ids = list( | ||
| Group.objects.filter(last_seen__lt=before_now(days=days)).values_list("id", flat=True) | ||
| ) | ||
|
|
||
| with ( | ||
| assume_test_silo_mode(SiloMode.REGION), | ||
| patch("sentry.runner.commands.cleanup.DELETES_BY_PROJECT_CHUNK_SIZE", 2), | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Two groups at a time will be processed. |
||
| ): | ||
| task_queue = SynchronousTaskQueue() | ||
|
|
||
| models_attempted: set[str] = set() | ||
| run_bulk_deletes_by_project( | ||
| task_queue=task_queue, # type: ignore[arg-type] # It partially implements the queue protocol | ||
| project_id=None, | ||
| start_from_project_id=None, | ||
| is_filtered=lambda model: False, | ||
| days=days, | ||
| models_attempted=models_attempted, | ||
| ) | ||
| assert models_attempted == {"group", "projectdebugfile"} | ||
|
|
||
| assert len(task_queue.put_calls) == 2 | ||
| # Verify we deleted all expected groups (order may vary due to non-unique last_seen) | ||
| all_deleted_ids: set[int] = set() | ||
| for call in task_queue.put_calls: | ||
| assert call[0] == "sentry.models.group.Group" | ||
| all_deleted_ids.update(call[1]) | ||
| assert all_deleted_ids == set(ids) | ||
|
|
||
| assert Group.objects.all().count() == 1 | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of the logic from the `multiprocess_worker` function is moved into a new function called `task_execution`.
This allows testing the deletion without being impacted by the multiprocessing setup (DB changes made by the tests are not seen by the worker processes).