From 1b0db56b8a49f1dfe4dc302dc2eda2261642dda9 Mon Sep 17 00:00:00 2001 From: Eyal Shalev Date: Thu, 23 Sep 2021 01:50:42 +0300 Subject: [PATCH] Improved documentation and added .flat (#35) --- README.md | 14 ++--- channel.ts | 135 +++++++++++++++++++++++++++++++++++++++--------- channel_test.ts | 20 ++++++- 3 files changed, 137 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 831dee8..41fa023 100644 --- a/README.md +++ b/README.md @@ -71,17 +71,19 @@ const ch = new Channel(); ```typescript import { Channel } from "https://deno.land/x/async_channels/mod.ts"; -const sleep = (duration: number) => - new Promise((res) => { +const sleep = (duration: number) => { + return new Promise((res) => { setTimeout(() => res(), duration); }); +}; function produce(num: number) { const ch = new Channel(0); + (async () => { for (let i = 0; i < num; i++) { - await sleep(500); // Do some work... - await ch.add(i++); + await sleep(100); // Do some work... + await ch.send(i); } ch.close(); })(); @@ -89,9 +91,9 @@ function produce(num: number) { return ch; } -sleep(200).then(() => console.log("boo")); +sleep(300).then(() => console.log("boo")); -for await (let product of produce(5)) { +for await (const product of produce(4)) { console.log({ product }); } ``` diff --git a/channel.ts b/channel.ts index 14fff55..61ed6cd 100644 --- a/channel.ts +++ b/channel.ts @@ -51,7 +51,21 @@ export interface ChannelOptions { debugExtra?: Record; } +/** + * Options for pipe operations. + * + * @see Channel.map + * @see Channel.flatMap + * @see Channel.foreach + * @see Channel.filter + * @see Channel.reduce + */ export interface ChannelPipeOptions extends ChannelOptions { + /** + * If provided, the pipe operation will halt when the signal is triggered. + * + * @type {AbortSignal} + */ signal?: AbortSignal; } @@ -168,7 +182,8 @@ export interface Receiver extends AsyncIterable { * map returns a receiver channel that contains the results of applying `fn` * to each value of `this` channel. * - * The receiver channel will close, when the original channel closes. + * The receiver channel will close, when the original channel closes (or if + * the provided signal is triggered). * * @template TOut * @param {(val: T) => TOut} fn @@ -180,15 +195,44 @@ export interface Receiver extends AsyncIterable { options?: ChannelPipeOptions, ): Receiver; + /** + * flatMap returns a receiver channel that contains the flattened (1 level) + * results of applying `fn` to each value of `this` channel. + * + * The receiver channel will close, when the original channel closes (or if + * the provided signal is triggered). + * + * @template TOut + * @param {(val: T) => Iterable | AsyncIterable} fn + * @param {number} [bufferSize] + * @param {ChannelPipeOptions} [options] + */ flatMap( fn: (val: T) => Iterable | AsyncIterable, bufferSize?: number, options?: ChannelPipeOptions, ): Receiver; + /** + * flat returns a receiver channel that contains the flattened (1 level) + * values of each value of `this` channel. + * + * The receiver channel will close, when the original channel closes (or if + * the provided signal is triggered). + * + * @param {number} [bufferSize] + * @param {ChannelPipeOptions} [options] + */ + flat( + this: Channel | AsyncIterable>, + bufferSize?: number, + options?: ChannelPipeOptions, + ): Receiver; + /** * forEach applies `fn` to each value in `this` channel, and returns a channel - * that will close after `this` channel closes. + * that will close after `this` channel closes (or if + * the provided signal is triggered). * * @param {(val: T) => void} fn * @return {Receiver} @@ -431,11 +475,39 @@ export class Channel throw e; } } - })().catch((err) => this.error("map", fn, err)) + })().catch((err) => this.error("flatMap", fn, err)) .finally(() => outChan.close()); return outChan; } + flat( + this: Channel | AsyncIterable>, + bufferSize?: number, + options: ChannelPipeOptions | undefined = this.options, + ): Receiver { + const outChan = new Channel(bufferSize, options); + (async () => { + while (true) { + const ctrl = new AbortController(); + options?.signal?.addEventListener("abort", () => ctrl.abort()); + try { + const res = await this.receive(); + if (!res[1]) return; + + for await (const item of res[0]) { + await outChan.send(item); + } + } catch (e) { + if (e instanceof AbortedError) return; + throw e; + } + } + })().catch((err) => this.error("flat", err)) + .finally(() => outChan.close()); + + return outChan; + } + forEach( fn: (val: T) => void | Promise, bufferSize = this.queue.capacity, @@ -702,35 +774,48 @@ function isReceiver(x: unknown): x is Receiver { typeof x["receive"] === "function"; } -export function merge(): never; -export function merge(inChans: Receiver): never; +export type MergeOptions = ChannelOptions & { bufferSize?: number }; +export function merge( + inChans: [], + options?: MergeOptions, +): never; +export function merge( + inChans: [Receiver], + options?: MergeOptions, +): never; export function merge( - inChan1: Receiver, - inChan2: Receiver, + inChans: [ + Receiver, + Receiver, + ], + options?: MergeOptions, ): Receiver; export function merge( - inChan1: Receiver, - inChan2: Receiver, - inChan3: Receiver, + inChans: [ + Receiver, + Receiver, + Receiver, + ], + options?: MergeOptions, ): Receiver; -export function merge( - inChan1: Receiver, - inChan2: Receiver, - inChan3: Receiver, - inChan4: Receiver, -): Receiver; -export function merge( - inChan1: Receiver, - inChan2: Receiver, - inChan3: Receiver, - inChan4: Receiver, - inChan5: Receiver, -): Receiver; -export function merge(...inChans: Receiver[]): Receiver { +/** + * Takes a collection of source channels and returns a channel + * which contains all values taken from them. + * + * @template T + * @param {Receiver[]} inChans + * @param {MergeOptions} [options={}] + * @returns {Receiver} + */ +export function merge( + inChans: Receiver[], + options: MergeOptions = {}, +): Receiver { if (inChans.length < 2) { throw new TypeError("cannot merge less than 2 channels"); } - const outChan = new Channel(); + const { bufferSize, ...chOpts } = options; + const outChan = new Channel(bufferSize, chOpts); Promise.all(inChans.map((inChan) => (async () => { diff --git a/channel_test.ts b/channel_test.ts index a4fbcf5..19e08cf 100644 --- a/channel_test.ts +++ b/channel_test.ts @@ -208,10 +208,28 @@ Deno.test("map", async () => { assertEquals(expected.length, 0, "expected stack isn't empty"); }); +Deno.test("flat", async () => { + await 0; + const ch = new Channel(); + const flatCh = ch.flat(); + const p = Promise.all([ + ch.send(["Hello", "world"]), + ch.send(["from", "Array"]), + ]).then(() => ch.close()); + + const expected = ["Hello", "world", "from", "Array"]; + for await (const x of flatCh) { + assertEquals(x, expected.shift()); + } + + await p; + assertEquals(expected.length, 0, "expected stack isn't empty"); +}); + Deno.test("merge", async () => { const ch1 = new Channel(); const ch2 = new Channel(); - const mergedChan = merge(ch1, ch2); + const mergedChan = merge([ch1, ch2]); const p = Promise.all([ ch1.send("Hello"),