Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReceiveChannel.asFlow operator #1340

Closed
qwwdfsad opened this issue Jul 15, 2019 · 5 comments
Closed

ReceiveChannel.asFlow operator #1340

qwwdfsad opened this issue Jul 15, 2019 · 5 comments
Assignees

Comments

@qwwdfsad
Copy link
Collaborator

A missing piece of the Flow puzzle is a connection between regular ReceiveChannel and Flow (note that we already have a pretty non-obvious ReceieveChannel.asFlux) with a proper migration path for channel operators.

For example, all operators on top of the channels are deprecated, but users have no clear migration path from their channel operators sequence to flow.

The main focus here is to provide clear consumption semantics and decide whether we want to introduce such primitive at all

@zach-klippenstein
Copy link
Contributor

I've run up against this a few times, where I know I will only ever have a single collector (so the lack of multicasting is not an issue), but I still want to use the operators (or integrate with an API that requires Flows). This comes up the most often in unit tests. Another use case I have is that I want to use a regular (non-broadcast) Channel to send, because I don't want ArrayBroadcastChannel's "drop items when not subscribed" behavior, but I would still potentially like to be able to expose a Flow as the API for consistency and convenient access to operators (square/workflow#463).

If such an operator were to be added to the library, I think it would need to be either clearly named and/or clearly documented that multiple concurrent collectors would not get the same items. As long as that is documented, I think it would be fine as long as #1261 lands, since a lot of use cases would probably want to multicast the channel before doing anything else with it.

@elizarov elizarov self-assigned this Jul 16, 2019
elizarov added a commit that referenced this issue Jul 16, 2019
* This is a consuming conversion -- the resulting flow can be collected
  just once and the channel is closed after the first collect.
* The implementation is made efficient (without iterators) using
  a new internal ReceiveChannel.consumeEachTo function which also ensure
  that the reference to the last emitted value is not retained.
* AbstractChannel implementation is optimized to avoid code duplication
  in different receive methods.

Fixes #1340
Fixes #1333
@elizarov
Copy link
Contributor

elizarov commented Jul 16, 2019

The proposal is to name it consumeAsFlow and add additional precaution that repeated attempts to collect from the resulting flow produce IllegalStateException just in case. See

/**
* Represents the given receive channel as a hot flow and [consumes][ReceiveChannel.consume] the channel
* on the first collection from this flow. The resulting flow can be collected just once and throws
* [IllegalStateException] when trying to collect it more than once.
*
* ### Cancellation semantics
* 1) Flow consumer is cancelled when the original channel is cancelled.
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
* 3) If the flow consumer fails with an exception, channel is cancelled.
*
*/
@FlowPreview
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = object : Flow<T> {

elizarov added a commit that referenced this issue Jul 16, 2019
* This is a consuming conversion -- the resulting flow can be collected
  just once and the channel is closed after the first collect.
* The implementation is made efficient (without iterators) using
  a new internal ReceiveChannel.consumeEachTo function which also
  ensures that the reference to the last emitted value is not retained
  (does not leak).
* AbstractChannel implementation is optimized to avoid code duplication
  in different receive methods (receive and receiveOrNull) and also
  shares code with new receiveInternal that is used for an efficient
  consumeEachTo implementation.

Fixes #1340
Fixes #1333
@fvasco
Copy link
Contributor

fvasco commented Jul 16, 2019

The proposal is to name it consumeAsFlow

Should BroadcastChannel<T>.asFlow() be renamed to BroadcastChannel<T>.subscribeAsFlow()?

BroadcastChannel looks slightly different, should it be: openSubscription().consumeAsFlow()?

@elizarov
Copy link
Contributor

elizarov commented Jul 16, 2019

@fvasco For BroadcastChannel a simple asFlow seems appropriate because there is good match between the source type (BroadcastChannel) and a target type (Flow) and representing the former as the latter is quite a natural transformation. It is only ReceiveChannel that requires a special attention to the fact that unlike all the regular flows, the plain channel can be received (consumed) just once.

@fvasco
Copy link
Contributor

fvasco commented Jul 16, 2019

You are right, @elizarov,
I confused this use case with the example #1326 (comment) , in that case asFlow drops messages (and so on... see other issue).

elizarov added a commit that referenced this issue Jul 16, 2019
* This is a consuming conversion -- the resulting flow can be collected
  just once and the channel is closed after the first collect.
* The implementation is made efficient (without iterators) using
  a new internal ReceiveChannel.consumeEachTo function which also
  ensures that the reference to the last emitted value is not retained
  (does not leak).
* AbstractChannel implementation is optimized to avoid code duplication
  in different receive methods (receive and receiveOrNull) and also
  shares code with new receiveInternal that is used for an efficient
  consumeEachTo implementation.

Fixes #1340
Fixes #1333
elizarov added a commit that referenced this issue Jul 18, 2019
* This is a consuming conversion -- the resulting flow can be collected
  just once and the channel is closed after the first collect.
* The implementation is made efficient via emitAll extension.
* Experimental FlowCollector.emitAll extension is introduced.
* It is based on the (internal) Channel.receiveOrClose
  and ensures that the reference to the last emitted value is
  not retained (does not leak).

Fixes #1340
Fixes #1333
elizarov added a commit that referenced this issue Jul 18, 2019
* This is a consuming conversion -- the resulting flow can be collected
  just once and the channel is closed after the first collect.
* The implementation is made efficient via emitAll extension.
* Experimental FlowCollector.emitAll extension is introduced.
* It is based on the (internal) Channel.receiveOrClose
  and ensures that the reference to the last emitted value is
  not retained (does not leak).

Fixes #1340
Fixes #1333
elizarov added a commit that referenced this issue Jul 18, 2019
* This is a consuming conversion -- the resulting flow can be collected
  just once and the channel is closed after the first collect.
* The implementation is made efficient via emitAll extension.
* Experimental FlowCollector.emitAll extension is introduced.
* It is based on the (internal) Channel.receiveOrClose
  and ensures that the reference to the last emitted value is
  not retained (does not leak).

Fixes #1340
Fixes #1333
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants