[refine](exec) add SubQueue abstraction and thread-safety annotations to DataQueue #62947

Merged
BiteTheDDDDt merged 4 commits into apache:master from Mryange:refine-code-dataqueue on May 7, 2026

@Mryange
Contributor

@Mryange Mryange commented Apr 29, 2026

What problem does this PR solve?

Issue Number: N/A

Problem Summary:

The DataQueue implementation used parallel vectors (one per child queue —
lock, blocks, counters, etc.) scattered across the class, making it hard to
reason about which lock protects which field. The old INJECT_MOCK_SLEEP
pattern injected test randomness through a macro wrapping std::lock_guard,
but this was fragile and didn't compose with Clang's -Wthread-safety static
analysis. Several methods (e.g., set_source_block) performed lock-free
checks followed by locked re-checks, risking subtle races.

Root cause: per-child state was not encapsulated, lock/field relationships
were implicit, and no static analysis guarded them.

This PR:

  • Introduces a SubQueue struct that groups all per-child state (queue lock
    + blocks, free lock + free blocks, counters, sink dependency pointer) with
    explicit GUARDED_BY annotations.
  • Adds thread-safety macros, an AnnotatedMutex wrapper, and a LockGuard that
    replaces std::lock_guard. In BE_TEST builds, LockGuard injects random sleep
    before lock acquisition and after release to exercise concurrent code
    paths, replacing the old INJECT_MOCK_SLEEP macro.
  • Enables -Wthread-safety in Clang builds.
  • Moves dependency notifications (set_ready, block, set_always_ready)
    outside the queue lock in try_pop/try_push/clear_blocks to avoid nested
    lock ordering issues.
  • Fixes set_source_block to always hold _source_lock when reading
    _source_dependency, eliminating the lock-free pre-check.
  • Adds 20+ new unit tests covering SubQueue methods and DataQueue edge
    cases (empty pop, push-after-finished, capacity blocking, finish
    idempotency, clear_blocks, low-memory mode, terminate, free-block reuse,
    child_idx routing).
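
As a rough illustration of the SubQueue and LockGuard bullets, the sketch below shows how per-child state might be grouped behind one annotated mutex. It is not the PR's code: the macro definitions follow the pattern in Clang's thread-safety documentation, `int` stands in for a block, the test-only sleep in LockGuard is omitted, and all member names are assumptions.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Clang thread-safety attributes; they expand to nothing on other compilers.
#if defined(__clang__)
#define TSA(x) __attribute__((x))
#else
#define TSA(x)
#endif
#define CAPABILITY(x) TSA(capability(x))
#define SCOPED_CAPABILITY TSA(scoped_lockable)
#define GUARDED_BY(x) TSA(guarded_by(x))
#define ACQUIRE(...) TSA(acquire_capability(__VA_ARGS__))
#define RELEASE(...) TSA(release_capability(__VA_ARGS__))

// Hypothetical stand-in for the PR's AnnotatedMutex.
class CAPABILITY("mutex") AnnotatedMutex {
public:
    void lock() ACQUIRE() { _mu.lock(); }
    void unlock() RELEASE() { _mu.unlock(); }

private:
    std::mutex _mu;
};

// Hypothetical stand-in for the PR's LockGuard (BE_TEST sleeps omitted).
class SCOPED_CAPABILITY LockGuard {
public:
    explicit LockGuard(AnnotatedMutex& mu) ACQUIRE(mu) : _mu(mu) { mu.lock(); }
    ~LockGuard() RELEASE() { _mu.unlock(); }
    LockGuard(const LockGuard&) = delete;
    LockGuard& operator=(const LockGuard&) = delete;

private:
    AnnotatedMutex& _mu;
};

// Per-child state grouped in one place, each field tied to its lock.
struct SubQueue {
    AnnotatedMutex queue_lock;
    std::deque<int> blocks GUARDED_BY(queue_lock);
    bool finished GUARDED_BY(queue_lock) = false;

    // Returns false once the child is finished; nothing is enqueued then.
    bool try_push(int block) {
        LockGuard guard(queue_lock);
        if (finished) return false;
        blocks.push_back(block);
        return true;
    }

    // Returns false when there is nothing to pop.
    bool try_pop(int* out) {
        assert(out != nullptr);
        LockGuard guard(queue_lock);
        if (blocks.empty()) return false;
        *out = blocks.front();
        blocks.pop_front();
        return true;
    }

    void mark_finished() {
        LockGuard guard(queue_lock);
        finished = true;
    }
};
```

Built with Clang and -Wthread-safety, touching `blocks` or `finished` without holding `queue_lock` should be reported as a warning; on other compilers the macros expand to nothing and the code behaves like plain mutex-protected state.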

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@Mryange
Contributor Author

Mryange commented Apr 29, 2026

run buildall

@Mryange
Contributor Author

Mryange commented Apr 29, 2026

run buildall

@Mryange
Contributor Author

Mryange commented Apr 29, 2026

/review

Contributor

@github-actions github-actions Bot left a comment

Summary: I found two blocking concurrency regressions in the DataQueue refactor. The PR goal is clear and the SubQueue abstraction is focused, but the implementation changes the synchronization boundaries around queue state, dependency readiness, and the global block counter in ways that can make pipeline source/sink dependencies observe stale state.

Critical checkpoints:

  • Goal/test coverage: the PR adds unit tests for nominal SubQueue/DataQueue behavior, but does not cover the producer/consumer races introduced by moving dependency notifications and the total counter update outside queue_lock.
  • Focus/minimality: the refactor is mostly focused, but the notification/accounting movement is behavioral, not just structural.
  • Concurrency: blocking issue; source/sink dependency readiness can be updated from stale queue snapshots, and _cur_blocks_total_nums can underflow if a source pops between per-subqueue enqueue and total-counter increment.
  • Lifecycle/static initialization: no special lifecycle or cross-TU static initializer issue found.
  • Config/incompatible protocol/storage changes: none.
  • Parallel code paths: Union and Cache both use this DataQueue path, so both are affected.
  • Tests: existing added tests do not exercise the identified interleavings; no test was run locally.
  • Observability: no additional observability needed for this refactor.
  • Transactions/persistence/data writes/FE-BE variable passing: not applicable.
  • Performance: no separate performance issue found.

User focus: no additional user-provided review focus was specified.

    }
    // Notify outside of queue_lock to avoid nested locks.
    if (need_notify_sink_ready) {
        sink_dependency->set_ready();
Contributor

Moving the dependency notification outside queue_lock makes the ready/block decision stale. For example, if the source pops the last block, need_notify_sink_ready is set, then before this set_ready() runs another sink can push enough blocks to call sink_dependency->block(). The stale set_ready() then runs after that and leaves the sink runnable even though the sub-queue is over the limit. The inverse race exists for try_push() too: a stale block() can run after the source drains the queue and sets it ready. Previously these dependency updates happened while holding the same queue mutex, so producers could not change the queue between the state check and the notification. Please either keep the notification synchronized with the queue state or re-check the queue state under the lock before applying it.
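
The interleaving described in this comment can be replayed deterministically. The following is a single-threaded sketch, not the DataQueue code: `FakeDependency` and `FakeSubQueue` are hypothetical stand-ins, and the limit of one block and the `blocks > limit` blocking rule are assumptions chosen to make the race visible.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a pipeline Dependency: just a ready flag.
struct FakeDependency {
    bool ready = true;
    void set_ready() { ready = true; }
    void block() { ready = false; }
};

// Queue state reduced to a block count and an assumed capacity limit.
struct FakeSubQueue {
    std::size_t blocks = 0;
    std::size_t limit = 1;
    FakeDependency sink;
};

// Replays the three steps in order on one thread.
// Returns true if the sink ends up ready while the queue is over its limit.
inline bool replay_stale_set_ready() {
    FakeSubQueue q;
    q.blocks = 1;

    // Step 1 (source, under the lock): pop the last block, record the decision.
    q.blocks -= 1;
    const bool need_notify_sink_ready = (q.blocks == 0);
    // The lock is released here, before the notification is applied.

    // Step 2 (sink): push past the limit and block itself.
    q.blocks += 2;
    if (q.blocks > q.limit) {
        q.sink.block();
    }
    assert(!q.sink.ready);

    // Step 3 (source, no lock): apply the decision from step 1, now stale.
    if (need_notify_sink_ready) {
        q.sink.set_ready();
    }

    return q.sink.ready && q.blocks > q.limit;
}
```

Calling `replay_stale_set_ready()` returns true under these assumptions: the sink is marked ready although the queue holds two blocks against a limit of one.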

Contributor Author

The notification outside the lock is intentional. The sink_dependency state is best-effort — a stale set_ready() may cause the sink to attempt one extra try_push, which will re-trigger block() if the queue is still over the limit. A stale block() causes at most one missed wakeup, which is recovered on the next try_pop that drains the queue to empty. set_finish / set_always_ready provide the final guarantee against livelock. Holding the queue lock during dependency notification would introduce lock-ordering issues with the pipeline scheduler.

auto& sub = *_sub_queues[child_idx];
RETURN_IF_ERROR(sub.try_push(std::move(block)));
_cur_blocks_total_nums++;
set_source_ready();
Contributor

try_push() now publishes the block to blocks_in_queue before _cur_blocks_total_nums is incremented. A source that is already running can observe remaining_has_data() == true, pop that block, and execute fetch_sub(1) while the total is still 0, underflowing the unsigned counter to UINT_MAX. That leaves has_more_data() permanently true and also skips the old_total == 1 path that should block the source when the queue becomes empty. In the old code the total counter was incremented before releasing the queue lock, so a consumer that acquired the queue lock to pop could not beat the total update. Please update the total counter before publishing the per-subqueue availability, or otherwise make the per-queue and total counters atomic as a single state transition.
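
The arithmetic behind this underflow is easy to reproduce in isolation. The sketch below assumes the total counter is a 32-bit unsigned atomic (the comment only says it wraps to UINT_MAX) and replays the "publish, pop, then increment" order on one thread; `UnderflowTrace` and the function name are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>

struct UnderflowTrace {
    std::uint32_t seen_by_pop;       // value fetch_sub returned to the consumer
    std::uint32_t after_pop;         // counter right after the early pop
    std::uint32_t after_push;        // counter after the producer's late increment
    bool saw_last_block_transition;  // whether old_total == 1 was ever observed
};

// Replays: block already published, consumer decrements, producer increments.
inline UnderflowTrace replay_pop_before_increment() {
    std::atomic<std::uint32_t> total{0};
    UnderflowTrace t{};

    // Consumer pops the published block and decrements while total is still 0.
    t.seen_by_pop = total.fetch_sub(1);
    t.after_pop = total.load();
    t.saw_last_block_transition = (t.seen_by_pop == 1);
    assert(t.after_pop == std::numeric_limits<std::uint32_t>::max());

    // Producer finally increments; unsigned wraparound brings it back to 0.
    total.fetch_add(1);
    t.after_push = total.load();
    return t;
}
```

In this replay the consumer sees an old value of 0, the counter sits at the maximum value until the producer catches up, and the `old_total == 1` transition is never observed even though the counter ends at 0.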

Contributor Author

The transient underflow is benign. The only use of this counter is old_total == 1 to trigger set_source_block(), and set_source_block() re-checks _cur_blocks_total_nums == 0 under _source_lock before actually blocking the source. A missed trigger only means the source spins one extra round before the counter stabilizes. has_more_data() is used only as a hint and is not relied upon for correctness — remaining_has_data() (which reads per-subqueue blocks_in_queue directly) is the authoritative check.

@Mryange
Contributor Author

Mryange commented Apr 29, 2026

/review

Contributor

@github-actions github-actions Bot left a comment

I am requesting changes based on the already-open blocking concurrency threads in this review. I did not add duplicate inline comments because both concrete defects are already covered in the existing review context.

Critical checkpoint conclusions:

  • Goal and tests: the PR aims to refactor DataQueue locking and add thread-safety annotations/tests. The added tests cover basic push/pop/finish behavior, but they do not cover the interleaving where a source pops between per-subqueue publication and total-counter increment, nor stale dependency notifications after lock release.
  • Scope: the refactor is understandable but not minimal; it changes queue state grouping, dependency notification timing, total-counter ordering, public API surface, and global clang warning flags in one PR.
  • Concurrency: blocking issue remains. The per-subqueue block is visible before _cur_blocks_total_nums is incremented, so a running source can pop it and fetch_sub from zero. Even if the later producer increment wraps the counter back to zero, the old_total == 1 transition was missed and there is no subsequent call that blocks the source while the queue is empty and unfinished. The stale set_ready/block race from notifying Dependency outside queue_lock is also still unresolved; the current tests do not prove the best-effort behavior cannot leave a task runnable or blocked incorrectly.
  • Lifecycle: DataQueue is owned by shared pipeline state and terminate/close is used to wake/finish participants. No additional lifecycle issue beyond the existing dependency-state races was found.
  • Config/compatibility: no new runtime config, storage format, or FE/BE protocol compatibility issue was found.
  • Parallel paths: both Union and Cache DataQueue users were checked; both rely on the same source/sink dependency behavior, so the concurrency issue applies to the shared implementation rather than a missing parallel-path update.
  • Special checks: no speculative defensive checks were introduced that need a separate comment.
  • Test coverage: unit tests were added, but missing deterministic/concurrent tests for the two race interleavings already discussed.
  • Observability: no new observability need identified for this internal queue refactor.
  • Transaction/persistence/data writes: not applicable.
  • FE/BE variables: not applicable.
  • Performance: no new clear CPU/memory regression found beyond the potential scheduler spin caused by the missed source-block transition.

User focus: no additional user-provided review focus was specified.

@Mryange
Contributor Author

Mryange commented Apr 30, 2026

run buildall

@Mryange
Contributor Author

Mryange commented May 5, 2026

/review

Contributor

@github-actions github-actions Bot left a comment

I found one blocking correctness issue in the updated DataQueue push path.

Critical checkpoint conclusions:

  • Goal/test: the refactor mostly preserves the DataQueue abstraction goal and adds unit coverage, but the new push-after-finish failure path is not covered.
  • Scope: the change is focused, but the total-block counter update is now split from successful enqueue semantics.
  • Concurrency: DataQueue is a multi-sink/source-shared structure; push can race with set_finish()/terminate(), and the current counter update is not rolled back on try_push() failure.
  • Lifecycle/static init: no static initialization lifecycle issue found.
  • Config/compatibility: no new runtime config, storage format, or FE/BE protocol compatibility issue found.
  • Parallel paths: Union and cache DataQueue users both rely on push_block() correctness; the issue applies to both.
  • Tests: existing tests cover push-after-finished at SubQueue level, but not DataQueue::push_block() counter behavior when try_push() returns EOF.
  • Observability/persistence/data writes: no additional observability, persistence, or transaction concern found for this refactor.
  • Performance: no separate performance blocker found.

Existing review threads about dependency notification staleness and the earlier total-counter underflow were treated as already-known and not duplicated. User focus: no additional user-provided review focus was specified.

// resulting missed set_source_block() call.
_cur_blocks_total_nums++;
RETURN_IF_ERROR(sub.try_push(std::move(block)));
set_source_ready();
Contributor

This fixes the earlier underflow by incrementing before publication, but it now leaks the total counter when try_push() fails. SubQueue::try_push() returns EndOfFile after set_finish()/terminate() marks the child finished; a late sink push during close/cancel will take this RETURN_IF_ERROR path after _cur_blocks_total_nums++ even though no block was enqueued. From then on has_more_data() remains true for a non-existent block, and the source/blocking logic can keep believing data remains. Please either roll the increment back on error or only publish/increment as one successful state transition.
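
A minimal sketch of the leak and of the rollback option mentioned above, under stated assumptions: `FakeSubQueue` and `FakeDataQueue` are hypothetical, a `bool` stands in for the EndOfFile status, and locking is left out so that only the counter accounting is visible.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical child queue that rejects pushes once finished.
struct FakeSubQueue {
    bool finished = false;
    int queued = 0;

    bool try_push() {
        if (finished) return false;  // stands in for an EndOfFile status
        ++queued;
        return true;
    }
};

struct FakeDataQueue {
    FakeSubQueue sub;
    std::atomic<std::uint32_t> total{0};

    // Increment first, return early on failure: the counter leaks.
    bool push_leaky() {
        total.fetch_add(1);
        if (!sub.try_push()) return false;
        return true;
    }

    // Same ordering, but the increment is undone when nothing was enqueued.
    bool push_with_rollback() {
        total.fetch_add(1);
        if (!sub.try_push()) {
            const std::uint32_t before = total.fetch_sub(1);
            assert(before > 0);  // we incremented above, so it cannot be 0
            return false;
        }
        return true;
    }
};
```

With `sub.finished` set, `push_leaky()` leaves `total` at 1 for a block that does not exist, while `push_with_rollback()` returns it to 0. Under real concurrency the rollback still leaves a brief window in which the counter overstates the queue, so treating enqueue and increment as one state transition, the other option named in the comment, may be preferable.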

@Mryange
Contributor Author

Mryange commented May 5, 2026

/review

Contributor

@github-actions github-actions Bot left a comment

Review completed for PR 62947.

Critical checkpoint conclusions:

  • Goal/test coverage: The PR refactors DataQueue per-child state into SubQueue, adds thread-safety annotations, enables Clang thread-safety analysis, and adds focused unit coverage for SubQueue/DataQueue behavior. The added tests cover the main refactor behavior, including push/pop, finish idempotency, blocking thresholds, low-memory mode, terminate, and free-block reuse.
  • Scope/focus: The change is mostly focused on DataQueue locking/state organization plus the supporting annotation wrapper and tests.
  • Concurrency: Reviewed source/sink DataQueue call paths for union and cache operators. The current head keeps total-counter increment inside SubQueue::try_push under queue_lock and only after the finished check, avoiding the earlier failure-path leak and pop-before-total underflow pattern. Existing review threads already cover the dependency notification race concerns, so I did not duplicate them.
  • Lifecycle: Finish is idempotent through SubQueue::mark_finished; terminate preserves prior behavior of finishing children and clearing queued/free blocks. No new static initialization or ownership lifecycle issue found.
  • Configuration/compatibility: No user-visible config, protocol, storage format, or FE-BE compatibility changes found.
  • Parallel paths: Both union and cache DataQueue users were checked; no missed equivalent path found.
  • Error handling: Status-returning DataQueue callers continue to check push/get results. The internal bool return from SubQueue::try_push is converted back to EndOfFile at the public DataQueue boundary.
  • Memory safety/performance: No new significant untracked allocation pattern beyond existing Block queue/free-list ownership; the refactor reduces per-child vector scattering without adding obvious hot-path allocation overhead.
  • Observability: No new observability appears necessary for this refactor.
  • Tests/results: Unit tests were added, but I did not run them in this review environment.

User focus: No additional user-provided review focus was specified.

No additional non-duplicate blocking issues found in this pass.
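
The arrangement described in the Concurrency and Error handling checkpoints of this review might look roughly like the sketch below. It is an approximation under assumptions, not the merged code: `Status` is a two-value stand-in for the real status type, `int` stands in for a block, the counter is passed by reference for brevity, and decrementing under `queue_lock` on pop is assumed.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <deque>
#include <mutex>

enum class Status { OK, EndOfFile };  // hypothetical stand-in for the real Status

struct SubQueue {
    std::mutex queue_lock;
    std::deque<int> blocks;  // guarded by queue_lock
    bool finished = false;   // guarded by queue_lock

    // Enqueue and count as one step under queue_lock, after the finished check.
    bool try_push(int block, std::atomic<std::uint32_t>& total) {
        std::lock_guard<std::mutex> guard(queue_lock);
        if (finished) return false;
        blocks.push_back(block);
        total.fetch_add(1);
        return true;
    }

    // Dequeue and uncount under the same lock, so a pop cannot precede the count.
    bool try_pop(int* out, std::atomic<std::uint32_t>& total) {
        assert(out != nullptr);
        std::lock_guard<std::mutex> guard(queue_lock);
        if (blocks.empty()) return false;
        *out = blocks.front();
        blocks.pop_front();
        total.fetch_sub(1);
        return true;
    }

    void mark_finished() {
        std::lock_guard<std::mutex> guard(queue_lock);
        finished = true;
    }
};

struct DataQueue {
    SubQueue sub;
    std::atomic<std::uint32_t> total{0};

    // The internal bool is converted back to a status at the public boundary.
    Status push_block(int block) {
        return sub.try_push(block, total) ? Status::OK : Status::EndOfFile;
    }
};
```

Because the increment happens only after the finished check and before the lock is released, a rejected push never touches the counter and a consumer cannot pop a block that has not yet been counted.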

@Mryange
Contributor Author

Mryange commented May 5, 2026

run buildall

@Mryange
Contributor Author

Mryange commented May 6, 2026

run beut

Contributor

@BiteTheDDDDt BiteTheDDDDt left a comment

LGTM

@BiteTheDDDDt BiteTheDDDDt merged commit efd7067 into apache:master May 7, 2026
31 of 32 checks passed
Mryange added a commit that referenced this pull request May 7, 2026
### What problem does this PR solve?

Issue Number: N/A

Problem Summary:
`DataQueueTest.MultiTest` could intermittently hang after DataQueue
moved sink dependency notifications outside the per-sub-queue lock. Root
cause: `SubQueue` queue state and `sink_dependency` state were no longer
serialized by `queue_lock`, so a producer could observe its sink
dependency as blocked even after the queue had already become empty,
leaving no future push/pop to wake it. This patch updates
`sink_dependency->set_ready()` and `sink_dependency->block()` while
holding `queue_lock`, keeping queue occupancy and sink readiness
transitions atomic with respect to each other.



Related PR: #62947
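
The follow-up fix described in this commit message, updating the sink dependency while `queue_lock` is held, can be sketched as follows. This is an illustration under assumptions rather than the committed code: `FakeDependency` is hypothetical, `int` stands in for a block, and the rules "block when size exceeds the limit" and "set ready when the queue becomes empty" are inferred from the review discussion.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for a pipeline Dependency: just a ready flag.
struct FakeDependency {
    bool ready = true;
    void set_ready() { ready = true; }
    void block() { ready = false; }
};

struct SubQueue {
    std::mutex queue_lock;
    std::deque<int> blocks;  // guarded by queue_lock
    std::size_t limit;       // assumed capacity threshold
    FakeDependency* sink_dependency;

    SubQueue(std::size_t limit_, FakeDependency* dep)
            : limit(limit_), sink_dependency(dep) {
        assert(dep != nullptr);
    }

    void try_push(int block) {
        std::lock_guard<std::mutex> guard(queue_lock);
        blocks.push_back(block);
        // Readiness changes in the same critical section as the occupancy.
        if (blocks.size() > limit) {
            sink_dependency->block();
        }
    }

    bool try_pop(int* out) {
        assert(out != nullptr);
        std::lock_guard<std::mutex> guard(queue_lock);
        if (blocks.empty()) return false;
        *out = blocks.front();
        blocks.pop_front();
        // Wake the sink while still holding the lock once the queue drains.
        if (blocks.empty()) {
            sink_dependency->set_ready();
        }
        return true;
    }
};
```

Since every readiness change is made in the same critical section as the occupancy change that justifies it, the last pop that empties the queue is always ordered after any earlier `block()`, which is the property the hang in `DataQueueTest.MultiTest` depended on.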
Mryange added a commit to Mryange/doris that referenced this pull request May 15, 2026
…s to DataQueue (apache#62947)

(cherry picked from commit efd7067)
Mryange added a commit to Mryange/doris that referenced this pull request May 15, 2026
…63055)

(cherry picked from commit 17bbba4)