From af80857ab18427707b57dc8b67ff6f6ef597cb10 Mon Sep 17 00:00:00 2001 From: bumingchun Date: Mon, 16 May 2022 16:28:19 +0800 Subject: [PATCH 1/4] fix rpc_press can't send request equably --- tools/rpc_press/rpc_press_impl.cpp | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/tools/rpc_press/rpc_press_impl.cpp b/tools/rpc_press/rpc_press_impl.cpp index f825f44ce6..e32f161a37 100644 --- a/tools/rpc_press/rpc_press_impl.cpp +++ b/tools/rpc_press/rpc_press_impl.cpp @@ -219,14 +219,8 @@ void RpcPress::sync_client() { } const int thread_index = g_thread_count.fetch_add(1, butil::memory_order_relaxed); int msg_index = thread_index; - std::deque timeq; - size_t MAX_QUEUE_SIZE = (size_t)req_rate; - if (MAX_QUEUE_SIZE < 100) { - MAX_QUEUE_SIZE = 100; - } else if (MAX_QUEUE_SIZE > 2000) { - MAX_QUEUE_SIZE = 2000; - } - timeq.push_back(butil::gettimeofday_us()); + int64_t last_expected_time = butil::gettimeofday_us(); + const int64_t interval = (int64_t) (1000000 / req_rate); while (!_stop) { brpc::Controller* cntl = new brpc::Controller; msg_index = (msg_index + _options.test_thread_num) % _msgs.size(); @@ -248,20 +242,11 @@ void RpcPress::sync_client() { brpc::Join(cid1); } else { int64_t end_time = butil::gettimeofday_us(); - int64_t expected_elp = 0; - int64_t actual_elp = 0; - timeq.push_back(end_time); - if (timeq.size() > MAX_QUEUE_SIZE) { - actual_elp = end_time - timeq.front(); - timeq.pop_front(); - expected_elp = (int64_t)(1000000 * timeq.size() / req_rate); - } else { - actual_elp = end_time - timeq.front(); - expected_elp = (int64_t)(1000000 * (timeq.size() - 1) / req_rate); - } - if (actual_elp < expected_elp) { - usleep(expected_elp - actual_elp); + int64_t expected_time = last_expected_time + interval; + if (end_time < expected_time) { + usleep(expected_time - end_time); } + last_expected_time = expected_time; } } } From 925bdbab455fe9fde24b9e7258fd57df7bce95ee Mon Sep 17 00:00:00 2001 From: bumingchun Date: Mon, 16 May 2022 22:09:01 +0800 Subject: [PATCH 2/4] set rate limit to 1000000 in one thread && calculate interval with nanosecond --- tools/rpc_press/rpc_press.cpp | 8 ++++++++ tools/rpc_press/rpc_press_impl.cpp | 9 +++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/tools/rpc_press/rpc_press.cpp b/tools/rpc_press/rpc_press.cpp index c176f96216..997a9fb03c 100644 --- a/tools/rpc_press/rpc_press.cpp +++ b/tools/rpc_press/rpc_press.cpp @@ -71,6 +71,14 @@ bool set_press_options(pbrpcframework::PressOptions* options){ } } + const int rate_limit_per_thread = 1000000; + double req_rate_per_thread = options->test_req_rate / options->test_thread_num; + if (req_rate_per_thread > rate_limit_per_thread) { + LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is too large in one thread. The rate limit is " + << rate_limit_per_thread << " in one thread"; + return false; + } + options->input = FLAGS_input; options->output = FLAGS_output; options->connection_type = FLAGS_connection_type; diff --git a/tools/rpc_press/rpc_press_impl.cpp b/tools/rpc_press/rpc_press_impl.cpp index e32f161a37..3b57042f61 100644 --- a/tools/rpc_press/rpc_press_impl.cpp +++ b/tools/rpc_press/rpc_press_impl.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include // butil::FilePath #include @@ -219,8 +220,8 @@ void RpcPress::sync_client() { } const int thread_index = g_thread_count.fetch_add(1, butil::memory_order_relaxed); int msg_index = thread_index; - int64_t last_expected_time = butil::gettimeofday_us(); - const int64_t interval = (int64_t) (1000000 / req_rate); + int64_t last_expected_time = butil::monotonic_time_ns(); + const int64_t interval = (int64_t) (1000000000L / req_rate); while (!_stop) { brpc::Controller* cntl = new brpc::Controller; msg_index = (msg_index + _options.test_thread_num) % _msgs.size(); @@ -241,10 +242,10 @@ void RpcPress::sync_client() { if (_options.test_req_rate <= 0) { brpc::Join(cid1); } else { - int64_t end_time = butil::gettimeofday_us(); + int64_t end_time = butil::monotonic_time_ns(); int64_t expected_time = last_expected_time + interval; if (end_time < expected_time) { - usleep(expected_time - end_time); + std::this_thread::sleep_for(std::chrono::nanoseconds(expected_time - end_time)); } last_expected_time = expected_time; } From eb20a337d41f7ca070d513cdd4c237f8ac0db318 Mon Sep 17 00:00:00 2001 From: bumingchun Date: Wed, 18 May 2022 18:57:07 +0800 Subject: [PATCH 3/4] change sleep function to usleep --- tools/rpc_press/rpc_press_impl.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tools/rpc_press/rpc_press_impl.cpp b/tools/rpc_press/rpc_press_impl.cpp index 3b57042f61..af95e2697f 100644 --- a/tools/rpc_press/rpc_press_impl.cpp +++ b/tools/rpc_press/rpc_press_impl.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include // butil::FilePath #include @@ -245,7 +244,7 @@ void RpcPress::sync_client() { int64_t end_time = butil::monotonic_time_ns(); int64_t expected_time = last_expected_time + interval; if (end_time < expected_time) { - std::this_thread::sleep_for(std::chrono::nanoseconds(expected_time - end_time)); + usleep((expected_time - end_time)/1000); } last_expected_time = expected_time; } From 35f00d1dedcc93ada4e9c159bbf535c80689f419 Mon Sep 17 00:00:00 2001 From: bumingchun Date: Sat, 4 Jun 2022 17:34:21 +0800 Subject: [PATCH 4/4] Update rpc_press_impl.cpp add the max tolerant delay between end_time and expected_time --- tools/rpc_press/rpc_press_impl.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tools/rpc_press/rpc_press_impl.cpp b/tools/rpc_press/rpc_press_impl.cpp index af95e2697f..8a873eff8c 100644 --- a/tools/rpc_press/rpc_press_impl.cpp +++ b/tools/rpc_press/rpc_press_impl.cpp @@ -221,6 +221,8 @@ void RpcPress::sync_client() { int msg_index = thread_index; int64_t last_expected_time = butil::monotonic_time_ns(); const int64_t interval = (int64_t) (1000000000L / req_rate); + // the max tolerant delay between end_time and expected_time. 10ms or 10 intervals + int64_t max_tolerant_delay = std::max(10000000L, 10 * interval); while (!_stop) { brpc::Controller* cntl = new brpc::Controller; msg_index = (msg_index + _options.test_thread_num) % _msgs.size(); @@ -246,6 +248,9 @@ void RpcPress::sync_client() { if (end_time < expected_time) { usleep((expected_time - end_time)/1000); } + if (end_time - expected_time > max_tolerant_delay) { + expected_time = end_time; + } last_expected_time = expected_time; } }