CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot
Spinning on the atomic is inefficient, and it schedules waiters unfairly, which
can cause surprisingly long waits on busy nodes. Instead, wait via
AsioConditionVariable#Wait() if there are no free slots; it is notified by
another coroutine's CpuBoundWork#~CpuBoundWork() once that work has finished.
Al2Klimov committed Feb 21, 2024
1 parent b899611 commit 9062934
Showing 2 changed files with 94 additions and 30 deletions.
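For context, this is how the class is meant to be used: a coroutine takes a CpuBoundWork slot before CPU-heavy work and releases it afterwards, either explicitly via Done() or through the destructor. A minimal usage sketch (the handler and its helpers are hypothetical, only CpuBoundWork itself is from this commit):

	// Minimal usage sketch: HandleMessage() and its helpers are hypothetical;
	// only CpuBoundWork is from this commit.
	void HandleMessage(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
	{
		// Suspends this coroutine (without blocking the I/O thread)
		// until one of the limited CPU slots is free.
		CpuBoundWork work (yc, strand);

		ProcessMessageCpuHeavy(); // hypothetical CPU-bound part

		// Release the slot as soon as the CPU-heavy part is done;
		// otherwise ~CpuBoundWork() releases it at scope exit.
		work.Done();
	}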
67 changes: 57 additions & 10 deletions lib/base/io-engine.cpp
@@ -16,28 +16,71 @@
 
 using namespace icinga;
 
-CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&)
+CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
 	: m_Done(false)
 {
 	auto& ioEngine (IoEngine::Get());
+	auto& sem (ioEngine.m_CpuBoundSemaphore);
+	std::unique_lock<std::mutex> lock (sem.Mutex);
 
-	for (;;) {
-		auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
-
-		if (availableSlots < 1) {
-			ioEngine.m_CpuBoundSemaphore.fetch_add(1);
-			IoEngine::YieldCurrentCoroutine(yc);
-			continue;
-		}
-
-		break;
+	if (sem.FreeSlots) {
+		--sem.FreeSlots;
+		return;
+	}
+
+	auto cv (Shared<AsioConditionVariable>::Make(ioEngine.GetIoContext()));
+	bool gotSlot = false;
+	auto pos (sem.Waiting.insert(sem.Waiting.end(), IoEngine::CpuBoundQueueItem{&strand, cv, &gotSlot}));
+
+	lock.unlock();
+
+	try {
+		cv->Wait(yc);
+	} catch (...) {
+		std::unique_lock<std::mutex> lock (sem.Mutex);
+
+		if (gotSlot) {
+			lock.unlock();
+			Done();
+		} else {
+			sem.Waiting.erase(pos);
+		}
+
+		throw;
 	}
 }
 
+class SetAsioCV
+{
+public:
+	inline SetAsioCV(Shared<AsioConditionVariable>::Ptr cv) : m_CV(std::move(cv))
+	{
+	}
+
+	inline void operator()()
+	{
+		m_CV->Set();
+	}
+
+private:
+	Shared<AsioConditionVariable>::Ptr m_CV;
+};
+
 void CpuBoundWork::Done()
 {
 	if (!m_Done) {
-		IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
+		auto& sem (IoEngine::Get().m_CpuBoundSemaphore);
+		std::unique_lock<std::mutex> lock (sem.Mutex);
+
+		if (sem.Waiting.empty()) {
+			++sem.FreeSlots;
+		} else {
+			auto next (sem.Waiting.front());
+
+			*next.GotSlot = true;
+			sem.Waiting.pop_front();
+			boost::asio::post(*next.Strand, SetAsioCV(std::move(next.CV)));
+		}
 
 		m_Done = true;
 	}
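The fairness property lives in Done(): when waiters are queued, the freed slot is not returned to FreeSlots for everyone to race over; it is handed directly to the oldest waiter, whose GotSlot flag is set before its condition variable is posted to its strand. Stripped of the Asio machinery, the same direct-handoff pattern looks like this (a plain-threads sketch for illustration, not code from this commit):

	#include <condition_variable>
	#include <list>
	#include <mutex>

	// "Direct handoff" semaphore sketch: Release() passes the slot to the
	// oldest waiter instead of letting all waiters race for it, so waiting
	// is FIFO-fair. Plain standard-library primitives stand in for Asio here.
	class FairSemaphore
	{
	public:
		explicit FairSemaphore(unsigned slots) : m_FreeSlots(slots)
		{
		}

		void Acquire()
		{
			std::unique_lock<std::mutex> lock (m_Mutex);

			if (m_FreeSlots) {
				--m_FreeSlots;
				return;
			}

			Waiter w;
			m_Waiting.push_back(&w);

			// Only Release() sets GotSlot, and only for the queue head,
			// so no other waiter can steal this slot.
			w.CV.wait(lock, [&w] { return w.GotSlot; });
		}

		void Release()
		{
			std::unique_lock<std::mutex> lock (m_Mutex);

			if (m_Waiting.empty()) {
				++m_FreeSlots;
			} else {
				Waiter* next (m_Waiting.front());
				m_Waiting.pop_front();

				next->GotSlot = true; // the slot changes owner directly
				next->CV.notify_one();
			}
		}

	private:
		struct Waiter
		{
			std::condition_variable CV;
			bool GotSlot = false;
		};

		std::mutex m_Mutex;
		unsigned m_FreeSlots;
		std::list<Waiter*> m_Waiting;
	};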
@@ -58,7 +101,11 @@ boost::asio::io_context& IoEngine::GetIoContext()
 IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
 {
 	m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
-	m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
+
+	{
+		std::unique_lock<std::mutex> lock (m_CpuBoundSemaphore.Mutex);
+		m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u;
+	}
 
 	for (auto& thread : m_Threads) {
 		thread = std::thread(&IoEngine::RunEventLoop, this);
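As a worked example of the sizing above: with Configuration::Concurrency = 8, the engine starts 16 I/O threads (8 * 2) but admits at most 12 concurrent CPU-bound work items (8 * 3 / 2), so some threads always remain available to keep serving I/O while every CPU slot is taken.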
57 changes: 37 additions & 20 deletions lib/base/io-engine.hpp
@@ -6,10 +6,14 @@
 #include "base/exception.hpp"
 #include "base/lazy-init.hpp"
 #include "base/logger.hpp"
+#include "base/shared.hpp"
 #include "base/shared-object.hpp"
 #include <atomic>
+#include <cstdint>
 #include <exception>
+#include <list>
 #include <memory>
+#include <mutex>
 #include <thread>
 #include <utility>
 #include <vector>
@@ -31,7 +35,7 @@ namespace icinga
 class CpuBoundWork
 {
 public:
-	CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&);
+	CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand);
 	CpuBoundWork(const CpuBoundWork&) = delete;
 	CpuBoundWork(CpuBoundWork&&) = delete;
 	CpuBoundWork& operator=(const CpuBoundWork&) = delete;
@@ -48,6 +52,25 @@ class CpuBoundWork
 	bool m_Done;
 };
 
+
+/**
+ * Condition variable which doesn't block I/O threads
+ *
+ * @ingroup base
+ */
+class AsioConditionVariable
+{
+public:
+	AsioConditionVariable(boost::asio::io_context& io, bool init = false);
+
+	void Set();
+	void Clear();
+	void Wait(boost::asio::yield_context yc);
+
+private:
+	boost::asio::deadline_timer m_Timer;
+};
+
 /**
  * Async I/O engine
  *
@@ -110,6 +133,13 @@ class IoEngine
 	}
 
 private:
+	struct CpuBoundQueueItem
+	{
+		boost::asio::io_context::strand* Strand;
+		Shared<AsioConditionVariable>::Ptr CV;
+		bool* GotSlot;
+	};
+
 	IoEngine();
 
 	void RunEventLoop();
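The bool* GotSlot member is what makes the constructor's catch block above race-free: the flag itself lives on the waiting coroutine's stack, and Done() flips it through the pointer while still holding the mutex. If Wait() is then interrupted, the waiter can tell whether the slot was already handed over (and must be given back via Done()) or whether it merely has to remove its own queue entry.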
@@ -120,29 +150,16 @@
 	boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
 	std::vector<std::thread> m_Threads;
 	boost::asio::deadline_timer m_AlreadyExpiredTimer;
-	std::atomic_int_fast32_t m_CpuBoundSemaphore;
+
+	struct {
+		std::mutex Mutex;
+		uint_fast32_t FreeSlots;
+		std::list<CpuBoundQueueItem> Waiting;
+	} m_CpuBoundSemaphore;
 };
 
 class TerminateIoThread : public std::exception
 {
 };
 
-/**
- * Condition variable which doesn't block I/O threads
- *
- * @ingroup base
- */
-class AsioConditionVariable
-{
-public:
-	AsioConditionVariable(boost::asio::io_context& io, bool init = false);
-
-	void Set();
-	void Clear();
-	void Wait(boost::asio::yield_context yc);
-
-private:
-	boost::asio::deadline_timer m_Timer;
-};
-
 /**
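The implementation of AsioConditionVariable is not part of this diff (the class is only moved within the header here). Given its single deadline_timer member, it presumably follows the well-known Asio timer trick, along these lines (an assumption, sketched for illustration, not the committed code):

	// Sketch (assumption): a deadline_timer that never fires on its own
	// models the condition.
	AsioConditionVariable::AsioConditionVariable(boost::asio::io_context& io, bool init)
		: m_Timer(io)
	{
		// pos_infin means "not set": async_wait() pends until Set() is called.
		m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin);
	}

	void AsioConditionVariable::Set()
	{
		// Changing the expiry cancels pending waits, waking all waiters.
		m_Timer.expires_at(boost::posix_time::neg_infin);
	}

	void AsioConditionVariable::Clear()
	{
		m_Timer.expires_at(boost::posix_time::pos_infin);
	}

	void AsioConditionVariable::Wait(boost::asio::yield_context yc)
	{
		// Waking happens via cancellation (operation_aborted), so the
		// error code is swallowed deliberately.
		boost::system::error_code ec;
		m_Timer.async_wait(yc[ec]);
	}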
