-
Notifications
You must be signed in to change notification settings - Fork 576
/
readable_stream_from_iterable.ts
63 lines (62 loc) · 2.2 KB
/
readable_stream_from_iterable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.
/**
 * Create a `ReadableStream` from any kind of iterable.
 *
 * ```ts
 * import { readableStreamFromIterable } from "https://deno.land/std@$STD_VERSION/streams/readable_stream_from_iterable.ts";
 *
 * const r1 = readableStreamFromIterable(["foo", "bar", "baz"]);
 * const r2 = readableStreamFromIterable(async function* () {
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "foo";
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "bar";
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "baz";
 * }());
 * ```
 *
 * If the produced iterator (`iterable[Symbol.asyncIterator]()` or
 * `iterable[Symbol.iterator]()`) is a generator, or more specifically is found
 * to have a `.throw()` method on it, that will be called with the cancellation
 * reason upon `readableStream.cancel()`. This is the case for the second input
 * type above:
 *
 * ```ts
 * import { readableStreamFromIterable } from "https://deno.land/std@$STD_VERSION/streams/readable_stream_from_iterable.ts";
 *
 * const r3 = readableStreamFromIterable(async function* () {
 *   try {
 *     yield "foo";
 *   } catch (error) {
 *     console.log(error); // Error: Cancelled by consumer.
 *   }
 * }());
 * const reader = r3.getReader();
 * console.log(await reader.read()); // { value: "foo", done: false }
 * await reader.cancel(new Error("Cancelled by consumer."));
 * ```
 *
 * An iterator that has no `.throw()` method but does have a `.return()` method
 * gets `.return(reason)` called instead, so that it can release any resources
 * it holds. Errors raised by either method are swallowed: cancelling the
 * stream never rejects because of the iterator.
 *
 * @param iterable The sync or async iterable to read from. The async iterator
 * is preferred when the value implements both protocols.
 * @returns A stream that enqueues one iterator result per pull and closes once
 * the iterator reports `done`.
 * @throws {TypeError} If `iterable` implements neither `Symbol.asyncIterator`
 * nor `Symbol.iterator`.
 */
export function readableStreamFromIterable<T>(
  iterable: Iterable<T> | AsyncIterable<T>,
): ReadableStream<T> {
  // The optional calls guard against values that violate the declared type at
  // runtime (e.g. a plain object passed from untyped code).
  const candidate: Iterator<T> | AsyncIterator<T> | undefined =
    (iterable as AsyncIterable<T>)[Symbol.asyncIterator]?.() ??
      (iterable as Iterable<T>)[Symbol.iterator]?.();
  if (candidate == null) {
    // Fail fast with a clear message instead of letting the first pull error
    // the stream with "cannot read properties of undefined".
    throw new TypeError(
      "readableStreamFromIterable(): the argument is not iterable (it implements neither Symbol.asyncIterator nor Symbol.iterator)",
    );
  }
  const iterator: Iterator<T> | AsyncIterator<T> = candidate;

  return new ReadableStream<T>({
    async pull(controller) {
      // `await` is a no-op for sync iterators and unwraps async ones.
      const { value, done } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
    async cancel(reason) {
      try {
        if (typeof iterator.throw === "function") {
          // Generators: surface the cancellation reason at the paused `yield`.
          await iterator.throw(reason);
        } else if (typeof iterator.return === "function") {
          // Non-generator iterators: still give them a chance to clean up.
          await iterator.return(reason);
        }
      } catch {
        // `iterator.throw()` rethrows the reason unless the generator catches
        // it, and `iterator.return()` may fail during cleanup. Either way the
        // consumer asked to stop, so cancellation itself must not reject.
      }
    },
  });
}