forked from typelevel/cats-effect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MVar.scala
216 lines (201 loc) · 6.8 KB
/
MVar.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
/*
* Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cats.effect
package concurrent
import cats.effect.internals.{MVarAsync, MVarConcurrent}
/**
* A mutable location, that is either empty or contains
* a value of type `A`.
*
* It has 3 fundamental atomic operations:
*
* - [[put]] which fills the var if empty, or blocks
* (asynchronously) until the var is empty again
* - [[take]] which empties the var if full, returning the contained
* value, or blocks (asynchronously) otherwise until there is
* a value to pull
* - [[read]] which reads the current value without touching it,
* assuming there is one, or otherwise it waits until a value
* is made available via `put`
*
* The `MVar` is appropriate for building synchronization
* primitives and performing simple inter-thread communications.
* If it helps, it's similar to a `BlockingQueue(capacity = 1)`,
* except that it doesn't block any threads, all waiting being
* done asynchronously (via [[Async]] or [[Concurrent]] data types,
* such as [[IO]]).
*
* Given its asynchronous, non-blocking nature, it can be used on
* top of JavaScript as well.
*
* Inspired by `Control.Concurrent.MVar` from Haskell and
* by `scalaz.concurrent.MVar`.
*/
abstract class MVar[F[_], A] {

  /**
   * Stores `a` in this `MVar`.
   *
   * When the `MVar` is empty it gets filled; when it is already full
   * the operation waits (asynchronously, without blocking a thread)
   * until the given value is next in line to be consumed by [[take]].
   *
   * This operation is atomic.
   *
   * @param a the value to place in the `MVar`
   *
   * @return a task that, once evaluated, completes when the `put`
   *         has succeeded in filling the `MVar`, the given value
   *         then being next in line to be consumed
   */
  def put(a: A): F[Unit]

  /**
   * Removes and returns the value held by this `MVar`.
   *
   * When the `MVar` is full it is emptied and its value returned;
   * when it is empty the operation waits (asynchronously) until
   * a value becomes available.
   *
   * This operation is atomic.
   *
   * @return a task that, once evaluated, completes after a value
   *         was retrieved
   */
  def take: F[A]

  /**
   * Returns the current value without removing it from this `MVar`,
   * waiting (asynchronously) for a value to become available when
   * there is none.
   *
   * This operation is atomic.
   *
   * @return a task that, once evaluated, completes after a value
   *         has been read
   */
  def read: F[A]
}
/** Builders for [[MVar]]. */
object MVar {

  /**
   * Entry point for building an [[MVar]] for `F` data types that
   * are [[Concurrent]], using the
   * [[https://typelevel.org/cats/guidelines.html#partially-applied-type-params Partially-Applied Type]]
   * technique.
   *
   * Because `F` is `Concurrent`, the values yielded by [[MVar.take]]
   * and [[MVar.put]] are cancelable.
   *
   * Creating an empty `MVar`:
   * {{{
   *   MVar[IO].empty[Int] <-> MVar.empty[IO, Int]
   * }}}
   *
   * Creating an `MVar` that holds an initial value:
   * {{{
   *   MVar[IO].init("hello") <-> MVar.init[IO, String]("hello")
   * }}}
   *
   * @see [[init]], [[initF]] and [[empty]]
   */
  def apply[F[_]](implicit F: Concurrent[F]): ApplyBuilders[F] =
    new ApplyBuilders(F)

  // ---------------------------------------------------------------
  // Cancelable builders (require `Concurrent`)

  /**
   * Builds a cancelable `MVar` whose initial state is empty.
   *
   * @see [[emptyAsync]] for non-cancelable MVars
   *
   * @param F is a [[Concurrent]] constraint, needed in order to
   *        describe cancelable operations
   */
  def empty[F[_], A](implicit F: Concurrent[F]): F[MVar[F, A]] =
    F.delay[MVar[F, A]](MVarConcurrent.empty)

  /**
   * Builds a cancelable `MVar` that already holds the `initial` value.
   *
   * @see [[initAsync]] for non-cancelable MVars
   *
   * @param initial is a value that will be immediately available
   *        for the first `read` or `take` operation
   *
   * @param F is a [[Concurrent]] constraint, needed in order to
   *        describe cancelable operations
   */
  def init[F[_], A](initial: A)(implicit F: Concurrent[F]): F[MVar[F, A]] =
    F.delay[MVar[F, A]](MVarConcurrent.apply(initial))

  /**
   * Builds a cancelable `MVar` whose initial value is supplied in
   * the `F[A]` context, the initial value thus being lazily evaluated.
   *
   * @see [[init]] for creating MVars initialized with strict values
   * @see [[initAsyncF]] for building non-cancelable MVars
   *
   * @param fa is the value that's going to be used as this MVar's
   *        initial value, available then for the first `take` or `read`
   *
   * @param F is a [[Concurrent]] constraint, needed in order to
   *        describe cancelable operations
   */
  def initF[F[_], A](fa: F[A])(implicit F: Concurrent[F]): F[MVar[F, A]] =
    F.map(fa)(initial => MVarConcurrent(initial))

  // ---------------------------------------------------------------
  // Non-cancelable builders (require only `Async`)

  /**
   * Builds an `MVar` whose initial state is empty and whose
   * operations are non-cancelable.
   *
   * @see [[empty]] for creating cancelable MVars
   */
  def emptyAsync[F[_], A](implicit F: Async[F]): F[MVar[F, A]] =
    F.delay[MVar[F, A]](MVarAsync.empty)

  /**
   * Builds an `MVar` that already holds the `initial` value and
   * whose operations are non-cancelable.
   *
   * @see [[init]] for creating cancelable MVars
   */
  def initAsync[F[_], A](initial: A)(implicit F: Async[F]): F[MVar[F, A]] =
    F.delay[MVar[F, A]](MVarAsync.apply(initial))

  /**
   * Builds a non-cancelable `MVar` whose initial value is supplied in
   * the `F[A]` context, the initial value thus being lazily evaluated.
   *
   * @see [[initAsync]] for creating MVars initialized with strict values
   * @see [[initF]] for building cancelable MVars
   *
   * @param fa is the value that's going to be used as this MVar's
   *        initial value, available then for the first `take` or `read`
   */
  def initAsyncF[F[_], A](fa: F[A])(implicit F: Async[F]): F[MVar[F, A]] =
    F.map(fa)(initial => MVarAsync(initial))

  /**
   * Builder returned by [[apply]]; it carries the [[Concurrent]]
   * instance so that only the value type has to be specified
   * at the call site.
   */
  final class ApplyBuilders[F[_]](val F: Concurrent[F]) extends AnyVal {

    /**
     * Builds an empty `MVar`.
     *
     * @see documentation for [[MVar.empty]]
     */
    def empty[A]: F[MVar[F, A]] =
      MVar.empty[F, A](F)

    /**
     * Builds an `MVar` with an initial value.
     *
     * @see documentation for [[MVar.init]]
     */
    def init[A](a: A): F[MVar[F, A]] =
      MVar.init[F, A](a)(F)

    /**
     * Builds an `MVar` with an initial value that's lazily evaluated.
     *
     * @see documentation for [[MVar.initF]]
     */
    def initF[A](fa: F[A]): F[MVar[F, A]] =
      MVar.initF[F, A](fa)(F)
  }
}