Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Got all the linkage in to receive and log

Still not quite right, segfaults.

Also, need to cleanup LoggerBase items, and implement the heartbeat to clients
  • Loading branch information...
commit a619456b274798fa111b3c89d5305ca676c4f9cd 1 parent 5bb3277
@Beirdo Beirdo authored
View
276 mythtv/libs/libmythbase/loggingserver.cpp
@@ -52,13 +52,25 @@ extern "C" {
#include <mach/mach.h>
#endif
-static QMutex loggerListMutex;
-static QList<LoggerBase *> loggerList;
+static QMutex loggerMapMutex;
+static QMap<QString, LoggerBase *> loggerMap;
-LogServerThread *logServerThread = NULL;
-static QMutex logThreadStartedMutex;
-static QWaitCondition logThreadStarted;
-static bool logThreadFinished = false;
+LogServerThread *logServerThread = NULL;
+static QMutex logThreadStartedMutex;
+static QWaitCondition logThreadStarted;
+static bool logThreadFinished = false;
+
+typedef QList<LoggerBase *> LoggerList;
+typedef QMap<QByteArray, LoggerList *> ClientMap;
+
+typedef QList<QByteArray> ClientList;
+typedef QMap<LoggerBase *, ClientList *> RevClientMap;
+
+static QMutex logClientMapMutex;
+static ClientMap logClientMap;
+
+static QMutex logRevClientMapMutex;
+static RevClientMap logRevClientMap;
#define TIMESTAMP_MAX 30
#define MAX_STRING_LENGTH (LOGLINE_MAX+120)
@@ -69,24 +81,30 @@ void logSighup( int signum, siginfo_t *info, void *secret );
/// \brief LoggerBase class constructor. Adds the new logger instance to the
-/// loggerList.
+/// loggerMap.
/// \param string a C-string of the handle for this instance (NULL if unused)
-LoggerBase::LoggerBase(char *string)
+LoggerBase::LoggerBase(const char *string)
{
- QMutexLocker locker(&loggerListMutex);
+ QMutexLocker locker(&loggerMapMutex);
if (string)
+ {
m_handle = strdup(string);
+ loggerMap.insert(QString(m_handle), this);
+ }
else
+ {
m_handle = NULL;
- loggerList.append(this);
+ loggerMap.insert(QString(""), this);
+ }
+
}
/// \brief LoggerBase deconstructor. Removes the logger instance from the
-/// loggerList.
+/// loggerMap.
LoggerBase::~LoggerBase()
{
- QMutexLocker locker(&loggerListMutex);
- loggerList.removeAll(this);
+ QMutexLocker locker(&loggerMapMutex);
+ loggerMap.remove(QString(m_handle));
if (m_handle)
free(m_handle);
@@ -95,13 +113,20 @@ LoggerBase::~LoggerBase()
/// \brief FileLogger constructor
/// \param filename Filename of the logfile.
-FileLogger::FileLogger(char *filename) :
+FileLogger::FileLogger(const char *filename) :
LoggerBase(filename), m_opened(false), m_fd(-1)
{
m_fd = open(filename, O_WRONLY|O_CREAT|O_APPEND, 0664);
m_opened = (m_fd != -1);
LOG(VB_GENERAL, LOG_INFO, QString("Added logging to %1")
.arg(filename));
+
+ nzmqt::ZMQContext *ctx = logServerThread->getZMQContext();
+ m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
+ connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
+ SLOT(receivedMessage(const QList<QByteArray>&)));
+ m_zmqSock->subscribeTo("");
+ m_zmqSock->connectTo("inproc://loggers");
}
/// \brief FileLogger deconstructor - close the logfile
@@ -111,6 +136,7 @@ FileLogger::~FileLogger()
{
LOG(VB_GENERAL, LOG_INFO, "Removed logging to the console");
}
+ delete m_zmqSock;
}
/// \brief Reopen the logfile after a SIGHUP. Log files only (no console).
@@ -145,8 +171,8 @@ bool FileLogger::logmsg(LoggingItem *item)
(const struct tm *)&tm);
snprintf( usPart, 9, ".%06d", (int)(item->usec()) );
strcat( timestamp, usPart );
- char shortname;
+ char shortname;
{
QMutexLocker locker(&loglevelMapMutex);
LoglevelMap::iterator it = loglevelMap.find(item->level());
@@ -171,8 +197,6 @@ bool FileLogger::logmsg(LoggingItem *item)
int result = write(m_fd, line, strlen(line));
- item->deleteItem();
-
if( result == -1 )
{
LOG(VB_GENERAL, LOG_ERR,
@@ -194,6 +218,13 @@ SyslogLogger::SyslogLogger() : LoggerBase(NULL), m_opened(false)
m_opened = true;
LOG(VB_GENERAL, LOG_INFO, "Added syslogging");
+
+ nzmqt::ZMQContext *ctx = logServerThread->getZMQContext();
+ m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
+ connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
+ SLOT(receivedMessage(const QList<QByteArray>&)));
+ m_zmqSock->subscribeTo("");
+ m_zmqSock->connectTo("inproc://loggers");
}
/// \brief SyslogLogger deconstructor.
@@ -201,6 +232,7 @@ SyslogLogger::~SyslogLogger()
{
LOG(VB_GENERAL, LOG_INFO, "Removing syslogging");
closelog();
+ delete m_zmqSock;
}
@@ -235,9 +267,8 @@ const int DatabaseLogger::kMinDisabledTime = 1000;
/// \brief DatabaseLogger constructor
/// \param table C-string of the database table to log to
-DatabaseLogger::DatabaseLogger(char *table) : LoggerBase(table),
- m_opened(false),
- m_loggingTableExists(false)
+DatabaseLogger::DatabaseLogger(const char *table) :
+ LoggerBase(table), m_opened(false), m_loggingTableExists(false)
{
m_query = QString(
"INSERT INTO %1 "
@@ -255,6 +286,13 @@ DatabaseLogger::DatabaseLogger(char *table) : LoggerBase(table),
m_opened = true;
m_disabled = false;
+
+ nzmqt::ZMQContext *ctx = logServerThread->getZMQContext();
+ m_zmqSock = ctx->createSocket(nzmqt::ZMQSocket::TYP_SUB, this);
+ connect(m_zmqSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
+ SLOT(receivedMessage(const QList<QByteArray>&)));
+ m_zmqSock->subscribeTo("");
+ m_zmqSock->connectTo("inproc://loggers");
}
/// \brief DatabaseLogger deconstructor
@@ -263,6 +301,7 @@ DatabaseLogger::~DatabaseLogger()
LOG(VB_GENERAL, LOG_INFO, "Removing database logging");
stopDatabaseAccess();
+ delete m_zmqSock;
}
/// \brief Stop logging to the database and wait for the thread to stop.
@@ -338,6 +377,8 @@ bool DatabaseLogger::logqmsg(MSqlQuery &query, LoggingItem *item)
query.bindValue(":MSGTIME", timestamp);
query.bindValue(":LEVEL", item->level());
query.bindValue(":MESSAGE", item->message());
+ query.bindValue(":APP", item->appName());
+ query.bindValue(":PID", item->pid());
if (!query.exec())
{
@@ -366,8 +407,6 @@ void DatabaseLogger::prepare(MSqlQuery &query)
{
query.prepare(m_query);
query.bindValue(":HOST", gCoreContext->GetHostName());
- query.bindValue(":APP", QCoreApplication::applicationName());
- query.bindValue(":PID", getpid());
}
/// \brief Check if the database is ready for use
@@ -421,8 +460,7 @@ bool DatabaseLogger::tableExists(const QString &table)
/// \brief DBLoggerThread constructor
/// \param logger DatabaseLogger instance that this thread belongs to
DBLoggerThread::DBLoggerThread(DatabaseLogger *logger) :
- MThread("DBLogger"),
- m_logger(logger),
+ MThread("DBLogger"), m_logger(logger),
m_queue(new QQueue<LoggingItem *>),
m_wait(new QWaitCondition()), m_aborted(false)
{
@@ -521,6 +559,9 @@ void DBLoggerThread::stop(void)
m_wait->wakeAll();
}
+
+
+
/// \brief LogServerThread constructor.
LogServerThread::LogServerThread() :
MThread("LogServer"), m_aborted(false)
@@ -556,15 +597,12 @@ void LogServerThread::run(void)
m_zmqInSock->bindTo("tcp://127.0.0.1:35327");
m_zmqInSock->bindTo("inproc://mylogs");
-// m_zmqPubSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_PUB);
-// m_zmqPubSock->bindTo("inproc://loggers");
+ m_zmqPubSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_PUB);
+ m_zmqPubSock->bindTo("inproc://loggers");
logThreadStarted.wakeAll();
locker.unlock();
-LOG(VB_GENERAL, LOG_INFO, "GOT here");
-cout << "Got here" << endl;
-
while (!m_aborted)
{
msleep(100);
@@ -585,15 +623,125 @@ cout << "Got here" << endl;
/// \param msg The message received (can be multi-part)
void LogServerThread::receivedMessage(const QList<QByteArray> &msg)
{
-cout << "Received" << endl;
+#if 1
+ //def DUMP_PACKET
QList<QByteArray>::const_iterator it = msg.begin();
int i = 0;
for (; it != msg.end(); ++it, i++)
{
QByteArray buf = *it;
- cout << i << ":\t" << buf.toHex().constData() << endl << "\t" << buf.constData() << endl;
+ cout << i << ":\t" << buf.size() << endl << "\t"
+ << buf.toHex().constData() << endl << "\t"
+ << buf.constData() << endl;
}
-// m_zmqPubSock->sendMessage(msg);
+#endif
+
+ // First section is the client id
+ QByteArray clientId = msg.first();
+ clientId.detach();
+
+ QByteArray json = msg.at(1);
+ LoggingItem *item = LoggingItem::create(json);
+
+ {
+ QMutexLocker lock(&logClientMapMutex);
+
+ if (!logClientMap.contains(clientId))
+ {
+ QMutexLocker lock2(&loggerMapMutex);
+ QMutexLocker lock3(&logRevClientMapMutex);
+
+ // Need to find or create the loggers
+ LoggerList *loggers = new LoggerList;
+ LoggerBase *logger;
+
+ // FileLogger from logFile
+ QString logfile = item->logFile();
+ if (!logfile.isEmpty())
+ {
+ ClientList *clients;
+ logger = loggerMap.value(logfile, NULL);
+ if (!logger)
+ {
+ // Need to add a new FileLogger
+ lock2.unlock();
+ // inserts into loggerMap
+ logger = new FileLogger(logfile.toLocal8Bit().constData());
+ lock2.relock();
+
+ clients = new ClientList;
+ clients->insert(0, clientId);
+ logRevClientMap.insert(logger, clients);
+ }
+ else
+ {
+ clients = logRevClientMap.value(logger);
+ clients->insert(0, clientId);
+ }
+ loggers->insert(0, logger);
+ }
+
+ // SyslogLogger from facility
+ int facility = item->facility();
+ if (facility > 0)
+ {
+ ClientList *clients;
+ logger = loggerMap.value(NULL, NULL);
+ if (!logger)
+ {
+ // Need to add a new SyslogLogger
+ lock2.unlock();
+ logger = new SyslogLogger; // inserts into loggerMap
+ lock2.relock();
+
+ clients = new ClientList;
+ clients->insert(0, clientId);
+ logRevClientMap.insert(logger, clients);
+ }
+ else
+ {
+ clients = logRevClientMap.value(logger);
+ clients->insert(0, clientId);
+ }
+ loggers->insert(0, logger);
+ }
+
+ // DatabaseLogger from table
+ QString table = item->table();
+ if (!table.isEmpty())
+ {
+ ClientList *clients;
+ logger = loggerMap.value(table, NULL);
+ if (!logger)
+ {
+ // Need to add a new DatabaseLogger
+ lock2.unlock();
+ // inserts into loggerMap
+ logger =
+ new DatabaseLogger(table.toLocal8Bit().constData());
+ lock2.relock();
+
+ clients = new ClientList;
+ clients->insert(0, clientId);
+ logRevClientMap.insert(logger, clients);
+ }
+ else
+ {
+ clients = logRevClientMap.value(logger);
+ clients->insert(0, clientId);
+ }
+ loggers->insert(0, logger);
+ }
+
+ logClientMap.insert(clientId, loggers);
+ }
+
+ msleep(10);
+ }
+
+ item->deleteItem();
+
+ m_zmqPubSock->sendMessage(msg);
}
@@ -622,12 +770,12 @@ void logSighup( int signum, siginfo_t *info, void *secret )
LOG(VB_GENERAL, LOG_INFO, "SIGHUP received, rolling log files.");
/* SIGHUP was sent. Close and reopen debug logfiles */
- QMutexLocker locker(&loggerListMutex);
+ QMutexLocker locker(&loggerMapMutex);
- QList<LoggerBase *>::iterator it;
- for (it = loggerList.begin(); it != loggerList.end(); ++it)
+ QMap<QString, LoggerBase *>::iterator it;
+ for (it = loggerMap.begin(); it != loggerMap.end(); ++it)
{
- (*it)->reopen();
+ it.value()->reopen();
}
}
#endif
@@ -687,23 +835,67 @@ void logServerStop(void)
sigaction( SIGHUP, &sa, NULL );
#endif
- QList<LoggerBase *>::iterator it;
- for (it = loggerList.begin(); it != loggerList.end(); ++it)
+ QMap<QString, LoggerBase *>::iterator it;
+ for (it = loggerMap.begin(); it != loggerMap.end(); ++it)
{
- (*it)->stopDatabaseAccess();
+ it.value()->stopDatabaseAccess();
}
}
-void FileLogger::messageReceived(const QList<QByteArray>&)
+void FileLogger::receivedMessage(const QList<QByteArray> &msg)
{
+ cout << "file" << endl;
+ // Filter on the clientId
+ QByteArray clientId = msg.first();
+ {
+ QMutexLocker locker(&logRevClientMapMutex);
+
+ ClientList *clients = logRevClientMap[this];
+ if (!clients->contains(clientId))
+ return;
+ }
+
+ QByteArray json = msg.at(1);
+ LoggingItem *item = LoggingItem::create(json);
+ logmsg(item);
+ item->deleteItem();
}
-void SyslogLogger::messageReceived(const QList<QByteArray>&)
+void SyslogLogger::receivedMessage(const QList<QByteArray> &msg)
{
+ cout << "syslog" << endl;
+ // Filter on the clientId
+ QByteArray clientId = msg.first();
+ {
+ QMutexLocker locker(&logRevClientMapMutex);
+
+ ClientList *clients = logRevClientMap[this];
+ if (!clients->contains(clientId))
+ return;
+ }
+
+ QByteArray json = msg.at(1);
+ LoggingItem *item = LoggingItem::create(json);
+ logmsg(item);
+ item->deleteItem();
}
-void DatabaseLogger::messageReceived(const QList<QByteArray>&)
+void DatabaseLogger::receivedMessage(const QList<QByteArray> &msg)
{
+ cout << "db" << endl;
+ // Filter on the clientId
+ QByteArray clientId = msg.first();
+ {
+ QMutexLocker locker(&logRevClientMapMutex);
+
+ ClientList *clients = logRevClientMap[this];
+ if (!clients->contains(clientId))
+ return;
+ }
+
+ QByteArray json = msg.at(1);
+ LoggingItem *item = LoggingItem::create(json);
+ logmsg(item);
}
View
25 mythtv/libs/libmythbase/loggingserver.h
@@ -30,7 +30,7 @@ class LoggerBase : public QObject {
public:
/// \brief LoggerBase Constructor
- LoggerBase(char *string);
+ LoggerBase(const char *string);
/// \brief LoggerBase Deconstructor
virtual ~LoggerBase();
/// \brief Process a log message for the logger instance
@@ -41,37 +41,44 @@ class LoggerBase : public QObject {
/// \brief Stop logging to the database
virtual void stopDatabaseAccess(void) { }
/// \brief Deal with an incoming log message
- virtual void messageReceived(const QList<QByteArray>&) = 0;
protected:
char *m_handle; ///< semi-opaque handle for identifying instance
};
/// \brief File-based logger - used for logfiles and console
class FileLogger : public LoggerBase {
+ Q_OBJECT
+
public:
- FileLogger(char *filename);
+ FileLogger(const char *filename);
~FileLogger();
bool logmsg(LoggingItem *item);
void reopen(void);
- void messageReceived(const QList<QByteArray>&);
private:
bool m_opened; ///< true when the logfile is opened
int m_fd; ///< contains the file descriptor for the logfile
+ nzmqt::ZMQSocket *m_zmqSock; ///< ZeroMQ feeding socket
+ protected slots:
+ void receivedMessage(const QList<QByteArray>&);
};
#ifndef _WIN32
/// \brief Syslog-based logger (not available in Windows)
class SyslogLogger : public LoggerBase {
+ Q_OBJECT
+
public:
SyslogLogger();
~SyslogLogger();
bool logmsg(LoggingItem *item);
/// \brief Unused for this logger.
void reopen(void) { };
- void messageReceived(const QList<QByteArray>&);
private:
char *m_application; ///< C-string of the application name
bool m_opened; ///< true when syslog channel open.
+ nzmqt::ZMQSocket *m_zmqSock; ///< ZeroMQ feeding socket
+ protected slots:
+ void receivedMessage(const QList<QByteArray>&);
};
#endif
@@ -79,14 +86,15 @@ class DBLoggerThread;
/// \brief Database logger - logs to the MythTV database
class DatabaseLogger : public LoggerBase {
+ Q_OBJECT
+
friend class DBLoggerThread;
public:
- DatabaseLogger(char *table);
+ DatabaseLogger(const char *table);
~DatabaseLogger();
bool logmsg(LoggingItem *item);
void reopen(void) { };
virtual void stopDatabaseAccess(void);
- void messageReceived(const QList<QByteArray>&);
protected:
bool logqmsg(MSqlQuery &query, LoggingItem *item);
void prepare(MSqlQuery &query);
@@ -103,6 +111,9 @@ class DatabaseLogger : public LoggerBase {
QTime m_errorLoggingTime; ///< Time when DB error logging was last done
static const int kMinDisabledTime; ///< Minimum time to disable DB logging
/// (in ms)
+ nzmqt::ZMQSocket *m_zmqSock; ///< ZeroMQ feeding socket
+ protected slots:
+ void receivedMessage(const QList<QByteArray>&);
};
Please sign in to comment.
Something went wrong with that request. Please try again.