Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flatMapLatest spawns next substream before the previous is disposed #783

Open
semmel opened this issue Feb 9, 2021 · 1 comment
Open

Comments

@semmel
Copy link
Member

semmel commented Feb 9, 2021

Sometimes flatMapLatest(fn) creates the substream fn(x) before the past substream got disposed.

I did not expect this behaviour. When I used tasks.flatMapLatest(sendTaskToWebWorkerAndWaitForCancellationOrResult) to feed tasks into a single WebWorker thread, the webworker did sometimes receive a new task before the previous got cancelled.

Also easy to reproduce using fromArray.

The reason is that in the implementation, which is roughly

flatMapLatest(obs, fn) = obs.flatMap(x => fn(x).takeUntil(obs));

It is unclear if a new x value first cancels the previous substream fn(x).takeUntil(obs) or first calls fn(x) to create the next substream.

An implementation

flatMapLatest(obs, fn) = obs.flatMap(x => B.later(0, x).map(fn).takeUntil(obs));

might fix it.

However, I don't know if I am aware of all use cases. Otherwise I might attempt a PR.

Do you think it's worth fixing?

@semmel
Copy link
Member Author

semmel commented Jan 4, 2022

One could also delegate this problem to user-land.

For example, if there is this unwanted behaviour:

var subStream = x => { 
   console.log(`Creating substream ${x}...`); 
   return B.fromBinder(sink => {
      const interval = setInterval(sink, 1000, x);
      return () => {
         console.log(`disposing substream ${x}...`);
         clearInterval(interval);
      };
   });
 };
B.sequentially(2500, ["A", "B", "C"])
.flatMapLatest(subStream)
.take(7).onValue(console.log);
/*
Creating substream A...
A
A
Creating substream B...    ↑  incorrect order
disposing substream A...   ↓
B
B
Creating substream C...    ↑  incorrect order
disposing substream B...   ↓
C
C
disposing substream C...
C
*/

the user can enforce the correct order by delaying substream creation:

var callNextTick = f => x => B.later(0, x).flatMap(f);

B.sequentially(2500, ["A", "B", "C"])
.flatMapLatest(callNextTick(subStream))
.take(7).onValue(console.log);
/*
Creating substream A...
A
A
disposing substream A...
Creating substream B...
B
B
disposing substream B...
Creating substream C...
C
C
disposing substream C...
C
*/

If this would be fixed in the library, problems with synchronous substreams (e.g. x => B.once(x)) as in #719 would reappear. On the other hand I don't see much value in synchronous events, since in v2.0 they are discontinued anyway.

So either add a comment to the documentation of flatMapLatest or fix in the library possible breaking stuff?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant