Skip to content

Commit

Permalink
feat(helper): add bottleneck helper; (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
DigitalBrainJS committed Mar 19, 2024
1 parent 47a3ac7 commit 9463140
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 15 deletions.
141 changes: 141 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,147 @@ const p = new AxiosPromise((resolve, reject, {onCancel}) => {

> Note: uncaught `CanceledError` rejection won't lead to `unhandledrejection` warning
## Helpers
### bottleneck

A helper that creates a simple queue by decorating an asynchronous function.

`bottleneck(fn, options?: {concurrency: number, cancelRunning: boolean, sync: boolean, timeout: number, taskTimeout: number, queueTimeout: number})`

```js
const fn = bottleneck(function* () {
console.log('start');
yield AxiosPromise.delay(2000);
console.log('end');
return 'foo'
}, {concurrency: 1});

const results = await Promise.allSettled([
fn(),
fn(),
fn()
]);

console.log(results.map(({status, reason, value}) => status + ' : ' + (reason?.toString() || value)))
```
Log:
```
start
end
start
end
start
end
[ 'fulfilled : foo', 'fulfilled : foo', 'fulfilled : foo' ]
````

### Options

`cancelRunning` - cancel running tasks if the concurrency limit is reached

```js
const fn = bottleneck(function* () {
console.log('start');
yield AxiosPromise.delay(2000);
console.log('end');
return 'foo'
}, {cancelRunning: true, concurrency: 1});
const results = await Promise.allSettled([
fn(),
fn(),
fn()
]);
console.log(results.map(({status, reason, value}) => status + ' : ' + (reason?.message || value)))
```
Log:
```
start
end
[
'rejected : CanceledError: task limit reached',
'rejected : CanceledError: task limit reached',
'fulfilled : foo'
]
```

`timeout/taskTimeout/queueTimeout` - use AxiosPromiseSync instead AxiosPromise

Sets appropriate timeouts

```js
(async () => {
const fn = bottleneck(function* (t = 2000) {
console.log('start');
yield AxiosPromise.delay(t);
console.log('end');
return 'foo'
}, {concurrency: 1, taskTimeout: 1000});
const results = await Promise.allSettled([
fn(),
fn(),
fn(500)
]);
console.log(results.map(({status, reason, value}) => status + ' : ' + (reason?.toString() || value)))
})();
```

Log:

```
start
start
start
end
[
'rejected : TimeoutError: task timeout',
'rejected : TimeoutError: task timeout',
'fulfilled : foo'
]
```

### Manual cancellation

```js
const fn = bottleneck(function* (t = 2000) {
console.log('start');
yield AxiosPromise.delay(t);
console.log('end');
return 'foo'
}, {concurrency: 1});
const tasks = [
fn(),
fn(),
fn()
];
setTimeout(() => {
tasks[1].cancel('Oops!');
}, 500);
const results = await Promise.allSettled(tasks);
console.log(results.map(({status, reason, value}) => status + ' : ' + (reason?.toString() || value)))
```

Log:

```
start
end
start
end
[
'fulfilled : foo',
'rejected : CanceledError: Oops!',
'fulfilled : foo'
]
```

## License

The MIT License Copyright (c) 2023 Dmitriy Mozgovoy robotshara@gmail.com
Expand Down
3 changes: 2 additions & 1 deletion index.d.cts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ declare namespace AxiosPromise {

timeout(ms: number, errorOrMessage?: Error|string|number): AxiosPromise<R>;

listen(signal: GenericAbortSignal): AxiosPromise<R>;
listen(signal?: GenericAbortSignal): AxiosPromise<R>;

cancel(reason?: any): boolean;

Expand Down Expand Up @@ -164,5 +164,6 @@ declare namespace AxiosPromise {
function setImmediate(handler: ()=> void): void;
function asap(handler: ()=> void): void;
function symbols(...names: string[]): IterableIterator<symbol>;
function bottleneck<TResult = any, TArgs extends any[] = []>(fn: (...args: TArgs) => any, options?: {concurrency: number, cancelRunning: boolean, sync: boolean, timeout: number, taskTimeout: number, queueTimeout: number}) : (...args: TArgs) => AxiosPromise<TResult>
const global: object;
}
3 changes: 2 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export class AxiosPromise <R = any> implements Thenable <R> {
finally<U = any> (onFinally?: (result: {value: U, status: SettledStatus}, scope: AxiosPromise<U>) => any | Thenable<any>): AxiosPromise<R>;
atomic<U = any>(mode?: AtomicMode): AxiosPromise<U>;
timeout(ms: number, errorOrMessage?: Error|string|number): AxiosPromise<R>;
listen(signal: GenericAbortSignal): AxiosPromise<R>;
listen(signal?: GenericAbortSignal): AxiosPromise<R>;
cancel(reason?: any): boolean;
onCancel(onCancelListener: OnCancelListener): void;
tag(str: string): AxiosPromise<R>;
Expand Down Expand Up @@ -141,4 +141,5 @@ export function defineConstants(obj: object, props: Record<symbol|string, any>,
export function setImmediate(handler: ()=> void): void;
export function asap(handler: ()=> void): void;
export function symbols(...names: string[]): IterableIterator<symbol>;
export function bottleneck<TResult = any, TArgs extends any[] = []>(fn: (...args: TArgs) => any, options?: {concurrency: number, cancelRunning: boolean, sync: boolean, timeout: number, taskTimeout: number, queueTimeout: number}) : (...args: TArgs) => AxiosPromise<TResult>
export const global: object;
6 changes: 6 additions & 0 deletions lib/AbortController.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ const _AbortSignal = hasNativeSupport ? AbortSignal : class AbortSignal extends
this.emit(type, event)
}

throwIfAborted() {
if (this[kAborted]) {
throw this[kReason];
}
}

get [Symbol.toStringTag]() {
return 'AbortSignal'
}
Expand Down
2 changes: 1 addition & 1 deletion lib/CanceledError.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const kSignature = Symbol.for(`AxiosPromise.CanceledError`);

export class CanceledError extends Error {
constructor(message, code) {
super(message || 'canceled');
super(message || 'This operation was aborted');
const internal = this.constructor[kInternals];
this.name = internal.name;
this.code = code || internal.code;
Expand Down
67 changes: 55 additions & 12 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const {
global,
setImmediate,
isAbortController,
asap
asap,
drop
} = utils;

const kPromiseSign = Symbol.for('AxiosPromise');
Expand Down Expand Up @@ -86,11 +87,11 @@ const hasConsole = typeof console !== 'undefined' && console;

const noop = () => {};

const getMethod = (obj, name) => {
const getThen = (obj) => {
let type;
let then;

if(((type = typeof obj) === 'object' || type === 'function') && typeof (then = obj[name]) === 'function') {
if(((type = typeof obj) === 'object' || type === 'function') && typeof (then = obj.then) === 'function') {
return then;
}
}
Expand Down Expand Up @@ -264,7 +265,7 @@ export class AxiosPromise {
}

listen(signal) {
if (!this[kFinalized]) {
if (signal != null && !this[kFinalized]) {
if (!isAbortSignal(signal)) {
throw TypeError('expected AbortSignal object');
}
Expand Down Expand Up @@ -389,10 +390,9 @@ export class AxiosPromise {
}

this[kValue] = value;
this[kState] = isRejected ? STATE_REJECTED : STATE_FULFILLED;

if (!settled) {
this[kState] = isRejected ? STATE_REJECTED : STATE_FULFILLED;

this[kSync] ? this[kFinalize]() : asap(() => this[kFinalize]());
}
}
Expand All @@ -409,7 +409,7 @@ export class AxiosPromise {
then = (value = constructor[kResolveGenerator](value, new constructor(noop))).then;
} else if (value) {
try {
then = getMethod(value, 'then');
then = getThen(value);
} catch (err) {
return this[kResolveTo](err, true);
}
Expand Down Expand Up @@ -647,11 +647,7 @@ export class AxiosPromise {
return promise[kResolve](r.value);
}

const innerPromise = this.resolve(r.value).then(onFulfilled, onRejected);

promise[kInnerThenable] = innerPromise;

return innerPromise;
promise[kInnerThenable] = this.resolve(r.value).then(onFulfilled, onRejected);
}

onFulfilled();
Expand Down Expand Up @@ -743,6 +739,52 @@ export class AxiosPromiseSync extends AxiosPromise {

AxiosPromiseSync.prototype[kSync] = true;

const bottleneck = (fn, {concurrency = 1, cancelRunning, sync, timeout, taskTimeout, queueTimeout} = {}) => {
const queue = [];
const running = [];
let pending = 0;

const constructor = sync ? AxiosPromiseSync : AxiosPromise;

fn = constructor.promisify(fn);

return function(...args) {
let done;
let pushed;

const promise = new constructor((resolve) => {
if (pending++ < concurrency) {
return resolve();
}

if (cancelRunning && running[0]) {
running.shift().cancel('task limit reached');
}

return queue.push(resolve);
})
.timeout(queueTimeout, 'queue timeout')
.then(() => {
return constructor.resolve(fn.apply(this, args)).timeout(taskTimeout, 'task timeout')
}).finally((v) => {
done = true;
pending--;
if (pushed) {
const index = running.indexOf(promise);
running.splice(index, 1);
}
queue.length && queue.shift()();
});

if (!done) {
pushed = true;
running.push(promise);
}

return promise.timeout(timeout);
}
}

export {
isGenerator,
isGeneratorFunction,
Expand All @@ -762,5 +804,6 @@ export {
EventEmitter,
CanceledError,
TimeoutError,
bottleneck,
AxiosPromise as default
};
37 changes: 37 additions & 0 deletions test/specs/bottleneck.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import assert from 'assert';
import {bottleneck, AxiosPromise} from "../../lib/index.js";
import sinon from "sinon";
import {promisify} from 'util';

describe('bottleneck', () => {
it('should support for concurrency', () => {
let pending = 0;
const concurrency = 2;

const fn = bottleneck(function* () {
assert.ok(pending++ < concurrency, `pending tasks count exceeds concurrency`);
yield AxiosPromise.delay(100);
pending--;
}, {concurrency});

return Promise.all([
fn(),
fn(),
fn(),
fn(),
])
});

it('should support for cancellation of preempted tasks', () => {
const fn = bottleneck(function* () {
yield AxiosPromise.delay(500);
}, {cancelRunning: true, concurrency: 2});

return Promise.all([
assert.rejects(fn, /CanceledError/),
assert.rejects(fn, /CanceledError/),
fn(),
fn(),
])
})
});

0 comments on commit 9463140

Please sign in to comment.