-
Notifications
You must be signed in to change notification settings - Fork 29
/
task.scala
115 lines (96 loc) · 3.17 KB
/
task.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package com.thoughtworks.dsl
import com.thoughtworks.dsl.Dsl.{!!, Continuation, reset}
import com.thoughtworks.dsl.keywords.Shift
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future, Promise, SyncVar}
import scala.util.control.{NonFatal, TailCalls}
import scala.util.{Failure, Success, Try}
import scala.util.control.TailCalls.TailRec
/** Continuation-based asynchronous tasks.
  *
  * Declares the `Task` type alias and a `Task` object holding constructors
  * (`now`, `delay`, `reset`, `switchExecutionContext`, `join`) and runners
  * (`onComplete`, `blockingAwait`, `toFuture`).
  *
  * @author 杨博 (Yang Bo)
  */
object task {

  /** The domain in which every `Task` is evaluated.
    *
    * As used by `Task.switchExecutionContext`, a value of this type is applied to a failure
    * handler (a function from `Throwable` to `TailRec[Unit]`) and yields a trampolined
    * `TailRec[Unit]`. The `!!` type operator itself is imported from `Dsl`.
    */
  type TaskDomain = TailRec[Unit] !! Throwable

  /** A computation that eventually passes a value of type `A` to a continuation.
    *
    * As used by `Task.switchExecutionContext`, a task is a function that receives the success
    * continuation (`A => TaskDomain`) and returns a `TaskDomain`.
    *
    * @tparam A the type of the value handed to the success continuation
    */
  type Task[+A] = TaskDomain !! A

  object Task {

    /** Returns a task that passes the already evaluated value `a` to its continuation.
      *
      * @param a the value to pass on
      */
    @inline
    def now[A](a: A): Task[A] = _(a)

    /** Returns a task that invokes `f` and passes its result to the continuation.
      *
      * `f` is not called when the task is created; it is called each time the returned task is
      * applied to a continuation.
      *
      * @param f the computation producing the task's value
      */
    @inline
    def delay[A](f: () => A): Task[A] = _(f())

    /** Returns a task that evaluates the by-name expression `a` through `delay`.
      *
      * @note The `@reset` annotation on the result type comes from `Dsl`; its effect is not
      *       visible in this file.
      * @param a the expression producing the task's value
      */
    @inline
    def reset[A](a: => A): Task[A] @reset = delay(a _)

    /** Returns a task that does nothing but let the succeeding tasks run on `executionContext`
      *
      * @example All the code after a `!switchExecutionContext` should be executed on `executionContext`
      * {{{
      * import com.thoughtworks.dsl.task.Task
      * import org.scalatest.Assertion
      * import scala.concurrent.ExecutionContext
      * import com.thoughtworks.dsl.keywords.Shift.implicitShift
      * def myTask: Task[Assertion] = _ {
      *   val originalThread = Thread.currentThread
      *   !Task.switchExecutionContext(ExecutionContext.global)
      *   Thread.currentThread should not be originalThread
      * }
      *
      * Task.toFuture(myTask)
      *
      * }}}
      */
    @inline
    def switchExecutionContext(executionContext: ExecutionContext): Task[Unit] = { continue => failureHandler =>
      // The rest of the computation is handed to `executionContext` as a Runnable.
      executionContext.execute(new Runnable {

        @inline
        private def stackSafeRun(): TailRec[Unit] = {
          // Only the call to `continue` is guarded: a non-fatal exception thrown while obtaining
          // the next step is routed to `failureHandler`. Exceptions thrown later, while applying
          // that step to `failureHandler`, are deliberately not caught here.
          val protectedContinuation = try {
            continue(())
          } catch {
            case NonFatal(e) =>
              // Local return from `stackSafeRun` (not from a lambda).
              return failureHandler(e)
          }
          protectedContinuation(failureHandler)
        }

        // Forces the trampolined computation on whichever thread the executor uses.
        @noinline
        def run(): Unit = stackSafeRun().result
      })
      // The caller's side of the task finishes immediately after submitting the Runnable.
      TailCalls.done(())
    }

    /** Returns a task whose value is a `That` built from the single `element`.
      *
      * The collection is produced by appending `element` to a fresh builder obtained from
      * `canBuildFrom` and wrapping the built result with `now`.
      *
      * @note The `@reset` annotation on the result type comes from `Dsl`; its effect is not
      *       visible in this file.
      * @param element      the only element of the resulting collection
      * @param canBuildFrom supplies the builder for `That`
      */
    @inline
    def join[Element, That](element: Element)(
        implicit canBuildFrom: CanBuildFrom[Nothing, Element, That]): Task[That] @reset = now {
      (canBuildFrom() += element).result()
    }

    /** Starts `task` and passes its outcome to `continue` as a `Try`.
      *
      * The task is adapted with `Continuation.toTryContinuation`, and the resulting `TailRec` is
      * forced with `.result` before this method returns.
      *
      * @param task     the task to run
      * @param continue the callback that receives the outcome
      */
    def onComplete[A](task: Task[A])(continue: (Try[A] => Unit)): Unit = {
      Continuation
        .toTryContinuation(task) { result =>
          TailCalls.done(continue(result))
        }
        .result
    }

    /** Starts `task` and blocks the calling thread until its outcome is available.
      *
      * The outcome is handed over through a `SyncVar`, so the task may complete on another
      * thread, for example after `switchExecutionContext`.
      *
      * @param task    the task to run
      * @param timeout how long to wait; a finite duration is converted with `toMillis`, any
      *                non-finite duration waits without a time limit
      * @return the value produced by `task`
      * @throws java.util.NoSuchElementException if a finite `timeout` elapses before the outcome
      *                                          is available (raised by `SyncVar.take(timeout)`)
      * @note If the outcome is a `Failure`, its exception is rethrown by `Try.get`.
      */
    def blockingAwait[A](task: Task[A], timeout: Duration = Duration.Inf): A = {
      val syncVar = new SyncVar[Try[A]]
      Continuation
        .toTryContinuation(task) { result =>
          syncVar.put(result)
          TailCalls.done(())
        }
        .result
      if (timeout.isFinite()) {
        syncVar.take(timeout.toMillis).get
      } else {
        // `take()` blocks, so it keeps its parentheses.
        syncVar.take().get
      }
    }

    /** Starts `task` and exposes its outcome as a `Future`.
      *
      * A `Promise` is completed with the `Try` delivered by `Continuation.toTryContinuation`.
      *
      * @param task the task to run
      * @return the future of the promise completed with the task's outcome
      */
    def toFuture[A](task: Task[A]): Future[A] = {
      val promise = Promise[A]()
      Continuation
        .toTryContinuation(task) { tryResult =>
          promise.complete(tryResult)
          TailCalls.done(())
        }
        .result
      promise.future
    }
  }
}