Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
357 lines (245 sloc) 7.76 KB

scalaz.concurrent.Task

Why? Why not?

IndyScala, November 3, 2014


Probably 1 of 3? Yeah, probably.

  1. scalaz.concurrent.Task (November, 2014)
  2. scalaz-stream (???)
  3. http4s (???)

Ye Olden Tymes

Way back in Scala 2.9.2

  • java.util.concurrent.Future ☠☠☠
    • com.google.common.util.concurrent.ListenableFuture
  • scala.actors.Future
  • akka.dispatch.Future
  • com.twitter.util.Future
  • scalaz.concurrent.Promise

That begat sff4s

!scala
val factory = sff4s.impl.ActorsFuture
val f = factory future {
   Thread.sleep(1000)
   1
}
val g = f map { _ + 1 }
g(2000) 

... which was nice for unopinionated frameworks, but never really caught on.


Enter SIP-14

  • Futures and Promises
  • Introduced in Scala 2.10
  • Backported to Scala 2.9.3
  • You know it as scala.concurrent.Future
    • You know, the thing ? returns from an actor.

Standardization???

  • java.util.concurrent.Future ☠☠☠
    • com.google.common.util.concurrent.ListenableFuture
    • java.util.concurrent.CompletableFuture
  • scala.actors.Future
  • scalaz.concurrent.Promise
  • akka.dispatch.Future
  • com.twitter.util.Future
  • scala.concurrent.Future
  • scalaz.concurrent.Promise
  • scalaz.concurrent.Task

This is why we can't have nice things


Wrong presentation

  • Tomorrow night is why we can't have nice things.
  • For concurrent programming in Scala, we do have two rather nice things.

A review of scala.concurrent.Future

!scala
def isOk(url: String): Future[Boolean] = get(url)
  .map { _.status == 200 } // If we got a response, was it a 200?
  .recover { case e => e.printStackTrace(); false}
val sites = Seq("http://twitter.com", "http://github.com/")
val panic = Future.fold(sites.map(isOk))(false) {
  (acc, isSiteOk) => acc || !isSiteOk
}
println("Panic: "+Await.result(panic, 10.seconds))
  • A Future is an asynchronous computation.
  • A Future tries (that is, scala.util.Trys) to complete that computation.
  • Yup. Future is a monad.

Enter scalaz.concurrent.Future

  • Like Scala Future, a Scalaz Future is an asynchronous computation.
  • Yup. This Future is a monad, too.
  • But there's no error handling in this monad.

Enter scalaz.concurrent.FutureTask

!scala
def isDown(url: String): Future[Boolean] = get(url)
  .map { _.status != 200 } // If we got a response, was it a 200?
  .recover { case e => e.printStackTrace(); false}
val sites = Seq("http://twitter.com", "http://github.com/")
val panic = Future.fold(sites.map(isDown))(false)(_ || _)
println("Panic: "+Await.result(panic, 10.seconds))
  • A Task[A] is a thin wrapper around Future[Throwable \/ A].
    • scalaz.\/ is commonly pronounced as either.
      • Because either it worked or it didn't.
      • Yup. \/ is a monad. (scala.Either, incidentally, is not.)
    • Yup. Task is a monad.

So the difference is just in method names?

  • This is just more of that NIH Syndrome that Scala is famous for, isn't it?

No.


Future.apply immediately submits to executor

!scala
Future { TheNukes.launch() }

Mushroom cloud

By Federal Government of the United States [Public domain], via Wikimedia Commons


Tasks don't run ...

!scala
Task { TheNukes.launch() }

Bucolic scene

Alison Rawson CC-BY-SA-2.0, via Wikimedia Commons


... until you ask them to.

!scala
val task = Task { TheNukes.launch() }
headForShelter()
task.run

Mushroom cloud


A Future's side effects run once.

!scala
val f = Future { TheNukes.launch() }
Await.result(f, 1.second)
Await.result(f, 1.second)

Mushroom cloud


A Task's side effects run each time.

!scala
val t = Task { TheNukes.launch() }
t.run
t.run

Mushroom cloud Mushroom cloud


I like how Future does it.

Fine.

!scala
val t = Task.unsafeStart { TheNukes.launch() }
t.run
t.run

Mushroom cloud


Site monitor reprise

!scala
val f = Future.fold(sites.map(isDown))(false)(_ || _)
val t = Task.reduceUnordered[Boolean, Boolean](sites.map(isDown))
  • f will always contain the same value when completed.
  • t will rerun the monitoring and produce a current value on each run.

Future methods that submit to executor:

  • andThen
  • collect
  • filter
  • flatMap
  • foreach
  • onFailure
  • onSuccess
  • recover
  • transform
  • withFilter

Task methods that submit to executor:

[This slide intentionally left blank]


Execution model

  • Future makes you opt out of thread hopping by explicitly setting an executor that stays on the same thread.

    • It's easy to saturate your thread pool.
    • It's also easy to waste time submitting trivial work to it.
    • For optimal performance, swap custom execution contexts into the hotspots.
  • Task makes you opt into thread hopping.

    • It's easy to accidentally only use one core in your complicated
    • But it'll work really efficiently on that one core.
    • For optimal performance, salt to taste with Task.apply and Task.fork
  • Bottom line: both ultimately offer the same control, promote a different kind of naive mistake.


Hey, what about that Twitter thing?

  • Twitter's Future is cancellable.
  • Scala's Future is not cancellable.
  • Scalaz's Task is cancellable.

Tasks are cancellable

!scala
val neverMind = new AtomicBoolean(false)
Task {
  Thread.sleep(500)
  neverMind.set(true)
}.runAsync {
  case -\/(t) => t.printStackTrace()
  case \/-(()) => println("Cancelled")
}
val t = Task.delay {
  Thread.sleep(1000)
  TheNukes.launch()
}
t.runAsyncInterruptibly({
  case -\/(t) => t.printStackTrace()
  case \/-(()) => println("Completed")
}, neverMind)

bucolic cow


... but not to the point of nuclear safety

!scala
val neverMind = new AtomicBoolean(false)
Task {
  Thread.sleep(500)
  neverMind.set(true)
}.runAsync {
  case -\/(t) => t.printStackTrace()
  case \/-(()) => println("Cancelled")
}

val t = Task.delay {
  Thread.sleep(1000)
  TheNukes.launch()
}
t.runAsyncInterruptibly({
  case -\/(t) => t.printStackTrace()
  case \/-(()) => println("Completed")
}, neverMind)

Mushroom cloud


Futures are for Typesafe people and Tasks are for Typelevel people?


Usually, but it doesn't have to be that way.

handshake


Convert a Task to a Future

!scala
val p = Promise[A]()
task.runAsync {
  case \/-(a) => p.success(a)
  case -\/(t) => p.failure(t)
}
p.future

Convert a Future to a Task

!scala
Task.async { f =>
  future.onComplete {
    case Success(a) => f(right(a))
    case Failure(t) => f(left(t))
  }
}

Futher reading


Ross A. Baker

You can’t perform that action at this time.