From 8ef84d1179e9eac1ecda8a09009c0aa334cb7c53 Mon Sep 17 00:00:00 2001 From: Robin Malfait Date: Sat, 22 Feb 2020 17:18:16 +0100 Subject: [PATCH 1/5] add a delay function --- README.md | 26 ++++++++++++++++++ src/__snapshots__/index.test.ts.snap | 1 + src/delay.test.ts | 40 ++++++++++++++++++++++++++++ src/delay.ts | 16 +++++++++++ src/index.ts | 1 + 5 files changed, 84 insertions(+) create mode 100644 src/delay.test.ts create mode 100644 src/delay.ts diff --git a/README.md b/README.md index 0e43374..e420dde 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,10 @@ the next step. This is where lazy collections come in, under the hood we use generators so that your data flows like a stream to have the optimal speed. +All functions should work with both `iterator` and `asyncIterator`, if one of +the functions uses an `asyncIterator` (for example when you introduce +`delay(100)`), don't forget to `await` the result! + ```js const program = pipe( map(x => x * 2), @@ -58,6 +62,7 @@ program(range(0, 1000000)); - [Utilities](#utilities) - [`chunk`](#chunk) - [`compact`](#compact) + - [`delay`](#delay) - [`flatten`](#flatten) - [`generate`](#generate) - [`groupBy`](#groupby) @@ -426,6 +431,27 @@ program([0, 1, true, false, null, undefined, '', 'test', NaN]); // [ 1, true, 'test' ]; ``` +#### `delay` + +[Table of contents](#table-of-contents) + +Will make he whole program async. It will add a delay of x milliseconds when an +item goes through the stream. + +```js +import { pipe, range, delay, map, toArray } from 'lazy-collections'; + +const program = pipe( + range(0, 4), + delay(5000), // 5 seconds + map(() => new Date().toLocaleTimeString()), + toArray() +); + +await program(); +// [ '10:00:00', '10:00:05', '10:00:10', '10:00:15', '10:00:20' ]; +``` + #### `flatten` [Table of contents](#table-of-contents) diff --git a/src/__snapshots__/index.test.ts.snap b/src/__snapshots__/index.test.ts.snap index 86da013..eb8a82a 100644 --- a/src/__snapshots__/index.test.ts.snap +++ b/src/__snapshots__/index.test.ts.snap @@ -7,6 +7,7 @@ Object { "compact": [Function], "compose": [Function], "concat": [Function], + "delay": [Function], "every": [Function], "filter": [Function], "find": [Function], diff --git a/src/delay.test.ts b/src/delay.test.ts new file mode 100644 index 0000000..1bdbaac --- /dev/null +++ b/src/delay.test.ts @@ -0,0 +1,40 @@ +import { pipe } from './pipe'; +import { range } from './range'; +import { delay } from './delay'; +import { map } from './map'; +import { every } from './every'; +import { tap } from './tap'; +import { chunk } from './chunk'; + +it('should delay each value by 100ms', async () => { + const counter = jest.fn(); + + const program = pipe( + // Create a range of 6 values. + range(0, 5), + + // Delay each value with 100ms. + delay(100), + + // Map each value to the current date. + map(() => Date.now()), + + // Call the counter, just to be sure that we actually saw 6 values. + tap(() => counter()), + + // Group values per 2, this way we can compare the difference between them. + chunk(2), + + // Map the chunked values to the difference between the two values. + map(([a, b]: [number, number]) => b - a), + + // Verify that the diff is 100ms or more, this way we can validate that + // there has been a delay of 100ms at minimum. + every((diff: number) => diff >= 100) + ); + + const result = await program(); + + expect(result).toBe(true); + expect(counter).toHaveBeenCalledTimes(6); +}); diff --git a/src/delay.ts b/src/delay.ts new file mode 100644 index 0000000..fd097e3 --- /dev/null +++ b/src/delay.ts @@ -0,0 +1,16 @@ +function sleep(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +export function delay(ms: number) { + return async function* delayFn(data: Iterable | AsyncIterable) { + if (data == null) { + return; + } + + for await (let datum of data) { + await sleep(ms); + yield datum; + } + }; +} diff --git a/src/index.ts b/src/index.ts index 868dfcd..6eed355 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,6 +22,7 @@ export * from './some'; export * from './average'; export * from './chunk'; export * from './compact'; +export * from './delay'; export * from './flatten'; export * from './generate'; export * from './groupBy'; From bab361d7c218615d4e1c8451a051c78d5a3e2b85 Mon Sep 17 00:00:00 2001 From: Robin Malfait Date: Sun, 23 Feb 2020 00:32:45 +0100 Subject: [PATCH 2/5] introduce async versions of the various utilities --- README.md | 8 ++- src/average.test.ts | 17 +++++ src/average.ts | 33 ++++++--- src/chunk.test.ts | 28 +++++++- src/chunk.ts | 80 +++++++++++++++------ src/compact.test.ts | 15 ++++ src/compose.ts | 4 +- src/concat.test.ts | 19 +++++ src/concat.ts | 28 +++++++- src/delay.ts | 10 ++- src/every.test.ts | 39 +++++++++- src/every.ts | 23 +++++- src/filter.test.ts | 23 ++++++ src/filter.ts | 40 ++++++++--- src/find.test.ts | 41 +++++++++-- src/find.ts | 21 +++++- src/findIndex.test.ts | 57 +++++++++++++-- src/findIndex.ts | 23 +++++- src/flatten.test.ts | 25 +++++++ src/flatten.ts | 68 +++++++++++++----- src/{generator.test.ts => generate.test.ts} | 0 src/groupBy.test.ts | 28 ++++++++ src/groupBy.ts | 24 ++++++- src/head.test.ts | 25 +++++++ src/head.ts | 22 +++++- src/map.test.ts | 24 +++++++ src/map.ts | 31 +++++++- src/max.test.ts | 7 ++ src/max.ts | 14 +--- src/min.test.ts | 7 ++ src/min.ts | 14 +--- src/partition.test.ts | 28 ++++++++ src/partition.ts | 30 +++++++- src/pipe.ts | 2 +- src/reduce.test.ts | 19 +++++ src/reduce.ts | 16 ++++- src/reverse.test.ts | 36 ++++++++++ src/reverse.ts | 47 +++++++++--- src/shared-types.ts | 1 + src/skip.test.ts | 19 +++++ src/slice.test.ts | 40 ++++++++++- src/slice.ts | 65 ++++++++++++----- src/some.test.ts | 37 +++++++++- src/some.ts | 21 +++++- src/sum.test.ts | 33 +++++++++ src/take.test.ts | 19 +++++ src/takeWhile.test.ts | 26 +++++++ src/takeWhile.ts | 37 ++++++++-- src/tap.test.ts | 30 ++++++++ src/toArray.test.ts | 13 ++++ src/toArray.ts | 10 ++- src/unique.test.ts | 22 ++++++ src/unique.ts | 35 +++++++-- src/utils/iterator.ts | 15 ++++ src/where.test.ts | 57 +++++++++++++++ 55 files changed, 1303 insertions(+), 153 deletions(-) rename src/{generator.test.ts => generate.test.ts} (100%) create mode 100644 src/shared-types.ts create mode 100644 src/utils/iterator.ts diff --git a/README.md b/README.md index e420dde..face6a4 100644 --- a/README.md +++ b/README.md @@ -14,17 +14,19 @@ --- -Working with methods like `.map()`, `.filter()` and `.reduce()` are nice, +Working with methods like `.map()`, `.filter()` and `.reduce()` is nice, however they create new arrays and everything is eagerly done before going to the next step. -This is where lazy collections come in, under the hood we use generators so that -your data flows like a stream to have the optimal speed. +This is where lazy collections come in, under the hood we use [iterators][1] and +async iterators so that your data flows like a stream to have the optimal speed. All functions should work with both `iterator` and `asyncIterator`, if one of the functions uses an `asyncIterator` (for example when you introduce `delay(100)`), don't forget to `await` the result! +[1]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#The_iterator_protocol + ```js const program = pipe( map(x => x * 2), diff --git a/src/average.test.ts b/src/average.test.ts index 406b7dc..b915a55 100644 --- a/src/average.test.ts +++ b/src/average.test.ts @@ -1,5 +1,6 @@ import { pipe } from './pipe'; import { average } from './average'; +import { delay } from './delay'; it('should be possible to get an average of all the values', () => { const program = pipe(average()); @@ -7,3 +8,19 @@ it('should be possible to get an average of all the values', () => { expect(program([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])).toEqual(5); expect(program([10, 10, 10])).toEqual(10); }); + +it('should be possible to get an average of all the values (async)', async () => { + const program = pipe(delay(0), average()); + + expect(await program([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])).toEqual(5); + expect(await program([10, 10, 10])).toEqual(10); +}); + +it('should be possible to get an average of all the values (Promise async)', async () => { + const program = pipe(delay(0), average()); + + expect( + await program(Promise.resolve([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) + ).toEqual(5); + expect(await program(Promise.resolve([10, 10, 10]))).toEqual(10); +}); diff --git a/src/average.ts b/src/average.ts index 873651a..6e097ce 100644 --- a/src/average.ts +++ b/src/average.ts @@ -1,14 +1,29 @@ -export function average() { - return function averageFn(data: Iterable) { - let sum = 0; - let count = 0; +import { reduce } from './reduce'; +import { chunk } from './chunk'; +import { map } from './map'; +import { head } from './head'; +import { pipe } from './pipe'; +import { MaybePromise } from './shared-types'; - for (let datum of data) { - sum += datum; - count++; - } +export function average() { + return function averageFn( + data: MaybePromise | AsyncIterable> + ) { + const program = pipe( + reduce<[number, number], number>( + (acc, current) => { + acc[0] += current; + acc[1] += 1; + return acc; + }, + [0, 0] + ), + chunk(2), + map(([sum, count]: [number, number]) => sum / count), + head() + ); - return sum / count; + return program(data); }; } diff --git a/src/chunk.test.ts b/src/chunk.test.ts index 38777b7..75dfebc 100644 --- a/src/chunk.test.ts +++ b/src/chunk.test.ts @@ -2,11 +2,35 @@ import { pipe } from './pipe'; import { range } from './range'; import { chunk } from './chunk'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should create chunked items', () => { - const program = pipe(range(0, 10), chunk(3), toArray()); + const program = pipe(chunk(3), toArray()); - expect(program()).toEqual([ + expect(program(range(0, 10))).toEqual([ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8], + [9, 10], + ]); + expect(program(range(0, 10))).toEqual([ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8], + [9, 10], + ]); +}); + +it('should create chunked items (async)', async () => { + const program = pipe(delay(0), chunk(3), toArray()); + + expect(await program(range(0, 10))).toEqual([ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8], + [9, 10], + ]); + expect(await program(range(0, 10))).toEqual([ [0, 1, 2], [3, 4, 5], [6, 7, 8], diff --git a/src/chunk.ts b/src/chunk.ts index b05b55f..c50f734 100644 --- a/src/chunk.ts +++ b/src/chunk.ts @@ -1,26 +1,64 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + export function chunk(size: number) { - return function* chunkFn(data: Iterable) { - // Let's have a placeholder for our current chunk - let chunk = []; - - // Loop over our data - for (let datum of data) { - // Add item to our current chunk - chunk.push(datum); - - if (chunk.length === size) { - // Our current chunk is full, let's yield it - yield chunk; - - // Let's also clear our chunk for the next chunk - chunk = []; - } - } + return function chunkFn(data: MaybePromise | AsyncIterable>) { + if (isAsyncIterable(data) || data instanceof Promise) { + return { + async *[Symbol.asyncIterator]() { + const stream = data instanceof Promise ? await data : data; + + // Let's have a placeholder for our current chunk + let chunk = []; + + // Loop over our data + for await (let datum of stream) { + // Add item to our current chunk + chunk.push(datum); + + if (chunk.length === size) { + // Our current chunk is full, let's yield it + yield chunk; + + // Let's also clear our chunk for the next chunk + chunk = []; + } + } - // When the chunk is not full yet, but when we are at the end of the data we - // have to ensure that this one is also yielded - if (chunk.length > 0) { - yield chunk; + // When the chunk is not full yet, but when we are at the end of the data we + // have to ensure that this one is also yielded + if (chunk.length > 0) { + yield chunk; + } + }, + }; } + + return { + *[Symbol.iterator]() { + // Let's have a placeholder for our current chunk + let chunk = []; + + // Loop over our data + for (let datum of data) { + // Add item to our current chunk + chunk.push(datum); + + if (chunk.length === size) { + // Our current chunk is full, let's yield it + yield chunk; + + // Let's also clear our chunk for the next chunk + chunk = []; + } + } + + // When the chunk is not full yet, but when we are at the end of the data we + // have to ensure that this one is also yielded + if (chunk.length > 0) { + yield chunk; + } + }, + }; }; } diff --git a/src/compact.test.ts b/src/compact.test.ts index 098ca69..99a7f64 100644 --- a/src/compact.test.ts +++ b/src/compact.test.ts @@ -1,6 +1,7 @@ import { pipe } from './pipe'; import { compact } from './compact'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should remove all falsey values', () => { const program = pipe(compact(), toArray()); @@ -8,4 +9,18 @@ it('should remove all falsey values', () => { expect( program([0, 1, true, false, null, undefined, '', 'test', NaN]) ).toEqual([1, true, 'test']); + expect( + program([0, 1, true, false, null, undefined, '', 'test', NaN]) + ).toEqual([1, true, 'test']); +}); + +it('should remove all falsey values (async)', async () => { + const program = pipe(delay(0), compact(), toArray()); + + expect( + await program([0, 1, true, false, null, undefined, '', 'test', NaN]) + ).toEqual([1, true, 'test']); + expect( + await program([0, 1, true, false, null, undefined, '', 'test', NaN]) + ).toEqual([1, true, 'test']); }); diff --git a/src/compose.ts b/src/compose.ts index c56420f..445f84f 100644 --- a/src/compose.ts +++ b/src/compose.ts @@ -3,8 +3,8 @@ import { ensureFunction } from './utils/ensureFunction'; type Fn = (...args: any) => any; export function compose( - fn: Fn | Iterable, - ...fns: (Fn | Iterable)[] + fn: Fn | Iterable | AsyncIterable, + ...fns: (Fn | Iterable | AsyncIterable)[] ): Fn { return fns.reduce((f: Fn, g) => { const g_ = ensureFunction(g); diff --git a/src/concat.test.ts b/src/concat.test.ts index 1c67cb7..e9a5271 100644 --- a/src/concat.test.ts +++ b/src/concat.test.ts @@ -2,6 +2,7 @@ import { pipe } from './pipe'; import { concat } from './concat'; import { range } from './range'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should concat arrays', () => { const program = pipe( @@ -20,3 +21,21 @@ it('should concat iterators', () => { expect(program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); }); + +it('should concat iterators (async)', async () => { + const program = pipe( + concat(range(0, 3), delay(0)(range(4, 7)), range(8, 10)), + toArray() + ); + + expect(await program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); +}); + +it('should concat iterators (Promise async)', async () => { + const program = pipe( + concat(range(0, 3), range(4, 7), Promise.resolve(range(8, 10))), + toArray() + ); + + expect(await program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); +}); diff --git a/src/concat.ts b/src/concat.ts index 3a5156e..95e6008 100644 --- a/src/concat.ts +++ b/src/concat.ts @@ -1,5 +1,27 @@ -export function* concat(...data: Iterable[]) { - for (let datum of data) { - yield* datum; +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + +export function concat( + ...data: MaybePromise | AsyncIterable>[] +) { + if ( + data.some(isAsyncIterable) || + data.some(datum => datum instanceof Promise) + ) { + return { + async *[Symbol.asyncIterator]() { + for await (let datum of await Promise.all(data)) { + yield* datum; + } + }, + }; } + + return { + *[Symbol.iterator]() { + for (let datum of data as Iterable[]) { + yield* datum; + } + }, + }; } diff --git a/src/delay.ts b/src/delay.ts index fd097e3..b51ff24 100644 --- a/src/delay.ts +++ b/src/delay.ts @@ -1,14 +1,20 @@ +import { MaybePromise } from './shared-types'; + function sleep(ms: number) { return new Promise(resolve => setTimeout(resolve, ms)); } export function delay(ms: number) { - return async function* delayFn(data: Iterable | AsyncIterable) { + return async function* delayFn( + data: MaybePromise | AsyncIterable> + ) { if (data == null) { return; } - for await (let datum of data) { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { await sleep(ms); yield datum; } diff --git a/src/every.test.ts b/src/every.test.ts index 7620b5e..9eae04a 100644 --- a/src/every.test.ts +++ b/src/every.test.ts @@ -1,8 +1,9 @@ import { pipe } from './pipe'; import { range } from './range'; import { every } from './every'; +import { delay } from './delay'; -it('should return true when every value matches the predicete', () => { +it('should return true when every value matches the predicate', () => { const program = pipe( range(0, 25), every(x => typeof x === 'number') @@ -19,3 +20,39 @@ it("should return false when one of the values doesn't meet the predicate", () = expect(program()).toEqual(false); }); + +it('should return true when every value matches the predicate (async)', async () => { + const program = pipe( + delay(0), + every(x => typeof x === 'number') + ); + + expect(await program(range(0, 25))).toEqual(true); + expect(await program(range(0, 25))).toEqual(true); +}); + +it("should return false when one of the values doesn't meet the predicate (async)", async () => { + const program = pipe( + delay(0), + every((x: number) => x < 100) // 100 is not less than 100 + ); + + expect(await program(range(0, 100))).toEqual(false); + expect(await program(range(0, 100))).toEqual(false); +}); + +it('should return true when every value matches the predicate (Promise async)', async () => { + const program = pipe(every(x => typeof x === 'number')); + + expect(await program(Promise.resolve(range(0, 25)))).toEqual(true); + expect(await program(Promise.resolve(range(0, 25)))).toEqual(true); +}); + +it("should return false when one of the values doesn't meet the predicate (Promise async)", async () => { + const program = pipe( + every((x: number) => x < 100) // 100 is not less than 100 + ); + + expect(await program(Promise.resolve(range(0, 100)))).toEqual(false); + expect(await program(Promise.resolve(range(0, 100)))).toEqual(false); +}); diff --git a/src/every.ts b/src/every.ts index ffbbc23..83d9cd5 100644 --- a/src/every.ts +++ b/src/every.ts @@ -1,7 +1,28 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type Fn = (input: T) => boolean; export function every(predicate: Fn) { - return function everyFn(data: Iterable): boolean { + return function everyFn(data: MaybePromise | AsyncIterable>) { + if (data == null) { + return; + } + + if (isAsyncIterable(data) || data instanceof Promise) { + return (async () => { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + if (!predicate(datum)) { + return false; + } + } + + return true; + })(); + } + for (let datum of data) { if (!predicate(datum)) { return false; diff --git a/src/filter.test.ts b/src/filter.test.ts index d9f7ea9..c0dac99 100644 --- a/src/filter.test.ts +++ b/src/filter.test.ts @@ -1,6 +1,7 @@ import { pipe } from './pipe'; import { filter } from './filter'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should be possible to filter data', () => { const program = pipe( @@ -9,4 +10,26 @@ it('should be possible to filter data', () => { ); expect(program([1, 2, 3])).toEqual([2]); + expect(program([1, 2, 3])).toEqual([2]); +}); + +it('should be possible to filter data (async)', async () => { + const program = pipe( + delay(0), + filter((x: number) => x % 2 === 0), // Is even + toArray() + ); + + expect(await program([1, 2, 3])).toEqual([2]); + expect(await program([1, 2, 3])).toEqual([2]); +}); + +it('should be possible to filter data (Promise async)', async () => { + const program = pipe( + filter((x: number) => x % 2 === 0), // Is even + toArray() + ); + + expect(await program(Promise.resolve([1, 2, 3]))).toEqual([2]); + expect(await program(Promise.resolve([1, 2, 3]))).toEqual([2]); }); diff --git a/src/filter.ts b/src/filter.ts index ad394c2..7b18214 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,14 +1,38 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type Fn = (datum: T) => boolean; export function filter(fn: Fn) { - return function* filterFn(data: Iterable) { - for (let datum of data) { - // Ignore values that do not meet the criteria - if (!fn(datum)) { - continue; - } - - yield datum; + return function filterFn(data: MaybePromise | AsyncIterable>) { + if (isAsyncIterable(data) || data instanceof Promise) { + return { + async *[Symbol.asyncIterator]() { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + // Ignore values that do not meet the criteria + if (!fn(datum)) { + continue; + } + + yield datum; + } + }, + }; } + + return { + *[Symbol.iterator]() { + for (let datum of data as Iterable) { + // Ignore values that do not meet the criteria + if (!fn(datum)) { + continue; + } + + yield datum; + } + }, + }; }; } diff --git a/src/find.test.ts b/src/find.test.ts index 3ea42ff..d3a884d 100644 --- a/src/find.test.ts +++ b/src/find.test.ts @@ -1,21 +1,52 @@ import { pipe } from './pipe'; import { range } from './range'; import { find } from './find'; +import { delay } from './delay'; it('should find a value in the stream', () => { + const program = pipe(find(x => x === 2)); + + expect(program(range(0, 100))).toEqual(2); + expect(program(range(0, 100))).toEqual(2); +}); + +it('should return undefined when the value is not found', () => { + const program = pipe(find(x => x === 101)); + + expect(program(range(0, 100))).toEqual(undefined); + expect(program(range(0, 100))).toEqual(undefined); +}); + +it('should find a value in the stream (async)', async () => { const program = pipe( - range(0, 100), + delay(0), find(x => x === 2) ); - expect(program()).toEqual(2); + expect(await program(range(0, 100))).toEqual(2); + expect(await program(range(0, 100))).toEqual(2); }); -it('should return undefined when the value is not found', () => { +it('should return undefined when the value is not found (async)', async () => { const program = pipe( - range(0, 100), + delay(0), find(x => x === 101) ); - expect(program()).toEqual(undefined); + expect(await program(range(0, 100))).toEqual(undefined); + expect(await program(range(0, 100))).toEqual(undefined); +}); + +it('should find a value in the stream (Promise async)', async () => { + const program = pipe(find(x => x === 2)); + + expect(await program(Promise.resolve(range(0, 100)))).toEqual(2); + expect(await program(Promise.resolve(range(0, 100)))).toEqual(2); +}); + +it('should return undefined when the value is not found (Promise async)', async () => { + const program = pipe(find(x => x === 101)); + + expect(await program(Promise.resolve(range(0, 100)))).toEqual(undefined); + expect(await program(Promise.resolve(range(0, 100)))).toEqual(undefined); }); diff --git a/src/find.ts b/src/find.ts index 968d698..50c7b56 100644 --- a/src/find.ts +++ b/src/find.ts @@ -1,7 +1,26 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type Fn = (input: T) => boolean; export function find(predicate: Fn) { - return function findFn(data: Iterable): T | undefined { + return function findFn( + data: MaybePromise | AsyncIterable> + ): (T | undefined) | Promise { + if (isAsyncIterable(data) || data instanceof Promise) { + return (async () => { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + if (predicate(datum)) { + return datum; + } + } + + return undefined; + })(); + } + for (let datum of data) { if (predicate(datum)) { return datum; diff --git a/src/findIndex.test.ts b/src/findIndex.test.ts index f3ad12a..f999a18 100644 --- a/src/findIndex.test.ts +++ b/src/findIndex.test.ts @@ -2,22 +2,71 @@ import { pipe } from './pipe'; import { range } from './range'; import { findIndex } from './findIndex'; import { map } from './map'; +import { delay } from './delay'; it('should find the index based on the predicate', () => { const program = pipe( - range(0, 25), map((x: number) => String.fromCharCode(x + 65)), findIndex(x => x === 'T') ); - expect(program()).toEqual('ABCDEFGHIJKLMNOPQRSTUVWXYZ'.indexOf('T')); + expect(program(range(0, 25))).toEqual( + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.indexOf('T') + ); + expect(program(range(0, 25))).toEqual( + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.indexOf('T') + ); }); it('should return -1 when the index is not found', () => { + const program = pipe(findIndex(x => x === 101)); + + expect(program(range(0, 100))).toEqual(-1); + expect(program(range(0, 100))).toEqual(-1); +}); + +it('should find the index based on the predicate (async)', async () => { + const program = pipe( + delay(0), + map((x: number) => String.fromCharCode(x + 65)), + findIndex(x => x === 'T') + ); + + expect(await program(range(0, 25))).toEqual( + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.indexOf('T') + ); + expect(await program(range(0, 25))).toEqual( + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.indexOf('T') + ); +}); + +it('should return -1 when the index is not found (async)', async () => { const program = pipe( - range(0, 100), + delay(0), findIndex(x => x === 101) ); - expect(program()).toEqual(-1); + expect(await program(range(0, 100))).toEqual(-1); + expect(await program(range(0, 100))).toEqual(-1); +}); + +it('should find the index based on the predicate (Promise async)', async () => { + const program = pipe( + map((x: number) => String.fromCharCode(x + 65)), + findIndex(x => x === 'T') + ); + + expect(await program(Promise.resolve(range(0, 25)))).toEqual( + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.indexOf('T') + ); + expect(await program(Promise.resolve(range(0, 25)))).toEqual( + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.indexOf('T') + ); +}); + +it('should return -1 when the index is not found (Promise async)', async () => { + const program = pipe(findIndex(x => x === 101)); + + expect(await program(Promise.resolve(range(0, 100)))).toEqual(-1); + expect(await program(Promise.resolve(range(0, 100)))).toEqual(-1); }); diff --git a/src/findIndex.ts b/src/findIndex.ts index 7a37cf6..dca603c 100644 --- a/src/findIndex.ts +++ b/src/findIndex.ts @@ -1,7 +1,28 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type Fn = (input: T) => boolean; export function findIndex(predicate: Fn) { - return function findIndexFn(data: Iterable): number { + return function findIndexFn( + data: MaybePromise | AsyncIterable> + ): number | Promise { + if (isAsyncIterable(data) || data instanceof Promise) { + return (async () => { + const stream = data instanceof Promise ? await data : data; + + let i = 0; + for await (let datum of stream) { + if (predicate(datum)) { + return i; + } + i++; + } + + return -1; + })(); + } + let i = 0; for (let datum of data) { if (predicate(datum)) { diff --git a/src/flatten.test.ts b/src/flatten.test.ts index b5062f3..b4ac8b6 100644 --- a/src/flatten.test.ts +++ b/src/flatten.test.ts @@ -2,15 +2,40 @@ import { pipe } from './pipe'; import { flatten } from './flatten'; import { range } from './range'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should be possible to flatten data (shallow)', () => { const program = pipe(flatten({ shallow: true }), toArray()); expect(program([1, [2], range(3, 10)])).toEqual(Array.from(range(1, 10))); + expect(program([1, [2], range(3, 10)])).toEqual(Array.from(range(1, 10))); }); it('should be possible to deep flatten data', () => { const program = pipe(flatten(), toArray()); expect(program([1, [2, [3, [[[4]], [5]]]]])).toEqual(Array.from(range(1, 5))); + expect(program([1, [2, [3, [[[4]], [5]]]]])).toEqual(Array.from(range(1, 5))); +}); + +it('should be possible to deep flatten data (async)', async () => { + const program = pipe(delay(0), flatten(), toArray()); + + expect(await program([1, [2, [3, [[[4]], [5]]]]])).toEqual( + Array.from(range(1, 5)) + ); + expect(await program([1, [2, [3, [[[4]], [5]]]]])).toEqual( + Array.from(range(1, 5)) + ); +}); + +it('should be possible to deep flatten data (Promise async)', async () => { + const program = pipe(flatten(), toArray()); + + expect(await program(Promise.resolve([1, [2, [3, [[[4]], [5]]]]]))).toEqual( + Array.from(range(1, 5)) + ); + expect(await program(Promise.resolve([1, [2, [3, [[[4]], [5]]]]]))).toEqual( + Array.from(range(1, 5)) + ); }); diff --git a/src/flatten.ts b/src/flatten.ts index 16399c9..4c82cd7 100644 --- a/src/flatten.ts +++ b/src/flatten.ts @@ -1,3 +1,6 @@ +import { MaybePromise } from './shared-types'; +import { isAsyncIterable, isIterable } from './utils/iterator'; + type Options = { shallow?: boolean; }; @@ -5,24 +8,57 @@ type Options = { export function flatten(options: Options = {}) { const { shallow = false } = options; - return function* flattenFn(data: Iterable): any { - if (!Array.isArray(data) && !data[Symbol.iterator]) { - yield data; - } else { - for (let datum of data) { - if (shallow) { - // If the value itself is an iterator, we have to flatten that as - // well. - if ((datum as any)[Symbol.iterator]) { - yield* datum as any; - } else { - yield datum; + return function flattenFn( + data: MaybePromise | AsyncIterable> + ): any { + if (data == null) { + return; + } + + if (isAsyncIterable(data) || data instanceof Promise) { + return { + async *[Symbol.asyncIterator]() { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + if (shallow) { + // If the value itself is an iterator, we have to flatten that as + // well. + if (isAsyncIterable(datum)) { + yield* datum; + } else { + yield datum; + } + } else { + // Let's go recursive + yield* await flattenFn(datum as any); + } } + }, + }; + } + + return { + *[Symbol.iterator]() { + if (!Array.isArray(data)) { + yield data; } else { - // Let's go recursive - yield* flattenFn(datum as any); + for (let datum of data) { + if (shallow) { + // If the value itself is an iterator, we have to flatten that as + // well. + if (isIterable(datum)) { + yield* datum; + } else { + yield datum; + } + } else { + // Let's go recursive + yield* flattenFn(datum); + } + } } - } - } + }, + }; }; } diff --git a/src/generator.test.ts b/src/generate.test.ts similarity index 100% rename from src/generator.test.ts rename to src/generate.test.ts diff --git a/src/groupBy.test.ts b/src/groupBy.test.ts index 1a9c7d6..1626a72 100644 --- a/src/groupBy.test.ts +++ b/src/groupBy.test.ts @@ -1,6 +1,7 @@ import { pipe } from './pipe'; import { range } from './range'; import { groupBy } from './groupBy'; +import { delay } from './delay'; function snap(multitude: number, value: number) { return Math.ceil(value / multitude) * multitude; @@ -18,3 +19,30 @@ it('should be possible to group an iterator by something', () => { 10: [6, 7, 8, 9, 10], }); }); + +it('should be possible to group an iterator by something (async)', async () => { + const program = pipe( + range(0, 10), + delay(0), + groupBy((x: number) => snap(5, x)) + ); + + expect(await program()).toEqual({ + 0: [0], + 5: [1, 2, 3, 4, 5], + 10: [6, 7, 8, 9, 10], + }); +}); + +it('should be possible to group an iterator by something (Promise async)', async () => { + const program = pipe( + Promise.resolve(range(0, 10)) as any, + groupBy((x: number) => snap(5, x)) + ); + + expect(await program()).toEqual({ + 0: [0], + 5: [1, 2, 3, 4, 5], + 10: [6, 7, 8, 9, 10], + }); +}); diff --git a/src/groupBy.ts b/src/groupBy.ts index b485186..c568dcf 100644 --- a/src/groupBy.ts +++ b/src/groupBy.ts @@ -1,7 +1,29 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type KeyFn = (input: T) => string | number; export function groupBy(keySelector: KeyFn) { - return function groupByFn(data: Iterable) { + return function groupByFn( + data: MaybePromise | AsyncIterable> + ) { + if (isAsyncIterable(data) || data instanceof Promise) { + return (async () => { + const stream = data instanceof Promise ? await data : data; + let map: Record, T[]> = {}; + for await (let datum of stream) { + const key = keySelector(datum); + if (map[key] === undefined) { + map[key] = []; + } + + map[key].push(datum); + } + + return map; + })(); + } + let map: Record, T[]> = {}; for (let datum of data) { const key = keySelector(datum); diff --git a/src/head.test.ts b/src/head.test.ts index d4ea88f..3eaba4d 100644 --- a/src/head.test.ts +++ b/src/head.test.ts @@ -1,6 +1,7 @@ import { pipe } from './pipe'; import { range } from './range'; import { head, first } from './head'; +import { delay } from './delay'; it('should return the first element of the iterator', () => { const program = pipe(range(20, 25), head()); @@ -25,3 +26,27 @@ it('should return undefined when there is an empty array', () => { expect(program([])).toEqual(undefined); }); + +it('should return the first element of the iterator (async)', async () => { + const program = pipe(range(20, 25), delay(0), head()); + + expect(await program()).toEqual(20); +}); + +it('should return the first element of the iterator (using an alias) (async)', async () => { + const program = pipe(range(20, 25), delay(0), first()); + + expect(await program()).toEqual(20); +}); + +it('should return undefined when there is no data (async)', async () => { + const program = pipe(delay(0), first()); + + expect(await program()).toEqual(undefined); +}); + +it('should return undefined when there is an empty array (async)', async () => { + const program = pipe(delay(0), first()); + + expect(await program([])).toEqual(undefined); +}); diff --git a/src/head.ts b/src/head.ts index 1bb1018..7ae1564 100644 --- a/src/head.ts +++ b/src/head.ts @@ -1,7 +1,23 @@ +import { isAsyncIterable } from './utils/iterator'; + export function head() { - return function headFn(data: Iterable): T | undefined { - if (data == null || (!Array.isArray(data) && !data[Symbol.iterator])) { - return undefined; + return function headFn( + data: Iterable | AsyncIterable + ): T | undefined | Promise { + if (data == null) { + return; + } + + if (isAsyncIterable(data) || data instanceof Promise) { + return (async () => { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + return datum; + } + + return undefined; + })(); } for (let datum of data) { diff --git a/src/map.test.ts b/src/map.test.ts index 0e7daf7..c9fe439 100644 --- a/src/map.test.ts +++ b/src/map.test.ts @@ -1,6 +1,7 @@ import { pipe } from './pipe'; import { map } from './map'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should be possible to map data from A to B', () => { const program = pipe( @@ -9,4 +10,27 @@ it('should be possible to map data from A to B', () => { ); expect(program([1, 2, 3])).toEqual([2, 4, 6]); + expect(program([1, 2, 3])).toEqual([2, 4, 6]); +}); + +it('should be possible to map data from A to B (async)', async () => { + const program = pipe( + delay(0), + map((x: number) => x * 2), // Double + toArray() + ); + + expect(await program([1, 2, 3])).toEqual([2, 4, 6]); + expect(await program([1, 2, 3])).toEqual([2, 4, 6]); +}); + +it('should be possible to map data from A to B (Promise async)', async () => { + const program = pipe( + delay(0), + map((x: number) => x * 2), // Double + toArray() + ); + + expect(await program(Promise.resolve([1, 2, 3]))).toEqual([2, 4, 6]); + expect(await program(Promise.resolve([1, 2, 3]))).toEqual([2, 4, 6]); }); diff --git a/src/map.ts b/src/map.ts index 62b5ebe..1c1784a 100644 --- a/src/map.ts +++ b/src/map.ts @@ -1,9 +1,34 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type Fn = (datum: T) => R; export function map(fn: Fn) { - return function* mapFn(data: Iterable) { - for (let datum of data) { - yield fn(datum); + return function mapFn(data: MaybePromise | AsyncIterable>) { + if (data == null) { + return; } + + // Handle the async version + if (isAsyncIterable(data) || data instanceof Promise) { + return { + async *[Symbol.asyncIterator]() { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + yield fn(datum); + } + }, + }; + } + + // Handle the sync version + return { + *[Symbol.iterator]() { + for (let datum of data) { + yield fn(datum); + } + }, + }; }; } diff --git a/src/max.test.ts b/src/max.test.ts index 9e215cc..f2ae8a0 100644 --- a/src/max.test.ts +++ b/src/max.test.ts @@ -1,9 +1,16 @@ import { pipe } from './pipe'; import { range } from './range'; import { max } from './max'; +import { delay } from './delay'; it('should find the max value of the iterator', () => { const program = pipe(range(5, 10), max()); expect(program()).toEqual(10); }); + +it('should find the max value of the iterator (async)', async () => { + const program = pipe(range(5, 10), delay(0), max()); + + expect(await program()).toEqual(10); +}); diff --git a/src/max.ts b/src/max.ts index 9389bb7..87a3cb9 100644 --- a/src/max.ts +++ b/src/max.ts @@ -1,13 +1,5 @@ +import { reduce } from './reduce'; + export function max() { - return function maxFn(data: number[]) { - let max: number | undefined = undefined; - for (let datum of data) { - if (max === undefined) { - max = datum; - continue; - } - max = Math.max(max, datum); - } - return max; - }; + return reduce(Math.max, -Infinity); } diff --git a/src/min.test.ts b/src/min.test.ts index fa87b51..d21b4dd 100644 --- a/src/min.test.ts +++ b/src/min.test.ts @@ -1,9 +1,16 @@ import { pipe } from './pipe'; import { range } from './range'; import { min } from './min'; +import { delay } from './delay'; it('should find the min value of the iterator', () => { const program = pipe(range(5, 10), min()); expect(program()).toEqual(5); }); + +it('should find the min value of the iterator (async)', async () => { + const program = pipe(range(5, 10), delay(0), min()); + + expect(await program()).toEqual(5); +}); diff --git a/src/min.ts b/src/min.ts index d5f08c3..9cd7031 100644 --- a/src/min.ts +++ b/src/min.ts @@ -1,13 +1,5 @@ +import { reduce } from './reduce'; + export function min() { - return function minFn(data: number[]) { - let min: number | undefined = undefined; - for (let datum of data) { - if (min === undefined) { - min = datum; - continue; - } - min = Math.min(min, datum); - } - return min; - }; + return reduce(Math.min, Infinity); } diff --git a/src/partition.test.ts b/src/partition.test.ts index cb6d875..eaec402 100644 --- a/src/partition.test.ts +++ b/src/partition.test.ts @@ -2,6 +2,7 @@ import { pipe } from './pipe'; import { range } from './range'; import { toArray } from './toArray'; import { partition } from './partition'; +import { delay } from './delay'; it('should partition the data into 2 streams based on the predicate', () => { const program = pipe( @@ -15,3 +16,30 @@ it('should partition the data into 2 streams based on the predicate', () => { [2, 4], ]); }); + +it('should partition the data into 2 streams based on the predicate (async)', async () => { + const program = pipe( + range(1, 4), + delay(0), + partition((x: number) => x % 2 !== 0), + toArray() + ); + + expect(await program()).toEqual([ + [1, 3], + [2, 4], + ]); +}); + +it('should partition the data into 2 streams based on the predicate (Promise async)', async () => { + const program = pipe( + Promise.resolve(range(1, 4)) as any, + partition((x: number) => x % 2 !== 0), + toArray() + ); + + expect(await program()).toEqual([ + [1, 3], + [2, 4], + ]); +}); diff --git a/src/partition.ts b/src/partition.ts index 84ce7a7..0228092 100644 --- a/src/partition.ts +++ b/src/partition.ts @@ -1,7 +1,35 @@ +import { MaybePromise } from './shared-types'; +import { isAsyncIterable } from './utils/iterator'; + type Fn = (input: T) => boolean; export function partition(predicate: Fn) { - return function partitionFn(data: Iterable): [T[], T[]] { + return function partitionFn( + data: MaybePromise | AsyncIterable> + ): MaybePromise<[T[], T[]]> | undefined { + if (data == null) { + return; + } + + if (isAsyncIterable(data) || data instanceof Promise) { + return (async () => { + const stream = data instanceof Promise ? await data : data; + + let a: T[] = []; + let b: T[] = []; + + for await (let datum of stream) { + if (predicate(datum)) { + a.push(datum); + } else { + b.push(datum); + } + } + + return [a, b] as [T[], T[]]; + })(); + } + let a = []; let b = []; diff --git a/src/pipe.ts b/src/pipe.ts index 988bd06..3bc180e 100644 --- a/src/pipe.ts +++ b/src/pipe.ts @@ -2,7 +2,7 @@ import { ensureFunction } from './utils/ensureFunction'; type Fn = (...args: any) => any; -export function pipe(...fns: (Fn | Iterable)[]): Fn { +export function pipe(...fns: (Fn | Iterable | AsyncIterable)[]): Fn { return fns.reduceRight((f: Fn, g) => { const g_ = ensureFunction(g); return (...args) => f(g_(...args)); diff --git a/src/reduce.test.ts b/src/reduce.test.ts index 2def6da..7c1b142 100644 --- a/src/reduce.test.ts +++ b/src/reduce.test.ts @@ -1,6 +1,25 @@ import { reduce } from './reduce'; +import { pipe } from './pipe'; +import { delay } from './delay'; it('should be possible to sum numbers (via reduce)', () => { const program = reduce((total, current) => total + current, 0); expect(program([1, 2, 3])).toEqual(6); + expect(program([1, 2, 3])).toEqual(6); +}); + +it('should be possible to sum numbers (via reduce) (async)', async () => { + const program = pipe( + delay(0), + reduce((total, current) => total + current, 0) + ); + expect(await program([1, 2, 3])).toEqual(6); + expect(await program([1, 2, 3])).toEqual(6); +}); + +it('should be possible to sum numbers (via reduce) (Promise async)', async () => { + const program = pipe(reduce((total, current) => total + current, 0)); + + expect(await program(Promise.resolve([1, 2, 3]))).toEqual(6); + expect(await program(Promise.resolve([1, 2, 3]))).toEqual(6); }); diff --git a/src/reduce.ts b/src/reduce.ts index c69914f..cd28567 100644 --- a/src/reduce.ts +++ b/src/reduce.ts @@ -1,8 +1,22 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type Fn = (acc: R, datum: T) => R; export function reduce(fn: Fn, initial: R) { - return function reduceFn(data: Iterable) { + return function reduceFn(data: MaybePromise | AsyncIterable>) { let acc = initial; + + if (isAsyncIterable(data) || data instanceof Promise) { + return (async () => { + const stream = data instanceof Promise ? await data : data; + for await (let datum of stream) { + acc = fn(acc, datum); + } + return acc; + })(); + } + for (let datum of data) { acc = fn(acc, datum); } diff --git a/src/reverse.test.ts b/src/reverse.test.ts index 18abd88..78c299c 100644 --- a/src/reverse.test.ts +++ b/src/reverse.test.ts @@ -3,6 +3,7 @@ import { range } from './range'; import { toArray } from './toArray'; import { take } from './take'; import { reverse } from './reverse'; +import { delay } from './delay'; it('should be possible to reverse an iterator', () => { const program = pipe(range(0, 1_000), reverse(), take(5), toArray()); @@ -15,3 +16,38 @@ it('should be possible to reverse an iterator', () => { 1_000 - 4, ]); }); + +it('should be possible to reverse an iterator (async)', async () => { + const program = pipe( + range(0, 1_000), + delay(0), + reverse(), + take(5), + toArray() + ); + + expect(await program()).toEqual([ + 1_000, + 1_000 - 1, + 1_000 - 2, + 1_000 - 3, + 1_000 - 4, + ]); +}); + +it('should be possible to reverse an iterator (Promise async)', async () => { + const program = pipe( + Promise.resolve(range(0, 1_000)) as any, + reverse(), + take(5), + toArray() + ); + + expect(await program()).toEqual([ + 1_000, + 1_000 - 1, + 1_000 - 2, + 1_000 - 3, + 1_000 - 4, + ]); +}); diff --git a/src/reverse.ts b/src/reverse.ts index 0669d45..3426401 100644 --- a/src/reverse.ts +++ b/src/reverse.ts @@ -1,12 +1,43 @@ +import { isAsyncIterable } from './utils/iterator'; +import { pipe } from './pipe'; +import { toArray } from './toArray'; +import { MaybePromise } from './shared-types'; + +/** + * This is pretty slow because it has to first go through the whole iterator + * (to make it an array), then reverse the whole thing and then start + * yielding again. + */ export function reverse() { - return function* reverseFn(data: Iterable) { - /** - * This is pretty slow because it has to first go through the whole iterator - * (to make it an array), then reverse the whole thing and then start - * yielding again. - */ - for (let datum of Array.from(data).reverse()) { - yield datum; + return function reverseFn( + data: MaybePromise | AsyncIterable> + ) { + if (isAsyncIterable(data) || data instanceof Promise) { + return { + async *[Symbol.asyncIterator]() { + const stream = data instanceof Promise ? await data : data; + + const program = pipe(toArray()); + const array = await program(stream); + + for await (let datum of array.reverse()) { + yield datum; + } + }, + }; } + + return { + *[Symbol.iterator]() { + /** + * This is pretty slow because it has to first go through the whole iterator + * (to make it an array), then reverse the whole thing and then start + * yielding again. + */ + for (let datum of Array.from(data).reverse()) { + yield datum; + } + }, + }; }; } diff --git a/src/shared-types.ts b/src/shared-types.ts new file mode 100644 index 0000000..9cd354b --- /dev/null +++ b/src/shared-types.ts @@ -0,0 +1 @@ +export type MaybePromise = T | Promise; diff --git a/src/skip.test.ts b/src/skip.test.ts index 650293a..89e910d 100644 --- a/src/skip.test.ts +++ b/src/skip.test.ts @@ -3,6 +3,7 @@ import { range } from './range'; import { toArray } from './toArray'; import { skip } from './skip'; import { take } from './take'; +import { delay } from './delay'; it('should skip x values', () => { const program = pipe(skip(5), toArray()); @@ -15,3 +16,21 @@ it('should skip x values and take y values', () => { expect(program(range(0, 10))).toEqual([5, 6, 7]); }); + +it('should skip x values (async)', async () => { + const program = pipe(delay(0), skip(5), toArray()); + + expect(await program(range(0, 10))).toEqual([5, 6, 7, 8, 9, 10]); +}); + +it('should skip x values and take y values (async)', async () => { + const program = pipe(delay(0), skip(5), take(3), toArray()); + + expect(await program(range(0, 10))).toEqual([5, 6, 7]); +}); + +it('should skip x values and take y values (Promise async)', async () => { + const program = pipe(skip(5), take(3), toArray()); + + expect(await program(Promise.resolve(range(0, 10)))).toEqual([5, 6, 7]); +}); diff --git a/src/slice.test.ts b/src/slice.test.ts index 609056b..b61a1f3 100644 --- a/src/slice.test.ts +++ b/src/slice.test.ts @@ -2,6 +2,7 @@ import { pipe } from './pipe'; import { range } from './range'; import { slice } from './slice'; import { toArray } from './toArray'; +import { delay } from './delay'; it.each([ [0, 10, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]], @@ -13,10 +14,45 @@ it.each([ expect(program(range(0, 1_000_000_000))).toEqual(expected); }); +it.each([ + [0, 10, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]], + [4, 6, [4, 5, 6]], +])( + 'should be possible to slice from %p until %p (async)', + async (start, end, expected) => { + const program = pipe(delay(0), slice(start, end), toArray()); + + expect(await program(range(0, 1_000_000_000))).toEqual(expected); + expect(await program(range(0, 1_000_000_000))).toEqual(expected); + } +); + it('should be possible to pass an array to slice', () => { - expect(Array.from(slice(0, 1)([1, 2, 3, 4]))).toEqual([1, 2]); + const program = pipe(slice(0, 1), toArray()); + expect(program([1, 2, 3, 4])).toEqual([1, 2]); }); it('should be possible to pass an iterator to slice', () => { - expect(Array.from(slice(0, 1)(range(1, 4)))).toEqual([1, 2]); + const program = pipe(slice(0, 1), toArray()); + expect(program(range(1, 4))).toEqual([1, 2]); +}); + +it('should be possible to pass an array to slice (async)', async () => { + const program = pipe(delay(0), slice(0, 1), toArray()); + expect(await program([1, 2, 3, 4])).toEqual([1, 2]); +}); + +it('should be possible to pass an iterator to slice (async)', async () => { + const program = pipe(delay(0), slice(0, 1), toArray()); + expect(await program(range(1, 4))).toEqual([1, 2]); +}); + +it('should be possible to pass an array to slice (Promise async)', async () => { + const program = pipe(slice(0, 1), toArray()); + expect(await program(Promise.resolve([1, 2, 3, 4]))).toEqual([1, 2]); +}); + +it('should be possible to pass an iterator to slice (Promise async)', async () => { + const program = pipe(slice(0, 1), toArray()); + expect(await program(Promise.resolve(range(1, 4)))).toEqual([1, 2]); }); diff --git a/src/slice.ts b/src/slice.ts index f70595e..7a3fccf 100644 --- a/src/slice.ts +++ b/src/slice.ts @@ -1,24 +1,57 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + export function slice(begin = 0, end = Infinity) { - return function* sliceFn(data: Iterable) { - const iterator = Array.isArray(data) - ? (data[Symbol.iterator]() as IterableIterator) - : (data as IterableIterator); + return function sliceFn(data: MaybePromise | AsyncIterable>) { + if (isAsyncIterable(data) || data instanceof Promise) { + return { + async *[Symbol.asyncIterator]() { + const stream = data instanceof Promise ? await data : data; + const iterator = stream as AsyncIterableIterator; - let local_begin = begin; - let local_end = end - local_begin; + let local_begin = begin; + let local_end = end - local_begin; - // Skip the first X values - while (local_begin-- > 0) { - iterator.next(); - } + // Skip the first X values + while (local_begin-- > 0) { + iterator.next(); + } - // Loop through the remaining items until the end is reached - for (let datum of iterator) { - yield datum; + // Loop through the remaining items until the end is reached + for await (let datum of iterator) { + yield datum; - if (--local_end < 0) { - return; - } + if (--local_end < 0) { + return; + } + } + }, + }; } + + return { + *[Symbol.iterator]() { + const iterator = Array.isArray(data) + ? (data[Symbol.iterator]() as IterableIterator) + : (data as IterableIterator); + + let local_begin = begin; + let local_end = end - local_begin; + + // Skip the first X values + while (local_begin-- > 0) { + iterator.next(); + } + + // Loop through the remaining items until the end is reached + for (let datum of iterator) { + yield datum; + + if (--local_end < 0) { + return; + } + } + }, + }; }; } diff --git a/src/some.test.ts b/src/some.test.ts index a964df2..5fe6e59 100644 --- a/src/some.test.ts +++ b/src/some.test.ts @@ -1,8 +1,9 @@ import { pipe } from './pipe'; import { range } from './range'; import { some } from './some'; +import { delay } from './delay'; -it('should return true when some value matches the predicete', () => { +it('should return true when some value matches the predicate', () => { const program = pipe( range(0, 25), some(x => x === 12) @@ -19,3 +20,37 @@ it('should return false when non of the values match the predicate', () => { expect(program()).toEqual(false); }); + +it('should return true when some value matches the predicate (async)', async () => { + const program = pipe( + delay(0), + some(x => x === 12) + ); + + expect(await program(range(0, 25))).toEqual(true); + expect(await program(range(0, 25))).toEqual(true); +}); + +it('should return false when non of the values match the predicate (async)', async () => { + const program = pipe( + delay(0), + some((x: number) => x > 100) + ); + + expect(await program(range(0, 100))).toEqual(false); + expect(await program(range(0, 100))).toEqual(false); +}); + +it('should return true when some value matches the predicate (Promise async)', async () => { + const program = pipe(some(x => x === 12)); + + expect(await program(Promise.resolve(range(0, 25)))).toEqual(true); + expect(await program(Promise.resolve(range(0, 25)))).toEqual(true); +}); + +it('should return false when non of the values match the predicate (Promise async)', async () => { + const program = pipe(some((x: number) => x > 100)); + + expect(await program(Promise.resolve(range(0, 100)))).toEqual(false); + expect(await program(Promise.resolve(range(0, 100)))).toEqual(false); +}); diff --git a/src/some.ts b/src/some.ts index a9faa8e..b25c96d 100644 --- a/src/some.ts +++ b/src/some.ts @@ -1,7 +1,26 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type Fn = (input: T) => boolean; export function some(predicate: Fn) { - return function someFn(data: Iterable): boolean { + return function someFn( + data: MaybePromise | AsyncIterable> + ): boolean | Promise { + if (isAsyncIterable(data) || data instanceof Promise) { + return (async () => { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + if (predicate(datum)) { + return true; + } + } + + return false; + })(); + } + for (let datum of data) { if (predicate(datum)) { return true; diff --git a/src/sum.test.ts b/src/sum.test.ts index e7f84eb..3e0b850 100644 --- a/src/sum.test.ts +++ b/src/sum.test.ts @@ -1,6 +1,7 @@ import { pipe } from './pipe'; import { sum } from './sum'; import { range } from './range'; +import { delay } from './delay'; it('should be possible to sum an array', () => { const program = pipe(sum()); @@ -15,3 +16,35 @@ it('should be possible to sum an iterator', () => { expect(program(range(0, 10))).toEqual(55); expect(program(range(0, 10))).toEqual(55); }); + +it('should be possible to sum an array (async)', async () => { + const program = pipe(delay(0), sum()); + + expect(await program([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])).toEqual(55); + expect(await program([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])).toEqual(55); +}); + +it('should be possible to sum an iterator (async)', async () => { + const program = pipe(delay(0), sum()); + + expect(await program(range(0, 10))).toEqual(55); + expect(await program(range(0, 10))).toEqual(55); +}); + +it('should be possible to sum an array (Promise async)', async () => { + const program = pipe(sum()); + + expect( + await program(Promise.resolve([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) + ).toEqual(55); + expect( + await program(Promise.resolve([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) + ).toEqual(55); +}); + +it('should be possible to sum an iterator (Promise async)', async () => { + const program = pipe(sum()); + + expect(await program(Promise.resolve(range(0, 10)))).toEqual(55); + expect(await program(Promise.resolve(range(0, 10)))).toEqual(55); +}); diff --git a/src/take.test.ts b/src/take.test.ts index b93ff63..5a82396 100644 --- a/src/take.test.ts +++ b/src/take.test.ts @@ -2,9 +2,28 @@ import { pipe } from './pipe'; import { range } from './range'; import { take } from './take'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should take a only X values', () => { const program = pipe(take(5), toArray()); expect(program(range(0, 1_000))).toEqual([0, 1, 2, 3, 4]); }); + +it('should take a only X values (async)', async () => { + const program = pipe(delay(0), take(5), toArray()); + + expect(await program(range(0, 1_000))).toEqual([0, 1, 2, 3, 4]); +}); + +it('should take a only X values (Promise async)', async () => { + const program = pipe(take(5), toArray()); + + expect(await program(Promise.resolve(range(0, 1_000)))).toEqual([ + 0, + 1, + 2, + 3, + 4, + ]); +}); diff --git a/src/takeWhile.test.ts b/src/takeWhile.test.ts index f50392a..ca183d9 100644 --- a/src/takeWhile.test.ts +++ b/src/takeWhile.test.ts @@ -2,6 +2,7 @@ import { pipe } from './pipe'; import { takeWhile } from './takeWhile'; import { range } from './range'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should be possible to take values as long as they meet a certain condition', () => { const program = pipe( @@ -11,3 +12,28 @@ it('should be possible to take values as long as they meet a certain condition', expect(program(range(0, 1_000))).toEqual([0, 1, 2, 3, 4]); }); + +it('should be possible to take values as long as they meet a certain condition (async)', async () => { + const program = pipe( + delay(0), + takeWhile((x: number) => x < 5), + toArray() + ); + + expect(await program(range(0, 1_000))).toEqual([0, 1, 2, 3, 4]); +}); + +it('should be possible to take values as long as they meet a certain condition (Promise async)', async () => { + const program = pipe( + takeWhile((x: number) => x < 5), + toArray() + ); + + expect(await program(Promise.resolve(range(0, 1_000)))).toEqual([ + 0, + 1, + 2, + 3, + 4, + ]); +}); diff --git a/src/takeWhile.ts b/src/takeWhile.ts index b2bf176..2b84a44 100644 --- a/src/takeWhile.ts +++ b/src/takeWhile.ts @@ -1,13 +1,38 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + type Fn = (datum: T) => boolean; export function takeWhile(fn: Fn) { - return function* takeWhileFn(data: Iterable) { - for (let datum of data) { - if (!fn(datum)) { - return; - } + return function takeWhileFn( + data: MaybePromise | AsyncIterable> + ) { + if (isAsyncIterable(data) || data instanceof Promise) { + return { + async *[Symbol.asyncIterator]() { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + if (!fn(datum)) { + return; + } - yield datum; + yield datum; + } + }, + }; } + + return { + *[Symbol.iterator]() { + for (let datum of data) { + if (!fn(datum)) { + return; + } + + yield datum; + } + }, + }; }; } diff --git a/src/tap.test.ts b/src/tap.test.ts index 24fcddf..5a085ac 100644 --- a/src/tap.test.ts +++ b/src/tap.test.ts @@ -2,6 +2,7 @@ import { pipe } from './pipe'; import { toArray } from './toArray'; import { range } from './range'; import { tap } from './tap'; +import { delay } from './delay'; it('should be possible to tap into the current sequence', () => { const fn = jest.fn(); @@ -16,3 +17,32 @@ it('should be possible to tap into the current sequence', () => { expect(program()).toEqual([0, 1, 2, 3, 4, 5]); expect(fn).toHaveBeenCalledTimes(6); }); + +it('should be possible to tap into the current sequence (async)', async () => { + const fn = jest.fn(); + const program = pipe( + range(0, 5), + delay(0), + tap(value => { + fn(value); + }), + toArray() + ); + + expect(await program()).toEqual([0, 1, 2, 3, 4, 5]); + expect(fn).toHaveBeenCalledTimes(6); +}); + +it('should be possible to tap into the current sequence (Promise async)', async () => { + const fn = jest.fn(); + const program = pipe( + Promise.resolve(range(0, 5)) as any, + tap(value => { + fn(value); + }), + toArray() + ); + + expect(await program()).toEqual([0, 1, 2, 3, 4, 5]); + expect(fn).toHaveBeenCalledTimes(6); +}); diff --git a/src/toArray.test.ts b/src/toArray.test.ts index 4e7edb7..96eba6e 100644 --- a/src/toArray.test.ts +++ b/src/toArray.test.ts @@ -1,9 +1,22 @@ import { pipe } from './pipe'; import { range } from './range'; import { toArray } from './toArray'; +import { delay } from './delay'; it('should convert an iterator to an array', () => { const program = pipe(range(0, 10), toArray()); expect(program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); }); + +it('should convert an iterator to an array (async)', async () => { + const program = pipe(range(0, 10), delay(0), toArray()); + + expect(await program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); +}); + +it('should convert an iterator to an array (Promise async)', async () => { + const program = pipe(Promise.resolve(range(0, 10)) as any, toArray()); + + expect(await program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); +}); diff --git a/src/toArray.ts b/src/toArray.ts index cffe292..654dafb 100644 --- a/src/toArray.ts +++ b/src/toArray.ts @@ -1,5 +1,11 @@ +import { reduce } from './reduce'; +import { MaybePromise } from './shared-types'; + export function toArray() { - return function toArrayFn(data: Iterable): T[] { - return Array.from(data); + return (data: MaybePromise | AsyncIterable>) => { + return reduce((acc, current) => { + acc.push(current); + return acc; + }, [])(data); }; } diff --git a/src/unique.test.ts b/src/unique.test.ts index 37e3af8..6261285 100644 --- a/src/unique.test.ts +++ b/src/unique.test.ts @@ -3,6 +3,7 @@ import { range } from './range'; import { unique } from './unique'; import { map } from './map'; import { toArray } from './toArray'; +import { delay } from './delay'; function snap(multitude: number, value: number) { return Math.ceil(value / multitude) * multitude; @@ -17,3 +18,24 @@ it('should be possible to create a unique stream', () => { expect(program(range(0, 10))).toEqual([0, 5, 10]); }); + +it('should be possible to create a unique stream (async)', async () => { + const program = pipe( + delay(0), + map((x: number) => snap(5, x)), + unique(), + toArray() + ); + + expect(await program(range(0, 10))).toEqual([0, 5, 10]); +}); + +it('should be possible to create a unique stream (Promise async)', async () => { + const program = pipe(unique(), toArray()); + + expect(await program(Promise.resolve([0, 0, 5, 5, 5, 10, 10]))).toEqual([ + 0, + 5, + 10, + ]); +}); diff --git a/src/unique.ts b/src/unique.ts index 3521977..9742be4 100644 --- a/src/unique.ts +++ b/src/unique.ts @@ -1,11 +1,34 @@ +import { isAsyncIterable } from './utils/iterator'; +import { MaybePromise } from './shared-types'; + export function unique() { - return function* uniqueFn(data: Iterable) { + return function uniqueFn(data: MaybePromise | AsyncIterable>) { const seen = new Set([]); - for (let datum of data) { - if (!seen.has(datum)) { - seen.add(datum); - yield datum; - } + + if (isAsyncIterable(data) || data instanceof Promise) { + return { + async *[Symbol.asyncIterator]() { + const stream = data instanceof Promise ? await data : data; + + for await (let datum of stream) { + if (!seen.has(datum)) { + seen.add(datum); + yield datum; + } + } + }, + }; } + + return { + *[Symbol.iterator]() { + for (let datum of data) { + if (!seen.has(datum)) { + seen.add(datum); + yield datum; + } + } + }, + }; }; } diff --git a/src/utils/iterator.ts b/src/utils/iterator.ts new file mode 100644 index 0000000..c82a1d7 --- /dev/null +++ b/src/utils/iterator.ts @@ -0,0 +1,15 @@ +export function isIterable(input: any): input is Iterable { + if (typeof input !== 'object' || input === null) { + return false; + } + + return input[Symbol.iterator] !== undefined; +} + +export function isAsyncIterable(input: any): input is AsyncIterable { + if (typeof input !== 'object' || input === null) { + return false; + } + + return input[Symbol.asyncIterator] !== undefined; +} diff --git a/src/where.test.ts b/src/where.test.ts index a5d58d6..c66a57e 100644 --- a/src/where.test.ts +++ b/src/where.test.ts @@ -3,6 +3,7 @@ import { range } from './range'; import { map } from './map'; import { toArray } from './toArray'; import { where } from './where'; +import { delay } from './delay'; it('should be possible to get the items containing certain properties', () => { const program = pipe( @@ -30,3 +31,59 @@ it('should be possible to get the items containing certain magic properties like [3, 3], ]); }); + +it('should be possible to get the items containing certain properties (async)', async () => { + const program = pipe( + range(0, 10), + delay(0), + map((x: number) => ({ x, y: x + 1 })), + where({ x: 3, y: 4 }), + toArray() + ); + + expect(await program()).toEqual([{ x: 3, y: 4 }]); +}); + +it('should be possible to get the items containing certain magic properties like array lengths (async)', async () => { + const program = pipe( + range(0, 3), + delay(0), + map((x: number) => [x, x]), + where({ length: 2 }), + toArray() + ); + + expect(await program()).toEqual([ + [0, 0], + [1, 1], + [2, 2], + [3, 3], + ]); +}); + +it('should be possible to get the items containing certain properties (Promise async)', async () => { + const program = pipe( + Promise.resolve(range(0, 10)) as any, + map((x: number) => ({ x, y: x + 1 })), + where({ x: 3, y: 4 }), + toArray() + ); + + expect(await program()).toEqual([{ x: 3, y: 4 }]); +}); + +it('should be possible to get the items containing certain magic properties like array lengths (Promise async)', async () => { + const program = pipe( + Promise.resolve(range(0, 3)) as any, + map((x: number) => [x, x]), + where({ length: 2 }), + toArray() + ); + + expect(await program()).toEqual([ + [0, 0], + [1, 1], + [2, 2], + [3, 3], + ]); +}); From d8d523a413fc5a9308c10b1e65f041542f3dfed2 Mon Sep 17 00:00:00 2001 From: Robin Malfait Date: Sun, 23 Feb 2020 18:22:18 +0100 Subject: [PATCH 3/5] simplify data type --- src/average.ts | 6 ++---- src/chunk.ts | 4 ++-- src/compose.ts | 5 +++-- src/concat.ts | 6 ++---- src/delay.ts | 6 ++---- src/every.ts | 4 ++-- src/filter.ts | 4 ++-- src/find.ts | 4 ++-- src/findIndex.ts | 6 ++---- src/flatten.ts | 6 ++---- src/groupBy.ts | 6 ++---- src/head.ts | 3 ++- src/map.ts | 4 ++-- src/partition.ts | 4 ++-- src/pipe.ts | 3 ++- src/reduce.ts | 4 ++-- src/reverse.ts | 6 ++---- src/shared-types.ts | 2 ++ src/slice.ts | 4 ++-- src/some.ts | 6 ++---- src/takeWhile.ts | 6 ++---- src/toArray.ts | 4 ++-- src/unique.ts | 4 ++-- 23 files changed, 47 insertions(+), 60 deletions(-) diff --git a/src/average.ts b/src/average.ts index 6e097ce..30fd976 100644 --- a/src/average.ts +++ b/src/average.ts @@ -3,12 +3,10 @@ import { chunk } from './chunk'; import { map } from './map'; import { head } from './head'; import { pipe } from './pipe'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; export function average() { - return function averageFn( - data: MaybePromise | AsyncIterable> - ) { + return function averageFn(data: LazyIterable) { const program = pipe( reduce<[number, number], number>( (acc, current) => { diff --git a/src/chunk.ts b/src/chunk.ts index c50f734..175e492 100644 --- a/src/chunk.ts +++ b/src/chunk.ts @@ -1,8 +1,8 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; export function chunk(size: number) { - return function chunkFn(data: MaybePromise | AsyncIterable>) { + return function chunkFn(data: LazyIterable) { if (isAsyncIterable(data) || data instanceof Promise) { return { async *[Symbol.asyncIterator]() { diff --git a/src/compose.ts b/src/compose.ts index 445f84f..4807995 100644 --- a/src/compose.ts +++ b/src/compose.ts @@ -1,10 +1,11 @@ import { ensureFunction } from './utils/ensureFunction'; +import { LazyIterable } from './shared-types'; type Fn = (...args: any) => any; export function compose( - fn: Fn | Iterable | AsyncIterable, - ...fns: (Fn | Iterable | AsyncIterable)[] + fn: Fn | LazyIterable, + ...fns: (Fn | LazyIterable)[] ): Fn { return fns.reduce((f: Fn, g) => { const g_ = ensureFunction(g); diff --git a/src/concat.ts b/src/concat.ts index 95e6008..c75b900 100644 --- a/src/concat.ts +++ b/src/concat.ts @@ -1,9 +1,7 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; -export function concat( - ...data: MaybePromise | AsyncIterable>[] -) { +export function concat(...data: LazyIterable[]) { if ( data.some(isAsyncIterable) || data.some(datum => datum instanceof Promise) diff --git a/src/delay.ts b/src/delay.ts index b51ff24..0df9673 100644 --- a/src/delay.ts +++ b/src/delay.ts @@ -1,13 +1,11 @@ -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; function sleep(ms: number) { return new Promise(resolve => setTimeout(resolve, ms)); } export function delay(ms: number) { - return async function* delayFn( - data: MaybePromise | AsyncIterable> - ) { + return async function* delayFn(data: LazyIterable) { if (data == null) { return; } diff --git a/src/every.ts b/src/every.ts index 83d9cd5..c50b3fc 100644 --- a/src/every.ts +++ b/src/every.ts @@ -1,10 +1,10 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type Fn = (input: T) => boolean; export function every(predicate: Fn) { - return function everyFn(data: MaybePromise | AsyncIterable>) { + return function everyFn(data: LazyIterable) { if (data == null) { return; } diff --git a/src/filter.ts b/src/filter.ts index 7b18214..c73fa7e 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,10 +1,10 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type Fn = (datum: T) => boolean; export function filter(fn: Fn) { - return function filterFn(data: MaybePromise | AsyncIterable>) { + return function filterFn(data: LazyIterable) { if (isAsyncIterable(data) || data instanceof Promise) { return { async *[Symbol.asyncIterator]() { diff --git a/src/find.ts b/src/find.ts index 50c7b56..ae6d1b3 100644 --- a/src/find.ts +++ b/src/find.ts @@ -1,11 +1,11 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type Fn = (input: T) => boolean; export function find(predicate: Fn) { return function findFn( - data: MaybePromise | AsyncIterable> + data: LazyIterable ): (T | undefined) | Promise { if (isAsyncIterable(data) || data instanceof Promise) { return (async () => { diff --git a/src/findIndex.ts b/src/findIndex.ts index dca603c..46a5a53 100644 --- a/src/findIndex.ts +++ b/src/findIndex.ts @@ -1,12 +1,10 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type Fn = (input: T) => boolean; export function findIndex(predicate: Fn) { - return function findIndexFn( - data: MaybePromise | AsyncIterable> - ): number | Promise { + return function findIndexFn(data: LazyIterable): number | Promise { if (isAsyncIterable(data) || data instanceof Promise) { return (async () => { const stream = data instanceof Promise ? await data : data; diff --git a/src/flatten.ts b/src/flatten.ts index 4c82cd7..14e73f3 100644 --- a/src/flatten.ts +++ b/src/flatten.ts @@ -1,4 +1,4 @@ -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; import { isAsyncIterable, isIterable } from './utils/iterator'; type Options = { @@ -8,9 +8,7 @@ type Options = { export function flatten(options: Options = {}) { const { shallow = false } = options; - return function flattenFn( - data: MaybePromise | AsyncIterable> - ): any { + return function flattenFn(data: LazyIterable): any { if (data == null) { return; } diff --git a/src/groupBy.ts b/src/groupBy.ts index c568dcf..941713e 100644 --- a/src/groupBy.ts +++ b/src/groupBy.ts @@ -1,12 +1,10 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type KeyFn = (input: T) => string | number; export function groupBy(keySelector: KeyFn) { - return function groupByFn( - data: MaybePromise | AsyncIterable> - ) { + return function groupByFn(data: LazyIterable) { if (isAsyncIterable(data) || data instanceof Promise) { return (async () => { const stream = data instanceof Promise ? await data : data; diff --git a/src/head.ts b/src/head.ts index 7ae1564..e63ec06 100644 --- a/src/head.ts +++ b/src/head.ts @@ -1,8 +1,9 @@ import { isAsyncIterable } from './utils/iterator'; +import { LazyIterable } from './shared-types'; export function head() { return function headFn( - data: Iterable | AsyncIterable + data: LazyIterable ): T | undefined | Promise { if (data == null) { return; diff --git a/src/map.ts b/src/map.ts index 1c1784a..7c5bac0 100644 --- a/src/map.ts +++ b/src/map.ts @@ -1,10 +1,10 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type Fn = (datum: T) => R; export function map(fn: Fn) { - return function mapFn(data: MaybePromise | AsyncIterable>) { + return function mapFn(data: LazyIterable) { if (data == null) { return; } diff --git a/src/partition.ts b/src/partition.ts index 0228092..cc7de1d 100644 --- a/src/partition.ts +++ b/src/partition.ts @@ -1,11 +1,11 @@ -import { MaybePromise } from './shared-types'; +import { MaybePromise, LazyIterable } from './shared-types'; import { isAsyncIterable } from './utils/iterator'; type Fn = (input: T) => boolean; export function partition(predicate: Fn) { return function partitionFn( - data: MaybePromise | AsyncIterable> + data: LazyIterable ): MaybePromise<[T[], T[]]> | undefined { if (data == null) { return; diff --git a/src/pipe.ts b/src/pipe.ts index 3bc180e..854dcfa 100644 --- a/src/pipe.ts +++ b/src/pipe.ts @@ -1,8 +1,9 @@ import { ensureFunction } from './utils/ensureFunction'; +import { LazyIterable } from './shared-types'; type Fn = (...args: any) => any; -export function pipe(...fns: (Fn | Iterable | AsyncIterable)[]): Fn { +export function pipe(...fns: (Fn | LazyIterable)[]): Fn { return fns.reduceRight((f: Fn, g) => { const g_ = ensureFunction(g); return (...args) => f(g_(...args)); diff --git a/src/reduce.ts b/src/reduce.ts index cd28567..49638f6 100644 --- a/src/reduce.ts +++ b/src/reduce.ts @@ -1,10 +1,10 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type Fn = (acc: R, datum: T) => R; export function reduce(fn: Fn, initial: R) { - return function reduceFn(data: MaybePromise | AsyncIterable>) { + return function reduceFn(data: LazyIterable) { let acc = initial; if (isAsyncIterable(data) || data instanceof Promise) { diff --git a/src/reverse.ts b/src/reverse.ts index 3426401..0bd2236 100644 --- a/src/reverse.ts +++ b/src/reverse.ts @@ -1,7 +1,7 @@ import { isAsyncIterable } from './utils/iterator'; import { pipe } from './pipe'; import { toArray } from './toArray'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; /** * This is pretty slow because it has to first go through the whole iterator @@ -9,9 +9,7 @@ import { MaybePromise } from './shared-types'; * yielding again. */ export function reverse() { - return function reverseFn( - data: MaybePromise | AsyncIterable> - ) { + return function reverseFn(data: LazyIterable) { if (isAsyncIterable(data) || data instanceof Promise) { return { async *[Symbol.asyncIterator]() { diff --git a/src/shared-types.ts b/src/shared-types.ts index 9cd354b..ff64c4e 100644 --- a/src/shared-types.ts +++ b/src/shared-types.ts @@ -1 +1,3 @@ export type MaybePromise = T | Promise; + +export type LazyIterable = MaybePromise | AsyncIterable>; diff --git a/src/slice.ts b/src/slice.ts index 7a3fccf..8940165 100644 --- a/src/slice.ts +++ b/src/slice.ts @@ -1,8 +1,8 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; export function slice(begin = 0, end = Infinity) { - return function sliceFn(data: MaybePromise | AsyncIterable>) { + return function sliceFn(data: LazyIterable) { if (isAsyncIterable(data) || data instanceof Promise) { return { async *[Symbol.asyncIterator]() { diff --git a/src/some.ts b/src/some.ts index b25c96d..35e112d 100644 --- a/src/some.ts +++ b/src/some.ts @@ -1,12 +1,10 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type Fn = (input: T) => boolean; export function some(predicate: Fn) { - return function someFn( - data: MaybePromise | AsyncIterable> - ): boolean | Promise { + return function someFn(data: LazyIterable): boolean | Promise { if (isAsyncIterable(data) || data instanceof Promise) { return (async () => { const stream = data instanceof Promise ? await data : data; diff --git a/src/takeWhile.ts b/src/takeWhile.ts index 2b84a44..858f3c4 100644 --- a/src/takeWhile.ts +++ b/src/takeWhile.ts @@ -1,12 +1,10 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; type Fn = (datum: T) => boolean; export function takeWhile(fn: Fn) { - return function takeWhileFn( - data: MaybePromise | AsyncIterable> - ) { + return function takeWhileFn(data: LazyIterable) { if (isAsyncIterable(data) || data instanceof Promise) { return { async *[Symbol.asyncIterator]() { diff --git a/src/toArray.ts b/src/toArray.ts index 654dafb..f0515f7 100644 --- a/src/toArray.ts +++ b/src/toArray.ts @@ -1,8 +1,8 @@ import { reduce } from './reduce'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; export function toArray() { - return (data: MaybePromise | AsyncIterable>) => { + return (data: LazyIterable) => { return reduce((acc, current) => { acc.push(current); return acc; diff --git a/src/unique.ts b/src/unique.ts index 9742be4..ac02a91 100644 --- a/src/unique.ts +++ b/src/unique.ts @@ -1,8 +1,8 @@ import { isAsyncIterable } from './utils/iterator'; -import { MaybePromise } from './shared-types'; +import { LazyIterable } from './shared-types'; export function unique() { - return function uniqueFn(data: MaybePromise | AsyncIterable>) { + return function uniqueFn(data: LazyIterable) { const seen = new Set([]); if (isAsyncIterable(data) || data instanceof Promise) { From 6304af9b44bb6e9643b5dd091009988e25993370 Mon Sep 17 00:00:00 2001 From: Robin Malfait Date: Sun, 23 Feb 2020 21:38:33 +0100 Subject: [PATCH 4/5] remove any types :tada: --- src/groupBy.test.ts | 2 +- src/partition.test.ts | 2 +- src/reverse.test.ts | 2 +- src/tap.test.ts | 2 +- src/toArray.test.ts | 2 +- src/where.test.ts | 4 ++-- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/groupBy.test.ts b/src/groupBy.test.ts index 1626a72..4beacbc 100644 --- a/src/groupBy.test.ts +++ b/src/groupBy.test.ts @@ -36,7 +36,7 @@ it('should be possible to group an iterator by something (async)', async () => { it('should be possible to group an iterator by something (Promise async)', async () => { const program = pipe( - Promise.resolve(range(0, 10)) as any, + Promise.resolve(range(0, 10)), groupBy((x: number) => snap(5, x)) ); diff --git a/src/partition.test.ts b/src/partition.test.ts index eaec402..71ac5b7 100644 --- a/src/partition.test.ts +++ b/src/partition.test.ts @@ -33,7 +33,7 @@ it('should partition the data into 2 streams based on the predicate (async)', as it('should partition the data into 2 streams based on the predicate (Promise async)', async () => { const program = pipe( - Promise.resolve(range(1, 4)) as any, + Promise.resolve(range(1, 4)), partition((x: number) => x % 2 !== 0), toArray() ); diff --git a/src/reverse.test.ts b/src/reverse.test.ts index 78c299c..8b676c9 100644 --- a/src/reverse.test.ts +++ b/src/reverse.test.ts @@ -37,7 +37,7 @@ it('should be possible to reverse an iterator (async)', async () => { it('should be possible to reverse an iterator (Promise async)', async () => { const program = pipe( - Promise.resolve(range(0, 1_000)) as any, + Promise.resolve(range(0, 1_000)), reverse(), take(5), toArray() diff --git a/src/tap.test.ts b/src/tap.test.ts index 5a085ac..e8b38c9 100644 --- a/src/tap.test.ts +++ b/src/tap.test.ts @@ -36,7 +36,7 @@ it('should be possible to tap into the current sequence (async)', async () => { it('should be possible to tap into the current sequence (Promise async)', async () => { const fn = jest.fn(); const program = pipe( - Promise.resolve(range(0, 5)) as any, + Promise.resolve(range(0, 5)), tap(value => { fn(value); }), diff --git a/src/toArray.test.ts b/src/toArray.test.ts index 96eba6e..e054c5c 100644 --- a/src/toArray.test.ts +++ b/src/toArray.test.ts @@ -16,7 +16,7 @@ it('should convert an iterator to an array (async)', async () => { }); it('should convert an iterator to an array (Promise async)', async () => { - const program = pipe(Promise.resolve(range(0, 10)) as any, toArray()); + const program = pipe(Promise.resolve(range(0, 10)), toArray()); expect(await program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); }); diff --git a/src/where.test.ts b/src/where.test.ts index c66a57e..495785a 100644 --- a/src/where.test.ts +++ b/src/where.test.ts @@ -63,7 +63,7 @@ it('should be possible to get the items containing certain magic properties like it('should be possible to get the items containing certain properties (Promise async)', async () => { const program = pipe( - Promise.resolve(range(0, 10)) as any, + Promise.resolve(range(0, 10)), map((x: number) => ({ x, y: x + 1 })), where({ x: 3, y: 4 }), toArray() @@ -74,7 +74,7 @@ it('should be possible to get the items containing certain properties (Promise a it('should be possible to get the items containing certain magic properties like array lengths (Promise async)', async () => { const program = pipe( - Promise.resolve(range(0, 3)) as any, + Promise.resolve(range(0, 3)), map((x: number) => [x, x]), where({ length: 2 }), toArray() From e2d4acf968e8a87af4dd0088dc16967ffead2ba2 Mon Sep 17 00:00:00 2001 From: Robin Malfait Date: Sun, 23 Feb 2020 22:48:36 +0100 Subject: [PATCH 5/5] add some tests for edge cases --- src/average.test.ts | 2 +- src/chunk.test.ts | 17 +++++++++++++++++ src/compact.test.ts | 15 +++++++++++++++ src/every.test.ts | 6 ++++++ src/every.ts | 2 +- src/map.test.ts | 10 ++++++++++ src/partition.test.ts | 9 +++++++++ src/reduce.ts | 4 ++++ src/where.test.ts | 16 ++++++++++++++++ src/where.ts | 2 +- 10 files changed, 80 insertions(+), 3 deletions(-) diff --git a/src/average.test.ts b/src/average.test.ts index b915a55..8a96781 100644 --- a/src/average.test.ts +++ b/src/average.test.ts @@ -17,7 +17,7 @@ it('should be possible to get an average of all the values (async)', async () => }); it('should be possible to get an average of all the values (Promise async)', async () => { - const program = pipe(delay(0), average()); + const program = pipe(average()); expect( await program(Promise.resolve([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) diff --git a/src/chunk.test.ts b/src/chunk.test.ts index 75dfebc..74df58c 100644 --- a/src/chunk.test.ts +++ b/src/chunk.test.ts @@ -37,3 +37,20 @@ it('should create chunked items (async)', async () => { [9, 10], ]); }); + +it('should create chunked items (Promise async)', async () => { + const program = pipe(chunk(3), toArray()); + + expect(await program(Promise.resolve(range(0, 10)))).toEqual([ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8], + [9, 10], + ]); + expect(await program(Promise.resolve(range(0, 10)))).toEqual([ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8], + [9, 10], + ]); +}); diff --git a/src/compact.test.ts b/src/compact.test.ts index 99a7f64..ce6764d 100644 --- a/src/compact.test.ts +++ b/src/compact.test.ts @@ -24,3 +24,18 @@ it('should remove all falsey values (async)', async () => { await program([0, 1, true, false, null, undefined, '', 'test', NaN]) ).toEqual([1, true, 'test']); }); + +it('should remove all falsey values (Promise async)', async () => { + const program = pipe(compact(), toArray()); + + expect( + await program( + Promise.resolve([0, 1, true, false, null, undefined, '', 'test', NaN]) + ) + ).toEqual([1, true, 'test']); + expect( + await program( + Promise.resolve([0, 1, true, false, null, undefined, '', 'test', NaN]) + ) + ).toEqual([1, true, 'test']); +}); diff --git a/src/every.test.ts b/src/every.test.ts index 9eae04a..f2b6493 100644 --- a/src/every.test.ts +++ b/src/every.test.ts @@ -12,6 +12,12 @@ it('should return true when every value matches the predicate', () => { expect(program()).toEqual(true); }); +it('should return false when no stream is passing through it', () => { + const program = pipe(every(x => typeof x === 'number')); + + expect(program()).toEqual(false); +}); + it("should return false when one of the values doesn't meet the predicate", () => { const program = pipe( range(0, 100), diff --git a/src/every.ts b/src/every.ts index c50b3fc..ae7860b 100644 --- a/src/every.ts +++ b/src/every.ts @@ -6,7 +6,7 @@ type Fn = (input: T) => boolean; export function every(predicate: Fn) { return function everyFn(data: LazyIterable) { if (data == null) { - return; + return false; } if (isAsyncIterable(data) || data instanceof Promise) { diff --git a/src/map.test.ts b/src/map.test.ts index c9fe439..18fd574 100644 --- a/src/map.test.ts +++ b/src/map.test.ts @@ -13,6 +13,16 @@ it('should be possible to map data from A to B', () => { expect(program([1, 2, 3])).toEqual([2, 4, 6]); }); +it('should return undefined when no stream is passing through it', () => { + const program = pipe( + map((x: number) => x * 2), // Double + toArray() + ); + + expect(program()).toEqual(undefined); + expect(program()).toEqual(undefined); +}); + it('should be possible to map data from A to B (async)', async () => { const program = pipe( delay(0), diff --git a/src/partition.test.ts b/src/partition.test.ts index 71ac5b7..c58646b 100644 --- a/src/partition.test.ts +++ b/src/partition.test.ts @@ -17,6 +17,15 @@ it('should partition the data into 2 streams based on the predicate', () => { ]); }); +it('should return undefined when no stream is passing through it', () => { + const program = pipe( + partition((x: number) => x % 2 !== 0), + toArray() + ); + + expect(program()).toEqual(undefined); +}); + it('should partition the data into 2 streams based on the predicate (async)', async () => { const program = pipe( range(1, 4), diff --git a/src/reduce.ts b/src/reduce.ts index 49638f6..fce23bd 100644 --- a/src/reduce.ts +++ b/src/reduce.ts @@ -5,6 +5,10 @@ type Fn = (acc: R, datum: T) => R; export function reduce(fn: Fn, initial: R) { return function reduceFn(data: LazyIterable) { + if (data == null) { + return; + } + let acc = initial; if (isAsyncIterable(data) || data instanceof Promise) { diff --git a/src/where.test.ts b/src/where.test.ts index 495785a..ca9743b 100644 --- a/src/where.test.ts +++ b/src/where.test.ts @@ -16,6 +16,22 @@ it('should be possible to get the items containing certain properties', () => { expect(program()).toEqual([{ x: 3, y: 4 }]); }); +it('should not crash on values that it does not understand', () => { + const program = pipe(where({ include: true }), toArray()); + + expect( + program([ + 0, + null, + true, + false, + 'hello', + Object.assign(function() {}, { include: false }), + { include: true, name: 'winner' }, + ]) + ).toEqual([{ include: true, name: 'winner' }]); +}); + it('should be possible to get the items containing certain magic properties like array lengths', () => { const program = pipe( range(0, 3), diff --git a/src/where.ts b/src/where.ts index dcf9e63..911c73d 100644 --- a/src/where.ts +++ b/src/where.ts @@ -4,7 +4,7 @@ export function where(properties: Record) { const entries = Object.entries(properties); return filter(datum => { - if (!(typeof datum === 'object' && datum !== null)) { + if (datum == null) { return false; }