Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scoped flow leaks the decomposition into behaviour #1128

Closed
qwwdfsad opened this issue Apr 21, 2019 · 4 comments
Closed

Scoped flow leaks the decomposition into behaviour #1128

qwwdfsad opened this issue Apr 21, 2019 · 4 comments
Assignees
Milestone

Comments

@qwwdfsad
Copy link
Collaborator

qwwdfsad commented Apr 21, 2019

Consider the following operators:

fun <T> Flow<T>.id(): Flow<T> = flow { collect { emit(it) } }

fun <T> Flow<T>.idScoped(): Flow<T> = flow {
    coroutineScope {
        val channel = Channel<T>()
        launch {
            try {
                for (i in channel) emit(i)
            } finally {
                channel.cancel()
            }
        }

        val result = runCatching {
            collect {
                channel.send(it)
            }
        }
        channel.close(result.exceptionOrNull())
        result.getOrThrow()
    }
}

Our goal, eventually, is to make them more or less indistinguishable (modulo dispatch order, of course).

Currently, it is not the case, this behavior leaks directly into user code:

flow {
    emit(1)
    delay(Long.MAX_VALUE)
}.id[Scoped]().collect { value ->
    kotlin.coroutines.coroutineContext.cancel()
}

Depending on the chosen operator, behaviour will be different (CE or hang).
To fix it, invokeOnCompletion { if (it is CancellationException) cancel() } should be used right after launch in a scoped variant.

While this can problem can be workarounded, it clearly indicates that we are missing an important primitive or cancellation mode.

@qwwdfsad qwwdfsad added this to the 1.3.0-alpha milestone Apr 21, 2019
@qwwdfsad qwwdfsad changed the title Scoped flow leaks the decomposition into behaviour of operators Scoped flow leaks the decomposition into behaviour Apr 21, 2019
@elizarov
Copy link
Contributor

elizarov commented Apr 26, 2019

It seems that we already have corresponding primitives:

  • produce on one side a channel
  • consumeEach on the other side of a channel

A correct implementation of idScoped seems to be the following:

fun <T> Flow<T>.idScoped(): Flow<T> = flow {
    coroutineScope {
        val channel = produce<T> {
            collect {
                send(it)
            }
        }
        channel.consumeEach { 
            emit(it)
        }
    }
}

@qwwdfsad
Copy link
Collaborator Author

qwwdfsad commented Apr 29, 2019

I am not sure it is correct (I deliberately avoided using produce here).

With produce the problem just hides a bit better, e.g.

flow {
    emit(1)
    kotlin.coroutines.coroutineContext.cancel()
}.idScoped().collect { value ->
    println(value)
}

still behaves differently depending on the implementation of idScoped

elizarov added a commit that referenced this issue Apr 30, 2019
* ReceiveChannel.cancel always closes channel with CancellationException,
  so sending or receiving from a cancelled channel produces the
  corresponding CancellationException.
* Cancelling produce builder has similar effect, but an more specific
  instance of JobCancellationException is created.
* This ensure that produce/consumeEach pair is transparent with respect
  to cancellation and can be used to build "identity" transformation
  of the flow (the corresponding test is added).
* ClosedSendChannelException is now a subclass of IllegalStateException,
  so that trying to send to a channel that was closed normally is
  reported as program error and is not eaten (test is added).

Fixes #957
Fixes #1128
@elizarov
Copy link
Contributor

The produce/consumeEach pair is supposed to work correctly in this case by design. If it does not work correctly, then it shall be fixed. In fact, it is the other side of the problem in #957 about confusing ClosedSendChannelException. I've committed fix in PR #1158 and added test to ensure that produce/consumeEach pair is transparent to cancellations.

@elizarov elizarov self-assigned this Apr 30, 2019
@elizarov
Copy link
Contributor

elizarov commented Apr 30, 2019

Here is a larger design problem that we have here. produce is currently an "atomic" primitive of kotlinx.coroutines in the sense that you cannot implement it via public API of launch and Channel() factory. We miss some lower-level "cancellation control" API to make it possible. However, the design of such a lower-level API is out of the scope now. We can simply use produce/consumeEach as needed in flow operator implementations.

elizarov added a commit that referenced this issue Apr 30, 2019
* ReceiveChannel.cancel always closes channel with CancellationException,
  so sending or receiving from a cancelled channel produces the
  corresponding CancellationException.
* Cancelling produce builder has similar effect, but an more specific
  instance of JobCancellationException is created.
* This ensure that produce/consumeEach pair is transparent with respect
  to cancellation and can be used to build "identity" transformation
  of the flow (the corresponding test is added).
* ClosedSendChannelException is now a subclass of IllegalStateException,
  so that trying to send to a channel that was closed normally is
  reported as program error and is not eaten (test is added).

Fixes #957
Fixes #1128
elizarov added a commit that referenced this issue Apr 30, 2019
* ReceiveChannel.cancel always closes channel with CancellationException,
  so sending or receiving from a cancelled channel produces the
  corresponding CancellationException.
* Cancelling produce builder has similar effect, but an more specific
  instance of JobCancellationException is created.
* This ensure that produce/consumeEach pair is transparent with respect
  to cancellation and can be used to build "identity" transformation
  of the flow (the corresponding test is added).
* ClosedSendChannelException is now a subclass of IllegalStateException,
  so that trying to send to a channel that was closed normally is
  reported as program error and is not eaten (test is added).

Fixes #957
Fixes #1128
This was referenced May 23, 2019
qwwdfsad added a commit that referenced this issue Jun 5, 2019
* Introducing flowScope, builder necessary for creating cancellation-transparent flow operators
* Incorporate flow scope into flow operators

Fixes #1218
Fixes #1128
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants