Skip to content

Commit

Permalink
Fix linearizability in AbstractChannel#sendSuspend, add infrastructur…
Browse files Browse the repository at this point in the history
…e to inject custom execution into linearizability checker
  • Loading branch information
qwwdfsad authored and elizarov committed Jul 25, 2018
1 parent df80002 commit 7bd983f
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
return@sc
}
is Closed<*> -> {
helpClose(enqueueResult)
cont.resumeWithException(enqueueResult.sendException)
return@sc
}
Expand All @@ -205,6 +206,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
}
offerResult === OFFER_FAILED -> continue@loop
offerResult is Closed<*> -> {
helpClose(offerResult)
cont.resumeWithException(offerResult.sendException)
return@sc
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ class ChannelIsClosedLinearizabilityTest : TestBase() {
.addThread(1, 3)
.addThread(1, 3)
.verifier(LinVerifier::class.java)
.injectExecution { actors, methods ->
actors[0].add(actorMethod(methods, "receive1"))
actors[0].add(actorMethod(methods, "receive2"))
actors[0].add(actorMethod(methods, "close1"))

actors[1].add(actorMethod(methods, "send2"))
actors[1].add(actorMethod(methods, "send1"))

actors[2].add(actorMethod(methods, "isClosedForSend"))
}

LinChecker.check(ChannelIsClosedLinearizabilityTest::class.java, options)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental.channels

import com.devexperts.dxlab.lincheck.*
import com.devexperts.dxlab.lincheck.execution.*
import java.util.*

typealias ExecutionBuilder = (List<MutableList<Actor>>, List<ActorGenerator>) -> Unit

/**
* Example of usage:
*
* ```
* StressOptions().injectExecution { actors, methods ->
* actors[0].add(actorMethod(methods, "receive1"))
* actors[0].add(actorMethod(methods, "receive2"))
*
* actors[1].add(actorMethod(methods, "send2"))
* actors[1].add(actorMethod(methods, "send1"))
* }
*
* ```
*
* Will produce
* ```
* Actors per thread:
* [receive1(), receive2()]
* [send2(), send1()]
* ```
* at the first iteration.
*
* DSL will be improved when this method will be used frequently
*/
fun Options<*, *>.injectExecution(behaviourBuilder: ExecutionBuilder): Options<*, *> {
injectedBehaviour.add(behaviourBuilder)
executionGenerator(FixedBehaviourInjectorExecutionGenerator::class.java)
return this
}

fun actorMethod(generators: List<ActorGenerator>, name: String): Actor =
generators.find { it.generate().method.name.contains(name) }?.generate() ?: error("Actor method $name is not found in ${generators.map { it.generate().method.name }}")

private val injectedBehaviour: Queue<ExecutionBuilder> = ArrayDeque<ExecutionBuilder>()

class FixedBehaviourInjectorExecutionGenerator(testConfiguration: CTestConfiguration, testStructure: CTestStructure)
: ExecutionGenerator(testConfiguration, testStructure) {

private val randomGenerator = RandomExecutionGenerator(testConfiguration, testStructure)

override fun nextExecution(): List<List<Actor>> {
val injector = injectedBehaviour.poll()
if (injector != null) {
val parallelGroup = ArrayList(testStructure.actorGenerators)
val actorsPerThread = ArrayList<MutableList<Actor>>()
for (i in testConfiguration.threadConfigurations.indices) {
actorsPerThread.add(ArrayList())
}

injector.invoke(actorsPerThread, parallelGroup)
return actorsPerThread
}

return randomGenerator.nextExecution()
}
}

// Ad-hoc fixed execution injection for lin-checker
class FixedBehaviourExecutionGenerator(testConfiguration: CTestConfiguration, testStructure: CTestStructure)
: ExecutionGenerator(testConfiguration, testStructure) {

override fun nextExecution(): List<List<Actor>> {
val parallelGroup = ArrayList(testStructure.actorGenerators)
val actorsPerThread = ArrayList<MutableList<Actor>>()
for (i in testConfiguration.threadConfigurations.indices) {
actorsPerThread.add(ArrayList())
}


actorsPerThread[0].add(actorMethod(parallelGroup, "receive1"))
actorsPerThread[0].add(actorMethod(parallelGroup, "receive2"))
actorsPerThread[0].add(actorMethod(parallelGroup, "close1"))

actorsPerThread[1].add(actorMethod(parallelGroup, "send2"))
actorsPerThread[1].add(actorMethod(parallelGroup, "send1"))

return actorsPerThread
}
}

0 comments on commit 7bd983f

Please sign in to comment.