The constructor doesn't have a way to register teardown. #3

Closed
benlesh opened this issue Mar 18, 2023 · 3 comments

@benlesh
Collaborator

benlesh commented Mar 18, 2023

This is pretty critical to the design of Observable. There needs to be some way to register setup and teardown during subscription. In RxJS there are a few paths, but the most well-known and best-understood one is this:

new Observable(subscriber => {
  // Setup here
  let n = 0
  const id = setInterval(() => subscriber.next(n++), 1000)
 
  return () => {
    // Teardown here
    clearInterval(id)
  }
})

However, if we're leaning on DOM APIs like AbortSignal, we can use that here as well. It's just admittedly uglier:

new Observable(subscriber => {
  // Setup here
  let n = 0
  const id = setInterval(() => subscriber.next(n++), 1000)
 
  subscriber.signal.addEventListener('abort', () => {
    // Teardown here
    clearInterval(id)
  })
})

Alternatively, it could come in as the second argument, as it's a very commonly accessed piece of functionality:

new Observable((subscriber, signal) => {
  // Setup here
  let n = 0
  const id = setInterval(() => subscriber.next(n++), 1000)
 
  signal.addEventListener('abort', () => {
    // Teardown here
    clearInterval(id)
  })
})

Finally, the Subscriber itself could be an event target (that is, expose addEventListener), and that might help clean things up:

new Observable((subscriber) => {
  // Setup here
  let n = 0
  const id = setInterval(() => subscriber.next(n++), 1000)
 
  subscriber.addEventListener('teardown', () => {
    // Teardown here
    clearInterval(id)
  })
})

I prefer something like the fourth option, because one of the guarantees observable is supposed to provide is that the teardown is called whenever you error(), complete(), or unsubscribe(). Since teardown also runs on normal completion, "abort" is maybe a misnomer.
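A minimal sketch of that guarantee, assuming the RxJS-style shape from the first example where the setup function returns its teardown. `SketchObservable` and its internals are hypothetical names for illustration, not the proposed API or RxJS's implementation:

```javascript
// Hypothetical minimal Observable: the teardown returned by the setup function
// runs exactly once, whether the subscription ends via error(), complete(),
// or unsubscribe().
class SketchObservable {
  constructor(init) {
    this._init = init;
  }

  subscribe(observer = {}) {
    let closed = false;
    let tornDown = false;
    let teardown; // only known once the setup function returns

    const finalize = () => {
      closed = true;
      if (!tornDown && typeof teardown === 'function') {
        tornDown = true;
        teardown();
      }
    };

    const subscriber = {
      next: (value) => { if (!closed) observer.next?.(value); },
      error: (err) => { if (!closed) { observer.error?.(err); finalize(); } },
      complete: () => { if (!closed) { observer.complete?.(); finalize(); } },
    };

    teardown = this._init(subscriber);
    // The producer may have finished synchronously, before teardown was known.
    if (closed) finalize();

    return { unsubscribe: finalize };
  }
}

const log = [];
const source = new SketchObservable((subscriber) => {
  subscriber.next(1);
  subscriber.complete();
  return () => log.push('teardown');
});
source.subscribe({
  next: (value) => log.push(value),
  complete: () => log.push('complete'),
});
// log is now [1, 'complete', 'teardown']
```

Note that the teardown still runs even though complete() was called before the setup function had returned it.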

@domfarolino
Collaborator

domfarolino commented Mar 24, 2023

My understanding is that unsubscribing could happen in one of two ways:

  1. Observable-initiated
  2. Subscriber-initiated

With (1), the Observable calls error() or complete(), and presumably is responsible for invoking any tear-down function (although I guess it wouldn't be hard to have the implementation auto-call a registered teardown function as a part of the error()/complete() terminators). So that part is fairly straightforward I think.

With (2), the subscriber would either:

  • a. Call unsubscribe(), if subscribe() returned some sort of "Subscription" object as it appears to do in RxJS (I'm not sure about other libraries)
  • b. Simply abort() the signal it passed into the subscribe() function along with its callbacks

I kinda like (b), especially since it was one of the stated goals in whatwg/dom#544: "Ergonomic unsubscription that plays well with AbortSignal/AbortController". If we were to go with that approach, I think it is in tension with the last example, which uses a custom teardown event and thus requires subscribe() to return a "Subscription" of some sort.
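A rough sketch of how (b) could cover both cases, under some assumptions: subscribe() takes a hypothetical `{ signal }` option, and the implementation gives the producer its own `subscriber.signal` that aborts on consumer abort as well as on error()/complete(). `SignalObservable` is an illustrative name, not a proposed interface:

```javascript
// Hypothetical sketch: one internal AbortController per subscription.
// Its signal aborts for consumer-initiated AND producer-initiated endings.
class SignalObservable {
  constructor(init) {
    this._init = init;
  }

  subscribe(observer = {}, { signal } = {}) {
    if (signal?.aborted) return; // consumer aborted before subscribing

    const internal = new AbortController();
    const close = () => internal.abort();

    const subscriber = {
      signal: internal.signal,
      next: (value) => { if (!internal.signal.aborted) observer.next?.(value); },
      error: (err) => { if (!internal.signal.aborted) { observer.error?.(err); close(); } },
      complete: () => { if (!internal.signal.aborted) { observer.complete?.(); close(); } },
    };

    // Consumer-initiated: forward the caller's abort to the internal signal.
    signal?.addEventListener('abort', close, { once: true });
    this._init(subscriber);
  }
}

const events = [];
const ticks = new SignalObservable((subscriber) => {
  const id = setInterval(() => subscriber.next(Date.now()), 1000);
  subscriber.signal.addEventListener('abort', () => {
    clearInterval(id);
    events.push('teardown');
  }, { once: true });
});

const controller = new AbortController();
ticks.subscribe({ next: (value) => events.push(value) }, { signal: controller.signal });
controller.abort(); // consumer-initiated; events is now ['teardown']
```

In this sketch the producer has to register its abort listener before calling complete() or error(), since a listener added to an already-aborted signal never fires.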

@benlesh
Collaborator Author

benlesh commented Apr 13, 2023

> My understanding is that unsubscribing could happen in one of two ways:
>
>   1. Observable-initiated
>   2. Subscriber-initiated

For sake of discussion, I'd like to frame this as "producer-initiated" and "consumer-initiated".

The producer-initiated finalization side of observable is pretty straightforward:

  1. The producer is done sending values (complete)
  2. The producer hit an error and is no longer sending values (error)

On the consumer-side it's always going to be:

  • "I no longer want values" (aka unsubscribe or abort)

Now that said, there's some nuance on both sides.

Some APIs (not Observable) allow errors to happen but continue to emit values. These are not well-behaved observables, just as an iterable, the "dual" of observable, would not be well behaved if calling next on an iterator could throw an error without the iterator being "done" afterwards.

Then on the consumer-facing API side there are a lot of ways an observable can "unsubscribe":

// 1. A subscription object (or unsubscription function) is returned
const subscription = someObservable.subscribe(console.log)
subscription.unsubscribe();

// 2. A cancellation token of some sort (AbortSignal or other)
const controller = new AbortController();
const signal = controller.signal;
someObservable.subscribe(console.log, { signal });
controller.abort();

// 3. A register/unregister sort of API
const observer = console.log
someObservable.subscribe(observer);
someObservable.unsubscribe(observer);

Each of these has different benefits and drawbacks:

Number 1 has the benefit of a uniform and simple API that is easy to understand and pretty terse. The disadvantage to number 1 comes when the action performed by the subscription is synchronous. If it's synchronous, you don't get a handle to the cancellation mechanism until after the subscription action is performed. This is the "synchronous firehose" problem.
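To make the "synchronous firehose" concrete, here is a small sketch. `syncRange` is a hypothetical stand-in for any source that emits synchronously inside subscribe(); it is not part of any real API:

```javascript
// Hypothetical source: emits `count` values synchronously during subscribe().
function syncRange(count) {
  return {
    subscribe(next) {
      let closed = false;
      for (let i = 0; i < count && !closed; i++) next(i);
      return { unsubscribe() { closed = true; } };
    },
  };
}

const received = [];
let subscription;
subscription = syncRange(5).subscribe((value) => {
  received.push(value);
  // We would like to stop after the second value, but `subscription` is still
  // undefined here because subscribe() has not returned yet.
  if (value === 1) subscription?.unsubscribe();
});
// received is [0, 1, 2, 3, 4]: every value arrived despite the unsubscribe attempt.
```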

Number 2 avoids the issue of Number 1 by giving you a handle to the cancellation mechanism upfront. But it does mean there's the additional overhead of needing to create an object in advance, and it's a bit less ergonomic. AbortController and AbortSignal are notably unergonomic in some regards that we can discuss in another thread.
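For comparison, a sketch of the same synchronous source consulting a signal that exists before subscribe() is called. `syncRangeWithSignal` and the `{ signal }` option are assumptions made for illustration:

```javascript
// Hypothetical source: emits synchronously, but checks the caller's signal
// before each value.
function syncRangeWithSignal(count) {
  return {
    subscribe(next, { signal } = {}) {
      for (let i = 0; i < count && !signal?.aborted; i++) next(i);
    },
  };
}

const firehoseController = new AbortController();
const receivedViaSignal = [];
syncRangeWithSignal(5).subscribe((value) => {
  receivedViaSignal.push(value);
  // The controller was created up front, so it is usable mid-emission.
  if (value === 1) firehoseController.abort();
}, { signal: firehoseController.signal });
// receivedViaSignal is [0, 1]
```

The cost is visible too: the consumer has to create and hold a controller before subscribing.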

Number 3 is the most problematic of all, because it means that A) your observable is inherently stateful and has to manage who is subscribed to it, and B) you're forced to carry around the observer instance itself to cancel anything. This is the problem with addEventListener and removeEventListener we're trying to avoid.

All of that said, there are a couple of alternatives to the above we can discuss:

Alternative 1: Have a "start" or "subscribe" handler on the observer, that gives you a reference to the subscription ASAP:

someObservable.subscribe({
  start: (subscription) => {
    // called first. Do what you need to do here.
  },
  next: console.log
})

That gets a little ugly, because you'd need to "leak" that subscription out if you wanted to keep it or pass it around. But you can still have the subscribe call itself return the same subscription object.
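A sketch of how a source might honor such a handler, assuming the `start` name from the snippet above. `syncRangeWithStart` is hypothetical:

```javascript
// Hypothetical source: hands the subscription to observer.start before
// emitting anything, then returns the same object from subscribe().
function syncRangeWithStart(count) {
  return {
    subscribe(observer) {
      let closed = false;
      const subscription = { unsubscribe() { closed = true; } };
      observer.start?.(subscription);
      for (let i = 0; i < count && !closed; i++) observer.next?.(i);
      return subscription;
    },
  };
}

let handle; // the "leak": the subscription has to escape the start callback
const seen = [];
const returned = syncRangeWithStart(5).subscribe({
  start: (subscription) => { handle = subscription; },
  next: (value) => {
    seen.push(value);
    if (value === 1) handle.unsubscribe();
  },
});
// seen is [0, 1], and returned === handle
```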

Alternative 2: Allow a subscription/subscriber to be passed in a similar manner to signals.

const subscription = new Subscription()
// Subscription chained here.
someObservable.subscribe(console.log, { owner: subscription });
subscription.unsubscribe();

// OR

const outerObservable = new Observable(subscriber => {
  innerObservable.subscribe({ 
    next: value => subscriber.next(value * value)
  }, {
    // error, complete, and subscriptions are chained here.
    owner: subscriber
  })
})

This is a proposal that I have an RFC up for with RxJS. The explanation here gets a little weird, but there are some common-enough edge cases with "inner observable" subscriptions (think like concatMap or flatMap for observables) when the inner observables are synchronous, and this tries to get around it in a simplified way that is similar to a cancellation token, but provides some niceness for chaining observables together. I'm not saying it's a great idea for THIS work, I just wanted you to know about it, and I'm happy to discuss the "why"s at some point as well.
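A very rough sketch of the first form (passing a Subscription as `owner`), only to show why having the owner up front helps with synchronous sources. `OwnerSubscription`, its `add` method, and `ownedRange` are assumptions for illustration and are not taken from the RxJS RFC:

```javascript
// Hypothetical Subscription: a bag of teardown callbacks, created by the consumer.
class OwnerSubscription {
  constructor() {
    this.closed = false;
    this._teardowns = [];
  }
  add(teardown) {
    if (this.closed) teardown();
    else this._teardowns.push(teardown);
  }
  unsubscribe() {
    if (this.closed) return;
    this.closed = true;
    for (const teardown of this._teardowns.splice(0)) teardown();
  }
}

// Hypothetical synchronous source that chains its teardown onto the owner.
function ownedRange(count, onTeardown) {
  return {
    subscribe(next, { owner = new OwnerSubscription() } = {}) {
      owner.add(onTeardown);
      for (let i = 0; i < count && !owner.closed; i++) next(i);
      return owner;
    },
  };
}

const ownerLog = [];
const owner = new OwnerSubscription();
ownedRange(5, () => ownerLog.push('teardown')).subscribe((value) => {
  ownerLog.push(value);
  if (value === 1) owner.unsubscribe(); // owner exists before subscribe() runs
}, { owner });
// ownerLog is [0, 1, 'teardown']
```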

@robbiespeed

I'd like to make a suggestion that's similar to passing the abort signal, but adds greater flexibility.

const o = new Observable((subscriber, registerTeardown) => {
  // Setup here
  let n = 0
  const id = setInterval(() => subscriber.next(n++), 1000)
 
  registerTeardown?.(() => {
    // Teardown here
    clearInterval(id)
  })
});

This allows teardown to be triggered by an AbortController, an Observable, a Promise, or pretty much anything else:

o.subscribe(console.log, (dispose) =>
  abortSignal.addEventListener('abort', dispose, { once: true })
);
o.subscribe(console.log, (dispose) => otherObservable.subscribe(dispose));
o.subscribe(console.log, (dispose) => promise.then(dispose));
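One way the plumbing behind this could look, as a sketch only. `TeardownObservable` and the `connectDispose` parameter name are hypothetical, and this version always passes `registerTeardown` to the setup function, which is an assumption:

```javascript
// Hypothetical sketch: subscribe() builds a `dispose` function and hands it to
// the consumer's second argument; the producer registers teardowns that
// `dispose` later runs.
class TeardownObservable {
  constructor(init) {
    this._init = init;
  }

  subscribe(next, connectDispose) {
    const teardowns = [];
    let closed = false;

    const dispose = () => {
      if (closed) return;
      closed = true;
      for (const teardown of teardowns.splice(0)) teardown();
    };

    const subscriber = { next: (value) => { if (!closed) next(value); } };
    // If dispose already ran (e.g. the trigger fired synchronously),
    // run the teardown immediately instead of queueing it.
    const registerTeardown = (teardown) => {
      if (closed) teardown();
      else teardowns.push(teardown);
    };

    connectDispose?.(dispose);
    this._init(subscriber, registerTeardown);
  }
}

const calls = [];
const letters = new TeardownObservable((subscriber, registerTeardown) => {
  subscriber.next('a');
  registerTeardown?.(() => calls.push('teardown'));
});

const abortController = new AbortController();
letters.subscribe((value) => calls.push(value), (dispose) =>
  abortController.signal.addEventListener('abort', dispose, { once: true })
);
abortController.abort();
// calls is ['a', 'teardown']
```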
