feat(streams): concatReadableStreams()
#4747
Conversation
Codecov Report

Attention: Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #4747      +/-   ##
==========================================
- Coverage   91.89%   91.45%   -0.44%
==========================================
  Files         484      486       +2
  Lines       41296    41340      +44
  Branches     5319     5288      -31
==========================================
- Hits        37947    37807     -140
- Misses       3292     3474     +182
- Partials       57       59       +2
```

☔ View full report in Codecov by Sentry.
I'm not sure it's a good idea to implement this as a TransformStream. No Deno API or Web API returns a stream of streams or a stream of iterables. The original comment on #4500 suggests a function that concatenates the streams in an array. That design makes more sense to me.
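For reference, the suggested shape might look something like this (a hypothetical signature only, not the final API):

```ts
// Hypothetical signature for the array-based design suggested above.
declare function concatStreams<T>(streams: ReadableStream<T>[]): ReadableStream<T>;
```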
- Converted `ConcatStream` from a TransformStream into a ReadableStream, now with proper cleanup if the `.cancel()` method is called.

I changed it to a ReadableStream constructor instead, taking in an iterable (like an array) of readable streams and concatenating them via a pulling method. There is also extra code to perform a cleanup if the `.cancel()` method is called.
Example:

```ts
const streams = new Array(10).fill(0).map((_x, i) =>
  ReadableStream.from(function* () {
    for (let j = i * 10; j < i * 10 + 10; ++j) {
      yield j;
    }
  }())
);

console.log(await Array.fromAsync(new ConcatStreams(streams)));
```

Output:

```
[
   0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11,
  12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
  24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
  36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
  48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
  60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
  72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
  84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95,
  96, 97, 98, 99
]
```
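A minimal sketch of the cancel-cleanup behavior described above, assuming the sources come as an array (the helper name and details are illustrative, not the PR's exact code):

```ts
function concatWithCleanup<T>(streams: ReadableStream<T>[]): ReadableStream<T> {
  let current = 0;
  return new ReadableStream<T>({
    async pull(controller) {
      while (current < streams.length) {
        const reader = streams[current].getReader();
        const { done, value } = await reader.read();
        reader.releaseLock();
        if (done) {
          current++; // this source is exhausted; advance to the next one
          continue;
        }
        controller.enqueue(value);
        return;
      }
      controller.close();
    },
    async cancel(reason) {
      // Propagate cancellation to every source that was never fully drained.
      for (const stream of streams.slice(current)) {
        await stream.cancel(reason);
      }
    },
  });
}
```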
I believe Yoshiya meant that he thought this should be a function, like `concatStreams(streams)`. @crowlKats, able to take a look at this?
Oh okay, converting it to a function would be quite simple. Essentially just:

```ts
function concatStreams<T>(streams: ...) {
  const gen = ...
  let lock = false
  return new ReadableStream<T>({
    ...
  })
}
```

I can do this if you'd like.
Sounds good 👍🏾
Might using `for await...of` in the implementation make things a little simpler and clearer?
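For illustration, a `for await...of` version might look like this (a sketch under the assumption that the sources come as an array; not the code that was merged):

```ts
// Sketch: concatenate streams by draining each one with for await...of.
function concatViaForAwait<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {
  return ReadableStream.from((async function* () {
    for (const stream of streams) {
      for await (const chunk of stream) {
        yield chunk; // chunks are emitted in source order
      }
    }
  })());
}
```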
On second thought, let's make this more consistent with other similar APIs, `mergeReadableStreams()` and `zipReadableStreams()`. Can you please rename this to `concatReadableStreams()` and make it accept `...streams: ReadableStream<T>[]`?

While the initial issue requested that it accept `AsyncIterable<ReadableStream<T>>`, handling arrays may be sufficient for most, if not almost all, use cases. If there's sufficient demand for async iterables in the future, we can extend the function without breaking changes.
Here is a completely alternative implementation to match the above comment:

```ts
export function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  let currentStream = 0;
  return new ReadableStream<T>({
    async pull(controller) {
      const stream = streams[currentStream];
      const reader = stream.getReader();
      try {
        const read = await reader.read();
        if (read.done) {
          // The current stream is exhausted; move on to the next one,
          // or close the output once every stream has been consumed.
          currentStream++;
          if (streams.length === currentStream) {
            controller.close();
          } else {
            // Nothing was enqueued for this pull, so pull again
            // to produce a chunk from the next stream.
            await this.pull(controller);
          }
        } else {
          controller.enqueue(read.value);
        }
      } catch (e) {
        controller.error(e);
      }
      reader.releaseLock();
    },
  });
}
```
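For example, under this alternative the call site would look like this (a usage sketch):

```ts
const first = ReadableStream.from([1, 2, 3]);
const second = ReadableStream.from([4, 5]);

// Chunks come out in source order: first's chunks, then second's.
console.log(await Array.fromAsync(concatReadableStreams(first, second)));
// [ 1, 2, 3, 4, 5 ]
```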
I don't think this line is needed, as you're essentially forcing the contents of all the streams into the queue regardless of whether the queue is being emptied or not, essentially turning a pulling method into a pushing one.
@BlackAsLight that line is necessary because if a pull is called but the read of the current internal stream is `done`, nothing has been enqueued for that pull, so it has to pull again to actually deliver a chunk from the next stream.
Oh yeah, you're right. I was thinking about it wrong.
LGTM! Thank you. Looking nice now 👍🏾
@kt3k, please review.
LGTM
Co-authored-by: crowlKats <crowlkats@toaxl.com>
Implements #4500
This implementation broadens the scope of concatenating streams to concatenating anything that implements `Symbol.asyncIterator` or `Symbol.iterator`. The reason for this is that the only difference needed to support the broader scope, compared to just concatenating ReadableStreams, exists in TypeScript alone.

This implementation allows it to be used in two ways: either via `new ConcatStreams(streams)`, or via the `.pipeThrough` method.

When the static method `ReadableStream.from` is more widely supported in browsers, this code can be simplified by several lines.
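As a hedged illustration of why the broader scope is mostly a typing concern: `ReadableStream.from` already accepts both sync and async iterables where it is supported (e.g. in Deno), so the same machinery covers both protocols:

```ts
// ReadableStream.from handles Symbol.iterator and Symbol.asyncIterator alike.
const fromSync = ReadableStream.from([1, 2, 3]); // plain array: Symbol.iterator
const fromAsync = ReadableStream.from((async function* () {
  yield 4; // async generator: Symbol.asyncIterator
  yield 5;
})());

console.log(await Array.fromAsync(fromSync)); // [ 1, 2, 3 ]
console.log(await Array.fromAsync(fromAsync)); // [ 4, 5 ]
```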