Skip to content

Don't surface duplicate-key Dag-write race between Dag processors as import error#66788

Open
1fanwang wants to merge 1 commit into
apache:mainfrom
1fanwang:fix/dagproc-duplicate-key-race
Open

Don't surface duplicate-key Dag-write race between Dag processors as import error#66788
1fanwang wants to merge 1 commit into
apache:mainfrom
1fanwang:fix/dagproc-duplicate-key-race

Conversation

@1fanwang
Copy link
Copy Markdown
Contributor

@1fanwang 1fanwang commented May 12, 2026

Fix for the parallel-Dag-processor race called out in #66786. We run multiple DFP instances on our LinkedIn DI cluster; occasionally two of them parse the same file at the same moment and race on the first INSERT — the loser gets a duplicate-key IntegrityError and surfaces it to users as an import error (red banner in the UI). It's a benign first-write race, not a parse problem; this PR treats it that way.

When more than one Dag processor writes the same brand-new Dag in the same scheduler heartbeat, only one of them wins the INSERT into serialized_dag / dag / dag_version. The losers raise a unique-constraint IntegrityError (UNIQUE constraint failed on SQLite, Duplicate entry / 1062 on MySQL, violates unique constraint on Postgres), which _serialize_dag_capturing_errors currently routes through the generic except Exception arm.

That has two user-visible effects in the same parsing cycle:

  1. The Dag file ends up in import_errors even though the winner produced the correct row, so the UI flags a healthy file as broken until the next cycle.
  2. The session is left in an aborted state, so subsequent per-Dag writes in the same update_dag_parsing_results_in_db call raise PendingRollbackError.

The winner already produced the right row, so the loser's failure is a no-op rather than an import error. Add an except IntegrityError arm before the generic catch that:

  • Detects unique-constraint violations across SQLite, MySQL, and Postgres using the same message-prefix approach _UniqueConstraintErrorHandler already uses in the public API.
  • Rolls the session back so the rest of the cycle can proceed.
  • Logs at INFO and returns an empty import-error list (no retry — the row is there).

Non-unique IntegrityErrors (e.g. NOT-NULL violations from a genuinely malformed Dag) still flow into the generic arm and surface as import errors, so users keep seeing real serialization problems.

Reproducer and before/after evidence

TestUpdateDagParsingResults::test_duplicate_key_from_concurrent_processor_is_not_import_error exercises the three dialect message shapes (SQLite, MySQL, Postgres) as parametrized variants; test_non_unique_integrity_error_is_still_import_error pins the NOT-NULL fall-through.

Before fix (with the new except IntegrityError arm removed), each dialect variant fails with the actual SQLAlchemy IntegrityError recorded as an import-error entry — i.e. exactly the trace a scheduler would log in production:

FAILED ::test_duplicate_key_from_concurrent_processor_is_not_import_error[sqlite]
    import_errors[('testing', 'dup_dag.py')] =
        sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: serialized_dag.dag_id
        [SQL: INSERT INTO serialized_dag ...]

FAILED ::test_duplicate_key_from_concurrent_processor_is_not_import_error[mysql]
    import_errors[('testing', 'dup_dag.py')] =
        sqlalchemy.exc.IntegrityError: (MySQLdb.IntegrityError) (1062, "Duplicate entry 'my_dag' for key 'serialized_dag.PRIMARY'")
        [SQL: INSERT INTO serialized_dag ...]

FAILED ::test_duplicate_key_from_concurrent_processor_is_not_import_error[postgres]
    import_errors[('testing', 'dup_dag.py')] =
        sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey"
        [SQL: INSERT INTO serialized_dag ...]

PASSED ::test_non_unique_integrity_error_is_still_import_error

After fix, every variant passes: no import_errors entry is recorded for the unique-constraint race, session.rollback() runs so subsequent per-Dag work in the same parsing cycle does not hit PendingRollbackError, and the message-prefix detection covers each dialect. The NOT-NULL fall-through case is unchanged: a genuinely malformed Dag still surfaces as an import error.

PASSED ::test_duplicate_key_from_concurrent_processor_is_not_import_error[sqlite]
PASSED ::test_duplicate_key_from_concurrent_processor_is_not_import_error[mysql]
PASSED ::test_duplicate_key_from_concurrent_processor_is_not_import_error[postgres]
PASSED ::test_non_unique_integrity_error_is_still_import_error

Closes: #66786

When multiple Dag processors race to INSERT the same brand-new Dag row,
the loser's transaction raises ``IntegrityError`` (1062 / UNIQUE
constraint failed / unique violation). The current handler in
``_serialize_dag_capturing_errors`` records that as an import error and
leaves the SQLAlchemy session in a state that causes ``PendingRollbackError``
on the next per-Dag write in the same parsing cycle. The user sees a
healthy Dag file flagged as broken in the UI even though the peer
processor's INSERT succeeded.

Detect unique-constraint violations across SQLite, MySQL, and Postgres
using the same message-prefix approach as
``_UniqueConstraintErrorHandler`` in the public API. On a hit, roll the
session back, log at INFO, and return an empty import-error list — the
row exists, so the write is a no-op and there's no value in retrying.
Other IntegrityErrors (e.g. NOT-NULL violations from genuinely malformed
Dags) still flow into the generic exception arm and surface as import
errors.

Closes: apache#66786
Signed-off-by: 1fanwang <1fannnw@gmail.com>
# avoid PendingRollbackError on subsequent per-Dag work in this parsing cycle.
# The winning peer already produced the correct row, so this is not an import error
# and we don't retry. Non-unique IntegrityErrors (e.g. NOT-NULL violations from a
# genuinely malformed Dag) fall through to the generic Exception arm.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you share a log of where you saw this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trace that lands in import_errors on each dialect, captured by reverting the new except IntegrityError arm and rerunning the regression test against main:

sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: serialized_dag.dag_id
sqlalchemy.exc.IntegrityError: (MySQLdb.IntegrityError) (1062, "Duplicate entry 'my_dag' for key 'serialized_dag.PRIMARY'")
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey"

The IntegrityError raised by the losing processor's SerializedDagModel.write_dag is caught by the existing generic except Exception arm in _serialize_dag_capturing_errors, fed through traceback.format_exc(...), and recorded as the import-error value for the parsing cycle. The loser's now-invalid transaction also causes PendingRollbackError on the next per-Dag write in the same update_dag_parsing_results_in_db call. Added a before/after pytest snippet to the PR body that surfaces each dialect's exact message.

Copy link
Copy Markdown
Contributor

@ephraimbuddy ephraimbuddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed write-up. Two things this PR needs before it can land.

1. Multi-DFP over a shared bundle is not a supported topology.
production-deployment.rst is explicit: "the Dag processor is not horizontally scaled — even if you have more of them there is usually one Dag processor running at a time per specific folder." Today the CLI default (--bundle-name=None → parse every bundle, see manager.py:361) makes it easy to accidentally run two DFPs over the same bundle, but that is a gap to close, not a topology to silently support. If we want to make multi-DFP a first-class deployment, that needs a maintainer-level decision and an actual coordination layer (bundle lease at DFP startup, sharded file queue, advisory lock per (bundle_name, relative_fileloc), etc.) — not an exception handler in the write path.

2. If we do fix the symptom for now, this is the wrong layer.
For a brand-new Dag, the race is at the INSERT in DagVersion.write_dag / SerializedDagModel.write_dag. The cleaner fix is at the write site (INSERT ... ON CONFLICT DO NOTHING for the new-DAG path, or refetch-and-skip after catching at the model layer), so the loser never enters _serialize_dag_capturing_errors with a poisoned session in the first place. Catching IntegrityError here and string-matching three dialect-specific message prefixes (UNIQUE constraint failed / Duplicate entry / violates unique constraint) is fragile:

  • The prefixes change across driver versions (e.g. psycopg vs psycopg2 wording, MySQL message text on different connectors). _UniqueConstraintErrorHandler gets away with it at the API boundary; an internal write path is a different risk profile.
  • It silently swallows any future legitimate unique-constraint violation introduced in this path (new migration, new dedup constraint, real data corruption). That class of bug is much harder to debug than a transient import-error banner.
  • SQLAlchemyError.orig may be None in some paths; str(None) won't match and we silently fall through, which is fine, but the helper should be defensive about that.

3. The tests don't actually exercise the race.
Both new tests mock SerializedDagModel.write_dag and raise a hand-constructed IntegrityError whose orig is a plain Exception(msg). That confirms the new except arm routes correctly but does not demonstrate the race occurs, that session.rollback() is sufficient to recover the surrounding update_dag_parsing_results_in_db cycle, or that subsequent per-Dag writes in the same call actually proceed. A db_test that spawns two threads/sessions racing on SerializedDagModel.write_dag for a brand-new dag would be much stronger — and would also show whether INSERT ... ON CONFLICT DO NOTHING at the model layer is the better fix.

Asks before re-review:

  • Open a discussion / linked issue with maintainers about whether multi-DFP-per-bundle is something we want to support in 3.x, or whether we should instead refuse-to-start when another DFP holds the bundle. Either way, document the chosen position.
  • Move the fix to the write site (DagVersion.write_dag / SerializedDagModel.write_dag), so the loser is a no-op rather than an exception we have to classify.
  • Replace the mocked tests with an integration test that actually races two sessions on the same brand-new Dag.

Happy to help shape the coordination approach if option (a) goes the "refuse to start" way.


Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting

_UNIQUE_CONSTRAINT_ERROR_PREFIXES = (
"UNIQUE constraint failed", # SQLite
"Duplicate entry", # MySQL
"violates unique constraint", # Postgres
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String-matching exc.orig across three drivers is fragile and tightly couples this code to current driver wordings. Prefer fixing at the write site with INSERT ... ON CONFLICT DO NOTHING (or equivalent merge for the brand-new-dag path), so this except arm doesn't need to classify dialects at all. If we keep this approach, please guard against exc.orig is None.


Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting

return []
except OperationalError:
raise
except IntegrityError as exc:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we keep the dialect-prefix approach for now, log.info(...) here will fire on every collision and is going to be noisy on a healthy multi-DFP cluster — and silently spammy if a real unique-constraint regression ever lands. Suggest log.warning at minimum, and including the constraint name in the message if SQLAlchemy exposes it on exc.orig.


Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting

@pytest.mark.parametrize(
"orig_message",
[
pytest.param(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test asserts that the handler routes a mocked IntegrityError correctly; it does not assert that two concurrent DFPs actually race or that the surrounding update_dag_parsing_results_in_db cycle recovers. Please add a db_test that races two sessions on the same brand-new Dag (e.g. two threads calling SerializedDagModel.write_dag against the same dag_id), so we have a regression test for the actual scenario, not just for the symptom string.


Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting

@1fanwang
Copy link
Copy Markdown
Contributor Author

Thanks for the context @ephraimbuddy, taking a closer look, will update

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Dag processor: duplicate-key IntegrityError on first-write race surfaces as import error

2 participants