Skip to content

Commit

Permalink
Make message loop task entry containers thread safe (#11367)
Browse files Browse the repository at this point in the history
The core underlying issue is that vector push_back could re-allocate and cause us to segfault. I have switched the backing queues to a map per @jason-simmons suggestion in flutter/flutter#38778.

I've also added a test to capture the aforementioned bug. I've run internal tests several times to validate that this is fixed.

General threading note for this class is that only the following operations take a write lock on the meta mutex:

1. Create
2. Dispose

The rest of the operations take read lock on the meta mutex and acquire finer grained locks for the duration of the operation. We can not grab read lock for the entire duration of NotifyObservers for example because observer can in-turn create other queues -- Which we should not block.

Additional changes:

1. Make as many methods as possible const. Unlocked methods are all const.
2. Migrate all the queue members to a struct, and have a map.
3. Get rid of the un-used Swap functionality.
  • Loading branch information
iskakaushik committed Aug 23, 2019
1 parent 975a8aa commit 632a37b
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 392 deletions.
1 change: 0 additions & 1 deletion ci/licenses_golden/licenses_flutter
Expand Up @@ -127,7 +127,6 @@ FILE: ../../../flutter/fml/memory/weak_ptr.h
FILE: ../../../flutter/fml/memory/weak_ptr_internal.cc
FILE: ../../../flutter/fml/memory/weak_ptr_internal.h
FILE: ../../../flutter/fml/memory/weak_ptr_unittest.cc
FILE: ../../../flutter/fml/merged_queues_runner.cc
FILE: ../../../flutter/fml/message.cc
FILE: ../../../flutter/fml/message.h
FILE: ../../../flutter/fml/message_loop.cc
Expand Down
1 change: 0 additions & 1 deletion fml/BUILD.gn
Expand Up @@ -42,7 +42,6 @@ source_set("fml") {
"memory/weak_ptr.h",
"memory/weak_ptr_internal.cc",
"memory/weak_ptr_internal.h",
"merged_queues_runner.cc",
"message.cc",
"message.h",
"message_loop.cc",
Expand Down
58 changes: 0 additions & 58 deletions fml/merged_queues_runner.cc

This file was deleted.

6 changes: 0 additions & 6 deletions fml/message_loop.cc
Expand Up @@ -73,12 +73,6 @@ void MessageLoop::RunExpiredTasksNow() {
loop_->RunExpiredTasksNow();
}

void MessageLoop::SwapTaskQueues(MessageLoop* other) {
FML_CHECK(loop_);
FML_CHECK(other->loop_);
loop_->SwapTaskQueues(other->loop_);
}

TaskQueueId MessageLoop::GetCurrentTaskQueueId() {
auto* loop = tls_message_loop.get();
FML_CHECK(loop != nullptr)
Expand Down
2 changes: 0 additions & 2 deletions fml/message_loop.h
Expand Up @@ -34,8 +34,6 @@ class MessageLoop {
// instead of dedicating a thread to the message loop.
void RunExpiredTasksNow();

void SwapTaskQueues(MessageLoop* other);

static void EnsureInitializedForCurrentThread();

static bool IsInitializedForCurrentThread();
Expand Down
35 changes: 9 additions & 26 deletions fml/message_loop_impl.cc
Expand Up @@ -46,7 +46,9 @@ MessageLoopImpl::MessageLoopImpl()
task_queue_->SetWakeable(queue_id_, this);
}

MessageLoopImpl::~MessageLoopImpl() = default;
MessageLoopImpl::~MessageLoopImpl() {
task_queue_->Dispose(queue_id_);
}

void MessageLoopImpl::PostTask(fml::closure task, fml::TimePoint target_time) {
FML_DCHECK(task != nullptr);
Expand Down Expand Up @@ -101,46 +103,27 @@ void MessageLoopImpl::DoRun() {
// should be destructed on the message loop's thread. We have just returned
// from the implementations |Run| method which we know is on the correct
// thread. Drop all pending tasks on the floor.
task_queue_->Dispose(queue_id_);
task_queue_->DisposeTasks(queue_id_);
}

void MessageLoopImpl::DoTerminate() {
terminated_ = true;
Terminate();
}

// Thread safety analysis disabled as it does not account for defered locks.
void MessageLoopImpl::SwapTaskQueues(const fml::RefPtr<MessageLoopImpl>& other)
FML_NO_THREAD_SAFETY_ANALYSIS {
if (terminated_ || other->terminated_) {
return;
}

// task_flushing locks
std::unique_lock<std::mutex> t1(tasks_flushing_mutex_, std::defer_lock);
std::unique_lock<std::mutex> t2(other->tasks_flushing_mutex_,
std::defer_lock);

std::lock(t1, t2);
task_queue_->Swap(queue_id_, other->queue_id_);
}

void MessageLoopImpl::FlushTasks(FlushType type) {
TRACE_EVENT0("fml", "MessageLoop::FlushTasks");
std::vector<fml::closure> invocations;

// We are grabbing this lock here as a proxy to indicate
// that we are running tasks and will invoke the
// "right" observers, we are trying to avoid the scenario
// where:
// gather invocations -> Swap -> execute invocations
// will lead us to run invocations on the wrong thread.
std::scoped_lock task_flush_lock(tasks_flushing_mutex_);
task_queue_->GetTasksToRunNow(queue_id_, type, invocations);

for (const auto& invocation : invocations) {
invocation();
task_queue_->NotifyObservers(queue_id_);
std::vector<fml::closure> observers =
task_queue_->GetObserversToNotify(queue_id_);
for (const auto& observer : observers) {
observer();
}
}
}

Expand Down
4 changes: 0 additions & 4 deletions fml/message_loop_impl.h
Expand Up @@ -47,8 +47,6 @@ class MessageLoopImpl : public Wakeable,

virtual TaskQueueId GetTaskQueueId() const;

void SwapTaskQueues(const fml::RefPtr<MessageLoopImpl>& other);

protected:
// Exposed for the embedder shell which allows clients to poll for events
// instead of dedicating a thread to the message loop.
Expand All @@ -65,8 +63,6 @@ class MessageLoopImpl : public Wakeable,
fml::RefPtr<MessageLoopTaskQueues> task_queue_;
TaskQueueId queue_id_;

std::mutex tasks_flushing_mutex_;

std::atomic_bool terminated_;

void FlushTasks(FlushType type);
Expand Down

0 comments on commit 632a37b

Please sign in to comment.