/
merge.ts
66 lines (63 loc) · 1.96 KB
/
merge.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
import { deferred } from "../async/deferred.ts";
/**
 * Merge multiple streams into a single one, not taking order into account.
 * If a stream ends before the others, the others continue adding data,
 * and the finished one will not add any more data.
 *
 * If any source stream errors, the error is forwarded to the merged stream
 * via `controller.error` and the merged stream errors as well.
 *
 * @param streams the source streams to merge; chunk order across streams is
 * unspecified (whichever stream produces first wins).
 * @returns a stream that emits every chunk of every source stream and closes
 * once all sources have closed.
 */
export function mergeReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      // Count of source streams that have not finished yet.
      let remaining = streams.length;
      // Set once any source errors; suppresses further enqueue/close calls.
      let errored = false;
      if (remaining === 0) {
        controller.close();
        return;
      }
      for (const stream of streams) {
        // Pump each source concurrently. Error handling MUST live inside
        // this async function: a try/catch around the loop that launches
        // the pumps would never observe their rejections.
        (async () => {
          try {
            for await (const chunk of stream) {
              if (errored) break; // another source already failed; stop pumping
              controller.enqueue(chunk);
            }
            // Close only after the last source finishes cleanly.
            if (!errored && --remaining === 0) {
              controller.close();
            }
          } catch (error) {
            if (!errored) {
              errored = true;
              controller.error(error);
            }
          }
        })();
      }
    },
  });
}
/**
 * Merge multiple streams into a single one, taking order into account: chunks
 * are taken round-robin, one per stream, so each stream waits for its chunk to
 * enqueue before the next stream appends another chunk.
 * If a stream ends before the others, the others continue adding data in
 * order, and the finished one will not add any more data.
 *
 * @param streams the source streams to interleave.
 * @returns a stream yielding one chunk from each source in turn, closing once
 * every source has closed. Errors from any source error the zipped stream.
 */
export function zipReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  const readers = streams.map((s) => s.getReader());
  return new ReadableStream<T>({
    async start(controller) {
      try {
        let index = 0;
        while (readers.length > 0) {
          // Wrap around so the round-robin skips nothing after removals.
          index %= readers.length;
          const { value, done } = await readers[index].read();
          if (done) {
            // Remove exactly the reader that finished. (Splicing by a key
            // from a pre-pass Object.entries snapshot — as the previous
            // version did — removes the wrong reader once an earlier one
            // has been spliced out in the same pass, dropping its data.)
            readers.splice(index, 1);
          } else {
            controller.enqueue(value!);
            index++;
          }
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      }
    },
  });
}