Skip to content

Commit

Permalink
Livestatus: Add log table (WIP).
Browse files Browse the repository at this point in the history
refs #4433
  • Loading branch information
Michael Friedrich committed Oct 30, 2013
1 parent 266afc9 commit 989a317
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 29 deletions.
6 changes: 3 additions & 3 deletions components/livestatus/listener.cpp
Expand Up @@ -28,10 +28,10 @@
#include "base/networkstream.h"
#include "base/application.h"
#include "base/scriptfunction.h"
#include "base/convert.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/exception/diagnostic_information.hpp>


using namespace icinga;
using namespace livestatus;

Expand Down Expand Up @@ -145,6 +145,7 @@ void LivestatusListener::ClientThreadProc(const Socket::Ptr& client)
}
}


void LivestatusListener::ValidateSocketType(const String& location, const Dictionary::Ptr& attrs)
{
Value socket_type = attrs->Get("socket_type");
Expand All @@ -153,5 +154,4 @@ void LivestatusListener::ValidateSocketType(const String& location, const Dictio
ConfigCompilerContext::GetInstance()->AddMessage(true, "Validation failed for " +
location + ": Socket type '" + socket_type + "' is invalid.");
}
}

}
130 changes: 116 additions & 14 deletions components/livestatus/logtable.cpp
Expand Up @@ -20,13 +20,83 @@
#include "livestatus/logtable.h"
#include "icinga/icingaapplication.h"
#include "icinga/cib.h"
#include "base/convert.h"
#include "base/utility.h"
#include "base/logger_fwd.h"
#include "base/application.h"
#include "base/objectlock.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/algorithm/string/split.hpp>
#include <fstream>

using namespace icinga;
using namespace livestatus;

LogTable::LogTable(void)
LogTable::LogTable(const unsigned long& from, const unsigned long& until)
{
Log(LogInformation, "livestatus", "Pre-selecting log file from " + Convert::ToString(from) + " until " + Convert::ToString(until));

/* store from & until for FetchRows */
m_TimeFrom = from;
m_TimeUntil = until;

/* create log file index - TODO config option */
CreateLogIndex(Application::GetLocalStateDir() + "/log/icinga2/compat");

/* m_LogFileIndex map tells which log files are involved ordered by their start timestamp */
unsigned long ts;
unsigned long line_count = 0;
BOOST_FOREACH(boost::tie(ts, boost::tuples::ignore), m_LogFileIndex) {
if (ts < m_TimeFrom || ts > m_TimeUntil) //this is not entirely correct - logfile index only holds the start time! TODO
continue;

String log_file = m_LogFileIndex[ts];

std::ifstream fp;
fp.exceptions(std::ifstream::badbit);
fp.open(log_file.CStr(), std::ifstream::in);

while (fp.good()) {
std::string line;
std::getline(fp, line);

if (line.empty())
continue; /* Ignore empty lines */
/*
* [1379025342] SERVICE NOTIFICATION: testconfig-admin;1066localhost;cert[exp]: thomas-1.office.crt;WARNING;true;sudo: Kein TTY vorhanden und kein »askpass«-Programm angegeben
*/
unsigned long time = atoi(line.substr(1, 11).c_str());

size_t colon = line.find_first_of(':');
size_t colon_offset = colon - 13;

std::string type = line.substr(13, colon_offset);
std::string options = line.substr(colon + 1);

/* TODO - find class for log entry */

Dictionary::Ptr bag = boost::make_shared<Dictionary>();
bag->Set("time", time);
bag->Set("lineno", line_count);
bag->Set("class", 0); /* 0 is the default */
bag->Set("message", String(line)); /* complete line */
bag->Set("type", String(type));
bag->Set("options", String(options));

/* TODO: comment, plugin_output, state, state_type, attempt */
{
boost::mutex::scoped_lock lock(m_Mutex);
m_RowsCache[line_count] = bag;
}

line_count++;
}

fp.close();
}

AddColumns(this);
}

Expand Down Expand Up @@ -59,22 +129,22 @@ String LogTable::GetName(void) const

void LogTable::FetchRows(const AddRowFunction& addRowFn)
{
Object::Ptr obj = boost::make_shared<Object>();
unsigned long lineno;

/* Return a fake row. */
addRowFn(obj);
BOOST_FOREACH(boost::tie(lineno, boost::tuples::ignore), m_RowsCache) {
/* pass a dictionary with "lineno" as key */
addRowFn(m_RowsCache[lineno]);
}
}

Value LogTable::TimeAccessor(const Value& row)
{
/* not supported */
return Empty;
return static_cast<Dictionary::Ptr>(row)->Get("time");
}

Value LogTable::LinenoAccessor(const Value& row)
{
/* not supported */
return Empty;
return static_cast<Dictionary::Ptr>(row)->Get("lineno");
}

Value LogTable::ClassAccessor(const Value& row)
Expand All @@ -85,20 +155,17 @@ Value LogTable::ClassAccessor(const Value& row)

Value LogTable::MessageAccessor(const Value& row)
{
/* not supported */
return Empty;
return static_cast<Dictionary::Ptr>(row)->Get("message");
}

Value LogTable::TypeAccessor(const Value& row)
{
/* not supported */
return Empty;
return static_cast<Dictionary::Ptr>(row)->Get("type");
}

Value LogTable::OptionsAccessor(const Value& row)
{
/* not supported */
return Empty;
return static_cast<Dictionary::Ptr>(row)->Get("options");
}

Value LogTable::CommentAccessor(const Value& row)
Expand Down Expand Up @@ -154,3 +221,38 @@ Value LogTable::CommandNameAccessor(const Value& row)
/* not supported */
return Empty;
}

void LogTable::CreateLogIndex(const String& path)
{
Utility::Glob(path + "/icinga.log", boost::bind(&LogTable::CreateLogIndexFileHandler, _1, boost::ref(m_LogFileIndex)));
Utility::Glob(path + "/archives/*.log", boost::bind(&LogTable::CreateLogIndexFileHandler, _1, boost::ref(m_LogFileIndex)));
}

void LogTable::CreateLogIndexFileHandler(const String& path, std::map<unsigned long, String>& index)
{
std::ifstream stream;
stream.open(path.CStr(), std::ifstream::in);

if (!stream)
BOOST_THROW_EXCEPTION(std::runtime_error("Could not open log file: " + path));

/* read the first bytes to get the timestamp: [123456789] */
char buffer[12];

stream.read(buffer, 12);

if (buffer[0] != '[' || buffer[11] != ']') {
/* this can happen for directories too, silently ignore them */
return;
}

/* extract timestamp */
buffer[11] = 0;
unsigned int ts_start = atoi(buffer+1);

stream.close();

Log(LogInformation, "livestatus", "Indexing log file: '" + path + "' with timestamp start: '" + Convert::ToString(ts_start) + "'.");

index[ts_start] = path;
}
13 changes: 12 additions & 1 deletion components/livestatus/logtable.h
Expand Up @@ -21,6 +21,7 @@
#define LOGTABLE_H

#include "livestatus/table.h"
#include <boost/thread/mutex.hpp>

using namespace icinga;

Expand All @@ -35,7 +36,7 @@ class LogTable : public Table
public:
DECLARE_PTR_TYPEDEFS(LogTable);

LogTable(void);
LogTable(const unsigned long& from, const unsigned long& until);

static void AddColumns(Table *table, const String& prefix = String(),
const Column::ObjectAccessor& objectAccessor = Column::ObjectAccessor());
Expand All @@ -60,6 +61,16 @@ class LogTable : public Table
static Value HostNameAccessor(const Value& row);
static Value ContactNameAccessor(const Value& row);
static Value CommandNameAccessor(const Value& row);

private:
std::map<unsigned long, String> m_LogFileIndex;
std::map<unsigned long, Dictionary::Ptr> m_RowsCache;
unsigned long m_TimeFrom;
unsigned long m_TimeUntil;
boost::mutex m_Mutex;

void CreateLogIndex(const String& path);
static void CreateLogIndexFileHandler(const String& path, std::map<unsigned long, String>& index);
};

}
Expand Down
21 changes: 16 additions & 5 deletions components/livestatus/query.cpp
Expand Up @@ -36,6 +36,7 @@
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include "base/exception.h"
#include "base/utility.h"
#include <boost/algorithm/string/classification.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/foreach.hpp>
Expand All @@ -48,7 +49,8 @@ static int l_ExternalCommands = 0;
static boost::mutex l_QueryMutex;

Query::Query(const std::vector<String>& lines)
: m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1)
: m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1),
m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
{
if (lines.size() == 0) {
m_Verb = "ERROR";
Expand Down Expand Up @@ -120,7 +122,7 @@ Query::Query(const std::vector<String>& lines)
else if (header == "ColumnHeaders")
m_ColumnHeaders = (params == "on");
else if (header == "Filter") {
Filter::Ptr filter = ParseFilter(params);
Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);

if (!filter) {
m_Verb = "ERROR";
Expand Down Expand Up @@ -162,7 +164,7 @@ Query::Query(const std::vector<String>& lines)
} else if (aggregate_arg == "avginv") {
aggregator = boost::make_shared<InvAvgAggregator>(aggregate_attr);
} else {
filter = ParseFilter(params);
filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);

if (!filter) {
m_Verb = "ERROR";
Expand Down Expand Up @@ -249,7 +251,7 @@ int Query::GetExternalCommands(void)
return l_ExternalCommands;
}

Filter::Ptr Query::ParseFilter(const String& params)
Filter::Ptr Query::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
{
std::vector<String> tokens;
boost::algorithm::split(tokens, params, boost::is_any_of(" "));
Expand Down Expand Up @@ -282,6 +284,15 @@ Filter::Ptr Query::ParseFilter(const String& params)
if (negate)
filter = boost::make_shared<NegateFilter>(filter);

/* pre-filter log time duration */
if (tokens[0] == "time") {
if (op == "<" || op == "<=") {
until = Convert::ToLong(tokens[2]);
} else if (op == ">" || op == ">=") {
from = Convert::ToLong(tokens[2]);
}
}

return filter;
}

Expand Down Expand Up @@ -350,7 +361,7 @@ void Query::ExecuteGetHelper(const Stream::Ptr& stream)
{
Log(LogInformation, "livestatus", "Table: " + m_Table);

Table::Ptr table = Table::GetByName(m_Table);
Table::Ptr table = Table::GetByName(m_Table, m_LogTimeFrom, m_LogTimeUntil);

if (!table) {
SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
Expand Down
5 changes: 4 additions & 1 deletion components/livestatus/query.h
Expand Up @@ -78,6 +78,9 @@ class Query : public Object
/* Parameters for invalid queries. */
int m_ErrorCode;
String m_ErrorMessage;

unsigned long m_LogTimeFrom;
unsigned long m_LogTimeUntil;

void PrintResultSet(std::ostream& fp, const std::vector<String>& columns, const Array::Ptr& rs);
void PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level);
Expand All @@ -89,7 +92,7 @@ class Query : public Object
void SendResponse(const Stream::Ptr& stream, int code, const String& data);
void PrintFixed16(const Stream::Ptr& stream, int code, const String& data);

static Filter::Ptr ParseFilter(const String& params);
static Filter::Ptr ParseFilter(const String& params, unsigned long& from, unsigned long& until);
};

}
Expand Down
4 changes: 2 additions & 2 deletions components/livestatus/table.cpp
Expand Up @@ -44,7 +44,7 @@ using namespace livestatus;
Table::Table(void)
{ }

Table::Ptr Table::GetByName(const String& name)
Table::Ptr Table::GetByName(const String& name, const unsigned long& from, const unsigned long& until)
{
if (name == "status")
return boost::make_shared<StatusTable>();
Expand All @@ -69,7 +69,7 @@ Table::Ptr Table::GetByName(const String& name)
else if (name == "timeperiods")
return boost::make_shared<TimePeriodsTable>();
else if (name == "log")
return boost::make_shared<LogTable>();
return boost::make_shared<LogTable>(from, until);

return Table::Ptr();
}
Expand Down
3 changes: 1 addition & 2 deletions components/livestatus/table.h
Expand Up @@ -40,8 +40,7 @@ class Table : public Object

typedef boost::function<void (const Value&)> AddRowFunction;

static Table::Ptr GetByName(const String& name);

static Table::Ptr GetByName(const String& name, const unsigned long& from = 0, const unsigned long& until = 0);

virtual String GetName(void) const = 0;

Expand Down
4 changes: 4 additions & 0 deletions test/livestatus/queries/log/log
@@ -0,0 +1,4 @@
GET log
Filter: time >= 1348657741
ResponseHeader: fixed16

5 changes: 5 additions & 0 deletions test/livestatus/queries/log/minimal
@@ -0,0 +1,5 @@
GET log
Columns: host_name service_description time lineno class type options
Filter: time >= 1348657741
ResponseHeader: fixed16

2 changes: 1 addition & 1 deletion test/livestatus/run_queries
@@ -1,6 +1,6 @@
#!/bin/bash

LIVESTATUSHOST="10.0.10.18"
LIVESTATUSHOST="127.0.0.1"
LIVESTATUSPORT="6558"
LIVESTATUSQUERIES="./queries"

Expand Down

0 comments on commit 989a317

Please sign in to comment.