[fix](pipeline) detect unread local exchange data on eos#63588
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
/review |
There was a problem hiding this comment.
Found one blocking issue: the new BE unit tests directly access protected/private exchanger members, so the test target will not compile.
Critical checkpoint conclusions:
- Goal/test proof: The production change aims to detect unread local-exchange data after natural EOS, and tests were added, but the new tests currently fail at compile time.
- Scope/focus: The production change is small and focused; the blocker is limited to test construction.
- Concurrency: The runtime check is executed after EOS from the source path. No new threads or dependency types are introduced; existing exchanger queue locking is reused.
- Lifecycle/static initialization: No new static/global lifecycle hazards found.
- Config/compatibility: No new config, storage format, or FE/BE protocol compatibility concern found.
- Parallel paths: The virtual method covers all exchanger implementations through the template override, with an adaptive override for tmp-block state.
- Conditional checks: The no-limit EOS gate is intentional to avoid flagging limit-short-circuit cases.
- Test coverage/results: Tests cover queue residual and adaptive tmp residual intent, but they need to be rewritten to use public behavior or otherwise made legally accessible before they can validate the feature.
- Observability: Error messages include exchange type, channel, queue info, and unread block details; sufficient for this diagnostic path.
- Transaction/persistence/data writes: Not applicable.
- Performance/memory: Check only runs on natural EOS and should not affect hot steady-state paths.
User focus: No additional user-provided review focus was supplied.
| auto wrapper = ExchangerBase::BlockWrapper::create_shared(std::move(in_block), nullptr, -1); | ||
| auto row_idx = std::make_shared<PODArray<uint32_t>>(1); | ||
| (*row_idx)[0] = 0; | ||
| ASSERT_TRUE(exchanger->_data_queue[0].enqueue( |
There was a problem hiding this comment.
This test target will not compile because it accesses Exchanger<PartitionedBlock>::_data_queue, which is a protected member of the base class. The next test has the same compile-time problem with AdaptivePassthroughExchanger::_tmp_block and _tmp_eos, which are private. Please construct these residual states through the public sink/get_block behavior (or otherwise add a deliberate test-only access mechanism) instead of directly reaching into class internals.
|
/review 我们的beut是有编译选项可以反问private的。 |
|
OpenCode automated review failed and did not complete. Error: Review step was skipped (possibly timeout or cancelled) Please inspect the workflow logs and rerun the review after the underlying issue is resolved. |
|
/review 我们的beut是有编译选项可以访问private的。 |
There was a problem hiding this comment.
Review result: no additional blocking issues found in the PR patch.
Critical checkpoint conclusions:
- Goal/test: The change addresses hidden local-exchange eos with unread buffered data by validating queue/tmp-block state after eos on no-limit source operators; added BE unit tests cover queue residuals, adaptive tmp-block residuals, and adaptive copied-slice behavior.
- Scope: The production change is small and focused on local exchange source/exchanger logic.
- Concurrency: The new check runs only after eos from get_block; at that point running sink operators should be finished for the channel, so I did not find a concrete enqueue/check race. Existing exchanger queue locking patterns are preserved.
- Lifecycle: No new static/global lifecycle concerns. The adaptive tmp-block lifecycle is explicitly checked.
- Config/compatibility: No new configs, protocols, storage formats, or FE-BE compatibility concerns.
- Parallel paths: The virtual check covers the Exchanger template used by shuffle, passthrough, pass-to-one, and broadcast; AdaptivePassthrough also checks its extra tmp-block path.
- Tests/results: Tests are BE unit tests and target the new diagnostics. I did not run them in this review environment.
- Observability: Error messages include channel, exchange type, queue info, and block diagnostics, which should be sufficient for this invariant failure.
- Transaction/persistence/data writes: Not applicable.
- Performance: The check is gated behind eos and no-limit, so it is not on the normal per-row/per-block hot path.
Focus response: I noted the existing thread about direct private/protected member access in BE unit tests. Given the user focus that BE UT has compile options for this access, I did not treat that existing concern as a new issue or duplicate it.
|
run buildall |
TPC-H: Total hot run time: 32034 ms |
TPC-DS: Total hot run time: 172689 ms |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
run cloud_p0 |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
What problem does this PR solve?
Problem Summary:
A local exchange source without a limit can currently trust eos even when unread data is still buffered in the exchanger. In that case the pipeline may finish with fewer output rows than input rows and the failure is hidden behind a normal end-of-stream. Root cause: the source path did not validate exchanger state after eos, and AdaptivePassthroughExchanger also has a tmp block path outside the normal queue state. This change adds a post-eos residual-data check for the no-limit source path, reports detailed queue and buffered-block diagnostics when unread data remains, and adds unit tests that cover both queue residuals and adaptive passthrough tmp-block residuals.
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)