Skip to content

Commit

Permalink
feat(multiply-stream): add MultiplyStream class and tests
Browse files Browse the repository at this point in the history
Implement a new class `MultiplyStream` along with its unit tests in `multiply-stream.spec.ts`. `MultiplyStream` is designed to take an array of `WritableStream` objects and create a `ReadableStream` that will output the same chunk to the original stream and each of the provided writable streams. The main purpose is to duplicate the stream's chunks to multiple destinations. This feature could be particularly useful in scenarios where the same data stream needs to be consumed by multiple consumers in different ways, for example, logging, monitoring, or processing the data in parallel without needing to buffer the entire dataset. The test suite ensures that `MultiplyStream` correctly duplicates the incoming stream's chunks to each of the provided writable streams and that the original stream's data remains intact and accessible.
  • Loading branch information
JonDotsoy committed Mar 9, 2024
1 parent 4173fba commit 0094d02
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
31 changes: 31 additions & 0 deletions src/multiply-stream.spec.ts
@@ -0,0 +1,31 @@
import { readableStreamToArray, readableStreamToText } from "bun";
import { test, spyOn, expect } from "bun:test";
import { MultiplyStream } from "./multiply-stream";

test("", async () => {
const obj1 = { call: (...args: any[]): any => {} };
const mock = spyOn(obj1, "call");

const readable = new ReadableStream<number>({
start: (ctl) => {
ctl.enqueue(1);
ctl.enqueue(2);
ctl.enqueue(3);
ctl.close();
},
}).pipeThrough(
new MultiplyStream(
new WritableStream({
write: (chunk) => obj1.call(chunk),
}),
new WritableStream({
write: (chunk) => obj1.call(chunk),
}),
),
);

const out = await readableStreamToArray(readable);
expect(out).toEqual([1, 2, 3]);

expect(mock.mock.calls).toEqual([[1], [1], [2], [2], [3], [3]]);
});
31 changes: 31 additions & 0 deletions src/multiply-stream.ts
@@ -0,0 +1,31 @@
import { readableStreamWithController } from "./readable-stream-with-controller";

export class MultiplyStream<T> implements ReadableWritablePair<T, T> {
readable: ReadableStream<T>;
controller: ReadableStreamDefaultController<T>;
writable: WritableStream<T>;

constructor(...writables: WritableStream<T>[]) {
const writers = writables.map((writable) => ({
writable,
writer: writable.getWriter(),
}));
const { readable, controller } = readableStreamWithController<T>();
this.readable = readable;
this.controller = controller;
this.writable = new WritableStream<T>({
write: async (chunk) => {
this.controller.enqueue(chunk);
for (const { writer } of writers) {
await writer.write(chunk);
}
},
close: async () => {
this.controller.close();
for (const { writer } of writers) {
await writer.close();
}
},
});
}
}

0 comments on commit 0094d02

Please sign in to comment.