Skip to content

Commit

Permalink
Implement WorkQueue metric stats and periodic logging
Browse files Browse the repository at this point in the history
refs #5133
  • Loading branch information
Michael Friedrich committed May 23, 2017
1 parent 939b2dd commit 456cfdc
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
57 changes: 51 additions & 6 deletions lib/base/workqueue.cpp
Expand Up @@ -33,8 +33,11 @@ boost::thread_specific_ptr<WorkQueue *> l_ThreadWorkQueue;

WorkQueue::WorkQueue(size_t maxItems, int threadCount)
: m_ID(m_NextID++), m_ThreadCount(threadCount), m_Spawned(false), m_MaxItems(maxItems), m_Stopped(false),
m_Processing(0), m_NextTaskID(0)
m_Processing(0), m_NextTaskID(0), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
{
/* Initialize logger. */
m_StatusTimerTimeout = Utility::GetTime();

m_StatusTimer = new Timer();
m_StatusTimer->SetInterval(10);
m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
Expand Down Expand Up @@ -192,14 +195,41 @@ void WorkQueue::StatusTimerHandler(void)
{
boost::mutex::scoped_lock lock(m_Mutex);

Log log(LogNotice, "WorkQueue");
ASSERT(!m_Name.IsEmpty());

int pending = m_Tasks.size();

log << "#" << m_ID;
double now = Utility::GetTime();
double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
double timeToZero = pending / gradient;

if (!m_Name.IsEmpty())
log << " (" << m_Name << ")";
String timeInfo;

if (pending > GetTaskCount(5)) {
timeInfo = " empty in ";
if (timeToZero < 0)
timeInfo += "infinite time, your task handler isn't able to keep up";
else
timeInfo += Utility::FormatDuration(timeToZero);
}

log << " tasks: " << m_Tasks.size();
m_PendingTasks = pending;
m_PendingTasksTimestamp = now;

/* Log if there are pending items, or 5 minute timeout is reached. */
if (pending > 0 || m_StatusTimerTimeout < now) {
Log(LogInformation, "WorkQueue")
<< "#" << m_ID << " (" << m_Name << ") "
<< "items: " << pending << ", "
<< "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s "
<< "(" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
<< timeInfo;
}

/* Reschedule next log entry in 5 minutes. */
if (m_StatusTimerTimeout < now) {
m_StatusTimerTimeout = now + 60 * 5;
}
}

void WorkQueue::WorkerThreadProc(void)
Expand Down Expand Up @@ -247,6 +277,8 @@ void WorkQueue::WorkerThreadProc(void)
_before_ we re-acquire the mutex */
task = Task();

IncreaseTaskCount();

lock.lock();

m_Processing--;
Expand All @@ -256,3 +288,16 @@ void WorkQueue::WorkerThreadProc(void)
}
}

void WorkQueue::IncreaseTaskCount(void)
{
double now = Utility::GetTime();

boost::mutex::scoped_lock lock(m_StatsMutex);
m_TaskStats.InsertValue(now, 1);
}

int WorkQueue::GetTaskCount(RingBuffer::SizeType span) const
{
boost::mutex::scoped_lock lock(m_StatsMutex);
return m_TaskStats.GetValues(span);
}
11 changes: 11 additions & 0 deletions lib/base/workqueue.hpp
Expand Up @@ -22,6 +22,7 @@

#include "base/i2-base.hpp"
#include "base/timer.hpp"
#include "base/ringbuffer.hpp"
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
Expand Down Expand Up @@ -93,13 +94,17 @@ class I2_BASE_API WorkQueue
bool IsWorkerThread(void) const;

size_t GetLength(void) const;
int GetTaskCount(RingBuffer::SizeType span) const;

void SetExceptionCallback(const ExceptionCallback& callback);

bool HasExceptions(void) const;
std::vector<boost::exception_ptr> GetExceptions(void) const;
void ReportExceptions(const String& facility) const;

protected:
void IncreaseTaskCount(void);

private:
int m_ID;
String m_Name;
Expand All @@ -120,6 +125,12 @@ class I2_BASE_API WorkQueue
ExceptionCallback m_ExceptionCallback;
std::vector<boost::exception_ptr> m_Exceptions;
Timer::Ptr m_StatusTimer;
double m_StatusTimerTimeout;

mutable boost::mutex m_StatsMutex;
RingBuffer m_TaskStats;
int m_PendingTasks;
double m_PendingTasksTimestamp;

void WorkerThreadProc(void);
void StatusTimerHandler(void);
Expand Down

0 comments on commit 456cfdc

Please sign in to comment.