-
Notifications
You must be signed in to change notification settings - Fork 1
/
group.gleam
283 lines (251 loc) · 7.08 KB
/
group.gleam
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
//// This is a local process registry that groups Subjects under a name to later reference or broadcast messages to them.
import gleam/dict.{type Dict}
import gleam/erlang/process.{
type Pid, type ProcessDown, type ProcessMonitor, type Selector, type Subject,
}
import gleam/function
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/otp/actor
import gleam/set.{type Set}
/// Shorthand for the Subject that carries the registry's own `Message`s;
/// this is the handle `start` gives back on success.
type Registry(group, message) =
Subject(Message(group, message))
/// This is the message type used internally by group.
///
/// The type is opaque: callers never build these messages themselves, they go
/// through `register`, `members` and `dispatch` instead.
///
/// When building out your system it may be useful to state the group types on startup. For example:
///
/// ```gleam
/// let assert Ok(server) = chat.start()
/// let server: process.Subject(Chat) = server
///
/// type Topic {
/// General
/// OffTopic
/// Cats
/// }
///
/// let assert Ok(registry) = group.start()
/// let registry: process.Subject(group.Message(Topic, Chat)) = registry
/// ```
///
/// By specifying the types we can document the kind of registry we are working with; in the example
/// above we can tell we're creating different "chat servers" under different topic groups.
pub opaque type Message(group, msg) {
// Lookup request: the subjects currently stored under `group` are sent back to `client`.
GroupedSubjects(client: Subject(List(Subject(msg))), group: group)
// Registration request: store `subject` under `group`.
GroupedRegistrant(subject: Subject(msg), group: group)
// Produced by the process-down handler of a monitored owner; triggers removal
// of everything tracked for the pid held in `index`.
Demonitor(index: Index)
}
// Pairs a monitored process with the monitor that watches it, so the
// `Demonitor` message carries both the pid to clean up and the monitor to release.
type Index {
Index(pid: Pid, monitor: ProcessMonitor)
}
// Records that a given subject is stored under a given group. The tracker keeps
// a set of these per owning pid so the subjects can be found again on removal.
type Location(group, msg) {
Location(group, Subject(msg))
}
// Everything the registry actor carries between messages.
type State(group, msg) {
State(
// A subject created in `handle_init` and selected on by the initial selector.
self: Registry(group, msg),
// The registry proper: every group maps to the set of subjects registered under it.
groups: Dict(group, Set(Subject(msg))),
// Reverse index, keyed by owning pid, listing every (group, subject) pair that
// must be removed from `groups` when that process goes down.
subject_track: Dict(Pid, Set(Location(group, msg))),
// There's no way of retrieving the previous selector from the current process,
// so the latest one is kept here and extended whenever a new process is monitored.
selector: Selector(Message(group, msg)),
)
}
/// Boots the registry actor and returns the Subject used to talk to it.
///
/// The actor is initialised by `handle_init` (with an init timeout of 10) and
/// processes its mailbox with `handle_message`.
///
/// ## Example
///
/// ```gleam
/// > group.start()
/// Ok(registry)
/// ```
pub fn start() -> Result(Registry(group, msg), actor.StartError) {
  let spec =
    actor.Spec(init: handle_init, init_timeout: 10, loop: handle_message)
  actor.start_spec(spec)
}
/// Adds a Subject to the given group.
///
/// The request is sent to the registry as a message and this function returns
/// `Nil` straight away, without waiting for a reply.
///
/// ## Example
///
/// ```gleam
/// > group.register(registry, process.new_subject(), "group-a")
/// Nil
/// ```
pub fn register(
  registry: Subject(Message(group, message)),
  subject: Subject(message),
  group: group,
) -> Nil {
  GroupedRegistrant(subject: subject, group: group)
  |> process.send(registry, _)
}
/// Looks up the Subjects registered under a named group.
///
/// This is a synchronous call to the registry: it sends a lookup request and
/// waits for the reply, using a timeout of 10 passed to `process.call`
/// (presumably milliseconds; confirm against the `gleam_erlang` version in use).
/// A group with no registered subjects yields an empty list.
///
/// ### Example
///
/// ```gleam
/// > group.members(registry, "group-a")
/// [subject_1, subject_2, subject_3]
/// ```
pub fn members(
  registry: Subject(Message(group, msg)),
  group: group,
) -> List(Subject(msg)) {
  process.call(registry, GroupedSubjects(_, group), 10)
}
/// Runs `callback` once for every Subject currently registered under a group.
///
/// The members are fetched first (see `members`), then the callback is applied
/// to each of them in turn; whatever the callback returns is discarded.
///
/// ### Example
///
/// ```gleam
/// > group.dispatch(registry, "group-a", fn(subject) {
/// >   process.send(subject, Message(data))
/// > })
/// Nil
/// ```
pub fn dispatch(
  registry: Subject(Message(group, message)),
  group: group,
  callback: fn(Subject(message)) -> x,
) -> Nil {
  registry
  |> members(group)
  |> list.each(callback)
}
// Builds the actor's initial state: no groups, nothing tracked, and a selector
// that only listens on a freshly created subject.
fn handle_init() {
  let self = process.new_subject()
  let selector =
    process.selecting(process.new_selector(), self, function.identity)

  // The same selector is both stored in the state (so later monitors can
  // extend it) and handed to the actor as its starting selector.
  actor.Ready(
    State(
      self: self,
      groups: dict.new(),
      subject_track: dict.new(),
      selector: selector,
    ),
    selector,
  )
}
// The actor loop: handles lookups, registrations and clean-up after a
// monitored process has gone down.
fn handle_message(
  message: Message(group, message),
  state: State(group, message),
) {
  case message {
    // Lookup: reply with the group's subjects, or an empty list for an unknown group.
    GroupedSubjects(client: reply_to, group: name) -> {
      case dict.get(state.groups, name) {
        Ok(found) -> process.send(reply_to, set.to_list(found))
        Error(Nil) -> process.send(reply_to, [])
      }
      actor.continue(state)
    }

    // Registration: store the subject, index it by its owner, and start
    // monitoring the owner if it is not monitored yet.
    GroupedRegistrant(subject: registrant, group: name) -> {
      let owner = process.subject_owner(registrant)
      // `monitor` inspects the tracker to decide whether the owner is new, so
      // it has to run against the state as it was before `into_tracker`.
      let new_selector = monitor(state, owner)
      let grouped = into_group(state, name, registrant)
      let tracked = into_tracker(grouped, owner, Location(name, registrant))
      let next = into_selector(tracked, new_selector)
      actor.Continue(next, new_selector)
    }

    // Clean-up: release the monitor and forget everything the process owned.
    Demonitor(index: Index(pid: owner, monitor: reference)) -> {
      process.demonitor_process(reference)
      let cleaned = remove_from_tracker(remove_from_group(state, owner), owner)
      actor.continue(cleaned)
    }
  }
}
// Returns the state with `subject` added to the set stored under `group`,
// creating that set when the group has no entry yet.
fn into_group(
  state: State(group, msg),
  group: group,
  subject: Subject(msg),
) -> State(group, msg) {
  let groups =
    dict.update(state.groups, group, fn(existing) {
      existing
      |> option.unwrap(set.new())
      |> set.insert(subject)
    })
  State(..state, groups: groups)
}
// Returns the state with `location` added to the set of locations tracked for
// `pid`, creating that set when the pid has no entry yet.
fn into_tracker(
  state: State(group, msg),
  pid: Pid,
  location: Location(group, msg),
) -> State(group, msg) {
  let locations = case dict.get(state.subject_track, pid) {
    Ok(known) -> set.insert(known, location)
    Error(Nil) -> set.insert(set.new(), location)
  }
  State(
    ..state,
    subject_track: dict.insert(state.subject_track, pid, locations),
  )
}
// Stores a freshly built selector in the state; when there is none (`None`)
// the state is returned untouched.
fn into_selector(
  state: State(group, msg),
  selection: Option(Selector(Message(group, msg))),
) -> State(group, msg) {
  selection
  |> option.map(fn(selector) { State(..state, selector: selector) })
  |> option.unwrap(state)
}
// Removes every subject owned by `pid` from the groups it was registered
// under, using the tracker entry for `pid` to find them.
//
// A group whose last subject is removed is dropped from `state.groups`
// instead of being kept as an empty set; otherwise the dict would keep one
// entry for every group name ever used. Lookups are unaffected, since a
// missing group is already answered with an empty list.
//
// Panics when `pid` has no tracker entry, or when a tracked location names a
// group that is not in `state.groups`; both mean the two indexes are out of sync.
fn remove_from_group(state: State(group, msg), pid: Pid) -> State(group, msg) {
  let locations = case dict.get(state.subject_track, pid) {
    Ok(locations) -> {
      set.to_list(locations)
    }
    Error(Nil) -> {
      panic as "Impossible state, couldn't find a pid when removing from group."
    }
  }

  list.fold(locations, state, fn(state, location) {
    let Location(group, subject) = location
    case dict.get(state.groups, group) {
      Ok(subjects) -> {
        let remaining = set.delete(subjects, subject)
        // Each tracked (group, subject) pair is present in its group's set, so
        // the set only becomes empty once the last pair for it is processed.
        let groups = case set.size(remaining) {
          0 -> dict.delete(state.groups, group)
          _ -> dict.insert(state.groups, group, remaining)
        }
        State(..state, groups: groups)
      }
      Error(Nil) -> {
        panic as "Impossible state, couldn't find the group when removing."
      }
    }
  })
}
// Forgets every location tracked for `pid`.
fn remove_from_tracker(state: State(group, msg), pid: Pid) -> State(group, msg) {
  let subject_track = dict.delete(state.subject_track, pid)
  State(..state, subject_track: subject_track)
}
// Starts monitoring `pid` unless the tracker already knows it.
//
// Returns `Some(selector)` — the state's selector extended with a handler for
// the new monitor — when a monitor was created, and `None` when the process
// was already registered.
fn monitor(
  state: State(group, msg),
  pid: Pid,
) -> Option(Selector(Message(group, msg))) {
  case dict.has_key(state.subject_track, pid) {
    // Already tracked: nothing to do.
    True -> None
    // First subject seen from this process: watch it.
    False -> {
      let reference = process.monitor_process(pid)
      Some(select_process_down(state.selector, pid, reference))
    }
  }
}
// Extends `selector` with a handler for `monitor` that turns the process-down
// notification into a `Demonitor` message. The message carries an `Index` so
// the subjects of the process can be tracked down when the process goes down.
fn select_process_down(
  selector: Selector(Message(group, msg)),
  pid: Pid,
  monitor: ProcessMonitor,
) -> Selector(Message(group, msg)) {
  process.selecting_process_down(selector, monitor, fn(_down: ProcessDown) {
    Demonitor(Index(pid: pid, monitor: monitor))
  })
}