This issue was moved to a discussion.

The case for tokens (AbortSignal, et al). #5649

Closed
benlesh opened this issue Aug 17, 2020 · 3 comments

@benlesh
Member

benlesh commented Aug 17, 2020

There are weird corners of RxJS involving teardown and "synchronous firehose" observables that are very hard to resolve with naive RxJS code. This is due primarily to the fact that we do not give the consumer a reference to the "closed" state of the underlying subscription, and the only way to do that, really, is with some sort of token.

Here is a contrived example that maybe only a seasoned RxJS veteran will understand, but it's here for discussion purposes:

  it.only("OMG wtf", () => {

    let sideEffects = 0;

    const firehose = new Observable(subscriber => {
      let n = 0;
      // We're checking subscriber.closed on each loop here.
      // in cases like `firehose.pipe(take(n))`, this will work... but in our test case
      // here, it will not work!
      while (!subscriber.closed && n < 1000) {
        sideEffects++;
        subscriber.next(n++);
      }
    });

    // Let's say this observable here is the result of some sort of operator that someone made.
    // If this is hard to understand, I guess a challenge would be to have someone "roll their own"
    // concatMap operator without using our Subscribers and having working knowledge of
    // our implementation details.
    const flatteningSyncOuter = new Observable(subscriber => {
      const subscription = new Subscription();
      // I KNOW these are synchronous, so I'm simplifying code here
      subscription.add(firehose.subscribe({
        next: value => subscriber.next(value)
      }));

      subscription.add(firehose.subscribe({
        next: value => subscriber.next(value),
        complete: () => subscriber.complete()
      }));

      return subscription;
    });

    flatteningSyncOuter.pipe(
      take(3)
    ).subscribe();

    // This will fail: sideEffects will be 2000, because both firehose loops run to completion.
    expect(sideEffects).to.equal(3);
  });

The problem

If there is an additional subscription step between the firehose and the take(3), there is no way to thread the closed state of the outer subscription (related to the take(3)) to the inner subscription in such a way that it would close it. If the firehose were infinite, or "very long", this would be catastrophic and hard to debug without a lot of expertise.

This is not a problem in the "99% use case" for RxJS, which should be async. This is why we haven't run across a lot of complaints about this. In the case where the inner subscription (to firehose) is async, the code has time to get a reference to the inner subscription and connect it to the outer subscription.

How a token system improves this

If we had a token system, the same token gets threaded through the entire subscription, even to the inner subscriptions. This means that the firehose would be checking signal.aborted or token.cancelled or whatever it ended up being called, instead of subscriber.closed (however, we could wire the subscriber up to make sure it is closed by the token). The big advantage is the shared state throughout the subscription chain, even to the inner subscriptions.

A token system will make the internal code lighter, and the implementation of naive operators by our users more readable and straightforward, IMO.
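To make the shared-state idea concrete without depending on any RxJS internals, here is a minimal sketch. It assumes a hypothetical stand-in for an observable: a plain function that receives an observer and an AbortSignal. The names `firehose`, `flatteningSyncOuter`, and `takeSync` are illustrative only and are not RxJS APIs.

```javascript
// Hypothetical stand-in for an Observable: a function taking an observer
// and an AbortSignal. This is NOT the RxJS API.
function firehose(counter) {
  return (observer, signal) => {
    let n = 0;
    // The producer checks the shared signal on every iteration.
    while (!signal.aborted && n < 1000) {
      counter.sideEffects++;
      observer.next(n++);
    }
    if (!signal.aborted) observer.complete?.();
  };
}

// Hypothetical flattening source: runs two firehoses back to back and
// passes the same signal to both inner "subscriptions".
function flatteningSyncOuter(counter) {
  return (observer, signal) => {
    firehose(counter)({ next: (v) => observer.next(v) }, signal);
    firehose(counter)(
      { next: (v) => observer.next(v), complete: () => observer.complete?.() },
      signal
    );
  };
}

// Hypothetical take(): owns the controller whose signal the whole chain sees,
// and aborts it once enough values have arrived.
function takeSync(source, count) {
  const controller = new AbortController();
  const values = [];
  source(
    {
      next: (v) => {
        values.push(v);
        if (values.length >= count) controller.abort();
      },
    },
    controller.signal
  );
  return values;
}

const counter = { sideEffects: 0 };
const taken = takeSync(flatteningSyncOuter(counter), 3);
console.log(taken, counter.sideEffects); // [ 0, 1, 2 ] 3
```

Because both inner firehoses read the same signal, the first loop stops right after the third value and the second loop never starts.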

Example with a token system (AbortSignal, let's say)

This is a make-believe example of the same problem, only using a system with AbortSignal-style tokens.

  it.only("OMG wtf", () => {

    let sideEffects = 0;

    const firehose = new Observable((subscriber, signal) => {
      let n = 0;
      while (!signal.aborted && n < 1000) {
        sideEffects++;
        subscriber.next(n++);
      }
    });

    // Let's say this observable here is the result of some sort of operator that someone made.
    // If this is hard to understand, I guess a challenge would be to have someone "roll their own"
    // concatMap operator without using our Subscribers and having working knowledge of
    // our implementation details.
    const flatteningSyncOuter = new Observable((subscriber, signal) => {

      // Notice how we are now threading the signal through. Pretty straightforward.
      firehose.subscribe({
        next: value => subscriber.next(value)
      }, signal);

      firehose.subscribe({
        next: value => subscriber.next(value),
        complete: () => subscriber.complete()
      }, signal);
    });

    const ac = new AbortController();
    flatteningSyncOuter.pipe(
      tap(() => {
        // Give it a 1 in 10 chance of unsubbing externally.
        if (Math.random() > 0.9) ac.abort();
      }),
      take(3)
    ).subscribe(undefined, ac.signal);

    expect(sideEffects).to.equal(3);
  });

Summary

I think we should probably move away from Subscriptions in the long term. I have been critical of the ergonomics of tokens vs subscriptions in the past. However, given real-world implementations like the ones above, I think that in those cases subscriptions might actually be more complicated. (Think of other things, such as "Compound Subscriptions" aka "Parent Subscriptions" vs just using the same token for all subscriptions.)

I'll admit, tokens are still imperfect. There are rough edges around things like having more than one way to unsubscribe a subscription using a token (you'd have to create a token by "racing" two tokens, or something), but I think that's workable.
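As a rough illustration of what "racing" two tokens could look like with AbortSignal, here is a minimal sketch. The helper name `raceSignals` is hypothetical and is not part of RxJS or the DOM; newer runtimes also ship a built-in `AbortSignal.any` that covers the same need.

```javascript
// Hypothetical helper: returns a signal that aborts as soon as either
// input signal aborts.
function raceSignals(a, b) {
  const controller = new AbortController();
  const abort = () => controller.abort();
  if (a.aborted || b.aborted) {
    // One side is already aborted, so the combined signal starts aborted.
    abort();
  } else {
    a.addEventListener('abort', abort, { once: true });
    b.addEventListener('abort', abort, { once: true });
  }
  return controller.signal;
}

// Usage: an external "unsubscribe" token raced against an internal one.
const external = new AbortController();
const internal = new AbortController();
const combined = raceSignals(external.signal, internal.signal);

const abortedBefore = combined.aborted;
internal.abort();
const abortedAfter = combined.aborted;
console.log(abortedBefore, abortedAfter); // false true
```

Note that this sketch leaves a listener attached to whichever signal never aborts, which is one of the rough edges a real implementation would need to handle.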

Short term, I think we should try to get every Subscription to create a signal/token of its own to thread through as seen above. Since we create a master "subscription" (in the form of a Subscriber, internally) at the beginning of every subscription, there is an opportunity to provide this as an API, and we should start guiding people toward using these tokens when creating operators, particularly flattening operators as seen above.

Alternatives

It's also plausible that we could do something avant-garde and literally use Subscription like a combination AbortController/AbortSignal (it does provide both pieces of functionality and is arguably more ergonomic). However, that might be confusing, as the goal would be to get people to stop relying on the return value of subscribe directly, and there would suddenly be subscriptions everywhere.

Then again, this migration might be confusing either way.

@benlesh benlesh added AGENDA ITEM Flagged for discussion at core team meetings type: discussion labels Aug 17, 2020
@voliva
Contributor

voliva commented Aug 18, 2020

I think that's a nice example which shows a really good case where a token makes things less error-prone, which I think is awesome.

However, I'd like to raise an issue which makes this not as straightforward as it might seem. Imagine, following the original example, that the user wants to subscribe to some changes for each value and emit those. Although it could be done in the same way as the example, let's put this in a custom operator so it doesn't look as contrived:

const withLatestUserInput = () => source => new Observable((subscriber, signal) => {
  let lastMousePos = null;
  mouseMove$.subscribe(({ position }) => {
    lastMousePos = position;
  }, signal);

  let lastClickPos = null;
  mouseClick$.subscribe(({ position }) => {
    lastClickPos = position;
  }, signal);

  source.pipe(
    map(value => ({ value, lastMousePos, lastClickPos }))
  ).subscribe(subscriber, signal)
})

const ac = new AbortController();
flatteningSyncOuter.pipe(
  take(3),
  withLatestUserInput()
).subscribe(undefined, ac.signal);

I'm sorry this is contrived; it's hard to find a good example. What withLatestUserInput wants to do is, every time there's a new value, add in the position of the mouse along with where the last click happened.

At first sight, if we follow the logic that the signal is shared along the whole subscription chain, it looks like it would work, right? take cancels the subscription, and this signal is used along all the inner subscriptions, including the ones from withLatestUserInput.

However, this assumption is wrong, unless we use a different token system from AbortController/AbortSignal: an AbortSignal has an aborted property, but it's read-only. The only way to abort the signal is through its AbortController, a reference that take doesn't have.

This means that internally, take will have to create a new AbortController and pass its signal to its source. Then, after taking 3 values (in this case), it aborts that signal and emits a complete down the stream.

This means that withLatestUserInput will receive a complete, but its signal won't be aborted (until it's aborted somewhere else in the chain). When implementing withLatestUserInput you'll need to manually merge every abort signal and abort the inner subscriptions when the source completes. You can't really assume the signal will cover all cases where you need to clean up every internal subscription.

I think it would be great if we can find a solution where we can leverage the best of both worlds. Tokens have their use and in some cases are great to use (especially in synchronous operations), and Subscriptions provide a good abstraction that makes composition easier, although in some specific cases they are hard to use.

Would it be possible to expose the token as proposed, while also keeping some sort of "cleanup" logic on subscription?

@benlesh
Member Author

benlesh commented Aug 27, 2020

@voliva There are many ways to make sure this works. In all cases, an internal AbortController must be created, and likely shared with all subscribers down the chain, such that when take, et al, complete or error, this internal AbortController is aborted. This is the same one that provides the signal throughout the chain.

Signals, if provided through subscribe calls, would be wired to abort the internal AbortController.
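One possible reading of this, sketched without RxJS: a single internal AbortController is created per subscription chain, any externally supplied signal is wired to abort it, and operators such as take abort it when they complete. All names here (`subscribeWithSignal`, `numbers`, `take`) and the choice to pass the controller itself down the chain are assumptions made for illustration, not a description of how RxJS works.

```javascript
// Hypothetical subscribe(): creates one internal AbortController for the
// whole chain and wires an optional external signal to it.
function subscribeWithSignal(source, observer, externalSignal) {
  const internal = new AbortController();
  if (externalSignal) {
    if (externalSignal.aborted) internal.abort();
    else externalSignal.addEventListener('abort', () => internal.abort(), { once: true });
  }
  // Every step in the chain receives the same controller.
  source(observer, internal);
  return internal.signal;
}

// Hypothetical synchronous source: emits 0..999 until the shared signal aborts.
const numbers = (observer, controller) => {
  for (let n = 0; n < 1000 && !controller.signal.aborted; n++) observer.next(n);
  if (!controller.signal.aborted) {
    observer.complete();
    controller.abort();
  }
};

// Hypothetical take(): completes downstream and aborts the shared controller.
const take = (count) => (source) => (observer, controller) => {
  let seen = 0;
  if (count <= 0) {
    observer.complete();
    controller.abort();
    return;
  }
  source(
    {
      next: (v) => {
        if (controller.signal.aborted) return;
        observer.next(v);
        if (++seen >= count) {
          observer.complete();
          controller.abort();
        }
      },
      complete: () => observer.complete(),
    },
    controller
  );
};

const received = [];
const chainSignal = subscribeWithSignal(take(3)(numbers), {
  next: (v) => received.push(v),
  complete: () => received.push('done'),
});
console.log(received, chainSignal.aborted); // [ 0, 1, 2, 'done' ] true
```

In this arrangement, any side subscription that was handed the same signal would also see the abort when take completes, which is the case raised in the previous comment.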

@SimonFarrugia

SimonFarrugia commented Dec 28, 2020

@benlesh If I understand the "synchronous firehose" issue you are describing here, wouldn't adding a start function on the Observer as described in the tc39/proposal-observable fix this issue?

Example (from your example):

const subscription = new Subscription();

firehose.subscribe({
    start: sub => subscription.add(sub),
    next: value => subscriber.next(value)
});

// // Instead of this:
// subscription.add(firehose.subscribe({
//    next: value => subscriber.next(value)
// }));

Also, the start function adds the ability for the Observer to unsubscribe itself, which at the moment is still not possible in rxjs.
Example:

// Without support of `start` function, instance of MyObserver has no way of getting its own subscription to unsubscribe itself at some point in the future or immediately.
const subscription = myObservable.subscribe(new MyObserver());
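To show why receiving the subscription in start helps with a synchronous producer, here is a minimal self-contained sketch. `TinyObservable` is a hypothetical toy class written for this example; it is neither the RxJS Observable nor the tc39/proposal-observable implementation, and only models the ordering of start before the producer runs.

```javascript
// Hypothetical minimal observable, for illustration only.
class TinyObservable {
  constructor(producer) {
    this._producer = producer;
  }

  subscribe(observer) {
    const subscription = {
      closed: false,
      unsubscribe() {
        this.closed = true;
      },
    };
    // Hand the subscription to the observer before any value is produced.
    observer.start?.(subscription);
    if (!subscription.closed) {
      this._producer({
        get closed() {
          return subscription.closed;
        },
        next: (v) => {
          if (!subscription.closed) observer.next?.(v);
        },
      });
    }
    return subscription;
  }
}

let sideEffects = 0;
const firehose = new TinyObservable((subscriber) => {
  let n = 0;
  while (!subscriber.closed && n < 1000) {
    sideEffects++;
    subscriber.next(n++);
  }
});

// An observer that unsubscribes itself after three values, using the
// subscription it received in start().
const seen = [];
let own;
firehose.subscribe({
  start(sub) {
    own = sub;
  },
  next(v) {
    seen.push(v);
    if (seen.length === 3) own.unsubscribe();
  },
});
console.log(seen, sideEffects); // [ 0, 1, 2 ] 3
```

Without start, `own` would only be assigned after subscribe returns, by which point the synchronous loop has already run all 1000 iterations.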

I also see the advantage and the simplicity of using signal instead of subscriptions. Maybe in that case it would be better to also add the signal on the Observer interface:

interface Observer<T> {
    start(): void; // Note, here we don't get the subscription anymore since we are using signal.
    next(value: T): void;
    error(err: any): void;
    complete(): void;
    signal: AbortSignal;
}

This way an Observer instance can also "unsubscribe" itself by creating its own termination signal.

Example:

class ConsoleLogObserver {
    constructor(time) {
        this._time = time;
    
        this._ac = new AbortController();
        this.signal = this._ac.signal;
    }

    start() {
        // The Observer can unsubscribe itself since it has access to its own `AbortController`
        timer(this._time).subscribe({
            next: () => this._ac.abort(),
            signal: this.signal,
        });
    }

    next(value) {
        console.log(value);
    }

    complete() {
        this._ac.abort();
    }
}

// console.log values for 30sec
valueStream$.subscribe(new ConsoleLogObserver(30000));

@benlesh benlesh removed the AGENDA ITEM Flagged for discussion at core team meetings label Apr 20, 2021
@benlesh benlesh closed this as completed May 4, 2021
@ReactiveX ReactiveX locked and limited conversation to collaborators May 4, 2021
