Skip to content

Commit

Permalink
Improved documentation and added .flat (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eyal-Shalev committed Sep 22, 2021
1 parent b759d3a commit 1b0db56
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 32 deletions.
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,29 @@ const ch = new Channel<unknown>();
```typescript
import { Channel } from "https://deno.land/x/async_channels/mod.ts";

const sleep = (duration: number) =>
new Promise<void>((res) => {
const sleep = (duration: number) => {
return new Promise<void>((res) => {
setTimeout(() => res(), duration);
});
};

function produce(num: number) {
const ch = new Channel(0);

(async () => {
for (let i = 0; i < num; i++) {
await sleep(500); // Do some work...
await ch.add(i++);
await sleep(100); // Do some work...
await ch.send(i);
}
ch.close();
})();

return ch;
}

sleep(200).then(() => console.log("boo"));
sleep(300).then(() => console.log("boo"));

for await (let product of produce(5)) {
for await (const product of produce(4)) {
console.log({ product });
}
```
135 changes: 110 additions & 25 deletions channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,21 @@ export interface ChannelOptions {
debugExtra?: Record<string, unknown>;
}

/**
* Options for pipe operations.
*
* @see Channel.map
* @see Channel.flatMap
* @see Channel.foreach
* @see Channel.filter
* @see Channel.reduce
*/
export interface ChannelPipeOptions extends ChannelOptions {
/**
* If provided, the pipe operation will halt when the signal is triggered.
*
* @type {AbortSignal}
*/
signal?: AbortSignal;
}

Expand Down Expand Up @@ -168,7 +182,8 @@ export interface Receiver<T> extends AsyncIterable<T> {
* map returns a receiver channel that contains the results of applying `fn`
* to each value of `this` channel.
*
* The receiver channel will close, when the original channel closes.
* The receiver channel will close, when the original channel closes (or if
* the provided signal is triggered).
*
* @template TOut
* @param {(val: T) => TOut} fn
Expand All @@ -180,15 +195,44 @@ export interface Receiver<T> extends AsyncIterable<T> {
options?: ChannelPipeOptions,
): Receiver<TOut>;

/**
* flatMap returns a receiver channel that contains the flattened (1 level)
* results of applying `fn` to each value of `this` channel.
*
* The receiver channel will close, when the original channel closes (or if
* the provided signal is triggered).
*
* @template TOut
* @param {(val: T) => Iterable<TOut> | AsyncIterable<TOut>} fn
* @param {number} [bufferSize]
* @param {ChannelPipeOptions} [options]
*/
flatMap<TOut>(
fn: (val: T) => Iterable<TOut> | AsyncIterable<TOut>,
bufferSize?: number,
options?: ChannelPipeOptions,
): Receiver<TOut>;

/**
* flat returns a receiver channel that contains the flattened (1 level)
* values of each value of `this` channel.
*
* The receiver channel will close, when the original channel closes (or if
* the provided signal is triggered).
*
* @param {number} [bufferSize]
* @param {ChannelPipeOptions} [options]
*/
flat<K>(
this: Channel<Iterable<K> | AsyncIterable<K>>,
bufferSize?: number,
options?: ChannelPipeOptions,
): Receiver<K>;

/**
* forEach applies `fn` to each value in `this` channel, and returns a channel
* that will close after `this` channel closes.
* that will close after `this` channel closes (or if
* the provided signal is triggered).
*
* @param {(val: T) => void} fn
* @return {Receiver<void>}
Expand Down Expand Up @@ -431,11 +475,39 @@ export class Channel<T>
throw e;
}
}
})().catch((err) => this.error("map", fn, err))
})().catch((err) => this.error("flatMap", fn, err))
.finally(() => outChan.close());
return outChan;
}

flat<K>(
this: Channel<Iterable<K> | AsyncIterable<K>>,
bufferSize?: number,
options: ChannelPipeOptions | undefined = this.options,
): Receiver<K> {
const outChan = new Channel<K>(bufferSize, options);
(async () => {
while (true) {
const ctrl = new AbortController();
options?.signal?.addEventListener("abort", () => ctrl.abort());
try {
const res = await this.receive();
if (!res[1]) return;

for await (const item of res[0]) {
await outChan.send(item);
}
} catch (e) {
if (e instanceof AbortedError) return;
throw e;
}
}
})().catch((err) => this.error("flat", err))
.finally(() => outChan.close());

return outChan;
}

forEach(
fn: (val: T) => void | Promise<void>,
bufferSize = this.queue.capacity,
Expand Down Expand Up @@ -702,35 +774,48 @@ function isReceiver(x: unknown): x is Receiver<unknown> {
typeof x["receive"] === "function";
}

export function merge(): never;
export function merge(inChans: Receiver<unknown>): never;
export type MergeOptions = ChannelOptions & { bufferSize?: number };
export function merge(
inChans: [],
options?: MergeOptions,
): never;
export function merge(
inChans: [Receiver<unknown>],
options?: MergeOptions,
): never;
export function merge<T1, T2>(
inChan1: Receiver<T1>,
inChan2: Receiver<T2>,
inChans: [
Receiver<T1>,
Receiver<T2>,
],
options?: MergeOptions,
): Receiver<T1 | T2>;
export function merge<T1, T2, T3>(
inChan1: Receiver<T1>,
inChan2: Receiver<T2>,
inChan3: Receiver<T3>,
inChans: [
Receiver<T1>,
Receiver<T2>,
Receiver<T3>,
],
options?: MergeOptions,
): Receiver<T1 | T2 | T3>;
export function merge<T1, T2, T3, T4>(
inChan1: Receiver<T1>,
inChan2: Receiver<T2>,
inChan3: Receiver<T3>,
inChan4: Receiver<T4>,
): Receiver<T1 | T2 | T3 | T4>;
export function merge<T1, T2, T3, T4, T5>(
inChan1: Receiver<T1>,
inChan2: Receiver<T2>,
inChan3: Receiver<T3>,
inChan4: Receiver<T4>,
inChan5: Receiver<T5>,
): Receiver<T1 | T2 | T3 | T4 | T5>;
export function merge<T>(...inChans: Receiver<T>[]): Receiver<T> {
/**
* Takes a collection of source channels and returns a channel
* which contains all values taken from them.
*
* @template T
* @param {Receiver<T>[]} inChans
* @param {MergeOptions} [options={}]
* @returns {Receiver<T>}
*/
export function merge<T>(
inChans: Receiver<T>[],
options: MergeOptions = {},
): Receiver<T> {
if (inChans.length < 2) {
throw new TypeError("cannot merge less than 2 channels");
}
const outChan = new Channel<T>();
const { bufferSize, ...chOpts } = options;
const outChan = new Channel<T>(bufferSize, chOpts);

Promise.all(inChans.map((inChan) =>
(async () => {
Expand Down
20 changes: 19 additions & 1 deletion channel_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,28 @@ Deno.test("map", async () => {
assertEquals(expected.length, 0, "expected stack isn't empty");
});

Deno.test("flat", async () => {
await 0;
const ch = new Channel<string[]>();
const flatCh = ch.flat();
const p = Promise.all([
ch.send(["Hello", "world"]),
ch.send(["from", "Array"]),
]).then(() => ch.close());

const expected = ["Hello", "world", "from", "Array"];
for await (const x of flatCh) {
assertEquals(x, expected.shift());
}

await p;
assertEquals(expected.length, 0, "expected stack isn't empty");
});

Deno.test("merge", async () => {
const ch1 = new Channel<string>();
const ch2 = new Channel<number>();
const mergedChan = merge(ch1, ch2);
const mergedChan = merge([ch1, ch2]);

const p = Promise.all([
ch1.send("Hello"),
Expand Down

0 comments on commit 1b0db56

Please sign in to comment.