Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ref Counted Observable discussion #178

Open
benlesh opened this issue Oct 1, 2024 · 21 comments
Open

Ref Counted Observable discussion #178

benlesh opened this issue Oct 1, 2024 · 21 comments

Comments

@benlesh
Copy link
Collaborator

benlesh commented Oct 1, 2024

After TPAC last week, it was determined that the ideal type of observable might be a ref-counted observable. The ref-counted observable will function roughly as follows:

  1. The observable is "cold" until the first subscriber.
  2. When the first subscriber subscribes, the subscription logic is executed.
  3. Every additional subscriber will share that subscription (it's "hot").
  4. When all subscribers abort their subscriptions, the ref-count drops to 0, and teardown/unsubscription logic is executed, returning the observable to a "cold" state.

The idea behind this type is to address concerns from one issue (#170), where an RxJS user thought it was confusing that sometimes observables have side effects, and sometimes they didn't. This would guarantee that for N subscribers, there would be at most one side effect.

@benlesh
Copy link
Collaborator Author

benlesh commented Oct 1, 2024

This type exists in RxJS-land as the result of share(). The only two issues I can think of, off the top of my head, that will come up with this design are:

  1. When ref-count drops to zero, it's often desirable to have the teardown wait for some small amount of time to make sure that no new subscribers join shortly after. An example of this would be an observable that was shared amongst multiple web components. It's not uncommon that during a single render, all components may be removed or unmounted, then replaced with all new component instances that were going to subscribe to the same thing. In this case, the ref count would drop to zero, everything would teardown, then moments later new components would mount and subscribe, and the subscription needs to start all over again. To that end, RxJS has made this configurable.
  2. A very common use case for observables is to have this "share" behavior, but to emit to new subscribers the most recent value that was seen. For example, imagine you have a sensor that collects data once every 10 seconds. Ideally, new subscribers don't have to wait for 9.5 seconds, if the last bit of data showed up 0.5 seconds ago. RxJS has also made this configurable, but we don't have configuration for this in this new design.

@domfarolino
Copy link
Collaborator

Thanks for filing this before I could get to it! I do think this type tends to be a nice middle ground that provides general parity with userland Observables in terms of scheduling & coldness, as well as a fix to the userland issues around unpredictability and extra side-effects incurred with multiple subscriptions to the same Observable.

I want to clarify something that was discussed offline regarding Observable teardown & re-subscription. We discussed what should happen with an Observable that has gone from >=1 subscriber, to 0 subscribers, and back to >=1 subscriber. One thing that was mentioned was that this final subscribe() could throw, and the producer should not be restarted. However, we came to the conclusion last week that this was not feasible because it introduces an untenable amount of unpredictability into the API1. So just for clarity, the model of an Observable becoming hot & then cold, will allow the next subscription to make the Observable hot again, and restart the producer to serve more subscriptions.

To that end, RxJS has made this [unsubscription / teardown] configurable.

In what way is it configurable? I read this as: you can configure the Observable to sometimes make teardown happen immediately / synchronously, and other times delayed by, say, a microtask. That feels a little unpredictable though. How does Rx, or other userland libraries handle this in the ref-counted producer type? I feel like if users sometimes need synchronous teardown and sometimes not, this could be provided by the userland code itself. That said, I'm not entirely opposed to just baking in strict microtask-delayed unsubscription timing.

Footnotes

  1. For example, you could get an Observable from some source, save it, and before you subscribe to it, someone else could quickly subscribe and unsubscribe. Therefore, at any time asynchronously in the future, could not be sure that subscribe() wouldn't throw.

@noamr
Copy link

noamr commented Oct 2, 2024

IMO this is the most reasonable semantic.
It makes the most sense for event listeners:

  • they're relatively cheap to subscribe/unsubscribe to
  • however, unnecessary subscribers could have undesired consequences (e.g. touch/scroll listeners)
  • By and large subscribing/unsubscribing to event listeners doesn't throw exceptions

Since event listeners are the initial primary use for observables from within the platform using when, making this the default for the "neutral" (constructed) observable would create a reasonably consistent set of expectations.

*(I am less familiar with current uses of RxJS etc though)

@benlesh
Copy link
Collaborator Author

benlesh commented Oct 2, 2024

I'm almost completely sold on the ref-counted observable after some deep thought and experimentation. But it does limit the type quite a bit in a few ways.

A very, very common thing through my experience, and what research I've done, is that people want the ability to "replay" the most recent value to new subscribers. This one:

For example, imagine you have a sensor that collects data once every 10 seconds. Ideally, new subscribers don't have to wait for 9.5 seconds, if the last bit of data showed up 0.5 seconds ago.

A use case we have at work is we have a shared worker that is getting a stream of messages from a service, and when new web views start up, they have to subscribe to the stream of messages, but they don't want to wait for the next message, because it might be several minutes before one arrives. So they've created a (non-RxJS) "observable" that simply caches the previous value and sends it to new subscribers. That's not possible with the proposed ref-counted observable.

A more web-platform based use case would be observable-based APIs around things similar to IntersectionObserver. Right now, IntersectionObserver will immediately fire with information when you start observing an element. However, if more than one component needs to know if that element is intersecting or not, the API isn't great. You'd have to cache the last value yourself, or observe the element again (I'm not sure that does anything immediately though?) Where an observable-based API would be more straight forward:

// Hypothetically... an API like this:
const elementOnScreen = IntersectionObserver.observe(element, { root: null }).map((entry) => entry.isIntersecting);

elementOnScreen.subscribe(handler, { signal });

// and later, somewhere else...
elementOnScreen.subscribe(handler, { signal });


// Which to get anything even close to parity we'd have to do something like this:
const callbacks = new Set();

let isIntersecting = false;

const observer = new IntersectionObserver((entries) => {
  for (const entry of entries) {
    if (entry.target === element && entry.isIntersecting !== isIntersecting) {
      isIntersecting = entry.isIntersecting;
      for (const callback of callbacks) {
        // Average developers screw this up.
        // If the callback throws, we don't want to
        // break the loop and cause producer interference.
        // Observable handles this.
        try {
          callback(isIntersecting);
        } catch (error) {
          reportError(error);
        }
      }
      break;
    }
  }
}, { root: null })

observer.observe(element);

function isElementOnScreen(callback, { signal }) {
  callbacks.add(callback);
  callback(isIntersecting);
  signal.addEventListener('abort', () => {
    callbacks.delete(callback);
    if (callbacks.size === 0) {
      // ref count zero
      observer.unobserve(element);
    }
  }, { once: true });
}

The problem is we need a way to configure new Observable to be able to replay the last value as an opt-in.

@benlesh
Copy link
Collaborator Author

benlesh commented Oct 2, 2024

In what way is it configurable? I read this as: you can configure the Observable to sometimes make teardown happen immediately / synchronously, and other times delayed by, say, a microtask. That feels a little unpredictable though.

For RxJS it's just an option like share({ resetOnRefCountZero: 3000 }) (wait 3000ms) or even share({ resetOnRefCountZero: async () => { /* whatever */ }) }).

That feels a little unpredictable though.

If it's ref-counted, it's already unpredictable. Any given consumer can't "know" they're the last one to end their subscription.

For this API, I think it should be configurable in the new Observable constructor in a second argument of options. new Observable(fn, { beforeReset: async () => {} } would do, IMO. Scheduler.postTask or anything could be used there. It would only happen on ref count zero... not on producer complete or error, those should immediately reset things.

@noamr
Copy link

noamr commented Oct 2, 2024

The problem is we need a way to configure new Observable to be able to replay the last value as an opt-in.

In performance APIs, you play all the buffered values and not only the last one. And then this requires some way to manage/empty the buffer etc... Also buffering only the last event would have implications as you're secretly retaining that value from garbage collection.

I would expect to do this kind of things as some sort of a composition rather than a default behavior, like

   element.when("click").buffer(1).subscribe(...)

@benlesh
Copy link
Collaborator Author

benlesh commented Oct 2, 2024

@noamr .buffer(1) can't be created/composed with a ref-counted observable, unfortunately. We'd need the constructor to allow an opt-in like: new Observable(fn, { buffer: 1 }) or the like... even if we wanted to build a .buffer(1) method.

@noamr
Copy link

noamr commented Oct 3, 2024

@noamr .buffer(1) can't be created/composed with a ref-counted observable, unfortunately. We'd need the constructor to allow an opt-in like: new Observable(fn, { buffer: 1 }) or the like... even if we wanted to build a .buffer(1) method.

I don't understand why it "can't" but I don't have the bandwidth to dig deeper unfortunately so will take your word for it.

Seems to me that observables try to create a unified API to things that are in fact subtly different (event targets, promises, different kinds of observers like interaction/resize/performance/mutation). Perhaps it would be good to have some examples of how observable integration would look like in those cases, with emphasis on things like buffering and subscribe/unsubscribe semantics. (*Perhaps these examples exist already, I'm not familiar enough with all past conversations)

@benlesh
Copy link
Collaborator Author

benlesh commented Oct 3, 2024

I don't understand why it "can't"

The result of observable.buffer(1) would need to replay the last buffered value to every new subscriber. But an always ref-counted observable can't treat every new subscriber differently. Only a "cold" observable can really do that.

@domfarolino
Copy link
Collaborator

I'm not sure I follow. observable.buffer(1) could just always store the last value it received from the single producer. It pushes it to all existing subscribers, and then holds onto it in case any new subscribers come along before the producer pushes the next value. Once it receives the next value, it would drop the old one, hold onto it, push it to all existing subscribers, and hold onto it in case any more new subscribers come along before the next new value from the producer.

Does that not work?

@benlesh
Copy link
Collaborator Author

benlesh commented Oct 3, 2024

@domfarolino How could this be implemented on top of a ref-counted observable?

Here it is with a cold observable:

ColdObservable.prototype.buffer = function (bufferSize) {
  let buffer = [];
  const subscribers = new Set();
  let abortController = null;
  
  return new ColdObservable((subscriber) => {
    subscribers.add(subscriber);

    subscriber.addTeardown(() => {
      subscribers.delete(subscriber);
      if (subscribers.size === 0) {
        // last unsubscription, disconnect from source
        abortController.abort();
        abortController = null;
      }
    });

    // Notify the new subscriber with whatever is in the buffer.
    for (const value of buffer) {
      subscriber.next(value);
    }

    if (subscribers.size === 1) {
      // First subscription, connect to the source.
      abortController = new AbortController();
      
      this.subscribe({
        next: (value) => {
          buffer.push(value);
          if (buffer.length > bufferSize) buffer.shift();
          
          for (const subscriber of subscribers) {
            subscriber.next(value);
          }
        },
        error: (error) => {
          buffer = []
          for (const subscriber of subscribers) {
            subscriber.error(error);
          }
          subscribers.clear();
        },
        complete: () => {
          buffer = [];
          for (const subscriber of subscribers) {
            subscriber.complete();
          }
          subscribers.clear();
        }
      }, { signal: abortController.signal })
    }
  });
}

The problem with an always-ref-counted observable is that the subscriber you get is only the subscriber to the source. It's not the joining subscriber. Calling subscriber.next() on it will notify all listeners.

@domfarolino
Copy link
Collaborator

The design I had in mind would be that you'd have a subscriber object for each consumer, and when the Observable encountered a new value from the producer, it would push it to all N existing subscribers. It would hold onto a single value, and push it to any new subscribers that joined the list of pre-existing subscribers.

@domfarolino
Copy link
Collaborator

Edit

To clarify, the producer would only see a single subscriber, but the implementation would keep track of a number of observers, and whenever any new observers joined, the Observable would be responsible for pushing the single value it holds to the new observer. (I misused the word subscriber in the previous message)

@noamr
Copy link

noamr commented Oct 4, 2024

Edit

To clarify, the producer would only see a single subscriber, but the implementation would keep track of a number of observers, and whenever any new observers joined, the Observable would be responsible for pushing the single value it holds to the new observer. (I misused the word subscriber in the previous message)

I guess that would be a bit strange because it would only buffer if you have subscribers, and the first subscriber wouldn't get any buffered events.
In terms of web APIs, every API has its own buffering semantics in a way. I think this would have to be expressed somehow in the shim between the APIs and the observer pipeline (the when function equivalent).

@benlesh
Copy link
Collaborator Author

benlesh commented Oct 4, 2024

LOL... I'm so sorry, I'm not following what the idea is.

Generally speaking, a ref-counted observable is implemented pretty much the same way a cold-observable is... with the difference being that it has an internal subscriber that forwards everything to a list of external subscribers.

So... if your observable looks like this:

const secondClock = new Observable((internalSubscriber) => {
  const id = setInterval(() => internalSubscriber.next(Date.now()), 1000);
  internalSubscriber.addTeardown(() => clearInterval(id));
})

and you subscribe like this:

secondClock.subscribe(console.log);
secondClock.subscribe(console.log);

You're going to add two Subscribers, one for each subscribe call, to an internal list of some sort... and when the first one is added, an internalSubscriber (also a Subscriber) is created and passed to the initialization/producer function from the constructor... when that internalSubscriber is notified, it will forward the notification to all subscribers in the internal list.

Further, when consumer subscribers unsubscribe (abort) they are removed from the list, if the list gets to length/size 0, then the internalSubscriber will teardown, and the observable will go "cold" again.

@benlesh
Copy link
Collaborator Author

benlesh commented Oct 4, 2024

Okay, so I put together a ref-counted Observable example in a StackBlitz

It should show the behavior. It will hopefully also demonstrate how there's not really a way to send a previously buffered value to a new consumer with the default interface.

@flensrocker
Copy link
Contributor

I also like the "ref count" approach to the Observable API, because it's always difficult to explain to new developers when, how and why they must use "share" etc. on an observable. Even after years of using rxjs with Angular I sometimes make this error by myself. Most of the time I get not hit by this error, because most of the time there's only one subscriber - and then the problem doesn't occur. But sometimes you add another subscriber in the future and then things get suddenly weird. That's why I have some state manament thing to wrap sideeffects like calling an http resource etc., because I have to do it nevertheless to handle errors.

And I understand the problem with not being able to "buffer".

So here are some thoughts from me:
The buffer is some kind of state to me. And observables are not really good at managing state (or they only could because of a replay/buffer mechanism). Currently when talking about state, signals come to my mind (but they don't have to be used to manage state - and they don't really exist in the language or platform yet, just userland).
In my experience, when I need some replay mechanism, it's not because I want to process the event behind this, but to display a derived state/value from some source. And then a signal seems a better choice (to me).

Just some random thoughts on this - I may be completely of the track... 😎

@domfarolino
Copy link
Collaborator

Thanks a lot @flensrocker! This is great feedback to be getting. From a purist perspective, I like the idea of separating state management (like a buffer of past events) out from this proposal, so I'm definitely wondering if we can get away with not having a buffer option in the constructor or similar operator. It'd be good to hear from others how important this might be for the Refcounted producer proposal to land.

@benlesh
Copy link
Collaborator Author

benlesh commented Nov 30, 2024

Really the only issue I have at all with the proposal as it stands, the ONLY issue, is that there's no way to create an observable that "replays" previous values to new subscribers. MAYBE people could subclass it like this?

class BufferedObservable extends Observable {
  constructor(subscriberCall, options) {
    super(subscriberCall);
    this.#bufferSize = Math.max(0, options?.bufferSize ?? Infinity)
    this.#innerSource = this.do((value) => {
      this.#buffer.push(value);
      if (this.#buffer.length > this.#bufferSize) this.#buffer.shift();
    });
  }

  subscribe(subscriber, options) {
    const next = typeof subscriber === 'function' ? subscriber : subscriber?.next ? subscriber.next.bind(subscribeer) : null;
    if (typeof next === 'function') {
      for (const value of this.#buffer) {
        next(value);
      }
    }

    return this.#innerSource.subscribe(subscriber, options);
  }

  // do this for EVERY method?
  map(...args) {
    return this.#innerSource.map(...args);
  }
}

Overall it sort of sucks, for what is a fairly common use case.

@domfarolino
Copy link
Collaborator

Or we could just add a buffer option to the Observable constructor?

@noamr
Copy link

noamr commented Dec 2, 2024

Or we could just add a buffer option to the Observable constructor?

... or a subclass, kind of like how streams have subclasses for different buffering options

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants