From 316e3a780fa259b8f1baa4fc4ce916b69c02a99a Mon Sep 17 00:00:00 2001 From: Hevake Lee Date: Sun, 23 Jun 2024 09:59:52 +0800 Subject: [PATCH] =?UTF-8?q?opt(util):=E4=BC=98=E5=8C=96AsyncPipe=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=EF=BC=8C=E6=B6=88=E9=99=A4=E9=94=99=E5=BA=8F=E9=A3=8E?= =?UTF-8?q?=E9=99=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/util/async_pipe.cpp | 62 ++++++++++++++++---------------- modules/util/async_pipe_test.cpp | 9 ++--- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/modules/util/async_pipe.cpp b/modules/util/async_pipe.cpp index aec9efb..fe50167 100644 --- a/modules/util/async_pipe.cpp +++ b/modules/util/async_pipe.cpp @@ -287,16 +287,21 @@ void AsyncPipe::Impl::appendLockless(const void *data_ptr, size_t data_size) void AsyncPipe::Impl::threadFunc() { - do { - bool is_wake_for_timeup = true; //! 是否因超时而唤醒 + for (;;) { + bool is_wake_for_quit = false; //! 是否因需要停止而被唤醒 + bool is_wake_for_timeup = true; //! 是否因超时而被唤醒 + { //! 等待唤醒信号 std::unique_lock lk(full_buffers_mutex_); if (full_buffers_.empty()) { //! 等待三种情况: 1.超时,2.停止,3.full_buffers_不为空 full_buffers_cv_.wait_for(lk, std::chrono::milliseconds(cfg_.interval), - [&is_wake_for_timeup, this] { - if (stop_signal_ || !full_buffers_.empty()) { + [this, &is_wake_for_timeup, &is_wake_for_quit] { + if (stop_signal_) + is_wake_for_quit = true; + + if (is_wake_for_quit || !full_buffers_.empty()) { is_wake_for_timeup = false; return true; } @@ -307,7 +312,22 @@ void AsyncPipe::Impl::threadFunc() is_wake_for_timeup = false; } } - //! 先处理 full_buffers_ 中的数据 + + //! 如果是超时或是收到停止信号,则先将 curr_buff_ 移到 full_buffers_ + if (is_wake_for_timeup || is_wake_for_quit) { + if (curr_buffer_mutex_.try_lock()) { + if (curr_buffer_ != nullptr) { + //! Q: 这里为什么不锁 full_buffers_mutex_ ? + //! A: 因为锁住了 curr_buffer_mutex_ 就不会有前端调用 appendLockless(),仅有后端的线程操作。 + //! 所以不锁 full_buffers_mutex_ 也是安全的 + full_buffers_.push_back(curr_buffer_); + curr_buffer_ = nullptr; + } + curr_buffer_mutex_.unlock(); + } + } + + //! 然后逐一处理 full_buffers_ 中的数据 for (;;) { Buffer *buff = nullptr; { @@ -316,8 +336,9 @@ void AsyncPipe::Impl::threadFunc() if (!full_buffers_.empty()) { buff = full_buffers_.front(); full_buffers_.pop_front(); - } else + } else { break; + } } if (buff != nullptr) { @@ -341,32 +362,9 @@ void AsyncPipe::Impl::threadFunc() } } - //! 如果是超时或是收到停止信号,则检查并处理 curr_buffer_ 中的数据 - if (is_wake_for_timeup || stop_signal_) { - Buffer *buff = nullptr; - if (curr_buffer_mutex_.try_lock()) { - //! 注意:这里一定要用 try_lock(),否则会死锁 - if (curr_buffer_ != nullptr && !curr_buffer_->empty()) { - buff = curr_buffer_; - curr_buffer_ = nullptr; - } - curr_buffer_mutex_.unlock(); - } - - if (buff != nullptr) { //! 如果没取出来 - //! 进行处理 - if (cb_) - cb_(buff->data(), buff->size()); - buff->reset(); - //! 然后将处理后的buff放入到free_buffers_中 - std::lock_guard lg(free_buffers_mutex_); - free_buffers_.push_back(buff); - free_buffers_cv_.notify_all(); - } - } - } while (!stop_signal_); //! 如果是停止信号,则直接跳出循环,结束线程 - //! Q: stop_signal_ 信号为什么不在被唤醒时就break呢? - //! A: 因为我们期望就算是退出了,Buff中的数据都应该先被处理掉 + if (is_wake_for_quit) + break; + } } } diff --git a/modules/util/async_pipe_test.cpp b/modules/util/async_pipe_test.cpp index 7c89c18..94dad97 100644 --- a/modules/util/async_pipe_test.cpp +++ b/modules/util/async_pipe_test.cpp @@ -40,18 +40,18 @@ void TestByConfig(AsyncPipe::Config cfg) const uint8_t *p = static_cast(ptr); for (size_t i = 0; i < size; ++i) out_data.push_back(p[i]); - this_thread::sleep_for(chrono::milliseconds(10)); + this_thread::sleep_for(chrono::microseconds(10)); } ); - for (size_t i = 0; i < 256; ++i) { uint8_t v = i; ap.append(&v, 1); } + ap.cleanup(); - EXPECT_EQ(out_data.size(), 256); + ASSERT_EQ(out_data.size(), 256); for (size_t i = 0; i < 256; ++i) { EXPECT_EQ(out_data[i], i); } @@ -221,7 +221,7 @@ TEST(AsyncPipe, MultiThreadAppend) const auto len = s1.size(); const int thread_num = 100; - const int each_thread_send_num = 1000; + const int each_thread_send_num = 100; std::vector recv_data; recv_data.reserve(thread_num * each_thread_send_num * len); @@ -231,6 +231,7 @@ TEST(AsyncPipe, MultiThreadAppend) const char *str = static_cast(ptr); for (size_t i = 0; i < size; ++i) recv_data.push_back(str[i]); + std::this_thread::sleep_for(std::chrono::microseconds(rand() % 10)); } );