Skip to content

Commit

Permalink
Make Mono.awaitSingleOrNull wait for onComplete() (#3489)
Browse files Browse the repository at this point in the history
Fixes #3487
  • Loading branch information
dkhalanskyjb committed Oct 19, 2022
1 parent f3527c9 commit 92a2495
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
10 changes: 6 additions & 4 deletions reactive/kotlinx-coroutines-reactor/src/Mono.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,22 @@ public fun <T> mono(
*/
public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
private var seenValue = false
private var value: T? = null

override fun onSubscribe(s: Subscription) {
cont.invokeOnCancellation { s.cancel() }
s.request(Long.MAX_VALUE)
}

override fun onComplete() {
if (!seenValue) cont.resume(null)
cont.resume(value)
value = null
}

override fun onNext(t: T) {
seenValue = true
cont.resume(t)
// We don't return the value immediately because the process that emitted it may not be finished yet.
// Resuming now could lead to race conditions between emitter and the awaiting code.
value = t
}

override fun onError(error: Throwable) { cont.resumeWithException(error) }
Expand Down
54 changes: 54 additions & 0 deletions reactive/kotlinx-coroutines-reactor/test/MonoAwaitStressTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import org.junit.Test
import org.reactivestreams.*
import reactor.core.*
import reactor.core.publisher.*
import kotlin.concurrent.*
import kotlin.test.*

class MonoAwaitStressTest: TestBase() {
private val N_REPEATS = 10_000 * stressTestMultiplier

private var completed: Boolean = false

private var thread: Thread? = null

/**
* Tests that [Mono.awaitSingleOrNull] does await [CoreSubscriber.onComplete] and does not return
* the value as soon as it has it.
*/
@Test
fun testAwaitingRacingWithCompletion() = runTest {
val mono = object: Mono<Int>() {
override fun subscribe(s: CoreSubscriber<in Int>) {
s.onSubscribe(object : Subscription {
override fun request(n: Long) {
thread = thread {
s.onNext(1)
Thread.yield()
completed = true
s.onComplete()
}
}

override fun cancel() {
}
})
}
}
repeat(N_REPEATS) {
thread = null
completed = false
val value = mono.awaitSingleOrNull()
assertTrue(completed, "iteration $it")
assertEquals(1, value)
thread!!.join()
}
}
}

0 comments on commit 92a2495

Please sign in to comment.