New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eager ConcatMap #3357

Merged
merged 1 commit into from Oct 8, 2015

Conversation

Projects
None yet
5 participants
@akarnokd
Member

akarnokd commented Sep 19, 2015

Related discussion in #3017.

@akarnokd akarnokd added this to the 1.0.x milestone Sep 19, 2015

@akarnokd akarnokd referenced this pull request Sep 19, 2015

Closed

Eager ConcatMap #3017

@davidmoten

This comment has been minimized.

Show comment
Hide comment
@davidmoten

davidmoten Sep 24, 2015

Contributor

Thanks a lot for contributing this one @akarnokd. I've had one look through and no problems jump out at me but I will review further. This will be useful.

Contributor

davidmoten commented Sep 24, 2015

Thanks a lot for contributing this one @akarnokd. I've had one look through and no problems jump out at me but I will review further. This will be useful.

drain();
}
void drain() {

This comment has been minimized.

@davidmoten

davidmoten Sep 24, 2015

Contributor

For readability I'd suggest longer names for variables like d, a, r, e, p.

@davidmoten

davidmoten Sep 24, 2015

Contributor

For readability I'd suggest longer names for variables like d, a, r, e, p.

Show outdated Hide outdated src/main/java/rx/internal/operators/OperatorEagerConcatMap.java
}
int missed = 1;
final AtomicLong p = sharedProducer;

This comment has been minimized.

@davidmoten

davidmoten Sep 24, 2015

Contributor

p is not being used in the sense of a producer but rather in its sense of the request count. I'd like to see this renamed to requested or something similar in keeping with naming in other operators.

@davidmoten

davidmoten Sep 24, 2015

Contributor

p is not being used in the sense of a producer but rather in its sense of the request count. I'd like to see this renamed to requested or something similar in keeping with naming in other operators.

Show outdated Hide outdated src/main/java/rx/internal/operators/OperatorEagerConcatMap.java
}
if (!empty) {
long r = p.get();

This comment has been minimized.

@davidmoten

davidmoten Sep 24, 2015

Contributor

r could be req say and p gets renamed to requested

@davidmoten

davidmoten Sep 24, 2015

Contributor

r could be req say and p gets renamed to requested

}
missed = wip.addAndGet(-missed);
if (missed == 0) {

This comment has been minimized.

@davidmoten

davidmoten Sep 24, 2015

Contributor

A unit test that covers both branches of this statement might be nice

@davidmoten

davidmoten Sep 24, 2015

Contributor

A unit test that covers both branches of this statement might be nice

This comment has been minimized.

@akarnokd

akarnokd Sep 28, 2015

Member

Done. See the reentrancy test.

@akarnokd

akarnokd Sep 28, 2015

Member

Done. See the reentrancy test.

final EagerOuterSubscriber<?, ?> parent;
public EagerOuterProducer(EagerOuterSubscriber<?, ?> parent) {

This comment has been minimized.

@davidmoten

davidmoten Sep 24, 2015

Contributor

remove public? (All the constructors in this file for non-public static classes are marked as public).

@davidmoten

davidmoten Sep 24, 2015

Contributor

remove public? (All the constructors in this file for non-public static classes are marked as public).

This comment has been minimized.

@akarnokd

akarnokd Sep 24, 2015

Member

It doesn't really matter and it helps me visually tell apart the fields and the constructor in general.

@akarnokd

akarnokd Sep 24, 2015

Member

It doesn't really matter and it helps me visually tell apart the fields and the constructor in general.

Show outdated Hide outdated src/main/java/rx/internal/operators/OperatorEagerConcatMap.java
EagerInnerSubscriber<R> inner;
boolean d = done;

This comment has been minimized.

@davidmoten

davidmoten Sep 24, 2015

Contributor

rename to outerDone?

@davidmoten

davidmoten Sep 24, 2015

Contributor

rename to outerDone?

Show outdated Hide outdated src/main/java/rx/internal/operators/OperatorEagerConcatMap.java
for (;;) {
d = inner.done;

This comment has been minimized.

@davidmoten

davidmoten Sep 24, 2015

Contributor

There's a bit of reuse of the d boolean here for a different meaning (innerDone). Could we use a new variable name for this?

@davidmoten

davidmoten Sep 24, 2015

Contributor

There's a bit of reuse of the d boolean here for a different meaning (innerDone). Could we use a new variable name for this?

@akarnokd

This comment has been minimized.

Show comment
Hide comment
@akarnokd

akarnokd Sep 24, 2015

Member

Thanks for the review @davidmoten . Usually I use one letter variables because I find it easier to parse the code and instead of long variable names, I use newlines to separate logical blocks. This way, I don't have to type that many letters and I don't have to wait while Eclipse returns with the content assist (which is blocking by the way and may take hundreds of milliseconds, even on my i7 + SSD).

I took the time and renamed variables as you asked for and added some more unit tests that check the code paths. That being said, I don't plan to do such renames in my contributions very often in the future and encourage anybody to post their PRs with their proposed cleanups/renames.

Member

akarnokd commented Sep 24, 2015

Thanks for the review @davidmoten . Usually I use one letter variables because I find it easier to parse the code and instead of long variable names, I use newlines to separate logical blocks. This way, I don't have to type that many letters and I don't have to wait while Eclipse returns with the content assist (which is blocking by the way and may take hundreds of milliseconds, even on my i7 + SSD).

I took the time and renamed variables as you asked for and added some more unit tests that check the code paths. That being said, I don't plan to do such renames in my contributions very often in the future and encourage anybody to post their PRs with their proposed cleanups/renames.

Show outdated Hide outdated src/main/java/rx/Observable.java
* @return
*/
@SuppressWarnings("unchecked")
public static <T> Observable<T> eagerConcatMap(

This comment has been minimized.

@stealthcode

stealthcode Sep 28, 2015

Should we name these eagerConcat instead of eagerConcatMap? These overloads do not take a FuncN so they aren't really mapping.

@stealthcode

stealthcode Sep 28, 2015

Should we name these eagerConcat instead of eagerConcatMap? These overloads do not take a FuncN so they aren't really mapping.

This comment has been minimized.

@davidmoten

davidmoten Sep 28, 2015

Contributor

Ditto that @stealthcode and for discoverability call it concatEager?

@davidmoten

davidmoten Sep 28, 2015

Contributor

Ditto that @stealthcode and for discoverability call it concatEager?

@akarnokd

This comment has been minimized.

Show comment
Hide comment
@akarnokd

akarnokd Sep 28, 2015

Member

Thanks for the feedback. I've updated the method names to concatMapEager and concatEager, added the @Experimental tags, some javadoc and a missing test for a 2-parameter concatEager overload.

Member

akarnokd commented Sep 28, 2015

Thanks for the feedback. I've updated the method names to concatMapEager and concatEager, added the @Experimental tags, some javadoc and a missing test for a 2-parameter concatEager overload.

@akarnokd

This comment has been minimized.

Show comment
Hide comment
@akarnokd

akarnokd Sep 28, 2015

Member

Updated with a capacity fix for the SpscLinkedArrayQueue.

Member

akarnokd commented Sep 28, 2015

Updated with a capacity fix for the SpscLinkedArrayQueue.

@stevegury

This comment has been minimized.

Show comment
Hide comment
@stevegury

stevegury Sep 28, 2015

Member

I reviewed in details in the code and I believe it is correct.

But I wonder if subscribing to the sources in a "unbounded mode" is the right thing to do.
At the expense of complexifying the code, I believe it could be possible to split the request(n) between the sources. Maybe requesting 1 to all the m sources expect the first one, which we request n - m + 1 ... and so on.

The devil is in the details, and I believe this would significantly complexify the code, but I would like to know what you are thinking about this?

Member

stevegury commented Sep 28, 2015

I reviewed in details in the code and I believe it is correct.

But I wonder if subscribing to the sources in a "unbounded mode" is the right thing to do.
At the expense of complexifying the code, I believe it could be possible to split the request(n) between the sources. Maybe requesting 1 to all the m sources expect the first one, which we request n - m + 1 ... and so on.

The devil is in the details, and I believe this would significantly complexify the code, but I would like to know what you are thinking about this?

@akarnokd

This comment has been minimized.

Show comment
Hide comment
@akarnokd

akarnokd Sep 29, 2015

Member

@stevegury This operator consumes source observables in order and doesn't make sense to split any request from downstream. If the downstream requests n and the first is requested n / count, that would hang the sequence because only the first is allowed to emit and it won't emit enough to trigger a new request.

It is possible to use a bounded buffer per source so while they are not consumed, they don't grow indefinitely. However, it means that each of them can produce only RxRingBuffer.SIZE elements before stopping and thus the operator wouldn't be eager anymore.

Member

akarnokd commented Sep 29, 2015

@stevegury This operator consumes source observables in order and doesn't make sense to split any request from downstream. If the downstream requests n and the first is requested n / count, that would hang the sequence because only the first is allowed to emit and it won't emit enough to trigger a new request.

It is possible to use a bounded buffer per source so while they are not consumed, they don't grow indefinitely. However, it means that each of them can produce only RxRingBuffer.SIZE elements before stopping and thus the operator wouldn't be eager anymore.

@stevegury

This comment has been minimized.

Show comment
Hide comment
@stevegury

stevegury Sep 29, 2015

Member

@akarnokd sorry I wasn't very clear but what I proposed was roughly what you described (replacing RxRingBuffer.SIZE by 1).

My main concern here is the use of unbounded buffer, which, I think, could be avoided.

Member

stevegury commented Sep 29, 2015

@akarnokd sorry I wasn't very clear but what I proposed was roughly what you described (replacing RxRingBuffer.SIZE by 1).

My main concern here is the use of unbounded buffer, which, I think, could be avoided.

@akarnokd

This comment has been minimized.

Show comment
Hide comment
@akarnokd

akarnokd Sep 29, 2015

Member

@davidmoten What do you think about the bounded buffering and the fact that such sources would act more like a delayed source?

Member

akarnokd commented Sep 29, 2015

@davidmoten What do you think about the bounded buffering and the fact that such sources would act more like a delayed source?

@davidmoten

This comment has been minimized.

Show comment
Hide comment
@davidmoten

davidmoten Oct 2, 2015

Contributor

I'd be happy to see bounded buffering (RxRingBuffer.SIZE sounds a better default size than 1 to me and would be consistent with the request eagerness of other operators like merge for instance). Could we add an overload to allow the specification of buffer size as well?

If this is significant rework I'm content to see this unbounded version documented with its buffering characteristics merged and we can defer the bounding work to another PR.

Contributor

davidmoten commented Oct 2, 2015

I'd be happy to see bounded buffering (RxRingBuffer.SIZE sounds a better default size than 1 to me and would be consistent with the request eagerness of other operators like merge for instance). Could we add an overload to allow the specification of buffer size as well?

If this is significant rework I'm content to see this unbounded version documented with its buffering characteristics merged and we can defer the bounding work to another PR.

@akarnokd

This comment has been minimized.

Show comment
Hide comment
@akarnokd

akarnokd Oct 3, 2015

Member

Done.

Member

akarnokd commented Oct 3, 2015

Done.

@davidmoten

This comment has been minimized.

Show comment
Hide comment
@davidmoten

davidmoten Oct 3, 2015

Contributor

Great, thanks @akarnokd ! Lightning quick as always.

Contributor

davidmoten commented Oct 3, 2015

Great, thanks @akarnokd ! Lightning quick as always.

@stevegury

This comment has been minimized.

Show comment
Hide comment
@stevegury

stevegury Oct 6, 2015

Member

LGTM 👍

Member

stevegury commented Oct 6, 2015

LGTM 👍

@abersnaze

This comment has been minimized.

Show comment
Hide comment
@abersnaze

abersnaze Oct 8, 2015

Contributor

The three of you seem happy with it so I'm merging it.

Contributor

abersnaze commented Oct 8, 2015

The three of you seem happy with it so I'm merging it.

abersnaze added a commit that referenced this pull request Oct 8, 2015

@abersnaze abersnaze merged commit 125b10d into ReactiveX:1.x Oct 8, 2015

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details

@akarnokd akarnokd deleted the akarnokd:EagerConcatMap branch Oct 8, 2015

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment