diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 6f8bc752bbc4f..301886bad2f7a 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -32,7 +32,7 @@ import structlog from sqlalchemy import delete, func, insert, select, tuple_, update -from sqlalchemy.exc import OperationalError +from sqlalchemy.exc import IntegrityError, OperationalError from sqlalchemy.orm import joinedload, load_only from airflow._shared.timezones.timezone import utcnow @@ -257,6 +257,21 @@ def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, se ) +# Dialect-specific message prefixes for unique-constraint IntegrityErrors. Kept in sync with +# ``airflow.api_fastapi.common.exceptions._UniqueConstraintErrorHandler``. +_UNIQUE_CONSTRAINT_ERROR_PREFIXES = ( + "UNIQUE constraint failed", # SQLite + "Duplicate entry", # MySQL + "violates unique constraint", # Postgres +) + + +def _is_unique_constraint_error(exc: IntegrityError) -> bool: + """Return True iff ``exc`` is a unique/primary-key constraint violation across SQLite, MySQL, and Postgres.""" + orig_str = str(exc.orig) + return any(prefix in orig_str for prefix in _UNIQUE_CONSTRAINT_ERROR_PREFIXES) + + def _serialize_dag_capturing_errors( dag: LazyDeserializedDAG, bundle_name, @@ -295,6 +310,29 @@ def _serialize_dag_capturing_errors( return [] except OperationalError: raise + except IntegrityError as exc: + # Multiple Dag processors writing the same brand-new Dag can race on the INSERT. + # The loser's transaction is already invalid, so we must roll the session back to + # avoid PendingRollbackError on subsequent per-Dag work in this parsing cycle. + # The winning peer already produced the correct row, so this is not an import error + # and we don't retry. Non-unique IntegrityErrors (e.g. NOT-NULL violations from a + # genuinely malformed Dag) fall through to the generic Exception arm. + if _is_unique_constraint_error(exc): + log.info( + "Concurrent Dag processor already wrote Dag, skipping duplicate insert", + dag_id=dag.dag_id, + fileloc=dag.fileloc, + ) + session.rollback() + return [] + log.exception("Failed to write serialized DAG dag_id=%s fileloc=%s", dag.dag_id, dag.fileloc) + dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth") + return [ + ( + (bundle_name, dag.relative_fileloc), + traceback.format_exc(limit=-dagbag_import_error_traceback_depth), + ) + ] except Exception: log.exception("Failed to write serialized DAG dag_id=%s fileloc=%s", dag.dag_id, dag.fileloc) dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth") diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index 42be7792381fb..ac1349d8f99da 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py +++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -28,7 +28,7 @@ import pytest from sqlalchemy import delete, func, select -from sqlalchemy.exc import OperationalError, SAWarning +from sqlalchemy.exc import IntegrityError, OperationalError, SAWarning import airflow.dag_processing.collection from airflow._shared.timezones import timezone as tz @@ -651,6 +651,109 @@ def test_serialized_dag_errors_are_import_errors( assert len(dag_import_error_listener.existing) == 0 assert dag_import_error_listener.new["abc.py"] == import_error.stacktrace + @pytest.mark.parametrize( + "orig_message", + [ + pytest.param( + "(sqlite3.IntegrityError) UNIQUE constraint failed: serialized_dag.dag_id", + id="sqlite", + ), + pytest.param( + "(MySQLdb.IntegrityError) (1062, \"Duplicate entry 'my_dag' for key 'serialized_dag.PRIMARY'\")", + id="mysql", + ), + pytest.param( + "(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint " + '"serialized_dag_pkey"', + id="postgres", + ), + ], + ) + @patch.object(SerializedDagModel, "write_dag") + @patch("airflow.serialization.definitions.dag.SerializedDAG.bulk_write_to_db") + def test_duplicate_key_from_concurrent_processor_is_not_import_error( + self, + mock_bulk_write_to_db, + mock_write_dag, + orig_message, + testing_dag_bundle, + session, + ): + """ + A unique-constraint IntegrityError from a peer Dag processor that won the + first-write race should not be recorded as an import error, the session should + be rolled back, and ``write_dag`` should not be retried. + """ + mock_write_dag.side_effect = IntegrityError( + statement="INSERT INTO serialized_dag ...", params={}, orig=Exception(orig_message) + ) + + mock_session = mock.MagicMock() + mock_dag = mock.MagicMock(dag_id="dup_dag", fileloc="dup_dag.py", relative_fileloc="dup_dag.py") + import_errors: dict[tuple[str, str], str] = {} + + update_dag_parsing_results_in_db( + "testing", + None, + dags=[mock_dag], + import_errors=import_errors, + parse_duration=None, + warnings=set(), + session=mock_session, + ) + + # write_dag is called exactly once: a unique-constraint hit is not retried. + assert mock_write_dag.call_count == 1 + # No import error recorded. + assert import_errors == {} + # Session is rolled back so subsequent per-Dag work doesn't hit PendingRollbackError. + mock_session.rollback.assert_called() + + @patch.object(SerializedDagModel, "write_dag") + @patch("airflow.serialization.definitions.dag.SerializedDAG.bulk_write_to_db") + def test_non_unique_integrity_error_is_still_import_error( + self, + mock_bulk_write_to_db, + mock_write_dag, + testing_dag_bundle, + session, + ): + """ + IntegrityErrors that aren't unique-constraint violations (e.g. NOT-NULL) must + still surface as import errors so the user sees the problem. + """ + mock_write_dag.side_effect = IntegrityError( + statement="INSERT INTO serialized_dag ...", + params={}, + orig=Exception( + '(psycopg2.errors.NotNullViolation) null value in column "dag_id" ' + "violates not-null constraint" + ), + ) + + mock_session = mock.MagicMock() + mock_dag = mock.MagicMock( + dag_id="not_null_dag", fileloc="not_null.py", relative_fileloc="not_null.py" + ) + import_errors: dict[tuple[str, str], str] = {} + + update_dag_parsing_results_in_db( + "testing", + None, + dags=[mock_dag], + import_errors=import_errors, + parse_duration=None, + warnings=set(), + session=mock_session, + ) + + err = import_errors.get(("testing", "not_null.py")) + assert err is not None + assert "IntegrityError" in err + assert "not-null constraint" in err + # The error must not have triggered the duplicate-key short-circuit rollback path. + mock_session.rollback.assert_not_called() + @patch.object(ParseImportError, "full_file_path") @mark_fab_auth_manager_test @conf_vars({("core", "min_serialized_dag_update_interval"): "5"})