Fix asset event scheduling race condition and consistency #62501
Open
dingo4dev wants to merge 3 commits into apache:main from
Conversation
11ed83d to 624e1ea
Pull request overview
This PR attempts to fix a race condition in asset-based scheduling where AssetEvent records created by concurrent tasks can be missed by the Scheduler due to a visibility gap between database flush() and commit() operations. The fix synchronizes timestamps between AssetEvent and AssetDagRunQueue records by manually assigning timestamps and committing the asset event immediately.
Changes:
- Modified asset event registration to commit immediately after creating AssetEvent, ensuring early visibility to the scheduler
- Updated AssetDagRunQueue creation to use the asset event's timestamp for synchronization across both PostgreSQL and slow-path implementations
- Changed scheduler logic to conditionally create DAG runs only when asset events exist, and to delete ADRQ entries more selectively based on timestamps
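The manager-side change can be sketched without a database. The snippet below uses plain dataclass stand-ins for the ORM models (the names `AssetEvent`, `AssetDagRunQueue`, and `register_asset_change` mirror the PR's vocabulary but are illustrative, not the real `manager.py` code): the event's timestamp is assigned explicitly up front and reused for every queue row, instead of letting each insert generate its own flush-time default.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

# Hypothetical stand-ins for the real ORM models; nothing here touches a DB.
@dataclass
class AssetEvent:
    asset_id: int
    timestamp: datetime

@dataclass
class AssetDagRunQueue:
    asset_id: int
    target_dag_id: str
    created_at: datetime

def register_asset_change(asset_id, consuming_dag_ids):
    # Assign the timestamp explicitly rather than relying on a flush-time
    # server default, so the event and its queue rows agree on one instant.
    event = AssetEvent(asset_id=asset_id, timestamp=datetime.now(timezone.utc))
    # (In the PR, the event is committed here, before the ADRQ rows are
    # written, so the Scheduler can see it as early as possible.)
    queue = [
        AssetDagRunQueue(asset_id, dag_id, created_at=event.timestamp)
        for dag_id in consuming_dag_ids
    ]
    return event, queue

event, queue = register_asset_change(1, ["dag_a", "dag_b"])
print(all(q.created_at == event.timestamp for q in queue))  # True
```

Because the ADRQ `created_at` is copied from the event, the Scheduler's `created_at` lookup and its subsequent event fetch can no longer disagree about the triggered date.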
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/jobs/scheduler_job_runner.py | Modified DAG run creation to be conditional on asset_events existence and updated ADRQ deletion to filter by timestamp |
| airflow-core/src/airflow/assets/manager.py | Changed from session.flush() to session.commit() for early AssetEvent persistence and updated ADRQ timestamp synchronization logic |
0f2aa7e to 33ed8c3
Fixes an issue where the Scheduler misses AssetEvents due to a visibility gap between flush() and commit(). When multiple tasks create events for the same asset, the AssetEvent timestamp is generated during flush, but the record remains invisible to the Scheduler until the final commit. If the AssetDagRunQueue is processed in the interim, the late-committing event is orphaned. The scheduler job reads the AssetDagRunQueue, takes its created_at column as the triggered date, and uses that date both to fetch the asset events and to assign the DAG run's `run_after`. This change synchronizes the timestamps and tightens the transaction boundary to prevent data-aware scheduling misses.
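The visibility gap itself is easy to reproduce outside Airflow. A minimal sqlite3 sketch (two connections to the same file stand in for the task's session and the Scheduler's session; the table name is illustrative): a row that exists inside the writer's open transaction, as it does after flush(), is invisible to the other connection until commit().

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")
writer = sqlite3.connect(path)
writer.execute("CREATE TABLE asset_event (id INTEGER PRIMARY KEY, ts TEXT)")
writer.commit()

scheduler = sqlite3.connect(path)

# The INSERT implicitly opens a transaction: the row exists in the
# writer's transaction (it even has a primary key), analogous to a
# flushed-but-uncommitted AssetEvent.
writer.execute("INSERT INTO asset_event (ts) VALUES ('2024-01-01T00:00:00')")
before = scheduler.execute("SELECT COUNT(*) FROM asset_event").fetchone()[0]

# Only commit() closes the visibility gap for other sessions.
writer.commit()
after = scheduler.execute("SELECT COUNT(*) FROM asset_event").fetchone()[0]

print(before, after)  # 0 1
```

A Scheduler pass that runs in the window between those two reads sees the ADRQ state without the event, which is exactly the orphaning described above.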
This prevents double triggering when a late-committed AssetDagRunQueue entry still exists in the queue but its asset event has already been processed by a previous DAG run.
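The scheduler-side guard can be sketched as a pure function (the name `scheduler_pass` and its arguments are illustrative, not the actual `scheduler_job_runner.py` code): a DAG run is created only when asset events actually exist for the triggered date, and ADRQ entries are deleted selectively by timestamp, so a stale late-committed entry cannot re-trigger the DAG.

```python
from datetime import datetime, timedelta, timezone

def scheduler_pass(adrq_entries, event_timestamps, last_run_after=None):
    """Hypothetical sketch of one scheduler pass over an ADRQ batch.

    adrq_entries     -- created_at values read from AssetDagRunQueue
    event_timestamps -- timestamps of AssetEvents visible to this pass
    last_run_after   -- run_after of the previous asset-triggered DAG run
    Returns (run_after or None, remaining adrq_entries).
    """
    if not adrq_entries:
        return None, []
    triggered_date = max(adrq_entries)
    # Only events covered by this triggered date and newer than the
    # previous run should produce a new DAG run.
    events = [
        ts for ts in event_timestamps
        if ts <= triggered_date and (last_run_after is None or ts > last_run_after)
    ]
    # Create a DAG run only when asset events actually exist; a stale,
    # late-committed ADRQ entry alone must not double-trigger the DAG.
    run_after = triggered_date if events else None
    # Delete ADRQ entries selectively by timestamp instead of wiping the
    # whole queue: entries newer than triggered_date survive for the next pass.
    remaining = [ts for ts in adrq_entries if ts > triggered_date]
    return run_after, remaining

t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
ae1, ae2 = t0 + timedelta(seconds=1), t0 + timedelta(seconds=2)

# Pass 1: AE2's ADRQ is visible; a run is created with run_after = ae2.
run1, _ = scheduler_pass([ae2], [ae1, ae2])
# Pass 2: AE1's late-committed ADRQ appears, but its event predates run1,
# so no second run is created and the stale entry is consumed.
run2, remaining = scheduler_pass([ae1], [ae1, ae2], last_run_after=run1)
print(run1 == ae2, run2, remaining)  # True None []
```

This mirrors the T4 to T6 sequence in the diagram below: the second pass finds the late ADRQ entry, sees that its event is already covered by the previous run's `run_after`, and quietly drops it.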
2308c0b to 0019359
This PR addresses a race condition in the Asset-based scheduling logic where AssetEvent records created by concurrent tasks can be "missed" or "orphaned" by the Scheduler.
The issue stems from the Visibility Gap between a database flush() (which generates the created_at timestamp) and the final commit() (which makes the record visible to other sessions).
This change ensures that the synchronization between the AssetEvent and the AssetDagRunQueue is atomic and that their timestamps are aligned, preventing the drift caused by performance-related delays between the two writes.
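The drift can be made concrete with a toy clock (entirely hypothetical; in Airflow the drift comes from the database stamping each row's created_at at flush time): when each record asks the clock separately, any delay between the two writes shows up as a timestamp mismatch, whereas capturing the timestamp once and reusing it eliminates the drift.

```python
from datetime import datetime, timedelta, timezone
from itertools import count

# Hypothetical toy clock: each call advances 100 ms, standing in for
# wall-clock time passing between two flush-time column defaults.
_ticks = count()
def clock():
    return datetime(2024, 1, 1, tzinfo=timezone.utc) + timedelta(
        milliseconds=100 * next(_ticks)
    )

# Drifting: AssetEvent and ADRQ each stamp themselves independently.
event_ts = clock()
adrq_ts = clock()
drift = adrq_ts - event_ts  # 100 ms apart

# Aligned, as in this change: capture the timestamp once and reuse it.
event2_ts = clock()
adrq2_ts = event2_ts  # reuse the event's timestamp for the ADRQ row
print(drift, adrq2_ts == event2_ts)  # 0:00:00.100000 True
```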
related: #54659, #56750, #56749
```mermaid
---
config:
  theme: redux-dark-color
  look: neo
---
sequenceDiagram
    participant J1 as Task Instance 1
    participant J2 as Task Instance 2
    participant JS as Job Scheduler
    participant DB as Database
    Note over J1,DB: T0
    Note left of J1: T1
    J1-->>DB: Asset Event 1 (AE1) & commit
    activate J1
    activate DB
    Note left of J1: T2
    J2-->>DB: Asset Event 2 (AE2) & commit
    activate J2
    activate DB
    Note left of J1: T3
    J2-->>-DB: Asset Dag Run Queue (ADRQ)<br/>created_at = AE2.timestamp<br/>(flush & commit)
    deactivate DB
    Note left of J1: T4
    activate JS
    activate DB
    JS-->>DB: Fetch created_at from ADRQ
    JS-->>DB: Fetch events (AE1, AE2)
    JS->>JS: Create DAG with run_after = ADRQ created_at (AE2.timestamp)
    deactivate DB
    deactivate JS
    Note left of J1: T5
    J1-->>DB: Asset Dag Run Queue (ADRQ)<br/>created_at = AE1.timestamp<br/>(flush & commit)
    deactivate J1
    deactivate DB
    Note left of J1: T6
    activate JS
    activate DB
    JS-->>DB: Fetch created_at from ADRQ
    JS->>JS: No event found<br/>(AE1.timestamp < prev DAG run_after)
    deactivate DB
    deactivate JS
```

Was generative AI tooling used to co-author this PR?