Skip to content

Commit

Permalink
feat(userspace/falco): non-blocking outputs
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Grasso <me@leonardograsso.com>
  • Loading branch information
leogr committed Oct 28, 2020
1 parent f449316 commit d59d2a4
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 29 deletions.
110 changes: 86 additions & 24 deletions userspace/falco/falco_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ falco_outputs::~falco_outputs()
{
if(m_initialized)
{
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
this->cleanup_outputs();
this->stop_worker();
if(m_worker_thread.joinable())
{
(*it)->cleanup();
m_worker_thread.join();
}
}
}
Expand All @@ -77,9 +79,12 @@ void falco_outputs::init(bool json_output,
m_time_format_iso_8601 = time_format_iso_8601;
m_hostname = hostname;

m_worker_thread = std::thread(&falco_outputs::worker, this);

m_initialized = true;
}

// todo(leogr): when the worker has started, adding an outputs is not thread-safe
void falco_outputs::add_output(falco::outputs::config oc)
{

Expand Down Expand Up @@ -129,6 +134,12 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
return;
}

falco_outputs::ctrl_msg cmsg = {};
cmsg.ts = evt->get_ts();
cmsg.priority = priority;
cmsg.source = source;
cmsg.rule = rule;

string sformat;
if(source == "syscall")
{
Expand Down Expand Up @@ -163,35 +174,38 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
sformat += " " + format;
}

string msg;
msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat);
cmsg.msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat);
cmsg.fields = falco_formats::resolve_tokens(evt, source, sformat);

for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->output_event(evt, rule, source, priority, sformat, msg);
}
cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
m_queue.push(cmsg);
}

void falco_outputs::handle_msg(uint64_t now,
void falco_outputs::handle_msg(uint64_t ts,
falco_common::priority_type priority,
std::string &msg,
std::string &rule,
std::map<std::string, std::string> &output_fields)
{
std::string full_msg;
falco_outputs::ctrl_msg cmsg = {};
cmsg.ts = ts;
cmsg.priority = priority;
cmsg.source = "";
cmsg.rule = rule;
cmsg.fields = output_fields;

if(m_json_output)
{
nlohmann::json jmsg;

// Convert the time-as-nanoseconds to a more json-friendly ISO8601.
time_t evttime = now / 1000000000;
time_t evttime = ts / 1000000000;
char time_sec[20]; // sizeof "YYYY-MM-DDTHH:MM:SS"
char time_ns[12]; // sizeof ".sssssssssZ"
string iso8601evttime;

strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime(&evttime));
snprintf(time_ns, sizeof(time_ns), ".%09luZ", now % 1000000000);
snprintf(time_ns, sizeof(time_ns), ".%09luZ", ts % 1000000000);
iso8601evttime = time_sec;
iso8601evttime += time_ns;

Expand All @@ -201,15 +215,15 @@ void falco_outputs::handle_msg(uint64_t now,
jmsg["time"] = iso8601evttime;
jmsg["output_fields"] = output_fields;

full_msg = jmsg.dump();
cmsg.msg = jmsg.dump();
}
else
{
std::string timestr;
bool first = true;

sinsp_utils::ts_to_string(now, &timestr, false, true);
full_msg = timestr + ": " + falco_common::priority_names[priority] + " " + msg + " (";
sinsp_utils::ts_to_string(ts, &timestr, false, true);
cmsg.msg = timestr + ": " + falco_common::priority_names[priority] + " " + msg + " (";
for(auto &pair : output_fields)
{
if(first)
Expand All @@ -218,23 +232,71 @@ void falco_outputs::handle_msg(uint64_t now,
}
else
{
full_msg += " ";
cmsg.msg += " ";
}
full_msg += pair.first + "=" + pair.second;
cmsg.msg += pair.first + "=" + pair.second;
}
full_msg += ")";
cmsg.msg += ")";
}

for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->output_msg(priority, full_msg);
}
cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
m_queue.push(cmsg);
}

void falco_outputs::cleanup_outputs()
{
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_CLEANUP);
}

void falco_outputs::reopen_outputs()
{
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_REOPEN);
}

void falco_outputs::stop_worker()
{
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP);
}

inline void falco_outputs::push(ctrl_msg_type cmt)
{
falco_outputs::ctrl_msg cmsg = {};
cmsg.type = cmt;
m_queue.push(cmsg);
}

void falco_outputs::worker()
{
falco_outputs::ctrl_msg cmsg;
while(true)
{
(*it)->reopen();
// Block until a message becomes available.
m_queue.pop(cmsg);
switch(cmsg.type)
{
case ctrl_msg_type::CTRL_MSG_OUTPUT:
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->output(&cmsg);
}
break;
case ctrl_msg_type::CTRL_MSG_CLEANUP:
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->cleanup();
}
break;
case ctrl_msg_type::CTRL_MSG_REOPEN:
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->reopen();
}
break;
case ctrl_msg_type::CTRL_MSG_STOP:
return;

default:
break;
}
}
}
32 changes: 27 additions & 5 deletions userspace/falco/falco_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ limitations under the License.
#include "token_bucket.h"
#include "falco_engine.h"
#include "outputs.h"
#include "tbb/concurrent_queue.h"

//
// This class acts as the primary interface between a program and the
Expand All @@ -44,20 +45,19 @@ class falco_outputs

void add_output(falco::outputs::config oc);

//
// evt is an event that has matched some rule. Pass the event
// to all configured outputs.
//
// Format then send the event to all configured outputs (`evt` is an event that has matched some rule).
void handle_event(gen_event *evt, std::string &rule, std::string &source,
falco_common::priority_type priority, std::string &format);

// Send a generic message to all outputs. Not necessarily associated with any event.
// Format then send a generic message to all outputs. Not necessarily associated with any event.
void handle_msg(uint64_t now,
falco_common::priority_type priority,
std::string &msg,
std::string &rule,
std::map<std::string, std::string> &output_fields);

void cleanup_outputs();

void reopen_outputs();

private:
Expand All @@ -72,4 +72,26 @@ class falco_outputs
bool m_json_output;
bool m_time_format_iso_8601;
std::string m_hostname;

enum ctrl_msg_type
{
CTRL_MSG_STOP = 0,
CTRL_MSG_OUTPUT = 1,
CTRL_MSG_CLEANUP = 2,
CTRL_MSG_REOPEN = 3,
};

struct ctrl_msg : falco::outputs::message
{
ctrl_msg_type type;
};

typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq;

falco_outputs_cbq m_queue;

std::thread m_worker_thread;
inline void push(ctrl_msg_type cmt);
void worker();
void stop_worker();
};

0 comments on commit d59d2a4

Please sign in to comment.