Skip to content

Commit

Permalink
Added heartbeat in clients.
Browse files Browse the repository at this point in the history
If there is no response from the log server in 5s, start a new one.
  • Loading branch information
Beirdo committed Apr 26, 2012
1 parent 38da9df commit 4e54851
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 39 deletions.
96 changes: 72 additions & 24 deletions mythtv/libs/libmythbase/logging.cpp
Expand Up @@ -11,6 +11,7 @@
#include <QMap>
#include <QRegExp>
#include <QVariantMap>
#include <QTimer>
#include <iostream>

using namespace std;
Expand Down Expand Up @@ -118,13 +119,17 @@ void loggingGetTimeStamp(qlonglong *epoch, uint *usec)
struct timeval tv;
gettimeofday(&tv, NULL);
*epoch = tv.tv_sec;
*usec = tv.tv_usec;
if (usec)
*usec = tv.tv_usec;
#else
/* Stupid system has no gettimeofday, use less precise QDateTime */
QDateTime date = QDateTime::currentDateTime();
QTime time = date.time();
*epoch = date.toTime_t();
*usec = time.msec() * 1000;
if (usec)
{
QTime time = date.time();
*usec = time.msec() * 1000;
}
#endif
}

Expand Down Expand Up @@ -233,6 +238,7 @@ LoggerThread::LoggerThread(QString filename, bool progress, bool quiet,
"Logging thread registration/deregistration enabled!");
debugRegistration = true;
}
m_locallogs = (m_appname == MYTH_APPNAME_MYTHLOGSERVER);

moveToThread(qthread());
}
Expand All @@ -259,8 +265,7 @@ void LoggerThread::run(void)

LOG(VB_GENERAL, LOG_INFO, "Added logging to the console");

bool locallogs = (m_appname == MYTH_APPNAME_MYTHLOGSERVER);
if (locallogs)
if (m_locallogs)
m_zmqContext = logServerThread->getZMQContext();
else
{
Expand All @@ -273,34 +278,23 @@ void LoggerThread::run(void)
connect(m_zmqSocket, SIGNAL(messageReceived(const QList<QByteArray>&)),
SLOT(messageReceived(const QList<QByteArray>&)));

if (locallogs)
if (m_locallogs)
m_zmqSocket->connectTo("inproc://mylogs");
else
m_zmqSocket->connectTo("tcp://127.0.0.1:35327");

if (!locallogs)
if (!m_locallogs)
{
m_initialWaiting = true;
m_zmqSocket->sendMessage(QByteArray(""));
pingLogServer();

// wait up to 100ms for mythlogserver to respond
qApp->processEvents(QEventLoop::WaitForMoreEvents, 100);
if (m_initialWaiting && !locallogs)
if (m_initialWaiting)
{
// Got no response from mythlogserver, let's assume it's dead and
// start // it up
m_initialWaiting = false;
LOG(VB_GENERAL, LOG_INFO, "Starting mythlogserver");

MythSystemMask mask = MythSystemMask(kMSDontBlockInputDevs |
kMSDontDisableDrawing |
kMSRunBackground |
kMSRunShell);
QStringList args;
args << "--daemon";

MythSystem ms("mythlogserver", args, mask);
ms.Run();
launchLogServer();
}

LOG(VB_GENERAL, LOG_INFO,
Expand All @@ -309,12 +303,20 @@ void LoggerThread::run(void)
else
LOG(VB_GENERAL, LOG_INFO, "Added logging to mythlogserver locally");

loggingGetTimeStamp(&m_epoch, NULL);

QTimer *timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), this, SLOT(checkHeartBeat()));
timer->start(1000);

QMutexLocker qLock(&logQueueMutex);

while (!m_aborted || !logQueue.isEmpty())
{
qApp->processEvents();
qLock.unlock();
qApp->processEvents(QEventLoop::AllEvents, 10);

qLock.relock();
if (logQueue.isEmpty())
{
m_waitEmpty->wakeAll();
Expand All @@ -333,10 +335,13 @@ void LoggerThread::run(void)
qLock.relock();
}

timer->stop();
delete timer;

m_zmqSocket->setLinger(0);
m_zmqSocket->close();

if (!locallogs)
if (!m_locallogs)
delete m_zmqContext;

logThreadFinished = true;
Expand All @@ -347,6 +352,48 @@ void LoggerThread::run(void)
}


/// \brief Handles heartbeat checking once a second. If the server is not
/// heard from for at least 5s, restart it
void LoggerThread::checkHeartBeat(void)
{
qlonglong epoch;

loggingGetTimeStamp(&epoch, NULL);
qlonglong age = (epoch - m_epoch) % 30;

cout << "age " << age << endl;
if (age == 5)
{
launchLogServer();
}
}

/// \brief Send a ping to the log server
void LoggerThread::pingLogServer(void)
{
m_zmqSocket->sendMessage(QByteArray(""));
}

/// \brief Launches the logging server daemon
void LoggerThread::launchLogServer(void)
{
m_initialWaiting = false;
if (!m_locallogs)
{
LOG(VB_GENERAL, LOG_INFO, "Starting mythlogserver");

MythSystemMask mask = MythSystemMask(kMSDontBlockInputDevs |
kMSDontDisableDrawing |
kMSRunBackground |
kMSRunShell);
QStringList args;
args << "--daemon";

MythSystem ms("mythlogserver", args, mask);
ms.Run();
}
}

/// \brief Handles messages received back from mythlogserver via ZeroMQ.
/// This is particularly used to receive the acknowledgement of the
/// kInitializing message which contains the filename of the log to
Expand All @@ -359,7 +406,8 @@ void LoggerThread::run(void)
void LoggerThread::messageReceived(const QList<QByteArray> &msg)
{
m_initialWaiting = false;
m_zmqSocket->sendMessage(QByteArray(""));
loggingGetTimeStamp(&m_epoch, NULL);
pingLogServer();
}


Expand Down
6 changes: 6 additions & 0 deletions mythtv/libs/libmythbase/logging.h
Expand Up @@ -191,15 +191,21 @@ class LoggerThread : public QObject, public MThread
QString m_tablename; ///< Cached table name for db logging
int m_facility; ///< Cached syslog facility (or -1 to disable)
pid_t m_pid; ///< Cached pid value
bool m_locallogs; ///< Are we logging locally (i.e. this is the
/// mythlogserver itself)
qlonglong m_epoch; ///< Time last heard from the server (seconds)

nzmqt::ZMQContext *m_zmqContext; ///< ZeroMQ context to use
nzmqt::ZMQSocket *m_zmqSocket; ///< ZeroMQ socket to talk to
/// mythlogserver
protected:
bool logConsole(LoggingItem *item);
void launchLogServer(void);
void pingLogServer(void);

protected slots:
void messageReceived(const QList<QByteArray>&);
void checkHeartBeat(void);
};

#endif
Expand Down
33 changes: 18 additions & 15 deletions mythtv/libs/libmythbase/loggingserver.cpp
Expand Up @@ -66,7 +66,6 @@ typedef QList<LoggerBase *> LoggerList;
typedef struct {
LoggerList *list;
qlonglong epoch;
uint usec;
} LoggerListItem;
typedef QMap<QString, LoggerListItem *> ClientMap;

Expand Down Expand Up @@ -696,18 +695,28 @@ void LogServerThread::run(void)
RunEpilog();
}

/// \brief Sends a ping message to the given client
/// \param clientId The clientID for the logging client we wish to ping
void LogServerThread::pingClient(QString clientId)
{
LogMessage msg;
QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
msg << clientBa << QByteArray("");
m_zmqInSock->sendMessage(msg);
}


/// \brief Handles heartbeat checking once a second. If a client is not heard
/// from for at least 1 second, send it a heartbeat message which it
/// should send back. If we haven't heard from it in 5s, shut down its
/// logging.
void LogServerThread::checkHeartBeats(void)
{
qlonglong epoch;
uint usec;
ClientList toDel;

QMutexLocker lock(&logClientMapMutex);
loggingGetTimeStamp(&epoch, &usec);
loggingGetTimeStamp(&epoch, NULL);

ClientMap::iterator it = logClientMap.begin();
for( ; it != logClientMap.end(); ++it )
Expand All @@ -720,12 +729,9 @@ void LogServerThread::checkHeartBeats(void)
{
toDel.append(clientId);
}
else if (age > 1)
else
{
LogMessage msg;
QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
msg << clientBa << QByteArray("");
m_zmqInSock->sendMessage(msg);
pingClient(clientId);
}
}

Expand Down Expand Up @@ -798,14 +804,11 @@ void LogServerThread::forwardMessage(LogMessage *msg)
if (!logItem)
{
// Send an initial ping so the client knows we are in the house
LogMessage msg;
QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
msg << clientBa << QByteArray("");
m_zmqInSock->sendMessage(msg);
pingClient(clientId);
}
else
{
loggingGetTimeStamp(&logItem->epoch, &logItem->usec);
loggingGetTimeStamp(&logItem->epoch, NULL);
}
return;
}
Expand All @@ -819,7 +822,7 @@ void LogServerThread::forwardMessage(LogMessage *msg)

if (logItem)
{
loggingGetTimeStamp(&logItem->epoch, &logItem->usec);
loggingGetTimeStamp(&logItem->epoch, NULL);
}
else
{
Expand Down Expand Up @@ -929,7 +932,7 @@ void LogServerThread::forwardMessage(LogMessage *msg)
}

logItem = new LoggerListItem;
loggingGetTimeStamp(&logItem->epoch, &logItem->usec);
loggingGetTimeStamp(&logItem->epoch, NULL);
logItem->list = loggers;
logClientMap.insert(clientId, logItem);

Expand Down
1 change: 1 addition & 0 deletions mythtv/libs/libmythbase/loggingserver.h
Expand Up @@ -141,6 +141,7 @@ class LogServerThread : public QObject, public MThread
nzmqt::ZMQSocket *m_zmqPubSock; ///< ZeroMQ publishing socket

void forwardMessage(LogMessage *msg);
void pingClient(QString clientId);
protected slots:
void receivedMessage(const QList<QByteArray>&);
void checkHeartBeats(void);
Expand Down

0 comments on commit 4e54851

Please sign in to comment.