Flow.retry() and cancellation #1122

Closed
adamp opened this issue Apr 20, 2019 · 5 comments

@adamp

adamp commented Apr 20, 2019

The implementation of Flow.retry() separates upstream exceptions from downstream exceptions by wrapping the call to emit. Upstream exceptions may trigger a retry, and downstream exceptions should pass through. This produces unexpected behavior on cancellation when the coroutine is suspended in a call that is technically upstream.

Consider:

val reader = flow {
  while (hasMoreEvents) {
    emit(suspendingReadEvent())
  }
}

val collectorJob = launch {
  reader.retry().collect { event ->
    handle(event)
  }
}

// Some time later, as part of an external signal
collectorJob.cancelAndJoin()

The cancelAndJoin() call above may never return. retry catches the CancellationException, which did not originate from the inner emit call in retry's own implementation, and starts a new collection of the upstream flow. That new collection fails almost immediately because the calling job is cancelling. retry dutifully catches the new CancellationException and repeats the process indefinitely.

The problem can be avoided by explicitly letting CancellationException pass through via the predicate, e.g. reader.retry { it !is CancellationException }, though perhaps this should be the default behavior.
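The retry loop described above can be reproduced with a small model. The sketch below is a simplified, synchronous stand-in for retry written against the Kotlin standard library only. It is not the kotlinx.coroutines implementation. The names retryModel and countUpstreamStarts are hypothetical. The maxAttempts cap is also an assumption, added only so the demo terminates. A thrown CancellationException plays the role of suspendingReadEvent() failing once the collecting job is cancelling.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Simplified, synchronous model of retry (NOT the kotlinx.coroutines implementation).
// Failures thrown by the downstream `consume` call are remembered and rethrown as-is;
// any other failure counts as upstream and may trigger a fresh collection attempt.
fun <T> retryModel(
    maxAttempts: Int, // hypothetical cap so the demo terminates; retry() itself is effectively unbounded by default
    upstream: ((T) -> Unit) -> Unit,
    consume: (T) -> Unit,
    predicate: (Throwable) -> Boolean
) {
    var attempts = 0
    while (true) {
        attempts++
        var fromDownstream: Throwable? = null
        try {
            upstream { value ->
                try {
                    consume(value)
                } catch (e: Throwable) {
                    fromDownstream = e // this failure came from the consumer, not the producer
                    throw e
                }
            }
            return // upstream completed normally
        } catch (e: Throwable) {
            if (e === fromDownstream) throw e // downstream failures always pass through
            if (attempts >= maxAttempts || !predicate(e)) throw e
            // Otherwise e is treated as an upstream failure and the loop re-collects.
        }
    }
}

// Hypothetical helper: counts how many times the upstream is started.
// `jobCancelling` stands in for the collecting job having already been cancelled.
fun countUpstreamStarts(jobCancelling: Boolean, predicate: (Throwable) -> Boolean): Int {
    var starts = 0
    // Stand-in for suspendingReadEvent(), which fails once the collecting job is cancelling.
    val reader: ((Int) -> Unit) -> Unit = { emit ->
        starts++
        if (jobCancelling) throw CancellationException("job is cancelling")
        emit(42)
    }
    runCatching { retryModel(maxAttempts = 1_000, upstream = reader, consume = {}, predicate = predicate) }
    return starts
}

fun main() {
    println(countUpstreamStarts(jobCancelling = true) { true })                         // prints 1000
    println(countUpstreamStarts(jobCancelling = true) { it !is CancellationException }) // prints 1
}
```

With an always-true predicate, every cancellation is mistaken for an upstream failure, and collection restarts until the artificial cap is reached. Excluding CancellationException stops after the first attempt, which matches the workaround suggested in the issue.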

qwwdfsad self-assigned this Apr 20, 2019
qwwdfsad added the flow label Apr 20, 2019
@qwwdfsad
Collaborator

qwwdfsad commented Apr 25, 2019

I agree that this behavior is counter-intuitive and should be fixed.
However, it is not simple to fix. For example:

val flow = flow<Int> {
    // withTimeout returns the block's result, so `value` is visible to emit
    val value = withTimeout(someTimeout) {
        somePotentiallyTimedOutSuspendingCall()
    }
    emit(value)
}.retry { it !is CancellationException }

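Here, withTimeout interferes with retry in a strange way. It throws TimeoutCancellationException, which is a subclass of CancellationException, so this predicate will never retry a timed-out call.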
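The interference can be observed directly. The sketch below assumes a recent kotlinx.coroutines release, in which retry takes a Long retry count and a suspending predicate. The names timedOutFlow and countAttempts are hypothetical. A delay longer than the timeout stands in for somePotentiallyTimedOutSuspendingCall().

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Builds a flow whose body always times out; `onStart` records each collection attempt.
fun timedOutFlow(onStart: () -> Unit): Flow<Int> = flow {
    onStart()
    // Hypothetical stand-in for somePotentiallyTimedOutSuspendingCall(): takes longer than the timeout.
    val value = withTimeout(10L) {
        delay(1_000L)
        42
    }
    emit(value)
}

// Collects the flow with up to 3 retries and returns how many collection attempts were made.
fun countAttempts(predicate: suspend (Throwable) -> Boolean): Int = runBlocking {
    var attempts = 0
    runCatching { timedOutFlow { attempts++ }.retry(3, predicate).toList() }
    attempts
}

fun main() {
    // Always-true predicate: the timeout is treated as an upstream failure and retried.
    println(countAttempts { true })
    // TimeoutCancellationException is a CancellationException, so this predicate never retries it.
    println(countAttempts { it !is CancellationException })
}
```

With the always-true predicate, the timed-out call is collected once plus three retries. With the cancellation-excluding predicate, only the first attempt runs, and the TimeoutCancellationException reaches the collector.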

@adamp
Author

adamp commented Apr 27, 2019

Would checking the collecting job's isActive be appropriate?

@qwwdfsad
Collaborator

qwwdfsad commented May 14, 2019

No, because it is prone to check-and-act data races.
That said, I think there is a solution that wraps the emission in coroutineScope; I will try to prototype it.

qwwdfsad added two commits that referenced this issue May 21, 2019
@adamp
Author

adamp commented Jun 16, 2019

Are there plans to make the underpinnings of this check public API that could be used from operators defined outside of kotlinx.coroutines?

@elizarov
Contributor

Yes. We are thinking about how to generalize this solution.
