Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review request #1

Open
Andarist opened this issue May 2, 2018 · 5 comments
Open

Review request #1

Andarist opened this issue May 2, 2018 · 5 comments

Comments

@Andarist
Copy link
Owner

Andarist commented May 2, 2018

@staltz @franciscotln @erikras

would love review of this code, because I had few iterations of it and i think it is finally covering correct behaviour, but I'm still a little bit unsure 😉

@staltz
Copy link

staltz commented May 2, 2018

Hi! I found two potential problems

if (queue.length === 0) {
return
}

Maybe this line should not happen, because the outer source could still emit some data, and we shouldn't complete it just yet.

And

if (innerTalkback !== null) {
innerTalkback(2, data)
}

I think we should queue the complete as well, just like queuing type=1.

@Andarist
Copy link
Owner Author

Andarist commented May 2, 2018

  1. This is the innerSink implementation, to my understanding it handles only inner stream communication. Intention here is to start a new inner stream if there is something in the queue. This completion doesn't propagate to the outer source or anything - it is just a notification from the inner stream and has no effect on the outer one. Did I get this correct? Doesn't look like you are referring to what I am, if you still see this as a problem I'd appreciate more thorough explanation.

  2. are you saying that inner stream should have a chance to complete itself even if outer source completes first? Can adjust it (would require a major bump, but that doesnt matter), just thought the whole thing should be terminated asap when the outer source completes - I've done some tests on Rx before deciding what to do here and given this code:

var makeLogger = label => (...vals) => console.log(label, ...vals)

Rx.Observable
  .interval(300)
  .concatMap(outer =>
    Rx.Observable
      .interval(1000)
      .map(inner => `${outer}: ${inner}`)
      .do(makeLogger('tapped:'))
      .take(2)
  )
  .take(3)
  .subscribe(makeLogger('got:'))

it seems that outer completion propagates immediately.

@Andarist
Copy link
Owner Author

Andarist commented May 2, 2018

Can't say I understand them right now, but certainly will study the notation and those tests later 😉 Thanks for the input and pointing me to those tests!

@Andarist
Copy link
Owner Author

Andarist commented May 2, 2018

I've studied those marble diagrams and from 1st linked test I can see how it describes what happens in issue number 2 (that outer completion has to be queued too).

I don't see issue with this code though:

if (queue.length === 0) {
return
}

It seems to me that at least this part of the code is highly needed and correct. It subscribes to queued inner source when previous inner completes - future outer emissions are irrelevant for this part of the code.

If you believe there is still issue with this part please raise your concern 😅

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants