Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions test/brpc_streaming_rpc_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct BatchStreamFeedbackRaceState {
std::atomic<bool> client_got_second_msg{false};
std::atomic<bool> server_write_done{false};
std::atomic<bool> rpc_done{false};
std::atomic<int> client_closed_count{0};

bthread_t server_send_tid{0};
std::atomic<bool> server_send_started{false};
Expand Down Expand Up @@ -123,7 +124,9 @@ class BatchStreamClientHandler : public brpc::StreamInputHandler {

void on_idle_timeout(brpc::StreamId /*id*/) override {}

void on_closed(brpc::StreamId /*id*/) override {}
void on_closed(brpc::StreamId /*id*/) override {
_state->client_closed_count.fetch_add(1, std::memory_order_release);
}

void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const std::string& /*error_text*/) override {}

Expand Down Expand Up @@ -224,12 +227,17 @@ static void SetAtomicTrue(std::atomic<bool>* f) {
f->store(true, std::memory_order_release);
}

static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
template <typename Pred>
static bool WaitForTrue(Pred pred, int timeout_ms) {
const int64_t deadline_us = butil::gettimeofday_us() + (int64_t)timeout_ms * 1000L;
while (!f.load(std::memory_order_acquire) && butil::gettimeofday_us() < deadline_us) {
while (!pred() && butil::gettimeofday_us() < deadline_us) {
usleep(1000);
}
return f.load(std::memory_order_acquire);
return pred();
}

static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
return WaitForTrue([&f]() { return f.load(std::memory_order_acquire); }, timeout_ms);
}

TEST_F(StreamingRpcTest, sanity) {
Expand Down Expand Up @@ -307,6 +315,22 @@ TEST_F(StreamingRpcTest, batch_create_stream_feedback_race) {
}
server.Stop(0);
server.Join();

// Release the SocketUniquePtr held above so the fake socket can be
// recycled. Otherwise BeforeRecycle / on_closed for the extra stream
// is deferred until `client_extra_ptr` destructs at scope exit, which
// happens *after* `client_handler` and `state` are destroyed -> UAF
// inside Stream::Consume on Linux.
client_extra_ptr.reset();

// on_closed() runs asynchronously on each client stream's consumer
// bthread. Wait for both before letting handler/state go out of
// scope, otherwise Stream::Consume will dereference freed memory.
int expected_closed = request_streams.size();
WaitForTrue([&state, expected_closed]() {
return state.client_closed_count.load(std::memory_order_acquire)
>= expected_closed;
}, 2000);
};

test::EchoService_Stub stub(&channel);
Expand Down
Loading