Skip to content

Commit

Permalink
fix rpc_press can't send request equably (#1763)
Browse files Browse the repository at this point in the history
* fix rpc_press can't send request equably

* set rate limit to 1000000 in one thread && calculate interval with nanosecond

* change sleep function to usleep

* Update rpc_press_impl.cpp

add the max tolerant delay between end_time and expected_time

Co-authored-by: bumingchun <bumingchun@192.168.1.8>
  • Loading branch information
bumingchun and bumingchun committed Jun 20, 2022
1 parent df97a4e commit c945795
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
8 changes: 8 additions & 0 deletions tools/rpc_press/rpc_press.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ bool set_press_options(pbrpcframework::PressOptions* options){
}
}

const int rate_limit_per_thread = 1000000;
double req_rate_per_thread = options->test_req_rate / options->test_thread_num;
if (req_rate_per_thread > rate_limit_per_thread) {
LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is too large in one thread. The rate limit is "
<< rate_limit_per_thread << " in one thread";
return false;
}

options->input = FLAGS_input;
options->output = FLAGS_output;
options->connection_type = FLAGS_connection_type;
Expand Down
34 changes: 12 additions & 22 deletions tools/rpc_press/rpc_press_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,10 @@ void RpcPress::sync_client() {
}
const int thread_index = g_thread_count.fetch_add(1, butil::memory_order_relaxed);
int msg_index = thread_index;
std::deque<int64_t> timeq;
size_t MAX_QUEUE_SIZE = (size_t)req_rate;
if (MAX_QUEUE_SIZE < 100) {
MAX_QUEUE_SIZE = 100;
} else if (MAX_QUEUE_SIZE > 2000) {
MAX_QUEUE_SIZE = 2000;
}
timeq.push_back(butil::gettimeofday_us());
int64_t last_expected_time = butil::monotonic_time_ns();
const int64_t interval = (int64_t) (1000000000L / req_rate);
// the max tolerant delay between end_time and expected_time. 10ms or 10 intervals
int64_t max_tolerant_delay = std::max(10000000L, 10 * interval);
while (!_stop) {
brpc::Controller* cntl = new brpc::Controller;
msg_index = (msg_index + _options.test_thread_num) % _msgs.size();
Expand All @@ -247,21 +243,15 @@ void RpcPress::sync_client() {
if (_options.test_req_rate <= 0) {
brpc::Join(cid1);
} else {
int64_t end_time = butil::gettimeofday_us();
int64_t expected_elp = 0;
int64_t actual_elp = 0;
timeq.push_back(end_time);
if (timeq.size() > MAX_QUEUE_SIZE) {
actual_elp = end_time - timeq.front();
timeq.pop_front();
expected_elp = (int64_t)(1000000 * timeq.size() / req_rate);
} else {
actual_elp = end_time - timeq.front();
expected_elp = (int64_t)(1000000 * (timeq.size() - 1) / req_rate);
}
if (actual_elp < expected_elp) {
usleep(expected_elp - actual_elp);
int64_t end_time = butil::monotonic_time_ns();
int64_t expected_time = last_expected_time + interval;
if (end_time < expected_time) {
usleep((expected_time - end_time)/1000);
}
if (end_time - expected_time > max_tolerant_delay) {
expected_time = end_time;
}
last_expected_time = expected_time;
}
}
}
Expand Down

0 comments on commit c945795

Please sign in to comment.