Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions airflow-core/docs/howto/usage-cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,31 @@ By default, ``db clean`` will archive purged rows in tables of the form ``_airfl

When you encounter an error without using ``--skip-archive``, ``_airflow_deleted__<table>__<timestamp>`` would still exist in the DB. You can use ``db drop-archived`` command to manually drop these tables.

Detecting cleanup failures
^^^^^^^^^^^^^^^^^^^^^^^^^^

By default, ``db clean`` suppresses per-table errors (such as a database ``statement_timeout``
being exceeded on a very large table) and exits with code 0 even if one or more tables were not
cleaned. A WARNING is always emitted in the logs listing which tables were skipped due to errors.

To make the command exit with a non-zero code whenever any table cleanup fails — useful when
``airflow db clean`` is invoked from a DAG task and you want the task to turn red on failure —
pass ``--error-on-cleanup-failure``:

.. code-block:: bash

airflow db clean \
--clean-before-timestamp "$(date -u -d '21 days ago' '+%Y-%m-%dT%H:%M:%S+00:00')" \
--yes \
--error-on-cleanup-failure

.. tip::

On large deployments where the archival ``CREATE TABLE … AS SELECT`` step itself can time
out, combining ``--error-on-cleanup-failure`` with ``--skip-archive`` is recommended.
``--skip-archive`` deletes rows directly without the intermediate archive table, making the
operation both faster and less likely to hit ``statement_timeout``.

Export the purged records from the archive tables
-------------------------------------------------
The ``db export-archived`` command exports the contents of the archived tables, created by the ``db clean`` command,
Expand Down
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,11 @@ def string_lower_type(val):
"Lower values reduce long-running locks but increase the number of batches."
),
)
ARG_DB_ERROR_ON_CLEANUP_FAILURE = Arg(
    ("--error-on-cleanup-failure",),
    # Boolean flag: when present, per-table cleanup failures surface as a
    # non-zero exit code instead of only a logged warning.
    action="store_true",
    help=(
        "Command will exit with a non-zero exit code if any table cleanup failed. "
        "By default errors are suppressed and the command exits 0."
    ),
)
ARG_DAG_IDS = Arg(
("--dag-ids",),
default=None,
Expand Down Expand Up @@ -1603,6 +1608,7 @@ class GroupCommand(NamedTuple):
ARG_DB_BATCH_SIZE,
ARG_DAG_IDS,
ARG_EXCLUDE_DAG_IDS,
ARG_DB_ERROR_ON_CLEANUP_FAILURE,
),
),
ActionCommand(
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/cli/commands/db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ def cleanup_tables(args):
batch_size=args.batch_size,
dag_ids=args.dag_ids,
exclude_dag_ids=args.exclude_dag_ids,
error_on_cleanup_failure=args.error_on_cleanup_failure,
)


Expand Down
39 changes: 35 additions & 4 deletions airflow-core/src/airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import csv
import logging
import os
from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any

from sqlalchemy import and_, column, func, inspect, select, table, text
Expand Down Expand Up @@ -475,11 +477,22 @@ def _print_config(*, configs: dict[str, _TableConfig]) -> None:


@contextmanager
def _suppress_with_logging(table: str, session: Session):
"""Suppresses errors but logs them."""
def _suppress_with_logging(table: str, session: Session) -> Generator[SimpleNamespace, None, None]:
"""
Suppress per-table cleanup errors, log them, and expose failure state to the caller.

Yields a :class:`~types.SimpleNamespace` with a single attribute ``failed`` (bool).
When an :class:`~sqlalchemy.exc.OperationalError` or
:class:`~sqlalchemy.exc.ProgrammingError` is raised inside the ``with`` block the
exception is swallowed, ``ctx.failed`` is set to ``True``, a WARNING is emitted for
the table, and the session is rolled back. The caller can inspect ``ctx.failed``
after the block to decide whether to surface the error upstream.
"""
ctx = SimpleNamespace(failed=False)
try:
yield
yield ctx
except (OperationalError, ProgrammingError):
ctx.failed = True
logger.warning("Encountered error when attempting to clean table '%s'. ", table)
logger.debug("Traceback for table '%s'", table, exc_info=True)
if session.is_active:
Expand Down Expand Up @@ -554,6 +567,7 @@ def run_cleanup(
skip_archive: bool = False,
session: Session = NEW_SESSION,
batch_size: int | None = None,
error_on_cleanup_failure: bool = False,
) -> None:
"""
Purges old records in airflow metadata database.
Expand All @@ -577,6 +591,9 @@ def run_cleanup(
:param skip_archive: Set to True if you don't want the purged rows preserved in an archive table.
:param session: Session representing connection to the metadata database.
:param batch_size: Maximum number of rows to delete or archive in a single transaction.
:param error_on_cleanup_failure: If True, raise a RuntimeError after processing all tables
if any per-table cleanup encountered an error. By default errors are suppressed and the
command exits 0 even if some tables were not cleaned.
"""
clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)

Expand All @@ -597,10 +614,11 @@ def run_cleanup(
exclude_dag_ids=exclude_dag_ids,
)
existing_tables = reflect_tables(tables=None, session=session).tables
failed_tables: list[str] = []

for table_name, table_config in effective_config_dict.items():
if table_name in existing_tables:
with _suppress_with_logging(table_name, session):
with _suppress_with_logging(table_name, session) as ctx:
_cleanup_table(
clean_before_timestamp=clean_before_timestamp,
dag_ids=dag_ids,
Expand All @@ -613,9 +631,22 @@ def run_cleanup(
batch_size=batch_size,
)
session.commit()
if ctx.failed:
failed_tables.append(table_name)
else:
logger.warning("Table %s not found. Skipping.", table_name)

if failed_tables:
if error_on_cleanup_failure:
raise RuntimeError(
f"airflow db clean encountered errors on the following tables and did not clean them: "
f"{failed_tables}. Check the logs above for details."
)
logger.warning(
"The following tables were not cleaned due to errors: %s. Check the logs above for details.",
failed_tables,
)


@provide_session
def export_archived_records(
Expand Down
40 changes: 40 additions & 0 deletions airflow-core/tests/unit/cli/commands/test_db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ def test_date_timezone_omitted(self, run_cleanup_mock, timezone):
confirm=False,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize("timezone", ["UTC", "Europe/Berlin", "America/Los_Angeles"])
Expand All @@ -756,6 +757,7 @@ def test_date_timezone_supplied(self, run_cleanup_mock, timezone):
confirm=False,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("confirm_arg", "expected"), [(["-y"], False), ([], True)])
Expand Down Expand Up @@ -785,6 +787,7 @@ def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
confirm=expected,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("extra_arg", "expected"), [(["--skip-archive"], True), ([], False)])
Expand Down Expand Up @@ -814,6 +817,7 @@ def test_skip_archive(self, run_cleanup_mock, extra_arg, expected):
confirm=True,
skip_archive=expected,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("dry_run_arg", "expected"), [(["--dry-run"], True), ([], False)])
Expand Down Expand Up @@ -843,6 +847,7 @@ def test_dry_run(self, run_cleanup_mock, dry_run_arg, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -874,6 +879,7 @@ def test_tables(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("extra_args", "expected"), [(["--verbose"], True), ([], False)])
Expand Down Expand Up @@ -903,6 +909,7 @@ def test_verbose(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("extra_args", "expected"), [(["--batch-size", "1234"], 1234), ([], None)])
Expand Down Expand Up @@ -932,6 +939,7 @@ def test_batch_size(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=expected,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -963,6 +971,7 @@ def test_dag_ids(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -994,6 +1003,37 @@ def test_exclude_dag_ids(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(
    ("extra_args", "expected"), [(["--error-on-cleanup-failure"], True), ([], False)]
)
@patch("airflow.cli.commands.db_command.run_cleanup")
def test_error_on_cleanup_failure(self, run_cleanup_mock, extra_args, expected):
    """Passing --error-on-cleanup-failure must set error_on_cleanup_failure=True; omitting it, False."""
    base_args = ["db", "clean", "--clean-before-timestamp", "2021-01-01"]
    args = self.parser.parse_args([*base_args, *extra_args])
    db_command.cleanup_tables(args)

    # All other kwargs keep their defaults; only the flag under test varies.
    expected_kwargs = dict(
        table_names=None,
        dry_run=False,
        dag_ids=None,
        exclude_dag_ids=None,
        clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
        verbose=False,
        confirm=True,
        skip_archive=False,
        batch_size=None,
        error_on_cleanup_failure=expected,
    )
    run_cleanup_mock.assert_called_once_with(**expected_kwargs)

@patch("airflow.cli.commands.db_command.export_archived_records")
Expand Down
86 changes: 80 additions & 6 deletions airflow-core/tests/unit/utils/test_db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,19 +549,25 @@ def test_no_models_missing(self):
assert set(all_models) - exclusion_list.union(config_dict) == set()
assert exclusion_list.isdisjoint(config_dict)

def test_no_failure_warnings(self, caplog):
def test_no_failure_warnings(self):
"""
Ensure every table we have configured (and that is present in the db) can be cleaned successfully.
For example, this checks that the recency column is actually a column.
"""
run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True)
assert "Encountered error when attempting to clean table" not in caplog.text
with patch("airflow.utils.db_cleanup.logger") as mock_logger:
run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True)
for call in mock_logger.warning.call_args_list:
assert "Encountered error when attempting to clean table" not in str(call)

# Lets check we have the right error message just in case
caplog.clear()
with patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)):
with (
patch("airflow.utils.db_cleanup.logger") as mock_logger,
patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)),
):
run_cleanup(clean_before_timestamp=timezone.utcnow(), table_names=["task_instance"], dry_run=True)
assert "Encountered error when attempting to clean table" in caplog.text
mock_logger.warning.assert_any_call(
"Encountered error when attempting to clean table '%s'. ", "task_instance"
)

@pytest.mark.parametrize(
"drop_archive",
Expand Down Expand Up @@ -741,6 +747,74 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
else:
confirm_mock.assert_not_called()

@patch(
    "airflow.utils.db_cleanup._cleanup_table",
    side_effect=OperationalError("", {}, Exception("mock db error")),
)
def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock):
    """A failing table cleanup must raise RuntimeError when error_on_cleanup_failure=True."""
    cleanup_kwargs = dict(
        clean_before_timestamp=None,
        table_names=["log"],
        dry_run=False,
        verbose=False,
        confirm=False,
        error_on_cleanup_failure=True,
    )
    # The mocked _cleanup_table raises OperationalError, so run_cleanup must surface it.
    with pytest.raises(RuntimeError, match="airflow db clean encountered errors"):
        run_cleanup(**cleanup_kwargs)

@patch(
    "airflow.utils.db_cleanup._cleanup_table",
    side_effect=OperationalError("", {}, Exception("mock db error")),
)
def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock):
    """With error_on_cleanup_failure=False a failing table logs a warning instead of raising."""
    cleanup_kwargs = dict(
        clean_before_timestamp=None,
        table_names=["log"],
        dry_run=False,
        verbose=False,
        confirm=False,
        error_on_cleanup_failure=False,
    )
    with patch("airflow.utils.db_cleanup.logger") as mock_logger:
        # Must complete without raising despite the mocked OperationalError.
        run_cleanup(**cleanup_kwargs)
    # The summary warning must name the table that failed.
    mock_logger.warning.assert_any_call(
        "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
        ["log"],
    )

@patch(
    "airflow.utils.db_cleanup._cleanup_table",
    side_effect=OperationalError("", {}, Exception("mock db error")),
)
def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, cleanup_table_mock):
    """Omitting error_on_cleanup_failure entirely still emits the warning naming failed tables."""
    with patch("airflow.utils.db_cleanup.logger") as mock_logger:
        # Note: error_on_cleanup_failure is deliberately NOT passed, exercising the default.
        run_cleanup(
            table_names=["log"],
            clean_before_timestamp=None,
            confirm=False,
            dry_run=False,
            verbose=False,
        )
    mock_logger.warning.assert_any_call(
        "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
        ["log"],
    )

@patch("airflow.utils.db_cleanup._cleanup_table")
def test_error_on_cleanup_failure_propagated_from_run_cleanup(self, cleanup_table_mock):
    """run_cleanup must accept error_on_cleanup_failure=True and not raise when no table fails."""
    run_cleanup(
        table_names=["log"],
        clean_before_timestamp=None,
        confirm=False,
        dry_run=False,
        verbose=False,
        error_on_cleanup_failure=True,
    )
    # The cleanup itself must still have been attempted exactly once.
    cleanup_table_mock.assert_called_once()


def create_tis(base_date, num_tis, run_type=DagRunType.SCHEDULED):
from tests_common.test_utils.taskinstance import create_task_instance
Expand Down