From 10cba9c27b110b4f34f77e1eb01c6c0e14050f84 Mon Sep 17 00:00:00 2001 From: chenBright Date: Wed, 20 May 2026 22:50:32 +0800 Subject: [PATCH] Fix UAF in batch_create_stream_feedback_race unittest --- test/brpc_streaming_rpc_unittest.cpp | 32 ++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index ecb88c6150..0f8a3e56d5 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -91,6 +91,7 @@ struct BatchStreamFeedbackRaceState { std::atomic client_got_second_msg{false}; std::atomic server_write_done{false}; std::atomic rpc_done{false}; + std::atomic client_closed_count{0}; bthread_t server_send_tid{0}; std::atomic server_send_started{false}; @@ -123,7 +124,9 @@ class BatchStreamClientHandler : public brpc::StreamInputHandler { void on_idle_timeout(brpc::StreamId /*id*/) override {} - void on_closed(brpc::StreamId /*id*/) override {} + void on_closed(brpc::StreamId /*id*/) override { + _state->client_closed_count.fetch_add(1, std::memory_order_release); + } void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const std::string& /*error_text*/) override {} @@ -224,12 +227,17 @@ static void SetAtomicTrue(std::atomic* f) { f->store(true, std::memory_order_release); } -static bool WaitForTrue(const std::atomic& f, int timeout_ms) { +template +static bool WaitForTrue(Pred pred, int timeout_ms) { const int64_t deadline_us = butil::gettimeofday_us() + (int64_t)timeout_ms * 1000L; - while (!f.load(std::memory_order_acquire) && butil::gettimeofday_us() < deadline_us) { + while (!pred() && butil::gettimeofday_us() < deadline_us) { usleep(1000); } - return f.load(std::memory_order_acquire); + return pred(); +} + +static bool WaitForTrue(const std::atomic& f, int timeout_ms) { + return WaitForTrue([&f]() { return f.load(std::memory_order_acquire); }, timeout_ms); } TEST_F(StreamingRpcTest, sanity) { @@ -307,6 +315,22 @@ TEST_F(StreamingRpcTest, batch_create_stream_feedback_race) { } server.Stop(0); server.Join(); + + // Release the SocketUniquePtr held above so the fake socket can be + // recycled. Otherwise BeforeRecycle / on_closed for the extra stream + // is deferred until `client_extra_ptr` destructs at scope exit, which + // happens *after* `client_handler` and `state` are destroyed -> UAF + // inside Stream::Consume on Linux. + client_extra_ptr.reset(); + + // on_closed() runs asynchronously on each client stream's consumer + // bthread. Wait for both before letting handler/state go out of + // scope, otherwise Stream::Consume will dereference freed memory. + int expected_closed = request_streams.size(); + WaitForTrue([&state, expected_closed]() { + return state.client_closed_count.load(std::memory_order_acquire) + >= expected_closed; + }, 2000); }; test::EchoService_Stub stub(&channel);