fix(api): handle concurrent RTIF writes to prevent unique constraint violation #63581

Open
YoannAbriel wants to merge 1 commit into apache:main from
YoannAbriel:fix/issue-61705

Conversation

@YoannAbriel
Contributor

Problem

When multiple workers try to write rendered task instance fields (RTIF) for the same task instance simultaneously, the API server returns a 409 Conflict error due to a unique constraint violation on rendered_task_instance_fields_pkey. This causes the task runner to fail with AirflowRuntimeError, marking the task as failed even though it completed successfully.

This is particularly common with CeleryExecutor when parallel tasks render fields at the same time, or when task retries overlap.

Closes: #61705

Root Cause

The update_rtif method uses session.merge() which performs a SELECT-then-INSERT/UPDATE pattern. When two concurrent requests both SELECT and find no existing record, they both attempt an INSERT, and the second one fails with an IntegrityError.
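
To make the interleaving concrete, here is a minimal sketch of the SELECT-then-INSERT race. It uses the standard-library `sqlite3` module as a stand-in for the real database, and the table `rtif`, its columns, and the helpers `row_exists` and `insert_row` are hypothetical names for illustration only, not Airflow's schema or API. The two writers are interleaved by hand so the outcome is deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical one-table schema; the primary key plays the role of
# rendered_task_instance_fields_pkey.
conn.execute(
    "CREATE TABLE rtif (ti_id TEXT PRIMARY KEY, rendered_fields TEXT NOT NULL)"
)


def row_exists(ti_id: str) -> bool:
    """The SELECT half of a merge-style write."""
    cur = conn.execute("SELECT 1 FROM rtif WHERE ti_id = ?", (ti_id,))
    return cur.fetchone() is not None


def insert_row(ti_id: str, fields: str) -> None:
    """The INSERT half, taken only when the SELECT found nothing."""
    conn.execute(
        "INSERT INTO rtif (ti_id, rendered_fields) VALUES (?, ?)", (ti_id, fields)
    )


# Both writers check before either one inserts.
a_sees_row = row_exists("ti-1")
b_sees_row = row_exists("ti-1")

insert_row("ti-1", '{"cmd": "from A"}')  # writer A inserts first
try:
    insert_row("ti-1", '{"cmd": "from B"}')  # writer B acts on its stale read
    second_insert_failed = False
except sqlite3.IntegrityError:
    second_insert_failed = True
```

Neither writer sees a row, so both choose the INSERT branch, and the second INSERT violates the primary key.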

The global _UniqueConstraintErrorHandler catches this IntegrityError and converts it to a 409 Conflict HTTP response, which the task-sdk treats as a fatal error.

Fix

Handle IntegrityError in the ti_put_rtif endpoint with a retry strategy:

  1. Catch IntegrityError from the first update_rtif call
  2. Rollback the failed transaction
  3. Re-fetch the task instance (since the previous ORM object is detached after rollback)
  4. Retry update_rtif — this time session.merge() will find the existing record and perform an UPDATE instead of INSERT

This is safe because RTIF writes are idempotent — the last writer wins, which is the correct semantic for rendered template fields.
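
The steps above can be sketched as follows. This is an illustration of the catch, rollback, and retry pattern only, not the code in this PR: it uses the standard-library `sqlite3` module instead of SQLAlchemy, and `make_db`, `merge_like_write`, `put_rtif`, and the `stale_read` flag are hypothetical names. Step 3 (re-fetching the task instance) has no counterpart here because no ORM object is involved.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE rtif (ti_id TEXT PRIMARY KEY, rendered_fields TEXT NOT NULL)"
    )
    conn.commit()
    return conn


def merge_like_write(conn, ti_id, fields, *, stale_read=False):
    """SELECT, then INSERT or UPDATE.

    `stale_read=True` pretends the SELECT ran before a concurrent writer
    committed, which forces the INSERT branch (simulation only).
    """
    found = conn.execute(
        "SELECT 1 FROM rtif WHERE ti_id = ?", (ti_id,)
    ).fetchone()
    if found is not None and not stale_read:
        conn.execute(
            "UPDATE rtif SET rendered_fields = ? WHERE ti_id = ?", (fields, ti_id)
        )
    else:
        conn.execute(
            "INSERT INTO rtif (ti_id, rendered_fields) VALUES (?, ?)", (ti_id, fields)
        )


def put_rtif(conn, ti_id, fields, *, simulate_race=False) -> int:
    """Write once; on IntegrityError roll back and retry. Returns attempts used."""
    attempts = 1
    try:
        merge_like_write(conn, ti_id, fields, stale_read=simulate_race)
    except sqlite3.IntegrityError:
        conn.rollback()  # discard the failed transaction
        attempts += 1
        merge_like_write(conn, ti_id, fields)  # now finds the row and updates it
    conn.commit()
    return attempts


conn = make_db()
first_attempts = put_rtif(conn, "ti-1", '{"cmd": "A"}')
second_attempts = put_rtif(conn, "ti-1", '{"cmd": "B"}', simulate_race=True)
stored = conn.execute(
    "SELECT rendered_fields FROM rtif WHERE ti_id = 'ti-1'"
).fetchone()[0]
```

The second call loses the simulated race, retries once, and leaves the later payload in place, which matches the last-writer-wins semantic described above.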

Testing

  • Added test_ti_put_rtif_concurrent_write: verifies that two sequential writes to the same RTIF succeed (the second updates rather than conflicts)
  • Added test_ti_put_rtif_integrity_error_handled: simulates the race condition by mocking update_rtif to raise IntegrityError on the first call, verifying the retry succeeds
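
For readers unfamiliar with the mocking approach in the second test, a minimal sketch of the technique follows. It is not the test added in this PR: `IntegrityError` here is a local stand-in class rather than `sqlalchemy.exc.IntegrityError`, and `put_with_retry` is a hypothetical reduction of the endpoint logic. A `side_effect` list makes the mock raise on the first call and return normally on the second.

```python
from unittest import mock


class IntegrityError(Exception):
    """Local stand-in for the database driver's integrity error."""


def put_with_retry(update_rtif, rollback) -> int:
    """Hypothetical reduction of the endpoint: try, roll back, try again."""
    try:
        update_rtif()
    except IntegrityError:
        rollback()
        update_rtif()
    return 201


# First call raises, second call returns None.
update_rtif = mock.Mock(side_effect=[IntegrityError("duplicate key"), None])
rollback = mock.Mock()

status = put_with_retry(update_rtif, rollback)
```

Asserting on `update_rtif.call_count` and `rollback.call_count` then confirms that exactly one retry and one rollback happened.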

Verified with unit tests. No external service dependencies.


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4, claude-opus-4-6)

Generated-by: Claude Code (Opus 4, claude-opus-4-6) following the guidelines


@Vamsi-klu
Contributor

Hey @YoannAbriel, nice writeup on this one — the race condition between concurrent RTIF writes hitting the unique constraint is a real problem, and catching it at the endpoint level with a retry is a pragmatic approach.

A few things I noticed while reading through:

The None guard after re-fetch could be tighter. After session.rollback() and re-fetching task_instance, if the TI was deleted between the rollback and the re-fetch, task_instance would be None. Right now the code logs the success message and returns 201 regardless. Might be worth either raising a 404 there (consistent with the check above the try block) or at least logging a warning so it doesn't silently look like a successful write.

The except IntegrityError is a bit broad. It catches all integrity errors, not just the unique constraint on rendered_task_instance_fields_pkey. For example, a foreign key violation (if the TI was deleted mid-flight) would also be caught and retried, which would just fail again. In practice this probably doesn't cause harm, but checking the constraint name in the error before retrying would make it more precise.
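
A rough sketch of what narrowing the handler could look like, using the standard-library `sqlite3` module as a stand-in. The schema and the helpers `is_retryable` and `classify` are hypothetical. Matching on the error message is specific to SQLite; with PostgreSQL the driver exception typically carries the constraint name (for example via `diag.constraint_name` in psycopg2), which would be the more robust thing to compare, though that is an assumption about the deployment rather than something taken from this PR.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK enforcement off by default
conn.execute("CREATE TABLE task_instance (id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE rtif ("
    " ti_id TEXT PRIMARY KEY REFERENCES task_instance(id),"
    " rendered_fields TEXT NOT NULL)"
)
conn.execute("INSERT INTO task_instance (id) VALUES ('ti-1')")
conn.execute("INSERT INTO rtif VALUES ('ti-1', '{}')")


def is_retryable(exc: sqlite3.IntegrityError) -> bool:
    """Only a uniqueness conflict on the rtif table is worth retrying."""
    return str(exc).startswith("UNIQUE constraint failed: rtif.")


def classify(sql: str) -> str:
    try:
        conn.execute(sql)
    except sqlite3.IntegrityError as exc:
        return "retry" if is_retryable(exc) else "fail"
    return "ok"


# Same key written twice: a uniqueness conflict.
duplicate = classify("INSERT INTO rtif VALUES ('ti-1', '{}')")
# Parent row missing: a foreign key violation that a retry cannot fix.
orphan = classify("INSERT INTO rtif VALUES ('ti-missing', '{}')")
```

With this split, the duplicate write is retried while the orphaned write fails immediately instead of failing a second time after a pointless retry.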

On the tests: test_ti_put_rtif_integrity_error_handled directly validates the retry logic with the mock, which is good. One gap: since the mock replaces the entire update_rtif method, it doesn't verify that the session is actually in a usable state after the rollback+re-fetch. A test that checks the RTIF row actually exists in the DB after the retry would strengthen confidence.

Longer term thought — have you considered using INSERT...ON CONFLICT DO UPDATE (upsert) at the SQL level in RenderedTaskInstanceFields.write()? SQLAlchemy supports it for Postgres, MySQL, and SQLite. That would eliminate the race at the root instead of handling it at the endpoint. Definitely a bigger change though, so the retry approach makes sense as a targeted fix for now.
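
As a rough illustration of the upsert idea, here is a sketch in raw SQL through the standard-library `sqlite3` module (the `ON CONFLICT` clause needs SQLite 3.24 or newer). The table and the `write_rtif` helper are hypothetical and are not `RenderedTaskInstanceFields.write()`. In SQLAlchemy the equivalent would go through the dialect-specific `insert()` constructs, which expose `on_conflict_do_update` for PostgreSQL and SQLite and `on_duplicate_key_update` for MySQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rtif (ti_id TEXT PRIMARY KEY, rendered_fields TEXT NOT NULL)"
)

# One atomic statement: insert, or overwrite the existing row on key conflict.
UPSERT = (
    "INSERT INTO rtif (ti_id, rendered_fields) VALUES (?, ?) "
    "ON CONFLICT (ti_id) DO UPDATE SET rendered_fields = excluded.rendered_fields"
)


def write_rtif(ti_id: str, fields: str) -> None:
    """Hypothetical writer; there is no separate SELECT step to race on."""
    conn.execute(UPSERT, (ti_id, fields))


write_rtif("ti-1", '{"cmd": "A"}')
write_rtif("ti-1", '{"cmd": "B"}')  # same key: updates instead of raising

rows = conn.execute("SELECT ti_id, rendered_fields FROM rtif").fetchall()
```

Because the database resolves the conflict inside a single statement, the second write cannot raise a uniqueness error, and the table ends up with one row holding the later payload.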

Good catch overall — RTIF writes being killed by a 409 that the task-sdk doesn't retry is a subtle failure mode.

@eladkal added this to the Airflow 3.1.9 milestone Mar 15, 2026
@eladkal added the type:bug-fix and backport-to-v3-1-test labels Mar 15, 2026
@kaxil
Member

kaxil commented Mar 16, 2026

Were you able to reproduce this issue? Could you share before-and-after screenshots, please?

When multiple workers try to write rendered task instance fields for the
same task instance simultaneously, a race condition in session.merge()
can cause an IntegrityError (unique constraint violation). This happens
because both workers SELECT (find no record), then both try to INSERT.

Fix the ti_put_rtif endpoint to catch IntegrityError, rollback the
failed transaction, re-fetch the task instance, and retry the write.
The retry succeeds because merge() now finds the existing record and
performs an UPDATE instead of INSERT.

Closes: apache#61705
@YoannAbriel
Contributor Author

Not yet — will set up a local reproducer with concurrent API calls hitting the RTIF endpoint and share logs. Will update here.

Labels

area:API, area:task-sdk, backport-to-v3-1-test, pending-response, type:bug-fix

Development

Successfully merging this pull request may close these issues.

Airflow Runtime error : Unique Constraint Violations observed

4 participants