Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancellation in consumeEach should dispose Rx Observable #1012

Merged
merged 2 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions reactive/kotlinx-coroutines-reactive/src/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T
/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
*/
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
openSubscription().consumeEach(action)
}

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T>(
Expand All @@ -42,8 +41,7 @@ private class SubscriptionChannel<T>(
require(request >= 0) { "Invalid request size: $request" }
}

@Volatile
private var subscription: Subscription? = null
private val _subscription = atomic<Subscription?>(null)

// requested from subscription minus number of received minus number of enqueued receivers,
// can be negative if we have receivers, but no subscription yet
Expand All @@ -53,7 +51,7 @@ private class SubscriptionChannel<T>(
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
override fun onReceiveEnqueued() {
_requested.loop { wasRequested ->
val subscription = this.subscription
val subscription = _subscription.value
val needRequested = wasRequested - 1
if (subscription != null && needRequested < 0) { // need to request more from subscription
// try to fixup by making request
Expand All @@ -74,12 +72,12 @@ private class SubscriptionChannel<T>(

@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
subscription?.cancel()
_subscription.getAndSet(null)?.cancel() // cancel exactly once
}

// Subscriber overrides
override fun onSubscribe(s: Subscription) {
subscription = s
_subscription.value = s
while (true) { // lock-free loop on _requested
if (isClosedForSend) {
s.cancel()
Expand Down
18 changes: 0 additions & 18 deletions reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,6 @@ class IntegrationTest(
assertThat(cnt, IsEqual(1))
}

@Test
fun testFailingConsumer() = runTest {
val pub = publish {
repeat(3) {
expect(it + 1) // expect(1), expect(2) *should* be invoked
send(it)
}
}

try {
pub.consumeEach {
throw TestException()
}
} catch (e: TestException) {
finish(3)
}
}

@Test
fun testNumbers() = runBlocking<Unit> {
val n = 100 * stressTestMultiplier
Expand Down
17 changes: 17 additions & 0 deletions reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,21 @@ class PublishTest : TestBase() {
latch.await()
finish(8)
}

@Test
fun testFailingConsumer() = runTest {
val pub = publish {
repeat(3) {
expect(it + 1) // expect(1), expect(2) *should* be invoked
send(it)
}
}
try {
pub.consumeEach {
throw TestException()
}
} catch (e: TestException) {
finish(3)
}
}
}
58 changes: 58 additions & 0 deletions reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
import org.junit.Test
import kotlin.test.*

class FluxTest : TestBase() {
@Test
Expand Down Expand Up @@ -80,4 +83,59 @@ class FluxTest : TestBase() {
{ assert(it is RuntimeException) }
)
}

@Test
fun testNotifyOnceOnCancellation() = runTest {
expect(1)
val observable =
flux {
expect(5)
send("OK")
try {
delay(Long.MAX_VALUE)
} catch (e: CancellationException) {
expect(11)
}
}
.doOnNext {
expect(6)
assertEquals("OK", it)
}
.doOnCancel {
expect(10) // notified once!
}
expect(2)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(3)
observable.consumeEach {
expect(8)
assertEquals("OK", it)
}
}
expect(4)
yield() // to observable code
expect(7)
yield() // to consuming coroutines
expect(9)
job.cancel()
job.join()
finish(12)
}

@Test
fun testFailingConsumer() = runTest {
val pub = flux {
repeat(3) {
expect(it + 1) // expect(1), expect(2) *should* be invoked
send(it)
}
}
try {
pub.consumeEach {
throw TestException()
}
} catch (e: TestException) {
finish(3)
}
}
}
24 changes: 9 additions & 15 deletions reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.*
import kotlinx.coroutines.channels.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*

/**
Expand Down Expand Up @@ -43,36 +44,29 @@ public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
/**
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
*/
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) {
val channel = openSubscription()
for (x in channel) action(x)
channel.cancel()
}
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) =
openSubscription().consumeEach(action)

/**
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
*/
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) {
val channel = openSubscription()
for (x in channel) action(x)
channel.cancel()
}
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) =
openSubscription().consumeEach(action)

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
LinkedListChannel<T>(), Observer<T>, MaybeObserver<T>
{
@Volatile
var subscription: Disposable? = null
private val _subscription = atomic<Disposable?>(null)

@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
subscription?.dispose()
_subscription.getAndSet(null)?.dispose() // dispose exactly once
}

// Observer overrider
override fun onSubscribe(sub: Disposable) {
subscription = sub
_subscription.value = sub
}

override fun onSuccess(t: T) {
Expand Down
57 changes: 57 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
import org.junit.Test
import kotlin.test.*

class FlowableTest : TestBase() {
@Test
Expand Down Expand Up @@ -81,4 +83,59 @@ class FlowableTest : TestBase() {
{ assert(it is RuntimeException) }
)
}

@Test
fun testNotifyOnceOnCancellation() = runTest {
expect(1)
val observable =
rxFlowable {
expect(5)
send("OK")
try {
delay(Long.MAX_VALUE)
} catch (e: CancellationException) {
expect(11)
}
}
.doOnNext {
expect(6)
assertEquals("OK", it)
}
.doOnCancel {
expect(10) // notified once!
}
expect(2)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(3)
observable.consumeEach{
expect(8)
assertEquals("OK", it)
}
}
expect(4)
yield() // to observable code
expect(7)
yield() // to consuming coroutines
expect(9)
job.cancel()
job.join()
finish(12)
}

@Test
fun testFailingConsumer() = runTest {
val pub = rxFlowable {
repeat(3) {
expect(it + 1) // expect(1), expect(2) *should* be invoked
send(it)
}
}
try {
pub.consumeEach {
throw TestException()
}
} catch (e: TestException) {
finish(3)
}
}
}
27 changes: 27 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException

class MaybeTest : TestBase() {
@Before
Expand Down Expand Up @@ -211,4 +212,30 @@ class MaybeTest : TestBase() {
{ assert(it is RuntimeException) }
)
}

@Test
fun testCancelledConsumer() = runTest {
expect(1)
val maybe = rxMaybe<Int> {
expect(4)
try {
delay(Long.MAX_VALUE)
} catch (e: CancellationException) {
expect(6)
}
42
}
expect(2)
val timeout = withTimeoutOrNull(100) {
expect(3)
maybe.consumeEach {
expectUnreached()
}
expectUnreached()
}
assertNull(timeout)
expect(5)
yield() // must cancel code inside maybe!!!
finish(7)
}
}
Loading