Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import structlog
from sqlalchemy import delete, func, insert, select, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import joinedload, load_only

from airflow._shared.timezones.timezone import utcnow
Expand Down Expand Up @@ -257,6 +257,21 @@ def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, se
)


# Dialect-specific message prefixes for unique-constraint IntegrityErrors. Kept in sync with
# ``airflow.api_fastapi.common.exceptions._UniqueConstraintErrorHandler``.
_UNIQUE_CONSTRAINT_ERROR_PREFIXES = (
"UNIQUE constraint failed", # SQLite
"Duplicate entry", # MySQL
"violates unique constraint", # Postgres
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String-matching exc.orig across three drivers is fragile and tightly couples this code to current driver wordings. Prefer fixing at the write site with INSERT ... ON CONFLICT DO NOTHING (or equivalent merge for the brand-new-dag path), so this except arm doesn't need to classify dialects at all. If we keep this approach, please guard against exc.orig is None.


Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting

)


def _is_unique_constraint_error(exc: IntegrityError) -> bool:
"""Return True iff ``exc`` is a unique/primary-key constraint violation across SQLite, MySQL, and Postgres."""
orig_str = str(exc.orig)
return any(prefix in orig_str for prefix in _UNIQUE_CONSTRAINT_ERROR_PREFIXES)


def _serialize_dag_capturing_errors(
dag: LazyDeserializedDAG,
bundle_name,
Expand Down Expand Up @@ -295,6 +310,29 @@ def _serialize_dag_capturing_errors(
return []
except OperationalError:
raise
except IntegrityError as exc:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we keep the dialect-prefix approach for now, log.info(...) here will fire on every collision and is going to be noisy on a healthy multi-DFP cluster — and silently spammy if a real unique-constraint regression ever lands. Suggest log.warning at minimum, and including the constraint name in the message if SQLAlchemy exposes it on exc.orig.


Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting

# Multiple Dag processors writing the same brand-new Dag can race on the INSERT.
# The loser's transaction is already invalid, so we must roll the session back to
# avoid PendingRollbackError on subsequent per-Dag work in this parsing cycle.
# The winning peer already produced the correct row, so this is not an import error
# and we don't retry. Non-unique IntegrityErrors (e.g. NOT-NULL violations from a
# genuinely malformed Dag) fall through to the generic Exception arm.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you share a log of where you saw this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trace that lands in import_errors on each dialect, captured by reverting the new except IntegrityError arm and rerunning the regression test against main:

sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: serialized_dag.dag_id
sqlalchemy.exc.IntegrityError: (MySQLdb.IntegrityError) (1062, "Duplicate entry 'my_dag' for key 'serialized_dag.PRIMARY'")
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey"

The IntegrityError raised by the losing processor's SerializedDagModel.write_dag is caught by the existing generic except Exception arm in _serialize_dag_capturing_errors, fed through traceback.format_exc(...), and recorded as the import-error value for the parsing cycle. The loser's now-invalid transaction also causes PendingRollbackError on the next per-Dag write in the same update_dag_parsing_results_in_db call. Added a before/after pytest snippet to the PR body that surfaces each dialect's exact message.

if _is_unique_constraint_error(exc):
log.info(
"Concurrent Dag processor already wrote Dag, skipping duplicate insert",
dag_id=dag.dag_id,
fileloc=dag.fileloc,
)
session.rollback()
return []
log.exception("Failed to write serialized DAG dag_id=%s fileloc=%s", dag.dag_id, dag.fileloc)
dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
return [
(
(bundle_name, dag.relative_fileloc),
traceback.format_exc(limit=-dagbag_import_error_traceback_depth),
)
]
except Exception:
log.exception("Failed to write serialized DAG dag_id=%s fileloc=%s", dag.dag_id, dag.fileloc)
dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
Expand Down
105 changes: 104 additions & 1 deletion airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import pytest
from sqlalchemy import delete, func, select
from sqlalchemy.exc import OperationalError, SAWarning
from sqlalchemy.exc import IntegrityError, OperationalError, SAWarning

import airflow.dag_processing.collection
from airflow._shared.timezones import timezone as tz
Expand Down Expand Up @@ -651,6 +651,109 @@ def test_serialized_dag_errors_are_import_errors(
assert len(dag_import_error_listener.existing) == 0
assert dag_import_error_listener.new["abc.py"] == import_error.stacktrace

@pytest.mark.parametrize(
"orig_message",
[
pytest.param(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test asserts that the handler routes a mocked IntegrityError correctly; it does not assert that two concurrent DFPs actually race or that the surrounding update_dag_parsing_results_in_db cycle recovers. Please add a db_test that races two sessions on the same brand-new Dag (e.g. two threads calling SerializedDagModel.write_dag against the same dag_id), so we have a regression test for the actual scenario, not just for the symptom string.


Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting

"(sqlite3.IntegrityError) UNIQUE constraint failed: serialized_dag.dag_id",
id="sqlite",
),
pytest.param(
"(MySQLdb.IntegrityError) (1062, \"Duplicate entry 'my_dag' for key 'serialized_dag.PRIMARY'\")",
id="mysql",
),
pytest.param(
"(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "
'"serialized_dag_pkey"',
id="postgres",
),
],
)
@patch.object(SerializedDagModel, "write_dag")
@patch("airflow.serialization.definitions.dag.SerializedDAG.bulk_write_to_db")
def test_duplicate_key_from_concurrent_processor_is_not_import_error(
self,
mock_bulk_write_to_db,
mock_write_dag,
orig_message,
testing_dag_bundle,
session,
):
"""
A unique-constraint IntegrityError from a peer Dag processor that won the
first-write race should not be recorded as an import error, the session should
be rolled back, and ``write_dag`` should not be retried.
"""
mock_write_dag.side_effect = IntegrityError(
statement="INSERT INTO serialized_dag ...", params={}, orig=Exception(orig_message)
)

mock_session = mock.MagicMock()
mock_dag = mock.MagicMock(dag_id="dup_dag", fileloc="dup_dag.py", relative_fileloc="dup_dag.py")
import_errors: dict[tuple[str, str], str] = {}

update_dag_parsing_results_in_db(
"testing",
None,
dags=[mock_dag],
import_errors=import_errors,
parse_duration=None,
warnings=set(),
session=mock_session,
)

# write_dag is called exactly once: a unique-constraint hit is not retried.
assert mock_write_dag.call_count == 1
# No import error recorded.
assert import_errors == {}
# Session is rolled back so subsequent per-Dag work doesn't hit PendingRollbackError.
mock_session.rollback.assert_called()

@patch.object(SerializedDagModel, "write_dag")
@patch("airflow.serialization.definitions.dag.SerializedDAG.bulk_write_to_db")
def test_non_unique_integrity_error_is_still_import_error(
self,
mock_bulk_write_to_db,
mock_write_dag,
testing_dag_bundle,
session,
):
"""
IntegrityErrors that aren't unique-constraint violations (e.g. NOT-NULL) must
still surface as import errors so the user sees the problem.
"""
mock_write_dag.side_effect = IntegrityError(
statement="INSERT INTO serialized_dag ...",
params={},
orig=Exception(
'(psycopg2.errors.NotNullViolation) null value in column "dag_id" '
"violates not-null constraint"
),
)

mock_session = mock.MagicMock()
mock_dag = mock.MagicMock(
dag_id="not_null_dag", fileloc="not_null.py", relative_fileloc="not_null.py"
)
import_errors: dict[tuple[str, str], str] = {}

update_dag_parsing_results_in_db(
"testing",
None,
dags=[mock_dag],
import_errors=import_errors,
parse_duration=None,
warnings=set(),
session=mock_session,
)

err = import_errors.get(("testing", "not_null.py"))
assert err is not None
assert "IntegrityError" in err
assert "not-null constraint" in err
# The error must not have triggered the duplicate-key short-circuit rollback path.
mock_session.rollback.assert_not_called()

@patch.object(ParseImportError, "full_file_path")
@mark_fab_auth_manager_test
@conf_vars({("core", "min_serialized_dag_update_interval"): "5"})
Expand Down
Loading