-
Notifications
You must be signed in to change notification settings - Fork 576
/
to_transform_stream.ts
75 lines (73 loc) · 2.35 KB
/
to_transform_stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.
/**
 * Convert a generator function (or any function returning an iterable) into a
 * {@linkcode TransformStream}.
 *
 * Chunks written to the returned `writable` are exposed to `transformer` as a
 * {@linkcode ReadableStream}. Every value produced by the iterable that
 * `transformer` returns is enqueued on the returned `readable`, one value per
 * `pull`, so the iterator is only advanced when the consumer wants more data.
 *
 * Error and cancellation propagation:
 * - If advancing the iterator throws or rejects, the source stream handed to
 *   `transformer` is cancelled with that error and the returned `readable` is
 *   errored with it.
 * - If the returned `readable` is cancelled, the reason is forwarded to the
 *   iterator through `throw()`, or through `return()` when the iterator does
 *   not implement `throw()`. Afterwards the source stream is cancelled with
 *   the same reason.
 *
 * @example
 * ```ts
 * import { toTransformStream } from "https://deno.land/std@$STD_VERSION/streams/to_transform_stream.ts";
 *
 * const readable = ReadableStream.from([0, 1, 2])
 *   .pipeThrough(toTransformStream(async function* (src) {
 *     for await (const chunk of src) {
 *       yield chunk * 100;
 *     }
 *   }));
 *
 * for await (const chunk of readable) {
 *   console.log(chunk);
 * }
 * // output: 0, 100, 200
 * ```
 *
 * @typeParam I The type of the chunks written to the writable side.
 * @typeParam O The type of the chunks read from the readable side.
 * @param transformer A function that receives the written chunks as a
 * `ReadableStream` and returns a sync or async iterable of output chunks.
 * @param writableStrategy An object that optionally defines a queuing strategy
 * for the writable (input) side of the stream.
 * @param readableStrategy An object that optionally defines a queuing strategy
 * for the readable (output) side of the stream.
 * @returns A `writable`/`readable` pair usable wherever a `TransformStream` is
 * expected, e.g. with `ReadableStream.prototype.pipeThrough()`.
 */
export function toTransformStream<I, O>(
  transformer: (src: ReadableStream<I>) => Iterable<O> | AsyncIterable<O>,
  writableStrategy?: QueuingStrategy<I>,
  readableStrategy?: QueuingStrategy<O>,
): TransformStream<I, O> {
  // An identity TransformStream bridges the two sides: its writable half is
  // returned to the caller, its readable half is what `transformer` consumes.
  const {
    writable,
    readable,
  } = new TransformStream<I, I>(undefined, writableStrategy);

  const iterable = transformer(readable);
  // Prefer the async iteration protocol and fall back to the sync one.
  const iterator: Iterator<O> | AsyncIterator<O> =
    (iterable as AsyncIterable<O>)[Symbol.asyncIterator]?.() ??
      (iterable as Iterable<O>)[Symbol.iterator]?.();

  return {
    writable,
    readable: new ReadableStream<O>({
      async pull(controller) {
        let result: IteratorResult<O>;
        try {
          result = await iterator.next();
        } catch (error) {
          // Propagate error to stream from iterator.
          // If the source is already "errored" (or locked), `cancel()` itself
          // rejects; that rejection is deliberately ignored so the iterator's
          // error is the one that surfaces.
          await readable.cancel(error).catch(() => {});
          controller.error(error);
          return;
        }
        if (result.done) {
          controller.close();
          return;
        }
        controller.enqueue(result.value);
      },
      async cancel(reason) {
        // Propagate cancellation to the iterator first, then to the source.
        try {
          if (typeof iterator.throw === "function") {
            // Generators observe the cancellation reason at their current
            // `yield`, which lets `catch`/`finally` blocks run.
            await iterator.throw(reason);
          } else if (typeof iterator.return === "function") {
            // Iterators without `throw()` (hand-written iterators, iterator
            // helpers, a ReadableStream's own async iterator, ...) are told
            // the consumer is done through `return()` so they can clean up
            // and release anything they hold on the source stream.
            await iterator.return(reason);
          }
        } catch {
          /* `iterator.throw()` always throws on site. We catch it. Failures
           * while closing the iterator must not prevent the source stream
           * from being cancelled below. */
        }
        await readable.cancel(reason);
      },
    }, readableStrategy),
  };
}