Permalink
Browse files

HPCC-8189 JobQueue updating dali too frequently

Both eclccserver and roxie were setting short timeouts on the call to
jobq.dequeue, in order to ensure they could close down cleanly and quickly.

However, this resulted in frequent writes to dali to update the queue
status (several per second), meaning dali would never be idle and the
inc file would grow unnecessarily large.

Using the async cancelAcceptConversation method means we can close down even
faster, and avoids the dali noise.

There is still a timeout every 900s (in a default configuration) on the
dfuserver monitor queue. That cannot be eliminated without more significant
refactoring.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information...
1 parent c670786 commit 2fff62bfb58eee9341099340a4c65477b3c0e3b5 @richardkchapman richardkchapman committed Nov 6, 2012
Showing with 27 additions and 7 deletions.
  1. +1 −1 dali/dfu/dfurun.cpp
  2. +1 −1 ecl/agentexec/agentexec.cpp
  3. +5 −2 ecl/eclccserver/eclccserver.cpp
  4. +20 −3 roxie/ccd/ccdserver.cpp
View
@@ -462,7 +462,7 @@ class CDFUengine: public CInterface, implements IDFUengine
void startListener(const char *queuename,CSDSServerStatus *serverstatus)
{
PROGLOG("DFU server waiting on queue %s",queuename);
- cDFUlistener *lt = new cDFUlistener(this,queuename,false,serverstatus, 1000*60);
+ cDFUlistener *lt = new cDFUlistener(this,queuename,false,serverstatus);
listeners.append(*lt);
lt->start();
}
@@ -147,7 +147,7 @@ int CEclAgentExecutionServer::run()
while (started)
{
PROGLOG("AgentExec: Waiting on queue(s) '%s'", queueNames.str());
- Owned<IJobQueueItem> item = queue->dequeue(WAIT_FOREVER);
+ Owned<IJobQueueItem> item = queue->dequeue();
if (item.get())
{
StringAttr wuid;
@@ -433,6 +433,7 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
unsigned maxThreadsActive;
bool running;
CSDSServerStatus serverstatus;
+ Owned<IJobQueue> queue;
public:
IMPLEMENT_IINTERFACE;
@@ -455,15 +456,15 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
void run()
{
DBGLOG("eclccServer (%d threads) waiting for requests on queue(s) %s", poolSize, queueName.get());
- Owned<IJobQueue> queue = createJobQueue(queueName.get());
+ queue.setown(createJobQueue(queueName.get()));
queue->connect();
running = true;
LocalIAbortHandler abortHandler(*this);
while (running)
{
try
{
- Owned<IJobQueueItem> item = queue->dequeue(1000);
+ Owned<IJobQueueItem> item = queue->dequeue();
if (item.get())
{
try
@@ -508,6 +509,8 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
virtual bool onAbort()
{
running = false;
+ if (queue)
+ queue->cancelAcceptConversation();
return false;
}
};
View
@@ -31111,6 +31111,7 @@ class RoxieListener : public Thread, implements IRoxieListener, implements IThre
class RoxieWorkUnitListener : public RoxieListener
{
+ Owned<IJobQueue> queue;
public:
RoxieWorkUnitListener(unsigned _poolSize, bool _suspended)
: RoxieListener(_poolSize, _suspended)
@@ -31132,9 +31133,23 @@ class RoxieWorkUnitListener : public RoxieListener
UNIMPLEMENTED;
}
+ virtual bool stop(unsigned timeout)
+ {
+ if (queue)
+ {
+ DBGLOG("RoxieWorkUnitListener::stop");
+ queue->cancelAcceptConversation();
+ }
+ return RoxieListener::stop(timeout);
+ }
+
virtual void stopListening()
{
- // Nothing to do
+ if (queue)
+ {
+ DBGLOG("RoxieWorkUnitListener::stopListening");
+ queue->cancelAcceptConversation();
+ }
}
virtual int run()
@@ -31154,26 +31169,28 @@ class RoxieWorkUnitListener : public RoxieListener
DBGLOG("roxie: Waiting on queue(s) '%s'", queueNames.str());
try
{
- Owned<IJobQueue> queue = createJobQueue(queueNames.str());
+ queue.setown(createJobQueue(queueNames.str()));
queue->connect();
daliHelper->noteQueuesRunning(queueNames.str());
while (running)
{
- Owned<IJobQueueItem> item = queue->dequeue(5000);
+ Owned<IJobQueueItem> item = queue->dequeue();
if (item.get())
{
if (traceLevel)
PROGLOG("roxie: Dequeued workunit request '%s'", item->queryWUID());
pool->start((void *) item->queryWUID());
}
}
+ queue.clear();
}
catch (IDaliClient_Exception *E)
{
if (traceLevel)
EXCLOG(E, "roxie: Dali connection lost");
E->Release();
daliHelper->disconnect();
+ queue.clear();
}
}
}

0 comments on commit 2fff62b

Please sign in to comment.