diff --git a/mythtv/libs/libmythbase/logging.cpp b/mythtv/libs/libmythbase/logging.cpp index 5925cb6930c..8a68c29b011 100644 --- a/mythtv/libs/libmythbase/logging.cpp +++ b/mythtv/libs/libmythbase/logging.cpp @@ -112,6 +112,22 @@ void loglevelAdd(int value, QString name, char shortname); void verboseInit(void); void verboseHelp(void); +void loggingGetTimeStamp(qlonglong *epoch, uint *usec) +{ +#if HAVE_GETTIMEOFDAY + struct timeval tv; + gettimeofday(&tv, NULL); + *epoch = tv.tv_sec; + *usec = tv.tv_usec; +#else + /* Stupid system has no gettimeofday, use less precise QDateTime */ + QDateTime date = QDateTime::currentDateTime(); + QTime time = date.time(); + *epoch = date.toTime_t(); + *usec = time.msec() * 1000; +#endif +} + LoggingItem::LoggingItem() { } @@ -122,18 +138,7 @@ LoggingItem::LoggingItem(const char *_file, const char *_function, m_line(_line), m_type(_type), m_level(_level), m_file(_file), m_function(_function), m_threadName(NULL) { -#if HAVE_GETTIMEOFDAY - struct timeval tv; - gettimeofday(&tv, NULL); - m_epoch = tv.tv_sec; - m_usec = tv.tv_usec; -#else - /* Stupid system has no gettimeofday, use less precise QDateTime */ - QDateTime date = QDateTime::currentDateTime(); - QTime time = date.time(); - m_epoch = date.toTime_t(); - m_usec = time.msec() * 1000; -#endif + loggingGetTimeStamp(&m_epoch, &m_usec); m_message[0]='\0'; m_message[LOGLINE_MAX]='\0'; @@ -276,15 +281,7 @@ void LoggerThread::run(void) if (!locallogs) { m_initialWaiting = true; - LoggingItem *item = LoggingItem::create(__FILE__, __FUNCTION__, - __LINE__, (LogLevel_t)LOG_DEBUG, - kInitializing); - if (item) - { - fillItem(item); - m_zmqSocket->sendMessage(item->toByteArray()); - item->deleteItem(); - } + m_zmqSocket->sendMessage(QByteArray("")); msleep(100); // wait up to 100ms for mythlogserver to respond if (m_initialWaiting && !locallogs) @@ -352,15 +349,14 @@ void LoggerThread::run(void) /// kInitializing message which contains the filename of the log to /// create and whether to log to db and syslog. If this is not /// received during startup, it is assumed that mythlogserver needs -/// to be started. +/// to be started. Also, the server will hit us with an empty message +/// when it hasn't heard from us within a second. After no responses +/// from us for 5s, the logs will be closed. /// \param msg The message received (can be multi-part) void LoggerThread::messageReceived(const QList &msg) { - if (m_initialWaiting) - { - m_initialWaiting = false; - return; - } + m_initialWaiting = false; + m_zmqSocket->sendMessage(QByteArray("")); } diff --git a/mythtv/libs/libmythbase/logging.h b/mythtv/libs/libmythbase/logging.h index 6f27361687e..0303f06b289 100644 --- a/mythtv/libs/libmythbase/logging.h +++ b/mythtv/libs/libmythbase/logging.h @@ -24,6 +24,7 @@ class LoggingItem; void loggingRegisterThread(const QString &name); void loggingDeregisterThread(void); +void loggingGetTimeStamp(qlonglong *epoch, uint *usec); class QWaitCondition; diff --git a/mythtv/libs/libmythbase/loggingserver.cpp b/mythtv/libs/libmythbase/loggingserver.cpp index 25a9118df39..666415c5165 100644 --- a/mythtv/libs/libmythbase/loggingserver.cpp +++ b/mythtv/libs/libmythbase/loggingserver.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include using namespace std; @@ -61,7 +62,13 @@ static QWaitCondition logThreadStarted; static bool logThreadFinished = false; typedef QList LoggerList; -typedef QMap ClientMap; + +typedef struct { + LoggerList *list; + qlonglong epoch; + uint usec; +} LoggerListItem; +typedef QMap ClientMap; typedef QList ClientList; typedef QMap RevClientMap; @@ -74,6 +81,7 @@ static RevClientMap logRevClientMap; static QMutex logMsgListMutex; static LogMessageList logMsgList; +static QWaitCondition logMsgListNotEmpty; #define TIMESTAMP_MAX 30 #define MAX_STRING_LENGTH (LOGLINE_MAX+120) @@ -135,7 +143,9 @@ FileLogger::~FileLogger() .arg(m_handle)); } + m_zmqSock->unsubscribeFrom(QByteArray("")); m_zmqSock->setLinger(0); + m_zmqSock->disconnect(this); m_zmqSock->close(); } @@ -236,7 +246,9 @@ SyslogLogger::~SyslogLogger() LOG(VB_GENERAL, LOG_INFO, "Removing syslogging"); closelog(); + m_zmqSock->unsubscribeFrom(QByteArray("")); m_zmqSock->setLinger(0); + m_zmqSock->disconnect(this); m_zmqSock->close(); } @@ -310,7 +322,9 @@ DatabaseLogger::~DatabaseLogger() stopDatabaseAccess(); + m_zmqSock->unsubscribeFrom(QByteArray("")); m_zmqSock->setLinger(0); + m_zmqSock->disconnect(this); m_zmqSock->close(); } @@ -607,6 +621,8 @@ void LogServerThread::run(void) logThreadFinished = false; QMutexLocker locker(&logThreadStartedMutex); + qRegisterMetaType >("QList"); + m_zmqContext = nzmqt::createDefaultContext(NULL); nzmqt::PollingZMQContext *ctx = static_cast (m_zmqContext); @@ -623,13 +639,23 @@ void LogServerThread::run(void) logThreadStarted.wakeAll(); locker.unlock(); + + QTimer *timer = new QTimer(this); + connect(timer, SIGNAL(timeout()), this, SLOT(checkHeartBeats())); + timer->start(1000); while (!m_aborted) { - qApp->processEvents(); + qApp->processEvents(QEventLoop::WaitForMoreEvents, 10); { QMutexLocker lock(&logMsgListMutex); + if (logMsgList.isEmpty() && + !logMsgListNotEmpty.wait(lock.mutex(), 90)) + { + continue; + } + while (!logMsgList.isEmpty()) { LogMessage *msg = logMsgList.takeFirst(); @@ -637,14 +663,13 @@ void LogServerThread::run(void) delete msg; } } - - msleep(100); - - // handle heartbeat... } logThreadFinished = true; + timer->stop(); + delete timer; + m_zmqPubSock->setLinger(0); m_zmqPubSock->close(); m_zmqInSock->setLinger(0); @@ -668,6 +693,68 @@ void LogServerThread::run(void) RunEpilog(); } +/// \brief Handles heartbeat checking once a second. If a client is not heard +/// from for at least 1 second, send it a heartbeat message which it +/// should send back. If we haven't heard from it in 5s, shut down its +/// logging. +void LogServerThread::checkHeartBeats(void) +{ + qlonglong epoch; + uint usec; + ClientList toDel; + + QMutexLocker lock(&logClientMapMutex); + loggingGetTimeStamp(&epoch, &usec); + + ClientMap::iterator it = logClientMap.begin(); + for( ; it != logClientMap.end(); ++it ) + { + QString clientId = it.key(); + LoggerListItem *logItem = it.value(); + qlonglong age = epoch - logItem->epoch; + + if (age > 5) + { + toDel.append(clientId); + } + else if (age > 1) + { + LogMessage msg; + QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit()); + msg << clientBa << QByteArray(""); + m_zmqInSock->sendMessage(msg); + } + } + + QMutexLocker lock2(&logRevClientMapMutex); + while (!toDel.isEmpty()) + { + QString clientId = toDel.takeFirst(); + LoggerListItem *item = logClientMap.take(clientId); + LoggerList *list = item->list; + delete item; + + while (!list->isEmpty()) + { + LoggerBase *logger = list->takeFirst(); + ClientList *clientList = logRevClientMap.value(logger, NULL); + if (!clientList || clientList->size() == 1) + { + if (clientList) + { + logRevClientMap.remove(logger); + delete clientList; + } + delete logger; + continue; + } + + clientList->removeAll(clientId); + } + delete list; + } +} + /// \brief Handles messages received from logging clients /// \param msg The message received (can be multi-part) void LogServerThread::receivedMessage(const QList &msg) @@ -676,6 +763,7 @@ void LogServerThread::receivedMessage(const QList &msg) QMutexLocker lock(&logMsgListMutex); logMsgList.append(message); + logMsgListNotEmpty.wakeAll(); } void LogServerThread::forwardMessage(LogMessage *msg) @@ -697,13 +785,39 @@ void LogServerThread::forwardMessage(LogMessage *msg) QString clientId = QString(clientBa.toHex()); QByteArray json = msg->at(1); + + if (json.size() == 0) + { + // This is either a ping response or a first gasp + QMutexLocker lock(&logClientMapMutex); + LoggerListItem *logItem = logClientMap.value(clientId, NULL); + if (!logItem) + { + // Send an initial ping so the client knows we are in the house + LogMessage msg; + QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit()); + msg << clientBa << QByteArray(""); + m_zmqInSock->sendMessage(msg); + } + else + { + loggingGetTimeStamp(&logItem->epoch, &logItem->usec); + } + return; + } + LoggingItem *item = LoggingItem::create(json); try { QMutexLocker lock(&logClientMapMutex); + LoggerListItem *logItem = logClientMap.value(clientId, NULL); - if (!logClientMap.contains(clientId)) + if (logItem) + { + loggingGetTimeStamp(&logItem->epoch, &logItem->usec); + } + else { LOG(VB_GENERAL, LOG_INFO, QString("New Client: %1").arg(clientId)); QMutexLocker lock2(&loggerMapMutex); @@ -810,7 +924,10 @@ void LogServerThread::forwardMessage(LogMessage *msg) loggers->insert(0, logger); } - logClientMap.insert(clientId, loggers); + logItem = new LoggerListItem; + loggingGetTimeStamp(&logItem->epoch, &logItem->usec); + logItem->list = loggers; + logClientMap.insert(clientId, logItem); msleep(10); } diff --git a/mythtv/libs/libmythbase/loggingserver.h b/mythtv/libs/libmythbase/loggingserver.h index ea0e4e7f689..56d5a9a638f 100644 --- a/mythtv/libs/libmythbase/loggingserver.h +++ b/mythtv/libs/libmythbase/loggingserver.h @@ -143,6 +143,7 @@ class LogServerThread : public QObject, public MThread void forwardMessage(LogMessage *msg); protected slots: void receivedMessage(const QList&); + void checkHeartBeats(void); }; class QWaitCondition;