Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job.cancel does not dispose Rx Observable when using consumeEach #1008

Closed
mhernand40 opened this issue Feb 26, 2019 · 1 comment
Closed

Job.cancel does not dispose Rx Observable when using consumeEach #1008

mhernand40 opened this issue Feb 26, 2019 · 1 comment
Assignees
Labels

Comments

@mhernand40
Copy link

As I'm looking to convert my Singles, Maybes, and Completables into suspend functions, while still relying on RxJava's Observables, I was hoping that CoroutineScope could manage the subscription to my Observable streams by relying on the consumeEach extension function. However, it turns out that when the Job gets cancelled, although no more values will be "consumed" by the underlying Channel, the Observable itself will not get disposed. This could potentially leak the Observable.

Sample Repro:

@Test
fun repro() {
    val observable = Observable.interval(1L, TimeUnit.SECONDS)
        .doOnNext { println("onNext received $it") }
        .doOnDispose { println("Observable disposed!") }

    val job = GlobalScope.launch {
        observable.consumeEach{
            println("Value $it consumed")
        }
    }

    Thread.sleep(3000L)
    job.cancel()
    Thread.sleep(3000L)
}

When I run this test I get the following result:

onNext received 0
Value 0 consumed
onNext received 1
Value 1 consumed
onNext received 2
onNext received 3
onNext received 4
onNext received 5
@elizarov elizarov added the bug label Feb 26, 2019
@elizarov
Copy link
Contributor

elizarov commented Feb 26, 2019

That is a bug. That's for the report.

elizarov added a commit that referenced this issue Feb 26, 2019
Fixed bugs in MaybeSource/ObservableSource.consumeEach implementation
so that observable is disposed on cancellation.

Also optimized implementation of bridge function to avoid extra dispose
calls if possible (this is permissible by specification, though)

Fixes #1008
elizarov added a commit that referenced this issue Feb 26, 2019
Fixed bugs in MaybeSource/ObservableSource.consumeEach implementation
so that observable is disposed on cancellation.

Also optimized implementation of bridge function to avoid extra dispose
calls if possible (this is permissible by specification, though)

Fixes #1008
@elizarov elizarov self-assigned this Mar 1, 2019
qwwdfsad pushed a commit that referenced this issue Mar 6, 2019
Fixed bugs in MaybeSource/ObservableSource.consumeEach implementation
so that observable is disposed on cancellation.

Also optimized implementation of bridge function to avoid extra dispose
calls if possible (this is permissible by specification, though)

Fixes #1008
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants