Skip to content

Commit

Permalink
[task]Added a priority queue + new functions to sequence class to sto…
Browse files Browse the repository at this point in the history
…re and handle delayed tasks + testing

This is part of a multi-step solution to reduce context switches in the cross-platform threadpool and save battery power on Chrome.

Bug: 1127430
Change-Id: I408b8ba1f6a4695cea5c3926e1f72b402fca0782
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3763124
Commit-Queue: Abdias Dagbekpo <adagbekpo@google.com>
Reviewed-by: Etienne Pierre-Doray <etiennep@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1033309}
  • Loading branch information
Abdias Dagbekpo authored and Chromium LUCI CQ committed Aug 10, 2022
1 parent 39fc912 commit f0a619d
Show file tree
Hide file tree
Showing 15 changed files with 769 additions and 141 deletions.
9 changes: 9 additions & 0 deletions base/task/thread_pool/job_task_source.cc
Expand Up @@ -358,6 +358,15 @@ bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
GetMaxConcurrency(state_before_sub.worker_count() - 1);
}

// This is a no-op and should always return true.
bool JobTaskSource::WillReEnqueue(TimeTicks now,
TaskSource::Transaction* /*transaction*/) {
return true;
}

// This is a no-op.
void JobTaskSource::OnBecomeReady() {}

TaskSourceSortKey JobTaskSource::GetSortKey(
bool disable_fair_scheduling) const {
if (disable_fair_scheduling) {
Expand Down
3 changes: 3 additions & 0 deletions base/task/thread_pool/job_task_source.h
Expand Up @@ -193,6 +193,9 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
Task TakeTask(TaskSource::Transaction* transaction) override;
Task Clear(TaskSource::Transaction* transaction) override;
bool DidProcessTask(TaskSource::Transaction* transaction) override;
bool WillReEnqueue(TimeTicks now,
TaskSource::Transaction* transaction) override;
void OnBecomeReady() override;

// Synchronizes access to workers state.
mutable CheckedLock worker_lock_{UniversalSuccessor()};
Expand Down
16 changes: 10 additions & 6 deletions base/task/thread_pool/pooled_single_thread_task_runner_manager.cc
Expand Up @@ -131,8 +131,12 @@ class WorkerThreadDelegate : public WorkerThread::Delegate {

void DidProcessTask(RegisteredTaskSource task_source) override {
if (task_source) {
EnqueueTaskSource(TransactionWithRegisteredTaskSource::FromTaskSource(
std::move(task_source)));
auto task_source_with_transaction =
TransactionWithRegisteredTaskSource::FromTaskSource(
std::move(task_source));
task_source_with_transaction.task_source.WillReEnqueue(
TimeTicks::Now(), &task_source_with_transaction.transaction);
EnqueueTaskSource(std::move(task_source_with_transaction));
}
}

Expand All @@ -143,7 +147,7 @@ class WorkerThreadDelegate : public WorkerThread::Delegate {

// |task| will be pushed to |sequence|, and |sequence| will be queued
// to |priority_queue_| iff |sequence_should_be_queued| is true.
const bool sequence_should_be_queued = transaction.WillPushTask();
const bool sequence_should_be_queued = transaction.ShouldBeQueued();
RegisteredTaskSource task_source;
if (sequence_should_be_queued) {
task_source = task_tracker_->RegisterTaskSource(sequence);
Expand All @@ -153,7 +157,7 @@ class WorkerThreadDelegate : public WorkerThread::Delegate {
}
if (!task_tracker_->WillPostTaskNow(task, transaction.traits().priority()))
return false;
transaction.PushTask(std::move(task));
transaction.PushImmediateTask(std::move(task));
if (task_source) {
bool should_wakeup =
EnqueueTaskSource({std::move(task_source), std::move(transaction)});
Expand Down Expand Up @@ -355,15 +359,15 @@ class WorkerThreadCOMDelegate : public WorkerThreadDelegate {
if (task_tracker()->WillPostTask(
&pump_message_task, TaskShutdownBehavior::SKIP_ON_SHUTDOWN)) {
auto transaction = message_pump_sequence_->BeginTransaction();
const bool sequence_should_be_queued = transaction.WillPushTask();
const bool sequence_should_be_queued = transaction.ShouldBeQueued();
DCHECK(sequence_should_be_queued)
<< "GetWorkFromWindowsMessageQueue() does not expect "
"queueing of pump tasks.";
auto registered_task_source = task_tracker_->RegisterTaskSource(
std::move(message_pump_sequence_));
if (!registered_task_source)
return nullptr;
transaction.PushTask(std::move(pump_message_task));
transaction.PushImmediateTask(std::move(pump_message_task));
return registered_task_source;
}
}
Expand Down
2 changes: 1 addition & 1 deletion base/task/thread_pool/priority_queue_unittest.cc
Expand Up @@ -43,7 +43,7 @@ class PriorityQueueWithSequencesTest : public testing::Test {
task_environment.FastForwardBy(Microseconds(1));
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
traits, nullptr, TaskSourceExecutionMode::kParallel);
sequence->BeginTransaction().PushTask(
sequence->BeginTransaction().PushImmediateTask(
Task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta()));
return sequence;
}
Expand Down

0 comments on commit f0a619d

Please sign in to comment.