Flowable scan operators backpressure inconsistency #7109

Closed
fsbarata opened this issue Nov 12, 2020 · 4 comments · Fixed by #7110

Comments

@fsbarata
Contributor

Hello,

The Flowable scan operator without an initial value requests from the upstream according to downstream requests, but the other scan variations (scan with an initial value, and scanWith) request in bufferSize() chunks.

RxJava version: 3.0.7

Code (if Kotlin is not acceptable, let me know and I will amend it to a Java version):

class ScanTest {
	@Test
	fun `scan with initial`() {
		var requested = 0L
		Flowable.never<Int>()
			.doOnRequest { requested += it }
			.scan(0, Int::plus)
			.test(2)

		assert(requested == 1L) { "Requested was $requested and not 1" }
	}

	@Test
	fun `scan without initial`() {
		var requested = 0L
		Flowable.never<Int>()
			.doOnRequest { requested += it }
			.scan(Int::plus)
			.test(1)

		assert(requested == 1L) { "Requested was $requested and not 1" }
	}

	@Test
	fun `scanWith`() {
		var requested = 0L
		Flowable.never<Int>()
			.doOnRequest { requested += it }
			.scanWith({ 0 }, Int::plus)
			.test(2)

		assert(requested == 1L) { "Requested was $requested and not 1" }
	}
}

Only the second test passes.

Trace:

java.lang.AssertionError: Requested was 127 and not 1

	at ScanTest.scanWith(ScanTest.kt:35)


java.lang.AssertionError: Requested was 127 and not 1

	at ScanTest.scan with initial(ScanTest.kt:13)

My expectation would be that after supplying the initial value, the remaining values would be requested as the downstream requires (akin to the scan without an initial value).

@akarnokd
Member

My expectation would be that after supplying the initial value, the remaining values would be requested up as the downstream requires

This is by design, due to how asynchronous requesting can interact with the emission of that very first value versus signals from the upstream.

For example, the upstream may complete asynchronously without emitting any value at exactly the moment the downstream requests 1 item (the initial value), so there is a race that requires serialization. The straightforward solution is to use prefetching plus a queue, just like many other operators; consequently, the downstream request pattern is lost in the process.

The prefetch amount is currently not exposed on the scan operator, so if you find the default too high, you can use rebatchRequests to lower the amount.
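
A minimal sketch of that workaround, reusing the setup from the tests in the report. The helper name upstreamRequestedWithRebatch is hypothetical and only there for illustration. The sketch assumes that placing rebatchRequests directly above the seeded scan means the source sees requests in batches of the given size rather than scan's default prefetch; the seeded scan itself still prefetches, just from rebatchRequests instead of from the source.

```kotlin
import io.reactivex.rxjava3.core.Flowable

// Hypothetical helper: returns the total amount requested from the source
// when a seeded scan is preceded by rebatchRequests(batchSize).
fun upstreamRequestedWithRebatch(batchSize: Int): Long {
    var requested = 0L
    Flowable.never<Int>()
        .doOnRequest { requested += it }  // records the requests that reach the source
        .rebatchRequests(batchSize)       // requests batchSize upfront instead of scan's prefetch
        .scan(0, Int::plus)               // still prefetches, but from rebatchRequests
        .test(2)
    return requested
}

fun main() {
    // Everything above runs synchronously, so the counter is final here.
    println("Requested from the source: ${upstreamRequestedWithRebatch(1)}")
}
```

With a source that never emits, the amount recorded should be just the initial batch, which is why a batch size of 1 is used to mirror the expectation in the original tests. In a real pipeline a slightly larger batch is usually a better trade-off, since very small batches add per-item request overhead.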

@fsbarata
Contributor Author

fsbarata commented Nov 12, 2020

Many thanks for clarifying that.

Is it worth documenting the behavioural difference in the operator javadoc?

Edit: or perhaps provide a new BackpressureSupport value (like BUFFERED_IN)

@akarnokd
Member

Their documentation could include an expanded Backpressure: section:

     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>
+    * The downstream request pattern is not preserved across this operator. 
+    * The upstream is requested {@link #bufferSize()} - 1 upfront and 75% of {@link #bufferSize()} thereafter.
     *  </dd>
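
To make the numbers in the proposed wording concrete, here is a small sketch of the arithmetic. ScanRequests and scanRequestsFor are hypothetical names and not part of RxJava, and the buffer size of 128 is an assumption inferred from the trace in the report (127 = 128 - 1).

```kotlin
// Hypothetical helper types, not part of RxJava: the request amounts described
// in the proposed Javadoc text, for a given buffer size.
data class ScanRequests(val upfront: Int, val thereafter: Int)

fun scanRequestsFor(bufferSize: Int): ScanRequests =
    ScanRequests(
        upfront = bufferSize - 1,        // bufferSize() - 1 requested upfront
        thereafter = bufferSize * 3 / 4  // 75% of bufferSize() on each later request
    )

fun main() {
    // 128 is assumed because the reported trace shows 127 requested upfront.
    val requests = scanRequestsFor(128)
    println("upfront=${requests.upfront}, thereafter=${requests.thereafter}")
    // prints "upfront=127, thereafter=96"
}
```

The upfront amount matches the "Requested was 127" failures above; the replenishment figure only follows the 75% stated in the proposed text and was not observed in the report, because the test source never emits.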

Would you like to create a PR for such documentation change?

Edit: or perhaps provide a new BackpressureSupport value (like BUFFERED_IN)

Not likely.

@fsbarata
Copy link
Contributor Author

No problem. Will do
