Skip to content

Commit

Permalink
feat(each): added support for concurrency and time-chunking
Browse files Browse the repository at this point in the history
  • Loading branch information
grantila committed Dec 31, 2021
1 parent 0280855 commit 96bc7e6
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 8 deletions.
29 changes: 26 additions & 3 deletions README.md
Expand Up @@ -382,7 +382,7 @@ This means that the returned type from `reduce` doesn't need to be the same as t

## each

`each` iterates an array of promises or values, very much like `map`, although always serially as if `concurrency` was set to `1`.
`each` iterates an array of promises or values, very much like `map`, although with a default `concurrency` of `1`.

The iterator function cannot return a value (or it will be ignored), but can return an empty promise which will be awaited before the next iteration. It's like `tap` but for elements in an array.

Expand All @@ -403,6 +403,29 @@ const outArray = await each( inArray, iteratorFun );
// outArray ~ inArray, not necessarily the *same* array, but the same content
```

### Concurrency and time-chunking

Just like `filter` and `map` have [concurrency](#filter-concurrency) and [time-chunking](#filter-operations-chunked-by-idle-time) options, so does `each`. An optional argument before the predicate/iterator function can be used.

For concurrency:

```ts
import { each } from 'already'

await each( array, { concurrency: 4 }, iteratorFun );
```

and for time-chunking:

```ts
import { each } from 'already'

// Time-chunk every 50 milliseconds
await each( array, { chunk: 50 }, iteratorFun );
// Time-chunk dynamically based on requestIdleCallback()
await each( array, { chunk: 'idle' }, iteratorFun );
```


## some

Expand Down Expand Up @@ -845,7 +868,7 @@ expect( ret ).to.equal( "yo" );

Ensuring exclusive calls to a function can be implemented in multiple ways. With asynchrony, this gets quite complicated.

Many problems can be generalized to only running one function at a time (awaiting it if necessary). For this, the [`throat`](https://www.npmjs.com/package/throat) package is useful (it is used by `already`). Sometimes a more fine grained control is desired, such as allowing a _test and early return_ as well as signalling that the concurrent logic is complete (to allow the next function call) before the whole function is complete. This results in a more understandable flow.
Many problems can be generalized to only running one function at a time (awaiting it if necessary). For this, [`concurrent`](#concurrent) is useful. Sometimes a more fine grained control is desired, such as allowing a _test and early return_ as well as signalling that the concurrent logic is complete (to allow the next function call) before the whole function is complete. This results in a more understandable flow.

For this, `funnel()` is extremely handy.

Expand All @@ -869,7 +892,7 @@ The above is a connection pool, we might only want a certain number of connectio

Is the above code safe? It isn't. Two synchronously immediate calls to `getConnection` will likely get the same answer from `getReusableConnection`, i.e. *falsy*. This means, they'll both call `connect`, although maybe just one should have done so. Only one should have created a connection, then `registerToConnectionPool` while the other should wait until the first is complete, then retry `getConnection` from scratch to see if a connection can be re-used.

The `getConnection` could be wrapped inside a [`throat`](https://www.npmjs.com/package/throat) wrapper, but that wouldn't be as performant as possible. Consider two calls to `getConnection` when there are connections in the pool, but none is free. One of the two calls should create a new connection, but while this takes place (which may take time), another might be freed. This newly freed connection should be re-usable by the second call to `getConnection`.
The `getConnection` could be wrapped inside a [`concurrent`](#concurrent) wrapper, but that wouldn't be as performant as possible. Consider two calls to `getConnection` when there are connections in the pool, but none is free. One of the two calls should create a new connection, but while this takes place (which may take time), another might be freed. This newly freed connection should be re-usable by the second call to `getConnection`.

`funnel` makes this trivial. Wrap the `getConnection` logic in a funnel. Allow concurrent access to `getReusableConnection` which is concurrency _safe_. Then create a _synchronization barrier_ (using `shouldRetry`/`retry`):

Expand Down
46 changes: 41 additions & 5 deletions lib/index.ts
Expand Up @@ -610,30 +610,66 @@ async function reduceImpl< T, R >(
}


const defaultEachOptions: FilterMapOptions = {
concurrency: 1
};

export type EachFn< T > =
( t: T, index: number, length: number ) => void | Promise< void >;

export function each< T >( eachFn: EachFn< T > )
: ( t: ConcatArray< T | PromiseLike< T > > ) => Promise< Array< T > >;
export function each< T >( opts: FilterMapOptions, eachFn: EachFn< T > )
: ( t: ConcatArray< T | PromiseLike< T > > ) => Promise< Array< T > >;
export function each< T >(
arr: ConcatArray< T | PromiseLike< T > >,
eachFn: EachFn< T >
): Promise< Array< T > >;
export function each< T >(
arr: ConcatArray< T | PromiseLike< T > >,
opts: FilterMapOptions,
eachFn: EachFn< T >
): Promise< Array< T > >;

export function each< T >(
arr: ConcatArray< T | PromiseLike< T > > | EachFn< T >,
arr: ConcatArray< T | PromiseLike< T > > | EachFn< T > | FilterMapOptions,
opts?: EachFn< T > | FilterMapOptions,
eachFn?: EachFn< T >
)
:
( ( t: ConcatArray< T | PromiseLike< T > > ) => Promise< Array< T > > ) |
( Promise< Array< T > > )
{
if ( Array.isArray( arr ) )
return eachImpl( < EachFn< T > >eachFn )( arr );
return eachImpl( < EachFn< T > >arr );
{
if ( typeof opts === "function" )
{
eachFn = opts;
opts = defaultEachOptions;
}

const _opts = < FilterMapOptions >opts;

return eachImpl( _opts, eachFn! )( arr );
}

if ( typeof arr === "function" )
{
eachFn = arr;
opts = defaultEachOptions;
}
else
{
eachFn = opts as EachFn< T >;
opts = arr as FilterMapOptions;
}

const _opts = < FilterMapOptions >opts;

return eachImpl( _opts, eachFn! );
}

export function eachImpl< T >( eachFn: EachFn< T > )
function eachImpl< T >( opts: FilterMapOptions, eachFn: EachFn< T > )
: ( t: ConcatArray< T | PromiseLike< T > > ) => Promise< Array< T > >
{
return async ( arr: ConcatArray< T | PromiseLike< T > > )
Expand All @@ -646,7 +682,7 @@ export function eachImpl< T >( eachFn: EachFn< T > )
await eachFn( t, index, length );
return t;
}
return map( arr, { concurrency: 1 }, iterator );
return map( arr, opts, iterator );
};
}

Expand Down
87 changes: 87 additions & 0 deletions test/index.ts
Expand Up @@ -574,6 +574,43 @@ describe( "time-chunking", ( ) =>
expect( arr2 ).toEqual( [ 3, 6, 9 ] );
expect( mock.mock.calls.length ).toBeGreaterThan( 0 );
} ) );

it( "each with idle", withIdle( async ( mock ) =>
{
const handled: Array< number > = [ ];

const arr = [
1,
2,
Promise.resolve( 3 ),
delayChain( 5 )( 4 ),
5,
6,
7,
8,
9,
10,
];
await each(
arr,
{
chunk: 'idle',
},
async t =>
{
const shouldHandle =
( t % 2 === 0 )
? await delay( 5 ).then( ( ) => t % 3 === 0 )
: t % 3 === 0;

if ( shouldHandle )
handled.push( t );
}
);

expect( handled ).toStrictEqual( [ 3, 6, 9 ] );
expect( mock.mock.calls.length ).toBeGreaterThan( 0 );
} ) );
} );

describe( "flatMap", ( ) =>
Expand Down Expand Up @@ -1115,6 +1152,56 @@ describe( "each", ( ) =>
expect( order ).toEqual( [ 0, 0, 1, 1, 2, 2 ] );
} );
} );

describe( "concurrency", ( ) =>
{
it.concurrent( "should handle concurrency with array", async ( ) =>
{
const handled: Array< number > = [ ];

await each(
[ 1, 2, 3, 4 ],
{
concurrency: 2
},
async x =>
{
if ( x % 2 === 1 )
await delay( x * 5 );

handled.push( x );

return;
}
);

expect( handled ).toStrictEqual( [ 2, 1, 4, 3 ] );
} );

it.concurrent( "should handle concurrency with currying", async ( ) =>
{
const handled: Array< number > = [ ];

const fn = each(
{
concurrency: 2
},
async ( x: number ) =>
{
if ( x % 2 === 1 )
await delay( x * 5 );

handled.push( x );

return;
}
);

await fn( [ 1, 2, 3, 4 ] );

expect( handled ).toStrictEqual( [ 2, 1, 4, 3 ] );
} );
} );
} );


Expand Down

0 comments on commit 96bc7e6

Please sign in to comment.