Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Split the logserver into two threads

It seems under stress, the heartbeats get lost, causing all of the clients to
reconnect.  To combat this, I have moved the reception into its own thread,
and the logging into another.  In doing this, I have created two separate
ZeroMQ contexts as well, and changed the polling interval for the TCP reception
to 1ms rather than the default 10ms.
  • Loading branch information...
commit edce592897ea4dbb300404be5ab4314010436b01 1 parent 2cc4bdc
@Beirdo Beirdo authored
View
26 mythtv/libs/libmythbase/logging.cpp
@@ -293,17 +293,8 @@ void LoggerThread::run(void)
m_initialWaiting = true;
pingLogServer();
- // wait up to 100ms for mythlogserver to respond
- qApp->processEvents(QEventLoop::WaitForMoreEvents, 100);
- if (m_initialWaiting)
- {
- // Got no response from mythlogserver, let's assume it's dead and
- // start // it up
- launchLogServer();
- }
-
- LOG(VB_GENERAL, LOG_INFO,
- "Added logging to mythlogserver at TCP:35327");
+ // wait up to 150ms for mythlogserver to respond
+ QTimer::singleShot(150, this, SLOT(initialTimeout()));
}
else
LOG(VB_GENERAL, LOG_INFO, "Added logging to mythlogserver locally");
@@ -356,6 +347,19 @@ void LoggerThread::run(void)
RunEpilog();
}
+/// \brief Handles the initial startup timeout when waiting for the log server
+/// to show signs of life
+void LoggerThread::initialTimeout(void)
+{
+ if (m_initialWaiting)
+ {
+ // Got no response from mythlogserver, let's assume it's dead and
+ // start it up
+ launchLogServer();
+ }
+
+ LOG(VB_GENERAL, LOG_INFO, "Added logging to mythlogserver at TCP:35327");
+}
/// \brief Handles heartbeat checking once a second. If the server is not
/// heard from for at least 5s, restart it
View
4 mythtv/libs/libmythbase/logging.h
@@ -182,7 +182,7 @@ class LoggerThread : public QObject, public MThread
/// Protected by logQueueMutex
bool m_aborted; ///< Flag to abort the thread.
/// Protected by logQueueMutex
- bool m_initialWaiting; ///< Waiting for the initial response from
+ volatile bool m_initialWaiting; ///< Waiting for the initial response from
/// mythlogserver
QString m_filename; ///< Filename of debug logfile
bool m_progress; ///< show only LOG_ERR and more important (console only)
@@ -198,6 +198,7 @@ class LoggerThread : public QObject, public MThread
nzmqt::ZMQContext *m_zmqContext; ///< ZeroMQ context to use
nzmqt::ZMQSocket *m_zmqSocket; ///< ZeroMQ socket to talk to
/// mythlogserver
+
protected:
bool logConsole(LoggingItem *item);
void launchLogServer(void);
@@ -206,6 +207,7 @@ class LoggerThread : public QObject, public MThread
protected slots:
void messageReceived(const QList<QByteArray>&);
void checkHeartBeat(void);
+ void initialTimeout(void);
};
#endif
View
576 mythtv/libs/libmythbase/loggingserver.cpp
@@ -58,6 +58,8 @@ static QMutex loggerMapMutex;
static QMap<QString, LoggerBase *> loggerMap;
LogServerThread *logServerThread = NULL;
+LogForwardThread *logForwardThread = NULL;
+
static QMutex logThreadStartedMutex;
static QWaitCondition logThreadStarted;
static bool logThreadFinished = false;
@@ -85,6 +87,9 @@ static QMutex logMsgListMutex;
static LogMessageList logMsgList;
static QWaitCondition logMsgListNotEmpty;
+static ClientList logClientToDel;
+static QMutex logClientToDelMutex;
+
#define TIMESTAMP_MAX 30
#define MAX_STRING_LENGTH (LOGLINE_MAX+120)
@@ -150,7 +155,7 @@ FileLogger::~FileLogger()
m_zmqSock->setLinger(0);
m_zmqSock->disconnect(this);
m_zmqSock->close();
- delete m_zmqSock;
+ m_zmqSock->deleteLater();
}
/// \brief Reopen the logfile after a SIGHUP. Log files only (no console).
@@ -224,10 +229,10 @@ bool FileLogger::logmsg(LoggingItem *item)
void FileLogger::setupZMQSocket(void)
{
- nzmqt::ZMQContext *ctx = logServerThread->getZMQContext();
- m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB);
+ nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
+ m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
- SLOT(receivedMessage(const QList<QByteArray>&)));
+ this, SLOT(receivedMessage(const QList<QByteArray>&)));
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
@@ -254,7 +259,7 @@ SyslogLogger::~SyslogLogger()
m_zmqSock->setLinger(0);
m_zmqSock->disconnect(this);
m_zmqSock->close();
- delete m_zmqSock;
+ m_zmqSock->deleteLater();
}
@@ -286,10 +291,10 @@ bool SyslogLogger::logmsg(LoggingItem *item)
void SyslogLogger::setupZMQSocket(void)
{
- nzmqt::ZMQContext *ctx = logServerThread->getZMQContext();
- m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB);
+ nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
+ m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
- SLOT(receivedMessage(const QList<QByteArray>&)));
+ this, SLOT(receivedMessage(const QList<QByteArray>&)));
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
@@ -331,7 +336,7 @@ DatabaseLogger::~DatabaseLogger()
m_zmqSock->setLinger(0);
m_zmqSock->disconnect(this);
m_zmqSock->close();
- delete m_zmqSock;
+ m_zmqSock->deleteLater();
}
/// \brief Stop logging to the database and wait for the thread to stop.
@@ -387,10 +392,10 @@ bool DatabaseLogger::logmsg(LoggingItem *item)
void DatabaseLogger::setupZMQSocket(void)
{
- nzmqt::ZMQContext *ctx = logServerThread->getZMQContext();
- m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB);
+ nzmqt::ZMQContext *ctx = logForwardThread->getZMQContext();
+ m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
- SLOT(receivedMessage(const QList<QByteArray>&)));
+ this, SLOT(receivedMessage(const QList<QByteArray>&)));
m_zmqSock->subscribeTo(QByteArray(""));
m_zmqSock->connectTo("inproc://loggers");
}
@@ -618,9 +623,8 @@ void logSighup(int signum)
/// \brief LogServerThread constructor.
LogServerThread::LogServerThread() :
- MThread("LogServer"), m_aborted(false), m_zmqContext(NULL),
- m_zmqInSock(NULL), m_zmqPubSock(NULL), m_heartbeatTimer(NULL),
- m_shutdownTimer(NULL), m_sighupNotifier(NULL)
+ MThread("LogServer"), m_zmqContext(NULL), m_zmqInSock(NULL),
+ m_heartbeatTimer(NULL)
{
moveToThread(qthread());
}
@@ -642,6 +646,263 @@ void LogServerThread::run(void)
logThreadFinished = false;
QMutexLocker locker(&logThreadStartedMutex);
+ qRegisterMetaType<QList<QByteArray> >("QList<QByteArray>");
+
+ m_zmqContext = nzmqt::createDefaultContext(NULL);
+ nzmqt::PollingZMQContext *ctx = static_cast<nzmqt::PollingZMQContext *>
+ (m_zmqContext);
+ ctx->setInterval(1);
+ ctx->start();
+
+ m_zmqInSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_ROUTER);
+ connect(m_zmqInSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
+ this, SLOT(receivedMessage(const QList<QByteArray>&)));
+ m_zmqInSock->bindTo("tcp://127.0.0.1:35327");
+ m_zmqInSock->bindTo("inproc://mylogs");
+
+ logForwardThread = new LogForwardThread();
+ logForwardThread->start();
+
+ connect(logForwardThread, SIGNAL(pingClient(QString)),
+ this, SLOT(pingClient(QString)));
+
+ // cerr << "wake all" << endl;
+ locker.unlock();
+ logThreadStarted.wakeAll();
+ // cerr << "unlock" << endl;
+
+ m_heartbeatTimer = new QTimer(this);
+ connect(m_heartbeatTimer, SIGNAL(timeout()), this, SLOT(checkHeartBeats()));
+ m_heartbeatTimer->start(1000);
+
+ exec();
+
+ logThreadFinished = true;
+
+ m_heartbeatTimer->stop();
+ delete m_heartbeatTimer;
+
+ m_zmqInSock->setLinger(0);
+ m_zmqInSock->close();
+
+ if (logForwardThread)
+ logForwardThread->stop();
+ delete logForwardThread;
+ logForwardThread = NULL;
+
+ delete m_zmqContext;
+
+ RunEpilog();
+}
+
+/// \brief Sends a ping message to the given client
+/// \param clientId The clientID for the logging client we wish to ping
+void LogServerThread::pingClient(QString clientId)
+{
+ LogMessage msg;
+ QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
+ msg << clientBa << QByteArray("");
+ m_zmqInSock->sendMessage(msg);
+}
+
+
+/// \brief Handles heartbeat checking once a second. If a client is not heard
+/// from for at least 1 second, send it a heartbeat message which it
+/// should send back. If we haven't heard from it in 5s, shut down its
+/// logging.
+void LogServerThread::checkHeartBeats(void)
+{
+ qlonglong epoch;
+
+ QMutexLocker lock(&logClientMapMutex);
+ QMutexLocker lock2(&logClientToDelMutex);
+ loggingGetTimeStamp(&epoch, NULL);
+
+ ClientMap::iterator it = logClientMap.begin();
+ for( ; it != logClientMap.end(); ++it )
+ {
+ QString clientId = it.key();
+ LoggerListItem *logItem = it.value();
+ qlonglong age = epoch - logItem->epoch;
+
+ if (age > 5)
+ {
+ logClientToDel.append(clientId);
+ }
+ else
+ {
+ pingClient(clientId);
+ }
+ }
+}
+
+/// \brief Handles messages received from logging clients
+/// \param msg The message received (can be multi-part)
+void LogServerThread::receivedMessage(const QList<QByteArray> &msg)
+{
+ LogMessage *message = new LogMessage(msg);
+ QMutexLocker lock(&logMsgListMutex);
+
+ bool wasEmpty = logMsgList.isEmpty();
+ logMsgList.append(message);
+
+ if (wasEmpty)
+ logMsgListNotEmpty.wakeAll();
+}
+
+/// \brief Stop the thread
+void LogServerThread::stop(void)
+{
+ quit();
+}
+
+static QAtomicInt item_count;
+static QAtomicInt malloc_count;
+
+#define DEBUG_MEMORY 0
+#if DEBUG_MEMORY
+static int max_count = 0;
+static QTime memory_time;
+#endif
+
+
+
+/// \brief Entry point to start logging for the application. This will
+/// start up all of the threads needed.
+/// \param logfile Filename of the logfile to create. Empty if no file.
+/// \param progress non-zero if progress output will be sent to the console.
+/// This squelches all messages less important than LOG_ERR
+/// on the console
+/// \param quiet quiet level requested (squelches all console output)
+/// \param facility Syslog facility to use. -1 to disable syslog output
+/// \param level Minimum logging level to put into the logs
+/// \param dblog true if database logging is requested
+/// \param propagate true if the logfile path needs to be propagated to child
+/// processes.
+void logServerStart(void)
+{
+ if (logServerThread && logServerThread->isRunning())
+ return;
+
+ if (!logServerThread)
+ logServerThread = new LogServerThread();
+
+ // cerr << "starting server" << endl;
+ QMutexLocker locker(&logThreadStartedMutex);
+ logServerThread->start();
+ logThreadStarted.wait(locker.mutex());
+ locker.unlock();
+ // cerr << "done starting server" << endl;
+}
+
+/// \brief Entry point for stopping logging for an application
+void logServerStop(void)
+{
+ if (logServerThread)
+ {
+ logServerThread->stop();
+ logServerThread->wait();
+ }
+
+ QMutexLocker locker(&loggerMapMutex);
+ QMap<QString, LoggerBase *>::iterator it;
+ for (it = loggerMap.begin(); it != loggerMap.end(); ++it)
+ {
+ it.value()->stopDatabaseAccess();
+ }
+}
+
+void logServerWait(void)
+{
+ // cerr << "waiting" << endl;
+ QMutexLocker locker(&logThreadStartedMutex);
+ logThreadStarted.wait(locker.mutex());
+ locker.unlock();
+ // cerr << "done waiting" << endl;
+}
+
+void FileLogger::receivedMessage(const QList<QByteArray> &msg)
+{
+ // Filter on the clientId
+ QByteArray clientBa = msg.first();
+ QString clientId = QString(clientBa.toHex());
+
+ {
+ QMutexLocker locker(&logRevClientMapMutex);
+
+ ClientList *clients = logRevClientMap.value(this, NULL);
+ if (!clients || !clients->contains(clientId))
+ return;
+ }
+
+ QByteArray json = msg.at(1);
+ LoggingItem *item = LoggingItem::create(json);
+ logmsg(item);
+ item->deleteItem();
+}
+
+void SyslogLogger::receivedMessage(const QList<QByteArray> &msg)
+{
+ // Filter on the clientId
+ QByteArray clientBa = msg.first();
+ QString clientId = QString(clientBa.toHex());
+
+ {
+ QMutexLocker locker(&logRevClientMapMutex);
+
+ ClientList *clients = logRevClientMap.value(this, NULL);
+ if (!clients || !clients->contains(clientId))
+ return;
+ }
+
+ QByteArray json = msg.at(1);
+ LoggingItem *item = LoggingItem::create(json);
+ logmsg(item);
+ item->deleteItem();
+}
+
+void DatabaseLogger::receivedMessage(const QList<QByteArray> &msg)
+{
+ // Filter on the clientId
+ QByteArray clientBa = msg.first();
+ QString clientId = QString(clientBa.toHex());
+
+ {
+ QMutexLocker locker(&logRevClientMapMutex);
+
+ ClientList *clients = logRevClientMap.value(this, NULL);
+ if (!clients || !clients->contains(clientId))
+ return;
+ }
+
+ QByteArray json = msg.at(1);
+ LoggingItem *item = LoggingItem::create(json);
+ logmsg(item);
+}
+
+
+/// \brief LogForwardThread constructor.
+LogForwardThread::LogForwardThread() :
+ MThread("LogForward"), m_aborted(false), m_zmqContext(NULL),
+ m_zmqPubSock(NULL), m_sighupNotifier(NULL), m_shutdownTimer(NULL)
+{
+ moveToThread(qthread());
+}
+
+/// \brief LogForwardThread destructor.
+LogForwardThread::~LogForwardThread()
+{
+ stop();
+ wait();
+}
+
+/// \brief Run the log forwarding thread. This thread reads from an internal
+/// queue and handles distributing the LoggingItems to each logger
+/// instance vi ZeroMQ (inproc).
+void LogForwardThread::run(void)
+{
+ RunProlog();
+
#ifndef _WIN32
if (::socketpair(AF_UNIX, SOCK_STREAM, 0, sighupFd))
{
@@ -669,24 +930,9 @@ void LogServerThread::run(void)
(m_zmqContext);
ctx->start();
- m_zmqInSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_ROUTER);
- connect(m_zmqInSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
- SLOT(receivedMessage(const QList<QByteArray>&)));
- m_zmqInSock->bindTo("tcp://127.0.0.1:35327");
- m_zmqInSock->bindTo("inproc://mylogs");
-
- m_zmqPubSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_PUB);
+ m_zmqPubSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_PUB, this);
m_zmqPubSock->bindTo("inproc://loggers");
- // cerr << "wake all" << endl;
- locker.unlock();
- logThreadStarted.wakeAll();
- // cerr << "unlock" << endl;
-
- m_heartbeatTimer = new QTimer(this);
- connect(m_heartbeatTimer, SIGNAL(timeout()), this, SLOT(checkHeartBeats()));
- m_heartbeatTimer->start(1000);
-
m_shutdownTimer = new QTimer(this);
m_shutdownTimer->setSingleShot(true);
connect(m_shutdownTimer, SIGNAL(timeout()),
@@ -696,7 +942,7 @@ void LogServerThread::run(void)
while (!m_aborted)
{
- qApp->processEvents(QEventLoop::WaitForMoreEvents, 10);
+ qApp->processEvents(QEventLoop::AllEvents, 10);
{
QMutexLocker lock(&logMsgListMutex);
@@ -706,16 +952,20 @@ void LogServerThread::run(void)
continue;
}
- while (!logMsgList.isEmpty())
+ int processed = 0;
+ while (!logMsgList.isEmpty() && processed < 100)
{
+ processed++;
LogMessage *msg = logMsgList.takeFirst();
+ lock.unlock();
forwardMessage(msg);
delete msg;
+ lock.relock();
}
}
- }
- logThreadFinished = true;
+ expireClients();
+ }
delete m_sighupNotifier;
@@ -730,8 +980,8 @@ void LogServerThread::run(void)
sigaction( SIGHUP, &sa, NULL );
#endif
- m_heartbeatTimer->stop();
- delete m_heartbeatTimer;
+ m_zmqPubSock->setLinger(0);
+ m_zmqPubSock->close();
if (m_shutdownTimer)
{
@@ -741,11 +991,6 @@ void LogServerThread::run(void)
m_shutdownTimer = NULL;
}
- m_zmqPubSock->setLinger(0);
- m_zmqPubSock->close();
- m_zmqInSock->setLinger(0);
- m_zmqInSock->close();
-
LoggerList loggers;
{
@@ -764,44 +1009,10 @@ void LogServerThread::run(void)
RunEpilog();
}
-/// \brief Sends a ping message to the given client
-/// \param clientId The clientID for the logging client we wish to ping
-void LogServerThread::pingClient(QString clientId)
-{
- LogMessage msg;
- QByteArray clientBa = QByteArray::fromHex(clientId.toLocal8Bit());
- msg << clientBa << QByteArray("");
- m_zmqInSock->sendMessage(msg);
-}
-
-/// \brief SIGHUP handler - reopen all open logfiles for logrollers
-void LogServerThread::handleSigHup(void)
-{
-#ifndef _WIN32
- m_sighupNotifier->setEnabled(false);
-
- char tmp;
- int ret = ::read(sighupFd[1], &tmp, sizeof(tmp));
- (void)ret;
-
- LOG(VB_GENERAL, LOG_INFO, "SIGHUP received, rolling log files.");
-
- /* SIGHUP was sent. Close and reopen debug logfiles */
- QMutexLocker locker(&loggerMapMutex);
- QMap<QString, LoggerBase *>::iterator it;
- for (it = loggerMap.begin(); it != loggerMap.end(); ++it)
- {
- it.value()->reopen();
- }
-
- m_sighupNotifier->setEnabled(true);
-#endif
-}
-
/// \brief Fires off when no clients are left (other than the current daemon)
/// for 5 minutes.
-void LogServerThread::shutdownTimerExpired(void)
+void LogForwardThread::shutdownTimerExpired(void)
{
m_shutdownTimer->stop();
delete m_shutdownTimer;
@@ -812,39 +1023,16 @@ void LogServerThread::shutdownTimerExpired(void)
qApp->quit();
}
-/// \brief Handles heartbeat checking once a second. If a client is not heard
-/// from for at least 1 second, send it a heartbeat message which it
-/// should send back. If we haven't heard from it in 5s, shut down its
-/// logging.
-void LogServerThread::checkHeartBeats(void)
+/// \brief Expire any clients in the delete list
+void LogForwardThread::expireClients(void)
{
- qlonglong epoch;
- ClientList toDel;
-
QMutexLocker lock(&logClientMapMutex);
- loggingGetTimeStamp(&epoch, NULL);
-
- ClientMap::iterator it = logClientMap.begin();
- for( ; it != logClientMap.end(); ++it )
- {
- QString clientId = it.key();
- LoggerListItem *logItem = it.value();
- qlonglong age = epoch - logItem->epoch;
-
- if (age > 5)
- {
- toDel.append(clientId);
- }
- else
- {
- pingClient(clientId);
- }
- }
-
QMutexLocker lock2(&logRevClientMapMutex);
- while (!toDel.isEmpty())
+ QMutexLocker lock3(&logClientToDelMutex);
+
+ while (!logClientToDel.isEmpty())
{
- QString clientId = toDel.takeFirst();
+ QString clientId = logClientToDel.takeFirst();
logClientCount.deref();
LOG(VB_GENERAL, LOG_INFO, QString("Expiring client %1 (#%2)")
.arg(clientId).arg(logClientCount));
@@ -880,18 +1068,31 @@ void LogServerThread::checkHeartBeats(void)
}
}
-/// \brief Handles messages received from logging clients
-/// \param msg The message received (can be multi-part)
-void LogServerThread::receivedMessage(const QList<QByteArray> &msg)
+/// \brief SIGHUP handler - reopen all open logfiles for logrollers
+void LogForwardThread::handleSigHup(void)
{
- LogMessage *message = new LogMessage(msg);
- QMutexLocker lock(&logMsgListMutex);
+#ifndef _WIN32
+ m_sighupNotifier->setEnabled(false);
- logMsgList.append(message);
- logMsgListNotEmpty.wakeAll();
+ char tmp;
+ int ret = ::read(sighupFd[1], &tmp, sizeof(tmp));
+ (void)ret;
+
+ LOG(VB_GENERAL, LOG_INFO, "SIGHUP received, rolling log files.");
+
+ /* SIGHUP was sent. Close and reopen debug logfiles */
+ QMutexLocker locker(&loggerMapMutex);
+ QMap<QString, LoggerBase *>::iterator it;
+ for (it = loggerMap.begin(); it != loggerMap.end(); ++it)
+ {
+ it.value()->reopen();
+ }
+
+ m_sighupNotifier->setEnabled(true);
+#endif
}
-void LogServerThread::forwardMessage(LogMessage *msg)
+void LogForwardThread::forwardMessage(LogMessage *msg)
{
#ifdef DUMP_PACKET
QList<QByteArray>::const_iterator it = msg->begin();
@@ -919,7 +1120,7 @@ void LogServerThread::forwardMessage(LogMessage *msg)
if (!logItem)
{
// Send an initial ping so the client knows we are in the house
- pingClient(clientId);
+ emit pingClient(clientId);
}
else
{
@@ -928,8 +1129,6 @@ void LogServerThread::forwardMessage(LogMessage *msg)
return;
}
- LoggingItem *item = LoggingItem::create(json);
-
try
{
QMutexLocker lock(&logClientMapMutex);
@@ -941,6 +1140,8 @@ void LogServerThread::forwardMessage(LogMessage *msg)
}
else
{
+ LoggingItem *item = LoggingItem::create(json);
+
logClientCount.ref();
LOG(VB_GENERAL, LOG_INFO, QString("New Client: %1 (#%2)")
.arg(clientId).arg(logClientCount));
@@ -972,7 +1173,7 @@ void LogServerThread::forwardMessage(LogMessage *msg)
lock2.unlock();
// inserts into loggerMap
logger = new FileLogger(logfile.toLocal8Bit().constData());
- logger->moveToThread(logServerThread->qthread());
+ //logger->moveToThread(logForwardThread->qthread());
lock2.relock();
logger->setupZMQSocket();
@@ -1003,7 +1204,7 @@ void LogServerThread::forwardMessage(LogMessage *msg)
// Need to add a new SyslogLogger
lock2.unlock();
logger = new SyslogLogger; // inserts into loggerMap
- logger->moveToThread(logServerThread->qthread());
+ //logger->moveToThread(logForwardThread->qthread());
lock2.relock();
logger->setupZMQSocket();
@@ -1036,7 +1237,7 @@ void LogServerThread::forwardMessage(LogMessage *msg)
// inserts into loggerMap
logger =
new DatabaseLogger(table.toLocal8Bit().constData());
- logger->moveToThread(logServerThread->qthread());
+ //logger->moveToThread(logForwardThread->qthread());
lock2.relock();
logger->setupZMQSocket();
@@ -1061,7 +1262,7 @@ void LogServerThread::forwardMessage(LogMessage *msg)
logItem->list = loggers;
logClientMap.insert(clientId, logItem);
- msleep(10);
+ item->deleteItem();
}
}
catch (...)
@@ -1069,145 +1270,16 @@ void LogServerThread::forwardMessage(LogMessage *msg)
cout << "Exception occurred" << endl;
}
- item->deleteItem();
m_zmqPubSock->sendMessage(*msg);
}
-
-
-/// \brief Stop the thread by setting the abort flag after waiting a second for
-/// the queue to be flushed.
-void LogServerThread::stop(void)
+/// \brief Stop the thread by setting the abort flag
+void LogForwardThread::stop(void)
{
m_aborted = true;
}
-static QAtomicInt item_count;
-static QAtomicInt malloc_count;
-
-#define DEBUG_MEMORY 0
-#if DEBUG_MEMORY
-static int max_count = 0;
-static QTime memory_time;
-#endif
-
-
-
-/// \brief Entry point to start logging for the application. This will
-/// start up all of the threads needed.
-/// \param logfile Filename of the logfile to create. Empty if no file.
-/// \param progress non-zero if progress output will be sent to the console.
-/// This squelches all messages less important than LOG_ERR
-/// on the console
-/// \param quiet quiet level requested (squelches all console output)
-/// \param facility Syslog facility to use. -1 to disable syslog output
-/// \param level Minimum logging level to put into the logs
-/// \param dblog true if database logging is requested
-/// \param propagate true if the logfile path needs to be propagated to child
-/// processes.
-void logServerStart(void)
-{
- if (logServerThread && logServerThread->isRunning())
- return;
-
- if (!logServerThread)
- logServerThread = new LogServerThread();
-
- // cerr << "starting server" << endl;
- QMutexLocker locker(&logThreadStartedMutex);
- logServerThread->start();
- logThreadStarted.wait(locker.mutex());
- locker.unlock();
- // cerr << "done starting server" << endl;
-}
-
-/// \brief Entry point for stopping logging for an application
-void logServerStop(void)
-{
- if (logServerThread)
- {
- logServerThread->stop();
- logServerThread->wait();
- }
-
- QMutexLocker locker(&loggerMapMutex);
- QMap<QString, LoggerBase *>::iterator it;
- for (it = loggerMap.begin(); it != loggerMap.end(); ++it)
- {
- it.value()->stopDatabaseAccess();
- }
-}
-
-void logServerWait(void)
-{
- // cerr << "waiting" << endl;
- QMutexLocker locker(&logThreadStartedMutex);
- logThreadStarted.wait(locker.mutex());
- locker.unlock();
- // cerr << "done waiting" << endl;
-}
-
-void FileLogger::receivedMessage(const QList<QByteArray> &msg)
-{
- // Filter on the clientId
- QByteArray clientBa = msg.first();
- QString clientId = QString(clientBa.toHex());
-
- {
- QMutexLocker locker(&logRevClientMapMutex);
-
- ClientList *clients = logRevClientMap.value(this, NULL);
- if (!clients || !clients->contains(clientId))
- return;
- }
-
- QByteArray json = msg.at(1);
- LoggingItem *item = LoggingItem::create(json);
- logmsg(item);
- item->deleteItem();
-}
-
-void SyslogLogger::receivedMessage(const QList<QByteArray> &msg)
-{
- // Filter on the clientId
- QByteArray clientBa = msg.first();
- QString clientId = QString(clientBa.toHex());
-
- {
- QMutexLocker locker(&logRevClientMapMutex);
-
- ClientList *clients = logRevClientMap.value(this, NULL);
- if (!clients || !clients->contains(clientId))
- return;
- }
-
- QByteArray json = msg.at(1);
- LoggingItem *item = LoggingItem::create(json);
- logmsg(item);
- item->deleteItem();
-}
-
-void DatabaseLogger::receivedMessage(const QList<QByteArray> &msg)
-{
- // Filter on the clientId
- QByteArray clientBa = msg.first();
- QString clientId = QString(clientBa.toHex());
-
- {
- QMutexLocker locker(&logRevClientMapMutex);
-
- ClientList *clients = logRevClientMap.value(this, NULL);
- if (!clients || !clients->contains(clientId))
- return;
- }
-
- QByteArray json = msg.at(1);
- LoggingItem *item = LoggingItem::create(json);
- logmsg(item);
-}
-
-
/*
* vim:ts=4:sw=4:ai:et:si:sts=4
*/
View
45 mythtv/libs/libmythbase/loggingserver.h
@@ -130,8 +130,11 @@ typedef QList<LogMessage *> LogMessageList;
class LogServerThread : public QObject, public MThread
{
Q_OBJECT
+ friend class LogForwardThread;
+ friend class FileLogger;
+ friend class SyslogLogger;
+ friend class DatabaseLogger;
- friend void logSighup(int signum);
public:
LogServerThread();
~LogServerThread();
@@ -139,26 +142,52 @@ class LogServerThread : public QObject, public MThread
void stop(void);
nzmqt::ZMQContext *getZMQContext(void) { return m_zmqContext; };
private:
- bool m_aborted; ///< Flag to abort the thread.
nzmqt::ZMQContext *m_zmqContext; ///< ZeroMQ context
nzmqt::ZMQSocket *m_zmqInSock; ///< ZeroMQ feeding socket
- nzmqt::ZMQSocket *m_zmqPubSock; ///< ZeroMQ publishing socket
QTimer *m_heartbeatTimer; ///< 1s repeating timer for client
/// heartbeats
- QTimer *m_shutdownTimer; ///< 5 min timer to shut down if no clients
+
+ protected slots:
+ void receivedMessage(const QList<QByteArray>&);
+ void checkHeartBeats(void);
+ void pingClient(QString clientId);
+};
+
+/// \brief The logging thread that forwards received messages to the consuming
+/// loggers via ZeroMQ
+class LogForwardThread : public QObject, public MThread
+{
+ Q_OBJECT
+
+ friend void logSighup(int signum);
+ public:
+ LogForwardThread();
+ ~LogForwardThread();
+ void run(void);
+ void stop(void);
+ nzmqt::ZMQContext *getZMQContext(void) { return m_zmqContext; };
+ private:
+ bool m_aborted; ///< Flag to abort the thread.
+ nzmqt::ZMQContext *m_zmqContext; ///< ZeroMQ context
+ nzmqt::ZMQSocket *m_zmqPubSock; ///< ZeroMQ publishing socket
+
QSocketNotifier *m_sighupNotifier; ///< Notifier to synchronize to UNIX
/// signal SIGHUP safely
+ QTimer *m_shutdownTimer; ///< 5 min timer to shut down if no clients
+
void forwardMessage(LogMessage *msg);
- void pingClient(QString clientId);
+ void expireClients(void);
protected slots:
- void receivedMessage(const QList<QByteArray>&);
- void checkHeartBeats(void);
- void shutdownTimerExpired(void);
void handleSigHup(void);
+ void shutdownTimerExpired(void);
+
+ signals:
+ void pingClient(QString);
};
+
class QWaitCondition;
#define MAX_QUEUE_LEN 1000
Please sign in to comment.
Something went wrong with that request. Please try again.