Skip to content

Commit

Permalink
opt(util):优化AsyncPipe代码,消除错序风险
Browse files Browse the repository at this point in the history
  • Loading branch information
hevake committed Jun 23, 2024
1 parent 00d7529 commit 316e3a7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 36 deletions.
62 changes: 30 additions & 32 deletions modules/util/async_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,21 @@ void AsyncPipe::Impl::appendLockless(const void *data_ptr, size_t data_size)

void AsyncPipe::Impl::threadFunc()
{
do {
bool is_wake_for_timeup = true; //! 是否因超时而唤醒
for (;;) {
bool is_wake_for_quit = false; //! 是否因需要停止而被唤醒
bool is_wake_for_timeup = true; //! 是否因超时而被唤醒

{
//! 等待唤醒信号
std::unique_lock<std::mutex> lk(full_buffers_mutex_);
if (full_buffers_.empty()) {
//! 等待三种情况: 1.超时,2.停止,3.full_buffers_不为空
full_buffers_cv_.wait_for(lk, std::chrono::milliseconds(cfg_.interval),
[&is_wake_for_timeup, this] {
if (stop_signal_ || !full_buffers_.empty()) {
[this, &is_wake_for_timeup, &is_wake_for_quit] {
if (stop_signal_)
is_wake_for_quit = true;

if (is_wake_for_quit || !full_buffers_.empty()) {
is_wake_for_timeup = false;
return true;
}
Expand All @@ -307,7 +312,22 @@ void AsyncPipe::Impl::threadFunc()
is_wake_for_timeup = false;
}
}
//! 先处理 full_buffers_ 中的数据

//! 如果是超时或是收到停止信号,则先将 curr_buff_ 移到 full_buffers_
if (is_wake_for_timeup || is_wake_for_quit) {
if (curr_buffer_mutex_.try_lock()) {
if (curr_buffer_ != nullptr) {
//! Q: 这里为什么不锁 full_buffers_mutex_ ?
//! A: 因为锁住了 curr_buffer_mutex_ 就不会有前端调用 appendLockless(),仅有后端的线程操作。
//! 所以不锁 full_buffers_mutex_ 也是安全的
full_buffers_.push_back(curr_buffer_);
curr_buffer_ = nullptr;
}
curr_buffer_mutex_.unlock();
}
}

//! 然后逐一处理 full_buffers_ 中的数据
for (;;) {
Buffer *buff = nullptr;
{
Expand All @@ -316,8 +336,9 @@ void AsyncPipe::Impl::threadFunc()
if (!full_buffers_.empty()) {
buff = full_buffers_.front();
full_buffers_.pop_front();
} else
} else {
break;
}
}

if (buff != nullptr) {
Expand All @@ -341,32 +362,9 @@ void AsyncPipe::Impl::threadFunc()
}
}

//! 如果是超时或是收到停止信号,则检查并处理 curr_buffer_ 中的数据
if (is_wake_for_timeup || stop_signal_) {
Buffer *buff = nullptr;
if (curr_buffer_mutex_.try_lock()) {
//! 注意:这里一定要用 try_lock(),否则会死锁
if (curr_buffer_ != nullptr && !curr_buffer_->empty()) {
buff = curr_buffer_;
curr_buffer_ = nullptr;
}
curr_buffer_mutex_.unlock();
}

if (buff != nullptr) { //! 如果没取出来
//! 进行处理
if (cb_)
cb_(buff->data(), buff->size());
buff->reset();
//! 然后将处理后的buff放入到free_buffers_中
std::lock_guard<std::mutex> lg(free_buffers_mutex_);
free_buffers_.push_back(buff);
free_buffers_cv_.notify_all();
}
}
} while (!stop_signal_); //! 如果是停止信号,则直接跳出循环,结束线程
//! Q: stop_signal_ 信号为什么不在被唤醒时就break呢?
//! A: 因为我们期望就算是退出了,Buff中的数据都应该先被处理掉
if (is_wake_for_quit)
break;
}
}

}
Expand Down
9 changes: 5 additions & 4 deletions modules/util/async_pipe_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ void TestByConfig(AsyncPipe::Config cfg)
const uint8_t *p = static_cast<const uint8_t*>(ptr);
for (size_t i = 0; i < size; ++i)
out_data.push_back(p[i]);
this_thread::sleep_for(chrono::milliseconds(10));
this_thread::sleep_for(chrono::microseconds(10));
}
);


for (size_t i = 0; i < 256; ++i) {
uint8_t v = i;
ap.append(&v, 1);
}

ap.cleanup();

EXPECT_EQ(out_data.size(), 256);
ASSERT_EQ(out_data.size(), 256);
for (size_t i = 0; i < 256; ++i) {
EXPECT_EQ(out_data[i], i);
}
Expand Down Expand Up @@ -221,7 +221,7 @@ TEST(AsyncPipe, MultiThreadAppend)
const auto len = s1.size();

const int thread_num = 100;
const int each_thread_send_num = 1000;
const int each_thread_send_num = 100;

std::vector<char> recv_data;
recv_data.reserve(thread_num * each_thread_send_num * len);
Expand All @@ -231,6 +231,7 @@ TEST(AsyncPipe, MultiThreadAppend)
const char *str = static_cast<const char *>(ptr);
for (size_t i = 0; i < size; ++i)
recv_data.push_back(str[i]);
std::this_thread::sleep_for(std::chrono::microseconds(rand() % 10));
}
);

Expand Down

0 comments on commit 316e3a7

Please sign in to comment.