Skip to content
This repository
tree: f19a7a6531
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 102 lines (92 sloc) 4.285 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/

package akka

import language.implicitConversions

import scala.util.continuations._
import scala.concurrent.{ Promise, Future, ExecutionContext }
import scala.util.control.NonFatal
import scala.util.Failure

package object dataflow {
  /**
* Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
* Continuations plugin.
*
* Within the block, the result of a Future may be accessed by calling Future.apply. At that point
* execution is suspended with the rest of the block being stored in a continuation until the result
* of the Future is available. If an Exception is thrown while processing, it will be contained
* within the resulting Future.
*
* This allows working with Futures in an imperative style without blocking for each result.
*
* Completing a Future using 'Promise << Future' will also suspend execution until the
* value of the other Future is available.
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
  def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
    val p = Promise[A]
    executor.execute(
      new Runnable {
        def run = try {
          (reify(body) foreachFull (r p.success(r).future, f p.failure(f).future): Future[Any]) onFailure {
            case NonFatal(e) p tryComplete Failure(e)
          }
        } catch {
          case NonFatal(e) p tryComplete Failure(e)
        }
      })
    p.future
  }

  implicit class DataflowPromise[T](val promise: Promise[T]) extends AnyVal {

    /**
* Completes the Promise with the specified value or throws an exception if already
* completed. See Promise.success(value) for semantics.
*
* @param value The value which denotes the successful value of the Promise
* @return This Promise's Future
*/
    final def <<(value: T): Future[T] @cps[Future[Any]] = shift {
      cont: (Future[T] Future[Any]) cont(promise.success(value).future)
    }

    /**
* Completes this Promise with the value of the specified Future when/if it completes.
*
* @param other The Future whose value will be transferred to this Promise upon completion
* @param ec An ExecutionContext which will be used to execute callbacks registered in this method
* @return A Future representing the result of this operation
*/
    final def <<(other: Future[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = shift {
      cont: (Future[T] Future[Any])
        val fr = Promise[Any]()
        (promise completeWith other).future onComplete {
          v try { fr completeWith cont(promise.future) } catch { case NonFatal(e) fr failure e }
        }
        fr.future
    }

    /**
* Completes this Promise with the value of the specified Promise when/if it completes.
*
* @param other The Promise whose value will be transferred to this Promise upon completion
* @param ec An ExecutionContext which will be used to execute callbacks registered in this method
* @return A Future representing the result of this operation
*/
    final def <<(other: Promise[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = <<(other.future)

    /**
* For use only within a flow block or another compatible Delimited Continuations reset block.
*
* Returns the result of this Promise without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
    final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T Future[Any]))
  }

  implicit class DataflowFuture[T](val future: Future[T]) extends AnyVal {
    /**
* For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
* Returns the result of this Future without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
    final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(future flatMap (_: T Future[Any]))
  }
}
Something went wrong with that request. Please try again.