-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add ReceiveChannel.consumeTo(destination) #4520
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ReceiveChannel.consumeTo(destination) #4520
Conversation
Use case: collecting elements up until the point the channel is closed without losing the elements when `toList` when the exception is thrown. This function is similar to `Flow<T>.toList(destination)`, which we already have, so the addition makes sense from the point of view of consistency as well.
murfel
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For context, Flow's toList(destination) that you've referenced
63b1e1a to
2d8f52b
Compare
4992f06 to
aa1adf1
Compare
aa1adf1 to
6039cf8
Compare
|
Could you also fix the |
|
@murfel, that page is automatically generated from the documentation comments that are edited in this PR. |
|
I'm aware. |
938369e to
caed24d
Compare
| * | ||
| * There is no way to recover channel elements if the channel gets closed with an exception | ||
| * or to apply additional transformations to the elements before building the resulting collection. | ||
| * Please use [consumeAsFlow] and [kotlinx.coroutines.flow.toCollection] for such advanced use-cases. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| * Please use [consumeAsFlow] and [kotlinx.coroutines.flow.toCollection] for such advanced use-cases. | |
| * Please use either [consumeTo] or [consumeAsFlow] and [kotlinx.coroutines.flow.toCollection] for such advanced use-cases. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do want to direct the users to Flow here.
| * check(channel.toList() == values) | ||
| * ``` | ||
| */ | ||
| public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We forgot about lost elements (onUndeliveredElement). It should be at least mentioned in the doc, and possibly onUndeliveredElement could be added as a parameter.
onUndeliveredElement overcomplicates the signature for no real use-case, but provides completeness. No strong opinion on my end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain what behavior of onUndeliveredElement is missing from the description? We do say that the elements that did get delivered may still be lost if the channel is closed with a cause.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, found it.
It should be placed (or duplicated) at the end, near the description of its terminal behaviour, since it's logically connected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see how that's connected. Terminal only means that the operation cancels the channel, which does get mentioned.
| * [consumeTo] attempts to receive elements and put them into the collection until the channel is | ||
| * [closed][SendChannel.close]. | ||
| * | ||
| * If the channel is [closed][SendChannel.close] with a cause, [consumeTo] will rethrow that cause. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| * If the channel is [closed][SendChannel.close] with a cause, [consumeTo] will rethrow that cause. | |
| * If the channel is [closed][SendChannel.close] with a cause, [consumeTo] will rethrow that cause immediately. | |
| * The channel could still contain elements that will never be received. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not true: https://pl.kotl.in/U4TKRZ-I2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, I short circuited (and also misread the docs). Please do add when exactly the exception will be rethrown, though.
The channel could still contain elements that will never be received.
Elaborate this whole scenario.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do add when exactly the exception will be rethrown, though.
What happens there is inherent to the behavior of closing the channel: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
Elaborate this whole scenario.
The text you quote is incorrect, closing the channel still allows accessing all elements there.
| * channel.consumeTo(remainingElements) | ||
| * } finally { | ||
| * println("Remaining elements: $remainingElements") | ||
| * } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A more closer example would be to have a channel which is can be closed with a cause, to separate the usage of this function from just channel.toList()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also would be nice to show an example when consumeTo is used before the channel is definitely closed.
(This example could approach this by just removing delay on L251)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit wary that you are advocating exclusively for one particular use-case, here in the sample and in the kdoc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's discuss the intention of this API and samples on Monday!
| * check(channel.toList() == values) | ||
| * ``` | ||
| */ | ||
| public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, found it.
It should be placed (or duplicated) at the end, near the description of its terminal behaviour, since it's logically connected.
| * [consumeTo] attempts to receive elements and put them into the collection until the channel is | ||
| * [closed][SendChannel.close]. | ||
| * | ||
| * If the channel is [closed][SendChannel.close] with a cause, [consumeTo] will rethrow that cause. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, I short circuited (and also misread the docs). Please do add when exactly the exception will be rethrown, though.
The channel could still contain elements that will never be received.
Elaborate this whole scenario.
|
Each E.g.
|
The way to trigger the pitfall that used to be described can be
boiled down to this pattern:
```kotlin
run {
channel.consumeEach {
if (channel.isEmpty) {
// do something
return@run
} else {
// do something else
}
}
}
```
However, here, `isEmpty` is already introducing a race condition,
so `consumeEach` itself does not cause any additional issues.
This does not seem like a pitfall in realistic scenarios after all.
`consumeEach` can perform an early return and thus erase
the elements present in the channel, but this also goes for
elements that possibly entered the channel long ago; without
explicitly checking if the channel is empty, there is no way to
distinguish these two scenarios.
|
Good job noticing the discrepancy, fixed it. |
murfel
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The kdocs still have many discrepancies, the ones I mentioned and other, but that is a non-goal of the PR. Otherwise no concerns.
Use case: collecting elements up until the point the channel is closed without losing the elements when
toListwhen the exception is thrown.This function is similar to
Flow<T>.toList(destination), which we already have, so the addition makes sense from the point of view of consistency as well.