Flow onBackpressureReduce operator? #4271

@shirozatou

Use case

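A fast producer paired with a slow collector: values that arrive while the collector is busy should be merged by a reducer, so that no event is dropped and the producer is never suspended.
For example, update events are sent to a channel, and a long-running task receives the most recent reduced result each time it is ready for more.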

val f = flow {
    repeat(52) {
        emit(it)
        delay(100)
    }
}
f.onBackpressureReduce(::maxOf).collect {
    println(it)
    delay(1000)
}
// output: 0, 9, 19, 29, 39, 49, 51
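
The intended semantics can be modelled without coroutines. The following is only a sketch: `ReducingSlot` and `simulate` are hypothetical names, not part of kotlinx.coroutines, and the poll points assume the collector becomes ready right after items 0, 9, 19, 29, 39 and 49 (which is what the 100 ms / 1000 ms delays above suggest), plus once more when the producer finishes.

```kotlin
// Hypothetical single-slot accumulator that models "reduce instead of drop or suspend".
class ReducingSlot<T : Any>(private val reducer: (T, T) -> T) {
    private var pending: T? = null

    // Producer side: never blocks and never drops; the value is merged into the pending one.
    fun offer(value: T) {
        val current = pending
        pending = if (current == null) value else reducer(current, value)
    }

    // Collector side: takes the reduced value (null if nothing is pending) and clears the slot.
    fun poll(): T? {
        val result = pending
        pending = null
        return result
    }
}

// Replays the use case: 52 items, the collector is ready after items 0, 9, 19, ..., 49
// (an assumption about timing) and once more after the producer completes.
fun simulate(): List<Int> {
    val slot = ReducingSlot<Int> { a, b -> maxOf(a, b) }
    val received = mutableListOf<Int>()
    for (i in 0 until 52) {
        slot.offer(i)
        if (i == 0 || i % 10 == 9) {
            slot.poll()?.let { received.add(it) }
        }
    }
    slot.poll()?.let { received.add(it) }
    return received
}

fun main() {
    println(simulate()) // [0, 9, 19, 29, 39, 49, 51]
}
```

With a reducer such as `Int::plus` the same slot would accumulate a running sum between polls, which is the case where simply dropping intermediate values loses information.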

The Shape of the API

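// sample
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.withContext

fun <T : Any> Flow<T>.onBackpressureReduce(reducer: (T, T) -> T): Flow<T> {
    return channelFlow {
        // Values that arrive while the collector is busy are folded into acc.
        var acc: T? = null
        collectLatest { value ->
            val v = acc?.let { reducer(it, value) } ?: value
            acc = v
            // A newer upstream value cancels this block (collectLatest),
            // abandoning the pending send; v then stays in acc for the next round.
            withContext(Dispatchers.Unconfined) {
                send(v)
                // Delivered: start a fresh accumulation.
                acc = null
            }
        }
    }.buffer(capacity = 0) // rendezvous: send() suspends until the collector is ready
}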
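
For comparison, kotlinx.coroutines already ships `conflate()`, which roughly corresponds to the special case where the reducer keeps only the newer value (`{ _, new -> new }`); older pending values are dropped rather than merged. A minimal sketch, assuming kotlinx.coroutines 1.6 or later (where `collect` accepts a lambda without an extra import); `collectConflated` is a hypothetical name and the delays are shortened to keep the run quick:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Collects a fast flow through conflate() with a slow collector and returns what was seen.
fun collectConflated(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    flow {
        repeat(52) {
            emit(it)
            delay(10) // fast producer
        }
    }
        .conflate() // keeps only the newest pending value; older ones are dropped
        .collect {
            received.add(it)
            delay(100) // slow collector
        }
    received
}

fun main() {
    // Exact intermediate values depend on timing; the last emitted value is always delivered.
    println(collectConflated())
}
```

The proposed operator would generalize this: instead of discarding the older pending value, it would hand both values to the reducer.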
