diff --git a/README.md b/README.md index e99b5bd..704da70 100644 --- a/README.md +++ b/README.md @@ -215,6 +215,147 @@ const p = new AxiosPromise((resolve, reject, {onCancel}) => { > Note: uncaught `CanceledError` rejection won't lead to `unhandledrejection` warning +## Helpers +### bottleneck + +A helper that creates a simple queue by decorating an asynchronous function. + +`bottleneck(fn, options?: {concurrency: number, cancelRunning: boolean, sync: boolean, timeout: number, taskTimeout: number, queueTimeout: number})` + +```js + const fn = bottleneck(function* () { + console.log('start'); + yield AxiosPromise.delay(2000); + console.log('end'); + return 'foo' + }, {concurrency: 1}); + + const results = await Promise.allSettled([ + fn(), + fn(), + fn() + ]); + + console.log(results.map(({status, reason, value}) => status + ' : ' + (reason?.toString() || value))) +``` +Log: +``` +start +end +start +end +start +end +[ 'fulfilled : foo', 'fulfilled : foo', 'fulfilled : foo' ] +```` + +### Options + +`cancelRunning` - cancel running tasks if the concurrency limit is reached + +```js +const fn = bottleneck(function* () { + console.log('start'); + yield AxiosPromise.delay(2000); + console.log('end'); + return 'foo' +}, {cancelRunning: true, concurrency: 1}); + +const results = await Promise.allSettled([ + fn(), + fn(), + fn() +]); + +console.log(results.map(({status, reason, value}) => status + ' : ' + (reason?.message || value))) +``` +Log: +``` +start +end +[ + 'rejected : CanceledError: task limit reached', + 'rejected : CanceledError: task limit reached', + 'fulfilled : foo' +] +``` + +`timeout/taskTimeout/queueTimeout` - use AxiosPromiseSync instead AxiosPromise + +Sets appropriate timeouts + +```js +(async () => { + const fn = bottleneck(function* (t = 2000) { + console.log('start'); + yield AxiosPromise.delay(t); + console.log('end'); + return 'foo' + }, {concurrency: 1, taskTimeout: 1000}); + + const results = await Promise.allSettled([ + fn(), + fn(), + fn(500) + ]); + + console.log(results.map(({status, reason, value}) => status + ' : ' + (reason?.toString() || value))) +})(); +``` + +Log: + +``` +start +start +start +end +[ + 'rejected : TimeoutError: task timeout', + 'rejected : TimeoutError: task timeout', + 'fulfilled : foo' +] +``` + +### Manual cancellation + +```js + const fn = bottleneck(function* (t = 2000) { + console.log('start'); + yield AxiosPromise.delay(t); + console.log('end'); + return 'foo' + }, {concurrency: 1}); + + const tasks = [ + fn(), + fn(), + fn() + ]; + + setTimeout(() => { + tasks[1].cancel('Oops!'); + }, 500); + + const results = await Promise.allSettled(tasks); + + console.log(results.map(({status, reason, value}) => status + ' : ' + (reason?.toString() || value))) +``` + +Log: + +``` +start +end +start +end +[ + 'fulfilled : foo', + 'rejected : CanceledError: Oops!', + 'fulfilled : foo' +] +``` + ## License The MIT License Copyright (c) 2023 Dmitriy Mozgovoy robotshara@gmail.com diff --git a/index.d.cts b/index.d.cts index 3d9f68a..f674c87 100644 --- a/index.d.cts +++ b/index.d.cts @@ -100,7 +100,7 @@ declare namespace AxiosPromise { timeout(ms: number, errorOrMessage?: Error|string|number): AxiosPromise; - listen(signal: GenericAbortSignal): AxiosPromise; + listen(signal?: GenericAbortSignal): AxiosPromise; cancel(reason?: any): boolean; @@ -164,5 +164,6 @@ declare namespace AxiosPromise { function setImmediate(handler: ()=> void): void; function asap(handler: ()=> void): void; function symbols(...names: string[]): IterableIterator; + function bottleneck(fn: (...args: TArgs) => any, options?: {concurrency: number, cancelRunning: boolean, sync: boolean, timeout: number, taskTimeout: number, queueTimeout: number}) : (...args: TArgs) => AxiosPromise const global: object; } diff --git a/index.d.ts b/index.d.ts index 86e2d54..4f08b89 100644 --- a/index.d.ts +++ b/index.d.ts @@ -94,7 +94,7 @@ export class AxiosPromise implements Thenable { finally (onFinally?: (result: {value: U, status: SettledStatus}, scope: AxiosPromise) => any | Thenable): AxiosPromise; atomic(mode?: AtomicMode): AxiosPromise; timeout(ms: number, errorOrMessage?: Error|string|number): AxiosPromise; - listen(signal: GenericAbortSignal): AxiosPromise; + listen(signal?: GenericAbortSignal): AxiosPromise; cancel(reason?: any): boolean; onCancel(onCancelListener: OnCancelListener): void; tag(str: string): AxiosPromise; @@ -141,4 +141,5 @@ export function defineConstants(obj: object, props: Record, export function setImmediate(handler: ()=> void): void; export function asap(handler: ()=> void): void; export function symbols(...names: string[]): IterableIterator; +export function bottleneck(fn: (...args: TArgs) => any, options?: {concurrency: number, cancelRunning: boolean, sync: boolean, timeout: number, taskTimeout: number, queueTimeout: number}) : (...args: TArgs) => AxiosPromise export const global: object; diff --git a/lib/AbortController.js b/lib/AbortController.js index 1e8231b..bab2ca7 100644 --- a/lib/AbortController.js +++ b/lib/AbortController.js @@ -41,6 +41,12 @@ const _AbortSignal = hasNativeSupport ? AbortSignal : class AbortSignal extends this.emit(type, event) } + throwIfAborted() { + if (this[kAborted]) { + throw this[kReason]; + } + } + get [Symbol.toStringTag]() { return 'AbortSignal' } diff --git a/lib/CanceledError.js b/lib/CanceledError.js index 2465c39..d6efff3 100644 --- a/lib/CanceledError.js +++ b/lib/CanceledError.js @@ -3,7 +3,7 @@ const kSignature = Symbol.for(`AxiosPromise.CanceledError`); export class CanceledError extends Error { constructor(message, code) { - super(message || 'canceled'); + super(message || 'This operation was aborted'); const internal = this.constructor[kInternals]; this.name = internal.name; this.code = code || internal.code; diff --git a/lib/index.js b/lib/index.js index 91777ea..2fc93e1 100644 --- a/lib/index.js +++ b/lib/index.js @@ -20,7 +20,8 @@ const { global, setImmediate, isAbortController, - asap + asap, + drop } = utils; const kPromiseSign = Symbol.for('AxiosPromise'); @@ -86,11 +87,11 @@ const hasConsole = typeof console !== 'undefined' && console; const noop = () => {}; -const getMethod = (obj, name) => { +const getThen = (obj) => { let type; let then; - if(((type = typeof obj) === 'object' || type === 'function') && typeof (then = obj[name]) === 'function') { + if(((type = typeof obj) === 'object' || type === 'function') && typeof (then = obj.then) === 'function') { return then; } } @@ -264,7 +265,7 @@ export class AxiosPromise { } listen(signal) { - if (!this[kFinalized]) { + if (signal != null && !this[kFinalized]) { if (!isAbortSignal(signal)) { throw TypeError('expected AbortSignal object'); } @@ -389,10 +390,9 @@ export class AxiosPromise { } this[kValue] = value; + this[kState] = isRejected ? STATE_REJECTED : STATE_FULFILLED; if (!settled) { - this[kState] = isRejected ? STATE_REJECTED : STATE_FULFILLED; - this[kSync] ? this[kFinalize]() : asap(() => this[kFinalize]()); } } @@ -409,7 +409,7 @@ export class AxiosPromise { then = (value = constructor[kResolveGenerator](value, new constructor(noop))).then; } else if (value) { try { - then = getMethod(value, 'then'); + then = getThen(value); } catch (err) { return this[kResolveTo](err, true); } @@ -647,11 +647,7 @@ export class AxiosPromise { return promise[kResolve](r.value); } - const innerPromise = this.resolve(r.value).then(onFulfilled, onRejected); - - promise[kInnerThenable] = innerPromise; - - return innerPromise; + promise[kInnerThenable] = this.resolve(r.value).then(onFulfilled, onRejected); } onFulfilled(); @@ -743,6 +739,52 @@ export class AxiosPromiseSync extends AxiosPromise { AxiosPromiseSync.prototype[kSync] = true; +const bottleneck = (fn, {concurrency = 1, cancelRunning, sync, timeout, taskTimeout, queueTimeout} = {}) => { + const queue = []; + const running = []; + let pending = 0; + + const constructor = sync ? AxiosPromiseSync : AxiosPromise; + + fn = constructor.promisify(fn); + + return function(...args) { + let done; + let pushed; + + const promise = new constructor((resolve) => { + if (pending++ < concurrency) { + return resolve(); + } + + if (cancelRunning && running[0]) { + running.shift().cancel('task limit reached'); + } + + return queue.push(resolve); + }) + .timeout(queueTimeout, 'queue timeout') + .then(() => { + return constructor.resolve(fn.apply(this, args)).timeout(taskTimeout, 'task timeout') + }).finally((v) => { + done = true; + pending--; + if (pushed) { + const index = running.indexOf(promise); + running.splice(index, 1); + } + queue.length && queue.shift()(); + }); + + if (!done) { + pushed = true; + running.push(promise); + } + + return promise.timeout(timeout); + } +} + export { isGenerator, isGeneratorFunction, @@ -762,5 +804,6 @@ export { EventEmitter, CanceledError, TimeoutError, + bottleneck, AxiosPromise as default }; diff --git a/test/specs/bottleneck.js b/test/specs/bottleneck.js new file mode 100644 index 0000000..34297f8 --- /dev/null +++ b/test/specs/bottleneck.js @@ -0,0 +1,37 @@ +import assert from 'assert'; +import {bottleneck, AxiosPromise} from "../../lib/index.js"; +import sinon from "sinon"; +import {promisify} from 'util'; + +describe('bottleneck', () => { + it('should support for concurrency', () => { + let pending = 0; + const concurrency = 2; + + const fn = bottleneck(function* () { + assert.ok(pending++ < concurrency, `pending tasks count exceeds concurrency`); + yield AxiosPromise.delay(100); + pending--; + }, {concurrency}); + + return Promise.all([ + fn(), + fn(), + fn(), + fn(), + ]) + }); + + it('should support for cancellation of preempted tasks', () => { + const fn = bottleneck(function* () { + yield AxiosPromise.delay(500); + }, {cancelRunning: true, concurrency: 2}); + + return Promise.all([ + assert.rejects(fn, /CanceledError/), + assert.rejects(fn, /CanceledError/), + fn(), + fn(), + ]) + }) +});