Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 143 additions & 88 deletions src/sentry/runner/commands/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import click
import sentry_sdk
from django.conf import settings
from django.db import router as db_router
from django.db.models import Model, QuerySet
from django.utils import timezone

Expand Down Expand Up @@ -189,11 +190,7 @@ def _cleanup(

configure()

from django.db import router as db_router

from sentry.db.deletion import BulkDeleteQuery
from sentry.utils import metrics
from sentry.utils.query import RangeQuerySetWrapper

start_time = None
if timed:
Expand Down Expand Up @@ -227,8 +224,6 @@ def is_filtered(model: type[Model]) -> bool:
return False
return model.__name__.lower() not in model_list

bulk_query_deletes = generate_bulk_query_deletes()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call is only needed within run_bulk_query_deletes.

Image


deletes = models_which_use_deletions_code_path()

_run_specialized_cleanups(is_filtered, days, silent, models_attempted)
Expand All @@ -238,99 +233,33 @@ def is_filtered(model: type[Model]) -> bool:
project, organization, days, deletes
)

# This does not use the deletions code path, but rather uses the BulkDeleteQuery class
# to delete records in bulk (i.e. does not need to worry about child relations)
run_bulk_query_deletes(
bulk_query_deletes,
is_filtered,
days,
project,
project_id,
models_attempted,
)

debug_output("Running bulk deletes in DELETES")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for model_tp, dtfield, order_by in deletes:
debug_output(f"Removing {model_tp.__name__} for days={days} project={project or '*'}")

if is_filtered(model_tp):
debug_output(">> Skipping %s" % model_tp.__name__)
else:
models_attempted.add(model_tp.__name__.lower())
imp = ".".join((model_tp.__module__, model_tp.__name__))

q = BulkDeleteQuery(
model=model_tp,
dtfield=dtfield,
days=days,
project_id=project_id,
order_by=order_by,
)

for chunk in q.iterator(chunk_size=100):
task_queue.put((imp, chunk))

task_queue.join()

project_deletion_query, to_delete_by_project = prepare_deletes_by_project(
project, project_id, is_filtered
run_bulk_deletes_in_deletes(
task_queue,
deletes,
is_filtered,
days,
project,
project_id,
models_attempted,
)

if project_deletion_query is not None and len(to_delete_by_project):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug_output("Running bulk deletes in DELETES_BY_PROJECT")
for project_id_for_deletion in RangeQuerySetWrapper(
project_deletion_query.values_list("id", flat=True),
result_value_getter=lambda item: item,
):
for model_tp, dtfield, order_by in to_delete_by_project:
models_attempted.add(model_tp.__name__.lower())
debug_output(
f"Removing {model_tp.__name__} for days={days} project={project_id_for_deletion}"
)

imp = ".".join((model_tp.__module__, model_tp.__name__))

q = BulkDeleteQuery(
model=model_tp,
dtfield=dtfield,
days=days,
project_id=project_id_for_deletion,
order_by=order_by,
)

for chunk in q.iterator(chunk_size=100):
task_queue.put((imp, chunk))

task_queue.join()

organization_deletion_query, to_delete_by_organization = prepare_deletes_by_organization(
organization_id, is_filtered
run_bulk_deletes_by_project(
task_queue, project, project_id, is_filtered, days, models_attempted
)

if organization_deletion_query is not None and len(to_delete_by_organization):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug_output("Running bulk deletes in DELETES_BY_ORGANIZATION")
for organization_id_for_deletion in RangeQuerySetWrapper(
organization_deletion_query.values_list("id", flat=True),
result_value_getter=lambda item: item,
):
for model_tp, dtfield, order_by in to_delete_by_organization:
models_attempted.add(model_tp.__name__.lower())
debug_output(
f"Removing {model_tp.__name__} for days={days} organization={organization_id_for_deletion}"
)

imp = ".".join((model_tp.__module__, model_tp.__name__))

q = BulkDeleteQuery(
model=model_tp,
dtfield=dtfield,
days=days,
organization_id=organization_id_for_deletion,
order_by=order_by,
)

for chunk in q.iterator(chunk_size=100):
task_queue.put((imp, chunk))

task_queue.join()
run_bulk_deletes_by_organization(
task_queue, organization_id, is_filtered, days, models_attempted
)

remove_file_blobs(is_filtered, silent, models_attempted)

Expand Down Expand Up @@ -425,6 +354,9 @@ def _start_pool(concurrency: int) -> tuple[list[Process], _WorkQueue]:


def _stop_pool(pool: Sequence[Process], task_queue: _WorkQueue) -> None:
# First, ensure all queued tasks are completed
task_queue.join()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever an exception is raised inside the try block, we may exit without calling task_queue.join() in one of the steps; thus, it makes sense to complete the queued tasks before shutting down the pool.


# Stop the pool
for _ in pool:
task_queue.put(_STOP_WORKER)
Expand Down Expand Up @@ -599,7 +531,6 @@ def generate_bulk_query_deletes() -> list[tuple[type[Model], str, str | None]]:


def run_bulk_query_deletes(
bulk_query_deletes: list[tuple[type[Model], str, str | None]],
is_filtered: Callable[[type[Model]], bool],
days: int,
project: str | None,
Expand All @@ -609,6 +540,7 @@ def run_bulk_query_deletes(
from sentry.db.deletion import BulkDeleteQuery

debug_output("Running bulk query deletes in bulk_query_deletes")
bulk_query_deletes = generate_bulk_query_deletes()
for model_tp, dtfield, order_by in bulk_query_deletes:
chunk_size = 10000

Expand All @@ -626,6 +558,129 @@ def run_bulk_query_deletes(
).execute(chunk_size=chunk_size)


def run_bulk_deletes_in_deletes(
    task_queue: _WorkQueue,
    deletes: list[tuple[type[Model], str, str]],
    is_filtered: Callable[[type[Model]], bool],
    days: int,
    project: str | None,
    project_id: int | None,
    models_attempted: set[str],
) -> None:
    """Queue bulk-delete work for each model in ``deletes``.

    Every (model, datetime-field, order-by) triple is expanded into
    ``BulkDeleteQuery`` chunks that are pushed onto ``task_queue`` for the
    worker pool; models rejected by ``is_filtered`` are skipped.  Each model
    attempted is recorded in ``models_attempted``.  Blocks until all queued
    chunks have been consumed.
    """
    from sentry.db.deletion import BulkDeleteQuery

    debug_output("Running bulk deletes in DELETES")
    for model, dtfield, order_by in deletes:
        debug_output(f"Removing {model.__name__} for days={days} project={project or '*'}")

        if is_filtered(model):
            debug_output(">> Skipping %s" % model.__name__)
            continue

        models_attempted.add(model.__name__.lower())
        import_path = ".".join((model.__module__, model.__name__))

        query = BulkDeleteQuery(
            model=model,
            dtfield=dtfield,
            days=days,
            project_id=project_id,
            order_by=order_by,
        )

        # Chunks are handed to the worker pool rather than deleted inline.
        for chunk in query.iterator(chunk_size=100):
            task_queue.put((import_path, chunk))

    # Ensure all tasks are completed before exiting
    task_queue.join()


def run_bulk_deletes_by_project(
    task_queue: _WorkQueue,
    project: str | None,
    project_id: int | None,
    is_filtered: Callable[[type[Model]], bool],
    days: int,
    models_attempted: set[str],
) -> None:
    """Queue per-project bulk deletes for the DELETES_BY_PROJECT models.

    Iterates the project ids selected by ``prepare_deletes_by_project`` and,
    for each (model, datetime-field, order-by) triple, pushes
    ``BulkDeleteQuery`` chunks onto ``task_queue``.  Blocks until all queued
    chunks have been consumed.
    """
    from sentry.db.deletion import BulkDeleteQuery
    from sentry.utils.query import RangeQuerySetWrapper

    deletion_query, model_specs = prepare_deletes_by_project(project, project_id, is_filtered)

    if deletion_query is not None and len(model_specs):
        debug_output("Running bulk deletes in DELETES_BY_PROJECT")
        id_stream = RangeQuerySetWrapper(
            deletion_query.values_list("id", flat=True),
            result_value_getter=lambda item: item,
        )
        for current_project_id in id_stream:
            for model, dtfield, order_by in model_specs:
                models_attempted.add(model.__name__.lower())
                debug_output(
                    f"Removing {model.__name__} for days={days} project={current_project_id}"
                )

                import_path = ".".join((model.__module__, model.__name__))
                query = BulkDeleteQuery(
                    model=model,
                    dtfield=dtfield,
                    days=days,
                    project_id=current_project_id,
                    order_by=order_by,
                )
                # Chunks are handed to the worker pool rather than deleted inline.
                for chunk in query.iterator(chunk_size=100):
                    task_queue.put((import_path, chunk))

    # Ensure all tasks are completed before exiting
    task_queue.join()


def run_bulk_deletes_by_organization(
    task_queue: _WorkQueue,
    organization_id: int | None,
    is_filtered: Callable[[type[Model]], bool],
    days: int,
    models_attempted: set[str],
) -> None:
    """Queue per-organization bulk deletes for the DELETES_BY_ORGANIZATION models.

    Iterates the organization ids selected by
    ``prepare_deletes_by_organization`` and, for each (model, datetime-field,
    order-by) triple, pushes ``BulkDeleteQuery`` chunks onto ``task_queue``.
    Blocks until all queued chunks have been consumed.
    """
    from sentry.db.deletion import BulkDeleteQuery
    from sentry.utils.query import RangeQuerySetWrapper

    deletion_query, model_specs = prepare_deletes_by_organization(organization_id, is_filtered)

    if deletion_query is not None and len(model_specs):
        debug_output("Running bulk deletes in DELETES_BY_ORGANIZATION")
        id_stream = RangeQuerySetWrapper(
            deletion_query.values_list("id", flat=True),
            result_value_getter=lambda item: item,
        )
        for current_organization_id in id_stream:
            for model, dtfield, order_by in model_specs:
                models_attempted.add(model.__name__.lower())
                debug_output(
                    f"Removing {model.__name__} for days={days} organization={current_organization_id}"
                )

                import_path = ".".join((model.__module__, model.__name__))
                query = BulkDeleteQuery(
                    model=model,
                    dtfield=dtfield,
                    days=days,
                    organization_id=current_organization_id,
                    order_by=order_by,
                )
                # Chunks are handed to the worker pool rather than deleted inline.
                for chunk in query.iterator(chunk_size=100):
                    task_queue.put((import_path, chunk))

    # Ensure all tasks are completed before exiting
    task_queue.join()


def prepare_deletes_by_project(
project: str | None, project_id: int | None, is_filtered: Callable[[type[Model]], bool]
) -> tuple[QuerySet[Any] | None, list[tuple[Any, str, str]]]:
Expand Down
Loading