From 81e829b0751a26ec0db936b5ace3a76734454fb1 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 30 Jan 2020 18:33:19 +0300 Subject: [PATCH] Implement ObservableValue.asFlow() --- .../kotlinx-coroutines-javafx.txt | 4 + .../src/JavaFxConvert.kt | 43 ++++++++ ...{JavaFxTest.kt => JavaFxDispatcherTest.kt} | 2 +- .../test/JavaFxObservableAsFlowTest.kt | 99 +++++++++++++++++++ 4 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt rename ui/kotlinx-coroutines-javafx/test/{JavaFxTest.kt => JavaFxDispatcherTest.kt} (97%) create mode 100644 ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt index 24c5b70b9d..620e904612 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt @@ -1,3 +1,7 @@ +public final class kotlinx/coroutines/javafx/JavaFxConvertKt { + public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow; +} + public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt new file mode 100644 index 0000000000..1474d3f608 --- /dev/null +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt @@ -0,0 +1,43 @@ +package kotlinx.coroutines.javafx + +import javafx.beans.value.ChangeListener +import javafx.beans.value.ObservableValue +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.conflate + +/** + * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces + * its values as they change. + * + * The resulting flow is conflated, meaning that if several values arrive in a quick succession, only + * the last one will be produced. + * + * It produces at least one value. + * + * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue] + * supports lazy evaluation, eager computation will be enforced while the flow is being collected. + */ +@ExperimentalCoroutinesApi +fun ObservableValue.asFlow(): Flow = callbackFlow { + val listener = ChangeListener { observable, oldValue, newValue -> + try { + offer(newValue) + } catch (e: CancellationException) { + // In case the event fires after the channel is closed + } + } + withContext(Dispatchers.JavaFx) { + addListener(listener) + send(value) + } + awaitClose { + runBlocking { + withContext(Dispatchers.JavaFx) { + removeListener(listener) + } + } + } +}.conflate() \ No newline at end of file diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt similarity index 97% rename from ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt rename to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt index e6a1ddb414..724be6d77b 100644 --- a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt +++ b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt @@ -8,7 +8,7 @@ import javafx.application.* import kotlinx.coroutines.* import org.junit.* -class JavaFxTest : TestBase() { +class JavaFxDispatcherTest : TestBase() { @Before fun setup() { ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher") diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt new file mode 100644 index 0000000000..258480dec7 --- /dev/null +++ b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt @@ -0,0 +1,99 @@ +package kotlinx.coroutines.javafx + +import javafx.beans.property.SimpleIntegerProperty +import kotlinx.coroutines.TestBase +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.Before +import org.junit.Test +import kotlin.test.assertTrue + + +class JavaFxObservableAsFlowTest : TestBase() { + + @Before + fun setup() { + ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher") + } + + @Test + fun testFlowOrder() = runTest { + if (!initPlatform()) { + println("Skipping JavaFxTest in headless environment") + return@runTest // ignore test in headless environments + } + + val integerProperty = SimpleIntegerProperty(0) + val n = 10000 * stressTestMultiplier + val flow = integerProperty.asFlow().takeWhile { j -> j != n } + newSingleThreadContext("setter").use { pool -> + launch(pool) { + for (i in 1..n) { + launch(Dispatchers.JavaFx) { + integerProperty.set(i) + } + } + } + var i = -1 + flow.collect { j -> + // elements are neither repeated nor shuffled + assertTrue(i < (j as Int)) + i = j + } + // at least one element is present + assertTrue(i != -1) + } + } + + @Test + fun testConflation() = runTest { + if (!initPlatform()) { + println("Skipping JavaFxTest in headless environment") + return@runTest // ignore test in headless environments + } + + val END_MARKER = -1 + val integerProperty = SimpleIntegerProperty(0) + val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER } + launch(start = CoroutineStart.UNDISPATCHED) { + withContext(Dispatchers.JavaFx) { + integerProperty.set(1) + } + withContext(Dispatchers.JavaFx) { + integerProperty.set(-2) // should be skipped + integerProperty.set(2) + } + withContext(Dispatchers.JavaFx) { + integerProperty.set(END_MARKER) + } + } + + flow.collect { i -> + assertTrue(i == 1 || i == 2) + } + } + + @Test + fun cancellationRaceStressTest() = runTest { + if (!initPlatform()) { + println("Skipping JavaFxTest in headless environment") + return@runTest // ignore test in headless environments + } + + val integerProperty = SimpleIntegerProperty(0) + val flow = integerProperty.asFlow() + var i = 1 + val n = 1000 * stressTestMultiplier + newSingleThreadContext("collector").use { pool -> + repeat (n) { + launch(pool) { + flow.first() + } + withContext(Dispatchers.JavaFx) { + integerProperty.set(i) + } + i += 1 + } + } + } +}