-
Notifications
You must be signed in to change notification settings - Fork 6
/
shared.scala
117 lines (101 loc) · 3.94 KB
/
shared.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package com.thoughtworks.raii
import java.util.concurrent.atomic.AtomicReference
import com.thoughtworks.continuation._
import com.thoughtworks.raii.covariant.{Releasable, ResourceT}
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scalaz.Free.Trampoline
import scalaz.{ContT, Trampoline}
import scalaz.syntax.all._
import scalaz.std.iterable._
/**
* @author 杨博 (Yang Bo) <pop.atry@gmail.com>
*/
object shared {
private[shared] sealed trait State[A]
private[shared] final case class Closed[A]() extends State[A]
private[shared] final case class Opening[A](handlers: Queue[Releasable[UnitContinuation, A] => Trampoline[Unit]])
extends State[A]
private[shared] final case class Open[A](data: Releasable[UnitContinuation, A], count: Int) extends State[A]
implicit final class UnitContinuationResourceSharedOps[A](raii: ResourceT[UnitContinuation, A]) {
def shared: ResourceT[UnitContinuation, A] = {
val sharedReference = new SharedStateMachine(raii)
ResourceT[UnitContinuation, A](sharedReference.acquire)
}
}
private[shared] final class SharedStateMachine[A](underlying: ResourceT[UnitContinuation, A])
extends AtomicReference[State[A]](Closed())
with Releasable[UnitContinuation, A] {
private def sharedCloseable = this
override def value: A = state.get().asInstanceOf[Open[A]].data.value
override def release: UnitContinuation[Unit] = {
@tailrec
def retry(continue: Unit => Trampoline[Unit]): Trampoline[Unit] = {
state.get() match {
case oldState @ Open(data, count) =>
if (count == 1) {
if (state.compareAndSet(oldState, Closed())) {
val Continuation(contT) = data.release
contT(continue)
} else {
retry(continue)
}
} else {
if (state.compareAndSet(oldState, oldState.copy(count = count - 1))) {
Trampoline.suspend(continue(()))
} else {
retry(continue)
}
}
case Opening(_) | Closed() =>
throw new IllegalStateException("Cannot release more than once")
}
}
Continuation(retry)
}
private def state = this
@tailrec
private def complete(data: Releasable[UnitContinuation, A]): Trampoline[Unit] = {
state.get() match {
case oldState @ Opening(handlers) =>
val newState = Open(data, handlers.length)
if (state.compareAndSet(oldState, newState)) {
handlers.traverse_ { continue: (Releasable[UnitContinuation, A] => Trampoline[Unit]) =>
Trampoline.suspend(continue(sharedCloseable))
}
} else {
complete(data)
}
case Open(_, _) | Closed() =>
throw new IllegalStateException("Cannot trigger handler more than once")
}
}
private[shared] def acquire: UnitContinuation[Releasable[UnitContinuation, A]] = {
@tailrec
def retry(handler: Releasable[UnitContinuation, A] => Trampoline[Unit]): Trampoline[Unit] = {
state.get() match {
case oldState @ Closed() =>
if (state.compareAndSet(oldState, Opening(Queue(handler)))) {
val ResourceT(Continuation(contT)) = underlying
contT(complete)
} else {
retry(handler)
}
case oldState @ Opening(handlers: Queue[Releasable[UnitContinuation, A] => Trampoline[Unit]]) =>
if (state.compareAndSet(oldState, Opening(handlers.enqueue(handler)))) {
Trampoline.done(())
} else {
retry(handler)
}
case oldState @ Open(data, count) =>
if (state.compareAndSet(oldState, oldState.copy(count = count + 1))) {
handler(sharedCloseable)
} else {
retry(handler)
}
}
}
Continuation(retry)
}
}
}