From 4f24a7a71f83564089c5871d48b94bc758d6c079 Mon Sep 17 00:00:00 2001 From: Nikita Koval Date: Tue, 12 Nov 2019 15:54:50 +0300 Subject: [PATCH] Update lincheck to 2.5.3 and re-write the corresponding tests --- build.gradle | 2 +- gradle.properties | 2 +- kotlinx-coroutines-core/build.gradle | 2 +- .../common/test/channels/TestChannelKind.kt | 55 ++--- .../jvm/test/LCStressOptionsDefault.kt | 20 ++ kotlinx-coroutines-core/jvm/test/TestBase.kt | 4 + .../test/internal/SegmentQueueLCStressTest.kt | 30 --- .../ChannelCloseLCStressTest.kt | 82 ------- .../ChannelIsClosedLCStressTest.kt | 54 ----- .../linearizability/ChannelLCStressTest.kt | 73 ------ .../linearizability/ChannelsLCStressTest.kt | 226 ++++++++++++++++++ .../jvm/test/linearizability/LinTesting.kt | 138 ----------- .../LockFreeListLCStressTest.kt | 40 +--- .../LockFreeTaskQueueLCStressTest.kt | 89 ++----- .../SegmentQueueLCStressTest.kt | 40 ++++ 15 files changed, 342 insertions(+), 515 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt delete mode 100644 kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLCStressTest.kt delete mode 100644 kotlinx-coroutines-core/jvm/test/linearizability/ChannelCloseLCStressTest.kt delete mode 100644 kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLCStressTest.kt delete mode 100644 kotlinx-coroutines-core/jvm/test/linearizability/ChannelLCStressTest.kt create mode 100644 kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt delete mode 100644 kotlinx-coroutines-core/jvm/test/linearizability/LinTesting.kt create mode 100644 kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt diff --git a/build.gradle b/build.gradle index c410ff73f7..01345b6f95 100644 --- a/build.gradle +++ b/build.gradle @@ -101,7 +101,7 @@ allprojects { kotlin_version = rootProject.properties['kotlin_snapshot_version'] } - if (build_snapshot_train || atomicfu_version.endsWith("-SNAPSHOT")) { + if (build_snapshot_train || atomicfu_version.endsWith("-SNAPSHOT") || lincheck_version.endsWith("-SNAPSHOT")) { repositories { mavenLocal() maven { url "https://oss.sonatype.org/content/repositories/snapshots" } diff --git a/gradle.properties b/gradle.properties index 1ac5febcff..6d8456a5a8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,7 +11,7 @@ kotlin_version=1.3.61 junit_version=4.12 atomicfu_version=0.14.1 html_version=0.6.8 -lincheck_version=2.0 +lincheck_version=2.5.3 dokka_version=0.9.16-rdev-2-mpp-hacks byte_buddy_version=1.9.3 reactor_vesion=3.2.5.RELEASE diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index 4d516962e9..6fedd7c016 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -48,7 +48,7 @@ configurations { kotlin.sourceSets { jvmTest.dependencies { - api "com.devexperts.lincheck:lincheck:$lincheck_version" + api "org.jetbrains.kotlinx:lincheck:$lincheck_version" api "com.esotericsoftware:kryo:4.0.0" implementation project (":android-unit-tests") } diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt index 27c58165c1..69d8fd03e3 100644 --- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt +++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt @@ -7,45 +7,26 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlinx.coroutines.selects.* -enum class TestChannelKind { - RENDEZVOUS { - override fun create(): Channel = Channel(Channel.RENDEZVOUS) - override fun toString(): String = "RendezvousChannel" - }, - ARRAY_1 { - override fun create(): Channel = Channel(1) - override fun toString(): String = "ArrayChannel(1)" - }, - ARRAY_10 { - override fun create(): Channel = Channel(10) - override fun toString(): String = "ArrayChannel(10)" - }, - LINKED_LIST { - override fun create(): Channel = Channel(Channel.UNLIMITED) - override fun toString(): String = "LinkedListChannel" - }, - CONFLATED { - override fun create(): Channel = Channel(Channel.CONFLATED) - override fun toString(): String = "ConflatedChannel" - override val isConflated: Boolean get() = true - }, - ARRAY_BROADCAST_1 { - override fun create(): Channel = ChannelViaBroadcast(BroadcastChannel(1)) - override fun toString(): String = "ArrayBroadcastChannel(1)" - }, - ARRAY_BROADCAST_10 { - override fun create(): Channel = ChannelViaBroadcast(BroadcastChannel(10)) - override fun toString(): String = "ArrayBroadcastChannel(10)" - }, - CONFLATED_BROADCAST { - override fun create(): Channel = ChannelViaBroadcast(ConflatedBroadcastChannel()) - override fun toString(): String = "ConflatedBroadcastChannel" - override val isConflated: Boolean get() = true - } +enum class TestChannelKind(val capacity: Int, + private val description: String, + private val viaBroadcast: Boolean = false +) { + RENDEZVOUS(0, "RendezvousChannel"), + ARRAY_1(1, "ArrayChannel(1)"), + ARRAY_2(2, "ArrayChannel(2)"), + ARRAY_10(10, "ArrayChannel(10)"), + LINKED_LIST(Channel.UNLIMITED, "LinkedListChannel"), + CONFLATED(Channel.CONFLATED, "ConflatedChannel"), + ARRAY_1_BROADCAST(1, "ArrayBroadcastChannel(1)", viaBroadcast = true), + ARRAY_10_BROADCAST(10, "ArrayBroadcastChannel(10)", viaBroadcast = true), + CONFLATED_BROADCAST(Channel.CONFLATED, "ConflatedBroadcastChannel", viaBroadcast = true) ; - abstract fun create(): Channel - open val isConflated: Boolean get() = false + fun create(): Channel = if (viaBroadcast) ChannelViaBroadcast(BroadcastChannel(capacity)) + else Channel(capacity) + + val isConflated get() = capacity == Channel.CONFLATED + override fun toString(): String = description } private class ChannelViaBroadcast( diff --git a/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt b/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt new file mode 100644 index 0000000000..62ded9f969 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt @@ -0,0 +1,20 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +package kotlinx.coroutines + +import org.jetbrains.kotlinx.lincheck.* +import org.jetbrains.kotlinx.lincheck.strategy.stress.* +import kotlin.reflect.* + +class LCStressOptionsDefault : StressOptions() { + init { + iterations(100 * stressTestMultiplierCbrt) + invocationsPerIteration(1000 * stressTestMultiplierCbrt) + actorsBefore(if (isStressTest) 3 else 0) + threads(3) + actorsPerThread(if (isStressTest) 3 else 2) + } +} + +fun Options<*,*>.check(testClass: KClass<*>) = LinChecker.check(testClass.java, this) \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt index 9c3efb4a39..bf462cc78f 100644 --- a/kotlinx-coroutines-core/jvm/test/TestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt @@ -7,9 +7,11 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.scheduling.* import org.junit.* +import java.lang.Math.* import java.util.* import java.util.concurrent.atomic.* import kotlin.coroutines.* +import kotlin.math.* import kotlin.test.* private val VERBOSE = systemProp("test.verbose", false) @@ -26,6 +28,8 @@ public val stressTestMultiplierSqrt = if (isStressTest) 5 else 1 */ public actual val stressTestMultiplier = stressTestMultiplierSqrt * stressTestMultiplierSqrt +public val stressTestMultiplierCbrt = cbrt(stressTestMultiplier.toDouble()).roundToInt() + /** * Base class for tests, so that tests for predictable scheduling of actions in multiple coroutines sharing a single * thread can be written. Use it like this: diff --git a/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLCStressTest.kt deleted file mode 100644 index c8493f6f30..0000000000 --- a/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLCStressTest.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -import com.devexperts.dxlab.lincheck.LinChecker -import com.devexperts.dxlab.lincheck.annotations.Operation -import com.devexperts.dxlab.lincheck.annotations.Param -import com.devexperts.dxlab.lincheck.paramgen.IntGen -import com.devexperts.dxlab.lincheck.strategy.stress.StressCTest -import org.junit.Test - -@StressCTest -class SegmentQueueLCStressTest { - private val q = SegmentBasedQueue() - - @Operation - fun add(@Param(gen = IntGen::class) x: Int) { - q.enqueue(x) - } - - @Operation - fun poll(): Int? = q.dequeue() - - @Test - fun test() { - LinChecker.check(SegmentQueueLCStressTest::class.java) - } -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelCloseLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelCloseLCStressTest.kt deleted file mode 100644 index 5bdc2841dc..0000000000 --- a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelCloseLCStressTest.kt +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -@file:Suppress("unused") - -package kotlinx.coroutines.linearizability - -import com.devexperts.dxlab.lincheck.* -import com.devexperts.dxlab.lincheck.annotations.* -import com.devexperts.dxlab.lincheck.paramgen.* -import com.devexperts.dxlab.lincheck.strategy.stress.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import org.junit.* -import java.io.* - -/** - * This is stress test that is fine-tuned to catch the problem - * [#1419](https://github.com/Kotlin/kotlinx.coroutines/issues/1419) - */ -@Param(name = "value", gen = IntGen::class, conf = "2:2") -@OpGroupConfig.OpGroupConfigs( - OpGroupConfig(name = "send", nonParallel = true), - OpGroupConfig(name = "receive", nonParallel = true), - OpGroupConfig(name = "close", nonParallel = true) -) -class ChannelCloseLCStressTest : TestBase() { - - private companion object { - // Emulating ctor argument for lincheck - var capacity = 0 - } - - private val lt = LinTesting() - private var channel: Channel = Channel(capacity) - - @Operation(runOnce = true, group = "send") - fun send1(@Param(name = "value") value: Int) = lt.run("send1") { channel.send(value) } - - @Operation(runOnce = true, group = "send") - fun send2(@Param(name = "value") value: Int) = lt.run("send2") { channel.send(value) } - - @Operation(runOnce = true, group = "receive") - fun receive1() = lt.run("receive1") { channel.receive() } - - @Operation(runOnce = true, group = "receive") - fun receive2() = lt.run("receive2") { channel.receive() } - - @Operation(runOnce = true, group = "close") - fun close1() = lt.run("close1") { channel.close(IOException("close1")) } - - @Operation(runOnce = true, group = "close") - fun close2() = lt.run("close2") { channel.close(IOException("close2")) } - - @Test - fun testRendezvousChannelLinearizability() { - runTest(0) - } - - @Test - fun testArrayChannelLinearizability() { - for (i in listOf(1, 2, 16)) { - runTest(i) - } - } - - @Test - fun testConflatedChannelLinearizability() = runTest(Channel.CONFLATED) - - @Test - fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED) - - private fun runTest(capacity: Int) { - ChannelCloseLCStressTest.capacity = capacity - val options = StressOptions() - .iterations(1) // only one iteration -- test scenario is fixed - .invocationsPerIteration(10_000 * stressTestMultiplierSqrt) - .threads(3) - .verifier(LinVerifier::class.java) - LinChecker.check(ChannelCloseLCStressTest::class.java, options) - } -} diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLCStressTest.kt deleted file mode 100644 index 44ba182dd3..0000000000 --- a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLCStressTest.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -@file:Suppress("unused") - -package kotlinx.coroutines.linearizability - -import com.devexperts.dxlab.lincheck.* -import com.devexperts.dxlab.lincheck.annotations.* -import com.devexperts.dxlab.lincheck.paramgen.* -import com.devexperts.dxlab.lincheck.strategy.stress.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import org.junit.* -import java.io.* - -@Param(name = "value", gen = IntGen::class, conf = "1:3") -class ChannelIsClosedLCStressTest : TestBase() { - - private val lt = LinTesting() - private val channel = Channel() - - @Operation(runOnce = true) - fun send1(@Param(name = "value") value: Int) = lt.run("send1") { channel.send(value) } - - @Operation(runOnce = true) - fun send2(@Param(name = "value") value: Int) = lt.run("send2") { channel.send(value) } - - @Operation(runOnce = true) - fun receive1() = lt.run("receive1") { channel.receive() } - - @Operation(runOnce = true) - fun receive2() = lt.run("receive2") { channel.receive() } - - @Operation(runOnce = true) - fun close1() = lt.run("close1") { channel.close(IOException("close1")) } - - @Operation(runOnce = true) - fun isClosedForReceive() = lt.run("isClosedForReceive") { channel.isClosedForReceive } - - @Operation(runOnce = true) - fun isClosedForSend() = lt.run("isClosedForSend") { channel.isClosedForSend } - - @Test - fun testLinearizability() { - val options = StressOptions() - .iterations(100 * stressTestMultiplierSqrt) - .invocationsPerIteration(1000 * stressTestMultiplierSqrt) - .threads(3) - .verifier(LinVerifier::class.java) - - LinChecker.check(ChannelIsClosedLCStressTest::class.java, options) - } -} diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelLCStressTest.kt deleted file mode 100644 index f4b775631f..0000000000 --- a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelLCStressTest.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -@file:Suppress("unused") - -package kotlinx.coroutines.linearizability - -import com.devexperts.dxlab.lincheck.* -import com.devexperts.dxlab.lincheck.annotations.* -import com.devexperts.dxlab.lincheck.paramgen.* -import com.devexperts.dxlab.lincheck.strategy.stress.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import org.junit.* -import java.io.* - -@Param(name = "value", gen = IntGen::class, conf = "1:3") -class ChannelLCStressTest : TestBase() { - - private companion object { - // Emulating ctor argument for lincheck - var capacity = 0 - } - - private val lt = LinTesting() - private var channel: Channel = Channel(capacity) - - @Operation(runOnce = true) - fun send1(@Param(name = "value") value: Int) = lt.run("send1") { channel.send(value) } - - @Operation(runOnce = true) - fun send2(@Param(name = "value") value: Int) = lt.run("send2") { channel.send(value) } - - @Operation(runOnce = true) - fun receive1() = lt.run("receive1") { channel.receive() } - - @Operation(runOnce = true) - fun receive2() = lt.run("receive2") { channel.receive() } - - @Operation(runOnce = true) - fun close1() = lt.run("close1") { channel.close(IOException("close1")) } - - @Operation(runOnce = true) - fun close2() = lt.run("close2") { channel.close(IOException("close2")) } - - @Test - fun testRendezvousChannelLinearizability() { - runTest(0) - } - - @Test - fun testArrayChannelLinearizability() { - for (i in listOf(1, 2, 16)) { - runTest(i) - } - } - - @Test - fun testConflatedChannelLinearizability() = runTest(Channel.CONFLATED) - - @Test - fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED) - - private fun runTest(capacity: Int) { - ChannelLCStressTest.capacity = capacity - val options = StressOptions() - .iterations(50 * stressTestMultiplierSqrt) - .invocationsPerIteration(500 * stressTestMultiplierSqrt) - .threads(3) - .verifier(LinVerifier::class.java) - LinChecker.check(ChannelLCStressTest::class.java, options) - } -} diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt new file mode 100644 index 0000000000..625c620497 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt @@ -0,0 +1,226 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:Suppress("unused") + +package kotlinx.coroutines.linearizability + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.channels.Channel.Factory.CONFLATED +import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS +import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED +import kotlinx.coroutines.selects.* +import org.jetbrains.kotlinx.lincheck.annotations.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.paramgen.* +import org.jetbrains.kotlinx.lincheck.verifier.* +import org.junit.* + + +class RendezvousChannelLCStressTest : ChannelLCStressTestBase( + c = Channel(RENDEZVOUS), + sequentialSpecification = SequentialRendezvousChannel::class.java +) +class SequentialRendezvousChannel : SequentialIntChannelBase(RENDEZVOUS) + +class Array1ChannelLCStressTest : ChannelLCStressTestBase( + c = Channel(1), + sequentialSpecification = SequentialArray1RendezvousChannel::class.java +) +class SequentialArray1RendezvousChannel : SequentialIntChannelBase(1) + +class Array2ChannelLCStressTest : ChannelLCStressTestBase( + c = Channel(2), + sequentialSpecification = SequentialArray2RendezvousChannel::class.java +) +class SequentialArray2RendezvousChannel : SequentialIntChannelBase(2) + +class UnlimitedChannelLCStressTest : ChannelLCStressTestBase( + c = Channel(UNLIMITED), + sequentialSpecification = SequentialUnlimitedChannel::class.java +) +class SequentialUnlimitedChannel : SequentialIntChannelBase(UNLIMITED) + +class ConflatedChannelLCStressTest : ChannelLCStressTestBase( + c = Channel(CONFLATED), + sequentialSpecification = SequentialConflatedChannel::class.java +) +class SequentialConflatedChannel : SequentialIntChannelBase(CONFLATED) + + +@Param.Params( + Param(name = "value", gen = IntGen::class, conf = "1:5"), + Param(name = "closeToken", gen = IntGen::class, conf = "1:3") +) +abstract class ChannelLCStressTestBase(private val c: Channel, private val sequentialSpecification: Class<*>) { + @Operation + suspend fun send(@Param(name = "value") value: Int): Any = try { + c.send(value) + } catch (e: NumberedCancellationException) { + e.testResult + } + + @Operation + fun offer(@Param(name = "value") value: Int): Any = try { + c.offer(value) + } catch (e: NumberedCancellationException) { + e.testResult + } + + // TODO: this operation should be (and can be!) linearizable, but is not + // @Operation + suspend fun sendViaSelect(@Param(name = "value") value: Int): Any = try { + select { c.onSend(value) {} } + } catch (e: NumberedCancellationException) { + e.testResult + } + + @Operation + suspend fun receive(): Any = try { + c.receive() + } catch (e: NumberedCancellationException) { + e.testResult + } + + @Operation + fun poll(): Any? = try { + c.poll() + } catch (e: NumberedCancellationException) { + e.testResult + } + + // TODO: this operation should be (and can be!) linearizable, but is not + // @Operation + suspend fun receiveViaSelect(): Any = try { + select { c.onReceive { it } } + } catch (e: NumberedCancellationException) { + e.testResult + } + + @Operation + fun close(@Param(name = "closeToken") token: Int): Boolean = c.close(NumberedCancellationException(token)) + + // TODO: this operation should be (and can be!) linearizable, but is not + // @Operation + fun cancel(@Param(name = "closeToken") token: Int) = c.cancel(NumberedCancellationException(token)) + +// @Operation + fun isClosedForReceive() = c.isClosedForReceive + +// @Operation + fun isClosedForSend() = c.isClosedForSend + + // TODO: this operation should be (and can be!) linearizable, but is not + // @Operation + fun isEmpty() = c.isEmpty + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .sequentialSpecification(sequentialSpecification) + .check(this::class) +} + +private class NumberedCancellationException(number: Int) : CancellationException() { + val testResult = "Closed($number)" +} + + +abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierState() { + private val senders = ArrayList, Int>>() + private val receivers = ArrayList>() + private val buffer = ArrayList() + private var closedMessage: String? = null + + suspend fun send(x: Int): Any = when (val offerRes = offer(x)) { + true -> Unit + false -> suspendCancellableCoroutine { cont -> + senders.add(cont to x) + } + else -> offerRes + } + + fun offer(element: Int): Any { + if (closedMessage !== null) return closedMessage!! + if (capacity == CONFLATED) { + if (resumeFirstReceiver(element)) return true + buffer.clear() + buffer.add(element) + return true + } + if (resumeFirstReceiver(element)) return true + if (buffer.size < capacity) { + buffer.add(element) + return true + } + return false + } + + private fun resumeFirstReceiver(element: Int): Boolean { + while (receivers.isNotEmpty()) { + val r = receivers.removeAt(0) + if (r.resume(element)) return true + } + return false + } + + suspend fun receive(): Any = poll() ?: suspendCancellableCoroutine { cont -> + receivers.add(cont) + } + + fun poll(): Any? { + if (buffer.isNotEmpty()) { + val el = buffer.removeAt(0) + resumeFirstSender().also { + if (it !== null) buffer.add(it) + } + return el + } + resumeFirstSender()?.also { return it } + if (closedMessage !== null) return closedMessage + return null + } + + private fun resumeFirstSender(): Int? { + while (senders.isNotEmpty()) { + val (s, el) = senders.removeAt(0) + if (s.resume(Unit)) return el + } + return null + } + + suspend fun sendViaSelect(element: Int) = send(element) + suspend fun receiveViaSelect() = receive() + + fun close(token: Int): Boolean { + if (closedMessage !== null) return false + closedMessage = "Closed($token)" + for (r in receivers) r.resume(closedMessage!!) + receivers.clear() + return true + } + + fun cancel(token: Int) { + if (!close(token)) return + for ((s, _) in senders) s.resume(closedMessage!!) + senders.clear() + buffer.clear() + } + + fun isClosedForSend(): Boolean = closedMessage !== null + fun isClosedForReceive(): Boolean = isClosedForSend() && buffer.isEmpty() && senders.isEmpty() + + fun isEmpty(): Boolean { + if (closedMessage !== null) return false + return buffer.isEmpty() && senders.isEmpty() + } + + override fun extractState() = buffer to closedMessage +} + +private fun CancellableContinuation.resume(res: T): Boolean { + val token = tryResume(res) ?: return false + completeResume(token) + return true +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LinTesting.kt b/kotlinx-coroutines-core/jvm/test/linearizability/LinTesting.kt deleted file mode 100644 index 14cf2a7039..0000000000 --- a/kotlinx-coroutines-core/jvm/test/linearizability/LinTesting.kt +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines - -import com.devexperts.dxlab.lincheck.Actor -import com.devexperts.dxlab.lincheck.Result -import com.devexperts.dxlab.lincheck.Utils.* -import com.devexperts.dxlab.lincheck.execution.* -import com.devexperts.dxlab.lincheck.verifier.Verifier -import java.lang.reflect.Method -import java.util.* -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED -import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn - -data class OpResult(val name: String, val value: Any?) { - override fun toString(): String = "$name=$value" -} - -private const val CS_STR = "COROUTINE_SUSPENDED" - -class LinTesting { - private val resumed = object : ThreadLocal>() { - override fun initialValue() = arrayListOf() - } - - private inline fun wrap(block: () -> Any?): Any? = - try { repr(block()) } - catch(e: Throwable) { repr(e) } - - private fun repr(e: Any?): Any? = - when { - e === COROUTINE_SUSPENDED -> CS_STR - e is Throwable -> e.toString() - else -> e - } - - fun run(name: String, block: suspend () -> T): List { - val list = resumed.get() - list.clear() - val result = arrayListOf(OpResult(name, wrap { - block.startCoroutineUninterceptedOrReturn(completion = object : Continuation { - override val context: CoroutineContext - get() = EmptyCoroutineContext - - override fun resumeWith(result: kotlin.Result) { - val value = if (result.isSuccess) result.getOrNull() else result.exceptionOrNull() - resumed.get() += OpResult(name, repr(value)) - } - } - ) - })) - result.addAll(list) - return result - } -} - -class LinVerifier(scenario: ExecutionScenario, - testClass: Class<*>) : Verifier { - private val possibleResultsSet: Set>> = - generateAllLinearizableExecutions(scenario.parallelExecution) - .asSequence() - .map { linEx: List -> - val res: List = executeActors(testClass.newInstance(), linEx) - val actorIds = linEx.asSequence().withIndex().associateBy({ it.value}, { it.index }) - scenario.parallelExecution.map { actors -> actors.map { actor -> res[actorIds[actor]!!] } } - }.toSet() - - override fun verifyResults(results: ExecutionResult): Boolean { - if (!valid(results.parallelResults)) { - println("\nNon-linearizable execution:") - printResults(results.parallelResults) - println("\nPossible linearizable executions:") - possibleResultsSet.forEach { possibleResults -> - printResults(possibleResults) - println() - } - throw AssertionError("Non-linearizable execution detected, see log for details") - } - - return true - } - - private fun printResults(results: List>) { - results.forEachIndexed { index, res -> - println("Thread $index: $res") - } - println("Op map: ${results.toOpMap()}") - } - - private fun valid(results: List>): Boolean = - (results in possibleResultsSet) || possibleResultsSet.any { matches(results, it) } - - private fun matches(results: List>, possible: List>): Boolean = - results.toOpMap() == possible.toOpMap() - - private fun List>.toOpMap(): Map> { - val filtered = flatMap { it }.flatMap { it.resultValue }.filter { it.value != CS_STR } - return filtered.groupBy({ it.name }, { it.value }) - } - - private fun generateAllLinearizableExecutions(actorsPerThread: List>): List> { - val executions = ArrayList>() - generateLinearizableExecutions0( - executions, actorsPerThread, ArrayList(), IntArray(actorsPerThread.size), - actorsPerThread.sumBy { it.size }) - return executions - } - - @Suppress("UNCHECKED_CAST") - private fun generateLinearizableExecutions0(executions: MutableList>, actorsPerThread: List>, - currentExecution: ArrayList, indexes: IntArray, length: Int) { - if (currentExecution.size == length) { - executions.add(currentExecution.clone() as List) - return - } - for (i in indexes.indices) { - val actors = actorsPerThread[i] - if (indexes[i] == actors.size) - continue - currentExecution.add(actors[indexes[i]]) - indexes[i]++ - generateLinearizableExecutions0(executions, actorsPerThread, currentExecution, indexes, length) - indexes[i]-- - currentExecution.removeAt(currentExecution.size - 1) - } - } -} - -private val VALUE = Result::class.java.getDeclaredField("value").apply { isAccessible = true } - -@Suppress("UNCHECKED_CAST") -private val Result.resultValue: List - get() = VALUE.get(this) as List diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt index 546615489b..5f91c640a6 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt @@ -5,16 +5,16 @@ package kotlinx.coroutines.linearizability -import com.devexperts.dxlab.lincheck.* -import com.devexperts.dxlab.lincheck.annotations.* -import com.devexperts.dxlab.lincheck.paramgen.* -import com.devexperts.dxlab.lincheck.strategy.stress.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* +import org.jetbrains.kotlinx.lincheck.annotations.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.paramgen.* +import org.jetbrains.kotlinx.lincheck.verifier.* import kotlin.test.* -@Param(name = "value", gen = IntGen::class, conf = "1:3") -class LockFreeListLCStressTest : TestBase() { +@Param(name = "value", gen = IntGen::class, conf = "1:5") +class LockFreeListLCStressTest : VerifierState() { class Node(val value: Int): LockFreeLinkedListNode() private val q: LockFreeLinkedListHead = LockFreeLinkedListHead() @@ -44,29 +44,11 @@ class LockFreeListLCStressTest : TestBase() { private fun Any.isSame(value: Int) = this is Node && this.value == value @Test - fun testAddRemoveLinearizability() { - val options = StressOptions() - .iterations(100 * stressTestMultiplierSqrt) - .invocationsPerIteration(1000 * stressTestMultiplierSqrt) - .threads(3) - LinChecker.check(LockFreeListLCStressTest::class.java, options) - } - - private var _curElements: ArrayList? = null - private val curElements: ArrayList get() { - if (_curElements == null) { - _curElements = ArrayList() - q.forEach { _curElements!!.add(it.value) } - } - return _curElements!! - } - - override fun equals(other: Any?): Boolean { - other as LockFreeListLCStressTest - return curElements == other.curElements - } + fun testAddRemoveLinearizability() = LCStressOptionsDefault().check(this::class) - override fun hashCode(): Int { - return curElements.hashCode() + override fun extractState(): Any { + val elements = ArrayList() + q.forEach { elements.add(it.value) } + return elements } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt index ea2afa1019..de494cc1e6 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt @@ -5,22 +5,19 @@ package kotlinx.coroutines.linearizability -import com.devexperts.dxlab.lincheck.* -import com.devexperts.dxlab.lincheck.annotations.* -import com.devexperts.dxlab.lincheck.paramgen.* -import com.devexperts.dxlab.lincheck.strategy.stress.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* +import org.jetbrains.kotlinx.lincheck.annotations.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.paramgen.* +import org.jetbrains.kotlinx.lincheck.verifier.* +import org.jetbrains.kotlinx.lincheck.verifier.quiescent.* import kotlin.test.* -internal data class Snapshot(val elements: List, val isClosed: Boolean) { - constructor(q: LockFreeTaskQueue) : this(q.map { it }, q.isClosed()) -} - -@OpGroupConfig.OpGroupConfigs(OpGroupConfig(name = "consumer", nonParallel = true)) @Param(name = "value", gen = IntGen::class, conf = "1:3") -class SCLockFreeTaskQueueLCStressTest : LockFreeTaskQueueLCTestBase() { - private val q: LockFreeTaskQueue = LockFreeTaskQueue(singleConsumer = true) +internal abstract class AbstractLockFreeTaskQueueWithoutRemoveLCStressTest protected constructor(singleConsumer: Boolean) : VerifierState() { + @JvmField + protected val q = LockFreeTaskQueue(singleConsumer = singleConsumer) @Operation fun close() = q.close() @@ -28,66 +25,20 @@ class SCLockFreeTaskQueueLCStressTest : LockFreeTaskQueueLCTestBase() { @Operation fun addLast(@Param(name = "value") value: Int) = q.addLast(value) - /** - * Note that removeFirstOrNull is not linearizable w.r.t. to addLast, so here - * we test only linearizability of close. - */ -// @Operation(group = "consumer") -// fun removeFirstOrNull() = q.removeFirstOrNull() - - @Test - fun testSC() = linTest() - - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as SCLockFreeTaskQueueLCStressTest - - return Snapshot(q) == Snapshot(other.q) - } - - override fun hashCode(): Int = Snapshot(q).hashCode() -} - -@Param(name = "value", gen = IntGen::class, conf = "1:3") -class MCLockFreeTaskQueueLCStressTest : LockFreeTaskQueueLCTestBase() { - private val q: LockFreeTaskQueue = LockFreeTaskQueue(singleConsumer = false) - - @Operation - fun close() = q.close() - - @Operation - fun addLast(@Param(name = "value") value: Int) = q.addLast(value) + @QuiescentConsistent + @Operation(group = "consumer") + fun removeFirstOrNull() = q.removeFirstOrNull() - /** - * Note that removeFirstOrNull is not linearizable w.r.t. to addLast, so here - * we test only linearizability of close. - */ -// @Operation(group = "consumer") -// fun removeFirstOrNull() = q.removeFirstOrNull() + override fun extractState() = q.map { it } to q.isClosed() @Test - fun testMC() = linTest() - - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as MCLockFreeTaskQueueLCStressTest - - return Snapshot(q) == Snapshot(other.q) - } - - override fun hashCode(): Int = Snapshot(q).hashCode() + fun testWithRemoveForQuiescentConsistency() = LCStressOptionsDefault() + .verifier(QuiescentConsistencyVerifier::class.java) + .check(this::class) } -open class LockFreeTaskQueueLCTestBase : TestBase() { - fun linTest() { - val options = StressOptions() - .iterations(100 * stressTestMultiplierSqrt) - .invocationsPerIteration(1000 * stressTestMultiplierSqrt) - .threads(2) - LinChecker.check(this::class.java, options) - } -} \ No newline at end of file +@OpGroupConfig(name = "consumer", nonParallel = false) +internal class MCLockFreeTaskQueueWithRemoveLCStressTest : AbstractLockFreeTaskQueueWithoutRemoveLCStressTest(singleConsumer = false) + +@OpGroupConfig(name = "consumer", nonParallel = true) +internal class SCLockFreeTaskQueueWithRemoveLCStressTest : AbstractLockFreeTaskQueueWithoutRemoveLCStressTest(singleConsumer = true) \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt new file mode 100644 index 0000000000..1bb51a568f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:Suppress("unused") + +package kotlinx.coroutines.linearizability + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.SegmentBasedQueue +import org.jetbrains.kotlinx.lincheck.annotations.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.paramgen.* +import org.jetbrains.kotlinx.lincheck.verifier.* +import org.junit.* + +@Param(name = "value", gen = IntGen::class, conf = "1:5") +class SegmentQueueLCStressTest : VerifierState() { + private val q = SegmentBasedQueue() + + @Operation + fun add(@Param(name = "value") value: Int) { + q.enqueue(value) + } + + @Operation + fun poll(): Int? = q.dequeue() + + override fun extractState(): Any { + val elements = ArrayList() + while (true) { + val x = q.dequeue() ?: break + elements.add(x) + } + + return elements + } + + @Test + fun test() = LCStressOptionsDefault().check(this::class) +} \ No newline at end of file