Nested streams not closing #41

Closed
victormoukhortov opened this issue Oct 15, 2014 · 43 comments

@victormoukhortov

It seems as though streams which are nested using flatMap are not closed out correctly. Code sample:

var most = require('most'),
  jQuery = require('jquery'),
  jiff = require('jiff'), // JSON Patch library used by patch() below
  socket = require('sockjs')('/api/ws'),
  client = require('stomp').over(socket),
  connect = jQuery.Deferred();

client.connect({}, function() {
  connect.resolve();
});

function getResource(resource, headers) {
  return most.fromPromise(connect).flatMap(function() {
    return getSnapshot(resource, headers).flatMap(function(data) {
      return getUpdates(resource, data, headers);
    });
  });
}

function getSnapshot(resource, headers) {
  return stream('/resource/' + resource, headers)
    .take(1);
}

function getUpdates(resource, data, headers) {
  return stream('/topic/' + resource, headers)
    .startWith([])
    .scan(patch, data);
}

function patch(data, delta) {
  // Apply a JSON Patch (delta) received on the topic to the current data.
  return jiff.patch(delta, data);
}

function stream(destination, headers) {
  return most.create(function(add) {
    var subscription = client.subscribe(destination, function(msg) {
      add(JSON.parse(msg.body));
    }, headers);
    // Disposer: called by most.js when the stream is no longer needed.
    return subscription.unsubscribe.bind(subscription);
  });
}

When running code such as:

getResource('resource').take(1).observe(console.log.bind(console));

I expect both streams which are created (getSnapshot and getUpdates) to be disposed of. Currently only the stream generated by getSnapshot is disposed of correctly, while the stream generated by getUpdates is never disposed.

@briancavalier
Member

Hey @victormoukhortov thanks for the report. Seems like we probably have a hole in the disposer chain. I'll look into this and see what's going on. Thanks for the sample code as well--should make it easy to reconstruct a simple test case.

@briancavalier
Member

@victormoukhortov FYI, I'm gonna be looking into this today. I hope to have a branch for you to try later today, so we can make sure that it solves your issue.

@briancavalier
Member

@victormoukhortov I just pushed the fix-scan-join-dispose branch with a candidate fix. Can you try it out? I used this test program, derived from your example, to verify. I still need to add a unit test for it.

@victormoukhortov
Author

@briancavalier Thanks! I'll try it out.

@victormoukhortov
Author

@briancavalier I'm afraid it's still not working. The updated program still does not unsubscribe from the inner subscriptions. I also think your test program is flawed, as your test case calls take(1), which would seem to only return the value from getSnapshot without initializing the inner stream.

@briancavalier
Member

Any chance you can post your code somewhere? It'd be great if I could use it to debug since it seems like the problem is still happening there, but not in my test.

which would seem to only return the value from getSnapshot without initializing the inner stream.

No items from getSnapshot will ever appear on the console, because each is flatMapped to a stream of updates. Only items from getUpdates will be logged. That's intentional, and is the same as your code above.

That said, maybe there is something different about my test and your original code. Can you spot any differences?

Also, I'm working on unit tests now to verify that both the outer and inner stream dispose functions are indeed called. As I finish the unit tests, they may shed some more light, too.

@briancavalier
Member

Just added unit tests for disposing the outer and inner streams. In the process, I found another dispose problem in join, fixed it, and added a unit test for it.

@victormoukhortov
Author

My code is pretty much verbatim what I posted. I'm trying to implement differential synchronization with STOMP over SockJS. Everything seems to be working smoothly, except that subscriptions to public topics seem to leak when the resulting streams are closed. I realize testing over a live WebSocket connection is impractical, but that's where the problem seems to be appearing. Your test case results in two closed streams?

@briancavalier
Member

Your test case results in two closed streams?

Yeah. It logs to the console when a stream is disposed, and here's what I get on the console:

dispose snapshot
1
dispose update

So what's happening is:

  1. the outer snapshot stream produces an inner update stream via flatMap
  2. the outer snapshot stream is disposed, due to the take(1) inside getSnapshot
  3. the inner update stream produces a 1 via scan, which gets logged
  4. the inner update stream is disposed, due to the take(1) at the outermost level, which limits the whole composite stream to producing a single value, i.e. the 1 that's logged to the console.
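The four steps above can be reproduced with a tiny, self-contained push-stream model. This is a hypothetical sketch in plain JavaScript, not most.js's implementation: `source`, `take` and `chain` are illustrative names, and events are fired by hand instead of arriving over STOMP. What it shows is the dispose chain: the composite stream's disposer has to reach both the outer subscription and every inner subscription that is still alive.

```javascript
'use strict';
// Hypothetical push-stream model for illustration; NOT most.js internals.
// A stream is a function subscribe(sink) -> dispose, where sink = { event, end }.

// A manually driven source that records when it is disposed.
function source(name, log) {
  const sinks = new Set();
  const subscribe = (sink) => {
    sinks.add(sink);
    return () => { if (sinks.delete(sink)) log.push('dispose ' + name); };
  };
  subscribe.emit = (x) => [...sinks].forEach((s) => s.event(x));
  return subscribe;
}

// take(n): forward at most n events, then dispose upstream and end downstream.
function take(n, stream) {
  return (sink) => {
    if (n <= 0) { sink.end(); return () => {}; }
    let seen = 0;
    let stopped = false;
    let disposeUp = null;
    const stop = () => {
      if (stopped) return;
      stopped = true;
      if (disposeUp) disposeUp();
    };
    const finish = () => { if (!stopped) { stop(); sink.end(); } };
    disposeUp = stream({
      event(x) {
        if (stopped) return;
        seen += 1;
        sink.event(x);
        if (seen >= n) finish();
      },
      end: finish
    });
    if (stopped) disposeUp(); // upstream finished while we were still subscribing
    return stop;
  };
}

// chain (flatMap): subscribe to f(x) for every outer event. Disposing the result
// must dispose the outer subscription AND every inner one that is still alive.
function chain(f, outer) {
  return (sink) => {
    const inners = new Set();
    let outerEnded = false;
    const endIfDone = () => { if (outerEnded && inners.size === 0) sink.end(); };
    const disposeOuter = outer({
      event(x) {
        const handle = { dispose: () => {} };
        inners.add(handle);
        const d = f(x)({
          event: (y) => sink.event(y),
          end: () => { inners.delete(handle); endIfDone(); }
        });
        if (inners.has(handle)) handle.dispose = d;
        else d(); // inner already ended during subscribe
      },
      end() { outerEnded = true; endIfDone(); }
    });
    return () => {
      disposeOuter();
      inners.forEach((h) => h.dispose());
      inners.clear();
    };
  };
}

// Mirrors the report: take(1) on the snapshot, flatMap to updates, take(1) overall.
const log = [];
const snapshots = source('snapshot', log);
const updates = source('update', log);
const composite = take(1, chain(() => updates, take(1, snapshots)));
composite({ event: (x) => log.push(String(x)), end: () => log.push('end') });

snapshots.emit('snapshot data'); // starts the inner stream; inner take(1) disposes snapshot
updates.emit(1);                 // logged; outer take(1) then disposes the inner stream
console.log(log); // [ 'dispose snapshot', '1', 'dispose update', 'end' ]
```

The key line is the loop over `inners` in `chain`'s returned disposer; removing it reproduces the symptom from the report, where the snapshot subscription is released but the update subscription never is.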

except that subscriptions to public topics seem to leak when the resulting streams are closed

Hmmmm, I wonder if stompjs's unsubscribe isn't working the way we expect it to, or has a bug. How have you been able to determine that topic subscriptions are leaking?
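One way to check for leaked topic subscriptions without a live WebSocket is to swap in an in-memory client. The sketch below is hypothetical: `FakeStompClient`, `deliver` and `openSubscriptions` are made-up names, and the class only mimics the shape the report's `stream` helper relies on (`subscribe(destination, callback, headers)` returning an object with `unsubscribe()`), not the full stompjs API.

```javascript
'use strict';
// Hypothetical in-memory stand-in for a STOMP client (not the stompjs API).
class FakeStompClient {
  constructor() {
    this.nextId = 0;
    this.active = new Map(); // subscription id -> { destination, callback }
    this.log = [];           // SUBSCRIBE / UNSUBSCRIBE frames that would be sent
  }
  subscribe(destination, callback, headers) { // headers are ignored here
    const id = 'sub-' + this.nextId++;
    this.active.set(id, { destination, callback });
    this.log.push('SUBSCRIBE ' + destination);
    const client = this;
    return {
      id,
      unsubscribe() {
        if (client.active.delete(id)) client.log.push('UNSUBSCRIBE ' + destination);
      }
    };
  }
  // Simulate the broker pushing a message to every subscriber of a destination.
  deliver(destination, payload) {
    for (const { destination: d, callback } of [...this.active.values()]) {
      if (d === destination) callback({ body: JSON.stringify(payload) });
    }
  }
  openSubscriptions() {
    return [...this.active.values()].map((s) => s.destination);
  }
}

const client = new FakeStompClient();
const received = [];
const onMessage = (msg) => received.push(JSON.parse(msg.body));
const snapshotSub = client.subscribe('/resource/r', onMessage, {});
const topicSub = client.subscribe('/topic/r', onMessage, {});
client.deliver('/resource/r', { v: 1 });
snapshotSub.unsubscribe();
topicSub.unsubscribe();
client.deliver('/topic/r', { v: 2 }); // nobody is subscribed any more, so nothing arrives
console.log(client.openSubscriptions()); // []
console.log(received);                   // [ { v: 1 } ]
```

Using such a client in place of the real `client` and checking `openSubscriptions()` after the composite stream ends would show directly whether the `/topic/...` subscription was released.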

@victormoukhortov
Author

Yeah, that makes sense. Your latest fix seems to be working a little better though. take(1) seems to be working. takeWhile(..) however still doesn't seem to close nested streams. Also, there seems to be rather a long delay between receiving the message in the observe handler and MostJS actually closing the inner stream. Any thoughts?

@briancavalier
Member

Yep, there was a one-character typo in takeWhile that broke dispose. Fixed in 0395555, updated the PR.

rather a long delay between receiving the message in the observe handler and MostJS actually closing the inner stream

Hmmm, I'm not seeing a delay in my test. The dispose function returned from the create callback is invoked very quickly. Quick question: What exactly do you mean by "closing the inner stream"? All most.js can do is call the disposer function (which it seems to be doing quickly in my test) ... maybe stompjs's unsubscribe function takes some time to complete? Is there any more info you can provide on the delay (maybe a timestamped log of some sort) that might help us track it down?
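A timestamped log like the one asked for here could come from wrapping the disposer returned by the `create` callback. The helper below is a hypothetical sketch (`timedDisposer` is not part of most.js or stompjs). It records when the disposer was called and when it returned, so a delay before the call would point at the stream library, while a long-running call would point at `unsubscribe` itself.

```javascript
'use strict';
// Hypothetical diagnostic helper: wraps a disposer so each call records when it
// started and when it returned, measured from `start`. `now` can be replaced by
// a fake clock in tests.
function timedDisposer(name, dispose, start, log, now = Date.now) {
  return function (...args) {
    const calledAt = now() - start;
    const result = dispose.apply(this, args);
    log.push({ name, calledAt, returnedAt: now() - start });
    return result;
  };
}

// Usage with a fake clock: the disposer is invoked at t = 100 ms and the
// simulated unsubscribe itself takes 5 ms.
let fakeTime = 0;
const timings = [];
const dispose = timedDisposer('inner', () => { fakeTime += 5; }, 0, timings, () => fakeTime);
fakeTime = 100;
dispose();
console.log(timings); // [ { name: 'inner', calledAt: 100, returnedAt: 105 } ]
```

In the report's `stream` helper, this would mean returning something like `timedDisposer(destination, subscription.unsubscribe.bind(subscription), start, log)` instead of the bare bound `unsubscribe`. It only measures the synchronous part of the call; anything the broker does afterwards is not captured.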

@briancavalier
Member

BTW, I really appreciate your helping to work through these issues, and all the feedback so far!

@briancavalier
Member

@victormoukhortov I'm going to merge #45, since there are some really critical fixes in there. I'll edit the description so that it doesn't auto-close this issue, though. We can either leave this one open until we figure out the source of the delay you're seeing, or we can open a new issue specifically for that (whichever you want to do is cool w/me).

@victormoukhortov
Author

@briancavalier A rudimentary test seems to be working. I've still got a problem, but I think it's somewhere else. Thanks for all your help. Feel free to close this issue.

@briancavalier
Member

@victormoukhortov No problem at all. Like I said above, I really appreciate all the help and feedback. Being able to plug those dispose holes before most.js gets to 1.0 is great.

@victormoukhortov
Author

@briancavalier This still does not seem to be working correctly (0.9.0). What seems to be happening is that the inner stream is not properly disposed of, so no UNSUBSCRIBE message is sent over STOMP. As a result, when a new message arrives on this (now unobserved) topic, I get the following error:

Potentially unhandled rejection [3] TypeError: undefined is not a function
    at Stream.disposeScan [as dispose] (return stream.dispose(t, x, state);)
    at Stream.disposeCurrent [as dispose] (return s.stream.dispose(t, x, s.state);)
    at ... (disposeInner when callback - return si.stream.dispose(t, x, i.state);)
    at tryCatchReject
    at runContinuation1
    at Fulfilled.when
    at Pending.run
    at Scheduler._drain
    at define.drain

Any thoughts?

Edit: It seems as though it's trying to dispose of the inner stream upon receiving a message there (on /topic/resource), whereas the correct behaviour would be to dispose of it right away, since the outer stream has already provided the only event observers are interested in.

@briancavalier
Member

Hmmm, yes, I was just able to reproduce that error with a recent demo app I created. It looks like my fix (from #45) for scan's dispose handling isn't quite right. Reopening this while I investigate more.

@briancavalier
Member

@victormoukhortov I just pushed a potential fix to the fix-scan-dispose-state branch. The unit test for scan dispose was already passing, so clearly I need to beef up the test a bit. However, this change does still pass all existing tests and fix the problem in the demo app I mentioned above. So, it seems right.

It'd be great if you could try it, though, to verify. Meanwhile, I'll work on creating a unit test that exposes the issue, so that I can see the test turn green :)

@victormoukhortov
Author

@briancavalier Great! I'll try it out

@briancavalier
Member

I did two more tests with my demo app. I added a take here, to the inner stream, and it worked correctly: I see a second UNSUBSCRIBE as expected and no error. Then, I removed that take, and added a take to the outer stream here, which also worked correctly: I again saw a second UNSUBSCRIBE as expected and no error.

Trying to devise a unit test for this is proving to be tricky, tho ...

@victormoukhortov
Author

@briancavalier It seems as though the inner stream is still waiting for a message before unsubscribing. Though when it gets one it does seem to unsubscribe without error. Did you see the UNSUBSCRIBE before any message arrived at the topic?

I can see how unit tests would be tricky...

@briancavalier
Member

I just pushed another update to the fix-scan-dispose-state branch. I think it's getting close. Still no unit test, but I do have a simpler test program, derived from the demo app (but runnable in node), that shows the problem. Running it on master fails, while running it on the branch succeeds. So next, I'll try to turn that into a unit test.

inner stream is still waiting for a message before unsubscribing

Hmmm, I'm not seeing any unexpected "wait before end" with take. This gist has a simple test program that seems to do what I'd expect: take(0) ends instantaneously upon observing, take(1) ends instantaneously after the first event (without waiting for a second), and similarly take(2) ends instantaneously after 2 events.

If you're seeing something weird, put together a smaller example and I'll take a look. Or maybe fork that gist and try to reproduce what you're seeing. Thanks!

@briancavalier
Member

Ah ha! I think this gist shows the issue with waiting. I'll open another issue for that.

@briancavalier added this to the 0.9.1 milestone Nov 5, 2014
@victormoukhortov
Author

Hmm.. is that a separate issue? It seems to be that the inner stream is still not being closed when it's supposed to. In your second test, even when n is one, the inner stream persists until it receives a message..

@briancavalier
Member

Yeah, I think it's a separate issue. The issue is that join() (which is used to implement flatMap) is waiting for an inner stream to return its "current event" rather than simply disposing it and dropping it immediately.
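The difference between waiting for an inner stream's next event and disposing it immediately can be shown with two small classes. This is a hypothetical illustration, not most.js code: `LazyInner`, `EagerInner` and `release` are made-up names, with `release` standing in for something like a STOMP `unsubscribe`.

```javascript
'use strict';
// Lazy: dispose() only sets a flag; the resource is released the next time an
// event arrives from the inner stream and the flag is noticed.
class LazyInner {
  constructor(release) {
    this.release = release;
    this.disposed = false;
    this.released = false;
  }
  dispose() { this.disposed = true; }
  onEvent(x, forward) {
    if (!this.disposed) { forward(x); return; }
    if (!this.released) { this.released = true; this.release(); }
  }
}

// Eager: dispose() releases the resource immediately, whether or not more
// events ever arrive.
class EagerInner {
  constructor(release) {
    this.release = release;
    this.disposed = false;
  }
  dispose() {
    if (this.disposed) return;
    this.disposed = true;
    this.release();
  }
  onEvent(x, forward) {
    if (!this.disposed) forward(x);
  }
}

const released = [];
const lazy = new LazyInner(() => released.push('lazy unsubscribe'));
const eager = new EagerInner(() => released.push('eager unsubscribe'));
lazy.dispose();
eager.dispose();
const afterDispose = released.slice();  // only the eager inner has let go
lazy.onEvent('late message', () => {}); // the lazy inner releases only now
console.log(afterDispose, released);
// [ 'eager unsubscribe' ] [ 'eager unsubscribe', 'lazy unsubscribe' ]
```

The lazy variant matches what was observed earlier in this thread: the subscription is only released once another message arrives on the topic.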

Since this issue primarily has been dealing with dispose not happening at all, and so far that's been caused by bugs in scan and take, I'd like to open a new issue that specifically addresses join.

@briancavalier
Member

@victormoukhortov I should be able to finish the unit test and get this latest round of changes ready for a 0.9.1 patch release tomorrow.

@victormoukhortov
Author

@briancavalier which branch should I test it out on in the meantime? I guess it would be a merge of fix-scan-dispose-state and dispose-inner-join-immediately?

@briancavalier
Member

I just created a temporary v0.9.1 branch that has both merged into it. Give that a go 😃

@victormoukhortov
Author

Thanks for the branch. I'm afraid it's still not working however...

@briancavalier
Member

Do you mean that you're still seeing take waiting in a way that you think it shouldn't be?

@victormoukhortov
Author

Yeah, take(1) (on branch v0.9.1) still doesn't close the inner stream for me. I'll try to put together a test case tomorrow.

@briancavalier
Member

Ok, I've made a little progress on this in the v0.9.1 branch, but nothing pushed yet. I think you were right that the best thing is to use a single branch, since there seem to be at least 2 very closely related issues (more closely related than I thought). So, I'll probably abandon the two open PR branches in favor of v0.9.1, and continue working there.

I want to take a little time to do a thorough review of the dispose architecture. I'll post updates here as I work through that.

I'll try to put together a test case tomorrow.

That'd be great. Thanks.

@victormoukhortov
Author

Hi @briancavalier, I've made that unit test. I've not run it but it should communicate the intent.

Edit: I did run it, it is indeed broken.

@briancavalier
Member

Really appreciate the unit test, @victormoukhortov!

@briancavalier
Member

In the v0.9.1 branch now, the dispose failure introduced in 0.9.0 has been fixed, along with a few other subtle cases. However, I backed out the change to make join abandon inner streams early--that will require a bit more work. So, my plan is to release 0.9.1 to address the dispose regression in 0.9.0, and then deal with join() in 0.9.2.

Also, here's a slight variant on your gist. When I run it with the v0.9.1 branch, it logs:

Dispose Outer
Dispose Inner
Inner 1

It logs "Dispose Outer" after 1 second, which is correct, but logs "Dispose Inner" after 10 seconds, which is incorrect due to the known issue with join() (which will be addressed in 0.9.2). It does dispose without the error seen in 0.9.0, and doesn't propagate "Inner 2", which is also correct.

@briancavalier
Member

This is working correctly, afaik, in the v0.10-merge branch

@briancavalier modified the milestones: 0.10.0, 0.9.1 Dec 14, 2014
@briancavalier
Member

This is fixed as of 6bbe55e.

@victormoukhortov Running your unit test logs:

❯ time node experiments/dispose/outer-inner1.js
# at 1 second
Dispose Outer undefined
# at 11 seconds
Inner
Dispose Inner undefined
node experiments/dispose/outer-inner1.js  0.09s user 0.02s system 0% cpu 11.111 total

Notes:

  • I added the # comments, they are not in the unit test output
  • undefined is the end signal value, i.e. what is passed to end(). If it were end(123), it would have logged 123 instead of undefined

The outer is disposed, as expected, at 1 second. However, ending the outer via end must allow inners that are still alive to continue, which is what happens. The take is being applied to the joined stream, which will only emit an item from the inner stream, which happens at 11 seconds (1 second to create first inner stream, then that stream waits 10 seconds before emitting its first item). Running the test via the time command reports that the whole thing took 11.111 wall clock seconds.

The test also works if you comment out the end in the inner stream, since the join.take is limiting everything to 1 event. Additionally if you insert more add(...)s into the inner stream, they will not produce more events, also due to the join.take. Note that it is impossible to prevent the developer from calling add many times synchronously because JavaScript is run-to-completion. For example:

setTimeout(function() {
    add("Inner 1");
    add("Inner 2");
    add("Inner 3");
    add("Inner 4");
    // ... and so on
}, 10000);

However, the join.take(1) correctly prevents events beyond the first from propagating.
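The run-to-completion point can be sketched with a guard function. This is a hypothetical illustration (`takeGuard` is not a most.js function): since nothing can interrupt a burst of `add` calls, the only place to enforce `take(1)` is inside the sink that receives them. The burst runs synchronously here instead of inside `setTimeout`.

```javascript
'use strict';
// Hypothetical take(n) guard wrapped around a downstream callback. Each call
// runs to completion, so the guard itself must drop every event after the nth.
function takeGuard(n, downstream) {
  let seen = 0;
  return function add(x) {
    if (seen >= n) return false; // already satisfied: drop the event
    seen += 1;
    downstream(x);
    return true;
  };
}

const observed = [];
const add = takeGuard(1, (x) => observed.push(x));
const accepted = ['Inner 1', 'Inner 2', 'Inner 3', 'Inner 4'].map((x) => add(x));
console.log(observed); // [ 'Inner 1' ]
console.log(accepted); // [ true, false, false, false ]
```

A real `take` would additionally dispose its upstream and signal end after the nth event; the guard alone only covers dropping the extra events.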

I think we're good. Closing, but please do try out the master branch :)

@victormoukhortov
Author

@briancavalier I'm not sure I understand why this inner stream should not be immediately disposed of as soon as the joined stream is limited via take(1). What I would hope would happen would be that the Dispose Inner logging statement should be called after 1 second. If it correctly freed resources (i.e. cleared the timeout), then the Inner logging statement should never be called.

@briancavalier
Member

I'm not sure I understand why this inner stream should not be immediately disposed of as soon as the joined stream is limited via take(1)

I hear you, higher order streams are tricky. The reason is that take(1) has been applied to the outer stream, meaning that the outer stream will end when 1 event has been observed on it (or perhaps: "escapes from it", or "propagates out of it"). The operations join and flatMap work by merging events from inner streams to the outer stream. The materialization of an inner stream is not an emitted event because it is unobservable. If the inner stream never emits any events, neither will the outer.

Any join()ed higher order stream with exactly 1 inner stream is indistinguishable from the inner stream. For example:

var alsoEmpty = most.of(most.empty()).join();
var alsoNever = most.of(most.never()).join();
var alsoDelayedByOneSecond = most.of(most.of('delayed').delay(1000)).join();

These are similar to the case from your unit test. In the unit test, once the inner stream has materialized (which doesn't cause the outer stream to emit an event, ie materialization !== event), it waits 10 seconds before emitting its first event. That first inner event is then emitted from the outer stream. So, a total of 11 seconds pass before the event is emitted from the outer stream. Only at that point, when the first observable event has been emitted from the outer stream, has take(1) been satisfied.

The explicit end() invoked by the outer stream's publisher function is simply an indication that no additional inner streams will be materialized.
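One way to check this reasoning is a small denotational model in which a stream is just a list of timestamped events plus an end time. The sketch below is hypothetical: it is not how most.js works internally, and it assumes `delay` shifts the end time as well as the events. It reproduces the three examples above and the 11-second result from the unit test: starting an inner stream adds no event, and the joined stream ends only after the outer has ended and its inner has ended.

```javascript
'use strict';
// Hypothetical model for reasoning only (not most.js code): a stream is a list
// of [timeMs, value] events in time order plus the time it ends (Infinity =
// never ends). Inner-stream times are relative to the moment it is started.
const stream = (events, end) => ({ events, end });
const of = (x) => stream([[0, x]], 0);
const empty = () => stream([], 0);
const never = () => stream([], Infinity);
const delay = (ms, s) => stream(s.events.map(([t, v]) => [t + ms, v]), s.end + ms);

// join: each inner is started when the outer emits it, so its events are
// shifted by that time. Starting an inner is not itself an event. The result
// ends only once the outer has ended AND every inner has ended.
function join(outer) {
  const events = [];
  let end = outer.end;
  for (const [t, inner] of outer.events) {
    for (const [u, v] of inner.events) events.push([t + u, v]);
    end = Math.max(end, t + inner.end);
  }
  events.sort((a, b) => a[0] - b[0]);
  return stream(events, end);
}

// take(n): keep the first n events; end at the nth one (or when s ends).
const take = (n, s) => stream(
  s.events.slice(0, n),
  n === 0 ? 0 : s.events.length >= n ? s.events[n - 1][0] : s.end
);

const alsoEmpty = join(of(empty()));                      // no events, ends at 0
const alsoNever = join(of(never()));                      // no events, never ends
const alsoDelayed = join(of(delay(1000, of('delayed')))); // 'delayed' at 1000, ends at 1000

// The unit test's shape: outer emits one inner at t = 1000 ms and then ends;
// that inner emits 'Inner' 10000 ms after it starts, then ends.
const outer = stream([[1000, delay(10000, of('Inner'))]], 1000);
const result = take(1, join(outer));
console.log(result); // { events: [ [ 11000, 'Inner' ] ], end: 11000 }
```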

I verified the current behavior against other impls (RxJS and kefir) as well, and it's consistent.

What I would hope would happen would be that the Dispose Inner logging statement should be called after 1 second

The particular arrangement of combinators in the test sets up a different behavior than that. By using a different arrangement, you can certainly get an "inner ends after 1 second" behavior. Here's an example (I know this is not what you're after, just illustrating):

var endsInAtMostOneSecondNoMatterWhat = join.takeUntil(most.of().delay(1000));
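In a simple model where a stream is a list of timestamped events plus an end time, `takeUntil` just cuts everything off at the signal's first event. This is a hypothetical sketch, not most.js's implementation, and treating an event at exactly the cutoff time as excluded is a modeling choice. It shows why this arrangement ends at 1 second even though `joined` has no event before 11 seconds.

```javascript
'use strict';
// Hypothetical model for reasoning only: a stream is
// { events: [[timeMs, value], ...], end }.
// takeUntil(signal, s) keeps the events of s strictly before the signal's
// first event and ends at that moment (or earlier, if s ends first).
function takeUntil(signal, s) {
  if (signal.events.length === 0) return s; // the signal never fires
  const cutoff = signal.events[0][0];
  return {
    events: s.events.filter(([t]) => t < cutoff),
    end: Math.min(s.end, cutoff)
  };
}

// `joined` stands for the stream from the unit test: 'Inner' at 11000 ms.
const joined = { events: [[11000, 'Inner']], end: 11000 };
// Plays the role of most.of().delay(1000): one event at 1000 ms.
const oneSecond = { events: [[1000, undefined]], end: 1000 };

const limited = takeUntil(oneSecond, joined);
console.log(limited); // { events: [], end: 1000 }
```

Because the limited stream ends at 1 second, an implementation is free to dispose the inner stream at that point, giving the "inner ends after 1 second" behavior, at the cost of never seeing "Inner".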

I think the right thing is to think about the overall behavior you want, and then work out the arrangement of combinators that represents that behavior. I'm betting that's how you approached it originally, as did I when I wrote that original demo program with the websockets--and I may not have gotten it right!

I've kind of lost sight of what your original goal was (sorry!) that led to the take(1) example, but I'd be happy to try to help work through it.

@victormoukhortov
Author

@briancavalier Thank you for your detailed reply. I've not run any tests yet, but for practical purposes, I'd be concerned that resources were not being freed as soon as possible. If a limited, joined stream had to wait for all of its component streams to fire an event before freeing their resources, then these resources might never be freed.

join.take(1) from my example should free the inner stream after 1 second because join.take(1) emits an event after one second (because outerStream emits an event after one second), at which point we know we are no longer interested in events from any of the component streams and can thus free resources held by them.

@briancavalier
Member

join.take(1) from my example should free the inner stream after 1 second because join.take(1) emits an event after one second

This is the part that doesn't match the situation being set up by the code. The stream named join doesn't emit an event after 1 second. The stream named outer does, which causes the stream named join to materialize an inner stream after 1 second. The stream named join only emits an event 10 seconds later, when inner emits its first event.

@victormoukhortov
Author

Hmm.. so how would I write it to be as I described? So that join emits an event as soon as outer does?

@briancavalier
Member

One thing that may help is if we think of join differently (and more correctly). Mechanically speaking, inside the imperative implementation of most.js, there are inner and outer streams being managed by flatMap.

Mathematically speaking, however, there are not: the stream named join is a first-order stream containing the string "Inner" at time t = 11 seconds. Any higher-order-ness has been flattened away by flatMap.

Hmm.. so how would I write it to be as I described?

I think it comes down to what is the larger goal. I'm still not quite sure I have a handle on that, but I'll try to answer the next question as best I can.

So that join emits an event as soon as outer does?

In the code exactly as it is in the gist right now, there is no way to emit the string "Inner" in 1 second, because "Inner" doesn't even exist until time t = 11 seconds. If inner immediately emitted "Inner" without setTimeout, it would appear on the console at t = 1 second. Afaict, removing the setTimeout is the only way (barring time travel, of course! :) )

Another option would be to use map instead of flatMap. That will, however, emit the inner stream itself (ie the JavaScript object) as an event in join after 1 second, at which point outer will end and be disposed, and the inner stream will never have been started. That doesn't seem like what you're after, though.
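The map/flatMap difference can be seen in a simple model where a stream is a list of timestamped events plus an end time. This is a hypothetical sketch (`map`, `flatMap`, `take` and `makeInner` here are illustrative, not most.js's functions): with `map`, the first event is the inner stream object itself at 1000 ms and that inner is never started; with `flatMap`, the first event is "Inner" at 11000 ms.

```javascript
'use strict';
// Hypothetical model for reasoning only: a stream is
// { events: [[timeMs, value], ...], end }, with inner-stream times measured
// from the moment the inner stream is started.
const map = (f, s) => ({ events: s.events.map(([t, v]) => [t, f(v)]), end: s.end });

// flatMap starts f(v) at the time v occurs and merges its shifted events.
function flatMap(f, s) {
  const events = [];
  let end = s.end;
  for (const [t, v] of s.events) {
    const inner = f(v);
    for (const [u, w] of inner.events) events.push([t + u, w]);
    end = Math.max(end, t + inner.end);
  }
  events.sort((a, b) => a[0] - b[0]);
  return { events, end };
}

// take(n) keeps the first n events and ends at the nth (or when s ends).
const take = (n, s) => ({
  events: s.events.slice(0, n),
  end: n === 0 ? 0 : s.events.length >= n ? s.events[n - 1][0] : s.end
});

// The gist's shape: outer emits once at t = 1000 ms; each inner emits
// 'Inner' 10000 ms after it starts.
const outer = { events: [[1000, 'tick']], end: 1000 };
const makeInner = () => ({ events: [[10000, 'Inner']], end: 10000 });

const viaMap = take(1, map(makeInner, outer));
const viaFlatMap = take(1, flatMap(makeInner, outer));
console.log(viaMap.events[0][0], typeof viaMap.events[0][1]); // 1000 object
console.log(viaFlatMap.events);                               // [ [ 11000, 'Inner' ] ]
```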
