diff --git a/README.md b/README.md index 8f73ecc..257a046 100644 --- a/README.md +++ b/README.md @@ -382,7 +382,7 @@ This means that the returned type from `reduce` doesn't need to be the same as t ## each -`each` iterates an array of promises or values, very much like `map`, although always serially as if `concurrency` was set to `1`. +`each` iterates an array of promises or values, very much like `map`, although with a default `concurrency` of `1`. The iterator function cannot return a value (or it will be ignored), but can return an empty promise which will be awaited before the next iteration. It's like `tap` but for elements in an array. @@ -403,6 +403,29 @@ const outArray = await each( inArray, iteratorFun ); // outArray ~ inArray, not necessarily the *same* array, but the same content ``` +### Concurrency and time-chunking + +Just like `filter` and `map` have [concurrency](#filter-concurrency) and [time-chunking](#filter-operations-chunked-by-idle-time) options, so does `each`. An optional argument before the predicate/iterator function can be used. + +For concurrency: + +```ts +import { each } from 'already' + +await each( array, { concurrency: 4 }, iteratorFun ); +``` + +and for time-chunking: + +```ts +import { each } from 'already' + +// Time-chunk every 50 milliseconds +await each( array, { chunk: 50 }, iteratorFun ); +// Time-chunk dynamically based on requestIdleCallback() +await each( array, { chunk: 'idle' }, iteratorFun ); +``` + ## some @@ -845,7 +868,7 @@ expect( ret ).to.equal( "yo" ); Ensuring exclusive calls to a function can be implemented in multiple ways. With asynchrony, this gets quite complicated. -Many problems can be generalized to only running one function at a time (awaiting it if necessary). For this, the [`throat`](https://www.npmjs.com/package/throat) package is useful (it is used by `already`). Sometimes a more fine grained control is desired, such as allowing a _test and early return_ as well as signalling that the concurrent logic is complete (to allow the next function call) before the whole function is complete. This results in a more understandable flow. +Many problems can be generalized to only running one function at a time (awaiting it if necessary). For this, [`concurrent`](#concurrent) is useful. Sometimes a more fine grained control is desired, such as allowing a _test and early return_ as well as signalling that the concurrent logic is complete (to allow the next function call) before the whole function is complete. This results in a more understandable flow. For this, `funnel()` is extremely handy. @@ -869,7 +892,7 @@ The above is a connection pool, we might only want a certain number of connectio Is the above code safe? It isn't. Two synchronously immediate calls to `getConnection` will likely get the same answer from `getReusableConnection`, i.e. *falsy*. This means, they'll both call `connect`, although maybe just one should have done so. Only one should have created a connection, then `registerToConnectionPool` while the other should wait until the first is complete, then retry `getConnection` from scratch to see if a connection can be re-used. -The `getConnection` could be wrapped inside a [`throat`](https://www.npmjs.com/package/throat) wrapper, but that wouldn't be as performant as possible. Consider two calls to `getConnection` when there are connections in the pool, but none is free. One of the two calls should create a new connection, but while this takes place (which may take time), another might be freed. This newly freed connection should be re-usable by the second call to `getConnection`. +The `getConnection` could be wrapped inside a [`concurrent`](#concurrent) wrapper, but that wouldn't be as performant as possible. Consider two calls to `getConnection` when there are connections in the pool, but none is free. One of the two calls should create a new connection, but while this takes place (which may take time), another might be freed. This newly freed connection should be re-usable by the second call to `getConnection`. `funnel` makes this trivial. Wrap the `getConnection` logic in a funnel. Allow concurrent access to `getReusableConnection` which is concurrency _safe_. Then create a _synchronization barrier_ (using `shouldRetry`/`retry`): diff --git a/lib/index.ts b/lib/index.ts index 2343673..1e12111 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -610,18 +610,30 @@ async function reduceImpl< T, R >( } +const defaultEachOptions: FilterMapOptions = { + concurrency: 1 +}; + export type EachFn< T > = ( t: T, index: number, length: number ) => void | Promise< void >; export function each< T >( eachFn: EachFn< T > ) : ( t: ConcatArray< T | PromiseLike< T > > ) => Promise< Array< T > >; +export function each< T >( opts: FilterMapOptions, eachFn: EachFn< T > ) +: ( t: ConcatArray< T | PromiseLike< T > > ) => Promise< Array< T > >; +export function each< T >( + arr: ConcatArray< T | PromiseLike< T > >, + eachFn: EachFn< T > +): Promise< Array< T > >; export function each< T >( arr: ConcatArray< T | PromiseLike< T > >, + opts: FilterMapOptions, eachFn: EachFn< T > ): Promise< Array< T > >; export function each< T >( - arr: ConcatArray< T | PromiseLike< T > > | EachFn< T >, + arr: ConcatArray< T | PromiseLike< T > > | EachFn< T > | FilterMapOptions, + opts?: EachFn< T > | FilterMapOptions, eachFn?: EachFn< T > ) : @@ -629,11 +641,35 @@ export function each< T >( ( Promise< Array< T > > ) { if ( Array.isArray( arr ) ) - return eachImpl( < EachFn< T > >eachFn )( arr ); - return eachImpl( < EachFn< T > >arr ); + { + if ( typeof opts === "function" ) + { + eachFn = opts; + opts = defaultEachOptions; + } + + const _opts = < FilterMapOptions >opts; + + return eachImpl( _opts, eachFn! )( arr ); + } + + if ( typeof arr === "function" ) + { + eachFn = arr; + opts = defaultEachOptions; + } + else + { + eachFn = opts as EachFn< T >; + opts = arr as FilterMapOptions; + } + + const _opts = < FilterMapOptions >opts; + + return eachImpl( _opts, eachFn! ); } -export function eachImpl< T >( eachFn: EachFn< T > ) +function eachImpl< T >( opts: FilterMapOptions, eachFn: EachFn< T > ) : ( t: ConcatArray< T | PromiseLike< T > > ) => Promise< Array< T > > { return async ( arr: ConcatArray< T | PromiseLike< T > > ) @@ -646,7 +682,7 @@ export function eachImpl< T >( eachFn: EachFn< T > ) await eachFn( t, index, length ); return t; } - return map( arr, { concurrency: 1 }, iterator ); + return map( arr, opts, iterator ); }; } diff --git a/test/index.ts b/test/index.ts index 9221775..1f18007 100644 --- a/test/index.ts +++ b/test/index.ts @@ -574,6 +574,43 @@ describe( "time-chunking", ( ) => expect( arr2 ).toEqual( [ 3, 6, 9 ] ); expect( mock.mock.calls.length ).toBeGreaterThan( 0 ); } ) ); + + it( "each with idle", withIdle( async ( mock ) => + { + const handled: Array< number > = [ ]; + + const arr = [ + 1, + 2, + Promise.resolve( 3 ), + delayChain( 5 )( 4 ), + 5, + 6, + 7, + 8, + 9, + 10, + ]; + await each( + arr, + { + chunk: 'idle', + }, + async t => + { + const shouldHandle = + ( t % 2 === 0 ) + ? await delay( 5 ).then( ( ) => t % 3 === 0 ) + : t % 3 === 0; + + if ( shouldHandle ) + handled.push( t ); + } + ); + + expect( handled ).toStrictEqual( [ 3, 6, 9 ] ); + expect( mock.mock.calls.length ).toBeGreaterThan( 0 ); + } ) ); } ); describe( "flatMap", ( ) => @@ -1115,6 +1152,56 @@ describe( "each", ( ) => expect( order ).toEqual( [ 0, 0, 1, 1, 2, 2 ] ); } ); } ); + + describe( "concurrency", ( ) => + { + it.concurrent( "should handle concurrency with array", async ( ) => + { + const handled: Array< number > = [ ]; + + await each( + [ 1, 2, 3, 4 ], + { + concurrency: 2 + }, + async x => + { + if ( x % 2 === 1 ) + await delay( x * 5 ); + + handled.push( x ); + + return; + } + ); + + expect( handled ).toStrictEqual( [ 2, 1, 4, 3 ] ); + } ); + + it.concurrent( "should handle concurrency with currying", async ( ) => + { + const handled: Array< number > = [ ]; + + const fn = each( + { + concurrency: 2 + }, + async ( x: number ) => + { + if ( x % 2 === 1 ) + await delay( x * 5 ); + + handled.push( x ); + + return; + } + ); + + await fn( [ 1, 2, 3, 4 ] ); + + expect( handled ).toStrictEqual( [ 2, 1, 4, 3 ] ); + } ); + } ); } );