Skip to content

Commit

Permalink
OK, good JSON made, no ZeroMQ reception
Browse files Browse the repository at this point in the history
  • Loading branch information
Beirdo committed Apr 25, 2012
1 parent 6f69e85 commit d3299b9
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 54 deletions.
87 changes: 53 additions & 34 deletions mythtv/libs/libmythbase/logging.cpp
Expand Up @@ -17,6 +17,7 @@ using namespace std;

#include "mythlogging.h"
#include "logging.h"
#include "loggingserver.h"
#include "mythconfig.h"
#include "mythdb.h"
#include "mythcorecontext.h"
Expand Down Expand Up @@ -147,7 +148,7 @@ QByteArray LoggingItem::toByteArray(void)
QJson::Serializer serializer;
QByteArray json = serializer.serialize(variant);

cout << json.constData() << endl;
//cout << json.constData() << endl;

return json;
}
Expand Down Expand Up @@ -218,7 +219,7 @@ LoggerThread::LoggerThread(QString filename, bool progress, bool quiet,
m_waitEmpty(new QWaitCondition()),
m_aborted(false), m_filename(filename), m_progress(progress),
m_quiet(quiet), m_appname(QCoreApplication::applicationName()),
m_tablename(table), m_facility(facility)
m_tablename(table), m_facility(facility), m_pid(getpid())
{
char *debug = getenv("VERBOSE_THREADS");
if (debug != NULL)
Expand Down Expand Up @@ -253,46 +254,62 @@ void LoggerThread::run(void)

LOG(VB_GENERAL, LOG_INFO, "Added logging to the console");

m_zmqContext = new nzmqt::PollingZMQContext(this, 4);
m_zmqContext->start();

m_zmqSocket = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_DEALER);
m_zmqSocket->connectTo("tcp://127.0.0.1:35327");
bool locallogs = (m_appname == MYTH_APPNAME_MYTHLOGSERVER);
if (locallogs)
m_zmqContext = logServerThread->getZMQContext();
else
{
m_zmqContext = nzmqt::createDefaultContext(this);
m_zmqContext->start();
}

m_zmqSocket = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_DEALER,
this);
connect(m_zmqSocket, SIGNAL(messageReceived(const QList<QByteArray>&)),
this, SLOT(messageReceived(const QList<QByteArray>&)));
SLOT(messageReceived(const QList<QByteArray>&)));

m_initialWaiting = true;
LoggingItem *item = LoggingItem::create(__FILE__, __FUNCTION__,
__LINE__, (LogLevel_t)LOG_DEBUG,
kInitializing);
if (item)
{
fillItem(item);
m_zmqSocket->sendMessage(item->toByteArray());
item->deleteItem();
}
if (locallogs)
m_zmqSocket->connectTo("inproc://mylogs");
else
m_zmqSocket->connectTo("tcp://127.0.0.1:35327");

msleep(100); // wait up to 100ms for mythlogserver to respond
if (m_initialWaiting &&
QCoreApplication::applicationName() != MYTH_APPNAME_MYTHLOGSERVER)
if (!locallogs)
{
// Got no response from mythlogserver, let's assume it's dead and start
// it up
m_initialWaiting = false;
LOG(VB_GENERAL, LOG_INFO, "Starting mythlogserver");
m_initialWaiting = true;
LoggingItem *item = LoggingItem::create(__FILE__, __FUNCTION__,
__LINE__, (LogLevel_t)LOG_DEBUG,
kInitializing);
if (item)
{
fillItem(item);
m_zmqSocket->sendMessage(item->toByteArray());
item->deleteItem();
}

MythSystemMask mask = MythSystemMask(kMSDontBlockInputDevs |
kMSDontDisableDrawing |
kMSRunBackground | kMSRunShell);
QStringList args;
args << "--daemon";
msleep(100); // wait up to 100ms for mythlogserver to respond
if (m_initialWaiting && !locallogs)
{
// Got no response from mythlogserver, let's assume it's dead and
// start // it up
m_initialWaiting = false;
LOG(VB_GENERAL, LOG_INFO, "Starting mythlogserver");

MythSystemMask mask = MythSystemMask(kMSDontBlockInputDevs |
kMSDontDisableDrawing |
kMSRunBackground |
kMSRunShell);
QStringList args;
args << "--daemon";

MythSystem ms("mythlogserver", args, mask);
ms.Run();
}

MythSystem ms("mythlogserver", args, mask);
ms.Run();
LOG(VB_GENERAL, LOG_INFO,
"Added logging to mythlogserver at TCP:35327");
}

LOG(VB_GENERAL, LOG_INFO, "Added logging to mythlogserver at TCP:35327");
else
LOG(VB_GENERAL, LOG_INFO, "Added logging to mythlogserver locally");

QMutexLocker qLock(&logQueueMutex);

Expand Down Expand Up @@ -490,6 +507,8 @@ void LoggerThread::fillItem(LoggingItem *item)
if (!item)
return;

item->setPid(m_pid);
item->setThreadName(item->getThreadName());
item->setAppName(m_appname);
item->setTable(m_tablename);
item->setLogFile(m_filename);
Expand Down
3 changes: 2 additions & 1 deletion mythtv/libs/libmythbase/logging.h
Expand Up @@ -188,8 +188,9 @@ class LoggerThread : public QObject, public MThread
QString m_appname; ///< Cached application name
QString m_tablename; ///< Cached table name for db logging
int m_facility; ///< Cached syslog facility (or -1 to disable)
pid_t m_pid; ///< Cached pid value

nzmqt::PollingZMQContext *m_zmqContext; ///< ZeroMQ context to use in this logger
nzmqt::ZMQContext *m_zmqContext; ///< ZeroMQ context to use in this logger
nzmqt::ZMQSocket *m_zmqSocket; ///< ZeroMQ socket to talk to mythlogserver

protected slots:
Expand Down
47 changes: 32 additions & 15 deletions mythtv/libs/libmythbase/loggingserver.cpp
Expand Up @@ -55,7 +55,9 @@ extern "C" {
static QMutex loggerListMutex;
static QList<LoggerBase *> loggerList;

static LogServerThread *logThread = NULL;
LogServerThread *logServerThread = NULL;
static QMutex logThreadStartedMutex;
static QWaitCondition logThreadStarted;
static bool logThreadFinished = false;

#define TIMESTAMP_MAX 30
Expand Down Expand Up @@ -541,21 +543,32 @@ void LogServerThread::run(void)
RunProlog();

logThreadFinished = false;
QMutexLocker locker(&logThreadStartedMutex);

m_zmqContext = new nzmqt::PollingZMQContext(this, 4);
m_zmqContext->start();
m_zmqContext = nzmqt::createDefaultContext(this);
nzmqt::PollingZMQContext *ctx = static_cast<nzmqt::PollingZMQContext *>
(m_zmqContext);
ctx->start();

m_zmqInSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_ROUTER);
connect(m_zmqInSock, SIGNAL(messageReceived(const QList<QByteArray>&)),
this, SLOT(messageReceived(const QList<QByteArray>&)));
SLOT(receivedMessage(const QList<QByteArray>&)));
m_zmqInSock->bindTo("tcp://127.0.0.1:35327");
m_zmqInSock->bindTo("inproc://mylogs");

m_zmqPubSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_PUB);
m_zmqPubSock->bindTo("inproc://loggers");
// m_zmqPubSock = m_zmqContext->createSocket(nzmqt::ZMQSocket::TYP_PUB);
// m_zmqPubSock->bindTo("inproc://loggers");

logThreadStarted.wakeAll();
locker.unlock();

LOG(VB_GENERAL, LOG_INFO, "GOT here");
cout << "Got here" << endl;

while (!m_aborted)
{
msleep(1000);
cout << "Tick" << endl;
// handle heartbeat...
}

Expand All @@ -570,16 +583,17 @@ void LogServerThread::run(void)

/// \brief Handles messages received from logging clients
/// \param msg The message received (can be multi-part)
void LogServerThread::messageReceived(const QList<QByteArray> &msg)
void LogServerThread::receivedMessage(const QList<QByteArray> &msg)
{
cout << "Received" << endl;
QList<QByteArray>::const_iterator it = msg.begin();
int i = 0;
for (; it != msg.end(); ++it, i++)
{
QByteArray buf = *it;
cout << i << ":\t" << buf.toHex().constData() << endl << "\t" << buf.constData() << endl;
}
m_zmqPubSock->sendMessage(msg);
// m_zmqPubSock->sendMessage(msg);
}


Expand Down Expand Up @@ -633,11 +647,11 @@ void logSighup( int signum, siginfo_t *info, void *secret )
/// processes.
void logServerStart(void)
{
if (logThread && logThread->isRunning())
if (logServerThread && logServerThread->isRunning())
return;

if (!logThread)
logThread = new LogServerThread();
if (!logServerThread)
logServerThread = new LogServerThread();

#ifndef _WIN32
/* Setup SIGHUP */
Expand All @@ -649,16 +663,19 @@ void logServerStart(void)
sigaction( SIGHUP, &sa, NULL );
#endif

logThread->start();
QMutexLocker locker(&logThreadStartedMutex);
logServerThread->start();
logThreadStarted.wait(locker.mutex());

}

/// \brief Entry point for stopping logging for an application
void logServerStop(void)
{
if (logThread)
if (logServerThread)
{
logThread->stop();
logThread->wait();
logServerThread->stop();
logServerThread->wait();
}

#ifndef _WIN32
Expand Down
7 changes: 5 additions & 2 deletions mythtv/libs/libmythbase/loggingserver.h
Expand Up @@ -117,13 +117,14 @@ class LogServerThread : public QObject, public MThread
~LogServerThread();
void run(void);
void stop(void);
nzmqt::ZMQContext *getZMQContext(void) { return m_zmqContext; };
private:
bool m_aborted; ///< Flag to abort the thread.
nzmqt::PollingZMQContext *m_zmqContext; ///< ZeroMQ context
nzmqt::ZMQContext *m_zmqContext; ///< ZeroMQ context
nzmqt::ZMQSocket *m_zmqInSock; ///< ZeroMQ feeding socket
nzmqt::ZMQSocket *m_zmqPubSock; ///< ZeroMQ publishing socket
protected slots:
void messageReceived(const QList<QByteArray>&);
void receivedMessage(const QList<QByteArray>&);
};

class QWaitCondition;
Expand Down Expand Up @@ -170,6 +171,8 @@ class DBLoggerThread : public MThread
/// Protected by m_queueMutex
};

extern LogServerThread *logServerThread;

#endif

/*
Expand Down
5 changes: 3 additions & 2 deletions mythtv/programs/mythlogserver/main.cpp
Expand Up @@ -84,6 +84,9 @@ int main(int argc, char *argv[])
if (retval != GENERIC_EXIT_OK)
return retval;

// Must be started before ConfigureLogging()
logServerStart();

bool daemonize = cmdline.toBool("daemon");
QString mask("general");
if ((retval = cmdline.ConfigureLogging(mask, daemonize)) != GENERIC_EXIT_OK)
Expand All @@ -104,8 +107,6 @@ int main(int argc, char *argv[])
return GENERIC_EXIT_NO_MYTHCONTEXT;
}

logServerStart();

while (true)
usleep(100000);

Expand Down

0 comments on commit d3299b9

Please sign in to comment.