This repository has been archived by the owner on Jul 14, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mailbox.ts
55 lines (46 loc) · 1.5 KB
/
mailbox.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import { fork, monitor, Operation } from 'effection';
import { EventEmitter } from 'events';
import { onEach, EventName } from '../events';
import { compile } from './pattern';
export { any } from './pattern';
/**
 * A queue of messages that operations can selectively wait on.
 *
 * Messages handed to `send` stay pending until a `receive` whose pattern
 * matches claims them. Each pending message is delivered to exactly one
 * receiver, and pending messages are offered in the order they were sent.
 */
export class Mailbox {
  // Internal channel used to wake receivers that are waiting for a message.
  private subscriptions = new EventEmitter();

  // Pending messages in send order. Every send is wrapped in its own envelope
  // object so that sending the same value twice (equal primitives, or the
  // same object reference) yields two distinct entries; storing raw values
  // in a Set would silently collapse them into one and lose a message.
  private messages = new Set<{ message: unknown }>();

  /**
   * Queue `message` and notify any receivers that are currently waiting.
   *
   * @param message - the value to deliver; it is never inspected here.
   */
  *send(message: unknown): Operation {
    let envelope = { message };
    this.messages.add(envelope);
    this.subscriptions.emit('message', envelope);
  }

  /**
   * Build an operation that resumes with the first message accepted by
   * `pattern`.
   *
   * Pending messages are checked first, in send order. If none is accepted,
   * the operation waits for later sends until one is.
   *
   * @param pattern - passed to `compile` to obtain the matching predicate.
   * @returns an operation that resumes with the claimed message.
   */
  receive(pattern: unknown = undefined): Operation {
    let match = compile(pattern);

    return ({ resume, ensure }) => {
      // Try to claim `envelope` for this receiver; returns whether it did.
      let dispatch = (envelope: { message: unknown }): boolean => {
        // The membership check keeps a message that another receiver already
        // claimed during the same emit from being delivered twice.
        if (!this.messages.has(envelope) || !match(envelope.message)) {
          return false;
        }
        this.messages.delete(envelope);
        // Stop listening immediately so this receiver cannot be resumed a
        // second time by a later send, regardless of when the `ensure`
        // cleanup below actually runs.
        this.subscriptions.off('message', dispatch);
        resume(envelope.message);
        return true;
      };

      for (let envelope of this.messages) {
        if (dispatch(envelope)) {
          return;
        }
      }

      this.subscriptions.on('message', dispatch);
      // Covers the case where the operation ends before any message matched.
      ensure(() => this.subscriptions.off('message', dispatch));
    };
  }

  /**
   * Create a mailbox that is fed by events from `emitter`.
   *
   * For every name in `events`, each emitted event is converted with
   * `prepare` and sent to the returned mailbox.
   *
   * @param emitter - the event source to observe.
   * @param events - a single event name or a list of names.
   * @param prepare - maps `{ event, args }` to the message that is sent;
   *   defaults to the identity function.
   * @returns the new mailbox.
   */
  static *watch(
    emitter: EventEmitter,
    events: EventName | EventName[],
    prepare: (event: { event: string; args: unknown[] }) => unknown = x => x
  ): Operation {
    let mailbox = new Mailbox();

    // The listeners are spawned on the parent of this operation's parent
    // context rather than on the `watch` operation itself.
    // NOTE(review): presumably so they outlive this call; confirm against
    // effection's context semantics.
    let parent = yield ({ resume, context: { parent }}) => resume(parent.parent);

    parent.spawn(monitor(function* () {
      // `concat` accepts both a single name and an array of names; the typed
      // empty array keeps this valid under strict type checking.
      for (let name of ([] as EventName[]).concat(events)) {
        yield fork(onEach(emitter, name, (...args) => mailbox.send(prepare({ event: name, args }))));
      }
    }));

    return mailbox;
  }
}