Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How can I test back-pressure? #157

Open
caiofaustino opened this issue Sep 29, 2022 · 8 comments
Open

How can I test back-pressure? #157

caiofaustino opened this issue Sep 29, 2022 · 8 comments

Comments

@caiofaustino
Copy link

Hi, I was wondering if there is any mechanism to test the same back-pressure behavior that collect() has.
Here is an example on the different behaviors I encounter.

@Test
public fun `when I test - flow should backpressure`(): TestResult = runTest {
    val myFlow:Flow<Int> = flow {
        println("emitting 1")
        emit(1)
        println("emitted 1")
        delay(10)
        println("emitting 2")
        emit(2)
        println("emitted 2")
        delay(10)
        println("emitting 3")
        emit(3)
        println("emitted 3")
    }


    println("COLLECT")
    myFlow.collect {
        println("collected $it")
        delay(50)
    }

    println()
    println("TEST")
    // Test
    myFlow.test {
        val item1 = awaitItem()
        println("collected $item1")
        delay(50)
        val item2 = awaitItem()
        println("collected $item2")
        delay(50)
        val item3 = awaitItem()
        println("collected $item3")
        awaitComplete()
    }
}

Prints

COLLECT
emitting 1
collected 1
emitted 1
emitting 2
collected 2
emitted 2
emitting 3
collected 3
emitted 3

TEST
emitting 1
emitted 1
emitting 2
emitted 2
emitting 3
emitted 3
collected 1
collected 2
collected 3
@JakeWharton
Copy link
Collaborator

Turbine uses an unconfined collector which runs concurrently with your test lambda to ensure that we see all values. We do not have a mechanism for applying backpressure to the upstream source.

Can you describe what you're actually testing?

@caiofaustino
Copy link
Author

caiofaustino commented Sep 29, 2022

This is probably not a good implementation, but I'm trying to see if I could have a flow that exposes an editable queue.
Each collect suspends and takes some time to execute, while asynchronously the rest of the queue could be cleared or have new items added to it.

So while collect is still suspended, and the queue is cleared, the next item would not be emitted.

Any suggestions on how to achieve this would be appreciated :)

Here is a bit of a naive implementation.

val eventQueueList = mutableListOf<Event>()
val eventQueueFlow = flow {
    while (true) {
        val event = eventQueueList.removeFirstOrNull()
        if (event != null) {
            emit(event)
        }
        delay(10)
    }
}

@JakeWharton
Copy link
Collaborator

We can possibly design a variation of the API that allows applying backpressure by using a channel with a smaller buffer and potentially an intermediate coroutine which controls the number of allowed items before it blocks the collector.

@JakeWharton
Copy link
Collaborator

Okay my thought here is that we basically let you control the number of buffered items by blocking the collector. By default it's infinite (the current behavior). You can explicitly increase the count which will unblock the collector for N items. Or if the collector is blocked and you call awaitItem we implicitly increase the count by 1 (effectively creating rendezvous behavior).

@caiofaustino
Copy link
Author

Ok I think I got it, so the default behavior would be the same, but somehow I could make this a blocking test, and there I would be able to call awaitItem() to increase the buffer by 1 and get the next item, with the behavior I expected on the description of the issue, or I could also call something like awaitItems(3) to increase the buffer by N and collect the next N values. Effectively suspending the producer until await* is called.

Is my understand correct? Do you think this could me implemented in the same flow.test{} API without breaking other behaviors, or should it be something like flow.blockingTest{}? Because I think with the blocking one you will lose the capability of detecting a timeout on a hanging await* right?

@JakeWharton
Copy link
Collaborator

I think it would be the same API.

You would call .test(initialCapacity = 1) { .. } (or whatever you want) which would allow at most 1 item into the channel and then block the collector. We probably have to force you to at least allow 1 from the start because we need to start collecting from the Flow. But then after that, because the collector is blocked, backpressure is applied. So if your Flow is conflated and you trigger two operations they should result in only a single notification when you eventually unblock the collector with a second awaitItem call or a call to a new function such as increaseCapacity(1) (but with a better name).

@caiofaustino
Copy link
Author

I agree with enforcing at least 1.

But I can't think of a use case where I would want to call increaseCapacity(N) without an associated awaitItem. Something like unblocking the Flow production in a controlled manner without collecting it, but I guess there might be some convoluted use case for it since the buffer already exists.

@JakeWharton
Copy link
Collaborator

I mean we can omit it and start with only two modes: rendezvous or unlimited buffer. If someone complains at least we have an idea on how to build support for arbitrary buffer capacities, but maybe no one actually needs it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants