Permalink
Browse files

Change nzqmt polling to keep polling until no more msgs

This seems to (finally) solve the logjam of messages.  Before, it would pull a
maximum of one message from every socket every interval ms.  As the default was
10ms, that meant a total of 100 messages per socket per second.  Under bursts,
this would fall over, and cause the heartbeats to be very delayed, causing
the clients to get disconnected.

I will be giving this to them upstream.
  • Loading branch information...
Beirdo committed May 11, 2012
1 parent edce592 commit 4a9a609c450f865c898b7b474dc4b609472e4aef
@@ -633,25 +633,32 @@ namespace nzmqt
// This method is public because it can be called directly if you need to.
inline void poll(long timeout_ = 0)
{
- QMutexLocker lock(&m_pollItemsMutex);
+ int cnt;
+ do {
+ QMutexLocker lock(&m_pollItemsMutex);
- if (m_pollItems.empty())
- return;
+ if (m_pollItems.empty())
+ return;
- zmq::poll(&m_pollItems[0], m_pollItems.size(), timeout_);
+ cnt = zmq::poll(&m_pollItems[0], m_pollItems.size(), timeout_);
+ if (cnt <= 0)
+ return;
- PollItems::iterator poIt = m_pollItems.begin();
- Sockets::iterator soIt = m_sockets.begin();
- while (poIt != m_pollItems.end())
- {
- if (poIt->revents & ZMQSocket::EVT_POLLIN)
+ PollItems::iterator poIt = m_pollItems.begin();
+ Sockets::iterator soIt = m_sockets.begin();
+ int i = 0;
+ while (i < cnt && poIt != m_pollItems.end())
{
- QList<QByteArray> message = (*soIt)->receiveMessage();
- (*soIt)->onMessageReceived(message);
+ if (poIt->revents & ZMQSocket::EVT_POLLIN)
+ {
+ QList<QByteArray> message = (*soIt)->receiveMessage();
+ (*soIt)->onMessageReceived(message);
+ i++;
+ }
+ ++soIt;
+ ++poIt;
}
- ++soIt;
- ++poIt;
- }
+ } while (cnt > 0);
}
protected:
@@ -278,10 +278,13 @@ void LoggerThread::run(void)
m_zmqContext->start();
}
+ qRegisterMetaType<QList<QByteArray> >("QList<QByteArray>");
+
m_zmqSocket = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_DEALER,
this);
connect(m_zmqSocket, SIGNAL(messageReceived(const QList<QByteArray>&)),
- SLOT(messageReceived(const QList<QByteArray>&)));
+ SLOT(messageReceived(const QList<QByteArray>&)),
+ Qt::QueuedConnection);
if (m_locallogs)
m_zmqSocket->connectTo("inproc://mylogs");
@@ -302,7 +305,8 @@ void LoggerThread::run(void)
loggingGetTimeStamp(&m_epoch, NULL);
QTimer *timer = new QTimer(this);
- connect(timer, SIGNAL(timeout()), this, SLOT(checkHeartBeat()));
+ connect(timer, SIGNAL(timeout()), this, SLOT(checkHeartBeat()),
+ Qt::QueuedConnection);
timer->start(1000);
QMutexLocker qLock(&logQueueMutex);
@@ -379,6 +383,7 @@ void LoggerThread::checkHeartBeat(void)
/// \brief Send a ping to the log server
void LoggerThread::pingLogServer(void)
{
+ // cout << "pong" << endl;
m_zmqSocket->sendMessage(QByteArray(""));
}
@@ -414,6 +419,7 @@ void LoggerThread::launchLogServer(void)
void LoggerThread::messageReceived(const QList<QByteArray> &msg)
{
m_initialWaiting = false;
+ // cout << "ping" << endl;
loggingGetTimeStamp(&m_epoch, NULL);
pingLogServer();
}
@@ -90,6 +90,8 @@ static QWaitCondition logMsgListNotEmpty;
static ClientList logClientToDel;
static QMutex logClientToDelMutex;
+static QAtomicInt msgsSinceHeartbeat;
+
#define TIMESTAMP_MAX 30
#define MAX_STRING_LENGTH (LOGLINE_MAX+120)
@@ -232,7 +234,8 @@ void FileLogger::setupZMQSocket(void)
nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
- this, SLOT(receivedMessage(const QList<QByteArray>&)));
+ this, SLOT(receivedMessage(const QList<QByteArray>&)),
+ Qt::QueuedConnection);
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
@@ -294,7 +297,8 @@ void SyslogLogger::setupZMQSocket(void)
nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
- this, SLOT(receivedMessage(const QList<QByteArray>&)));
+ this, SLOT(receivedMessage(const QList<QByteArray>&)),
+ Qt::QueuedConnection);
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
@@ -395,7 +399,8 @@ void DatabaseLogger::setupZMQSocket(void)
nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
- this, SLOT(receivedMessage(const QList<QByteArray>&)));
+ this, SLOT(receivedMessage(const QList<QByteArray>&)),
+ Qt::QueuedConnection);
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
@@ -651,28 +656,31 @@ void LogServerThread::run(void)
m_zmqContext = nzmqt::createDefaultContext(NULL);
nzmqt::PollingZMQContext *ctx = static_cast<nzmqt::PollingZMQContext *>
(m_zmqContext);
- ctx->setInterval(1);
+ ctx->setInterval(100);
ctx->start();
m_zmqInSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_ROUTER);
connect(m_zmqInSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
- this, SLOT(receivedMessage(const QList<QByteArray>&)));
+ this, SLOT(receivedMessage(const QList<QByteArray>&)),
+ Qt::QueuedConnection);
m_zmqInSock->bindTo("tcp://127.0.0.1:35327");
m_zmqInSock->bindTo("inproc://mylogs");
logForwardThread = new LogForwardThread();
logForwardThread->start();
connect(logForwardThread, SIGNAL(pingClient(QString)),
- this, SLOT(pingClient(QString)));
+ this, SLOT(pingClient(QString)), Qt::QueuedConnection);
// cerr << "wake all" << endl;
locker.unlock();
logThreadStarted.wakeAll();
// cerr << "unlock" << endl;
+ msgsSinceHeartbeat = 0;
m_heartbeatTimer = new QTimer(this);
- connect(m_heartbeatTimer, SIGNAL(timeout()), this, SLOT(checkHeartBeats()));
+ connect(m_heartbeatTimer, SIGNAL(timeout()), this, SLOT(checkHeartBeats()),
+ Qt::QueuedConnection);
m_heartbeatTimer->start(1000);
exec();
@@ -700,6 +708,7 @@ void LogServerThread::run(void)
void LogServerThread::pingClient(QString clientId)
{
LogMessage msg;
+ // cout << "ping " << clientId.toLocal8Bit().constData() << endl;
QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
msg << clientBa << QByteArray("");
m_zmqInSock->sendMessage(msg);
@@ -718,6 +727,9 @@ void LogServerThread::checkHeartBeats(void)
QMutexLocker lock2(&logClientToDelMutex);
loggingGetTimeStamp(&epoch, NULL);
+ // cout << "msgcount " << msgsSinceHeartbeat << endl;
+ msgsSinceHeartbeat = 0;
+
ClientMap::iterator it = logClientMap.begin();
for( ; it != logClientMap.end(); ++it )
{
@@ -731,6 +743,7 @@ void LogServerThread::checkHeartBeats(void)
}
else
{
+ // cout << "age " << age << " " << clientId.toLocal8Bit().constData() << endl;
pingClient(clientId);
}
}
@@ -936,7 +949,7 @@ void LogForwardThread::run(void)
m_shutdownTimer = new QTimer(this);
m_shutdownTimer->setSingleShot(true);
connect(m_shutdownTimer, SIGNAL(timeout()),
- this, SLOT(shutdownTimerExpired()));
+ this, SLOT(shutdownTimerExpired()), Qt::QueuedConnection);
LOG(VB_GENERAL, LOG_INFO, "Starting 5min shutdown timer");
m_shutdownTimer->start(5*60*1000);
@@ -1106,6 +1119,8 @@ void LogForwardThread::forwardMessage(LogMessage *msg)
}
#endif
+ msgsSinceHeartbeat.ref();
+
// First section is the client id
QByteArray clientBa = msg->first();
QString clientId = QString(clientBa.toHex());
@@ -1124,6 +1139,7 @@ void LogForwardThread::forwardMessage(LogMessage *msg)
}
else
{
+ // cout << "pong " << clientId.toLocal8Bit().constData() << endl;
loggingGetTimeStamp(&logItem->epoch, NULL);
}
return;
@@ -1134,6 +1150,7 @@ void LogForwardThread::forwardMessage(LogMessage *msg)
QMutexLocker lock(&logClientMapMutex);
LoggerListItem *logItem = logClientMap.value(clientId, NULL);
+ // cout << "msg " << clientId.toLocal8Bit().constData() << endl;
if (logItem)
{
loggingGetTimeStamp(&logItem->epoch, NULL);

0 comments on commit 4a9a609

Please sign in to comment.