/
Await.scala
87 lines (78 loc) · 3.1 KB
/
Await.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.thoughtworks.dsl.keywords
import com.thoughtworks.dsl.Dsl
import com.thoughtworks.dsl.Dsl.{!!, Keyword}
import scala.concurrent.Await.result
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.language.implicitConversions
/** [[Await]] is a [[Dsl.Keyword Keyword]] to extract value from a [[scala.concurrent.Future]].
*
* This keyword is available in functions whose return types are
* [[scala.concurrent.Future Future]], [[domains.task.Task]],
* or any exception aware continuations as `(_ !! Throwable !! _)`.
*
* @example Given a [[scala.concurrent.Future Future]]:
* {{{
* import scala.concurrent.Future
* val myFuture40 = Future {
* 40
* }
* }}}
*
* It can be [[Await]] in another [[scala.concurrent.Future Future]]
*
* {{{
* import scala.concurrent.Future
* def myFuture42 = Future {
* !Await(myFuture40) + 2
* }
* }}}
*
* It can be converted to a [[domains.task.Task]]
* with the help of [[Await]].
*
* {{{
* import com.thoughtworks.dsl.domains.task._
* import com.thoughtworks.dsl.keywords._
* val myTask = Task {
* !Await(myFuture42)
* }
* }}}
*
* Then a [[domains.task.Task]] can be converted back to a [[scala.concurrent.Future]]
* via [[domains.task.Task.toFuture]].
*
* {{{
* val myAssertionTask = Task {
* !Shift(myTask) should be(42)
* }
* Task.toFuture(myAssertionTask)
* }}}
* @author 杨博 (Yang Bo)
*/
final case class Await[Value](future: Future[Value]) extends AnyVal with Keyword[Await[Value], Value]
object Await {
implicit def implicitAwait[Value](future: Future[Value]): Await[Value] = Await[Value](future)
implicit def streamAwaitDsl[Value, That](
implicit executionContext: ExecutionContext): Dsl[Await[Value], Stream[Future[That]], Value] =
new Dsl[Await[Value], Stream[Future[That]], Value] {
def cpsApply(keyword: Await[Value], handler: Value => Stream[Future[That]]): Stream[Future[That]] = {
import keyword.future
val futureOfStream = future.map(handler)
new Stream.Cons(futureOfStream.flatMap(_.head), result(futureOfStream, Duration.Inf).tail)
}
}
implicit def awaitDsl[Value, That](
implicit executionContext: ExecutionContext): Dsl[Await[Value], Future[That], Value] =
new Dsl[Await[Value], Future[That], Value] {
def cpsApply(keyword: Await[Value], handler: Value => Future[That]): Future[That] = {
keyword.future.flatMap(handler)
}
}
implicit def continuationAwaitDsl[Value](
implicit executionContext: ExecutionContext): Dsl[Await[Value], Unit !! Throwable, Value] =
new Dsl[Await[Value], Unit !! Throwable, Value] {
def cpsApply(keyword: Await[Value], handler: Value => Unit !! Throwable): Unit !! Throwable =
!!.fromTryContinuation(keyword.future.onComplete)(handler)
}
}