feat: asynchronous outputs and slow outputs detection #1451

Merged
merged 19 commits into from
Dec 1, 2020
19 commits
f449316
update(userspace/falco): introduce message struct for outputs
leogr Oct 19, 2020
d59d2a4
feat(userspace/falco): non-blocking outputs
leogr Oct 19, 2020
612513f
chore(userspace/falco): avoid multiple outputs init
leogr Oct 19, 2020
0478ec9
update(userspace/falco): add "internal" source to outputs and proto
leogr Oct 19, 2020
0e913fb
chore(userspace/falco): correct exception message
leogr Oct 29, 2020
68b1dc0
update(userspace/falco): add accessor method for output's name
leogr Oct 30, 2020
540ac95
update(userspace/falco): outputs error handling
leogr Nov 2, 2020
d6f2a3a
new(userspace/falco): Watchdog timer utility
leogr Nov 3, 2020
4e520fa
update(userspace/falco): watchdog for outputs
leogr Nov 3, 2020
921d36c
new(userspace/falco): add "output_timeout" config node
leogr Nov 6, 2020
2aea709
chore(userspace/falco): configurable outputs timeout
leogr Nov 6, 2020
97b7610
new: Falco config for output timeout
leogr Nov 6, 2020
a5d9d2f
docs(falco.yaml): better explanation on "output_timeout"
leogr Nov 10, 2020
034bce3
chore(userspace/falco): handle freeing of output objects
leogr Nov 18, 2020
631b021
chore(userspace/falco): add_output init check
leogr Nov 18, 2020
96c589b
update(userspace/falco): clear output queue if still blocked during t…
leogr Nov 25, 2020
02f8cef
chore(userspace/falco): apply suggestions from review
leogr Nov 27, 2020
4b020fa
fix(userspace/falco) class naming convention
leogr Nov 27, 2020
cf38e63
update(userspace/falco): output worker should not throw exceptions
leogr Nov 30, 2020
17 changes: 17 additions & 0 deletions falco.yaml
@@ -87,6 +87,23 @@ syscall_event_drops:
rate: .03333
max_burst: 10

# Falco continuously monitors the performance of its outputs. When an output channel
# fails to deliver an alert within a given deadline, an error is reported indicating
# which output is blocking notifications.
# The timeout error is reported to the log according to the above log_* settings.
# Note that the notification is not discarded from the output queue; thus,
# output channels may remain blocked indefinitely.
# An output timeout error indicates a misconfiguration or an I/O problem
# that Falco cannot recover from and that the user should fix.
#
# The "output_timeout" value specifies how long to wait, in milliseconds, before
# considering the deadline exceeded.
#
# With the 2000ms default, the notification consumer can block the Falco output
# for up to 2 seconds without reaching the timeout.

output_timeout: 2000

# A throttling mechanism implemented as a token bucket limits the
# rate of falco notifications. This throttling is controlled by the following configuration
# options:
2 changes: 2 additions & 0 deletions userspace/falco/configuration.cpp
@@ -176,6 +176,8 @@ void falco_configuration::init(string conf_filename, list<string> &cmdline_optio

I had a chance to discuss feedback that got addressed by @leogr - this looks ready to me now

LGTM

falco_logger::set_level(m_log_level);

m_output_timeout = m_config->get_scalar<uint32_t>("output_timeout", 2000);

m_notifications_rate = m_config->get_scalar<uint32_t>("outputs", "rate", 1);
m_notifications_max_burst = m_config->get_scalar<uint32_t>("outputs", "max_burst", 1000);
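
The lookup above falls back to 2000 when `output_timeout` is missing from the configuration. A minimal sketch of that "value or default" pattern follows, assuming a flat string map in place of Falco's YAML-backed configuration object; `flat_config`, `get_u32_or` and `output_timeout_from` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for a parsed configuration: top-level scalar keys only.
using flat_config = std::map<std::string, std::string>;

// Returns the value stored under `key` as uint32_t, or `fallback` when the
// key is absent. std::stoul throws if the stored text is not a number.
uint32_t get_u32_or(const flat_config &cfg, const std::string &key, uint32_t fallback)
{
    auto it = cfg.find(key);
    if(it == cfg.end())
    {
        return fallback;
    }
    return static_cast<uint32_t>(std::stoul(it->second));
}

// Converts the raw millisecond count into a typed duration, as init() does
// with std::chrono::milliseconds(timeout) in the diff.
std::chrono::milliseconds output_timeout_from(const flat_config &cfg)
{
    return std::chrono::milliseconds(get_u32_or(cfg, "output_timeout", 2000));
}
```

Carrying the value as `std::chrono::milliseconds` rather than a bare integer keeps the unit attached to the type once it leaves the configuration layer.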

1 change: 1 addition & 0 deletions userspace/falco/configuration.h
@@ -204,6 +204,7 @@ class falco_configuration

bool m_buffered_outputs;
bool m_time_format_iso_8601;
uint32_t m_output_timeout;

bool m_grpc_enabled;
uint32_t m_grpc_threadiness;
1 change: 1 addition & 0 deletions userspace/falco/falco.cpp
@@ -972,6 +972,7 @@ int falco_init(int argc, char **argv)

outputs->init(config.m_json_output,
config.m_json_include_output_property,
config.m_output_timeout,
config.m_notifications_rate, config.m_notifications_max_burst,
config.m_buffered_outputs,
config.m_time_format_iso_8601,
156 changes: 129 additions & 27 deletions userspace/falco/falco_outputs.cpp
@@ -24,6 +24,7 @@ limitations under the License.

#include "formats.h"
#include "logger.h"
#include "watchdog.h"

#include "outputs_file.h"
#include "outputs_program.h"
@@ -51,18 +52,26 @@ falco_outputs::~falco_outputs()
{
if(m_initialized)
{
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
this->stop_worker();
for(auto o : m_outputs)
{
(*it)->cleanup();
delete o;
}
}
}

void falco_outputs::init(bool json_output,
bool json_include_output_property,
uint32_t rate, uint32_t max_burst, bool buffered,
bool time_format_iso_8601, string hostname)
bool json_include_output_property,
uint32_t timeout,
uint32_t rate, uint32_t max_burst, bool buffered,
bool time_format_iso_8601, std::string hostname)
{
// Cannot be initialized more than one time.
if(m_initialized)
{
throw falco_exception("falco_outputs already initialized");
}

m_json_output = json_output;

// Note that falco_formats is already initialized by the engine,
@@ -71,17 +80,29 @@ void falco_outputs::init(bool json_output,
falco_formats::s_json_output = json_output;
falco_formats::s_json_include_output_property = json_include_output_property;

m_timeout = std::chrono::milliseconds(timeout);

m_notifications_tb.init(rate, max_burst);

m_buffered = buffered;
m_time_format_iso_8601 = time_format_iso_8601;
m_hostname = hostname;

m_worker_thread = std::thread(&falco_outputs::worker, this);

m_initialized = true;
}

// This function has to be called after init() since some configuration settings
// need to be passed to the output plugins. Then, although the worker has started,
// the worker is still on hold, waiting for a message.
// Thus it is still safe to call add_output() before any message has been enqueued.
void falco_outputs::add_output(falco::outputs::config oc)
{
if(!m_initialized)
{
throw falco_exception("cannot add output: falco_outputs not initialized yet");
}

falco::outputs::abstract_output *oo;

@@ -129,6 +150,12 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
return;
}

falco_outputs::ctrl_msg cmsg = {};
cmsg.ts = evt->get_ts();
cmsg.priority = priority;
cmsg.source = source;
cmsg.rule = rule;

string sformat;
if(source == "syscall")
{
@@ -163,35 +190,38 @@ void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
sformat += " " + format;
}

string msg;
msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat);
cmsg.msg = falco_formats::format_event(evt, rule, source, falco_common::priority_names[priority], sformat);
cmsg.fields = falco_formats::resolve_tokens(evt, source, sformat);

for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->output_event(evt, rule, source, priority, sformat, msg);
}
cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
m_queue.push(cmsg);
}

void falco_outputs::handle_msg(uint64_t now,
void falco_outputs::handle_msg(uint64_t ts,
falco_common::priority_type priority,
std::string &msg,
std::string &rule,
std::map<std::string, std::string> &output_fields)
{
std::string full_msg;
falco_outputs::ctrl_msg cmsg = {};
cmsg.ts = ts;
cmsg.priority = priority;
cmsg.source = "internal";
cmsg.rule = rule;
cmsg.fields = output_fields;

if(m_json_output)
{
nlohmann::json jmsg;

// Convert the time-as-nanoseconds to a more json-friendly ISO8601.
time_t evttime = now / 1000000000;
time_t evttime = ts / 1000000000;
char time_sec[20]; // sizeof "YYYY-MM-DDTHH:MM:SS"
char time_ns[12]; // sizeof ".sssssssssZ"
string iso8601evttime;

strftime(time_sec, sizeof(time_sec), "%FT%T", gmtime(&evttime));
snprintf(time_ns, sizeof(time_ns), ".%09luZ", now % 1000000000);
snprintf(time_ns, sizeof(time_ns), ".%09luZ", ts % 1000000000);
iso8601evttime = time_sec;
iso8601evttime += time_ns;

@@ -201,15 +231,15 @@ void falco_outputs::handle_msg(uint64_t now,
jmsg["time"] = iso8601evttime;
jmsg["output_fields"] = output_fields;

full_msg = jmsg.dump();
cmsg.msg = jmsg.dump();
}
else
{
std::string timestr;
bool first = true;

sinsp_utils::ts_to_string(now, &timestr, false, true);
full_msg = timestr + ": " + falco_common::priority_names[priority] + " " + msg + " (";
sinsp_utils::ts_to_string(ts, &timestr, false, true);
cmsg.msg = timestr + ": " + falco_common::priority_names[priority] + " " + msg + " (";
for(auto &pair : output_fields)
{
if(first)
@@ -218,23 +248,95 @@ void falco_outputs::handle_msg(uint64_t now,
}
else
{
full_msg += " ";
cmsg.msg += " ";
}
full_msg += pair.first + "=" + pair.second;
cmsg.msg += pair.first + "=" + pair.second;
}
full_msg += ")";
cmsg.msg += ")";
}

for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
{
(*it)->output_msg(priority, full_msg);
}
cmsg.type = ctrl_msg_type::CTRL_MSG_OUTPUT;
m_queue.push(cmsg);
}
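
The JSON branch of `handle_msg` turns a nanosecond timestamp into an ISO 8601 string by formatting the whole seconds with `strftime` and appending the nanosecond remainder. The standalone sketch below isolates that step so it can be tried on its own. `ns_to_iso8601` is a hypothetical helper, not a Falco function; it assumes a POSIX system (for `gmtime_r`) and uses `PRIu64` so the format specifier matches `uint64_t` on every platform.

```cpp
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <ctime>
#include <string>

// Hypothetical helper (not part of Falco): formats nanoseconds since the
// Unix epoch as "YYYY-MM-DDTHH:MM:SS.sssssssssZ" in UTC.
std::string ns_to_iso8601(uint64_t ts_ns)
{
    const time_t secs = static_cast<time_t>(ts_ns / 1000000000ULL);
    const uint64_t frac = ts_ns % 1000000000ULL;

    struct tm tm_utc;
    gmtime_r(&secs, &tm_utc); // POSIX, re-entrant variant of gmtime()

    char time_sec[20]; // sizeof "YYYY-MM-DDTHH:MM:SS"
    const size_t n = strftime(time_sec, sizeof(time_sec), "%FT%T", &tm_utc);
    assert(n == sizeof(time_sec) - 1); // holds for four-digit years
    (void)n;                           // silence unused warning under NDEBUG

    char time_ns[12]; // sizeof ".sssssssssZ"
    snprintf(time_ns, sizeof(time_ns), ".%09" PRIu64 "Z", frac);

    return std::string(time_sec) + time_ns;
}
```

For instance, `ns_to_iso8601(1606780800123456789ULL)` yields `2020-12-01T00:00:00.123456789Z`.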

void falco_outputs::cleanup_outputs()
{
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_CLEANUP);
}

void falco_outputs::reopen_outputs()
{
for(auto it = m_outputs.cbegin(); it != m_outputs.cend(); ++it)
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_REOPEN);
}

void falco_outputs::stop_worker()
{
watchdog<void *> wd;
wd.start([&](void *) -> void {
falco_logger::log(LOG_NOTICE, "output channels still blocked, discarding all remaining notifications\n");
m_queue.clear();
this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP);
});
wd.set_timeout(m_timeout, nullptr);

this->push(falco_outputs::ctrl_msg_type::CTRL_MSG_STOP);
if(m_worker_thread.joinable())
{
(*it)->reopen();
m_worker_thread.join();
}
}

inline void falco_outputs::push(ctrl_msg_type cmt)
{
falco_outputs::ctrl_msg cmsg = {};
cmsg.type = cmt;
m_queue.push(cmsg);
}

// todo(leogr,leodido): this function is not supposed to throw exceptions, and with "noexcept",
// the program is terminated if that occurs. Although that's the wanted behavior,
// we still need to improve the error reporting since some inner functions can throw exceptions.
void falco_outputs::worker() noexcept
{
watchdog<std::string> wd;
wd.start([&](std::string payload) -> void {
falco_logger::log(LOG_CRIT, "\"" + payload + "\" output timeout, all output channels are blocked\n");
});

auto timeout = m_timeout;

falco_outputs::ctrl_msg cmsg;
do
{
// Block until a message becomes available.
m_queue.pop(cmsg);

for(const auto o : m_outputs)
{
wd.set_timeout(timeout, o->get_name());
try
{
switch(cmsg.type)
{
case ctrl_msg_type::CTRL_MSG_OUTPUT:
o->output(&cmsg);
break;
case ctrl_msg_type::CTRL_MSG_CLEANUP:
case ctrl_msg_type::CTRL_MSG_STOP:
o->cleanup();
break;
case ctrl_msg_type::CTRL_MSG_REOPEN:
o->reopen();
break;
default:
falco_logger::log(LOG_DEBUG, "Outputs worker received an unknown message type\n");
}
}
catch(const exception &e)
{
falco_logger::log(LOG_ERR, o->get_name() + ": " + string(e.what()) + "\n");
}
}
wd.cancel_timeout();
} while(cmsg.type != ctrl_msg_type::CTRL_MSG_STOP);
}
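
The worker arms a watchdog before handing the message to each output and cancels it once all outputs have returned, so a callback fires only when an output stays blocked past the deadline. `watchdog.h` itself is not shown in this part of the diff, so the sketch below is an assumption about how such a timer could be built with a condition variable. `simple_watchdog` is a hypothetical class; only the method names `start`, `set_timeout` and `cancel_timeout` are taken from the calls above, and `stop` is added for the sketch.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Illustrative only: NOT the contents of Falco's watchdog.h.
// Method names mirror the calls in the diff; the body is an assumption.
template<typename T>
class simple_watchdog
{
public:
    ~simple_watchdog() { stop(); }

    // Starts the monitor thread; `cb` is invoked once per expired deadline.
    void start(std::function<void(T)> cb)
    {
        assert(!m_thread.joinable()); // start() is expected to be called once
        m_cb = std::move(cb);
        m_thread = std::thread([this] { run(); });
    }

    // Arms (or re-arms) the deadline and stores the payload handed to `cb`.
    void set_timeout(std::chrono::milliseconds timeout, T payload)
    {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_deadline = std::chrono::steady_clock::now() + timeout;
            m_payload = std::move(payload);
            m_armed = true;
        }
        m_cv.notify_one();
    }

    // Disarms the deadline so that the callback does not fire.
    void cancel_timeout()
    {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_armed = false;
        }
        m_cv.notify_one();
    }

    // Terminates the monitor thread; safe to call more than once.
    void stop()
    {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_running = false;
        }
        m_cv.notify_one();
        if(m_thread.joinable())
        {
            m_thread.join();
        }
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lk(m_mtx);
        while(m_running)
        {
            if(!m_armed)
            {
                m_cv.wait(lk); // sleep until armed or stopped
                continue;
            }
            const auto deadline = m_deadline;
            if(std::chrono::steady_clock::now() >= deadline)
            {
                m_armed = false;
                T payload = m_payload;
                lk.unlock(); // run the callback without holding the lock
                m_cb(payload);
                lk.lock();
                continue;
            }
            m_cv.wait_until(lk, deadline); // woken early on re-arm or cancel
        }
    }

    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::function<void(T)> m_cb;
    std::chrono::steady_clock::time_point m_deadline;
    T m_payload{};
    bool m_armed = false;
    bool m_running = true;
    std::thread m_thread;
};
```

In this sketch the callback only reports; it does not interrupt the blocked call. That matches the behavior described in `falco.yaml`, where a timed-out notification stays in the queue.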
34 changes: 29 additions & 5 deletions userspace/falco/falco_outputs.h
@@ -25,6 +25,7 @@ limitations under the License.
#include "token_bucket.h"
#include "falco_engine.h"
#include "outputs.h"
#include "tbb/concurrent_queue.h"

//
// This class acts as the primary interface between a program and the
@@ -39,25 +40,25 @@ class falco_outputs

void init(bool json_output,
bool json_include_output_property,
uint32_t timeout,
uint32_t rate, uint32_t max_burst, bool buffered,
bool time_format_iso_8601, std::string hostname);

void add_output(falco::outputs::config oc);

//
// evt is an event that has matched some rule. Pass the event
// to all configured outputs.
//
// Format then send the event to all configured outputs (`evt` is an event that has matched some rule).
void handle_event(gen_event *evt, std::string &rule, std::string &source,
falco_common::priority_type priority, std::string &format);

// Send a generic message to all outputs. Not necessarily associated with any event.
// Format then send a generic message to all outputs. Not necessarily associated with any event.
void handle_msg(uint64_t now,
falco_common::priority_type priority,
std::string &msg,
std::string &rule,
std::map<std::string, std::string> &output_fields);

void cleanup_outputs();

void reopen_outputs();

private:
@@ -71,5 +72,28 @@ class falco_outputs
bool m_buffered;
bool m_json_output;
bool m_time_format_iso_8601;
std::chrono::milliseconds m_timeout;
std::string m_hostname;

enum ctrl_msg_type
{
CTRL_MSG_STOP = 0,
CTRL_MSG_OUTPUT = 1,
CTRL_MSG_CLEANUP = 2,
CTRL_MSG_REOPEN = 3,
};

struct ctrl_msg : falco::outputs::message
{
ctrl_msg_type type;
};

typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq;

falco_outputs_cbq m_queue;

std::thread m_worker_thread;
inline void push(ctrl_msg_type cmt);
void worker() noexcept;
void stop_worker();
};
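
The header above pairs a message queue with a worker thread: producers push `ctrl_msg` values and return immediately, while the worker pops them and stops when it sees `CTRL_MSG_STOP`. The sketch below reproduces that shape using only the standard library. It is a simplified illustration, not Falco's code: `blocking_queue` is an unbounded, mutex-based stand-in for `tbb::concurrent_bounded_queue`, and `async_sink`, `emit` and `delivered` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

enum class msg_type { stop, output };

struct ctrl_msg
{
    msg_type type;
    std::string msg;
};

// Minimal blocking queue; stands in for tbb::concurrent_bounded_queue.
class blocking_queue
{
public:
    void push(ctrl_msg m)
    {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_items.push(std::move(m));
        }
        m_cv.notify_one();
    }

    // Blocks until a message is available, then moves it into `out`.
    void pop(ctrl_msg &out)
    {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cv.wait(lk, [this] { return !m_items.empty(); });
        out = std::move(m_items.front());
        m_items.pop();
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::queue<ctrl_msg> m_items;
};

// Hypothetical sink: emit() never blocks on delivery, the worker does the work.
class async_sink
{
public:
    async_sink() : m_worker(&async_sink::run, this) {}
    ~async_sink() { stop(); }

    void emit(std::string s) { m_queue.push({msg_type::output, std::move(s)}); }

    // Enqueues a stop message behind any pending output, then joins.
    void stop()
    {
        if(m_worker.joinable())
        {
            m_queue.push({msg_type::stop, ""});
            m_worker.join();
        }
    }

    // Only meaningful after stop(), once the worker has been joined.
    const std::vector<std::string> &delivered() const { return m_delivered; }

private:
    void run()
    {
        ctrl_msg m{msg_type::stop, std::string()};
        do
        {
            m_queue.pop(m); // block until a message becomes available
            if(m.type == msg_type::output)
            {
                m_delivered.push_back(m.msg);
            }
        } while(m.type != msg_type::stop);
    }

    blocking_queue m_queue;
    std::vector<std::string> m_delivered;
    std::thread m_worker; // declared last so the queue exists before the thread starts
};
```

Because the stop message travels through the same queue, everything enqueued before `stop()` is delivered first. The PR's `stop_worker()` adds a watchdog on top of this so that a blocked output cannot hold up shutdown indefinitely.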