Skip to content

Commit

Permalink
Add ManualExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
NthPortal committed Feb 24, 2017
1 parent cc6cc4b commit 2007c27
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 0 deletions.
102 changes: 102 additions & 0 deletions src/main/scala/com/nthportal/testing/concurrent/ManualExecutor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.nthportal.testing.concurrent

import java.util.concurrent.{ConcurrentLinkedQueue, Executor}

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext

/**
* An [[Executor]] whose tasks can be executed manually in
* the current thread when desired, for fine-grained control
* over when tasks are executed.
*/
final class ManualExecutor extends Executor {
private val queue = new ConcurrentLinkedQueue[Runnable]()

override def execute(runnable: Runnable) = queue.add(runnable)

/**
* Returns the number of tasks waiting to be executed.
*
* @return the number of tasks waiting to be executed
*/
def waitingTasks: Int = queue.size()

/**
* Executes all tasks waiting to be executed.
*
* @return the number of tasks executed
*/
def executeAll(): Int = executeAllImpl(0)

@tailrec
private def executeAllImpl(executed: Int): Int = {
val r = queue.poll()
if (r == null) executed
else {
r.run()
executeAllImpl(executed + 1)
}
}

/**
* Executes `count` tasks waiting to be executed, or all
* tasks if there are fewer waiting.
*
* @param count the maximum number of tasks to execute
*/
def executeAtMost(count: Int): Int = executeAtMostImpl(count, 0)

@tailrec
private def executeAtMostImpl(count: Int, executed: Int): Int = {
if (count <= 0) executed
else {
val r = queue.poll()
if (r == null) executed
else {
r.run()
executeAtMostImpl(count - 1, executed + 1)
}
}
}

/**
* Executes the next waiting task, if one exists.
*
* @return `true` if the next tasks was executed; `false`
* if there are no tasks waiting
*/
def tryExecuteNext(): Boolean = {
val r = queue.poll()
if (r == null) false
else {
r.run()
true
}
}

/**
* Executes the next waiting task.
*
* @throws NoSuchElementException if there is no task waiting
* to be executed
*/
@throws[NoSuchElementException]
def executeNext(): Unit = if (!tryExecuteNext()) throw new NoSuchElementException("No tasks to execute")

/**
* Contains this [[Executor]] as an `implicit` [[ExecutionContext]].
*/
object Implicits {
implicit val asExecutionContext: ExecutionContext = ExecutionContext.fromExecutor(ManualExecutor.this)
}
}

object ManualExecutor {
/**
* Returns a new [[ManualExecutor]].
*
* @return a new ManualExecutor
*/
def apply(): ManualExecutor = new ManualExecutor
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.nthportal.testing.concurrent

import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

class ManualExecutorTest extends FlatSpec with Matchers {

import ManualExecutorTest._

behavior of "ManualExecutorTest"

it should "execute all waiting tasks" in {
val executor = ManualExecutor()
val executables = List.fill(3)(new Executable)

executables.foreach(executor.execute(_))
executor.waitingTasks should be (3)

executor.executeAll() should be (3)
executor.waitingTasks should be (0)
executables.forall(_.executed) should be (true)
}

it should "execute at most some number of waiting tasks" in {
val executor = ManualExecutor()
val executables = List.fill(3)(new Executable)

executables.foreach(executor.execute(_))
executor.waitingTasks should be (3)

executor.executeAtMost(2) should be (2)
executor.waitingTasks should be (1)
executables.head.executed should be (true)
executables(1).executed should be (true)

executor.executeAtMost(2) should be (1)
executor.waitingTasks should be (0)
executables.forall(_.executed) should be (true)

executor.executeAtMost(2) should be (0)
executor.waitingTasks should be (0)
}

it should "execute the next waiting task" in {
val executor = ManualExecutor()
val executables = List.fill(3)(new Executable)

executables.foreach(executor.execute(_))
executor.waitingTasks should be (3)

executor.executeNext()
executor.waitingTasks should be (2)
executables.head.executed should be (true)

executor.executeNext()
executor.waitingTasks should be (1)
executables(1).executed should be (true)

executor.executeNext()
executor.waitingTasks should be (0)
executables.forall(_.executed) should be (true)

a [NoSuchElementException] should be thrownBy {executor.executeNext()}
}

it should "behave properly as an `ExecutionContext`" in {
val executor = ManualExecutor()

import executor.Implicits._

val ex = new Executable
val f = Future {ex.run(); ex}

ex.executed should be (false)
f.isCompleted should be (false)

executor.executeAll()
ex.executed should be (true)
f.isCompleted should be (true)
Await.result(f, Duration.Zero) should be theSameInstanceAs ex
}

}

object ManualExecutorTest {
class Executable extends Runnable {
private var _executed: Boolean = false

override def run() = _executed = true

def executed: Boolean = _executed
}
}

0 comments on commit 2007c27

Please sign in to comment.