Flow collect does not honor cancellation #1177

Closed
PaulWoitaschek opened this issue May 6, 2019 · 4 comments

@PaulWoitaschek
Contributor

I would expect the following code not to throw an exception, because the whole point of cancelling the job is that no further values are emitted.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow

@FlowPreview
fun main() {
  val flow = flow {
    while (true) {
      emit(Unit)
    }
  }

  val job = GlobalScope.launch {
    flow.collect {
      if (!isActive) {
        throw IllegalStateException("DOH")
      }
    }
  }

  runBlocking {
    job.cancelAndJoin()
  }
}

What helps here is to call ensureActive() as the first statement in the collect block.
Is this a bug or is it necessary to always call ensureActive on each collect?

For my code it is crucial that it does not execute the body of collect after I cancelled the job.

@qwwdfsad
Collaborator

qwwdfsad commented May 6, 2019

It is not a bug, but intended behavior.

Why do you expect this snippet not to throw?
KDoc to isActive states:

Returns true when this job is active – it was already started and has not completed nor was cancelled yet.

In your snippet, one thread is running collect inside the launch block while another thread concurrently cancels it. From that moment, isActive returns false (and isCancelled returns true), and a cancellation exception will be thrown at the very next suspension point (because cancellation is cooperative).
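To make the cooperative part concrete, here is a minimal sketch that stays on a single thread so the ordering is deterministic. The function name `observeCancellation` and the recorded strings are made up for illustration, and the coroutine cancels itself only to keep the example reproducible; in the snippet above the cancel comes from another thread.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records what a coroutine observes around its own cancellation.
fun observeCancellation(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            events += "active before cancel: $isActive"
            cancel() // CoroutineScope.cancel(): flips the job state, does not throw here
            events += "active after cancel: $isActive"
            yield() // first suspension point after cancel: throws CancellationException
            events += "unreachable"
        } catch (e: CancellationException) {
            events += "cancelled at suspension point"
            throw e // always rethrow cancellation
        }
    }
    job.join()
    events
}
```

Calling `observeCancellation()` should show that the code between `cancel()` and `yield()` still runs and sees `isActive == false`; the exception only surfaces at `yield()`.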

What helps here is to call ensureActive() as the first statement in the collect block.

It doesn't really help either, because it is a "check and act" race:

  1. Assume the first thread does ensureActive and it completes successfully
  2. Right after that another thread invokes cancel
  3. The first thread checks isActive expecting it to return true (because you called ensureActive on the previous line of code), but it returns false.

The main source of confusion here comes from the fact that cancellation is concurrent and most of the familiar check-and-act patterns do not work.

Additionally, ensureActive on every emit is a performance tradeoff that not everyone may be ready to pay (especially when it does not work 100% of the time).
Could you please elaborate on this in terms of the application-specific problem you are solving? It will help us get a better understanding of Flow usages, and maybe we will find an elegant solution for it.

@PaulWoitaschek
Contributor Author

PaulWoitaschek commented May 6, 2019

Yes right, my example was apparently wrong. I tried to boil down my actual problem to a minimal self contained example.

My actual use case is pretty simple:
I call launch on a coroutine context that combines a job with the main dispatcher, and the coroutine then touches Android views. Later, I cancel that job on the main thread from a lifecycle callback.

Let me try to boil it down once again. I use a newSingleThreadContext to simulate a main thread. I launch a new coroutine on that thread which consumes the flow. Then I call launch with the same dispatcher to cancel the job. So there is no parallelism here: I cancel on the main thread and consume on the main thread. And yet the collect block still runs after the job has been cancelled.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow

@FlowPreview
fun main() {
  val flow = flow {
    withContext(Dispatchers.IO) {

    }
    emit(Unit)
  }

  val mainThreadContext = newSingleThreadContext("main-thread")
  runBlocking {
    repeat(100) {
      val job = launch(mainThreadContext) {
        flow.collect {
          if (!isActive) {
            throw IllegalStateException("DOH")
          }
        }
      }
      launch(mainThreadContext) {
        job.cancel()
      }
    }
  }
}

@zach-klippenstein
Contributor

ensureActive on every emit is a performance tradeoff that not everyone may be ready to pay

It's a race if different threads are involved, as @qwwdfsad pointed out. However it's common in Android apps to consume a stream from the main thread, and also cancel it from the main thread (there's concurrency, but no parallelism). Coming from RxJava, this Flow behavior is surprising because when everything is on a single thread like this, a subscriber is guaranteed to stop getting emissions as soon as it is unsubscribed. In this case, a single ensureActive() call at the top of the collect block would be sufficient, even though it wouldn't work reliably if the coroutine were to be cancelled from a different thread. The same caveat applies to RxJava however (disposing a subscription from a background thread while onNext is running on the main thread won't affect the current emission).
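As a rough sketch of that single-threaded case (not the reproducer above): everything runs on the runBlocking thread, and the collector cancels its own coroutine when it sees the value 3, standing in for a lifecycle callback on the same thread. The function name `collectUntilCancelled` is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects on a single thread and stops handling values once cancelled.
fun collectUntilCancelled(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        (1..5).asFlow().collect { value ->
            ensureActive() // throws CancellationException if the job is already cancelled
            seen += value
            if (value == 3) cancel() // cancellation requested on the collecting thread
        }
    }
    job.join()
    seen
}
```

With the check at the top of the block, values 4 and 5 are never handled, so the function should return `[1, 2, 3]`. This only holds because cancel and collect share one thread; with a cancel from another thread the check-and-act race described earlier still applies.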

@qwwdfsad
Collaborator

qwwdfsad commented May 8, 2019

@PaulWoitaschek thanks for the reproducer!

this Flow behavior is surprising because when everything is on a single thread like this, a subscriber is guaranteed to stop getting emissions as soon as it is unsubscribed.

I agree with that. When there is no parallelism and one has a (deterministic) sequential order of events, it is unexpected to observe a cancelled coroutine resumed normally. Most of the time cancellation works as expected in sequential scenarios, withContext is (hopefully) the only primitive that exposes such behaviour by default.
I am investigating the possible changes to fix the provided example without changing it.

For those interested in why this happened, some implementation details: withContext does not check cancellation on exit because it is resumed in atomic mode by default.
IIRC this was done in #410, when we decided what to do with withContext(start = ...). withContext(default) can be expressed via withContext(atomic), but not vice versa.

@qwwdfsad qwwdfsad added design and removed question labels May 8, 2019
qwwdfsad added a commit that referenced this issue May 14, 2019
…able).

    * Reasoning about cancellation is simplified in sequential scenarios, if 'cancel' was invoked before withContext return it will throw an exception, thus "isActive == false" cannot be observed in sequential scenarios after cancellation
    * withContext now complies with its own documentation

Fixes #1177
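A small sketch of the behaviour this commit message describes, assuming a kotlinx.coroutines version that includes the fix. The function name `withContextAfterCancel` and the outcome strings are hypothetical; the outer coroutine is cancelled from inside the withContext block so that the cancel is guaranteed to happen before withContext returns.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: reports how withContext returns when the caller
// was cancelled while the block was still running.
fun withContextAfterCancel(): String = runBlocking {
    var outcome = "not run"
    val job = launch {
        try {
            withContext(Dispatchers.Default) {
                // Cancel the outer coroutine before withContext returns.
                this@launch.cancel()
            }
            // Only reachable if withContext resumed a cancelled coroutine normally.
            outcome = "returned normally, isActive=$isActive"
        } catch (e: CancellationException) {
            outcome = "withContext threw CancellationException"
            throw e
        }
    }
    job.join()
    outcome
}
```

On a version with the fix, the call should report that withContext threw, so code after withContext never observes `isActive == false`.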
qwwdfsad added a commit that referenced this issue Jun 2, 2019
  * It allows applying the same reasoning about cancellation as in dispatched coroutines
  * Makes cancellation modes consistent

Related to #1177
@qwwdfsad qwwdfsad self-assigned this Jun 3, 2019