
Take Doesn't Reduce Large RequestN #5077

Closed
benjchristensen opened this issue Feb 7, 2017 · 23 comments


@benjchristensen (Member)

When a child subscriber submits a large requestN value, such as Long.MAX_VALUE, the take operator does not reduce it as expected.

For example, in the following, where a default subscribe happens and requests Long.MAX_VALUE upward, it is expected that take(10) would reduce the requested amount to the maximum that take will ever let through.

someFlowable.take(10).subscribe(s)

Here is a unit test:

    @Test
    public void testDoesntRequestMoreThanNeededFromUpstream2() throws InterruptedException {
        final AtomicLong requests = new AtomicLong();
        TestSubscriber<Long> ts = new TestSubscriber<Long>();
        Flowable.interval(100, TimeUnit.MILLISECONDS)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long n) {
                    System.out.println(n);
                    requests.addAndGet(n);
                }
            })
            .take(2)
            .subscribe(ts);
        
        ts.awaitTerminalEvent();
        ts.assertComplete();
        ts.assertNoErrors();
        assertEquals(2, requests.get());
    }

This fails with:

java.lang.AssertionError: expected:<2> but was:<9223372036854775807>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:834)
	at org.junit.Assert.assertEquals(Assert.java:645)
	at org.junit.Assert.assertEquals(Assert.java:631)
	at io.reactivex.internal.operators.flowable.FlowableTakeTest.testDoesntRequestMoreThanNeededFromUpstream2(FlowableTakeTest.java:419)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

Is there a reason that take in RxJava 2 does not behave this way and reduce the requestN value to the limit?

@akarnokd (Member) commented Feb 7, 2017

If the downstream is requesting more than N, the operator requests unbounded so the upstream can take a fast path when it emits. take will then stop the upstream at N, and there is no overflow. If the downstream requests M < N, take will also just request M.

Try this:

Flowable.interval(100, TimeUnit.MILLISECONDS)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long n) {
                    System.out.println(n);
                    requests.addAndGet(n);
                }
            })
            .take(2)
            .rebatchRequests(1)         // <-----------------------------------------
            .subscribe(ts);
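One plausible reading of the rule described above, sketched as a plain-Java helper. This is a hypothetical illustration, not the actual FlowableTake implementation, which also has to account for demand it has already forwarded:

```java
// Hypothetical sketch: how take(n) could map a downstream request of m
// into an upstream request, per the rule described in this thread.
public class TakeRequestRule {
    /** Returns the amount take forwards upstream for a downstream request m. */
    static long upstreamRequest(long n, long m) {
        if (m >= n) {
            // Downstream demand covers everything take will ever let through:
            // go unbounded so the upstream can use its fast path; take itself
            // stops emission after n items, so the downstream sees no overflow.
            return Long.MAX_VALUE;
        }
        // Downstream wants less than the limit: just pass the amount along.
        return m;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequest(2, Long.MAX_VALUE)); // 9223372036854775807
        System.out.println(upstreamRequest(10, 3));             // 3
    }
}
```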

@benjchristensen (Member, Author)

That doesn't make sense to me and differs from v1 behavior. If the upstream is across a network boundary that means someone has to know that take doesn't actually reduce the requested amount and therefore always needs to use both take and rebatch.

I don't buy the argument that this should be done when the downstream requests more, as it is normal for the downstream subscriber to use the default value and for take to be used to constrain the output.

If unconstrained output were wanted I'd use an Observable, but I'm using Flowable, so I expect it to be constrained.

@akarnokd (Member) commented Feb 7, 2017

differs from v1 behavior

New major versions have the opportunity to change behavior on some level.

If the upstream is across a network boundary that means someone has to know that take doesn't actually reduce the requested amount and therefore always needs to use both take and rebatch.

Is this for the ReactiveSocket library? As an IO boundary provider, you should never trust the downstream's request amount, just as observeOn doesn't trust it and requests at its own pace.

@benjchristensen (Member, Author)

No, it's not for ReactiveSocket. But it is for Reactive Streams APIs that involve IO.

I understand a major version can change behavior, but I don't understand why this would change. Why is the very small performance improvement in the source worth changing this behavior? The point of Flowable is to constrain emission. If I wanted unconstrained I'd use Observable.

Why should the source not be restricted to the maximum that take will accept, for real-world use cases rather than fabricated micro-benchmarks?

@akarnokd (Member) commented Feb 7, 2017

§3.17: A Subscription MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (java.lang.Long.MAX_VALUE). A demand equal or greater than 2^63-1 (java.lang.Long.MAX_VALUE) MAY be considered by the Publisher as “effectively unbounded”[3].

It is completely legal for an operator to go unbounded if it ensures the rest of the rules towards its downstream consumer. Running unbounded means less overhead from operators, which means more time to process the actual items, especially those that just came over an async boundary (observeOn).

If I wanted unconstrained I'd use Observable

You are still thinking about requests as an end-to-end way of communicating with some server and sending a message to it requesting the next item based on consumption at the very end. Reactive-Streams backpressure is different. It is a negotiation between subsequent operators. Some may not interfere with it, some might change the timing or amount, some might go unbounded. It is about honoring the downstream's amount and working out an amount for the upstream.

For example:

source.observeOn(Schedulers.computation()).take(5).subscribe(...)

Will the source be consumed in an unbounded manner just because take(5) goes unbounded?

No.

take(5) and observeOn negotiate that observeOn is free to emit as fast as it can and as many items as are available, because take can handle it.

source and observeOn negotiate that, due to the async boundary, batching is desired; thus observeOn requests the default amount (128) and keeps requesting once take has consumed what already became available over the boundary. This requires source to honor that initial 128 and any subsequent requests (96).
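The 128/96 numbers come from observeOn's prefetch arithmetic: request the prefetch amount up front, then re-request 75% of it each time that many items have been drained. A hypothetical plain-Java sketch of that bookkeeping (not the actual observeOn implementation, which interleaves this with queue draining and scheduling):

```java
// Hypothetical sketch of the prefetch/replenish arithmetic described above.
public class PrefetchReplenish {
    final int prefetch;     // initial upstream request, default 128
    final int limit;        // replenish threshold: 75% of prefetch
    int consumed;           // items drained since the last upstream request
    long totalRequested;    // running sum of demand sent upstream

    PrefetchReplenish(int prefetch) {
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2); // 128 -> 96
        this.totalRequested = prefetch;          // request(128) on subscribe
    }

    /** Called once per drained item; returns the replenish request made, or 0. */
    long onItemDrained() {
        if (++consumed == limit) {
            consumed = 0;
            totalRequested += limit;
            return limit;                        // request(96) upstream
        }
        return 0;
    }

    public static void main(String[] args) {
        PrefetchReplenish p = new PrefetchReplenish(128);
        long replenished = 0;
        for (int i = 0; i < 96; i++) {
            replenished += p.onItemDrained();
        }
        System.out.println(replenished); // 96: one replenish at the 96th item
    }
}
```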

@benjchristensen (Member, Author)

It is completely legal for an operator to go unbounded if it ensures the rest of the rules towards its downstream consumer.

Yes I know that. I was involved in arguing for inclusion of that rule in the Reactive Streams spec and know the benefits.

You are still thinking about requests as an end-to-end way of communicating with some server and sending a message to it requesting the next item based on consumption at the very end. Reactive-Streams backpressure is different.

No, I'm not. Remember I was part of the founding group for Reactive Streams, so I understand the semantics. Early in the discussions it was assumed every operator hop was async, but we argued that hops could be synchronous and therefore would benefit from optimizations in the spec, such as rules 3.17 and 2.2.

Some may not interfere with it, some might change the timing or amount, some might go unbounded. It is about honoring the downstream's amount and working out an amount for the upstream.

I understand that, and am not questioning that. I'm questioning a simple implementation choice, not a contractual or semantic problem. Everything that the take operator is doing is perfectly legal. I am purely questioning the decision to not alter the upstream request(n) as it seems a legitimate choice to reduce to the limit.

But I won't bother attempting discussion any further. It's not important enough to me.

@davidmoten (Collaborator)

Thanks for raising this, @benjchristensen. I wasn't aware of it.

So I'm assuming then that if I've got some synchronous network-requesting source (one that can't effectively emit partially, e.g. a web service request that returns a list of items) and I don't want that source to transfer an unnecessarily large list in reaction to a take(2), then I suppose I would have to do this:

source.rebatchRequests(2).take(2);

This is certainly an uncommon use case for me, but I did like that in RxJava 1.x upstream requests were generally constrained to what was required by downstream, with the exception of a few well-known operators like flatMap and observeOn. Perhaps now I can't make any assumptions, and when I want to constrain requests I need to use rebatchRequests in lots of places. We'll see how that goes, I suppose.

@davidmoten (Collaborator)

@akarnokd or @benjchristensen can you reopen this issue? I think it needs more discussion.

@benjchristensen (Member, Author)

The use case is fairly common for me. Consider this:

Library:

public Flowable<Data> getDataFromNetwork(Input args) {
   return SomeFlowable(...)
               // ensure over network we do no more than request(128)
               .rebatchRequests(128);
}

Then a consumer that I don't control:

getDataFromNetwork(...).take(10).subscribe(...)

In that case I would prefer to only request 10 items over the network, but in the getDataFromNetwork method I don't know what my consumers will request, and the consumer calling it doesn't and shouldn't know the implementation details. So, the way take is currently implemented, I will have to just request(128), start emitting over the network, over-produce, and asynchronously race a cancel back over the network. Most likely I will end up producing all 128, sending many or most of them over the network, and then dropping 118 of them on the floor.

@davidmoten (Collaborator) commented Feb 8, 2017

Yep, I'm not a fan of that behaviour, and I'm sure I'll encounter it tying together service calls as our organisation moves more logic behind services. There's a broader consideration here, I suppose, which is how much users of the RxJava API should know or expect of the request behaviour of operators. We don't really have a guiding principle in place that helps us here. For me, I'd like to see operators only request what they need, and where for performance considerations it makes sense to request in batches (across async boundaries, for instance), that should be configurable. flatMap and observeOn are configurable in this regard.

I've had a quick look through the list of operators and identified the ones I'd prefer only requested what they need. Having constrained requesting behaviour on these operators, plus the ability to configure flatMap and observeOn and to call rebatchRequests, is enough control over request patterns for my needs.

Note that all of the operators below request Long.MAX_VALUE:

elementAt
firstXXX
take

The operators above are the only operators I saw that have a definite upper bound that is less than Long.MAX_VALUE on the number of items required to complete.

@akarnokd (Member) commented Feb 8, 2017

Anything that has the @BackpressureSupport(UNBOUNDED_IN) annotation, including subscribe(Consumer). It is the duty of the getDataFromNetwork(...) to turn any downstream request pattern into a reasonable amount for its level, for example, by rebatchRequests internally.

@smaldini commented Feb 8, 2017

@akarnokd is right. The way we solve the same behavior with our network components (at least the Kafka, Redis and Netty ones) is to address the network IO by prefetching at the generator level. Backpressure certainly applies around boundaries, and you shouldn't bind yourself to the request behavior downstream.

In the end, the unbounded information is still valuable, as it influences the rate of replenishing upstream. We talked about this with @akarnokd last year, when we were experimenting with reactive-streams-commons, and we agreed then that for the 3 libraries (including Reactor) it seemed the right thing to do without violating the specification. So far we have one contention point with the RS spec, which is to signal an error if a request is invalid/negative; we ignore that until a special operator is used.

That's the same question as with aggregating operators, for instance, which are going to be unbounded given the implicit contract that no callback is blocking. Of course a callback could block if told otherwise by an explicit thread jump like observeOn, which then falls into the category discussed above for the generator: a prefetch fixed-size queue.

@benjchristensen (Member, Author)

It is the duty of the getDataFromNetwork(...) to turn any downstream request pattern into a reasonable amount for its level, for example, by rebatchRequests internally.

This makes total sense, that it must define the max, but when it is known that the downstream is requesting less, it would be better if that information were passed upward so requests are constrained all the way up the chain.

Regarding the specific implementation of take: it makes total sense if optimizing for synchronous behavior and microbenchmarks. However, synchronous behavior is best served by Iterable or java.util.Stream. Flowable is intended for constrained emission over async boundaries.

In practice, the cost of over-producing elements, and over-sending elements across async boundaries (network, threads, queues) is far higher than any optimization gained by sending request(Long.MAX_VALUE) to allow micro-optimizations.

Thus, I suggest take is optimizing for the wrong thing and should instead focus on preventing over-production across async boundaries, the very case Flowable is intended for.

@davidmoten (Collaborator) commented Feb 9, 2017

This makes total sense, that it must define the max, but when it is known that the downstream is requesting less, it would be better if that information were passed upward so requests are constrained all the way up the chain.

I agree with this. The potentially bounded request from a take, or the request of one from a first, is valuable information that can help the upstream do less. I understand the benefits of initiating a fast path in the upstream and allowing cancellation to chop it off, but this only applies to a particular category of Flowable, and we are favouring this category too much.

It might be worth noting that take(N) will pass through requests smaller than N if it is not already unbounded, so getDataFromNetwork() could conceivably get a lot of requests for 1 item, for instance. This might seem like a problem for using requests to determine getDataFromNetwork fetch amounts, but I don't think it is. I'm adding operators to rxjava2-extras like .rebatchRequests(min, max, constrainFirstRequest), which rebatches requests between min and max and, according to the boolean parameter, allows the first request to be less than min (max always applies).

getDataFromNetwork might be defined like this:

public Flowable<Data> getDataFromNetwork() {
   return SomeFlowable(...)
               // ensure over network we do no more than request(128)
               // first request is allowed through even if less than 10
               // later requests smaller than 10 will be rebatched to 10
               .rebatchRequests(10, 128, true);
}

and use cases like those below would all be quite efficient in terms of the network fetch if take, first, etc. didn't go unbounded:

//will fetch 1
getDataFromNetwork().first().subscribe();

//will fetch in batches of 128 
getDataFromNetwork().take(1000000).subscribe();

//will fetch 1, then 10, 10, 10, ...
getDataFromNetwork().take(100).rebatchRequests(1).subscribe(); 

This approach is helpful when calls to the network source may happen frequently overall from many client processes (distributed or not) but the individual subscriptions to getDataFromNetwork() are not that busy.
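The min/max/first-request clamping rule described above can be sketched as a plain-Java helper. This is hypothetical (the parameter semantics follow this thread's description; the real rxjava2-extras operator also has to track outstanding demand):

```java
// Hypothetical sketch of the rebatchRequests(min, max, b) clamping rule:
// map one downstream request to an upstream request.
public class RebatchRule {
    /**
     * firstMayBeBelowMin mirrors the boolean parameter discussed above:
     * when true, the very first request passes through even if below min;
     * max always applies.
     */
    static long clamp(long requested, long min, long max,
                      boolean firstMayBeBelowMin, boolean isFirst) {
        long r = Math.min(requested, max);   // never exceed max
        if (isFirst && firstMayBeBelowMin) {
            return r;                        // e.g. first().subscribe() fetches 1
        }
        return Math.max(r, min);             // later small requests bumped to min
    }

    public static void main(String[] args) {
        System.out.println(clamp(1, 10, 128, true, true));          // 1
        System.out.println(clamp(1, 10, 128, true, false));         // 10
        System.out.println(clamp(1_000_000, 10, 128, true, false)); // 128
    }
}
```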

@akarnokd (Member) commented Feb 10, 2017

Let's assume take didn't request unbounded; the following setup would still result in excessive network usage if the source doesn't adjust request amounts:

networkSource().take(1_000_000)
.subscribe(v -> { }, e -> { }, () -> { }, s -> s.request(1_000_000));

In RxJava, whenever there is a boundary, the operator adjusts its input request amounts to remain reasonable, as observeOn and flatMap do. take is not an async boundary, nor are reduce, map, first, etc.

@davidmoten (Collaborator)

Let's assume take didn't request unbounded; the following setup would still result in excessive network usage if the source doesn't adjust request amounts.

Yep, I agree, but we use a different rebatching operator internal to networkSource() that offers more flexibility than the existing rebatchRequests and still limits the fetch amounts across the network.

I've implemented some new request-limiting operators in rxjava2-extras (minRequest, maxRequest and rebatchRequests(min, max, constrainFirstRequestMin)) and another creation method called fetchPagesByRequest whose documentation has a fully worked example of what I'm getting at.

If these new operators seem viable, then a review later would be nice (the work is only a day old and I'll check it some more in the next week or two).

Critical to the argument is that I want support for sources that, under the covers, don't easily support streaming. This is a common situation, especially when integrating legacy distributed services using RxJava.

@benjchristensen (Member, Author)

Like @davidmoten, I am doing the rebatching myself before sending over the network, as the built-in operator is too limiting. For example, I want to:

  • make an initial request of 256
  • subsequently maintain it at 64
  • send a lower request value if the downstream requests less (such as what I expect take to do)

@akarnokd (Member)

@benjchristensen As you demonstrated publicly, you consider keeping behavioral compatibility very important and defended it to ensure continued interoperability. I have the same priority regarding changes to established behavior in RxJava 2, where I believe there was plenty of time before release for anyone to bring up this problem. At that point, and even now, I see the following compatible changes as possible:

  1. have a FlowableTransformer-based take() variant in a library outside RxJava 2,
  2. have a new method with the behavior: for example, reintroduce limit(n), which limits both the request amount and the items consumed from upstream, or
  3. have an overload with a parameter selecting the backpressure handling strategy.

@davidmoten

In case you plan to implement any of the options, you may want to prepare for the following setup:

source.limit(150).observeOn(scheduler)

If you want the 150 to be the upper bound on the sum of total requests sent to source, you need custom accounting, because observeOn will first request 128 and, once it reaches the 96th item, request another 96, which would overshoot 150 unless capped by logic in request().
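The accounting described above can be sketched in plain Java: cap the running total of demand forwarded upstream at the limit, trimming whatever part of a downstream batch would overshoot it. This is a hypothetical illustration, not the actual limit() implementation:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: cap the total demand forwarded upstream at a fixed
// limit, regardless of how the downstream batches its requests.
public class CappedRequests {
    final long cap;                              // e.g. 150 for limit(150)
    final AtomicLong forwarded = new AtomicLong();

    CappedRequests(long cap) { this.cap = cap; }

    /** Returns how much of a downstream request of n may go upstream. */
    long request(long n) {
        while (true) {
            long f = forwarded.get();
            if (f == cap) {
                return 0;                        // cap reached, forward nothing
            }
            long grant = Math.min(n, cap - f);   // trim the overshoot
            if (forwarded.compareAndSet(f, f + grant)) {
                return grant;
            }
        }
    }

    public static void main(String[] args) {
        CappedRequests c = new CappedRequests(150);
        System.out.println(c.request(128)); // 128: observeOn's initial prefetch
        System.out.println(c.request(96));  // 22: only the remainder of the cap
        System.out.println(c.request(96));  // 0: nothing left to forward
    }
}
```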

@davidmoten (Collaborator)

there was plenty of time before release

Sort of. That enormous rewrite was never going to get a thorough review; people didn't have the time, unfortunately. I'm also not sure this issue was ever raised specifically and discussed in terms that would have sparked a reaction. I went looking for it in old issues and PRs and couldn't find it (but it might be there, of course).

That's water under the bridge now, of course. @akarnokd, are you open to a breaking change on this one so that the default behaviour is not unbounded? I frankly doubt it would affect anyone at all, and the only people who would have read and absorbed the backpressure annotations on take etc. would probably have been as surprised as @benjchristensen and myself.

In case you plan to implement any of the options, you may want to prepare for the following setup:
source.limit(150).observeOn(scheduler)

Yeah, thanks for that; I did handle request accounting in the operators in rxjava-extras as you describe when I wrote them weeks ago. If you (or anyone else) can review them, terrific. I opened an issue at davidmoten/rxjava2-extras#4.

@akarnokd (Member) commented Mar 8, 2017

are you open to a breaking change

Not really; adding a new operator limit, or take(n, eager), is a better option.

@davidmoten (Collaborator) commented Mar 15, 2017

I'd prefer an overload on take (and, I suppose, on first and elementAt).

@benjchristensen (Member, Author)

That doesn't really help the primary use case for me, since the consumer doesn't know what the producer behavior is. They shouldn't have to know whether to do this or not; the producer should just receive the smallest possible request(n) value, and take is a clear indicator that no more than the take(n) value ever needs to be produced.

The point of Flowable is flow control across async boundaries: preventing over-production and buffering.

If optimizing for synchronous execution were what I was looking for, I'd use Iterable/Stream. If flow control were not needed, I'd use Observable.

@akarnokd (Member)

Closing via #5655.
