Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(userspace/falco): re-design stats writer and make it thread-safe #2109

Merged
merged 9 commits into from Aug 26, 2022
2 changes: 1 addition & 1 deletion userspace/falco/CMakeLists.txt
Expand Up @@ -48,7 +48,7 @@ set(
outputs_stdout.cpp
outputs_syslog.cpp
event_drops.cpp
statsfilewriter.cpp
stats_writer.cpp
falco.cpp
)

Expand Down
33 changes: 20 additions & 13 deletions userspace/falco/app_actions/process_events.cpp
Expand Up @@ -26,7 +26,7 @@ limitations under the License.
#ifndef MINIMAL_BUILD
#include "webserver.h"
#endif
#include "statsfilewriter.h"
#include "stats_writer.h"
#include "application.h"
#include "falco_outputs.h"
#include "token_bucket.h"
Expand All @@ -39,12 +39,13 @@ using namespace falco::app;
// Event processing loop
//
application::run_result application::do_inspect(syscall_evt_drop_mgr &sdropmgr,
std::shared_ptr<stats_writer> statsw,
uint64_t duration_to_tot_ns,
uint64_t &num_evts)
{
int32_t rc;
sinsp_evt* ev;
StatsFileWriter writer;
stats_writer::collector stats_collector(statsw);
uint64_t duration_start = 0;
uint32_t timeouts_since_last_success_or_msg = 0;
std::size_t source_idx;
Expand All @@ -70,16 +71,6 @@ application::run_result application::do_inspect(syscall_evt_drop_mgr &sdropmgr,
m_state->config->m_syscall_evt_drop_max_burst,
m_state->config->m_syscall_evt_simulate_drops);

if (m_options.stats_filename != "")
{
string errstr;

if (!writer.init(m_state->inspector, m_options.stats_filename, m_options.stats_interval, errstr))
{
return run_result::fatal(errstr);
}
}

//
// Loop through the events
//
Expand All @@ -88,7 +79,7 @@ application::run_result application::do_inspect(syscall_evt_drop_mgr &sdropmgr,

rc = m_state->inspector->next(&ev);

writer.handle();
stats_collector.collect(m_state->inspector);

if(m_state->reopen_outputs)
{
Expand Down Expand Up @@ -220,10 +211,26 @@ application::run_result application::process_events()
scap_stats cstats;
uint64_t num_evts = 0;
run_result ret;
std::shared_ptr<stats_writer> statsw;

if (!m_options.stats_filename.empty())
{
std::string err;
if (!stats_writer::init_ticker(m_options.stats_interval, err))
{
return run_result::fatal(err);
}
statsw.reset(new stats_writer(m_options.stats_filename));
}
else
{
statsw.reset(new stats_writer());
}

duration = ((double)clock()) / CLOCKS_PER_SEC;

ret = do_inspect(sdropmgr,
statsw,
uint64_t(m_options.duration_to_tot*ONE_SECOND_IN_NS),
num_evts);

Expand Down
2 changes: 2 additions & 0 deletions userspace/falco/application.h
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#pragma once

#include "configuration.h"
#include "stats_writer.h"
#ifndef MINIMAL_BUILD
#include "grpc_server.h"
#include "webserver.h"
Expand Down Expand Up @@ -222,6 +223,7 @@ class application {
void print_all_ignored_events();
void format_plugin_info(std::shared_ptr<sinsp_plugin> p, std::ostream& os) const;
run_result do_inspect(syscall_evt_drop_mgr &sdropmgr,
std::shared_ptr<stats_writer> statsw,
uint64_t duration_to_tot_ns,
uint64_t &num_events);

Expand Down
187 changes: 187 additions & 0 deletions userspace/falco/stats_writer.cpp
@@ -0,0 +1,187 @@
/*
Copyright (C) 2022 The Falco Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <sys/time.h>
#include <signal.h>
#include <nlohmann/json.hpp>
#include <atomic>

#include <nlohmann/json.hpp>

#include "stats_writer.h"
#include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used
#include "logger.h"

// note: ticker_t is an uint16_t, which is enough because we don't care about
// overflows here. Threads calling stats_writer::handle() will just
// check that this value changed since their last observation.
static std::atomic<stats_writer::ticker_t> s_timer((stats_writer::ticker_t) 0);

static void timer_handler(int signum)
{
s_timer.fetch_add(1, std::memory_order_relaxed);
}

bool stats_writer::init_ticker(uint32_t interval_msec, string &err)
{
struct itimerval timer;
struct sigaction handler;

memset (&handler, 0, sizeof (handler));
handler.sa_handler = &timer_handler;
if (sigaction(SIGALRM, &handler, NULL) == -1)
{
err = string("Could not set up signal handler for periodic timer: ") + strerror(errno);
return false;
}

timer.it_value.tv_sec = interval_msec / 1000;
timer.it_value.tv_usec = (interval_msec % 1000) * 1000;
timer.it_interval = timer.it_value;
if (setitimer(ITIMER_REAL, &timer, NULL) == -1)
{
err = string("Could not set up periodic timer: ") + strerror(errno);
return false;
}

return true;
}

stats_writer::ticker_t stats_writer::get_ticker()
{
return s_timer.load(std::memory_order_relaxed);
}

stats_writer::stats_writer()
: m_initialized(false), m_total_samples(0)
{

}

stats_writer::stats_writer(const std::string &filename)
: m_initialized(true), m_total_samples(0)
{
m_output.exceptions(ofstream::failbit | ofstream::badbit);
m_output.open(filename, ios_base::app);
m_worker = std::thread(&stats_writer::worker, this);
}

stats_writer::~stats_writer()
{
if (m_initialized)
{
stop_worker();
m_output.close();
}
}

bool stats_writer::has_output() const
{
return m_initialized;
}

void stats_writer::stop_worker()
{
stats_writer::msg msg;
msg.stop = true;
push(msg);
if(m_worker.joinable())
{
m_worker.join();
}
}

inline void stats_writer::push(const stats_writer::msg& m)
{
if (!m_queue.try_push(m))
{
fprintf(stderr, "Fatal error: Stats queue reached maximum capacity. Exiting.\n");
exit(EXIT_FAILURE);
}
}

void stats_writer::worker() noexcept
{
stats_writer::msg m;
nlohmann::json jmsg;

while(true)
{
// blocks until a message becomes availables
m_queue.pop(m);
if (m.stop)
{
return;
}

m_total_samples++;
try
{
jmsg["sample"] = m_total_samples;
jmsg["cur"]["events"] = m.stats.n_evts;
jmsg["cur"]["drops"] = m.stats.n_drops;
jmsg["cur"]["preemptions"] = m.stats.n_preemptions;
jmsg["cur"]["drop_pct"] = (m.stats.n_evts == 0 ? 0.0 : (100.0*m.stats.n_drops/m.stats.n_evts));
jmsg["delta"]["events"] = m.delta.n_evts;
jmsg["delta"]["drops"] = m.delta.n_drops;
jmsg["delta"]["preemptions"] = m.delta.n_preemptions;
jmsg["delta"]["drop_pct"] = (m.delta.n_evts == 0 ? 0.0 : (100.0*m.delta.n_drops/m.delta.n_evts));
m_output << jmsg.dump() << endl;
}
catch(const exception &e)
{
falco_logger::log(LOG_ERR, "stats_writer (worker): " + string(e.what()) + "\n");
}
}
}

stats_writer::collector::collector(std::shared_ptr<stats_writer> writer)
: m_writer(writer), m_last_tick(0), m_samples(0)
{

}

void stats_writer::collector::collect(std::shared_ptr<sinsp> inspector)
{
// just skip if no output is configured
if (m_writer->has_output())
{
// collect stats once per each ticker period
auto tick = stats_writer::get_ticker();
if (tick != m_last_tick)
{
stats_writer::msg msg;
msg.stop = false;
inspector->get_capture_stats(&msg.stats);
m_samples++;
if(m_samples == 1)
{
msg.delta = msg.stats;
}
else
{
msg.delta.n_evts = msg.stats.n_evts - m_last_stats.n_evts;
msg.delta.n_drops = msg.stats.n_drops - m_last_stats.n_drops;
msg.delta.n_preemptions = msg.stats.n_preemptions - m_last_stats.n_preemptions;
}

m_last_tick = tick;
m_last_stats = msg.stats;
m_writer->push(msg);
}
}
}