From a25bf36f7c31e4c4bee45edf7b6ebfb9c7ad4003 Mon Sep 17 00:00:00 2001 From: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> Date: Fri, 13 Mar 2020 15:04:49 +0300 Subject: [PATCH] JDK9 Flow integration (#1783) JDK9 Flow integration is implemented as thin wrappers around the Reactive Streams integration. --- README.md | 5 +- reactive/kotlinx-coroutines-jdk9/README.md | 10 + .../api/kotlinx-coroutines-jdk9.api | 20 ++ reactive/kotlinx-coroutines-jdk9/build.gradle | 24 +++ reactive/kotlinx-coroutines-jdk9/package.list | 1 + reactive/kotlinx-coroutines-jdk9/src/Await.kt | 81 ++++++++ .../kotlinx-coroutines-jdk9/src/Publish.kt | 38 ++++ .../src/ReactiveFlow.kt | 39 ++++ .../test/FlowAsPublisherTest.kt | 79 ++++++++ .../test/IntegrationTest.kt | 132 +++++++++++++ .../test/PublishTest.kt | 185 ++++++++++++++++++ .../test/PublisherAsFlowTest.kt | 184 +++++++++++++++++ .../test/PublisherBackpressureTest.kt | 61 ++++++ .../test/PublisherCompletionStressTest.kt | 36 ++++ .../test/PublisherMultiTest.kt | 31 +++ .../test/IntegrationTest.kt | 25 +-- .../test/IntegrationTest.kt | 26 +-- settings.gradle | 1 + 18 files changed, 933 insertions(+), 45 deletions(-) create mode 100644 reactive/kotlinx-coroutines-jdk9/README.md create mode 100644 reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api create mode 100644 reactive/kotlinx-coroutines-jdk9/build.gradle create mode 100644 reactive/kotlinx-coroutines-jdk9/package.list create mode 100644 reactive/kotlinx-coroutines-jdk9/src/Await.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/src/Publish.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/test/PublisherBackpressureTest.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/test/PublisherCompletionStressTest.kt create mode 100644 reactive/kotlinx-coroutines-jdk9/test/PublisherMultiTest.kt diff --git a/README.md b/README.md index 7a827a2bc9..cef238aaad 100644 --- a/README.md +++ b/README.md @@ -45,8 +45,9 @@ suspend fun main() = coroutineScope { * [CoroutinesTimeout] test rule to automatically dump coroutines on test timeout. * [reactive](reactive/README.md) — modules that provide builders and iteration support for various reactive streams libraries: * Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc), - RxJava 2.x ([rxFlowable], [rxSingle], etc), and - Project Reactor ([flux], [mono], etc). + * Flow (JDK 9) (the same interface as for Reactive Streams), + * RxJava 2.x ([rxFlowable], [rxSingle], etc), and + * Project Reactor ([flux], [mono], etc). * [ui](ui/README.md) — modules that provide coroutine dispatchers for various single-threaded UI libraries: * Android, JavaFX, and Swing. * [integration](integration/README.md) — modules that provide integration with various asynchronous callback- and future-based libraries: diff --git a/reactive/kotlinx-coroutines-jdk9/README.md b/reactive/kotlinx-coroutines-jdk9/README.md new file mode 100644 index 0000000000..fcabd7da15 --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/README.md @@ -0,0 +1,10 @@ +# Module kotlinx-coroutines-jdk9 + +Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html). + +Implemented as a collection of thin wrappers over [kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive), +an equivalent package for the Reactive Streams. + +# Package kotlinx.coroutines.jdk9 + +Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html). diff --git a/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api b/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api new file mode 100644 index 0000000000..d4bc1698ef --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api @@ -0,0 +1,20 @@ +public final class kotlinx/coroutines/jdk9/AwaitKt { + public static final fun awaitFirst (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrDefault (Ljava/util/concurrent/Flow$Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrElse (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrNull (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitLast (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public final class kotlinx/coroutines/jdk9/PublishKt { + public static final fun flowPublish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher; + public static synthetic fun flowPublish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher; +} + +public final class kotlinx/coroutines/jdk9/ReactiveFlowKt { + public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow; + public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher; + public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + diff --git a/reactive/kotlinx-coroutines-jdk9/build.gradle b/reactive/kotlinx-coroutines-jdk9/build.gradle new file mode 100644 index 0000000000..8737e8ed6d --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/build.gradle @@ -0,0 +1,24 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +targetCompatibility = 9 + +dependencies { + compile project(":kotlinx-coroutines-reactive") + compile "org.reactivestreams:reactive-streams-flow-adapters:$reactive_streams_version" +} + +compileTestKotlin { + kotlinOptions.jvmTarget = "9" +} + +compileKotlin { + kotlinOptions.jvmTarget = "9" +} + +tasks.withType(dokka.getClass()) { + externalDocumentationLink { + url = new URL("https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html") + packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL() + } +} diff --git a/reactive/kotlinx-coroutines-jdk9/package.list b/reactive/kotlinx-coroutines-jdk9/package.list new file mode 100644 index 0000000000..43e8ff22c7 --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/package.list @@ -0,0 +1 @@ +java.util.concurrent.Flow diff --git a/reactive/kotlinx-coroutines-jdk9/src/Await.kt b/reactive/kotlinx-coroutines-jdk9/src/Await.kt new file mode 100644 index 0000000000..88268890e2 --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/src/Await.kt @@ -0,0 +1,81 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import java.util.concurrent.* +import org.reactivestreams.FlowAdapters +import kotlinx.coroutines.reactive.* + +/** + * Awaits for the first value from the given publisher without blocking a thread and + * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + * + * @throws NoSuchElementException if publisher does not emit any value + */ +public suspend fun Flow.Publisher.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst() + +/** + * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a + * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + */ +public suspend fun Flow.Publisher.awaitFirstOrDefault(default: T): T = + FlowAdapters.toPublisher(this).awaitFirstOrDefault(default) + +/** + * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a + * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + */ +public suspend fun Flow.Publisher.awaitFirstOrNull(): T? = + FlowAdapters.toPublisher(this).awaitFirstOrNull() + +/** + * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a + * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + */ +public suspend fun Flow.Publisher.awaitFirstOrElse(defaultValue: () -> T): T = + FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue) + +/** + * Awaits for the last value from the given publisher without blocking a thread and + * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + * + * @throws NoSuchElementException if publisher does not emit any value + */ +public suspend fun Flow.Publisher.awaitLast(): T = + FlowAdapters.toPublisher(this).awaitLast() + +/** + * Awaits for the single value from the given publisher without blocking a thread and + * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + * + * @throws NoSuchElementException if publisher does not emit any value + * @throws IllegalArgumentException if publisher emits more than one value + */ +public suspend fun Flow.Publisher.awaitSingle(): T = + FlowAdapters.toPublisher(this).awaitSingle() diff --git a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt new file mode 100644 index 0000000000..d274083668 --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import java.util.concurrent.* +import kotlin.coroutines.* +import org.reactivestreams.FlowAdapters + +/** + * Creates cold reactive [Flow.Publisher] that runs a given [block] in a coroutine. + * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. + * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) + * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) + * if coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels running coroutine. + * + * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that + * `onNext` is not invoked concurrently. + * + * Coroutine context can be specified with [context] argument. + * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * + * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect + * to cancellation and error handling may change in the future. + */ +@ExperimentalCoroutinesApi +public fun flowPublish( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope.() -> Unit +): Flow.Publisher { + val reactivePublisher : org.reactivestreams.Publisher = kotlinx.coroutines.reactive.publish(context, block) + return FlowAdapters.toFlowPublisher(reactivePublisher) +} diff --git a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt new file mode 100644 index 0000000000..6568c73a4a --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt @@ -0,0 +1,39 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.reactive.asPublisher +import kotlinx.coroutines.reactive.collect +import java.util.concurrent.Flow as JFlow +import org.reactivestreams.FlowAdapters + +/** + * Transforms the given reactive [Publisher] into [Flow]. + * Use [buffer] operator on the resulting flow to specify the size of the backpressure. + * More precisely, it specifies the value of the subscription's [request][Subscription.request]. + * [buffer] default capacity is used by default. + * + * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements + * are discarded. + */ +public fun JFlow.Publisher.asFlow(): Flow = + FlowAdapters.toPublisher(this).asFlow() + +/** + * Transforms the given flow to a reactive specification compliant [Publisher]. + */ +public fun Flow.asPublisher(): JFlow.Publisher { + val reactivePublisher : org.reactivestreams.Publisher = this.asPublisher() + return FlowAdapters.toFlowPublisher(reactivePublisher) +} + +/** + * Subscribes to this [Publisher] and performs the specified action for each received element. + * Cancels subscription if any exception happens during collect. + */ +public suspend inline fun JFlow.Publisher.collect(action: (T) -> Unit) = + FlowAdapters.toPublisher(this).collect(action) diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt new file mode 100644 index 0000000000..8017ee5b4f --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.Test +import java.util.concurrent.Flow as JFlow +import kotlin.test.* + +class FlowAsPublisherTest : TestBase() { + + @Test + fun testErrorOnCancellationIsReported() { + expect(1) + flow { + emit(2) + try { + hang { expect(3) } + } finally { + throw TestException() + } + }.asPublisher().subscribe(object : JFlow.Subscriber { + private lateinit var subscription: JFlow.Subscription + + override fun onComplete() { + expectUnreached() + } + + override fun onSubscribe(s: JFlow.Subscription?) { + subscription = s!! + subscription.request(2) + } + + override fun onNext(t: Int) { + expect(t) + subscription.cancel() + } + + override fun onError(t: Throwable?) { + assertTrue(t is TestException) + expect(4) + } + }) + finish(5) + } + + @Test + fun testCancellationIsNotReported() { + expect(1) + flow { + emit(2) + hang { expect(3) } + }.asPublisher().subscribe(object : JFlow.Subscriber { + private lateinit var subscription: JFlow.Subscription + + override fun onComplete() { + expect(4) + } + + override fun onSubscribe(s: JFlow.Subscription?) { + subscription = s!! + subscription.request(2) + } + + override fun onNext(t: Int) { + expect(t) + subscription.cancel() + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + finish(5) + } +} diff --git a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt new file mode 100644 index 0000000000..5bfddfee17 --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt @@ -0,0 +1,132 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import org.junit.Test +import kotlinx.coroutines.flow.flowOn +import org.junit.runner.* +import org.junit.runners.* +import java.util.concurrent.Flow as JFlow +import kotlin.coroutines.* +import kotlin.test.* + +@RunWith(Parameterized::class) +class IntegrationTest( + private val ctx: Ctx, + private val delay: Boolean +) : TestBase() { + + enum class Ctx { + MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) }, + DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default }, + UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined }; + + abstract operator fun invoke(context: CoroutineContext): CoroutineContext + } + + companion object { + @Parameterized.Parameters(name = "ctx={0}, delay={1}") + @JvmStatic + fun params(): Collection> = Ctx.values().flatMap { ctx -> + listOf(false, true).map { delay -> + arrayOf(ctx, delay) + } + } + } + + @Test + fun testEmpty(): Unit = runBlocking { + val pub = flowPublish(ctx(coroutineContext)) { + if (delay) delay(1) + // does not send anything + } + assertFailsWith { pub.awaitFirst() } + assertEquals("OK", pub.awaitFirstOrDefault("OK")) + assertNull(pub.awaitFirstOrNull()) + assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" }) + assertFailsWith { pub.awaitLast() } + assertFailsWith { pub.awaitSingle() } + var cnt = 0 + pub.collect { cnt++ } + assertEquals(0, cnt) + } + + @Test + fun testSingle() = runBlocking { + val pub = flowPublish(ctx(coroutineContext)) { + if (delay) delay(1) + send("OK") + } + assertEquals("OK", pub.awaitFirst()) + assertEquals("OK", pub.awaitFirstOrDefault("!")) + assertEquals("OK", pub.awaitFirstOrNull()) + assertEquals("OK", pub.awaitFirstOrElse { "ELSE" }) + assertEquals("OK", pub.awaitLast()) + assertEquals("OK", pub.awaitSingle()) + var cnt = 0 + pub.collect { + assertEquals("OK", it) + cnt++ + } + assertEquals(1, cnt) + } + + @Test + fun testNumbers() = runBlocking { + val n = 100 * stressTestMultiplier + val pub = flowPublish(ctx(coroutineContext)) { + for (i in 1..n) { + send(i) + if (delay) delay(1) + } + } + assertEquals(1, pub.awaitFirst()) + assertEquals(1, pub.awaitFirstOrDefault(0)) + assertEquals(n, pub.awaitLast()) + assertEquals(1, pub.awaitFirstOrNull()) + assertEquals(1, pub.awaitFirstOrElse { 0 }) + assertFailsWith { pub.awaitSingle() } + checkNumbers(n, pub) + val flow = pub.asFlow() + checkNumbers(n, flow.flowOn(ctx(coroutineContext)).asPublisher()) + } + + @Test + fun testCancelWithoutValue() = runTest { + val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { + flowPublish { + hang {} + }.awaitFirst() + } + + job.cancel() + job.join() + } + + @Test + fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException}) { + expect(1) + val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { + flowPublish { + yield() + expect(2) + // Nothing to emit + }.awaitFirst() + } + + job.join() + finish(3) + } + + private suspend fun checkNumbers(n: Int, pub: JFlow.Publisher) { + var last = 0 + pub.collect { + assertEquals(++last, it) + } + assertEquals(n, last) + } + +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt new file mode 100644 index 0000000000..1a36a389fa --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt @@ -0,0 +1,185 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import org.junit.Test +import java.util.concurrent.Flow as JFlow +import kotlin.test.* + +class PublishTest : TestBase() { + @Test + fun testBasicEmpty() = runTest { + expect(1) + val publisher = flowPublish(currentDispatcher()) { + expect(5) + } + expect(2) + publisher.subscribe(object : JFlow.Subscriber { + override fun onSubscribe(s: JFlow.Subscription?) { expect(3) } + override fun onNext(t: Int?) { expectUnreached() } + override fun onComplete() { expect(6) } + override fun onError(t: Throwable?) { expectUnreached() } + }) + expect(4) + yield() // to publish coroutine + finish(7) + } + + @Test + fun testBasicSingle() = runTest { + expect(1) + val publisher = flowPublish(currentDispatcher()) { + expect(5) + send(42) + expect(7) + } + expect(2) + publisher.subscribe(object : JFlow.Subscriber { + override fun onSubscribe(s: JFlow.Subscription) { + expect(3) + s.request(1) + } + override fun onNext(t: Int) { + expect(6) + assertEquals(42, t) + } + override fun onComplete() { expect(8) } + override fun onError(t: Throwable?) { expectUnreached() } + }) + expect(4) + yield() // to publish coroutine + finish(9) + } + + @Test + fun testBasicError() = runTest { + expect(1) + val publisher = flowPublish(currentDispatcher()) { + expect(5) + throw RuntimeException("OK") + } + expect(2) + publisher.subscribe(object : JFlow.Subscriber { + override fun onSubscribe(s: JFlow.Subscription) { + expect(3) + s.request(1) + } + override fun onNext(t: Int) { expectUnreached() } + override fun onComplete() { expectUnreached() } + override fun onError(t: Throwable) { + expect(6) + assertTrue(t is RuntimeException) + assertEquals("OK", t.message) + } + }) + expect(4) + yield() // to publish coroutine + finish(7) + } + + @Test + fun testHandleFailureAfterCancel() = runTest { + expect(1) + + val eh = CoroutineExceptionHandler { _, t -> + assertTrue(t is RuntimeException) + expect(6) + } + val publisher = flowPublish(Dispatchers.Unconfined + eh) { + try { + expect(3) + delay(10000) + } finally { + expect(5) + throw RuntimeException("FAILED") // crash after cancel + } + } + var sub: JFlow.Subscription? = null + publisher.subscribe(object : JFlow.Subscriber { + override fun onComplete() { + expectUnreached() + } + + override fun onSubscribe(s: JFlow.Subscription) { + expect(2) + sub = s + } + + override fun onNext(t: Unit?) { + expectUnreached() + } + + override fun onError(t: Throwable?) { + expectUnreached() + } + }) + expect(4) + sub!!.cancel() + finish(7) + } + + @Test + fun testOnNextError() = runTest { + expect(1) + val publisher = flowPublish(currentDispatcher()) { + expect(4) + try { + send("OK") + } catch(e: Throwable) { + expect(6) + assert(e is TestException) + } + } + expect(2) + val latch = CompletableDeferred() + publisher.subscribe(object : JFlow.Subscriber { + override fun onComplete() { + expectUnreached() + } + + override fun onSubscribe(s: JFlow.Subscription) { + expect(3) + s.request(1) + } + + override fun onNext(t: String) { + expect(5) + assertEquals("OK", t) + throw TestException() + } + + override fun onError(t: Throwable) { + expect(7) + assert(t is TestException) + latch.complete(Unit) + } + }) + latch.await() + finish(8) + } + + @Test + fun testFailingConsumer() = runTest { + val pub = flowPublish(currentDispatcher()) { + repeat(3) { + expect(it + 1) // expect(1), expect(2) *should* be invoked + send(it) + } + } + try { + pub.collect { + throw TestException() + } + } catch (e: TestException) { + finish(3) + } + } + + @Test + fun testIllegalArgumentException() { + assertFailsWith { flowPublish(Job()) { } } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt new file mode 100644 index 0000000000..97f106b3c5 --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherAsFlowTest.kt @@ -0,0 +1,184 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class PublisherAsFlowTest : TestBase() { + @Test + fun testCancellation() = runTest { + var onNext = 0 + var onCancelled = 0 + var onError = 0 + + val publisher = flowPublish(currentDispatcher()) { + coroutineContext[Job]?.invokeOnCompletion { + if (it is CancellationException) ++onCancelled + } + + repeat(100) { + send(it) + } + } + + publisher.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) { + onEach { + ++onNext + throw RuntimeException() + } + catch { + ++onError + } + }.join() + + + assertEquals(1, onNext) + assertEquals(1, onError) + assertEquals(1, onCancelled) + } + + @Test + fun testBufferSize1() = runTest { + val publisher = flowPublish(currentDispatcher()) { + expect(1) + send(3) + + expect(2) + send(5) + + expect(4) + send(7) + expect(6) + } + + publisher.asFlow().buffer(1).collect { + expect(it) + } + + finish(8) + } + + @Test + fun testBufferSizeDefault() = runTest { + val publisher = flowPublish(currentDispatcher()) { + repeat(64) { + send(it + 1) + expect(it + 1) + } + assertFalse { offer(-1) } + } + + publisher.asFlow().collect { + expect(64 + it) + } + + finish(129) + } + + @Test + fun testDefaultCapacityIsProperlyOverwritten() = runTest { + val publisher = flowPublish(currentDispatcher()) { + expect(1) + send(3) + expect(2) + send(5) + expect(4) + send(7) + expect(6) + } + + publisher.asFlow().flowOn(wrapperDispatcher()).buffer(1).collect { + expect(it) + } + + finish(8) + } + + @Test + fun testBufferSize10() = runTest { + val publisher = flowPublish(currentDispatcher()) { + expect(1) + send(5) + + expect(2) + send(6) + + expect(3) + send(7) + expect(4) + } + + publisher.asFlow().buffer(10).collect { + expect(it) + } + + finish(8) + } + + @Test + fun testConflated() = runTest { + val publisher = flowPublish(currentDispatcher()) { + for (i in 1..5) send(i) + } + val list = publisher.asFlow().conflate().toList() + assertEquals(listOf(1, 5), list) + } + + @Test + fun testProduce() = runTest { + val flow = flowPublish(currentDispatcher()) { repeat(10) { send(it) } }.asFlow() + check((0..9).toList(), flow.produceIn(this)) + check((0..9).toList(), flow.buffer(2).produceIn(this)) + check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) + check(listOf(0, 9), flow.conflate().produceIn(this)) + } + + private suspend fun check(expected: List, channel: ReceiveChannel) { + val result = ArrayList(10) + channel.consumeEach { result.add(it) } + assertEquals(expected, result) + } + + @Test + fun testProduceCancellation() = runTest { + expect(1) + // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled + val flow = flowPublish(currentDispatcher()) { + expect(3) + repeat(10) { value -> + when (value) { + in 0..6 -> send(value) + 7 -> try { + send(value) + } catch (e: CancellationException) { + expect(5) + throw e + } + else -> expectUnreached() + } + } + }.asFlow().buffer(1) + assertFailsWith { + coroutineScope { + expect(2) + val channel = flow.produceIn(this) + channel.consumeEach { value -> + when (value) { + in 0..4 -> {} + 5 -> { + expect(4) + throw TestException() + } + else -> expectUnreached() + } + } + } + } + finish(6) + } +} diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherBackpressureTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherBackpressureTest.kt new file mode 100644 index 0000000000..bc9d58ebcf --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherBackpressureTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import org.junit.* +import java.util.concurrent.Flow as JFlow + +class PublisherBackpressureTest : TestBase() { + @Test + fun testCancelWhileBPSuspended() = runBlocking { + expect(1) + val observable = flowPublish(currentDispatcher()) { + expect(5) + send("A") // will not suspend, because an item was requested + expect(7) + send("B") // second requested item + expect(9) + try { + send("C") // will suspend (no more requested) + } finally { + expect(12) + } + expectUnreached() + } + expect(2) + var sub: JFlow.Subscription? = null + observable.subscribe(object : JFlow.Subscriber { + override fun onSubscribe(s: JFlow.Subscription) { + sub = s + expect(3) + s.request(2) // request two items + } + + override fun onNext(t: String) { + when (t) { + "A" -> expect(6) + "B" -> expect(8) + else -> error("Should not happen") + } + } + + override fun onComplete() { + expectUnreached() + } + + override fun onError(e: Throwable) { + expectUnreached() + } + }) + expect(4) + yield() // yield to observable coroutine + expect(10) + sub!!.cancel() // now unsubscribe -- shall cancel coroutine (& do not signal) + expect(11) + yield() // shall perform finally in coroutine + finish(13) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherCompletionStressTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherCompletionStressTest.kt new file mode 100644 index 0000000000..8462df2c6f --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherCompletionStressTest.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import org.junit.* +import java.util.* +import kotlin.coroutines.* + +class PublisherCompletionStressTest : TestBase() { + private val N_REPEATS = 10_000 * stressTestMultiplier + + private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = flowPublish(context) { + for (x in start until start + count) send(x) + } + + @Test + fun testCompletion() { + val rnd = Random() + repeat(N_REPEATS) { + val count = rnd.nextInt(5) + runBlocking { + withTimeout(5000) { + var received = 0 + range(Dispatchers.Default, 1, count).collect { x -> + received++ + if (x != received) error("$x != $received") + } + if (received != count) error("$received != $count") + } + } + } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherMultiTest.kt new file mode 100644 index 0000000000..a44850b52d --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherMultiTest.kt @@ -0,0 +1,31 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import org.junit.Test +import kotlin.test.* + +class PublisherMultiTest : TestBase() { + @Test + fun testConcurrentStress() = runBlocking { + val n = 10_000 * stressTestMultiplier + val observable = flowPublish { + // concurrent emitters (many coroutines) + val jobs = List(n) { + // launch + launch { + send(it) + } + } + jobs.forEach { it.join() } + } + val resultSet = mutableSetOf() + observable.collect { + assertTrue(resultSet.add(it)) + } + assertEquals(n, resultSet.size) + } +} diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 3ec0b93481..6f7d98480b 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -42,12 +42,12 @@ class IntegrationTest( if (delay) delay(1) // does not send anything } - assertNSE { pub.awaitFirst() } + assertFailsWith { pub.awaitFirst() } assertEquals("OK", pub.awaitFirstOrDefault("OK")) assertNull(pub.awaitFirstOrNull()) assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" }) - assertNSE { pub.awaitLast() } - assertNSE { pub.awaitSingle() } + assertFailsWith { pub.awaitLast() } + assertFailsWith { pub.awaitSingle() } var cnt = 0 pub.collect { cnt++ } assertEquals(0, cnt) @@ -87,7 +87,7 @@ class IntegrationTest( assertEquals(n, pub.awaitLast()) assertEquals(1, pub.awaitFirstOrNull()) assertEquals(1, pub.awaitFirstOrElse { 0 }) - assertIAE { pub.awaitSingle() } + assertFailsWith { pub.awaitSingle() } checkNumbers(n, pub) val channel = pub.openSubscription() checkNumbers(n, channel.asPublisher(ctx(coroutineContext))) @@ -129,21 +129,4 @@ class IntegrationTest( assertEquals(n, last) } - private inline fun assertIAE(block: () -> Unit) { - try { - block() - expectUnreached() - } catch (e: Throwable) { - assertTrue(e is IllegalArgumentException) - } - } - - private inline fun assertNSE(block: () -> Unit) { - try { - block() - expectUnreached() - } catch (e: Throwable) { - assertTrue(e is NoSuchElementException) - } - } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt index 4faebbd251..22e0e72191 100644 --- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt @@ -42,12 +42,12 @@ class IntegrationTest( if (delay) delay(1) // does not send anything } - assertNSE { observable.awaitFirst() } + assertFailsWith { observable.awaitFirst() } assertEquals("OK", observable.awaitFirstOrDefault("OK")) assertNull(observable.awaitFirstOrNull()) assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" }) - assertNSE { observable.awaitLast() } - assertNSE { observable.awaitSingle() } + assertFailsWith { observable.awaitLast() } + assertFailsWith { observable.awaitSingle() } var cnt = 0 observable.collect { cnt++ @@ -89,7 +89,7 @@ class IntegrationTest( assertEquals(1, observable.awaitFirstOrNull()) assertEquals(1, observable.awaitFirstOrElse { 0 }) assertEquals(n, observable.awaitLast()) - assertIAE { observable.awaitSingle() } + assertFailsWith { observable.awaitSingle() } checkNumbers(n, observable) val channel = observable.openSubscription() checkNumbers(n, channel.asObservable(ctx(coroutineContext))) @@ -131,22 +131,4 @@ class IntegrationTest( assertEquals(n, last) } - - private inline fun assertIAE(block: () -> Unit) { - try { - block() - expectUnreached() - } catch (e: Throwable) { - assertTrue(e is IllegalArgumentException) - } - } - - private inline fun assertNSE(block: () -> Unit) { - try { - block() - expectUnreached() - } catch (e: Throwable) { - assertTrue(e is NoSuchElementException) - } - } } \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index c158926e43..64ae2ffad3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -32,6 +32,7 @@ module('integration/kotlinx-coroutines-play-services') module('reactive/kotlinx-coroutines-reactive') module('reactive/kotlinx-coroutines-reactor') +module('reactive/kotlinx-coroutines-jdk9') module('reactive/kotlinx-coroutines-rx2') module('ui/kotlinx-coroutines-android') module('ui/kotlinx-coroutines-android/android-unit-tests')