[fix](rpc) Fix AutoReleaseClosure data race with callback reuse#61782
[fix](rpc) Fix AutoReleaseClosure data race with callback reuse#61782zclllyybb wants to merge 1 commit intoapache:masterfrom
Conversation
|
/review |
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
There was a problem hiding this comment.
Pull request overview
Fixes a brpc AutoReleaseClosure race when callbacks are reused to start a new RPC (mutating response_/cntl_), by ensuring status/failure checks happen before invoking callback->call(), and adds unit tests to prevent regressions.
Changes:
- Reorders
AutoReleaseClosure::Run()so it checkscntl_/response_->status()before invokingcallback->call(). - Refactors runtime-filter sync-size RPC handling to keep the callback alive until RPC completion.
- Adds BE unit tests that simulate callback reuse mutating shared RPC state.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| be/test/exec/exchange/exchange_sink_test.cpp | Adds tests that simulate callback reuse and verify correct ordering. |
| be/src/util/brpc_closure.h | Reorders Run() and simplifies error handling paths. |
| be/src/exec/runtime_filter/runtime_filter_producer.h | Adds storage to keep sync-size callback alive across async RPC. |
| be/src/exec/runtime_filter/runtime_filter_producer.cpp | Introduces SyncSizeCallback and changes closure construction/lifetime. |
| be/src/exec/runtime_filter/runtime_filter_mgr.h | Updates _send_rf_to_target signature (removes QueryContext arg). |
| be/src/exec/runtime_filter/runtime_filter_mgr.cpp | Updates runtime-filter RPC closure construction (drops ctx passing). |
| be/src/exec/runtime_filter/runtime_filter.cpp | Updates runtime-filter RPC closure construction (drops ctx passing). |
| be/src/exec/operator/exchange_sink_buffer.cpp | Adds comments documenting callback reuse ordering constraints. |
| be/src/exec/exchange/vdata_stream_sender.h | Documents callback reuse rationale. |
Comments suppressed due to low confidence (1)
be/src/util/brpc_closure.h:1
- This change removes the previous
QueryContext-based failure propagation (anderror_msgaugmentation) fromAutoReleaseClosure, replacing it with logging only. That’s a behavior change: some call sites previously relied onAutoReleaseClosureto cancel the query on RPC failure / non-OKstatus(), gated byignore_runtime_filter_error; after this patch, errors may no longer interrupt the query and could lead to hangs or delayed failure handling. Recommendation (mandatory): restore a mechanism to propagate failures (either by reintroducing the optionalQueryContext+ captured status/failure info before callingtmp->call(), or by requiring callers to pass a callback that performs the cancel/sub/error handling) while keeping thecall()invocation as the last step to avoid the reuse race.
// Licensed to the Apache Software Foundation (ASF) under one
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper); | ||
| // Store callback in the producer to keep it alive until the RPC completes. | ||
| // AutoReleaseClosure holds callbacks via weak_ptr, so without this the callback | ||
| // would be destroyed when this function returns and error-path sub() would never fire. | ||
| _sync_size_callback = callback; | ||
| // RuntimeFilter maybe deconstructed before the rpc finished, so that could not use | ||
| // a raw pointer in closure. Has to use the context's shared ptr. | ||
| auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper, | ||
| state->query_options().ignore_runtime_filter_error | ||
| ? std::weak_ptr<QueryContext> {} | ||
| : state->get_query_ctx_weak()); | ||
| auto closure = AutoReleaseClosure<PSendFilterSizeRequest, SyncSizeCallback>::create_unique( | ||
| request, callback); |
There was a problem hiding this comment.
_sync_size_callback is a single slot, but send_size() can be invoked more than once during the producer lifecycle in some implementations; if a second call happens before the first RPC completes, this assignment overwrites the stored callback and can still allow the earlier callback to be destroyed early (reintroducing the 'weak_ptr callback dies before completion' failure). Also, the stored callback is never cleared on completion, which can unnecessarily retain _dependency until producer teardown. Recommendation (mandatory): make the storage support multiple in-flight RPCs (e.g., vector/list keyed by RPC generation or request id), and clear the stored entry when the RPC completes (both success and error paths), e.g., by having the callback clear a weak back-reference to the producer or by clearing in the completion handler that observes the response.
| auto closure = AutoReleaseClosure<PSyncFilterSizeRequest, | ||
| DummyBrpcCallback<PSyncFilterSizeResponse>>:: | ||
| create_unique(std::make_shared<PSyncFilterSizeRequest>(), | ||
| DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx); | ||
| DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared()); |
There was a problem hiding this comment.
This call site previously passed a QueryContext weak_ptr (conditionally based on ignore_runtime_filter_error) into AutoReleaseClosure so failures could be escalated/canceled when the option is false. With the ctx parameter removed and the callback being DummyBrpcCallback, RPC failure/non-OK status() handling appears to be reduced to logging only, which can change query correctness/termination behavior. Recommendation (mandatory): either (1) restore ctx-based cancellation semantics in AutoReleaseClosure (capturing status/failure before invoking call()), or (2) replace DummyBrpcCallback with a runtime-filter-specific callback that performs the required cancel/disable/sub behavior under the same option gating.
There was a problem hiding this comment.
Code Review Summary
PR Goal & Accomplishment
This PR fixes a real data race in AutoReleaseClosure::Run() where call() was invoked before checking cntl_->Failed() and response_->status(). Because call() can trigger callback reuse (e.g., in ExchangeSendCallback::call() -> _suc_fn -> _send_rpc() which reuses the same callback via get_send_callback()), the closure would then read mutated cntl_/response_ state from the new RPC instead of the original. The fix correctly reorders: log/check first, then call() last. The crash stacktrace and debug-log evidence clearly confirm the race.
The secondary change — removing QueryContext cancellation from AutoReleaseClosure and moving error handling into individual callbacks — is also sound. All callback types that need error handling (ExchangeSendCallback, SyncSizeCallback, WriteBlockCallback) already fully handle errors in their call() methods. The DummyBrpcCallback paths either have caller-side join()+check patterns or are fire-and-forget operations where query cancellation was too aggressive.
Compilation Bug Found
The old code had two _process_status overloads: an unconstrained no-op fallback template <typename Response> void _process_status(Response*) {} and a constrained template <HasStatus Response> void _process_status(Response*). The new code only has the constrained _log_error_status<HasStatus Response> but removed the unconstrained fallback. This will cause a compilation failure for PTabletWriterCancelResult (an empty protobuf message with no status() field), used in vtablet_writer.cpp:1229.
Critical Checkpoint Conclusions
-
Goal accomplished? Yes, the core data race fix is correct. Tests prove the ordering. However, there is a compilation bug (see inline comment).
-
Modification minimal and focused? Yes. The changes are well-scoped: core fix in
brpc_closure.h, consequent cleanup ofQueryContextremoval, andSyncSizeClosure->SyncSizeCallbackrefactor. -
Concurrency: The core fix addresses the concurrency issue correctly by ensuring
call()(which may trigger callback reuse and new RPCs) is the last operation inRun(). TheSyncSizeCallback::call()readscntl_andresponse_which are safe becausecall()is now called before any reuse can happen. -
Lifecycle management:
_sync_size_callback(asshared_ptr<void>) correctly keeps theSyncSizeCallbackalive until theRuntimeFilterProduceris destroyed. Without this,AutoReleaseClosure'sweak_ptrto the callback would expire whensend_size()returns, and the error-pathsub()would never fire. This is correct. -
Parallel code paths: The two success handler lambdas in
exchange_sink_buffer.cpp(unicast and broadcast) both have the same comment added about_send_rpcbeing last. Good. -
Test coverage: Three well-designed unit tests verify the ordering invariant by capturing state at
call()time and mutating afterward. Tests correctly fail with the old ordering and pass with the new. -
Error handling: The behavioral change from
QueryContext::cancel()toLOG(WARNING)is safe — all critical paths already handle errors in their callbacks or via caller-sidejoin()+check. -
No incompatible changes: No function symbol or storage format changes.
-
Configuration: No new configs added.
-
Observability: Error logging is preserved (downgraded from
canceltoLOG(WARNING)inAutoReleaseClosure, but callbacks handle errors appropriately).
|
/review |
|
run buildall |
There was a problem hiding this comment.
Code Review Summary
PR Goal
Fix a real data race in AutoReleaseClosure where call() was invoked before checking cntl_->Failed() and response_->status(). When call() reuses the callback (as in ExchangeSendCallback via get_send_callback()), it can trigger a new RPC that mutates response_ and cntl_, causing the subsequent reads to see the new RPC's state instead of the original — leading to SIGSEGV crashes.
Core Fix Assessment
The fix is correct and well-motivated:
- Reordering in
Run(): Log/check cntl/response state before callingcallback_->call(). This prevents the data race. - Removing
QueryContextfromAutoReleaseClosure: Decentralizes error handling to individual callbacks, which is architecturally cleaner. SyncSizeCallbackrefactoring: Moves from subclassingAutoReleaseClosureto subclassingDummyBrpcCallback, properly handling errors incall().- Unit tests: Three well-designed tests verify the ordering invariant.
Critical Checkpoint Conclusions
- Goal accomplished? Yes, the data race is fixed by reordering
call()to be last inRun(). Tests prove the ordering. - Modification minimal and focused? Mostly yes. The
QueryContextremoval is a secondary cleanup bundled with the fix. - Concurrency? The core race condition is correctly addressed. The callback reuse pattern in
ExchangeSendCallbackis safe because_send_rpc(which triggers reuse) is documented and enforced as the last operation in success handlers. - Lifecycle management?
_sync_size_callbackasshared_ptr<void>correctly keeps the callback alive. Theweak_ptrinAutoReleaseClosureallows graceful degradation if the callback owner is gone. - Parallel code paths? Both unicast and broadcast paths in
exchange_sink_buffer.cppare updated with matching comments. - Error handling? See inline comment — the
ignore_runtime_filter_error=false+ RPC failure path that previously cancelled the query now silently degrades. The existing regression testtest_inject_send_filter_size_fail.groovyexpects the old cancellation behavior and will likely fail. - Test coverage? Good unit tests for the ordering fix. Missing update to the regression test.
- Incompatible changes? No storage/protocol changes. The
AutoReleaseClosureconstructor signature change is source-breaking but all callers are updated. - Performance? No concerns — the reordering adds no overhead.
| std::shared_ptr<RuntimeFilterWrapper> wrapper) | ||
| : _dependency(std::move(dependency)), _wrapper(wrapper) {} | ||
|
|
||
| void call() override { |
There was a problem hiding this comment.
[Behavioral regression] The old SyncSizeClosure called Base::_process_if_rpc_failed() in its Defer, which would invoke ctx->cancel(Status::NetworkError("RPC meet failed: ...")) when ignore_runtime_filter_error == false. This cancelled the query on RF RPC failure.
The new SyncSizeCallback::call() gracefully degrades (disables the filter + subs dependency) but never cancels the query context, regardless of ignore_runtime_filter_error.
This breaks the existing regression test regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy (lines 80-86), which expects:
sql "set ignore_runtime_filter_error = false"
test {
sql "...";
exception "RPC meet failed"
}The test expects the query to fail with "RPC meet failed" when the debug point injects an RPC failure and ignore_runtime_filter_error=false. With this PR, the query will succeed (with degraded performance) instead.
Recommendation: Either:
- Update the regression test to reflect the new behavior (query succeeds even on RF RPC failure), OR
- Add query-context cancellation logic to
SyncSizeCallback::call()whenignore_runtime_filter_error == false(requires passing the flag or a weak QueryContext to the callback).
Option 1 seems more consistent with the PR's design philosophy of graceful degradation.
| create_unique(merge_filter_request, merge_filter_callback, | ||
| state->query_options().ignore_runtime_filter_error | ||
| ? std::weak_ptr<QueryContext> {} | ||
| : state->get_query_ctx_weak()); |
There was a problem hiding this comment.
[Behavioral change] _push_to_remote sends a merge_filter RPC with a DummyBrpcCallback (empty call()). Previously, when ignore_runtime_filter_error == false, the old code would cancel the query context on RPC failure. Now, failures are only logged as warnings.
This is functionally safe (runtime filters are an optimization, not correctness), but the behavioral change should be documented in the PR's release notes. A merge_filter failure will result in the runtime filter never completing at the merge node — consumers will wait until the RF timeout expires, then proceed without the filter.
| } | ||
| // this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_) | ||
| if (auto tmp = callback_.lock()) { | ||
| tmp->call(); |
There was a problem hiding this comment.
[Observation - looks good] The reordering is correct: log/check state first, then call() last. The comment clearly documents why call() must be last. This is the core fix and it correctly addresses the data race.
TPC-H: Total hot run time: 26928 ms |
TPC-DS: Total hot run time: 167957 ms |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
The callback's call() method may reuse the callback object (e.g., in vdata_stream_sender.h get_send_callback()), triggering a new RPC that mutates response_ and cntl_. If AutoReleaseClosure::Run() invokes call() before checking cntl_->Failed() or response_->status(), it reads the NEW RPC's state instead of the ORIGINAL RPC's result, causing:
we have confirmed the data race is real existing with temporary LOGs which has been removed:
and we add some be-ut which could only pass WITH this patch.
before we fix:
after: