s_mach.concurrent: Futures utility library


Include in SBT

  1. Add to build.sbt

    libraryDependencies += "net.s_mach" %% "concurrent" % "2.0.0"
    Note: s_mach.concurrent is cross-compiled for Scala 2.11/JDK6 and 2.12/JDK8

Why do I need this?

You love Scala Futures and functional concurrent coding! But you…

  • Wish there were methods for controlling how multiple Futures execute (such as retrying failures, throttling or controlling parallelism)

  • Hate all the boilerplate needed to create multiple futures that run in parallel

  • Didn’t realize Future.sequence only returns the first exception (and throws away the rest! you didn’t want those anyways right?)

  • Didn’t realize Future.sequence doesn’t fail immediately (a quick failure on an hour-long Future.sequence might not return the failure for an hour!)

  • Wish Future had common idioms such as fold, flatten, etc.

  • Are annoyed that you have to use Java’s ScheduledExecutorService for delayed or periodically repeating tasks (or even worse - an ActorSystem)

Quick look

Express parallel Future operations in far fewer lines of code:

val read: String => Future[Int] = ???

// Turn this parallel future boilerplate:
val f1 = read("1")
val f2 = read("2")
val f3 = read("3")
val future : Future[(Int,Int,Int)] =
  for {
    i1 <- f1
    i2 <- f2
    i3 <- f3
  } yield (i1,i2,i3)

// Into this:
val future : Future[(Int,Int,Int)] =
  async.par.run(read("1"), read("2"), read("3"))

Create async tasks and configure them to:

  • Retry errors

  • Control the number of simultaneous operations

  • Limit the number of operations per second

val read : String => Future[Int] = ???
val read2 : Int => Future[Double] = ???
val read3 : Double => Future[String] = ???

// Tuple-based async task (heterogeneous)
val future : Future[(Int,Double,String)] =
  async // start a new async task
    .par(2) // run 2 ops at a time
    .progress(1.second)(progress => println(progress)) // print progress once a second
    .retry { // retry some errors with a random delay
      case _:TimeoutException =>
        Future.delayed(Random.nextInt(100).millis)(true)
      case _ => false.future
    }
    .run(
      read("1"),
      read2(2),
      read3(3.0)
    ) // sequence results and fail-immediately on exception (unlike Future.sequence)
    // If the task fails, get the full list of exceptions (unlike Future.sequence)

// Collection-based async task (homogeneous)
val future : Future[List[Int]] =
  List("1","2","3")
    .async // start a new async task
    .throttle(3.seconds) // perform operations no faster than 1 every 3 seconds
    .retry { // retry some errors with a random delay
      case _:TimeoutException =>
        Future.delayed(Random.nextInt(100).millis)(true)
      case _ => false.future
    }
    .map(read) // sequence results and fail-immediately on exception (unlike Future.sequence)
    // If the task fails, get the full list of exceptions (unlike Future.sequence)

Save your async task config for later reuse:

val myAsyncConfig =
  async
    .par(2) // run 2 ops at a time
    .progress(1.second)(progress => println(progress)) // print progress once a second
    .retry { // retry some errors with a random delay
      case _:TimeoutException =>
        Future.delayed(Random.nextInt(100).millis)(true)
      case _ => false.future
    }

// Reuse config on tuple-based async task
myAsyncConfig.run(read("1"), read("2"), read("3"))

// Reuse config on collection-based async task
List("1","2","3").async.using(myAsyncConfig).map(read)

Convenience and utility methods:

  • A.future - stop typing Future.successful(A)

  • Future.get - Await.result without the flagellation

  • Future.flatten - flatten Future[Future[A]] into Future[A]

  • Future.toTry - expand Future[A] to Future[Try[A]] that always succeeds

  • Future.fold - fold Future[A] (which is either Success[A] or Failure[A]) to Future[B] that always succeeds

  • Collection[Future[A]].sequence - idiomatic, shorter alternative to Future.sequence(collection)

  • Future.onTimeout - do something if a Future doesn’t complete within a given amount of time

  • Future.happensBefore - start a Future after another Future completes

  • Future.sideEffect - compose a Future and a side effect (that starts after the Future completes) into a new Future. The new Future completes successfully only after both the original Future and the side effect successfully complete. If either the original Future or the side effect fail, the composed Future fails.

  • Future.delayed or ScheduledExecutionContext.schedule - start a Future after a delay

  • ScheduledExecutionContext.scheduleCancellable - start a Future after a delay and optionally cancel the future if it hasn’t started yet. If cancelled, instead of throwing an exception the future completes successfully with a value you specify!

  • ScheduledExecutionContext.scheduleAtFixedRate - create a repeating task that can be paused or cancelled
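
To make the semantics of a few of these helpers concrete, here is a minimal sketch of roughly equivalent behavior written against the standard library only. The object name FutureHelpersSketch and the standalone function signatures are assumptions made for illustration; this is not the library's implementation, and the real methods are called on the future itself (e.g. future.toTry).

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical standalone equivalents, for illustration only
object FutureHelpersSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Like toTry: capture the outcome as a value, so the returned future always succeeds
  def toTry[A](fut: Future[A]): Future[Try[A]] =
    fut.map[Try[A]](Success(_)).recover { case e => Failure(e) }

  // Like fold: handle both outcomes and produce a plain B
  def fold[A, B](fut: Future[A])(onSuccess: A => B, onFailure: Throwable => B): Future[B] =
    toTry(fut).map {
      case Success(a) => onSuccess(a)
      case Failure(e) => onFailure(e)
    }

  // Like flatten: collapse one level of nesting
  def flatten[A](nested: Future[Future[A]]): Future[A] =
    nested.flatMap(inner => inner)

  // Like get: block the calling thread until the result is available
  def get[A](fut: Future[A]): A = Await.result(fut, Duration.Inf)
}
```

For example, FutureHelpersSketch.toTry(Future.failed[Int](e)) completes successfully with Failure(e) rather than failing.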

Overview

s_mach.concurrent is an open-source Scala library that provides asynchronous serial and parallel execution flow control primitives for working with asynchronous tasks. An asynchronous task consists of two or more calls to function(s) that return a future result A => Future[B] instead of the result A => B. s_mach.concurrent also provides utility & convenience code for working with scala.concurrent.Future.

  • Adds concurrent flow control primitives async and async.par for performing fixed size heterogeneous (tuple) and variable size homogeneous (collection) asynchronous tasks. These primitives:

    • Allow enabling optional progress reporting, failure retry and/or throttle control for asynchronous tasks

    • Ensure proper sequencing of returned futures, e.g. given f: Int => Future[String]:

      • List(1,2,3).async.map(f) returns Future[List[String]]

      • async.par.run(f(1),f(2),f(3)) returns Future[(String,String,String)]

    • Ensure fail-immediate sequencing of future results (see the 'Under the hood: Merge' section for details)

    • Ensure all exceptions generated during asynchronous task processing can be retrieved (Future.sequence returns only the first)

  • collection.async and collection.async.par support collection operations such as map, flatMap and foreach on asynchronous functions, i.e. A => Future[B]

  • async.par.run(future1, future2, ...) supports running a fixed-size heterogeneous asynchronous task (of up to 22 futures) in parallel

  • Adds ScheduledExecutionContext, a Scala interface wrapper for java.util.concurrent.ScheduledExecutorService that provides for scheduling delayed and periodic tasks

  • Adds non-blocking concurrent control primitives such as Barrier, Latch, Lock, Semaphore and AtomicFSM

  • Adds future utility methods such as Future.onTimeout, Future.sideEffect and Future.happensBefore

  • Provides convenience methods for writing more readable, concise and DRY code such as Future.get, Future.toTry and Future.fold
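
As a rough illustration of what starting a future after a delay involves when working directly with java.util.concurrent.ScheduledExecutorService, the sketch below completes a Promise from a scheduled task. The helper name delayed and the object DelayedSketch are hypothetical; this shows the underlying technique, not how Future.delayed or ScheduledExecutionContext is actually implemented.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper, for illustration only
object DelayedSketch {
  // Returns a future that is completed with the result of `body` once `delay` has elapsed
  def delayed[A](delay: FiniteDuration)(body: => A)
                (implicit ses: ScheduledExecutorService): Future[A] = {
    val promise = Promise[A]()
    ses.schedule(new Runnable {
      // Try captures an exception thrown by `body` as a failed future
      def run(): Unit = { promise.complete(Try(body)); () }
    }, delay.toNanos, TimeUnit.NANOSECONDS)
    promise.future
  }
}
```

The caller owns the executor and must shut it down, which is part of the boilerplate a Scala-level wrapper can hide.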

Versioning

s_mach.concurrent uses semantic versioning (http://semver.org/). s_mach.concurrent does not use the package private modifier. Instead, all code files outside of the s_mach.concurrent.impl package form the public interface and are governed by the rules of semantic versioning. Code files inside the s_mach.concurrent.impl package may be used by downstream applications and libraries. However, no guarantees are made as to the stability or interface of code in the s_mach.concurrent.impl package between versions.

Imports for Examples

All code examples assume the following imports:

import scala.util._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import s_mach.concurrent._
import s_mach.concurrent.util._
import java.net.SocketTimeoutException // used by the retry examples

implicit val scheduledExecutionContext = ScheduledExecutionContext(2)
case class Item(id: String, value: Int, relatedItemId: String)
def read(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
def readFail(id: String) : Future[Item] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }
def longRead(id: String) : Future[Item] = Future { Thread.sleep(2000); println(id); Item(id,id.toInt,(id.toInt+1).toString) }
def write(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); true }
def writeFail(id: String, item: Item) : Future[Boolean] = Future { Thread.sleep(1000); println(id); throw new RuntimeException(id.toString) }

Asynchronously transform or traverse collections

A common task when working with futures is transforming or traversing a collection, in serial or in parallel, with a function that returns a future. With only a few levels of nesting, the standard idioms for accomplishing this lead to difficult-to-read code. In the following example, a collection of ten identifiers is grouped into batches of reads. The flow of execution across batches is serial, while the identifiers within each batch are read in parallel.

Example 1: Transform and traverse collections, standard idiom
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch
        // Serially perform read of each batch
        .foldLeft(Future.successful(List[Item]())) { (facc, idBatch) =>
          for {
            acc <- facc
            // Parallel read batch
            oomItem <- Future.sequence(idBatch.map(read))
          } yield acc ::: oomItem
        }
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(2).toList
    oomResult <- {
      println("Writing...")
      oomNewItemBatch
        // Serially perform write of each batch
        .foldLeft(Future.successful(List[Boolean]())) { (facc, itemBatch) =>
          for {
            acc <- facc
            // Parallel write batch
            oomResult <- Future.sequence(itemBatch.map(item => write(item.id, item)))
          } yield acc ::: oomResult
        }
    }
  } yield oomResult.forall(_ == true)
}

The same code, rewritten using async and async.par:

Example 2: Using async and async.par to transform and traverse collections:
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch.async.flatMap(_.async.par.map(read))
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
    oomResult <- {
      println("Writing...")
      oomNewItemBatch.async.flatMap(_.async.par.map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}

Limiting the maximum number of simultaneous workers

async.par allows specifying the maximum number of simultaneous workers used during an asynchronous task. In the following example, batches are processed in parallel with at most two workers, while each identifier within a batch is processed with at most four workers.

Example 3: Using s_mach.concurrent workers to transform and traverse collections:
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch.async.par(2).flatMap(_.async.par(4).map(read))
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
    oomResult <- {
      println("Writing...")
      oomNewItemBatch.async.par(2).flatMap(_.async.par(4).map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}
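
To show what limiting the number of simultaneous workers means in practice, here is a minimal standard-library sketch of a bounded parallel map. The names BoundedParSketch and boundedParMap are hypothetical, and this is one possible technique (a fixed number of worker loops claiming indices from a shared counter), not a description of how async.par(n) is implemented.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, for illustration only
object BoundedParSketch {
  // Applies f to every item with at most maxWorkers calls in flight, preserving input order
  def boundedParMap[A, B](items: Vector[A], maxWorkers: Int)(f: A => Future[B])
                         (implicit ec: ExecutionContext): Future[Vector[B]] = {
    val nextIndex = new AtomicInteger(0)

    // Each worker claims the next unprocessed index, waits for f to finish,
    // then loops; it never has more than one call in flight.
    def worker(acc: List[(Int, B)]): Future[List[(Int, B)]] = {
      val i = nextIndex.getAndIncrement()
      if (i >= items.size) Future.successful(acc)
      else f(items(i)).flatMap(b => worker((i, b) :: acc))
    }

    val workers = Vector.fill(math.min(maxWorkers, items.size))(worker(Nil))
    // Merge the per-worker results and restore the original ordering
    Future.sequence(workers).map(_.flatten.sortBy(_._1).map(_._2))
  }
}
```

With maxWorkers = 2 and ten items, at most two calls to f are outstanding at any moment; a failure in any call fails the resulting future, although other workers keep going until they reach their own next step.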

Adding progress reporting, retry and throttle control to asynchronous tasks

async and async.par can be optionally modified to report progress, retry failures and/or limit iteration speed to a specific time period for asynchronous tasks. In the following example, completion of each batch reports progress and batches may not complete faster than one every three seconds. For each identifier that is read and fails, the first three TimeoutExceptions or SocketTimeoutExceptions are retried. All other exceptions cause the entire task to fail.

Example 4: Adding progress reporting, retry and throttle control to collection concurrent operations
val oomItemIdBatch = (1 to 10).toList.map(_.toString).grouped(2).toList
val future = { // necessary for pasting into repl
  for {
    oomItem <- {
      println("Reading...")
      oomItemIdBatch
        .async
        .progress(1.second)(progress => println(progress))
        .throttle(3.seconds)
        .flatMap { batch =>
          batch
            .async.par
            // Retry at most the first 3 timeout or socket timeout exceptions, each after a 100 millisecond delay
            .retry {
              case (_: TimeoutException) :: tail if tail.size < 3 =>
                Future.delayed(100.millis)(true)
              case (_: SocketTimeoutException) :: tail if tail.size < 3 =>
                Future.delayed(100.millis)(true)
              case _ => false.future
            }
            .map(read)
        }
    }
    _ = println("Computing...")
    oomNewItemBatch = oomItem.map(item => item.copy(value = item.value + 1)).grouped(10).toVector
    oomResult <- {
      println("Writing...")
      oomNewItemBatch.async.par(2).flatMap(_.async.par(4).map(item => write(item.id, item)))
    }
  } yield oomResult.forall(_ == true)
}
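
The retry blocks in Example 4 receive a list of failures and return a Future[Boolean] that decides whether to try again. A minimal standard-library sketch of that idea follows. The names RetrySketch and retrying are hypothetical, and the assumption that failures are passed newest first is inferred from the `:: tail` patterns in the example, not from the library's source.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, for illustration only
object RetrySketch {
  // Re-runs `op` for as long as `shouldRetry` approves the failures seen so far (newest first)
  def retrying[A](op: () => Future[A])(shouldRetry: List[Throwable] => Future[Boolean])
                 (implicit ec: ExecutionContext): Future[A] = {
    def loop(failures: List[Throwable]): Future[A] =
      op().recoverWith { case e =>
        val all = e :: failures
        shouldRetry(all).flatMap {
          case true  => loop(all)          // try the operation again
          case false => Future.failed(e)   // give up with the latest failure
        }
      }
    loop(Nil)
  }
}
```

Because the decision is itself a future, a delay before retrying can be expressed by returning a future that completes later, as the examples do with Future.delayed.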

async.par workflow for fixed size heterogeneous asynchronous tasks

When first using Future with a for-comprehension, it is natural to assume the following will produce parallel operation:

Example 5: Does not execute futures in parallel
for {
  i1 <- read("1")
  i2 <- read("2")
  i3 <- read("3")
} yield (i1,i2,i3)

Sadly, this code compiles and runs just fine, but it does not execute in parallel: read("2") is not called until the future returned by read("1") completes, and likewise for read("3"). To correctly implement parallel operation, the following standard pattern is used:

Example 6: Correct Future parallel operation:
val f1 = read("1")
val f2 = read("2")
val f3 = read("3")
val future = { // necessary for pasting into repl
  for {
    i1 <- f1
    i2 <- f2
    i3 <- f3
  } yield (i1,i2,i3)
}

For parallel operation, all of the futures must be started before the for-comprehension. The for-comprehension is a monadic workflow which captures commands that must take place in a specific sequential order. The pattern in Example 6 is necessary because Scala lacks an applicative workflow which captures commands that may be run in any order. s_mach.concurrent adds the async.par.run workflow which is an applicative workflow specifically for fixed size heterogeneous asynchronous tasks. This workflow can more concisely express the pattern above.
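
The difference between the two kinds of workflow can be seen with the standard library alone. The sketch below contrasts the desugared form of a for-comprehension (flatMap and map) with Future.zip, which evaluates both operands before combining them. The Op class and the use of Promise as a manually controlled gate are hypothetical scaffolding, used here only to make the ordering observable without relying on timing.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical scaffolding, for illustration only
object ApplicativeSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // An operation that counts how often it was started and
  // completes only when the supplied gate is completed
  final class Op {
    val calls = new AtomicInteger(0)
    def apply(gate: Promise[Int]): Future[Int] = { calls.incrementAndGet(); gate.future }
  }

  // Monadic: the desugared form of a two-step for-comprehension.
  // The second op is not started until the first future completes.
  def monadic(op: Op, g1: Promise[Int], g2: Promise[Int]): Future[(Int, Int)] =
    op(g1).flatMap(a => op(g2).map(b => (a, b)))

  // Applicative-style: both ops are started before either result is needed
  def applicative(op: Op, g1: Promise[Int], g2: Promise[Int]): Future[(Int, Int)] =
    op(g1).zip(op(g2))
}
```

Immediately after calling monadic, the op has been started once; immediately after calling applicative, it has been started twice. zip only combines two futures into a pair, so nesting it for three or more results yields nested tuples, which is the kind of boilerplate a dedicated workflow avoids.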

In the example below, all futures are started at the same time by async.par.run which returns a Future[(Int,Int,Int)] that completes once all supplied futures complete. After this returned future completes, the tuple value results can be extracted using normal Scala idioms.

Example 7: async.par.run workflow
for {
  (i1,i2,i3) <- async.par.run(read("1"), read("2"), read("3"))
} yield (i1,i2,i3)

Additionally, all of the configuration options available for collection.async.par are valid for async.par.run. In the example below, the number of workers is limited to two, progress is reported once a second and certain failures are retried.

Example 8: async.par.run workflow with two workers, progress reporting and failure retry
for {
  (i1,i2,i3) <-
    async
      .par(2)
      .progress(1.second)(progress => println(progress))
      .retry {
        case (_: TimeoutException) :: tail if tail.size < 3 =>
          Future.delayed(100.millis)(true)
        case (_: SocketTimeoutException) :: tail if tail.size < 3 =>
          Future.delayed(100.millis)(true)
        case _ => false.future
      }
      .run(
        read("1"),
        read("2"),
        read("3")
      )
} yield (i1,i2,i3)

Under the hood: Merge function

The async and async.par primitives utilize the merge and flatMerge sequencing functions to ensure that execution ends immediately once a failure occurs. This is in contrast to Future.sequence which may not always fail immediately when a failure occurs.

The merge function performs the same function as Future.sequence (it calls Future.sequence internally) but it ensures that the returned future completes immediately after an exception occurs in any of the futures. Because Future.sequence waits on all futures in left to right order before completing, an exception thrown at the beginning of the computation by a future at the far right will not be detected until after all other futures have completed. For long running computations, this can mean a significant amount of wasted time waiting on futures to complete whose results will be discarded.

Additionally, while the Scala parallel collections correctly handle multiple parallel exceptions, Future.sequence only returns the first exception encountered. In Future.sequence, all further exceptions past the first are discarded. The merge and flatMerge methods fix these problems by throwing AsyncParThrowable. AsyncParThrowable provides access to both the first exception thrown and a future of all exceptions thrown during the computation.
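
The two behaviors described above can be approximated with the standard library, as in the following sketch. The names FailFastSketch, failFast and allFailures are hypothetical, and the code is not the library's merge: it only illustrates one way to fail as soon as any input fails and to collect every failure afterwards.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helpers, for illustration only
object FailFastSketch {
  // Like Future.sequence, but the result fails as soon as any input fails
  def failFast[A](futures: Vector[Future[A]])
                 (implicit ec: ExecutionContext): Future[Vector[A]] = {
    val promise = Promise[Vector[A]]()
    // The first failure to arrive wins; tryFailure ignores later ones
    futures.foreach(_.onComplete {
      case Failure(e) => promise.tryFailure(e)
      case Success(_) => ()
    })
    // If nothing fails, the ordinary sequence supplies the ordered result
    Future.sequence(futures).onComplete(r => promise.tryComplete(r))
    promise.future
  }

  // Completes after all inputs have finished, with every failure in input order
  def allFailures[A](futures: Vector[Future[A]])
                    (implicit ec: ExecutionContext): Future[Vector[Throwable]] =
    Future.sequence(
      futures.map(_.map(_ => Option.empty[Throwable]).recover { case e => Some(e) })
    ).map(_.flatten)
}
```

Note that failing fast does not cancel the remaining futures; they keep running, and only the wait for their results is skipped.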

Example 9: Future.sequence gets stuck waiting on longRead to complete and only returns the first exception:
scala> val t = Future.sequence(Vector(longRead("1"),readFail("2"),readFail("3"),read("4"))).getTry
3
4
2
1
t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(java.lang.RuntimeException: 2)

scala>

Example 10: merge method fails immediately on the first exception and throws AsyncParThrowable which can retrieve all exceptions:
scala> val t = Vector(longRead("1"),readFail("2"),readFail("3"),read("4")).merge.getTry
2
t: scala.util.Try[scala.collection.immutable.Vector[Item]] = Failure(AsyncParThrowable(java.lang.RuntimeException: 2))
3

scala> 4
1

scala> val allFailures = t.failed.get.asInstanceOf[AsyncParThrowable].allFailure.get
allFailures: Vector[Throwable] = Vector(java.lang.RuntimeException: 2, java.lang.RuntimeException: 3)