From 247110bb9e52876f0412ece815c81ca055a3a898 Mon Sep 17 00:00:00 2001
From: Sami Kyostila
Date: Wed, 14 Feb 2018 10:18:45 +0000
Subject: [PATCH] Improve ftrace scheduling

This patch implements the following ftrace scheduling algorithm:

    main thread                          [drain]       [unblock]
                                        /:             |
                             post .-----' :            |
                                 /        :            v
    worker #0  [splice ...] [wakeup] [block ............] [splice]  :
    worker #1  [splice ...] [wakeup] [block ........] [splice]      :
    worker #2  [splice ..........................................]  :
                                                                    :
                                              drain period (100ms)

In other words, the splice(2) system call is used to move data from the
raw kernel ftrace pipe into an intermediate pipe at page granularity.
This call allows every per-CPU worker to sleep until there is at least
one page of data available.

When a worker wakes up, it attempts to move as many pages as possible
to its staging pipe (up to 64K, depending on the system's pipe buffer
size) in a non-blocking way. After this, it notifies the main thread
that data is available. This notification blocks the calling worker
until the main thread has drained the data.

When at least one worker has woken up, we schedule a drain operation on
the main thread for the next drain period (every 100ms by default). The
drain operation parses ftrace data from the staging pipes of every
worker with pending data. After this, each waiting worker is allowed to
issue another call to splice(), restarting the cycle.

Change-Id: I9657a5d50cd936f190bc57f2c9778019cb7419ab
---
 include/perfetto/base/logging.h          |   3 +
 .../ftrace_reader/ftrace_controller.h    |  36 ++--
 src/ftrace_reader/cpu_reader.cc          | 149 ++++++++++++---
 src/ftrace_reader/cpu_reader.h           |  24 ++-
 src/ftrace_reader/ftrace_controller.cc   | 136 +++++++++----
 .../ftrace_controller_unittest.cc        | 178 ++++++++++++++----
 6 files changed, 409 insertions(+), 117 deletions(-)

diff --git a/include/perfetto/base/logging.h b/include/perfetto/base/logging.h
index af9866fd9d..a5a07bcddc 100644
--- a/include/perfetto/base/logging.h
+++ b/include/perfetto/base/logging.h
@@ -100,6 +100,9 @@ constexpr const char* kLogFmt[] = {"\x1b[2m", "\x1b[39m", "\x1b[32m\x1b[1m",
     PERFETTO_IMMEDIATE_CRASH(); \
   } while (0)
 
+#define PERFETTO_PLOG(x) \
+  PERFETTO_ELOG("%s (errno: %d, %s)", (x), errno, strerror(errno))
+
 #if PERFETTO_DCHECK_IS_ON()
 
 #define PERFETTO_DLOG(fmt, ...) PERFETTO_XLOG(kLogDebug, fmt, ##__VA_ARGS__)
 
diff --git a/include/perfetto/ftrace_reader/ftrace_controller.h b/include/perfetto/ftrace_reader/ftrace_controller.h
index 7de9e53c06..4dcbd71429 100644
--- a/include/perfetto/ftrace_reader/ftrace_controller.h
+++ b/include/perfetto/ftrace_reader/ftrace_controller.h
@@ -19,8 +19,11 @@
 #include <unistd.h>
 
+#include <bitset>
+#include <condition_variable>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <set>
 #include <string>
 #include <vector>
 
@@ -41,6 +44,7 @@ class FtraceEventBundle;
 }  // namespace protos
 
 const size_t kMaxSinks = 32;
+const size_t kMaxCpus = 64;
 
 // Method of last resort to reset ftrace state.
 void HardResetFtraceState();
@@ -115,22 +119,30 @@ class FtraceController {
                    base::TaskRunner*,
                    std::unique_ptr<ProtoTranslationTable>);
 
-  // Called to read data from the raw pipe
-  // for the given |cpu|. Kicks off the reading/parsing
-  // of the pipe. Returns true if there is probably more to read.
+  // Called to read data from the staging pipe for the given |cpu| and parse it
+  // into the sinks. Protected and virtual for testing.
+  virtual void OnRawFtraceDataAvailable(size_t cpu);
+
+  // Protected and virtual for testing.
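+  // Returns the current time in milliseconds from a monotonic clock;
+  // overridden by tests to simulate the passage of time.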
-  virtual bool OnRawFtraceDataAvailable(size_t cpu);
+  virtual uint64_t NowMs() const;
 
  private:
   friend FtraceSink;
+  friend class TestFtraceController;
   FRIEND_TEST(FtraceControllerIntegrationTest, EnableDisableEvent);
 
   FtraceController(const FtraceController&) = delete;
   FtraceController& operator=(const FtraceController&) = delete;
 
-  static void PeriodicDrainCPU(base::WeakPtr<FtraceController>,
-                               size_t generation,
-                               int cpu);
+  // Called on a worker thread when |cpu| has at least one page of data
+  // available for reading.
+  void OnDataAvailable(base::WeakPtr<FtraceController>,
+                       size_t generation,
+                       size_t cpu,
+                       uint32_t drain_period_ms);
+
+  static void DrainCPUs(base::WeakPtr<FtraceController>, size_t generation);
+  static void UnblockReaders(base::WeakPtr<FtraceController>);
 
   uint32_t GetDrainPeriodMs();
   uint32_t GetCpuBufferSizeInPages();
@@ -146,13 +158,15 @@ class FtraceController {
   void StartIfNeeded();
   void StopIfNeeded();
 
-  // Returns a cached CpuReader for |cpu|.
-  // CpuReaders are constructed lazily and owned by the controller.
-  CpuReader* GetCpuReader(size_t cpu);
+  // Begin lock-protected members.
+  std::mutex lock_;
+  std::condition_variable data_drained_;
+  std::bitset<kMaxCpus> cpus_to_drain_;
+  bool listening_for_raw_trace_data_ = false;
+  // End lock-protected members.
 
   std::unique_ptr<FtraceProcfs> ftrace_procfs_;
   size_t generation_ = 0;
-  bool listening_for_raw_trace_data_ = false;
   bool atrace_running_ = false;
   base::TaskRunner* task_runner_ = nullptr;
   std::vector<size_t> enabled_count_;
diff --git a/src/ftrace_reader/cpu_reader.cc b/src/ftrace_reader/cpu_reader.cc
index 8108b3fe30..134c6feb43 100644
--- a/src/ftrace_reader/cpu_reader.cc
+++ b/src/ftrace_reader/cpu_reader.cc
@@ -16,6 +16,8 @@
 
 #include "cpu_reader.h"
 
+#include <signal.h>
+
 #include <utility>
 
 #include "perfetto/base/logging.h"
@@ -60,6 +62,12 @@ const std::vector<bool> BuildEnabledVector(const ProtoTranslationTable& table,
   return enabled;
 }
 
+void SetBlocking(int fd, bool is_blocking) {
+  int flags = fcntl(fd, F_GETFL, 0);
+  flags = (is_blocking) ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
+  PERFETTO_CHECK(fcntl(fd, F_SETFL, flags) == 0);
+}
+
 // For further documentation of these constants see the kernel source:
 // linux/include/linux/ring_buffer.h
 // Some information about the values of these constants are exposed to user
@@ -96,42 +104,131 @@ EventFilter::~EventFilter() = default;
 
 CpuReader::CpuReader(const ProtoTranslationTable* table,
                      size_t cpu,
-                     base::ScopedFile fd)
-    : table_(table), cpu_(cpu), fd_(std::move(fd)) {}
+                     base::ScopedFile fd,
+                     std::function<void()> on_data_available)
+    : table_(table), cpu_(cpu), trace_fd_(std::move(fd)) {
+  int pipe_fds[2];
+  PERFETTO_CHECK(pipe(&pipe_fds[0]) == 0);
+  staging_read_fd_.reset(pipe_fds[0]);
+  staging_write_fd_.reset(pipe_fds[1]);
+
+  // Make reads from the raw pipe blocking so that splice() can sleep.
+  PERFETTO_CHECK(trace_fd_);
+  SetBlocking(*trace_fd_, true);
+
+  // Reads from the staging pipe are always non-blocking.
+  SetBlocking(*staging_read_fd_, false);
+
+  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
+  // blocking vs. non-blocking behavior is controlled solely by the
+  // SPLICE_F_NONBLOCK flag passed to splice().
+  SetBlocking(*staging_write_fd_, false);
+
+  // We need a non-default SIGPIPE handler to make it so that the blocking
+  // splice() is woken up when the ~CpuReader() dtor destroys the pipes.
+  // Just masking out the signal would cause an implicit syscall restart and
+  // hence make the join() in the dtor unreliable.
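+  // Installing a no-op handler below (only when the process still has the
+  // default or ignored disposition) makes the blocked splice() fail with
+  // EPIPE/EINTR instead, letting the worker thread exit.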
+  struct sigaction current_act = {};
+  PERFETTO_CHECK(sigaction(SIGPIPE, nullptr, &current_act) == 0);
+  if (current_act.sa_handler == SIG_DFL || current_act.sa_handler == SIG_IGN) {
+    struct sigaction act = {};
+    act.sa_sigaction = [](int, siginfo_t*, void*) {};
+    PERFETTO_CHECK(sigaction(SIGPIPE, &act, nullptr) == 0);
+  }
+
+  worker_thread_ =
+      std::thread(std::bind(&RunWorkerThread, cpu_, *trace_fd_,
+                            *staging_write_fd_, on_data_available));
+}
+
+CpuReader::~CpuReader() {
+  // Close the staging pipe to cause any pending splice on the worker thread
+  // to exit.
+  staging_read_fd_.reset();
+  staging_write_fd_.reset();
+  trace_fd_.reset();
+
+  // Not strictly required, but let's also raise the pipe signal explicitly
+  // just to be safe.
+  pthread_kill(worker_thread_.native_handle(), SIGPIPE);
+  worker_thread_.join();
+}
+
+// static
+void CpuReader::RunWorkerThread(size_t cpu,
+                                int trace_fd,
+                                int staging_write_fd,
+                                std::function<void()> on_data_available) {
+  // This thread is responsible for moving data from the trace pipe into the
+  // staging pipe at least one page at a time. This is done using the splice(2)
+  // system call, which unlike poll/select makes it possible to block until at
+  // least a full page of data is ready to be read. The downside is that as the
+  // call is blocking we need a dedicated thread for each trace pipe (i.e.,
+  // CPU).
+  char thread_name[16];
+  snprintf(thread_name, sizeof(thread_name), "traced_probes%zu", cpu);
+  pthread_setname_np(pthread_self(), thread_name);
+
+  while (true) {
+    // First do a blocking splice, which sleeps until there is at least one
+    // page of data available and enough space to write it into the staging
+    // pipe.
+    int splice_res = splice(trace_fd, nullptr, staging_write_fd, nullptr,
+                            base::kPageSize, SPLICE_F_MOVE);
+    if (splice_res < 0) {
+      // The kernel ftrace code has its own splice() implementation that can
+      // occasionally fail with transient errors not reported in man 2 splice.
+      // Just try again if we see these.
+      if (errno == ENOMEM || errno == EBUSY) {
+        PERFETTO_DPLOG("Transient splice failure -- retrying");
+        usleep(100 * 1000);
+        continue;
+      }
+      PERFETTO_DCHECK(errno == EPIPE || errno == EINTR || errno == EBADF);
+      break;  // ~CpuReader is waiting to join this thread.
+    }
+
+    // Then do as many non-blocking splices as we can. This moves any full
+    // pages from the trace pipe into the staging pipe as long as there is
+    // data in the former and space in the latter.
+    while (true) {
+      splice_res = splice(trace_fd, nullptr, staging_write_fd, nullptr,
+                          base::kPageSize, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
+      if (splice_res < 0) {
+        if (errno != EAGAIN && errno != ENOMEM && errno != EBUSY)
+          PERFETTO_PLOG("splice");
+        break;
+      }
+    }
 
-int CpuReader::GetFileDescriptor() {
-  return fd_.get();
+    // This callback will block until we are allowed to read more data.
+    on_data_available();
+  }
 }
 
 bool CpuReader::Drain(
     const std::array<const EventFilter*, kMaxSinks>& filters,
    const std::array<
        protozero::ProtoZeroMessageHandle<protos::pbzero::FtraceEventBundle>,
        kMaxSinks>& bundles) {
-  if (!fd_)
-    return false;
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  while (true) {
+    uint8_t* buffer = GetBuffer();
+    long bytes =
+        PERFETTO_EINTR(read(*staging_read_fd_, buffer, base::kPageSize));
+    if (bytes == -1 && errno == EAGAIN)
+      return true;
+    PERFETTO_CHECK(static_cast<size_t>(bytes) == base::kPageSize);
 
-  uint8_t* buffer = GetBuffer();
-  // TOOD(hjd): One read() per page may be too many.
-  long bytes = PERFETTO_EINTR(read(fd_.get(), buffer, base::kPageSize));
-  if (bytes == -1 && errno == EAGAIN)
-    return false;
-  if (bytes != base::kPageSize)
-    return false;
-  PERFETTO_CHECK(static_cast<size_t>(bytes) <= base::kPageSize);
-
-  size_t evt_size = 0;
-  for (size_t i = 0; i < kMaxSinks; i++) {
-    if (!filters[i])
-      break;
-    evt_size = ParsePage(cpu_, buffer, filters[i], &*bundles[i], table_);
-    PERFETTO_DCHECK(evt_size);
+    size_t evt_size = 0;
+    for (size_t i = 0; i < kMaxSinks; i++) {
+      if (!filters[i])
+        break;
+      evt_size = ParsePage(cpu_, buffer, filters[i], &*bundles[i], table_);
+      PERFETTO_DCHECK(evt_size);
+    }
   }
-
-  // TODO(hjd): Introduce enum to distinguish real failures.
-  return evt_size > (base::kPageSize / 2);
 }
 
-CpuReader::~CpuReader() = default;
-
 uint8_t* CpuReader::GetBuffer() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
   // TODO(primiano): Guard against overflows, like BufferedFrameDeserializer.
   if (!buffer_)
     buffer_ = std::unique_ptr<uint8_t[]>(new uint8_t[base::kPageSize]);
diff --git a/src/ftrace_reader/cpu_reader.h b/src/ftrace_reader/cpu_reader.h
index 79e258d4e0..8a08a103c8 100644
--- a/src/ftrace_reader/cpu_reader.h
+++ b/src/ftrace_reader/cpu_reader.h
@@ -22,9 +22,11 @@
 
 #include <memory>
 #include <set>
+#include <thread>
 
 #include "gtest/gtest_prod.h"
 #include "perfetto/base/scoped_file.h"
+#include "perfetto/base/thread_checker.h"
 #include "perfetto/ftrace_reader/ftrace_controller.h"
 #include "perfetto/protozero/protozero_message.h"
 #include "proto_translation_table.h"
@@ -64,17 +66,24 @@ class EventFilter {
   std::set<std::string> enabled_names_;
 };
 
+// Processes raw ftrace data for a logical CPU core.
 class CpuReader {
  public:
-  CpuReader(const ProtoTranslationTable*, size_t cpu, base::ScopedFile fd);
+  // |on_data_available| will be called on an arbitrary thread when at least
+  // one page of ftrace data is available for draining on this CPU.
+  CpuReader(const ProtoTranslationTable*,
+            size_t cpu,
+            base::ScopedFile fd,
+            std::function<void()> on_data_available);
   ~CpuReader();
 
+  // Drains all available data from the staging pipe into the given sinks.
+  // Should be called in response to the |on_data_available| callback.
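+  // Reads from the staging pipe are non-blocking, so this returns once the
+  // pipe has been emptied (i.e., read() reports EAGAIN).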
  bool Drain(
      const std::array<const EventFilter*, kMaxSinks>&,
      const std::array<
          protozero::ProtoZeroMessageHandle<protos::pbzero::FtraceEventBundle>,
          kMaxSinks>&);
-  int GetFileDescriptor();
 
   template <typename T>
   static bool ReadAndAdvance(const uint8_t** ptr, const uint8_t* end, T* out) {
@@ -128,14 +137,23 @@ class CpuReader {
                         protozero::ProtoZeroMessage* message);
 
  private:
+  static void RunWorkerThread(size_t cpu,
+                              int trace_fd,
+                              int staging_write_fd,
+                              std::function<void()> on_data_available);
+
   uint8_t* GetBuffer();
   CpuReader(const CpuReader&) = delete;
   CpuReader& operator=(const CpuReader&) = delete;
 
   const ProtoTranslationTable* table_;
   const size_t cpu_;
-  base::ScopedFile fd_;
+  base::ScopedFile trace_fd_;
+  base::ScopedFile staging_read_fd_;
+  base::ScopedFile staging_write_fd_;
   std::unique_ptr<uint8_t[]> buffer_;
+  std::thread worker_thread_;
+  PERFETTO_THREAD_CHECKER(thread_checker_)
 };
 
 }  // namespace perfetto
diff --git a/src/ftrace_reader/ftrace_controller.cc b/src/ftrace_reader/ftrace_controller.cc
index 053eae6c53..755afe97ad 100644
--- a/src/ftrace_reader/ftrace_controller.cc
+++ b/src/ftrace_reader/ftrace_controller.cc
@@ -171,55 +171,87 @@ FtraceController::FtraceController(std::unique_ptr<FtraceProcfs> ftrace_procfs,
       weak_factory_(this) {}
 
 FtraceController::~FtraceController() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
   for (size_t id = 1; id <= table_->largest_id(); id++) {
     if (enabled_count_[id]) {
       const Event* event = table_->GetEventById(id);
       ftrace_procfs_->DisableEvent(event->group, event->name);
     }
   }
-  if (listening_for_raw_trace_data_) {
-    sinks_.clear();
-    StopIfNeeded();
-  }
+  sinks_.clear();
+  StopIfNeeded();
+}
+
+uint64_t FtraceController::NowMs() const {
+  timespec now;
+  clock_gettime(CLOCK_MONOTONIC, &now);
+  return (now.tv_sec * 1000000000L + now.tv_nsec) / 1000000L;
+}
 
 // static
-void FtraceController::PeriodicDrainCPU(
-    base::WeakPtr<FtraceController> weak_this,
-    size_t generation,
-    int cpu) {
+void FtraceController::DrainCPUs(base::WeakPtr<FtraceController> weak_this,
+                                 size_t generation) {
   // The controller might be gone.
   if (!weak_this)
     return;
-  // We might have stopped caring about events.
-  if (!weak_this->listening_for_raw_trace_data_)
-    return;
   // We might have stopped tracing then quickly re-enabled it, in this case
   // we don't want to end up with two periodic tasks for each CPU:
   if (weak_this->generation_ != generation)
     return;
 
-  bool has_more = weak_this->OnRawFtraceDataAvailable(cpu);
-  weak_this->task_runner_->PostDelayedTask(
-      std::bind(&FtraceController::PeriodicDrainCPU, weak_this, generation,
-                cpu),
-      has_more ? 0 : weak_this->GetDrainPeriodMs());
+  PERFETTO_DCHECK_THREAD(weak_this->thread_checker_);
+  std::bitset<kMaxCpus> cpus_to_drain;
+  {
+    std::unique_lock<std::mutex> lock(weak_this->lock_);
+    // We might have stopped caring about events.
+    if (!weak_this->listening_for_raw_trace_data_)
+      return;
+    std::swap(cpus_to_drain, weak_this->cpus_to_drain_);
+  }
+
+  for (size_t cpu = 0; cpu < weak_this->ftrace_procfs_->NumberOfCpus(); cpu++) {
+    if (!cpus_to_drain[cpu])
+      continue;
+    weak_this->OnRawFtraceDataAvailable(cpu);
+  }
+
+  // If we filled up any SHM pages while draining the data, we will have posted
+  // a task to notify traced about this. Only unblock the readers after this
+  // notification is sent to make it less likely that they steal CPU time away
+  // from traced.
+  weak_this->task_runner_->PostTask(
+      std::bind(&FtraceController::UnblockReaders, weak_this));
+}
+
+// static
+void FtraceController::UnblockReaders(
+    base::WeakPtr<FtraceController> weak_this) {
+  if (!weak_this)
+    return;
+  // Unblock all waiting readers to start moving more data into their
+  // respective staging pipes.
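+  // Each worker re-checks its wait condition under |lock_|, so a spurious
+  // wakeup here is harmless.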
+  weak_this->data_drained_.notify_all();
 }
 
 void FtraceController::StartIfNeeded() {
   if (sinks_.size() > 1)
     return;
   PERFETTO_CHECK(sinks_.size() != 0);
-  PERFETTO_CHECK(!listening_for_raw_trace_data_);
-  listening_for_raw_trace_data_ = true;
+  {
+    std::unique_lock<std::mutex> lock(lock_);
+    PERFETTO_CHECK(!listening_for_raw_trace_data_);
+    listening_for_raw_trace_data_ = true;
+  }
   ftrace_procfs_->SetCpuBufferSizeInPages(GetCpuBufferSizeInPages());
   ftrace_procfs_->EnableTracing();
   generation_++;
+  base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
   for (size_t cpu = 0; cpu < ftrace_procfs_->NumberOfCpus(); cpu++) {
-    base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
-    task_runner_->PostDelayedTask(std::bind(&FtraceController::PeriodicDrainCPU,
-                                            weak_this, generation_, cpu),
-                                  GetDrainPeriodMs());
+    readers_.emplace(
+        cpu, std::unique_ptr<CpuReader>(new CpuReader(
+                 table_.get(), cpu, ftrace_procfs_->OpenPipeForCpu(cpu),
+                 std::bind(&FtraceController::OnDataAvailable, this, weak_this,
+                           generation_, cpu, GetDrainPeriodMs()))));
   }
 }
 
@@ -258,14 +290,21 @@ void FtraceController::WriteTraceMarker(const std::string& s) {
 void FtraceController::StopIfNeeded() {
   if (sinks_.size() != 0)
     return;
-  PERFETTO_CHECK(listening_for_raw_trace_data_);
-  listening_for_raw_trace_data_ = false;
+  {
+    // Unblock any readers that are waiting for us to drain data.
+    std::unique_lock<std::mutex> lock(lock_);
+    if (listening_for_raw_trace_data_)
+      ftrace_procfs_->DisableTracing();
+    listening_for_raw_trace_data_ = false;
+    cpus_to_drain_.reset();
+  }
+  data_drained_.notify_all();
   readers_.clear();
-  ftrace_procfs_->DisableTracing();
 }
 
-bool FtraceController::OnRawFtraceDataAvailable(size_t cpu) {
-  CpuReader* reader = GetCpuReader(cpu);
+void FtraceController::OnRawFtraceDataAvailable(size_t cpu) {
+  PERFETTO_CHECK(cpu < ftrace_procfs_->NumberOfCpus());
+  CpuReader* reader = readers_[cpu].get();
   using BundleHandle =
       protozero::ProtoZeroMessageHandle<protos::pbzero::FtraceEventBundle>;
   std::array<const EventFilter*, kMaxSinks> filters{};
@@ -276,22 +315,11 @@
     filters[i] = sink->get_event_filter();
     bundles[i++] = sink->GetBundleForCpu(cpu);
   }
-  bool res = reader->Drain(filters, bundles);
+  reader->Drain(filters, bundles);
   i = 0;
   for (FtraceSink* sink : sinks_)
     sink->OnBundleComplete(cpu, std::move(bundles[i++]));
   PERFETTO_DCHECK(sinks_.size() == sink_count);
-  return res;
-}
-
-CpuReader* FtraceController::GetCpuReader(size_t cpu) {
-  PERFETTO_CHECK(cpu < ftrace_procfs_->NumberOfCpus());
-  if (!readers_.count(cpu)) {
-    readers_.emplace(
-        cpu, std::unique_ptr<CpuReader>(new CpuReader(
-                 table_.get(), cpu, ftrace_procfs_->OpenPipeForCpu(cpu)))));
-  }
-  return readers_.at(cpu).get();
 }
 
 std::unique_ptr<FtraceSink> FtraceController::CreateSink(
@@ -313,6 +341,36 @@ std::unique_ptr<FtraceSink> FtraceController::CreateSink(
   return sink;
 }
 
+void FtraceController::OnDataAvailable(
+    base::WeakPtr<FtraceController> weak_this,
+    size_t generation,
+    size_t cpu,
+    uint32_t drain_period_ms) {
+  // Called on the worker thread.
+  PERFETTO_DCHECK(cpu < ftrace_procfs_->NumberOfCpus());
+  std::unique_lock<std::mutex> lock(lock_);
+  if (!listening_for_raw_trace_data_)
+    return;
+  if (cpus_to_drain_.none()) {
+    // If this was the first CPU to wake up, schedule a drain for the next
+    // drain interval.
+    uint64_t delay_ms = NowMs() % drain_period_ms;
+    if (!delay_ms)
+      delay_ms = drain_period_ms;
+    task_runner_->PostDelayedTask(
+        std::bind(&FtraceController::DrainCPUs, weak_this, generation),
+        static_cast<int>(delay_ms));
+  }
+  cpus_to_drain_[cpu] = true;
+
+  // Wait until the main thread has finished draining.
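+  // The wait below also terminates if tracing is stopped while this thread
+  // is blocked (see StopIfNeeded()).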
+  // TODO(skyostil): The threads waiting here will all try to grab lock_
+  // when woken up. Find a way to avoid this.
+  data_drained_.wait(lock, [this, cpu] {
+    return !cpus_to_drain_[cpu] || !listening_for_raw_trace_data_;
+  });
+}
+
 void FtraceController::Register(FtraceSink* sink) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   auto it_and_inserted = sinks_.insert(sink);
diff --git a/src/ftrace_reader/ftrace_controller_unittest.cc b/src/ftrace_reader/ftrace_controller_unittest.cc
index 40caae3570..1d30d06f5b 100644
--- a/src/ftrace_reader/ftrace_controller_unittest.cc
+++ b/src/ftrace_reader/ftrace_controller_unittest.cc
@@ -27,6 +27,8 @@
 #include "perfetto/ftrace_reader/ftrace_config.h"
 #include "proto_translation_table.h"
 
+#include "src/base/test/test_task_runner.h"
+
 #include "perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
 
 using testing::_;
@@ -49,17 +51,30 @@ const char kBarEnablePath[] = "/root/events/group/bar/enable";
 
 class MockTaskRunner : public base::TaskRunner {
  public:
   MockTaskRunner() {
+    ON_CALL(*this, PostTask(_))
+        .WillByDefault(Invoke(this, &MockTaskRunner::OnPostTask));
     ON_CALL(*this, PostDelayedTask(_, _))
         .WillByDefault(Invoke(this, &MockTaskRunner::OnPostDelayedTask));
   }
 
+  void OnPostTask(std::function<void()> task) {
+    std::unique_lock<std::mutex> lock(lock_);
+    EXPECT_FALSE(task_);
+    task_ = std::move(task);
+  }
+
   void OnPostDelayedTask(std::function<void()> task, int _delay) {
+    std::unique_lock<std::mutex> lock(lock_);
+    EXPECT_FALSE(task_);
     task_ = std::move(task);
   }
 
-  void RunLastTask() { task_(); }
+  void RunLastTask() { TakeTask()(); }
 
-  std::function<void()> TakeTask() { return std::move(task_); }
+  std::function<void()> TakeTask() {
+    std::unique_lock<std::mutex> lock(lock_);
+    return std::move(task_);
+  }
 
   MOCK_METHOD1(PostTask, void(std::function<void()>));
   MOCK_METHOD2(PostDelayedTask, void(std::function<void()>, int delay_ms));
@@ -67,6 +82,7 @@ class MockTaskRunner : public base::TaskRunner {
   MOCK_METHOD1(RemoveFileDescriptorWatch, void(int fd));
 
  private:
+  std::mutex lock_;
   std::function<void()> task_;
 };
 
@@ -110,16 +126,22 @@ std::unique_ptr<Table> FakeTable() {
 
 class MockFtraceProcfs : public FtraceProcfs {
  public:
-  MockFtraceProcfs() : FtraceProcfs("/root/") {
-    ON_CALL(*this, NumberOfCpus()).WillByDefault(Return(1));
+  MockFtraceProcfs(size_t cpu_count = 1) : FtraceProcfs("/root/") {
+    ON_CALL(*this, NumberOfCpus()).WillByDefault(Return(cpu_count));
     EXPECT_CALL(*this, NumberOfCpus()).Times(AnyNumber());
   }
 
+  base::ScopedFile OpenPipeForCpu(size_t cpu) override {
+    return base::ScopedFile(open("/dev/null", O_RDONLY));
+  }
+
   MOCK_METHOD2(WriteToFile,
                bool(const std::string& path, const std::string& str));
   MOCK_CONST_METHOD0(NumberOfCpus, size_t());
 };
 
+}  // namespace
+
 class TestFtraceController : public FtraceController {
  public:
   TestFtraceController(std::unique_ptr<MockFtraceProcfs> ftrace_procfs,
                        base::TaskRunner* runner,
                        std::unique_ptr<Table> table)
      : FtraceController(std::move(ftrace_procfs), runner, std::move(table)) {}
 
-  MOCK_METHOD1(OnRawFtraceDataAvailable, bool(size_t cpu));
+  MOCK_METHOD1(OnRawFtraceDataAvailable, void(size_t cpu));
+
+  uint64_t NowMs() const override { return now_ms; }
+
+  uint32_t drain_period_ms() { return GetDrainPeriodMs(); }
+
+  std::function<void()> GetDataAvailableCallback(size_t cpu) {
+    base::WeakPtr<FtraceController> weak_this = weak_factory_.GetWeakPtr();
+    size_t generation = generation_;
+    return [this, weak_this, generation, cpu] {
+      OnDataAvailable(weak_this, generation, cpu, GetDrainPeriodMs());
+    };
+  }
+
+  void WaitForData(size_t cpu) {
+    while (true) {
+      {
+        std::unique_lock<std::mutex> lock(lock_);
+        if (cpus_to_drain_[cpu])
+          return;
+      }
+      usleep(5000);
+    }
+  }
+
+  uint64_t now_ms = 0;
 
  private:
   TestFtraceController(const TestFtraceController&) = delete;
   TestFtraceController& operator=(const TestFtraceController&) = delete;
 };
 
-}  // namespace
-
 TEST(FtraceControllerTest, NonExistentEventsDontCrash) {
   NiceMock<MockTaskRunner> task_runner;
   auto ftrace_procfs =
@@ -173,7 +218,6 @@ TEST(FtraceControllerTest, OneSink) {
   FtraceConfig config = CreateFtraceConfig({"foo"});
 
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile("/root/tracing_on", "1"));
-  EXPECT_CALL(task_runner, PostDelayedTask(_, _));
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile(kFooEnablePath, "1"));
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile("/root/buffer_size_kb", _));
   std::unique_ptr<FtraceSink> sink = controller.CreateSink(config, &delegate);
@@ -199,7 +243,6 @@ TEST(FtraceControllerTest, MultipleSinks) {
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile("/root/buffer_size_kb", _));
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile(kFooEnablePath, "1"));
-  EXPECT_CALL(task_runner, PostDelayedTask(_, _));
   std::unique_ptr<FtraceSink> sinkA = controller.CreateSink(configA, &delegate);
 
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile(kBarEnablePath, "1"));
@@ -227,7 +270,6 @@ TEST(FtraceControllerTest, ControllerMayDieFirst) {
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile("/root/buffer_size_kb", _));
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile("/root/tracing_on", "1"));
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile(kFooEnablePath, "1"));
-  EXPECT_CALL(task_runner, PostDelayedTask(_, _));
   std::unique_ptr<FtraceSink> sink = controller->CreateSink(config, &delegate);
 
   EXPECT_CALL(*raw_ftrace_procfs, WriteToFile(kFooEnablePath, "0"));
@@ -240,7 +282,7 @@ TEST(FtraceControllerTest, TaskScheduling) {
   MockTaskRunner task_runner;
   auto ftrace_procfs =
-      std::unique_ptr<MockFtraceProcfs>(new MockFtraceProcfs());
+      std::unique_ptr<MockFtraceProcfs>(new MockFtraceProcfs(2u));
   auto raw_ftrace_procfs = ftrace_procfs.get();
   TestFtraceController controller(std::move(ftrace_procfs), &task_runner,
                                   FakeTable());
@@ -251,26 +293,81 @@ TEST(FtraceControllerTest, TaskScheduling) {
   MockDelegate delegate;
   FtraceConfig config = CreateFtraceConfig({"foo"});
 
-  EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
   std::unique_ptr<FtraceSink> sink = controller.CreateSink(config, &delegate);
 
-  // Running task will call OnRawFtraceDataAvailable:
-  EXPECT_CALL(controller, OnRawFtraceDataAvailable(_)).WillOnce(Return(true));
-  // And since we return true (= there is more data) we re-schedule immediately:
-  EXPECT_CALL(task_runner, PostDelayedTask(_, 0));
+  // Only one call to drain should be scheduled for the next drain period.
+  EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
+
+  // However, both CPUs should be drained.
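+  // (A single DrainCPUs task services every CPU whose bit is set in
+  // |cpus_to_drain_|.)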
+  EXPECT_CALL(controller, OnRawFtraceDataAvailable(_)).Times(2);
+
+  // Finally, another task should be posted to unblock the workers.
+  EXPECT_CALL(task_runner, PostTask(_));
+
+  // Simulate two worker threads reporting available data.
+  auto on_data_available0 = controller.GetDataAvailableCallback(0u);
+  std::thread worker0([on_data_available0] { on_data_available0(); });
+
+  auto on_data_available1 = controller.GetDataAvailableCallback(1u);
+  std::thread worker1([on_data_available1] { on_data_available1(); });
+
+  // Poll until both worker threads have reported available data.
+  controller.WaitForData(0u);
+  controller.WaitForData(1u);
+
+  // Run the task to drain all CPUs.
   task_runner.RunLastTask();
 
-  // Running task will call OnRawFtraceDataAvailable:
-  EXPECT_CALL(controller, OnRawFtraceDataAvailable(_)).WillOnce(Return(false));
-  // And since we return false (= no more data) we re-schedule in 100ms:
-  EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
+  // Run the task to unblock all workers.
   task_runner.RunLastTask();
 
-  // The task may be run after the sink is gone, in this case we shouldn't call
-  // OnRawFtraceDataAvailable and shouldn't reschedule.
-  task_runner.RunLastTask();
+  worker0.join();
+  worker1.join();
+  sink.reset();
+}
+
+TEST(FtraceControllerTest, DrainPeriodRespected) {
+  MockTaskRunner task_runner;
+  auto ftrace_procfs =
+      std::unique_ptr<MockFtraceProcfs>(new MockFtraceProcfs());
+  auto raw_ftrace_procfs = ftrace_procfs.get();
+  TestFtraceController controller(std::move(ftrace_procfs), &task_runner,
+                                  FakeTable());
+
+  // For this test we don't care about calls to WriteToFile.
+  EXPECT_CALL(*raw_ftrace_procfs, WriteToFile(_, _)).Times(AnyNumber());
+
+  MockDelegate delegate;
+  FtraceConfig config = CreateFtraceConfig({"foo"});
+
+  // Test several cycles of a worker producing data and make sure the drain
+  // delay is consistent with the drain period.
+  std::unique_ptr<FtraceSink> sink = controller.CreateSink(config, &delegate);
+
+  const int kCycles = 50;
+  EXPECT_CALL(task_runner, PostDelayedTask(_, controller.drain_period_ms()))
+      .Times(kCycles);
+  EXPECT_CALL(controller, OnRawFtraceDataAvailable(_)).Times(kCycles);
+  EXPECT_CALL(task_runner, PostTask(_)).Times(kCycles);
+
+  // Simulate a worker thread continually reporting pages of available data.
+  auto on_data_available = controller.GetDataAvailableCallback(0u);
+  std::thread worker([on_data_available] {
+    for (int i = 0; i < kCycles; i++)
+      on_data_available();
+  });
+
+  for (int i = 0; i < kCycles; i++) {
+    controller.WaitForData(0u);
+    // Run two tasks: one to drain each CPU and another to unblock the worker.
+    task_runner.RunLastTask();
+    task_runner.RunLastTask();
+    controller.now_ms += controller.drain_period_ms();
+  }
+
+  worker.join();
+  sink.reset();
 }
 
 TEST(FtraceControllerTest, BackToBackEnableDisable) {
@@ -287,23 +384,28 @@ TEST(FtraceControllerTest, BackToBackEnableDisable) {
   MockDelegate delegate;
   FtraceConfig config = CreateFtraceConfig({"foo"});
 
-  EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
+  EXPECT_CALL(task_runner, PostDelayedTask(_, 100)).Times(2);
   std::unique_ptr<FtraceSink> sink_a = controller.CreateSink(config, &delegate);
+
+  auto on_data_available = controller.GetDataAvailableCallback(0u);
+  std::thread worker([on_data_available] { on_data_available(); });
+  controller.WaitForData(0u);
+
+  // Disable the first sink and run the delayed task that it generated. It
+  // should be a no-op.
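+  // (DrainCPUs returns early because tracing has stopped, i.e.,
+  // |listening_for_raw_trace_data_| is false.)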
   sink_a.reset();
-  std::function<void()> task_a = task_runner.TakeTask();
+  task_runner.RunLastTask();
+  worker.join();
 
-  EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
+  // Register another sink and wait for it to generate data.
   std::unique_ptr<FtraceSink> sink_b = controller.CreateSink(config, &delegate);
-  std::function<void()> task_b = task_runner.TakeTask();
-
-  // Task A shouldn't reschedule:
-  task_a();
-  // But task B should:
-  EXPECT_CALL(controller, OnRawFtraceDataAvailable(_)).WillOnce(Return(false));
-  EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
-  task_b();
+  std::thread worker2([on_data_available] { on_data_available(); });
+  controller.WaitForData(0u);
 
+  // This drain should also be a no-op after the sink is unregistered.
   sink_b.reset();
+  task_runner.RunLastTask();
+  worker2.join();
 }
 
 TEST(FtraceControllerTest, BufferSize) {
@@ -386,33 +488,33 @@ TEST(FtraceControllerTest, PeriodicDrainConfig) {
 
   {
     // No period -> good default.
-    EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
     FtraceConfig config = CreateFtraceConfig({"foo"});
     auto sink = controller.CreateSink(config, &delegate);
+    EXPECT_EQ(100u, controller.drain_period_ms());
   }
 
   {
     // Pick a tiny value -> good default.
-    EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_drain_period_ms(0);
    auto sink = controller.CreateSink(config, &delegate);
+    EXPECT_EQ(100u, controller.drain_period_ms());
   }
 
   {
     // Pick a huge value -> good default.
-    EXPECT_CALL(task_runner, PostDelayedTask(_, 100));
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_drain_period_ms(1000 * 60 * 60);
     auto sink = controller.CreateSink(config, &delegate);
+    EXPECT_EQ(100u, controller.drain_period_ms());
   }
 
   {
     // Pick a reasonable value -> get that value.
-    EXPECT_CALL(task_runner, PostDelayedTask(_, 200));
     FtraceConfig config = CreateFtraceConfig({"foo"});
     config.set_drain_period_ms(200);
     auto sink = controller.CreateSink(config, &delegate);
+    EXPECT_EQ(200u, controller.drain_period_ms());
   }
 }
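
For reference, here is the worker loop's splice pattern reduced to a minimal,
self-contained sketch. This is not part of the patch: error handling, the
transient ENOMEM/EBUSY retry, and the Perfetto plumbing are omitted, and
kPageSize/SplicePages are hypothetical stand-ins for base::kPageSize and the
corresponding logic in CpuReader::RunWorkerThread.

    #include <fcntl.h>  // splice(), SPLICE_F_* (glibc exposes these under
                        // _GNU_SOURCE, which g++ defines by default for C++).
    #include <cstddef>

    constexpr size_t kPageSize = 4096;  // Stand-in for base::kPageSize.

    // Moves page-sized chunks from |trace_fd| (the raw ftrace pipe) into
    // |staging_fd| (the staging pipe). Returns false when the pipes are torn
    // down (EPIPE) or the thread is interrupted via SIGPIPE (EINTR).
    bool SplicePages(int trace_fd, int staging_fd) {
      // Block until at least one full page has been moved.
      if (splice(trace_fd, nullptr, staging_fd, nullptr, kPageSize,
                 SPLICE_F_MOVE) < 0) {
        return false;
      }
      // Then move any further complete pages without blocking.
      while (splice(trace_fd, nullptr, staging_fd, nullptr, kPageSize,
                    SPLICE_F_MOVE | SPLICE_F_NONBLOCK) > 0) {
      }
      return true;  // The caller now notifies the main thread and blocks.
    }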