Skip to content

Commit

Permalink
Implement heartbeats
Browse files Browse the repository at this point in the history
Unfortunately, it's crashing on the next poll.  BOOM.
  • Loading branch information
Beirdo committed Apr 26, 2012
1 parent cf0d2d6 commit cb4f433
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 35 deletions.
50 changes: 23 additions & 27 deletions mythtv/libs/libmythbase/logging.cpp
Expand Up @@ -112,6 +112,22 @@ void loglevelAdd(int value, QString name, char shortname);
void verboseInit(void);
void verboseHelp(void);

void loggingGetTimeStamp(qlonglong *epoch, uint *usec)
{
#if HAVE_GETTIMEOFDAY
struct timeval tv;
gettimeofday(&tv, NULL);
*epoch = tv.tv_sec;
*usec = tv.tv_usec;
#else
/* Stupid system has no gettimeofday, use less precise QDateTime */
QDateTime date = QDateTime::currentDateTime();
QTime time = date.time();
*epoch = date.toTime_t();
*usec = time.msec() * 1000;
#endif
}

LoggingItem::LoggingItem()
{
}
Expand All @@ -122,18 +138,7 @@ LoggingItem::LoggingItem(const char *_file, const char *_function,
m_line(_line), m_type(_type), m_level(_level), m_file(_file),
m_function(_function), m_threadName(NULL)
{
#if HAVE_GETTIMEOFDAY
struct timeval tv;
gettimeofday(&tv, NULL);
m_epoch = tv.tv_sec;
m_usec = tv.tv_usec;
#else
/* Stupid system has no gettimeofday, use less precise QDateTime */
QDateTime date = QDateTime::currentDateTime();
QTime time = date.time();
m_epoch = date.toTime_t();
m_usec = time.msec() * 1000;
#endif
loggingGetTimeStamp(&m_epoch, &m_usec);

m_message[0]='\0';
m_message[LOGLINE_MAX]='\0';
Expand Down Expand Up @@ -276,15 +281,7 @@ void LoggerThread::run(void)
if (!locallogs)
{
m_initialWaiting = true;
LoggingItem *item = LoggingItem::create(__FILE__, __FUNCTION__,
__LINE__, (LogLevel_t)LOG_DEBUG,
kInitializing);
if (item)
{
fillItem(item);
m_zmqSocket->sendMessage(item->toByteArray());
item->deleteItem();
}
m_zmqSocket->sendMessage(QByteArray(""));

msleep(100); // wait up to 100ms for mythlogserver to respond
if (m_initialWaiting && !locallogs)
Expand Down Expand Up @@ -352,15 +349,14 @@ void LoggerThread::run(void)
/// kInitializing message which contains the filename of the log to
/// create and whether to log to db and syslog. If this is not
/// received during startup, it is assumed that mythlogserver needs
/// to be started.
/// to be started. Also, the server will hit us with an empty message
/// when it hasn't heard from us within a second. After no responses
/// from us for 5s, the logs will be closed.
/// \param msg The message received (can be multi-part)
void LoggerThread::messageReceived(const QList<QByteArray> &msg)
{
if (m_initialWaiting)
{
m_initialWaiting = false;
return;
}
m_initialWaiting = false;
m_zmqSocket->sendMessage(QByteArray(""));
}


Expand Down
1 change: 1 addition & 0 deletions mythtv/libs/libmythbase/logging.h
Expand Up @@ -24,6 +24,7 @@ class LoggingItem;

void loggingRegisterThread(const QString &name);
void loggingDeregisterThread(void);
void loggingGetTimeStamp(qlonglong *epoch, uint *usec);

class QWaitCondition;

Expand Down
133 changes: 125 additions & 8 deletions mythtv/libs/libmythbase/loggingserver.cpp
Expand Up @@ -10,6 +10,7 @@
#include <QStringList>
#include <QMap>
#include <QRegExp>
#include <QTimer>
#include <iostream>

using namespace std;
Expand Down Expand Up @@ -61,7 +62,13 @@ static QWaitCondition logThreadStarted;
static bool logThreadFinished = false;

typedef QList<LoggerBase *> LoggerList;
typedef QMap<QString, LoggerList *> ClientMap;

typedef struct {
LoggerList *list;
qlonglong epoch;
uint usec;
} LoggerListItem;
typedef QMap<QString, LoggerListItem *> ClientMap;

typedef QList<QString> ClientList;
typedef QMap<LoggerBase *, ClientList *> RevClientMap;
Expand All @@ -74,6 +81,7 @@ static RevClientMap logRevClientMap;

static QMutex logMsgListMutex;
static LogMessageList logMsgList;
static QWaitCondition logMsgListNotEmpty;

#define TIMESTAMP_MAX 30
#define MAX_STRING_LENGTH (LOGLINE_MAX+120)
Expand Down Expand Up @@ -135,7 +143,9 @@ FileLogger::~FileLogger()
.arg(m_handle));
}

m_zmqSock->unsubscribeFrom(QByteArray(""));
m_zmqSock->setLinger(0);
m_zmqSock->disconnect(this);
m_zmqSock->close();
}

Expand Down Expand Up @@ -236,7 +246,9 @@ SyslogLogger::~SyslogLogger()
LOG(VB_GENERAL, LOG_INFO, "Removing syslogging");
closelog();

m_zmqSock->unsubscribeFrom(QByteArray(""));
m_zmqSock->setLinger(0);
m_zmqSock->disconnect(this);
m_zmqSock->close();
}

Expand Down Expand Up @@ -310,7 +322,9 @@ DatabaseLogger::~DatabaseLogger()

stopDatabaseAccess();

m_zmqSock->unsubscribeFrom(QByteArray(""));
m_zmqSock->setLinger(0);
m_zmqSock->disconnect(this);
m_zmqSock->close();
}

Expand Down Expand Up @@ -607,6 +621,8 @@ void LogServerThread::run(void)
logThreadFinished = false;
QMutexLocker locker(&logThreadStartedMutex);

qRegisterMetaType<QList<QByteArray> >("QList<QByteArray>");

m_zmqContext = nzmqt::createDefaultContext(NULL);
nzmqt::PollingZMQContext *ctx = static_cast<nzmqt::PollingZMQContext *>
(m_zmqContext);
Expand All @@ -623,28 +639,37 @@ void LogServerThread::run(void)

logThreadStarted.wakeAll();
locker.unlock();

QTimer *timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), this, SLOT(checkHeartBeats()));
timer->start(1000);

while (!m_aborted)
{
qApp->processEvents();
qApp->processEvents(QEventLoop::WaitForMoreEvents, 10);

{
QMutexLocker lock(&logMsgListMutex);
if (logMsgList.isEmpty() &&
!logMsgListNotEmpty.wait(lock.mutex(), 90))
{
continue;
}

while (!logMsgList.isEmpty())
{
LogMessage *msg = logMsgList.takeFirst();
forwardMessage(msg);
delete msg;
}
}

msleep(100);

// handle heartbeat...
}

logThreadFinished = true;

timer->stop();
delete timer;

m_zmqPubSock->setLinger(0);
m_zmqPubSock->close();
m_zmqInSock->setLinger(0);
Expand All @@ -668,6 +693,68 @@ void LogServerThread::run(void)
RunEpilog();
}

/// \brief Handles heartbeat checking once a second. If a client is not heard
/// from for at least 1 second, send it a heartbeat message which it
/// should send back. If we haven't heard from it in 5s, shut down its
/// logging.
void LogServerThread::checkHeartBeats(void)
{
qlonglong epoch;
uint usec;
ClientList toDel;

QMutexLocker lock(&logClientMapMutex);
loggingGetTimeStamp(&epoch, &usec);

ClientMap::iterator it = logClientMap.begin();
for( ; it != logClientMap.end(); ++it )
{
QString clientId = it.key();
LoggerListItem *logItem = it.value();
qlonglong age = epoch - logItem->epoch;

if (age > 5)
{
toDel.append(clientId);
}
else if (age > 1)
{
LogMessage msg;
QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
msg << clientBa << QByteArray("");
m_zmqInSock->sendMessage(msg);
}
}

QMutexLocker lock2(&logRevClientMapMutex);
while (!toDel.isEmpty())
{
QString clientId = toDel.takeFirst();
LoggerListItem *item = logClientMap.take(clientId);
LoggerList *list = item->list;
delete item;

while (!list->isEmpty())
{
LoggerBase *logger = list->takeFirst();
ClientList *clientList = logRevClientMap.value(logger, NULL);
if (!clientList || clientList->size() == 1)
{
if (clientList)
{
logRevClientMap.remove(logger);
delete clientList;
}
delete logger;
continue;
}

clientList->removeAll(clientId);
}
delete list;
}
}

/// \brief Handles messages received from logging clients
/// \param msg The message received (can be multi-part)
void LogServerThread::receivedMessage(const QList<QByteArray> &msg)
Expand All @@ -676,6 +763,7 @@ void LogServerThread::receivedMessage(const QList<QByteArray> &msg)
QMutexLocker lock(&logMsgListMutex);

logMsgList.append(message);
logMsgListNotEmpty.wakeAll();
}

void LogServerThread::forwardMessage(LogMessage *msg)
Expand All @@ -697,13 +785,39 @@ void LogServerThread::forwardMessage(LogMessage *msg)
QString clientId = QString(clientBa.toHex());

QByteArray json = msg->at(1);

if (json.size() == 0)
{
// This is either a ping response or a first gasp
QMutexLocker lock(&logClientMapMutex);
LoggerListItem *logItem = logClientMap.value(clientId, NULL);
if (!logItem)
{
// Send an initial ping so the client knows we are in the house
LogMessage msg;
QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
msg << clientBa << QByteArray("");
m_zmqInSock->sendMessage(msg);
}
else
{
loggingGetTimeStamp(&logItem->epoch, &logItem->usec);
}
return;
}

LoggingItem *item = LoggingItem::create(json);

try
{
QMutexLocker lock(&logClientMapMutex);
LoggerListItem *logItem = logClientMap.value(clientId, NULL);

if (!logClientMap.contains(clientId))
if (logItem)
{
loggingGetTimeStamp(&logItem->epoch, &logItem->usec);
}
else
{
LOG(VB_GENERAL, LOG_INFO, QString("New Client: %1").arg(clientId));
QMutexLocker lock2(&loggerMapMutex);
Expand Down Expand Up @@ -810,7 +924,10 @@ void LogServerThread::forwardMessage(LogMessage *msg)
loggers->insert(0, logger);
}

logClientMap.insert(clientId, loggers);
logItem = new LoggerListItem;
loggingGetTimeStamp(&logItem->epoch, &logItem->usec);
logItem->list = loggers;
logClientMap.insert(clientId, logItem);

msleep(10);
}
Expand Down
1 change: 1 addition & 0 deletions mythtv/libs/libmythbase/loggingserver.h
Expand Up @@ -143,6 +143,7 @@ class LogServerThread : public QObject, public MThread
void forwardMessage(LogMessage *msg);
protected slots:
void receivedMessage(const QList<QByteArray>&);
void checkHeartBeats(void);
};

class QWaitCondition;
Expand Down

0 comments on commit cb4f433

Please sign in to comment.