forked from davidflanagan/jstdg7
-
Notifications
You must be signed in to change notification settings - Fork 0
/
AsyncQueue.js
93 lines (85 loc) · 3.42 KB
/
AsyncQueue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
 * A queue that can be consumed asynchronously. Producers call enqueue();
 * consumers call dequeue(), which always hands back a Promise, so a value
 * may be requested before it has been supplied. Because the class provides
 * [Symbol.asyncIterator]() and next(), an instance can be the subject of a
 * for/await loop; that loop only finishes after close() has been called.
 */
class AsyncQueue {
  constructor() {
    // Values supplied by enqueue() that no consumer has asked for yet.
    this.values = [];
    // resolve functions of the Promises that dequeue() handed out before a
    // value was available, oldest first.
    this.resolvers = [];
    // Set by close(). From then on enqueue() throws and dequeue() never
    // returns a pending Promise.
    this.closed = false;
  }

  // Add a value to the queue. Throws if the queue has been closed.
  enqueue(value) {
    if (this.closed) {
      throw new Error("AsyncQueue closed");
    }
    if (this.resolvers.length === 0) {
      // Nobody is waiting: store the value until it is asked for.
      this.values.push(value);
      return;
    }
    // A consumer is already waiting: fulfill the oldest outstanding Promise.
    const fulfill = this.resolvers.shift();
    fulfill(value);
  }

  // Remove the next value. Always returns a Promise: for the next queued
  // value, for the end-of-stream marker, or for a value still to come.
  dequeue() {
    if (this.values.length > 0) {
      // A value is ready, so hand it over in an already-resolved Promise.
      return Promise.resolve(this.values.shift());
    }
    if (this.closed) {
      // Empty and closed: resolve to the "end-of-stream" marker.
      return Promise.resolve(AsyncQueue.EOS);
    }
    // Empty but still open: return a pending Promise and keep its resolve
    // function so that enqueue() or close() can settle it later.
    return new Promise((resolve) => {
      this.resolvers.push(resolve);
    });
  }

  // Close the queue. Every Promise that is still waiting for a value is
  // resolved with the end-of-stream marker, since no value will arrive.
  close() {
    for (const settle of this.resolvers.splice(0)) {
      settle(AsyncQueue.EOS);
    }
    this.closed = true;
  }

  // Asynchronous iterable protocol: the queue is its own iterator.
  [Symbol.asyncIterator]() {
    return this;
  }

  // Asynchronous iterator protocol: translate what dequeue() resolves to
  // (a value, or the EOS sentinel once closed) into an iterator result
  // object, and return a Promise for that object.
  next() {
    const toIteratorResult = (item) => {
      if (item === AsyncQueue.EOS) {
        return { value: undefined, done: true };
      }
      return { value: item, done: false };
    };
    return this.dequeue().then(toIteratorResult);
  }
}
// A unique sentinel value that dequeue() resolves to in order to mark
// "end of stream": it is used when the queue is closed and no queued values
// remain, and close() resolves any still-pending Promises with it. next()
// compares against it to produce the { done: true } iterator result.
AsyncQueue.EOS = Symbol("end-of-stream");
/**
 * Push events of the specified type on the specified document element onto
 * an AsyncQueue object, and return the queue for use as an event stream.
 *
 * Once the returned queue has been closed, the next event of that type
 * causes the listener to detach itself instead of calling enqueue(), which
 * would throw "AsyncQueue closed" from inside the event handler on every
 * subsequent event and leave the listener registered forever.
 *
 * @param {object} elt - The event target; must provide addEventListener()
 *     and removeEventListener().
 * @param {string} type - The event type to listen for, e.g. "keypress".
 * @returns {AsyncQueue} A queue that receives each event as it is dispatched.
 */
function eventStream(elt, type) {
  const q = new AsyncQueue(); // Create a queue
  const listener = (e) => {
    if (q.closed) {
      // The consumer is done with the stream: stop listening rather than
      // throwing from enqueue() on a closed queue.
      elt.removeEventListener(type, listener);
      return;
    }
    q.enqueue(e); // Enqueue events
  };
  elt.addEventListener(type, listener);
  return q;
}
// Log the key of every keypress event dispatched on the document. The loop
// runs once per event and only ends if the underlying stream is closed.
async function handleKeys() {
  const keypresses = eventStream(document, "keypress");
  for await (const { key } of keypresses) {
    console.log(key);
  }
}