[Proposal] New flatten map operators implementation #3149

Closed
ghetolay opened this issue Dec 1, 2017 · 6 comments

@ghetolay
Contributor

ghetolay commented Dec 1, 2017

Following @Dorus's comment, I think we can merge the implementations of all the flatten map operators (mergeMap, concatMap, switchMap, exhaustMap, debounce/auditMap (#1777)) into a single operator, flexible enough to cover all scenarios.
Each of those operators would then just be an alias of that main operator. It may also open new possibilities, like the debounce/auditMap that doesn't exist yet.

I'll use flatMap and Queue because it's hard to find new meaningful names and it's not worth spending too much time on it at this stage.

Signature

flatMap<T, R>(project: (value: T, index: number) => ObservableInput<R>, queue: Queue<T>): OperatorFunction<T, R>;

The new flatMap operator becomes a kind of shell that does the operator-related work, such as managing the inner and outer observables, but it no longer decides what to subscribe to or cancel. Those decisions are delegated to queue, a simple interface decoupled from rxjs internals so that users can implement it.
Queue is composed of 2 functions, analogous to push & pop but specific to our case.
Those functions will be called:

  • when the source observable emits.
  • when an inner observable completes.

The algorithm of flatMap will then be as follows:

  1. subscribe to source
  2. when the source emits, depending on the queue implementation, do zero or more of:
    • run project on an item and subscribe
    • cancel a running subscription
  3. when an inner observable completes, depending on the queue implementation, either:
    • run project on an item and subscribe
    • do nothing
  4. on source completion, inner emission, inner error and outer unsubscribe:
    • current behavior
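
To make the decision loop above concrete, here is a minimal sketch that simulates it without any rxjs dependency. `FlatMapShellSim`, `sourceEmits`, `innerCompletes` and `exhaustLike` are hypothetical names used only for illustration. The simulation tracks items instead of real subscriptions, and it assumes the Queue shape described in the next section, with the tuple return written as `| void`.

```typescript
// Queue shape from this proposal (return type written as `| void` here).
interface Queue<T> {
  onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void;
  onSubComplete(completed: T): T | void;
}

// Hypothetical stand-in for the flatMap shell: it only tracks which items
// have a running inner subscription and logs what it would do.
class FlatMapShellSim<T> {
  readonly actives: T[] = [];
  readonly log: string[] = [];

  constructor(private queue: Queue<T>) {}

  // Called when the source emits a value.
  sourceEmits(item: T): void {
    const decision = this.queue.onNewItem(item, this.actives.slice());
    if (!decision) return; // the queue buffered or dropped the item
    const [toSubscribe, cancelIndex] = decision;
    if (cancelIndex !== undefined && cancelIndex >= 0 && cancelIndex < this.actives.length) {
      const [cancelled] = this.actives.splice(cancelIndex, 1);
      this.log.push(`cancel ${cancelled}`);
    }
    if (toSubscribe !== undefined) this.subscribe(toSubscribe);
  }

  // Called when the inner observable created for `item` completes.
  innerCompletes(item: T): void {
    const index = this.actives.indexOf(item);
    if (index === -1) return; // not running (for example already cancelled)
    this.actives.splice(index, 1);
    this.log.push(`complete ${item}`);
    const next = this.queue.onSubComplete(item);
    if (next !== undefined) this.subscribe(next);
  }

  private subscribe(item: T): void {
    // A real operator would call project(item, index) and subscribe here.
    this.actives.push(item);
    this.log.push(`subscribe ${item}`);
  }
}

// Example queue: ignore new items while something is running (exhaust-like).
const exhaustLike: Queue<string> = {
  onNewItem: (item, actives) => (actives.length > 0 ? undefined : [item, undefined]),
  onSubComplete: () => {},
};

const sim = new FlatMapShellSim(exhaustLike);
sim.sourceEmits("a");
sim.sourceEmits("b"); // ignored: "a" is still running
sim.innerCompletes("a");
sim.sourceEmits("c");
console.log(sim.log); // ["subscribe a", "complete a", "subscribe c"]
```

The shell never inspects the items themselves; every subscribe or cancel decision comes from the two queue callbacks.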

Queue Interface

export interface Queue<T> {
	/**
	 * Called when the source observable emits.
	 * @param item new item emitted by the source
	 * @param actives list of items corresponding to the active subscriptions currently running
	 * @return nothing, or a tuple with 2 values:
	 *   1. optional (arguable) item to run project on and subscribe to
	 *   2. optional index (in actives) of the subscription to cancel
	 */
	onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void;

	/**
	 * Called when an inner observable completes.
	 * @param completed item corresponding to the completed observable
	 * @return optional, an item (to run project on and subscribe to)
	 */
	onSubComplete(completed: T): T | void;
}

As you can see it's fairly simple (except the tuple maybe), straightforward and easy for users to implement. It's all just about items, and nothing about rx stuff like subscriptions.
We can easily build a concurrent queue, buffer queue, priority queue etc...
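
As an illustration of that claim, here is a rough sketch of what a priority queue could look like against this interface. `PriorityQueue`, `priorityOf` and `Job` are hypothetical names that are not part of the proposal, and the rule that a lower number runs first is an assumption made for the example.

```typescript
// Queue shape from this proposal (return type written as `| void` here).
interface Queue<T> {
  onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void;
  onSubComplete(completed: T): T | void;
}

// Hypothetical priority queue: at most `concurrent` running items; waiting
// items are started in ascending order of `priorityOf` (lower runs first).
class PriorityQueue<T> implements Queue<T> {
  private waiting: T[] = [];

  constructor(
    private priorityOf: (item: T) => number,
    private concurrent = 1,
  ) {}

  onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void {
    // Room left: subscribe right away, cancel nothing.
    if (actives.length < this.concurrent) return [item, undefined];
    // Otherwise park the item, keeping `waiting` sorted by priority.
    this.waiting.push(item);
    this.waiting.sort((a, b) => this.priorityOf(a) - this.priorityOf(b));
    return;
  }

  onSubComplete(): T | void {
    // Start the most urgent waiting item, if any.
    return this.waiting.shift();
  }
}

type Job = { name: string; priority: number };

const pq = new PriorityQueue<Job>(job => job.priority);
const running: Job = { name: "low", priority: 5 };

const started = pq.onNewItem(running, []);            // nothing running: starts immediately
pq.onNewItem({ name: "mid", priority: 3 }, [running]); // parked
pq.onNewItem({ name: "high", priority: 1 }, [running]); // parked ahead of "mid"

const next = pq.onSubComplete(); // the "high" job
const afterNext = pq.onSubComplete(); // the "mid" job
const nothingLeft = pq.onSubComplete(); // undefined
console.log(started, next, afterNext, nothingLeft);
```

Nothing here touches subscriptions: the queue only decides which item goes next, which is the point of keeping the interface about items.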

Variant operators

With those 2 queue implementations :

const NoQueue: Queue<any> = {
	// always subscribe to the new item and never cancel anything
	onNewItem: item => [item, undefined],
	onSubComplete: () => { }
}

class ConcurrentFifoQueue<T> implements Queue<T> {
	private buffer: T[] = [];

	constructor(private concurrent = Number.POSITIVE_INFINITY, 
		private bufferSize = Number.POSITIVE_INFINITY, 
		private dropRunning = false) {}

	onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void {
		// maximum concurrency not reached yet: subscribe to the item, cancel nothing
		if (actives.length < this.concurrent)
			return [item, undefined];

		// max concurrency reached: store the item in the buffer and do nothing else
		if (this.buffer.length < this.bufferSize) {
			this.buffer.push(item);
			return;
		}

		// buffer overflow: add the new item and remove the oldest buffered one
		// (with bufferSize = 0 that is the new item itself)
		this.buffer.push(item);
		const dropItem = this.buffer.shift();

		// cancel the latest running subscription and subscribe to the removed item
		if (this.dropRunning) {
			return [dropItem, actives.length - 1];
		}

		// otherwise the removed item is simply dropped
	}

	onSubComplete(): T | void {
		if (this.buffer.length > 0)
			return this.buffer.shift();
	}
}

We can now express and export all existing flatten operators as aliases:

  • mergeMap: flatMap(project, NoQueue)
  • concatMap: flatMap(project, new ConcurrentFifoQueue(1)) (bufferSize defaults to infinity).
  • exhaustMap: flatMap(project, new ConcurrentFifoQueue(1, 0)) (dropRunning defaults to false, so the new item is dropped). Or, with a dedicated queue: onNewItem(item: any, actives: any[]) => actives.length > 0 ? undefined : item
  • switchMap: flatMap(project, new ConcurrentFifoQueue(1, 0, true)). Or, with a dedicated queue: onNewItem(item: any, actives: any[]) => [item, actives.length - 1]
  • debounce/auditMap: flatMap(project, new ConcurrentFifoQueue(1, 1))
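
To check these aliases, the following sketch traces each configuration on the same scenario: the source emits a, b and c back to back, then the running inner observables complete one at a time. The class repeats ConcurrentFifoQueue in compressed form so the snippet runs on its own. `run` is a hypothetical driver that stands in for the flatMap shell and tracks items rather than real subscriptions.

```typescript
interface Queue<T> {
  onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void;
  onSubComplete(completed: T): T | void;
}

class ConcurrentFifoQueue<T> implements Queue<T> {
  private buffer: T[] = [];
  constructor(
    private concurrent = Number.POSITIVE_INFINITY,
    private bufferSize = Number.POSITIVE_INFINITY,
    private dropRunning = false,
  ) {}

  onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void {
    if (actives.length < this.concurrent) return [item, undefined];
    if (this.buffer.length < this.bufferSize) { this.buffer.push(item); return; }
    this.buffer.push(item);
    const dropItem = this.buffer.shift(); // oldest buffered item
    if (this.dropRunning) return [dropItem, actives.length - 1];
  }

  onSubComplete(): T | void {
    if (this.buffer.length > 0) return this.buffer.shift();
  }
}

// Hypothetical driver: emits "a", "b", "c" back to back, then completes the
// running items one at a time (oldest first) until nothing is left.
function run(queue: Queue<string>): string[] {
  const actives: string[] = [];
  const log: string[] = [];
  const start = (item: string) => { actives.push(item); log.push(`sub ${item}`); };

  for (const item of ["a", "b", "c"]) {
    const decision = queue.onNewItem(item, actives.slice());
    if (!decision) continue;
    const [toSubscribe, cancelIndex] = decision;
    if (cancelIndex !== undefined && cancelIndex >= 0 && cancelIndex < actives.length) {
      log.push(`cancel ${actives.splice(cancelIndex, 1)[0]}`);
    }
    if (toSubscribe !== undefined) start(toSubscribe);
  }
  while (actives.length > 0) {
    const done = actives.shift() as string;
    log.push(`done ${done}`);
    const next = queue.onSubComplete(done);
    if (next !== undefined) start(next);
  }
  return log;
}

console.log(run(new ConcurrentFifoQueue<string>()));           // merge:   sub a, sub b, sub c, done a, done b, done c
console.log(run(new ConcurrentFifoQueue<string>(1)));          // concat:  sub a, done a, sub b, done b, sub c, done c
console.log(run(new ConcurrentFifoQueue<string>(1, 0)));       // exhaust: sub a, done a
console.log(run(new ConcurrentFifoQueue<string>(1, 0, true))); // switch:  sub a, cancel a, sub b, cancel b, sub c, done c
console.log(run(new ConcurrentFifoQueue<string>(1, 1)));       // audit:   sub a, done a, sub c, done c
```

In this trace, ConcurrentFifoQueue with its default arguments behaves like NoQueue, since the concurrency limit is never reached.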

Pro/cons

Pro :

  • Add flexibility and offers new possibilities (debounceMap, priority queue...)
  • Reduce lib code size
  • Easier to reason about and maintain

Con:

  • Obviously some performance penalty due to the flexibility, but it should be minimal (only a handful of conditionals and array reads/writes).
  • The onNewItem signature is not the sexiest API because of the tuple. I tried removing it but ended up with an even more complicated API.

Thanks

Thanks @Dorus for all your time spent :)

@ghetolay ghetolay changed the title [Proposal]: New flatten map operators implementation [Proposal] New flatten map operators implementation Dec 1, 2017
@staltz
Member

staltz commented Dec 2, 2017

Hi @ghetolay
I appreciate the time you put into writing this carefully, but I find it hard to agree that this will be, as you put it, "easier to reason about". For instance:

flatMap(project, new ConcurrentFifoQueue(1, 0, true))

Without named arguments, 1, 0, true does not make it any easier to see that this line of code does "switching of inner Observables", whereas the simple switchMap(project) at least hints at switching and is much less tedious to write.

@ghetolay
Contributor Author

ghetolay commented Dec 2, 2017

Hmm, I failed to explain it clearly: the idea is not to remove all the operators and only export that one. It's to replace each operator's implementation with an alias to that one, as is currently done for concatMap.

Once we have built and tested that operator, the only thing left to think about is the queue implementation, and that's what I think is easier to reason about for both maintainers and users (if they use it). For example, if we were to rebuild all the operators as if they didn't exist, I think it would be easier to follow that path than to build each operator separately, as was done.

Now maybe I'm still wrong but that's what I was talking about.

@benlesh
Member

benlesh commented Dec 5, 2017

It seems like we can do this with mergeScan.

@ghetolay
Contributor Author

ghetolay commented Dec 5, 2017

I can't really see how.
We need to be able to cancel running subscriptions and to potentially subscribe to a new observable when the previous one completes. We can't do such things inside a mergeScan, can we?

@ghetolay
Contributor Author

To get a better knowledge of the code base and the challenges related to this issue, I made a quick implementation of it. I was able to pass all unit tests (couldn't run perf tests) but I used some hacks; I just wanted to pass all tests by any means for the moment.

During this playtime I found that the current API has some limitations I hadn't anticipated.
So I think we first need to settle on the level of flexibility we want:

  0. Nothing, it's good as it is, duplicated code is no big deal.
  1. Internal refactor only, just merge the existing flatten operators.
  2. Internal refactor only, 1. + then add a few new operators easily (debounceMap).
  3. Public API change, 2. + allow users to build their own flatten operator without having to handle sub/unsub stuff but with an easy API.
    a) Restrict the use cases (can only sub/unsub one at once...)
    b) Allow most possible use cases (user can sub/unsub anytime anywhere).

The OP is about 3.b, but the current API is a 3.a. I really think we can get to 3.b: reduce the code base and extend the possibilities with a nice API, all with minimal performance impact.

If I could just get that approval, I'll then rework the API, submit it here and build PRs for 1, 2, 3.

@benlesh
Member

benlesh commented Aug 21, 2020

Closing due to lack of interest.

@benlesh benlesh closed this as completed Aug 21, 2020