diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 871981a82ad..2f945eed81c 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -16,28 +16,71 @@ using namespace icinga; -CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&) +CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand) : m_Done(false) { auto& ioEngine (IoEngine::Get()); + auto& sem (ioEngine.m_CpuBoundSemaphore); + std::unique_lock lock (sem.Mutex); - for (;;) { - auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); + if (sem.FreeSlots) { + --sem.FreeSlots; + return; + } + + auto cv (Shared::Make(ioEngine.GetIoContext())); + bool gotSlot = false; + auto pos (sem.Waiting.insert(sem.Waiting.end(), IoEngine::CpuBoundQueueItem{&strand, cv, &gotSlot})); + + lock.unlock(); - if (availableSlots < 1) { - ioEngine.m_CpuBoundSemaphore.fetch_add(1); - IoEngine::YieldCurrentCoroutine(yc); - continue; + try { + cv->Wait(yc); + } catch (...) { + std::unique_lock lock (sem.Mutex); + + if (gotSlot) { + lock.unlock(); + Done(); + } else { + sem.Waiting.erase(pos); } - break; + throw; } } +class SetAsioCV +{ +public: + inline SetAsioCV(Shared::Ptr cv) : m_CV(std::move(cv)) + { + } + + inline void operator()() + { + m_CV->Set(); + } + +private: + Shared::Ptr m_CV; +}; + void CpuBoundWork::Done() { if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); + auto& sem (IoEngine::Get().m_CpuBoundSemaphore); + std::unique_lock lock (sem.Mutex); + + if (sem.Waiting.empty()) { + ++sem.FreeSlots; + } else { + auto next (sem.Waiting.front()); + + *next.GotSlot = true; + sem.Waiting.pop_front(); + boost::asio::post(*next.Strand, SetAsioCV(std::move(next.CV))); + } m_Done = true; } @@ -58,7 +101,11 @@ boost::asio::io_context& IoEngine::GetIoContext() IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext) { m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); - m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u); + + { + std::unique_lock lock (m_CpuBoundSemaphore.Mutex); + m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u; + } for (auto& thread : m_Threads) { thread = std::thread(&IoEngine::RunEventLoop, this); diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 622a92dd00c..f370bde6696 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -6,10 +6,14 @@ #include "base/exception.hpp" #include "base/lazy-init.hpp" #include "base/logger.hpp" +#include "base/shared.hpp" #include "base/shared-object.hpp" #include +#include #include +#include #include +#include #include #include #include @@ -31,7 +35,7 @@ namespace icinga class CpuBoundWork { public: - CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&); + CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand); CpuBoundWork(const CpuBoundWork&) = delete; CpuBoundWork(CpuBoundWork&&) = delete; CpuBoundWork& operator=(const CpuBoundWork&) = delete; @@ -48,6 +52,25 @@ class CpuBoundWork bool m_Done; }; + +/** + * Condition variable which doesn't block I/O threads + * + * @ingroup base + */ +class AsioConditionVariable +{ +public: + AsioConditionVariable(boost::asio::io_context& io, bool init = false); + + void Set(); + void Clear(); + void Wait(boost::asio::yield_context yc); + +private: + boost::asio::deadline_timer m_Timer; +}; + /** * Async I/O engine * @@ -110,6 +133,13 @@ class IoEngine } private: + struct CpuBoundQueueItem + { + boost::asio::io_context::strand* Strand; + Shared::Ptr CV; + bool* GotSlot; + }; + IoEngine(); void RunEventLoop(); @@ -120,29 +150,16 @@ class IoEngine boost::asio::executor_work_guard m_KeepAlive; std::vector m_Threads; boost::asio::deadline_timer m_AlreadyExpiredTimer; - std::atomic_int_fast32_t m_CpuBoundSemaphore; -}; -class TerminateIoThread : public std::exception -{ + struct { + std::mutex Mutex; + uint_fast32_t FreeSlots; + std::list Waiting; + } m_CpuBoundSemaphore; }; -/** - * Condition variable which doesn't block I/O threads - * - * @ingroup base - */ -class AsioConditionVariable +class TerminateIoThread : public std::exception { -public: - AsioConditionVariable(boost::asio::io_context& io, bool init = false); - - void Set(); - void Clear(); - void Wait(boost::asio::yield_context yc); - -private: - boost::asio::deadline_timer m_Timer; }; /**