No way to check if there is a queued/buffered item awaiting in Flow #2193

Globegitter opened this issue Aug 11, 2020 · 12 comments

Globegitter commented Aug 11, 2020

I have been implementing a natural buffering operator as discussed in #902 and just discovered an edge case issue.

The code we have is very similar to what I posted in the other issue:

@ExperimentalCoroutinesApi
suspend fun <T> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = channelFlow<List<T>> {
    require(maxChunkSize >= 1) {
        "Max chunk size should be greater than 0 but was $maxChunkSize"
    }
    buffer(1)
    val bufferChunks = ArrayList<T>(maxChunkSize)
    collect {
        bufferChunks += it
        if (bufferChunks.size < maxChunkSize) {
            val offered = offer(bufferChunks.toList())
            if (offered) {
                bufferChunks.clear()
            }
        } else {
            send(bufferChunks.toList())
            bufferChunks.clear()
        }
    }
    if (bufferChunks.size > 0) send(bufferChunks.toList())
}.buffer(0)

This has been working very well for us, but I have now uncovered an issue: given a buffer that is not full, if I offer the buffer while the downstream consumer is busy, the code will wait until it collects an additional element before offering to the downstream consumer again, even though that additional element may take a long time to arrive and the downstream consumer might become free in the meantime. So is there a way to check whether an item is queued, so that I could keep offering the incomplete buffer until I collect a new item? Ideally my code could look something like this:

...
var offered = false
while (hasNoItemsQueued && !offered) {
  offered = offer(bufferChunks.toList())
}
...

where hasNoItemsQueued would be a variable in the scope provided by the collect method. This is, by the way, just for illustration purposes; in the end I do not care how this would be implemented.

I have been able to get around this in a quite hacky/complicated way (just for a prototype), using an onEach that runs before the collect in a different context and sets a shared variable. But that feels quite ugly, and I am not even sure that approach is free of issues.
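Roughly, the shape of it was something like this (illustrative only, not the exact code; the function, the flag name, and the dispatcher are placeholders):

import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*

suspend fun <T> collectWithPendingFlag(upstream: Flow<T>) {
    // Shared flag flipped by a stage running on a different dispatcher.
    val itemPending = AtomicBoolean(false)
    upstream
        .onEach { itemPending.set(true) }  // runs in a different context...
        .flowOn(Dispatchers.Default)       // ...because of this flowOn
        .collect { item ->
            itemPending.set(false)
            // The chunking logic can consult itemPending.get() to guess whether
            // another element is already on its way before re-offering the buffer.
        }
}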


Globegitter commented Aug 11, 2020

I did actually find a way around this by changing the code to:

@ExperimentalCoroutinesApi
suspend fun <T> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = channelFlow<List<T>> {
    require(maxChunkSize >= 1) {
        "Max chunk size should be greater than 0 but was $maxChunkSize"
    }
    buffer(1)
    val bufferChunks = ArrayList<T>(maxChunkSize)
    produceIn(this).consume {
        while (!isClosedForReceive) {
            val item = receive()
            bufferChunks += item
            if (bufferChunks.size < maxChunkSize) {
                do {
                    val offered = offer(bufferChunks.toList())
                    if (offered) {
                        bufferChunks.clear()
                    }
                    delay(1)
                } while (isEmpty && bufferChunks.size > 0)
            } else {
                send(bufferChunks.toList())
                bufferChunks.clear()
            }
        }
    }
    if (bufferChunks.size > 0) send(bufferChunks.toList())
}.buffer(0)

I am not sure how ideal this is, as we now convert the flow to a channel, or whether that has other implications for the rest of the application, but it does work in my initial tests. It would be great, however, to hear whether this is the best way or if there is some way to achieve this with pure Flow.

@Globegitter

I just tested this implementation with produceIn, and our average batch size has dropped considerably and performance has gotten much worse. I am not sure whether the issue is that we now convert everything to channels, or whether it is inherent in the added loop. I wonder if there is some other way around it; for example, if poll had a time parameter, it could try receiving a new item for, say, 500 ms, and if none arrived we could offer the same buffer again. Or, similarly, if Flow had some method along the lines of 'do something if we did not receive a new element for x ms'.
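Something like the following sketch is what I have in mind (the helper name is made up, and I realize a cancelled receive racing with an arriving element may have its own subtleties):

import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.channels.ReceiveChannel

// Sketch of a hypothetical "receive with timeout": wait up to timeoutMillis
// for the next element; null means the timeout expired or the channel closed.
suspend fun <T : Any> ReceiveChannel<T>.receiveOrNullWithin(timeoutMillis: Long): T? =
    withTimeoutOrNull(timeoutMillis) { receiveOrNull() }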

But no matter what, it would be really great if this issue could be solved some way.

@qwwdfsad added the flow label Aug 13, 2020
@zach-klippenstein

A few notes about your code:

  1. bufferedChunks shouldn't be a suspend function. All it does is call channelFlow and buffer(), neither of which is a suspending call. Functions that don't make suspending calls shouldn't be marked as suspending, because they force the compiler to generate unnecessary state machines for callers that don't otherwise suspend. It's also confusing for readers, because a function that returns a Flow generally shouldn't need to suspend at all.
  2. That buffer(1) call is a no-op. Flow operators return new flows, and your code discards the return value of buffer(1). For it to apply to the current flow, you'd need to write buffer(1).produceIn(this).
  3. I'm guessing you noticed a performance impact because you're literally calling (the coroutine equivalent of) sleep in your polling loop. Luckily, the channel API is designed so you don't need to do this kind of polling.

I think this code is roughly equivalent to what you wrote:

@ExperimentalCoroutinesApi
fun <T : Any> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = channelFlow<List<T>> {
    require(maxChunkSize >= 1) {
        "Max chunk size should be greater than 0 but was $maxChunkSize"
    }
    val bufferChunks = ArrayList<T>(maxChunkSize)
    val upstreamChannel = buffer(1).produceIn(this)
    val downstreamChannel = this

    while (!upstreamChannel.isClosedForReceive) {
        // Buffer is full, don't process any more upstream emissions until the
        // buffer has been emitted.
        if (bufferChunks.size >= maxChunkSize) {
            downstreamChannel.send(bufferChunks.toList())
            bufferChunks.clear()
        }

        // Wait for new upstream emissions, but also try to send any buffered
        // items.
        select {
            upstreamChannel.onReceiveOrNull {
                // Null means the upstream completed while we were suspended, and
                // the loop will terminate after this. Note that if you need T to
                // be nullable, you'll need to wrap your upstream values in some
                // sort of additional value to distinguish between actual null
                // values and the close sentinel. Hopefully there will eventually
                // be an onReceiveOrClosed method that makes this simpler.
                if (it != null) {
                    bufferChunks += it
                }
            }

            if (bufferChunks.isNotEmpty()) {
                downstreamChannel.onSend(bufferChunks.toList()) {
                    bufferChunks.clear()
                }
            }
        }
    }

    // After upstream completes, flush any remaining items.
    if (bufferChunks.isNotEmpty()) send(bufferChunks.toList())
}.buffer(0)

That said, I think you could also write this with only one channel:

@ExperimentalCoroutinesApi
fun <T> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = channelFlow<List<T>> {
    coroutineScope {
        require(maxChunkSize >= 1) {
            "Max chunk size should be greater than 0 but was $maxChunkSize"
        }
        val bufferChunks = ArrayList<T>(maxChunkSize)
        val downstreamChannel = this@channelFlow
        var sendJob: Job? = null
    
        collect { item ->
            // Cancel the send job in case the downstream is slow.
            // Need to join on the job to synchronize access to the buffer.
            sendJob?.cancelAndJoin()
    
            bufferChunks += item
            // Cache the full status of the buffer, since it's not safe to access
            // the buffer after launching the below coroutine until the coroutine has
            // completed.
            val isBufferFull = bufferChunks.size >= maxChunkSize

            // Launch a coroutine to send, so we can still accept more upstream emissions.
            sendJob = launch {
                // Potentially executing on different thread, but not racy since
                // the main coroutine will not touch the buffer until this job has
                // completed.
                downstreamChannel.send(bufferChunks.toList())
                // Send has atomic cancellation - if the send succeeds, it will
                // not throw a CancellationException even if the job was cancelled.
                bufferChunks.clear()
            }
    
            if (isBufferFull) {
                // Don't process any more upstream emissions until the
                // buffer has been emitted.
                sendJob!!.join()
            }
        }
    }
}.buffer(0)

@Globegitter

@zach-klippenstein thanks for the comments/improvements and possible solutions. I had started to think of a solution similar to your second one as well, so it is great to see that I was on the right track. I wonder if this use case could be made simpler than this solution, but otherwise we do have an answer now, and if the API is not likely to change, this issue could also be closed.

@elizarov

@zach-klippenstein It is unsettling to see a solution that relies on atomic cancellation in Channel.send, given that we are considering abolishing this behavior in the next major release. See #1813. It makes me wonder how much other code "in the wild" relies on a similar exploit.
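To make the reliance concrete, here is the load-bearing part of that sendJob solution, distilled (the comments spell out how it could break once #1813 lands):

sendJob = launch {
    // Atomic cancellation is what guarantees that, if send() succeeds, this
    // coroutine reaches the next line even when it is concurrently cancelled
    // by sendJob?.cancelAndJoin() in the collector.
    downstreamChannel.send(bufferChunks.toList())
    // Without atomic cancellation (see #1813), send() could throw
    // CancellationException even though the element was already delivered,
    // skipping this clear() and re-sending the same items later.
    bufferChunks.clear()
}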

@zach-klippenstein

Not a huge fan of that myself, even without the future plans. It's a little too "magic": atomic cancellation is load-bearing here, but it's very subtle and not obvious unless you read all the docs.

@Globegitter

@elizarov I am now using a solution very similar to the latest one suggested by @zach-klippenstein, i.e. the one with the sendJob. Is that something that will be affected by #1813? And if so, is there another proposed solution for this issue?


elizarov commented Oct 8, 2020

You need neither two channels nor the select expression for this case, and performance will be much higher. You can use the following approach for natural batching:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) 
fun <T : Any> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> {
    require(maxChunkSize >= 1)
    return flow<List<T>> {
        coroutineScope {
            val upstreamChannel = this@bufferedChunks.buffer(maxChunkSize).produceIn(this)
            while (true) { // loop until closed
                val bufferChunks = ArrayList<T>(maxChunkSize) // allocate new array list every time
                // receive the first element (suspend until it is there)
                // null here means the channel was closed -> terminate the outer loop
                val first = upstreamChannel.receiveOrNull() ?: break
                bufferChunks.add(first)
                while (bufferChunks.size < maxChunkSize) {
                    // poll subsequent elements from the channel's buffer without waiting while they are present
                    // null here means there are no more elements or the channel was closed -> break out of this loop
                    val element = upstreamChannel.poll() ?: break
                    bufferChunks.add(element)
                }
                emit(bufferChunks)
            }
        }
    }
}
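
For completeness, a quick usage sketch (illustrative; the chunk sizes you actually observe depend on how fast the collector is relative to the producer):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..10).asFlow()
        .bufferedChunks(maxChunkSize = 4)
        .collect { chunk -> println(chunk) } // prints lists of 1 to 4 elements
}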


pacher commented Nov 12, 2020

@Globegitter Did you end up using a solution from @elizarov? How is your experience? What about performance?

@Globegitter

@pacher Not yet. My work is project-based and the bug I described has not been high-priority enough, so I am currently assigned to a different project. I do expect to come back to it in the future, however.

@andguevara

> You need neither two channels nor the select expression for this case, and performance will be much higher. You can use the following approach for natural batching: [full code quoted above]

I think this method does not do buffering. I was trying it out in a project and the elements in the flow would not get buffered; I am not sure if anyone else has seen that. If I get a chance I will try to elaborate, but at least that was my experience copying the method verbatim.


pacher commented Jul 14, 2021

@andguevara It is not buffering, it is batching. If your collector processes elements fast enough, there is nothing to batch. Try slowing the collector down with a delay and you should see elements coming in batches instead of the whole flow slowing down.
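
For example (timings illustrative), something like this should show batches:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flow {
        repeat(20) { i ->
            emit(i)
            delay(10) // producer emits roughly every 10 ms
        }
    }
        .bufferedChunks(5)
        .collect { chunk ->
            println(chunk) // chunks grow toward maxChunkSize as elements pile up
            delay(100)     // a slow collector lets elements queue upstream
        }
}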
