Skip to content

Commit

Permalink
DOM: Introduce Subscriber#addTeardown() for Observables
Browse files Browse the repository at this point in the history
See https://github.com/WICG/observable/blob/master/README.md and the
design discussion in WICG/observable#22.

For WPTs:
Co-authored-by: ben@benlesh.com

R=masonf@chromium.org

Bug: 1485981
Change-Id: I1de6ff5813f1c9ea6762fb4569440350c5dc38ca
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5018147
Commit-Queue: Dominic Farolino <dom@chromium.org>
Reviewed-by: Mason Freed <masonf@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1227535}
  • Loading branch information
domfarolino authored and Chromium LUCI CQ committed Nov 21, 2023
1 parent 95d778d commit 0a95b03
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 14 deletions.
62 changes: 49 additions & 13 deletions third_party/blink/renderer/core/dom/subscriber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

#include "third_party/blink/renderer/core/dom/subscriber.h"

#include "base/containers/adapters.h"
#include "third_party/blink/public/mojom/use_counter/metrics/web_feature.mojom-blink.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_observer.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_observer_callback.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_observer_complete_callback.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_script_runner.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_void_function.h"
#include "third_party/blink/renderer/core/dom/abort_controller.h"
#include "third_party/blink/renderer/core/dom/abort_signal.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
Expand All @@ -23,7 +25,27 @@ class Subscriber::CloseSubscriptionAlgorithm final
: subscriber_(subscriber) {}
~CloseSubscriptionAlgorithm() override = default;

void Run() override { subscriber_->CloseSubscription(); }
void Run() override {
// There are two things to do when the signal associated with a subscription
// gets aborted.
// 1. "Close" the subscription. This is idempotent; it only makes the
// web-exposed `Subscriber#active` false, and makes it impossible to
// call any `Observer`-provided functions.
// 2. Run any and all teardown callbacks that were registered with
// `Subscriber#addTeardown()` in LIFO order, and then remove all of
// them.
subscriber_->CloseSubscription();

// Note that since the subscription is now inactive, `teardown_callbacks_`
// cannot be modified anymore. If any of these callbacks below invoke
// `addTeardown()` with a *new* callback, it will be invoked synchronously
// instead of added to this vector.
for (Member<V8VoidFunction> teardown :
base::Reversed(subscriber_->teardown_callbacks_)) {
teardown->InvokeAndReportException(nullptr);
}
subscriber_->teardown_callbacks_.clear();
}

void Trace(Visitor* visitor) const override {
visitor->Trace(subscriber_);
Expand All @@ -47,8 +69,7 @@ Subscriber::Subscriber(base::PassKey<Observable>,
// When this input signal is aborted we:
// a. Call `CloseSubscription()`, which sets `active_` to false and
// ensures that no `Observer` callback methods can be called.
// b. Runs all of the teardowns. TODO(domfarolino): Implement this in
// https://crrev.com/c/5018147.
// b. Runs all of the teardowns.
// 2. [Never null]: The signal associated with
// `complete_or_error_controller_`. This signal is aborted when the
// `complete()` or `error()` method is called. Specifically, in this
Expand All @@ -61,7 +82,6 @@ Subscriber::Subscriber(base::PassKey<Observable>,
// d. Abort `complete_or_error_controller_`, which is only used to abort
// `signal_`.
// e. In response to `signal_`'s abortion, run all of the teardowns.
// TODO(domfarolino): Implement this in https://crrev.com/c/5018147.
// f. Finally return from the `Subscriber#{complete(), error()}` method.
//
// See https://dom.spec.whatwg.org/#abortsignal-dependent-signals for more
Expand All @@ -73,17 +93,22 @@ Subscriber::Subscriber(base::PassKey<Observable>,
}
signal_ = MakeGarbageCollected<AbortSignal>(script_state, signals);

// When `signal_` is finally aborted, this should immediately close the
// subscription. Note that the subscription might *already* be closed, if
// `signal_` was aborted as a result of `complete()` or `error()` being
// called, which both have to manually close the subscription before invoking
// their respective `Observer` callbacks. Closing the subscription is
// idempotent though.
close_subscription_algorithm_handle_ = signal_->AddAlgorithm(
MakeGarbageCollected<CloseSubscriptionAlgorithm>(this));

if (signal_->aborted()) {
CloseSubscription();
} else {
// When `signal_` is finally aborted, this should immediately:
// 1. Close the subscription (making `active_` false).
// 2. Run any registered teardown callbacks.
// See the documentation in `CloseSubscriptionAlgorithm::Run()`.
//
// Note that by the time `signal_` gets aborted, the subscription might
// *already* be closed (i.e., (1) above might have already been done). For
// example, when `complete()` or `error()` are called, they manually close
// the subscription *before* invoking their respective `Observer` callbacks
// and aborting `complete_or_error_controller_`. This is fine because
// closing the subscription is idempotent.
close_subscription_algorithm_handle_ = signal_->AddAlgorithm(
MakeGarbageCollected<CloseSubscriptionAlgorithm>(this));
}
}

Expand Down Expand Up @@ -148,6 +173,16 @@ void Subscriber::error(ScriptState* script_state, ScriptValue error_value) {
complete_or_error_controller_->abort(script_state);
}

void Subscriber::addTeardown(V8VoidFunction* teardown) {
if (active_) {
teardown_callbacks_.push_back(teardown);
} else {
// If the subscription is inactive, invoke the teardown immediately, because
// if we just queue it to `teardown_callbacks_` it will never run!
teardown->InvokeAndReportException(nullptr);
}
}

void Subscriber::CloseSubscription() {
close_subscription_algorithm_handle_.Clear();
active_ = false;
Expand All @@ -166,6 +201,7 @@ void Subscriber::Trace(Visitor* visitor) const {
visitor->Trace(complete_or_error_controller_);
visitor->Trace(signal_);
visitor->Trace(close_subscription_algorithm_handle_);
visitor->Trace(teardown_callbacks_);

ScriptWrappable::Trace(visitor);
ExecutionContextClient::Trace(visitor);
Expand Down
4 changes: 4 additions & 0 deletions third_party/blink/renderer/core/dom/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Observer;
class ScriptState;
class V8ObserverCallback;
class V8ObserverCompleteCallback;
class V8VoidFunction;

class CORE_EXPORT Subscriber final : public ScriptWrappable,
public ExecutionContextClient {
Expand All @@ -34,6 +35,7 @@ class CORE_EXPORT Subscriber final : public ScriptWrappable,
void next(ScriptValue);
void complete(ScriptState*);
void error(ScriptState*, ScriptValue);
void addTeardown(V8VoidFunction*);

// API attributes.
bool active() { return active_; }
Expand Down Expand Up @@ -76,6 +78,8 @@ class CORE_EXPORT Subscriber final : public ScriptWrappable,

// Non-null before `CloseSubscription()` is called.
Member<AbortSignal::AlgorithmHandle> close_subscription_algorithm_handle_;

HeapVector<Member<V8VoidFunction>> teardown_callbacks_;
};

} // namespace blink
Expand Down
1 change: 1 addition & 0 deletions third_party/blink/renderer/core/dom/subscriber.idl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ interface Subscriber {
void next(any result);
[CallWith=ScriptState] void complete();
[CallWith=ScriptState] void error(any error);
void addTeardown(VoidFunction teardown);

readonly attribute boolean active;
readonly attribute AbortSignal signal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,4 +653,169 @@ test(() => {
assert_equals(errorReported.error, error, "Error object is equivalent");
}, "Calling subscribe should never throw an error synchronously, subscriber pushes error");

// TODO(domfarolino): Add back the teardown tests that Ben wrote.
test(() => {
let addTeardownCalled = false;
let activeDuringTeardown;

const source = new Observable((subscriber) => {
subscriber.addTeardown(() => {
addTeardownCalled = true;
activeDuringTeardown = subscriber.active;
});
});

const ac = new AbortController();
source.subscribe({
signal: ac.signal,
});

assert_false(addTeardownCalled, "Teardown is not be called upon subscription");
ac.abort();
assert_true(addTeardownCalled, "Teardown is called when subscription is aborted");
assert_false(activeDuringTeardown, "Teardown observers inactive subscription");
}, "Teardown should be called when subscription is aborted");

test(() => {
const addTeardownsCalled = [];
// This is used to snapshot `addTeardownsCalled` from within the subscribe
// callback, for assertion/comparison later.
let teardownsSnapshot = [];
const results = [];

const source = new Observable((subscriber) => {
subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1"));
subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2"));

subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();

// We don't run the actual `assert_array_equals` here because if it fails,
// it won't be properly caught. This is because assertion failures throw an
// error, and in subscriber callback, thrown errors result in
// `window.onerror` handlers being called, which this test file doesn't
// record as an error (see the first line of this file).
teardownsSnapshot = addTeardownsCalled;
});

source.subscribe({
next: (x) => results.push(x),
error: () => results.push("unreached"),
complete: () => results.push("complete"),
});

assert_array_equals(
results,
[1, 2, 3, "complete"],
"should emit values and complete synchronously"
);

assert_array_equals(teardownsSnapshot, addTeardownsCalled);
assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"],
"Teardowns called in LIFO order synchronously after complete()");
}, "Teardowns should be called when subscription is closed by completion");

test(() => {
const addTeardownsCalled = [];
let teardownsSnapshot = [];
const error = new Error("error");
const results = [];

const source = new Observable((subscriber) => {
subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1"));
subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2"));

subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.error(error);

teardownsSnapshot = addTeardownsCalled;
});

source.subscribe({
next: (x) => results.push(x),
error: (error) => results.push(error),
complete: () => assert_unreached("complete should not be called"),
});

assert_array_equals(
results,
[1, 2, 3, error],
"should emit values and error synchronously"
);

assert_array_equals(teardownsSnapshot, addTeardownsCalled);
assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"],
"Teardowns called in LIFO order synchronously after error()");
}, "Teardowns should be called when subscription is closed by subscriber pushing an error");

test(() => {
const addTeardownsCalled = [];
const error = new Error("error");
const results = [];

const source = new Observable((subscriber) => {
subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1"));
subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2"));

subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
throw error;
});

source.subscribe({
next: (x) => results.push(x),
error: (error) => results.push(error),
complete: () => assert_unreached("complete should not be called"),
});

assert_array_equals(
results,
[1, 2, 3, error],
"should emit values and error synchronously"
);

assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"],
"Teardowns called in LIFO order synchronously after thrown error");
}, "Teardowns should be called when subscription is closed by subscriber throwing error");

test(() => {
const addTeardownsCalled = [];
const results = [];
let firstTeardownInvokedSynchronously = false;
let secondTeardownInvokedSynchronously = false;

const source = new Observable((subscriber) => {
subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1"));
if (addTeardownsCalled.length === 1) {
firstTeardownInvokedSynchronously = true;
}
subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2"));
if (addTeardownsCalled.length === 2) {
secondTeardownInvokedSynchronously = true;
}

subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const ac = new AbortController();
ac.abort();
source.subscribe({
next: (x) => results.push(x),
error: (error) => results.push(error),
complete: () => results.push('complete'),
signal: ac.signal,
});

assert_array_equals(results, []);
assert_true(firstTeardownInvokedSynchronously, "First teardown callback is invoked during addTeardown()");
assert_true(secondTeardownInvokedSynchronously, "Second teardown callback is invoked during addTeardown()");
assert_array_equals(addTeardownsCalled, ["teardown 1", "teardown 2"],
"Teardowns called synchronously upon addition end up in FIFO order");
}, "Teardowns should be called synchronously during addTeardown() if the subscription is inactive");
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,7 @@ interface Subscriber
attribute @@toStringTag
getter active
getter signal
method addTeardown
method complete
method constructor
method error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,7 @@ Starting worker: resources/global-interface-listing-worker.js
[Worker] attribute @@toStringTag
[Worker] getter active
[Worker] getter signal
[Worker] method addTeardown
[Worker] method complete
[Worker] method constructor
[Worker] method error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9514,6 +9514,7 @@ interface Subscriber
attribute @@toStringTag
getter active
getter signal
method addTeardown
method complete
method constructor
method error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,7 @@ Starting worker: resources/global-interface-listing-worker.js
[Worker] attribute @@toStringTag
[Worker] getter active
[Worker] getter signal
[Worker] method addTeardown
[Worker] method complete
[Worker] method constructor
[Worker] method error
Expand Down

0 comments on commit 0a95b03

Please sign in to comment.