Skip to content

Commit

Permalink
feat(streams): concatReadableStreams() (#4747)
Browse files Browse the repository at this point in the history
* feat(streams): `new ConcatStreams()`

* refactor(streams): new ConcatStream to be a ReadableStream instead

- Converted ConcatStream from a TransformStream into a ReadableStream, also now with proper cleaning up if the `.cancel()` method is called.

* adjust(streams): ConcatStreams class into function

* Adjust(streams): based off comments

* adjust(streams): Remove redundant locking

* adjust(streams): based off comments

* tweaks

* fix

* tweak

* add Leo as co-author

Co-authored-by: crowlKats <crowlkats@toaxl.com>

---------

Co-authored-by: Asher Gomez <ashersaupingomez@gmail.com>
Co-authored-by: crowlKats <crowlkats@toaxl.com>
  • Loading branch information
3 people committed May 20, 2024
1 parent 52116b2 commit 39c2a4c
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 0 deletions.
50 changes: 50 additions & 0 deletions streams/concat_readable_streams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

/**
* Concatenates multiple `ReadableStream`s into a single ordered
* `ReadableStream`.
*
* Cancelling the resulting stream will cancel all the input streams.
*
* @typeParam T Type of the chunks in the streams.
*
* @param streams An iterable of `ReadableStream`s.
*
* @example Usage
* ```ts
* import { concatReadableStreams } from "@std/streams/concat-readable-streams";
* import { assertEquals } from "@std/assert/assert-equals";
*
* const stream1 = ReadableStream.from([1, 2, 3]);
* const stream2 = ReadableStream.from([4, 5, 6]);
* const stream3 = ReadableStream.from([7, 8, 9]);
*
* assertEquals(
* await Array.fromAsync(concatReadableStreams(stream1, stream2, stream3)),
* [1, 2, 3, 4, 5, 6, 7, 8, 9],
* );
* ```
*/
export function concatReadableStreams<T>(
...streams: ReadableStream<T>[]
): ReadableStream<T> {
let i = 0;
return new ReadableStream<T>({
async pull(controller) {
const reader = streams[i]!.getReader();
const { done, value } = await reader.read();
if (done) {
if (streams.length === ++i) {
return controller.close();
}
return await this.pull!(controller);
}
controller.enqueue(value);
reader.releaseLock();
},
async cancel(reason) {
const promises = streams.map((stream) => stream.cancel(reason));
await Promise.allSettled(promises);
},
});
}
95 changes: 95 additions & 0 deletions streams/concat_readable_streams_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { assertEquals, assertRejects } from "../assert/mod.ts";
import { concatReadableStreams } from "./concat_readable_streams.ts";

Deno.test("concatStreams()", async () => {
const readable1 = ReadableStream.from([1, 2, 3]);
const readable2 = ReadableStream.from([4, 5, 6]);
const readable3 = ReadableStream.from([7, 8, 9]);

assertEquals(
await Array.fromAsync(
concatReadableStreams(readable1, readable2, readable3),
),
[
1,
2,
3,
4,
5,
6,
7,
8,
9,
],
);
});

Deno.test("concatStreams() with empty streams", async () => {
const readable1 = ReadableStream.from([]);
const readable2 = ReadableStream.from([]);
const readable3 = ReadableStream.from([]);

assertEquals(
await Array.fromAsync(
concatReadableStreams(readable1, readable2, readable3),
),
[],
);
});

Deno.test("concatStreams() with one empty stream", async () => {
const readable1 = ReadableStream.from([1, 2, 3]);
const readable2 = ReadableStream.from([]);
const readable3 = ReadableStream.from([7, 8, 9]);

assertEquals(
await Array.fromAsync(
concatReadableStreams(readable1, readable2, readable3),
),
[
1,
2,
3,
7,
8,
9,
],
);
});

Deno.test("concatStreams() handles errors", async () => {
const readable1 = ReadableStream.from([1, 2, 3]);
const readable2 = ReadableStream.from(async function* () {
yield 4;
yield 5;
yield 6;
throw new TypeError("I am an error!");
}());
const readable3 = ReadableStream.from([7, 8, 9]);

const results: number[] = [];
await assertRejects(
async () => {
for await (
const value of concatReadableStreams(readable1, readable2, readable3)
) {
results.push(value);
}
},
TypeError,
"I am an error!",
);
assertEquals(
results,
[
1,
2,
3,
4,
5,
6,
],
);
});
1 change: 1 addition & 0 deletions streams/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
".": "./mod.ts",
"./buffer": "./buffer.ts",
"./byte-slice-stream": "./byte_slice_stream.ts",
"./concat-readable-streams": "./concat_readable_streams.ts",
"./delimiter-stream": "./delimiter_stream.ts",
"./early-zip-readable-streams": "./early_zip_readable_streams.ts",
"./iterate-reader": "./iterate_reader.ts",
Expand Down
1 change: 1 addition & 0 deletions streams/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

export * from "./buffer.ts";
export * from "./byte_slice_stream.ts";
export * from "./concat_readable_streams.ts";
export * from "./delimiter_stream.ts";
export * from "./early_zip_readable_streams.ts";
export * from "./iterate_reader.ts";
Expand Down

0 comments on commit 39c2a4c

Please sign in to comment.