Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow: decouple buffer size from various operators and fuse #1233

Closed
elizarov opened this issue May 30, 2019 · 0 comments
Closed

Flow: decouple buffer size from various operators and fuse #1233

elizarov opened this issue May 30, 2019 · 0 comments
Assignees
Labels

Comments

@elizarov
Copy link
Contributor

elizarov commented May 30, 2019

There is a number of flow operators that create several coroutines and/or establish channel to communicate between them: flowOn, channelFlow, produceIn, broadcastIn, flattenMerge, flatMapMerge. Buffer size in those operators is an optional an orthogonal concept that is not essential to those operations, so there is a proposal.

Introduce a separate buffer operator that becomes the only operator that takes buffer size parameter (parameter will be named capacity). This operator has its independent use-case -- it decouples upstream flow emitter from the flow collector by running them in different coroutine with a channel between them.

When this buffer operator is adjacent to any of the aforementioned operators it "fuses" with them and configures their buffer size. Note, that some operators like flowOn optionally create a channel (flowOn only creates a channel when there is a need to change a dispatcher), while buffer operator explicitly requests channel creation. The goal of fusion is to create the minimum number of channels -- only when they are required or explicitly requested.

The ultimate use-case of channel operator fusion is demonstrated by the following code:

channelFlow { 
    while(true) { 
        send(computeSomeValue()) 
    } 
}
    .flowOn(Dispatchers.IO)
    .buffer(32)
    .broadcastIn(scope) // terminal op that returns BroadcastChannel

This chain of calls leads to creation of a single BroadcastChannel instance with a buffer size of 32 and a coroutine that calls computeSomeValue() in IO dispatcher that directly sends to this broadcast channel as if broadcast(IO, 32) { ... } builder was used. A truly declarative way to configure the corresponding computation topology.

Adjacent calls to flowOn in any combination with buffer calls are also fused.

As a part of this change the "default" size of the channel is centralized. A special Channel.DEFAULT marker is introduced. It causes Channel(...) and BroadcastChannel(...) factory to builders to create a channel with a default capacity (16 by default). This default is used unless the buffer size is explicitly specified.

@elizarov elizarov added the flow label May 30, 2019
elizarov added a commit that referenced this issue May 30, 2019
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.DEFAULT buffer size marker

Fixes #1233
elizarov added a commit that referenced this issue May 30, 2019
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.DEFAULT buffer size marker

Fixes #1233
@elizarov elizarov self-assigned this May 30, 2019
elizarov added a commit that referenced this issue May 30, 2019
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.DEFAULT_BUFFER buffer size marker

Fixes #1233
elizarov added a commit that referenced this issue May 31, 2019
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.BUFFERED buffer size marker to request
  buffered channel with a default (unspecified) size

Fixes #1233
elizarov added a commit that referenced this issue Jun 3, 2019
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.BUFFERED buffer size marker to request
  buffered channel with a default (unspecified) size

Fixes #1233
elizarov added a commit that referenced this issue Jun 4, 2019
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.BUFFERED buffer size marker to request
  buffered channel with a default (unspecified) size

Fixes #1233
elizarov added a commit that referenced this issue Jun 4, 2019
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.BUFFERED buffer size marker to request
  buffered channel with a default (unspecified) size

Fixes #1233
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant