Skip to content

Commit

Permalink
Improve ftrace scheduling
Browse files Browse the repository at this point in the history
This patch implements the following ftrace scheduling algorithm:


  main thread                           [drain] [unblock]
                                        /:              |
                            post .-----' :              |
                                /        :              v
  worker #0  [splice ...] [wakeup] [block ............] [splice]
                                         :
  worker #1  [splice ...]     [wakeup] [block ........] [splice]
                                         :
  worker #2  [splice ..........................................]
                                         :
                                         :
                                    drain period (100ms)

In other words, the splice(2) system call is used to move data from
the raw kernel ftrace pipe into an intermediate pipe at a page
granularity. This call allows every per-cpu worker to sleep until there
is at least one page of data available.

When a worker wakes up, it will attempt to move as many pages as
possible to its staging pipe (up to 64K, depending on the
system's pipe buffer size) in a non-blocking way. After this, it
will notify the main thread that data is available. This notification
will block the calling worker until the main thread has drained the
data.

When at least one worker has woken up, we schedule a drain operation
on the main thread for the next drain period (every 100ms by default).
The drain operation parses ftrace data from the staging pipes of
every worker having pending data. After this, each waiting worker is
allowed to issue another call to splice(), restarting the cycle.

Change-Id: I9657a5d50cd936f190bc57f2c9778019cb7419ab
  • Loading branch information
skyostil committed Feb 14, 2018
1 parent 6eb0ee5 commit 247110b
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 117 deletions.
3 changes: 3 additions & 0 deletions include/perfetto/base/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ constexpr const char* kLogFmt[] = {"\x1b[2m", "\x1b[39m", "\x1b[32m\x1b[1m",
PERFETTO_IMMEDIATE_CRASH(); \
} while (0)

#define PERFETTO_PLOG(x) \
PERFETTO_ELOG("%s (errno: %d, %s)", (x), errno, strerror(errno))

#if PERFETTO_DCHECK_IS_ON()

#define PERFETTO_DLOG(fmt, ...) PERFETTO_XLOG(kLogDebug, fmt, ##__VA_ARGS__)
Expand Down
36 changes: 25 additions & 11 deletions include/perfetto/ftrace_reader/ftrace_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

#include <unistd.h>

#include <bitset>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
Expand All @@ -41,6 +44,7 @@ class FtraceEventBundle;
} // namespace protos

const size_t kMaxSinks = 32;
const size_t kMaxCpus = 64;

// Method of last resort to reset ftrace state.
void HardResetFtraceState();
Expand Down Expand Up @@ -115,22 +119,30 @@ class FtraceController {
base::TaskRunner*,
std::unique_ptr<ProtoTranslationTable>);

// Called to read data from the raw pipe
// for the given |cpu|. Kicks off the reading/parsing
// of the pipe. Returns true if there is probably more to read.
// Called to read data from the staging pipe for the given |cpu| and parse it
// into the sinks. Protected and virtual for testing.
virtual void OnRawFtraceDataAvailable(size_t cpu);

// Protected and virtual for testing.
virtual bool OnRawFtraceDataAvailable(size_t cpu);
virtual uint64_t NowMs() const;

private:
friend FtraceSink;
friend class TestFtraceController;
FRIEND_TEST(FtraceControllerIntegrationTest, EnableDisableEvent);

FtraceController(const FtraceController&) = delete;
FtraceController& operator=(const FtraceController&) = delete;

static void PeriodicDrainCPU(base::WeakPtr<FtraceController>,
size_t generation,
int cpu);
// Called on a worker thread when |cpu| has at least one page of data
// available for reading.
void OnDataAvailable(base::WeakPtr<FtraceController>,
size_t generation,
size_t cpu,
uint32_t drain_period_ms);

static void DrainCPUs(base::WeakPtr<FtraceController>, size_t generation);
static void UnblockReaders(base::WeakPtr<FtraceController>);

uint32_t GetDrainPeriodMs();
uint32_t GetCpuBufferSizeInPages();
Expand All @@ -146,13 +158,15 @@ class FtraceController {
void StartIfNeeded();
void StopIfNeeded();

// Returns a cached CpuReader for |cpu|.
// CpuReaders are constructed lazily and owned by the controller.
CpuReader* GetCpuReader(size_t cpu);
// Begin lock-protected members.
std::mutex lock_;
std::condition_variable data_drained_;
std::bitset<kMaxCpus> cpus_to_drain_;
bool listening_for_raw_trace_data_ = false;
// End lock-protected members.

std::unique_ptr<FtraceProcfs> ftrace_procfs_;
size_t generation_ = 0;
bool listening_for_raw_trace_data_ = false;
bool atrace_running_ = false;
base::TaskRunner* task_runner_ = nullptr;
std::vector<size_t> enabled_count_;
Expand Down
149 changes: 123 additions & 26 deletions src/ftrace_reader/cpu_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "cpu_reader.h"

#include <signal.h>

#include <utility>

#include "perfetto/base/logging.h"
Expand Down Expand Up @@ -60,6 +62,12 @@ const std::vector<bool> BuildEnabledVector(const ProtoTranslationTable& table,
return enabled;
}

void SetBlocking(int fd, bool is_blocking) {
int flags = fcntl(fd, F_GETFL, 0);
flags = (is_blocking) ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
PERFETTO_CHECK(fcntl(fd, F_SETFL, flags) == 0);
}

// For further documentation of these constants see the kernel source:
// linux/include/linux/ring_buffer.h
// Some information about the values of these constants are exposed to user
Expand Down Expand Up @@ -96,42 +104,131 @@ EventFilter::~EventFilter() = default;

CpuReader::CpuReader(const ProtoTranslationTable* table,
size_t cpu,
base::ScopedFile fd)
: table_(table), cpu_(cpu), fd_(std::move(fd)) {}
base::ScopedFile fd,
std::function<void()> on_data_available)
: table_(table), cpu_(cpu), trace_fd_(std::move(fd)) {
int pipe_fds[2];
PERFETTO_CHECK(pipe(&pipe_fds[0]) == 0);
staging_read_fd_.reset(pipe_fds[0]);
staging_write_fd_.reset(pipe_fds[1]);

// Make reads from the raw pipe blocking so that splice() can sleep.
PERFETTO_CHECK(trace_fd_);
SetBlocking(*trace_fd_, true);

// Reads from the staging pipe are always non-blocking.
SetBlocking(*staging_read_fd_, false);

// Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
// blocking vs non-blocking behavior is controlled solely by the
// SPLICE_F_NONBLOCK flag passed to splice().
SetBlocking(*staging_write_fd_, false);

// We need a non-default SIGPIPE handler to make it so that the blocking
// splice() is woken up when the ~CpuReader() dtor destroys the pipes.
// Just masking out the signal would cause an implicit syscall restart and
// hence make the join() in the dtor unreliable.
struct sigaction current_act = {};
PERFETTO_CHECK(sigaction(SIGPIPE, nullptr, &current_act) == 0);
if (current_act.sa_handler == SIG_DFL || current_act.sa_handler == SIG_IGN) {
struct sigaction act = {};
act.sa_sigaction = [](int, siginfo_t*, void*) {};
PERFETTO_CHECK(sigaction(SIGPIPE, &act, nullptr) == 0);
}

worker_thread_ =
std::thread(std::bind(&RunWorkerThread, cpu_, *trace_fd_,
*staging_write_fd_, on_data_available));
}

CpuReader::~CpuReader() {
// Close the staging pipe to cause any pending splice on the worker thread to
// exit.
staging_read_fd_.reset();
staging_write_fd_.reset();
trace_fd_.reset();

// Not strictly required, but let's also raise the pipe signal explicitly just
// to be safe.
pthread_kill(worker_thread_.native_handle(), SIGPIPE);
worker_thread_.join();
}

// static
void CpuReader::RunWorkerThread(size_t cpu,
int trace_fd,
int staging_write_fd,
std::function<void()> on_data_available) {
// This thread is responsible for moving data from the trace pipe into the
// staging pipe at least one page at a time. This is done using the splice(2)
// system call, which unlike poll/select makes it possible to block until at
// least a full page of data is ready to be read. The downside is that as the
// call is blocking we need a dedicated thread for each trace pipe (i.e.,
// CPU).
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "traced_probes%zu", cpu);
pthread_setname_np(pthread_self(), thread_name);

while (true) {
// First do a blocking splice which sleeps until there is at least one
// page of data available and enough space to write it into the staging
// pipe.
int splice_res = splice(trace_fd, nullptr, staging_write_fd, nullptr,
base::kPageSize, SPLICE_F_MOVE);
if (splice_res < 0) {
// The kernel ftrace code has its own splice() implementation that can
// occasionally fail with transient errors not reported in man 2 splice.
// Just try again if we see these.
if (errno == ENOMEM || errno == EBUSY) {
PERFETTO_DPLOG("Transient splice failure -- retrying");
usleep(100 * 1000);
continue;
}
PERFETTO_DCHECK(errno == EPIPE || errno == EINTR || errno == EBADF);
break; // ~CpuReader is waiting to join this thread.
}

// Then do as many non-blocking splices as we can. This moves any full
// pages from the trace pipe into the staging pipe as long as there is
// data in the former and space in the latter.
while (true) {
splice_res = splice(trace_fd, nullptr, staging_write_fd, nullptr,
base::kPageSize, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
if (splice_res < 0) {
if (errno != EAGAIN && errno != ENOMEM && errno != EBUSY)
PERFETTO_PLOG("splice");
break;
}
}

int CpuReader::GetFileDescriptor() {
return fd_.get();
// This callback will block until we are allowed to read more data.
on_data_available();
}
}

bool CpuReader::Drain(const std::array<const EventFilter*, kMaxSinks>& filters,
const std::array<BundleHandle, kMaxSinks>& bundles) {
if (!fd_)
return false;
PERFETTO_DCHECK_THREAD(thread_checker_);
while (true) {
uint8_t* buffer = GetBuffer();
long bytes =
PERFETTO_EINTR(read(*staging_read_fd_, buffer, base::kPageSize));
if (bytes == -1 && errno == EAGAIN)
return true;
PERFETTO_CHECK(static_cast<size_t>(bytes) == base::kPageSize);

uint8_t* buffer = GetBuffer();
// TOOD(hjd): One read() per page may be too many.
long bytes = PERFETTO_EINTR(read(fd_.get(), buffer, base::kPageSize));
if (bytes == -1 && errno == EAGAIN)
return false;
if (bytes != base::kPageSize)
return false;
PERFETTO_CHECK(static_cast<size_t>(bytes) <= base::kPageSize);

size_t evt_size = 0;
for (size_t i = 0; i < kMaxSinks; i++) {
if (!filters[i])
break;
evt_size = ParsePage(cpu_, buffer, filters[i], &*bundles[i], table_);
PERFETTO_DCHECK(evt_size);
size_t evt_size = 0;
for (size_t i = 0; i < kMaxSinks; i++) {
if (!filters[i])
break;
evt_size = ParsePage(cpu_, buffer, filters[i], &*bundles[i], table_);
PERFETTO_DCHECK(evt_size);
}
}

// TODO(hjd): Introduce enum to distinguish real failures.
return evt_size > (base::kPageSize / 2);
}

CpuReader::~CpuReader() = default;

uint8_t* CpuReader::GetBuffer() {
PERFETTO_DCHECK_THREAD(thread_checker_);
// TODO(primiano): Guard against overflows, like BufferedFrameDeserializer.
if (!buffer_)
buffer_ = std::unique_ptr<uint8_t[]>(new uint8_t[base::kPageSize]);
Expand Down
24 changes: 21 additions & 3 deletions src/ftrace_reader/cpu_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@

#include <array>
#include <memory>
#include <thread>

#include "gtest/gtest_prod.h"
#include "perfetto/base/scoped_file.h"
#include "perfetto/base/thread_checker.h"
#include "perfetto/ftrace_reader/ftrace_controller.h"
#include "perfetto/protozero/protozero_message.h"
#include "proto_translation_table.h"
Expand Down Expand Up @@ -64,17 +66,24 @@ class EventFilter {
std::set<std::string> enabled_names_;
};

// Processes raw ftrace data for a logical CPU core.
class CpuReader {
public:
CpuReader(const ProtoTranslationTable*, size_t cpu, base::ScopedFile fd);
// |on_data_available| will be called on an arbitrary thread when at least one
// page of ftrace data is available for draining on this CPU.
CpuReader(const ProtoTranslationTable*,
size_t cpu,
base::ScopedFile fd,
std::function<void()> on_data_available);
~CpuReader();

// Drains all available data from the staging pipe into the given sinks.
// Should be called in response to the |on_data_available| callback.
bool Drain(
const std::array<const EventFilter*, kMaxSinks>&,
const std::array<
protozero::ProtoZeroMessageHandle<protos::pbzero::FtraceEventBundle>,
kMaxSinks>&);
int GetFileDescriptor();

template <typename T>
static bool ReadAndAdvance(const uint8_t** ptr, const uint8_t* end, T* out) {
Expand Down Expand Up @@ -128,14 +137,23 @@ class CpuReader {
protozero::ProtoZeroMessage* message);

private:
static void RunWorkerThread(size_t cpu,
int trace_fd,
int staging_write_fd,
std::function<void()> on_data_available);

uint8_t* GetBuffer();
CpuReader(const CpuReader&) = delete;
CpuReader& operator=(const CpuReader&) = delete;

const ProtoTranslationTable* table_;
const size_t cpu_;
base::ScopedFile fd_;
base::ScopedFile trace_fd_;
base::ScopedFile staging_read_fd_;
base::ScopedFile staging_write_fd_;
std::unique_ptr<uint8_t[]> buffer_;
std::thread worker_thread_;
PERFETTO_THREAD_CHECKER(thread_checker_)
};

} // namespace perfetto
Expand Down
Loading

0 comments on commit 247110b

Please sign in to comment.