Support leak-free closeable resources transfer via channel #1936

Closed
elizarov opened this issue Apr 22, 2020 · 10 comments

Comments

@elizarov
Contributor

elizarov commented Apr 22, 2020

The change in #1813 removes support for "atomic cancellation", making it impossible to safely transfer a resource via a channel from one coroutine to another. When a closeable resource (like an open file or a handle to another native resource) is transferred via a channel from one coroutine to another, it can be lost if either the send or the receive operation is cancelled in transit.

We introduce the following replacement. The Channel() constructor function gets an additional optional parameter: onUndeliveredElement: ((E) -> Unit)? = null

When the optional onUndeliveredElement parameter is set, the corresponding function is called once for each element that was sent to the channel and is being lost due to cancellation, which can happen in the following cases:

  • The send operation was cancelled before it had a chance to actually send the element.
  • The receive operation retrieved the element from the channel but was cancelled when trying to return it to the caller.
  • The channel was cancelled, in which case onUndeliveredElement is called on every remaining element in the channel's buffer.

Note that onUndeliveredElement is called synchronously in an arbitrary context. It should be fast, non-blocking, and should not throw exceptions. Any exception thrown by onUndeliveredElement is wrapped into an internal runtime exception which is either rethrown or handed off to the exception handler in the current context (CoroutineExceptionHandler) when one is available.
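
As a rough illustration of the third case above (channel cancellation), the following minimal sketch buffers a few resources and then cancels the channel. TrackedResource and cancelWithBufferedResources are hypothetical names invented for this example, and it assumes a kotlinx.coroutines version that provides trySend.

```kotlin
import kotlinx.coroutines.channels.Channel
import java.io.Closeable

// Hypothetical resource type used only for this illustration.
class TrackedResource(val id: Int) : Closeable {
    @Volatile
    var closed: Boolean = false
        private set

    override fun close() {
        closed = true
    }
}

// Buffers a few resources, then cancels the channel so that none of them
// is ever received. Returns the resources so the caller can inspect them.
fun cancelWithBufferedResources(): List<TrackedResource> {
    val resources = List(3) { TrackedResource(it) }
    val channel = Channel<TrackedResource>(
        capacity = 10,
        // Kept fast and non-blocking, as the note above recommends.
        onUndeliveredElement = { resource -> resource.close() }
    )
    resources.forEach { check(channel.trySend(it).isSuccess) }
    // Elements still sitting in the buffer are handed to onUndeliveredElement.
    channel.cancel()
    return resources
}

fun main() {
    val resources = cancelWithBufferedResources()
    println("all closed: ${resources.all { it.closed }}")
}
```

The handler does nothing but flip a flag here; a real handler would release the underlying native handle in the same place.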

UPDATE: The final name of the parameter is onUndeliveredElement.

@Tolriq

Tolriq commented Apr 22, 2020

Since it was a real pain to reach proper code for #1092, will this come with a complete example of a worker pool with closeable resources? :)
Even after reading both posts twice, I'm still completely unsure of the impact and future needs.

@pakoito

pakoito commented Apr 22, 2020

Should onElementCancel allow suspend operations on itself? I can see one network request to a logger, some UI operation, or a database operation such as closing happening.

@elizarov
Contributor Author

elizarov commented Apr 23, 2020

Should onElementCancel allow suspend operations on itself? I can see one network request to a logger, some UI operation, or a database operation such as closing happening.

Unfortunately, that is impractical to implement, since some places that encounter the situation with "the resource that is being lost" lack any context to perform a suspending operation (e.g. channel.cancel() call). The other consideration is that this mechanism is mostly designed to be used for "regular" resources that represent a handle to some native OS resource that needs to be closed to deallocate memory. Logging there is fine, too.

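In those cases where "closing a resource" is a long-running and time-consuming operation (requiring UI interaction, DB write, network, etc) the recommended pattern is:

// The parameter is called onElementCancel in this discussion;
// its final name is onUndeliveredElement (see the UPDATE in the issue description).
val channel = Channel<Resource>(onUndeliveredElement = { resource ->
    resourceClosingScope.launch { longRunningOperation(resource) }
})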

@pakoito

pakoito commented Apr 23, 2020

Understood! Then the onElementCancel operation can be safe to throw in, as it's synchronous and can be just try-catched. A nice side-effect.

@elizarov
Contributor Author

When an exception is thrown in onElementCancel, it is always caught, wrapped, and handled in a way that ensures no internal invariant is broken -- the wrapped exception is either rethrown (from operations like channel.cancel()) or delivered to the uncaught exception handler if it happened deep inside the cancellation machinery.
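
A minimal sketch of the rethrow path described above, using the final parameter name onUndeliveredElement. The function name cancelWithThrowingHandler is made up for this example. It catches RuntimeException because the wrapper is described in this thread only as an internal runtime exception; the concrete class is not assumed here.

```kotlin
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicInteger

// Returns how many times the handler ran and what, if anything, cancel() threw.
fun cancelWithThrowingHandler(): Pair<Int, Throwable?> {
    val calls = AtomicInteger()
    val channel = Channel<Int>(
        capacity = 1,
        onUndeliveredElement = { element ->
            calls.incrementAndGet()
            // Deliberately misbehaving handler, for demonstration only.
            throw IllegalStateException("cleanup failed for $element")
        }
    )
    check(channel.trySend(42).isSuccess)
    val thrown = try {
        channel.cancel()
        null
    } catch (e: RuntimeException) {
        // The handler's own exception is expected to be wrapped, so inspect e.cause.
        e
    }
    return calls.get() to thrown
}

fun main() {
    val (calls, thrown) = cancelWithThrowingHandler()
    println("handler calls: $calls")
    println("thrown by cancel(): $thrown, cause: ${thrown?.cause}")
}
```

Relying on this for control flow is probably unwise, since the handler is documented as something that should not throw at all.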

@swankjesse

One gotcha is that OutputStream.close() is blocking and could take arbitrarily long. The java.io blocking APIs don't have a mechanism to cancel, only to close cleanly.

Presumably the mitigation is to send that work to the IO dispatcher. (Maybe that practice should be recommended?)
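
One way this mitigation could look, combining it with the resourceClosingScope pattern shown earlier in the thread. This is only a sketch under assumptions: SlowStream and closeUndeliveredOnIo are invented for the example, the 50 ms sleep stands in for a slow blocking close, and the scope is built on Dispatchers.IO (JVM).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stream whose close() blocks for a while before finishing.
class SlowStream(private val closedFlag: AtomicBoolean) : OutputStream() {
    override fun write(b: Int) {
        // Discards the data; only close() matters for this example.
    }

    override fun close() {
        Thread.sleep(50) // stands in for a slow, blocking flush
        closedFlag.set(true)
    }
}

// Sends one stream, cancels the channel, and reports whether the stream
// was eventually closed by the coroutine launched from the handler.
fun closeUndeliveredOnIo(): Boolean = runBlocking {
    val closed = AtomicBoolean(false)
    val closerJob = SupervisorJob()
    val resourceClosingScope = CoroutineScope(closerJob + Dispatchers.IO)

    val channel = Channel<OutputStream>(
        capacity = 1,
        // The handler itself only launches; the blocking close() runs on an IO thread.
        onUndeliveredElement = { stream ->
            resourceClosingScope.launch { stream.close() }
        }
    )
    channel.send(SlowStream(closed)) // fits in the buffer, so it does not suspend
    channel.cancel()                 // the buffered stream becomes undelivered

    // Wait for the pending close before shutting the helper scope down.
    closerJob.children.toList().joinAll()
    closerJob.cancel()
    closed.get()
}

fun main() {
    println("closed on IO dispatcher: ${closeUndeliveredOnIo()}")
}
```

The handler returns immediately, which keeps it within the "fast, non-blocking" contract, at the cost of needing a scope that outlives the channel.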

elizarov added a commit that referenced this issue Jun 16, 2020
elizarov added a commit that referenced this issue Sep 16, 2020
@whyoleg
Contributor

whyoleg commented Dec 4, 2020

@elizarov Is it expected that onUndeliveredElement isn't called for the elements of an unlimited channel on cancel?
Example code:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

suspend fun main(){
   val channelBuffered = Channel<Int>(10) { println("RELEASE[BUFFER]: $it") }
   val channelUnlimited = Channel<Int>(Channel.UNLIMITED) { println("RELEASE[UNLIMITED]: $it") }
   
   repeat(5) {
       channelBuffered.offer(it)
       channelUnlimited.offer(it)
   }
   
   channelBuffered.cancel()
   channelUnlimited.cancel()
}

prints:

RELEASE[BUFFER]: 0
RELEASE[BUFFER]: 1
RELEASE[BUFFER]: 2
RELEASE[BUFFER]: 3
RELEASE[BUFFER]: 4

play.kotlin link

I would expect onUndeliveredElement to be called on all elements, the same way as for the buffered channel.

@qwwdfsad
Member

qwwdfsad commented Dec 7, 2020

Thanks @whyoleg, filed #2435

recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
…otlin#1937)

This is problematic for Android when the Main dispatcher is cancelled on a destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for Kotlin#1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic
* Channel onUndeliveredElement is introduced as a replacement.

Fixes Kotlin#1265
Fixes Kotlin#1813
Fixes Kotlin#1915
Fixes Kotlin#1936

Co-authored-by: Louis CAD <louis.cognault@gmail.com>
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
@pacher

pacher commented Jul 16, 2021

A Channel() parameter is awesome, but it would be nice to also get it in places where a channel is used under the hood, like buffer.
For example, I am using callbackFlow with a buffer for a callback-based API, and it would be great to be able to log undelivered elements which are lost in the buffer if the collector fails.
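
Until something like that exists, one possible workaround is to own the buffer explicitly: create the Channel yourself with onUndeliveredElement and expose it through consumeAsFlow(), which cancels the channel when the collector fails. The sketch below is an assumption-laden illustration and not a drop-in replacement for callbackFlow; collectUntilFailure is a made-up name, and the result is sorted because the order in which buffered elements are reported is not assumed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Buffers five elements, lets the collector fail on the second one, and
// returns the elements that were reported as undelivered.
fun collectUntilFailure(): List<Int> = runBlocking {
    // Thread-safe list, since the handler may run in an arbitrary context.
    val undelivered = CopyOnWriteArrayList<Int>()
    val channel = Channel<Int>(
        capacity = 10,
        onUndeliveredElement = { undelivered.add(it) }
    )
    // In a callback-based API, the callback would call trySend instead.
    repeat(5) { check(channel.trySend(it).isSuccess) }

    try {
        channel.consumeAsFlow().collect { value ->
            if (value == 1) throw IllegalStateException("collector failed on $value")
        }
    } catch (e: IllegalStateException) {
        // The failure cancels the channel; buffered elements are reported as undelivered.
    }
    undelivered.sorted()
}

fun main() {
    println("undelivered: ${collectUntilFailure()}")
}
```

Elements 0 and 1 reach the collector before it fails, so only the ones still in the buffer are reported.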

@valenpo

valenpo commented Jan 31, 2022

Is there any way to use the same feature with Flow?

amal added a commit to fluxo-kt/fluxo that referenced this issue Nov 16, 2022
…tate and SideEffects (strictly not recommended!)

  * Previous state will be properly closed on change
  * Side effects closed when not delivered

For leak-free transfer details see "Undelivered elements" section in [Channel](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/) documentation. Also see: Kotlin/kotlinx.coroutines#1936

Signed-off-by: Artyom Shendrik <artyom.shendrik@gmail.com>