Skip to content

Commit

Permalink
Change nzqmt polling to keep polling until no more msgs
Browse files Browse the repository at this point in the history
This seems to (finally) solve the logjam of messages.  Before, it would pull a
maximum of one message from every socket every interval ms.  As the default was
10ms, that meant a total of 100 messages per socket per second.  Under bursts,
this would fall over, and cause the heartbeats to be very delayed, causing
the clients to get disconnected.

I will be giving this to them upstream.
  • Loading branch information
Beirdo committed May 11, 2012
1 parent edce592 commit 4a9a609
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 24 deletions.
35 changes: 21 additions & 14 deletions mythtv/external/nzmqt/include/nzmqt/nzmqt.hpp
Expand Up @@ -633,25 +633,32 @@ namespace nzmqt
// This method is public because it can be called directly if you need to.
inline void poll(long timeout_ = 0)
{
QMutexLocker lock(&m_pollItemsMutex);
int cnt;
do {
QMutexLocker lock(&m_pollItemsMutex);

if (m_pollItems.empty())
return;
if (m_pollItems.empty())
return;

zmq::poll(&m_pollItems[0], m_pollItems.size(), timeout_);
cnt = zmq::poll(&m_pollItems[0], m_pollItems.size(), timeout_);
if (cnt <= 0)
return;

PollItems::iterator poIt = m_pollItems.begin();
Sockets::iterator soIt = m_sockets.begin();
while (poIt != m_pollItems.end())
{
if (poIt->revents & ZMQSocket::EVT_POLLIN)
PollItems::iterator poIt = m_pollItems.begin();
Sockets::iterator soIt = m_sockets.begin();
int i = 0;
while (i < cnt && poIt != m_pollItems.end())
{
QList<QByteArray> message = (*soIt)->receiveMessage();
(*soIt)->onMessageReceived(message);
if (poIt->revents & ZMQSocket::EVT_POLLIN)
{
QList<QByteArray> message = (*soIt)->receiveMessage();
(*soIt)->onMessageReceived(message);
i++;
}
++soIt;
++poIt;
}
++soIt;
++poIt;
}
} while (cnt > 0);
}

protected:
Expand Down
10 changes: 8 additions & 2 deletions mythtv/libs/libmythbase/logging.cpp
Expand Up @@ -278,10 +278,13 @@ void LoggerThread::run(void)
m_zmqContext->start();
}

qRegisterMetaType<QList<QByteArray> >("QList<QByteArray>");

m_zmqSocket = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_DEALER,
this);
connect(m_zmqSocket, SIGNAL(messageReceived(const QList<QByteArray>&)),
SLOT(messageReceived(const QList<QByteArray>&)));
SLOT(messageReceived(const QList<QByteArray>&)),
Qt::QueuedConnection);

if (m_locallogs)
m_zmqSocket->connectTo("inproc://mylogs");
Expand All @@ -302,7 +305,8 @@ void LoggerThread::run(void)
loggingGetTimeStamp(&m_epoch, NULL);

QTimer *timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), this, SLOT(checkHeartBeat()));
connect(timer, SIGNAL(timeout()), this, SLOT(checkHeartBeat()),
Qt::QueuedConnection);
timer->start(1000);

QMutexLocker qLock(&logQueueMutex);
Expand Down Expand Up @@ -379,6 +383,7 @@ void LoggerThread::checkHeartBeat(void)
/// \brief Send a ping to the log server
void LoggerThread::pingLogServer(void)
{
// cout << "pong" << endl;
m_zmqSocket->sendMessage(QByteArray(""));
}

Expand Down Expand Up @@ -414,6 +419,7 @@ void LoggerThread::launchLogServer(void)
void LoggerThread::messageReceived(const QList<QByteArray> &msg)
{
m_initialWaiting = false;
// cout << "ping" << endl;
loggingGetTimeStamp(&m_epoch, NULL);
pingLogServer();
}
Expand Down
33 changes: 25 additions & 8 deletions mythtv/libs/libmythbase/loggingserver.cpp
Expand Up @@ -90,6 +90,8 @@ static QWaitCondition logMsgListNotEmpty;
static ClientList logClientToDel;
static QMutex logClientToDelMutex;

static QAtomicInt msgsSinceHeartbeat;

#define TIMESTAMP_MAX 30
#define MAX_STRING_LENGTH (LOGLINE_MAX+120)

Expand Down Expand Up @@ -232,7 +234,8 @@ void FileLogger::setupZMQSocket(void)
nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
this, SLOT(receivedMessage(const QList<QByteArray>&)));
this, SLOT(receivedMessage(const QList<QByteArray>&)),
Qt::QueuedConnection);
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
Expand Down Expand Up @@ -294,7 +297,8 @@ void SyslogLogger::setupZMQSocket(void)
nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
this, SLOT(receivedMessage(const QList<QByteArray>&)));
this, SLOT(receivedMessage(const QList<QByteArray>&)),
Qt::QueuedConnection);
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
Expand Down Expand Up @@ -395,7 +399,8 @@ void DatabaseLogger::setupZMQSocket(void)
nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
this, SLOT(receivedMessage(const QList<QByteArray>&)));
this, SLOT(receivedMessage(const QList<QByteArray>&)),
Qt::QueuedConnection);
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
Expand Down Expand Up @@ -651,28 +656,31 @@ void LogServerThread::run(void)
m_zmqContext = nzmqt::createDefaultContext(NULL);
nzmqt::PollingZMQContext *ctx = static_cast<nzmqt::PollingZMQContext *>
(m_zmqContext);
ctx->setInterval(1);
ctx->setInterval(100);
ctx->start();

m_zmqInSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_ROUTER);
connect(m_zmqInSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
this, SLOT(receivedMessage(const QList<QByteArray>&)));
this, SLOT(receivedMessage(const QList<QByteArray>&)),
Qt::QueuedConnection);
m_zmqInSock->bindTo("tcp://127.0.0.1:35327");
m_zmqInSock->bindTo("inproc://mylogs");

logForwardThread = new LogForwardThread();
logForwardThread->start();

connect(logForwardThread, SIGNAL(pingClient(QString)),
this, SLOT(pingClient(QString)));
this, SLOT(pingClient(QString)), Qt::QueuedConnection);

// cerr << "wake all" << endl;
locker.unlock();
logThreadStarted.wakeAll();
// cerr << "unlock" << endl;

msgsSinceHeartbeat = 0;
m_heartbeatTimer = new QTimer(this);
connect(m_heartbeatTimer, SIGNAL(timeout()), this, SLOT(checkHeartBeats()));
connect(m_heartbeatTimer, SIGNAL(timeout()), this, SLOT(checkHeartBeats()),
Qt::QueuedConnection);
m_heartbeatTimer->start(1000);

exec();
Expand Down Expand Up @@ -700,6 +708,7 @@ void LogServerThread::run(void)
void LogServerThread::pingClient(QString clientId)
{
LogMessage msg;
// cout << "ping " << clientId.toLocal8Bit().constData() << endl;
QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
msg << clientBa << QByteArray("");
m_zmqInSock->sendMessage(msg);
Expand All @@ -718,6 +727,9 @@ void LogServerThread::checkHeartBeats(void)
QMutexLocker lock2(&logClientToDelMutex);
loggingGetTimeStamp(&epoch, NULL);

// cout << "msgcount " << msgsSinceHeartbeat << endl;
msgsSinceHeartbeat = 0;

ClientMap::iterator it = logClientMap.begin();
for( ; it != logClientMap.end(); ++it )
{
Expand All @@ -731,6 +743,7 @@ void LogServerThread::checkHeartBeats(void)
}
else
{
// cout << "age " << age << " " << clientId.toLocal8Bit().constData() << endl;
pingClient(clientId);
}
}
Expand Down Expand Up @@ -936,7 +949,7 @@ void LogForwardThread::run(void)
m_shutdownTimer = new QTimer(this);
m_shutdownTimer->setSingleShot(true);
connect(m_shutdownTimer, SIGNAL(timeout()),
this, SLOT(shutdownTimerExpired()));
this, SLOT(shutdownTimerExpired()), Qt::QueuedConnection);
LOG(VB_GENERAL, LOG_INFO, "Starting 5min shutdown timer");
m_shutdownTimer->start(5*60*1000);

Expand Down Expand Up @@ -1106,6 +1119,8 @@ void LogForwardThread::forwardMessage(LogMessage *msg)
}
#endif

msgsSinceHeartbeat.ref();

// First section is the client id
QByteArray clientBa = msg->first();
QString clientId = QString(clientBa.toHex());
Expand All @@ -1124,6 +1139,7 @@ void LogForwardThread::forwardMessage(LogMessage *msg)
}
else
{
// cout << "pong " << clientId.toLocal8Bit().constData() << endl;
loggingGetTimeStamp(&logItem->epoch, NULL);
}
return;
Expand All @@ -1134,6 +1150,7 @@ void LogForwardThread::forwardMessage(LogMessage *msg)
QMutexLocker lock(&logClientMapMutex);
LoggerListItem *logItem = logClientMap.value(clientId, NULL);

// cout << "msg " << clientId.toLocal8Bit().constData() << endl;
if (logItem)
{
loggingGetTimeStamp(&logItem->epoch, NULL);
Expand Down

0 comments on commit 4a9a609

Please sign in to comment.