-
Notifications
You must be signed in to change notification settings - Fork 26
/
INode.scala
172 lines (159 loc) · 6.42 KB
/
INode.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package sbt
import java.lang.Runnable
import java.util.concurrent.{atomic, Executor, LinkedBlockingQueue}
import atomic.{AtomicBoolean, AtomicInteger}
import Types.{:+:, Id}
object EvaluationState extends Enumeration {
val New, Blocked, Ready, Calling, Evaluated = Value
}
abstract class EvaluateSettings[Scope]
{
protected val init: Init[Scope]
import init._
protected def executor: Executor
protected def compiledSettings: Seq[Compiled[_]]
import EvaluationState.{Value => EvaluationState, _}
private[this] val complete = new LinkedBlockingQueue[Option[Throwable]]
private[this] val static = PMap.empty[ScopedKey, INode]
private[this] def getStatic[T](key: ScopedKey[T]): INode[T] = static get key getOrElse error("Illegal reference to key " + key)
private[this] val transform: Initialize ~> INode = new (Initialize ~> INode) { def apply[T](i: Initialize[T]): INode[T] = i match {
case k: Keyed[s, T] => single(getStatic(k.scopedKey), k.transform)
case a: Apply[hl,T] => new MixedNode(a.inputs transform transform, a.f)
case u: Uniform[s, T] => new UniformNode(u.inputs map transform.fn[s], u.f)
case b: Bind[s,T] => new BindNode[s,T]( transform(b.in), x => transform(b.f(x)))
case v: Value[T] => constant(v.value)
case o: Optional[s,T] => o.a match {
case None => constant( () => o.f(None) )
case Some(i) => single[s,T](transform(i), x => o.f(Some(x)))
}
}}
private[this] val roots: Seq[INode[_]] = compiledSettings flatMap { cs =>
(cs.settings map { s =>
val t = transform(s.init)
static(s.key) = t
t
}): Seq[INode[_]]
}
private[this] var running = new AtomicInteger
private[this] var cancel = new AtomicBoolean(false)
def run(implicit delegates: Scope => Seq[Scope]): Settings[Scope] =
{
assert(running.get() == 0, "Already running")
startWork()
roots.foreach( _.registerIfNew() )
workComplete()
complete.take() foreach { ex =>
cancel.set(true)
throw ex
}
getResults(delegates)
}
private[this] def getResults(implicit delegates: Scope => Seq[Scope]) = (empty /: static.toTypedSeq) { case (ss, static.TPair(key, node)) => ss.set(key.scope, key.key, node.get) }
private[this] val getValue = new (INode ~> Id) { def apply[T](node: INode[T]) = node.get }
private[this] def submitEvaluate(node: INode[_]) = submit(node.evaluate())
private[this] def submitCallComplete[T](node: BindNode[_, T], value: T) = submit(node.callComplete(value))
private[this] def submit(work: => Unit): Unit =
{
startWork()
executor.execute(new Runnable { def run = if(!cancel.get()) run0(work) })
}
private[this] def run0(work: => Unit): Unit =
{
try { work } catch { case e => complete.put( Some(e) ) }
workComplete()
}
private[this] def startWork(): Unit = running.incrementAndGet()
private[this] def workComplete(): Unit =
if(running.decrementAndGet() == 0)
complete.put( None )
private[this] sealed abstract class INode[T]
{
private[this] var state: EvaluationState = New
private[this] var value: T = _
private[this] val blocking = new collection.mutable.ListBuffer[INode[_]]
private[this] var blockedOn: Int = 0
private[this] val calledBy = new collection.mutable.ListBuffer[BindNode[_, T]]
override def toString = getClass.getName + " (state=" + state + ",blockedOn=" + blockedOn + ",calledBy=" + calledBy.size + ",blocking=" + blocking.size + "): " +
( (static.toSeq.flatMap { case (key, value) => if(value eq this) key.toString :: Nil else Nil }).headOption getOrElse "non-static")
final def get: T = synchronized {
assert(value != null, toString + " not evaluated")
value
}
final def doneOrBlock(from: INode[_]): Boolean = synchronized {
val ready = state == Evaluated
if(!ready) blocking += from
registerIfNew()
ready
}
final def isDone: Boolean = synchronized { state == Evaluated }
final def isNew: Boolean = synchronized { state == New }
final def isCalling: Boolean = synchronized { state == Calling }
final def registerIfNew(): Unit = synchronized { if(state == New) register() }
private[this] def register()
{
assert(state == New, "Already registered and: " + toString)
val deps = dependsOn
blockedOn = deps.size - deps.count(_.doneOrBlock(this))
if(blockedOn == 0)
schedule()
else
state = Blocked
}
final def schedule(): Unit = synchronized {
assert(state == New || state == Blocked, "Invalid state for schedule() call: " + toString)
state = Ready
submitEvaluate(this)
}
final def unblocked(): Unit = synchronized {
assert(state == Blocked, "Invalid state for unblocked() call: " + toString)
blockedOn -= 1
assert(blockedOn >= 0, "Negative blockedOn: " + blockedOn + " for " + toString)
if(blockedOn == 0) schedule()
}
final def evaluate(): Unit = synchronized { evaluate0() }
protected final def makeCall(source: BindNode[_, T], target: INode[T]) {
assert(state == Ready, "Invalid state for call to makeCall: " + toString)
state = Calling
target.call(source)
}
protected final def setValue(v: T) {
assert(state != Evaluated, "Already evaluated (trying to set value to " + v + "): " + toString)
value = v
state = Evaluated
blocking foreach { _.unblocked() }
blocking.clear()
calledBy foreach { node => submitCallComplete(node, value) }
calledBy.clear()
}
final def call(by: BindNode[_, T]): Unit = synchronized {
registerIfNew()
state match {
case Evaluated => submitCallComplete(by, value)
case _ => calledBy += by
}
}
protected def dependsOn: Seq[INode[_]]
protected def evaluate0(): Unit
}
private[this] def constant[T](f: () => T): INode[T] = new MixedNode[HNil, T](KNil, _ => f())
private[this] def single[S,T](in: INode[S], f: S => T): INode[T] = new MixedNode[S :+: HNil, T](in :^: KNil, hl => f(hl.head))
private[this] final class BindNode[S,T](in: INode[S], f: S => INode[T]) extends INode[T]
{
protected def dependsOn = in :: Nil
protected def evaluate0(): Unit = makeCall(this, f(in.get) )
def callComplete(value: T): Unit = synchronized {
assert(isCalling, "Invalid state for callComplete(" + value + "): " + toString)
setValue(value)
}
}
private[this] final class UniformNode[S,T](in: Seq[INode[S]], f: Seq[S] => T) extends INode[T]
{
protected def dependsOn = in
protected def evaluate0(): Unit = setValue( f(in.map(_.get)) )
}
private[this] final class MixedNode[HL <: HList, T](in: KList[INode, HL], f: HL => T) extends INode[T]
{
protected def dependsOn = in.toList
protected def evaluate0(): Unit = setValue( f( in down getValue ) )
}
}