Skip to content

Commit

Permalink
feat(concurrent): added 'concurrent' function, hence removed dep on t…
Browse files Browse the repository at this point in the history
…hroat
  • Loading branch information
grantila committed Jun 20, 2021
1 parent a5e48be commit 0b2534f
Show file tree
Hide file tree
Showing 7 changed files with 529 additions and 51 deletions.
43 changes: 40 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@

`already` is a set of promise helper functions which many of them are also found in libraries such as Bluebird.

The functions are standalone and depends on no particular Promise implementation and therefore works well for Javascript's built-in Promise.
The functions are standalone and depends on no particular Promise implementation and therefore works well for JavaScript's built-in Promise.

This library is written in TypeScript but is exposed as ES7 (if imported as `already`) and ES5 (if imported as `already/es5`). Typings are provided too, so any TypeScript project using this library will automatically get full type safety of all the functions.
The library is written in TypeScript, so typings are provided. Apart from being exported as JavaScript (ES2019), it's also exported as an *ES module*, if imported in platforms (and bundlers) supporting this.

The library is also exported as an *ES module*, if imported in platforms (and bundlers) supporting this.

# Versions

Expand All @@ -30,6 +29,8 @@ The library is also exported as an *ES module*, if imported in platforms (and bu

# Functions

* [concurrent](#concurrent)
<br>&emsp;Run a function with certain concurrency
* [delay](#delay)
<br>&emsp;Create a promise which resolved after a certain time
* [tap](#tap)
Expand Down Expand Up @@ -112,6 +113,42 @@ The library is also exported as an *ES module*, if imported in platforms (and bu

# Functions

## concurrent

Since version 2 of this package, the dependency on `throat` was removed. This function works like throat; it wraps a function with concurrency, returning a new function that can be called repeatedly, but will only call the underlying function with the provided concurrency.

The function takes a concurrency option, and optionally the function to be wrapped. If the second argument isn't passed, the returned function takes a function as first argument. This allows you to run separate functions, yet guarantee a maximum concurrency.

```ts
import { concurrent } from 'already'

// function readSomethingFromDb(): Promise<any>;

const concurrently = concurrent( 3, readSomethingFromDb );

// This will ensure <readSomethingFromDb> isn't called more than 3 times concurrently
const results = await Promise.all(
listOfIds.map( id => concurrently( id ) )
);
```

or without specifying the function, so that different functions can share concurrency:

```ts
import { concurrent } from 'already'

const concurrently = concurrent( 3 );

const results = await Promise.all(
listOfThings.map( thing =>
typeof thing === 'string'
? concurrently( readSomethingElse, thing )
: concurrently( readSomethingFromDb, thing )
)
);
```


## delay

The standalone `delay` function takes a milliseconds argument and returns a promise which is resolved after that time. An optional value can be given too, resolving in a promise with this future value.
Expand Down
119 changes: 114 additions & 5 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import throat from "throat";

export default {
defer,
deferSet,
Expand Down Expand Up @@ -76,6 +74,107 @@ function toReadonlyArray< T >( arr: ConcatArray< T > ): ReadonlyArray< T >
}


type ConcurrentQueueRunner< R > = ( ) => R | PromiseLike< R >;

interface ConcurrentQueueItem< R >
{
cb: ConcurrentQueueRunner< R >;
deferred: Deferred< R >;
}

interface ConcurrentQueue< R >
{
size: number;
count: number;
queue: Array< ConcurrentQueueItem< R > >;
process: any;
runOne: ( cb: ConcurrentQueueRunner< R > ) => Promise< R >;
enqueue: ( cb: ConcurrentQueueRunner< R > ) => Promise< R >;
}

export type Callback< R, A extends any[ ] > =
( ...args: A ) => Promise< R >;

/**
* Create a maximum concurrency for fn (can be curried)
*
* Either specify fn and invoke the returned function, or skip fn and the
* returned function will take an arbitrary function to limit concurrency for.
*
* @param size Concurrency limit
* @param fn The function to limit the concurrency for
* @returns Concurrency-limited version of fn
*/
export function concurrent< R, A extends any[ ] >(
size: number,
fn: Callback< R, A >
): ( ...args: Parameters< typeof fn > ) => Promise< ReturnType< typeof fn > >;

export function concurrent( size: number )
: < R, A extends any[ ] >( fn: Callback< R, A >, ...a: A ) => Promise< R >;

export function concurrent< R, A extends any[ ] >(
size: number,
fn?: Callback< R, A >
)
{
const queue = makeQueue< R >( size );

if ( size < 1 )
throw new RangeError( `Size must be at least 1` );

if ( !fn )
return ( cb: Callback< R, A >, ...args: A ) =>
queue.enqueue( ( ) => cb( ...args ) );
else
return ( ...args: A ): Promise< R > =>
queue.enqueue( ( ) => fn( ...args ) );
}

function makeQueue< R >( size: number ): ConcurrentQueue< R >
{
const queue: ConcurrentQueue< R > = {
size,
count: 0,
queue: [ ],
process: ( ) =>
{
if ( queue.queue.length )
{
const first = queue.queue.shift( )!;

const { cb, deferred } = first;

queue.runOne( cb ).then( deferred.resolve, deferred.reject );
}
},
runOne: ( cb: ConcurrentQueueRunner< R > ) =>
{
++queue.count;
return ( async ( ) => cb( ) )( )
.finally( ( ) =>
{
--queue.count;
queue.process( );
} );
},
enqueue: async ( cb: ConcurrentQueueRunner< R > ) =>
{
if ( queue.count >= queue.size )
{
const deferred = defer< R >( );
queue.queue.push( { cb, deferred } );
return deferred.promise;
}

return queue.runOne( cb );
}
};

return queue;
}


export function delay( milliseconds: number ): Promise< void >;
export function delay< T >( milliseconds: number, t: T ): Promise< T >;

Expand Down Expand Up @@ -245,7 +344,7 @@ export function map< T, U >(
( t: T, index: number, arr: ConcatArray< T | PromiseLike< T > > ) =>
Promise.resolve( ( < MapFn< T, U > >mapFn )( t, index, arr ) );

const throated = throat( concurrency );
const concurrently = concurrent( concurrency );

return ( t: ConcatArray< T | PromiseLike< T > > )
: Promise< Array< U > > =>
Expand All @@ -254,9 +353,9 @@ export function map< T, U >(
.then( ( values: ConcatArray< T | PromiseLike< T > > ) =>
toReadonlyArray( values ).map(
( val, index, arr ) =>
throated( ( ) => Promise.resolve( val ) )
( ( ) => Promise.resolve( val ) )( )
.then( ( val: T ) =>
throated( ( ) => promiseMapFn( val, index, arr ) )
concurrently( promiseMapFn, val, index, arr )
)
)
)
Expand Down Expand Up @@ -621,6 +720,16 @@ export function defer< T = void >( ): Deferred< T >
deferred.resolve = resolve;
deferred.reject = reject;
} );

/* istanbul ignore next */
if ( process?.env?.JEST_WORKER_ID !== undefined )
try
{
// Jest has decided for many versions to break async catching,
// so this is needed for unit tests not to break unnecessarily.
deferred.promise.catch( ( ) => { } );
} catch ( _err ) { }

return deferred;
}

Expand Down
8 changes: 3 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
"dist",
"dist-mjs"
],
"types": "./dist/index.d.ts",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": {
"import": "./dist-mjs/index.js",
"require": "./dist/index.js"
Expand All @@ -26,7 +26,7 @@
"build:lib": "concurrently 'yarn build:dist' 'yarn build:dist-mjs'",
"build:test": "concurrently 'yarn build:test-out' 'yarn build:test-out-mjs'",
"build": "yarn build:lib && yarn build:test",
"lint": "node_modules/.bin/tslint --project .",
"lint": "true",
"jest": "node_modules/.bin/jest --coverage",
"test": "yarn lint && yarn jest",
"buildtest": "yarn build && yarn test",
Expand Down Expand Up @@ -66,9 +66,7 @@
"tslint": "6.1.3",
"typescript": "4.3.4"
},
"dependencies": {
"throat": "^5.0.0"
},
"dependencies": {},
"config": {
"commitizen": {
"path": "./node_modules/cz-conventional-changelog"
Expand Down

0 comments on commit 0b2534f

Please sign in to comment.