Skip to content

Commit

Permalink
Add a RemoveObserverPolicy parameter to ObserverListThreadSafe
Browse files Browse the repository at this point in the history
The policy defaults to kAnySequence, the existing behaviour, but can be
set to kAddingSequenceOnly to CHECK that observers are only removed from
the same sequence that added them. This prevents a mistake where an
observer is deleted immediately after calling RemoveObserver from a
different sequence, even though it might still be inside a callback.

Also adds 3 more cross-thread ObserverListThreadSafeTest's, and reduces
the duration of each from 2s to 1s to compensate.

R=fdoray

Bug: 1471683
Change-Id: I73e9cc21f2186ebdcf8f2dacd1a71aa4d11134c7
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4781112
Commit-Queue: Matt Mueller <mattm@chromium.org>
Reviewed-by: Matt Mueller <mattm@chromium.org>
Reviewed-by: Francois Pierre Doray <fdoray@chromium.org>
Auto-Submit: Joe Mason <joenotcharles@google.com>
Cr-Commit-Position: refs/heads/main@{#1184841}
  • Loading branch information
JoeNotCharlesGoogle authored and Chromium LUCI CQ committed Aug 17, 2023
1 parent f3a9781 commit 7eda661
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 39 deletions.
33 changes: 29 additions & 4 deletions base/observer_list_threadsafe.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@
// callback.
// * If one sequence is notifying observers concurrently with an observer
// removing itself from the observer list, the notifications will be
// silently dropped.
// silently dropped. However if the observer is currently inside a
// notification callback, the callback will finish running.
//
// By default, observers can be removed from any sequence. However this can be
// error-prone since an observer may be running a callback when it's removed,
// in which case it isn't safe to delete until the callback is finished.
// Consider using the RemoveObserverPolicy::kAddingSequenceOnly template
// parameter, which will CHECK that observers are only removed from the
// sequence where they were added (which is also the sequence that runs
// callbacks).
//
// The drawback of the threadsafe observer list is that notifications are not
// as real-time as the non-threadsafe version of this class. Notifications
Expand Down Expand Up @@ -98,8 +107,19 @@ class BASE_EXPORT ObserverListThreadSafeBase

} // namespace internal

template <class ObserverType>
enum class RemoveObserverPolicy {
// Observers can be removed from any sequence.
kAnySequence,
// Observers can only be removed from the sequence that added them.
kAddingSequenceOnly,
};

template <class ObserverType,
RemoveObserverPolicy RemovePolicy =
RemoveObserverPolicy::kAnySequence>
class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
using Self = ObserverListThreadSafe<ObserverType, RemovePolicy>;

public:
enum class AddObserverResult {
kBecameNonEmpty,
Expand Down Expand Up @@ -160,7 +180,7 @@ class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
static_cast<const NotificationData*>(current_notification);
task_runner->PostTask(
current_notification->from_here,
BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
BindOnce(&Self::NotifyWrapper, this,
// While `observer` may be dangling, we pass it and
// check it wasn't deallocated in NotifyWrapper() which can
// check `observers_` to verify presence (the owner of the
Expand All @@ -184,6 +204,11 @@ class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
// observer won't stop it.
RemoveObserverResult RemoveObserver(ObserverType* observer) {
AutoLock auto_lock(lock_);
if constexpr (RemovePolicy == RemoveObserverPolicy::kAddingSequenceOnly) {
const auto it = observers_.find(observer);
CHECK(it == observers_.end() ||
it->second.task_runner->RunsTasksInCurrentSequence());
}
observers_.erase(observer);
return observers_.empty() ? RemoveObserverResult::kWasOrBecameEmpty
: RemoveObserverResult::kRemainsNonEmpty;
Expand Down Expand Up @@ -214,7 +239,7 @@ class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
for (const auto& observer : observers_) {
observer.second.task_runner->PostTask(
from_here,
BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
BindOnce(&Self::NotifyWrapper, this,
// While `observer.first` may be dangling, we pass it and
// check it wasn't deallocated in NotifyWrapper() which can
// check `observers_` to verify presence (the owner of the
Expand Down
130 changes: 106 additions & 24 deletions base/observer_list_threadsafe_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "base/observer_list_threadsafe.h"

#include <memory>
#include <utility>
#include <vector>

#include "base/compiler_specific.h"
Expand All @@ -20,7 +21,9 @@
#include "base/task/thread_pool.h"
#include "base/task/thread_pool/thread_pool_instance.h"
#include "base/test/bind.h"
#include "base/test/gtest_util.h"
#include "base/test/task_environment.h"
#include "base/test/test_waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/thread_restrictions.h"
#include "build/build_config.h"
Expand All @@ -30,7 +33,7 @@
namespace base {
namespace {

constexpr int kThreadRunTime = 2000; // ms to run the multi-threaded test.
constexpr int kThreadRunTime = 1000; // ms to run the multi-threaded test.

class Foo {
public:
Expand Down Expand Up @@ -73,19 +76,26 @@ class AddInObserve : public Foo {

// A task for use in the ThreadSafeObserver test which will add and remove
// itself from the notification list repeatedly.
template <RemoveObserverPolicy RemovePolicy =
RemoveObserverPolicy::kAnySequence>
class AddRemoveThread : public Foo {
using Self = AddRemoveThread<RemovePolicy>;
using ObserverList = ObserverListThreadSafe<Foo, RemovePolicy>;

public:
AddRemoveThread(ObserverListThreadSafe<Foo>* list, bool notify)
AddRemoveThread(ObserverList* list,
bool notify,
scoped_refptr<SingleThreadTaskRunner> removal_task_runner)
: list_(list),
task_runner_(ThreadPool::CreateSingleThreadTaskRunner(
{},
SingleThreadTaskRunnerThreadMode::DEDICATED)),
removal_task_runner_(std::move(removal_task_runner)),
in_list_(false),
start_(Time::Now()),
do_notifies_(notify) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AddRemoveThread::AddTask, weak_factory_.GetWeakPtr()));
FROM_HERE, base::BindOnce(&Self::AddTask, weak_factory_.GetWeakPtr()));
}

~AddRemoveThread() override = default;
Expand All @@ -108,8 +118,12 @@ class AddRemoveThread : public Foo {
}

SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE,
base::BindOnce(&AddRemoveThread::AddTask, weak_factory_.GetWeakPtr()));
FROM_HERE, base::BindOnce(&Self::AddTask, weak_factory_.GetWeakPtr()));
}

void RemoveTask() {
list_->RemoveObserver(this);
in_list_ = false;
}

void Observe(int x) override {
Expand All @@ -120,20 +134,39 @@ class AddRemoveThread : public Foo {
// This callback should fire on the appropriate thread
EXPECT_TRUE(task_runner_->BelongsToCurrentThread());

list_->RemoveObserver(this);
in_list_ = false;
if (removal_task_runner_) {
// Remove the observer on a different thread, blocking the current thread
// until it's removed. Unretained is safe since the pointers are valid
// until the thread is unblocked.
base::TestWaitableEvent event;
removal_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&Self::RemoveTask, base::Unretained(this))
.Then(base::BindOnce(&base::TestWaitableEvent::Signal,
base::Unretained(&event))));
event.Wait();
} else {
// Remove the observer on the same thread.
RemoveTask();
}
}

scoped_refptr<SingleThreadTaskRunner> task_runner() const {
return task_runner_;
}

private:
raw_ptr<ObserverListThreadSafe<Foo>> list_;
raw_ptr<ObserverList> list_;
scoped_refptr<SingleThreadTaskRunner> task_runner_;
// Optional task runner used to remove observers. This will be the main task
// runner of a different AddRemoveThread.
scoped_refptr<SingleThreadTaskRunner> removal_task_runner_;
bool in_list_; // Are we currently registered for notifications.
// in_list_ is only used on |this| thread.
Time start_; // The time we started the test.

bool do_notifies_; // Whether these threads should do notifications.

base::WeakPtrFactory<AddRemoveThread> weak_factory_{this};
base::WeakPtrFactory<Self> weak_factory_{this};
};

} // namespace
Expand Down Expand Up @@ -253,27 +286,41 @@ TEST(ObserverListThreadSafeTest, RemoveMultipleObservers) {

// A test driver for a multi-threaded notification loop. Runs a number of
// observer threads, each of which constantly adds/removes itself from the
// observer list. Optionally, if cross_thread_notifies is set to true, the
// observer threads will also trigger notifications to all observers.
// observer list. Optionally, if `cross_thread_notifies` is set to true, the
// observer threads will also trigger notifications to all observers, and if
// `cross_thread_removes` is set to true, the observer threads will also remove
// observers added by other threads.
template <
RemoveObserverPolicy RemovePolicy = RemoveObserverPolicy::kAnySequence>
static void ThreadSafeObserverHarness(int num_threads,
bool cross_thread_notifies) {
bool cross_thread_notifies = false,
bool cross_thread_removes = false) {
test::TaskEnvironment task_environment;

scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
new ObserverListThreadSafe<Foo>);
auto observer_list =
base::MakeRefCounted<ObserverListThreadSafe<Foo, RemovePolicy>>();

Adder a(1);
Adder b(-1);

observer_list->AddObserver(&a);
observer_list->AddObserver(&b);

std::vector<std::unique_ptr<AddRemoveThread>> threaded_observer;
threaded_observer.reserve(num_threads);
using TestThread = AddRemoveThread<RemovePolicy>;
std::vector<std::unique_ptr<TestThread>> threaded_observers;
threaded_observers.reserve(num_threads);
scoped_refptr<SingleThreadTaskRunner> removal_task_runner;
for (int index = 0; index < num_threads; index++) {
threaded_observer.push_back(std::make_unique<AddRemoveThread>(
observer_list.get(), cross_thread_notifies));
auto add_remove_thread =
std::make_unique<TestThread>(observer_list.get(), cross_thread_notifies,
std::move(removal_task_runner));
if (cross_thread_removes) {
// Save the task runner to pass to the next thread.
removal_task_runner = add_remove_thread->task_runner();
}
threaded_observers.push_back(std::move(add_remove_thread));
}
ASSERT_EQ(static_cast<size_t>(num_threads), threaded_observer.size());
ASSERT_EQ(static_cast<size_t>(num_threads), threaded_observers.size());

Time start = Time::Now();
while (true) {
Expand All @@ -290,25 +337,60 @@ static void ThreadSafeObserverHarness(int num_threads,

TEST(ObserverListThreadSafeTest, CrossThreadObserver) {
// Use 7 observer threads. Notifications only come from the main thread.
ThreadSafeObserverHarness(7, false);
ThreadSafeObserverHarness(7);
}

TEST(ObserverListThreadSafeTest, CrossThreadNotifications) {
// Use 3 observer threads. Notifications will fire from the main thread and
// all 3 observer threads.
ThreadSafeObserverHarness(3, true);
ThreadSafeObserverHarness(3, /*cross_thread_notifies=*/true);
}

TEST(ObserverListThreadSafeTest, CrossThreadRemoval) {
// Use 3 observer threads. Observers can be removed from any thread.
ThreadSafeObserverHarness(3, /*cross_thread_notifies=*/true,
/*cross_thread_removes=*/true);
}

TEST(ObserverListThreadSafeTest, CrossThreadRemovalRestricted) {
// Use 3 observer threads. Observers must be removed from the thread that
// added them. This should succeed because the test doesn't break that
// restriction.
ThreadSafeObserverHarness<RemoveObserverPolicy::kAddingSequenceOnly>(
3, /*cross_thread_notifies=*/true, /*cross_thread_removes=*/false);
}

TEST(ObserverListThreadSafeDeathTest, CrossThreadRemovalRestricted) {
// Use 3 observer threads. Observers must be removed from the thread that
// added them. This should CHECK because the test breaks that restriction.
EXPECT_CHECK_DEATH(
ThreadSafeObserverHarness<RemoveObserverPolicy::kAddingSequenceOnly>(
3, /*cross_thread_notifies=*/true, /*cross_thread_removes=*/true));
}

TEST(ObserverListThreadSafeTest, OutlivesTaskEnvironment) {
absl::optional<test::TaskEnvironment> task_environment(absl::in_place);
scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
new ObserverListThreadSafe<Foo>);
auto observer_list = base::MakeRefCounted<ObserverListThreadSafe<Foo>>();

Adder a(1);
observer_list->AddObserver(&a);
task_environment.reset();
// Test passes if we don't crash here.
observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
observer_list->RemoveObserver(&a);
}

TEST(ObserverListThreadSafeTest, OutlivesTaskEnvironmentRemovalRestricted) {
absl::optional<test::TaskEnvironment> task_environment(absl::in_place);
auto observer_list = base::MakeRefCounted<
ObserverListThreadSafe<Foo, RemoveObserverPolicy::kAddingSequenceOnly>>();

Adder a(1);
observer_list->AddObserver(&a);
task_environment.reset();
// Test passes if we don't crash here.
observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
observer_list->RemoveObserver(&a);
}

namespace {
Expand Down
7 changes: 1 addition & 6 deletions net/cert/cert_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,10 @@

#include "base/memory/scoped_refptr.h"
#include "base/no_destructor.h"
#include "base/observer_list_threadsafe.h"
#include "build/build_config.h"
#include "net/base/net_export.h"

namespace base {

template <class ObserverType>
class ObserverListThreadSafe;
}

namespace net {

// This class allows callers to observe changes to the underlying certificate
Expand Down
6 changes: 1 addition & 5 deletions net/cert/nss_cert_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "base/functional/callback_forward.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/observer_list_threadsafe.h"
#include "build/build_config.h"
#include "build/chromeos_buildflags.h"
#include "crypto/scoped_nss_types.h"
Expand All @@ -23,11 +24,6 @@
#include "net/cert/scoped_nss_types.h"
#include "net/cert/x509_certificate.h"

namespace base {
template <class ObserverType>
class ObserverListThreadSafe;
}

namespace net {

// Provides functions to manipulate the NSS certificate stores.
Expand Down

0 comments on commit 7eda661

Please sign in to comment.