Skip to content

Commit

Permalink
Merge pull request #10 from grimmer0125/optioin_skipIfFull
Browse files Browse the repository at this point in the history
Add option dropWhenReachLimit
  • Loading branch information
grimmer0125 committed Nov 15, 2021
2 parents 5761197 + b9d1021 commit e7e9975
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 25 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ All notable changes to this project will be documented in this file. See [standa

Those versions which only include documentation change might not be included here.

### [1.7.0](https://github.com/grimmer0125/d4c-queue/compare/v1.6.9...v1.7.0) (2021-11-15)

Add option dropWhenReachLimit for better throtting effect.

### [1.6.9](https://github.com/grimmer0125/d4c-queue/compare/v1.6.5...v1.6.9) (2021-10-03)

Update README. Fix potential security vulnerabilities in dependencies and gh-pages publishing.

### [1.6.5](https://github.com/grimmer0125/d4c-queue/compare/v1.6.4...v1.6.5) (2021-07-13)

Update README and fix potential security vulnerabilities in dependencies.
Expand Down
34 changes: 26 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ Wrap an [async](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Referenc
8. Well tested.
9. Optional parameter, `inheritPreErr`. If current task is waiting for previous tasks, set it as `true` to inherit the error of the previous task and the task will not be executed and throw a custom error `new PreviousError(task.preError.message ?? task.preError)`. If this parameter is omitted or set as `false`, the task will continue whether previous tasks happen errors or not.
10. Optional parameter, `noBlockCurr`. Set it as `true` to forcibly execute the current task in the another (microtask) execution of the event loop. This is useful if you pass a sync function as the first task but do not want it to block the current event loop.
11. Optional parameter, `dropWhenReachLimit`. Set it as `true`. Then it will be dropped when it is called but the system detects the queue concurrency limit is reached. It is like a kind of throttle mechanism but not time interval based. The dropped function call will not be really executed and will throw a execption whose message is `QueueIsFull` and you need to catch it.

## Installation

This package includes two builds.

- ES6 build (ES2015) with CommonJS module for `main` build in package.json.
- ES6 build (ES2015) with ES6 module for `module` build. Some tools will follow the `module` field in `package.json`, like Rollup, Webpack, or Parcel. It is good to let build tools can tree-shake this module build to import only the code they need.
- ES6 build (ES2015) with ES6 module for `module` build. Some tools will follow the `module` field in `package.json`, like Rollup, Webpack, or Parcel.

Either `npm install d4c-queue` or `yarn add d4c-queue`. Then import this package.

Expand Down Expand Up @@ -126,18 +127,18 @@ d4c.apply(syncFun, { args: ['syncFun_arg1'] })

#### Concurrency mode

Is it useful for rate-limiting tasks. For example, setup some concurrency limit to avoid send GitHub GraphQL API requests too fast, since it has rate limits control.
Is it useful for rate-limiting or throttling tasks. For example, setup some concurrency limit to avoid send GitHub GraphQL API requests too fast, since it has rate limits control.

Default concurrency limit of D4C instance is `1` in this library.

Usage:

```ts
/** change concurrency limit applied on default queues */
const d4c = new D4C([{ concurrency: { limit: 100 }}])
const d4c = new D4C([{ concurrency: { limit: 100 } }])

/** setup concurrency for specific queue: "2" */
const d4c = new D4C([{ concurrency: { limit: 100, tag: '2' }}])
const d4c = new D4C([{ concurrency: { limit: 100, tag: '2' } }])
```

You can adjust concurrency via `setConcurrency`.
Expand All @@ -151,6 +152,20 @@ d4c.setConcurrency([{ limit: 10 }])
d4c.setConcurrency([{ limit: 10, tag: 'queue2' }])
```

When this async task function is called and the system detects the concurrency limit is reached, this tasks will not be really executed and will be enqueued. If you want to drop this task function call, you can set `dropWhenReachLimit` option when wrapping/applying the task function. e.g.

```ts
const fn1 = d4c.wrap(taskFun, { dropWhenReachLimit: true })

try {
await fn1()
} catch (err) {
// when the concurrency limit is reached at this moment.
// err.message is QueueIsFull
console.log({ err })
}
```

### Decorators usage

#### Synchronization mode
Expand Down Expand Up @@ -188,7 +203,7 @@ class TestController {
@concurrent
static async fetchData(url: string) {}

@concurrent({ tag: '2' })
@concurrent({ tag: '2', dropWhenReachLimit: true })
async fetchData2(url: string) {}

/** You can still use @synchronized, as long as
Expand Down Expand Up @@ -453,6 +468,7 @@ function concurrent(option?: {
tag?: string | symbol
inheritPreErr?: boolean
noBlockCurr?: boolean
dropWhenReachLimit?: boolean
}) {}
```
Expand All @@ -467,7 +483,7 @@ Example:
@concurrent
@concurrent()
@concurrent({ tag: "world", inheritPreErr: true })
@concurrent({ inheritPreErr: true, noBlockCurr: true })
@concurrent({ inheritPreErr: true, noBlockCurr: true, dropWhenReachLimit: true })

```
Expand All @@ -490,10 +506,10 @@ usage:
const d4c = new D4C()

/** concurrency limit 500 applied on default queues */
const d4c = new D4C([{ concurrency: { limit: 500 }}])
const d4c = new D4C([{ concurrency: { limit: 500 } }])

/** setup concurrency for specific queue: "2" */
const d4c = new D4C([{ concurrency: { limit: 100, tag: '2' }}])
const d4c = new D4C([{ concurrency: { limit: 100, tag: '2' } }])
```
- setConcurrency
Expand All @@ -513,6 +529,7 @@ public wrap<T extends IAnyFn>(
tag?: string | symbol;
inheritPreErr?: boolean;
noBlockCurr?: boolean;
dropWhenReachLimit?: boolean;
}
)
```
Expand All @@ -530,6 +547,7 @@ public apply<T extends IAnyFn>(
tag?: string | symbol;
inheritPreErr?: boolean;
noBlockCurr?: boolean;
dropWhenReachLimit?: boolean;
args?: Parameters<typeof func>;
}
)
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "d4c-queue",
"version": "1.6.9",
"version": "1.7.0",
"description": "A task queue executes tasks sequentially or concurrently. Wrap an async/promise-returning/sync function as a queue-ready async function for easy reusing. Support passing arguments/getting return value, @synchronized/@concurrent decorator, Node.js/Browser.",
"main": "build/main/index.js",
"typings": "build/main/index.d.ts",
Expand Down Expand Up @@ -47,7 +47,7 @@
"task-queue",
"tasks",
"task-runner",
"microtask",
"microtask",
"angular"
],
"scripts": {
Expand Down
29 changes: 24 additions & 5 deletions src/lib/D4C.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ const funcPromise = (input: string[], input2: string): Promise<string> => {
return Promise.resolve(input[0] + input2)
}

const timeout = (seconds: number, target: { str: string }) => {
const timeout = (seconds: number, target?: { str: string }) => {
return new Promise<void>((resolve, _) =>
setTimeout(() => {
target.str += seconds
if (target?.str != undefined && target?.str != null) {
target.str += seconds
}
resolve()
}, seconds * 100)
)
}

const timeoutError = (seconds: number, result) => {
const timeoutError = (seconds: number, result: string | Error) => {
return new Promise((_, reject) =>
setTimeout(() => {
reject(result)
Expand Down Expand Up @@ -606,13 +608,15 @@ test('Decorator usage', async (t) => {
t.is(test.str, '0.10.5')

/** composite case: D4C instance on no autobind decorated method */
let error = null
try {
const d4c = new D4C()
const newFunc = d4c.wrap(testController.greet)
const resp = await newFunc('')
} catch (err) {
t.is(err.message, ErrMsg.MissingThisDueBindIssue)
error = err
}
t.is(error.message, ErrMsg.MissingThisDueBindIssue)

/** composite case: D4C instance on autobind decorated method */
const d4c = new D4C()
Expand Down Expand Up @@ -647,7 +651,7 @@ test('Decorator usage', async (t) => {
t.is(test.str, '0.10.5')

/** test invalid decorator */
let error = null
error = null
try {
class TestController4 {
@synchronized({ tag: true } as any)
Expand All @@ -669,6 +673,7 @@ test('Decorator usage', async (t) => {
// console.log(" err by purpose")
}
})()

error = null
try {
await testController.instanceTimeout(0.1, { str: '' })
Expand Down Expand Up @@ -840,3 +845,17 @@ test("Instance usage: option inheritPreErr enable: task2 inherit task1's error i

t.is(error.message, 'some_error')
})

test('Instance usage: test option dropWhenReachLimit', async (t) => {
const d4c = new D4C([{ concurrency: { limit: 2 } }])
const fn1 = d4c.wrap(timeout, { dropWhenReachLimit: true })

let error = null
try {
await fn1(3)
await Promise.all([fn1(3), fn1(3), fn1(3)])
} catch (err) {
error = err
}
t.is(error.message, ErrMsg.QueueIsFull)
})
32 changes: 22 additions & 10 deletions src/lib/D4C.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export enum ErrMsg {
TwoDecoratorsIncompatible = 'TwoDecoratorsInCompatible',
ClassAndMethodDecoratorsIncompatible = 'ClassAndMethodDecoratorsIncompatible',
MissingThisDueBindIssue = 'missingThisDueBindIssue',
QueueIsFull = 'QueueIsFull',
}

const queueSymbol = Symbol('d4cQueues') // subQueue system
Expand Down Expand Up @@ -213,6 +214,8 @@ function checkIfDecoratorOptionObject(obj: any): boolean {
(typeof obj.inheritPreErr === 'boolean' ||
obj.inheritPreErr === undefined) &&
(typeof obj.noBlockCurr === 'boolean' || obj.noBlockCurr === undefined) &&
(typeof obj.dropWhenReachLimit === 'boolean' ||
obj.dropWhenReachLimit === undefined) &&
checkTag(obj.tag)
) {
return true
Expand Down Expand Up @@ -240,6 +243,7 @@ export function concurrent(option?: {
tag?: string | symbol
inheritPreErr?: boolean
noBlockCurr?: boolean
dropWhenReachLimit?: boolean
}): MethodDecoratorType
export function concurrent(
targetOrOption?: any,
Expand Down Expand Up @@ -329,6 +333,7 @@ function _q<T extends IAnyFn>(
tag?: QueueTag
inheritPreErr?: boolean
noBlockCurr?: boolean
dropWhenReachLimit?: boolean
}
): (...args: Parameters<typeof func>) => Promise<UnwrapPromise<typeof func>> {
return async function (...args: any[]): Promise<any> {
Expand Down Expand Up @@ -381,16 +386,21 @@ function _q<T extends IAnyFn>(
let err: Error
let task: Task
if (taskQueue.runningTask === taskQueue.concurrency) {
const promise = new Promise(function (resolve) {
task = {
unlock: resolve,
preError: null,
inheritPreErr: option?.inheritPreErr,
}
})
taskQueue.queue.push(task)
await promise
taskQueue.runningTask += 1
if (!option?.dropWhenReachLimit) {
const promise = new Promise(function (resolve) {
task = {
unlock: resolve,
preError: null,
inheritPreErr: option?.inheritPreErr,
}
})
taskQueue.queue.push(task)
await promise
taskQueue.runningTask += 1
} else {
// drop this time, throttle mechanism
throw new Error(ErrMsg.QueueIsFull)
}
} else if (option?.noBlockCurr) {
taskQueue.runningTask += 1
await Promise.resolve()
Expand Down Expand Up @@ -528,6 +538,7 @@ export class D4C {
tag?: string | symbol
inheritPreErr?: boolean
noBlockCurr?: boolean
dropWhenReachLimit?: boolean
args?: Parameters<typeof func>
}
): Promise<UnwrapPromise<typeof func>> {
Expand All @@ -542,6 +553,7 @@ export class D4C {
tag?: string | symbol
inheritPreErr?: boolean
noBlockCurr?: boolean
dropWhenReachLimit?: boolean
}
): (...args: Parameters<typeof func>) => Promise<UnwrapPromise<typeof func>> {
if (!option || checkTag(option.tag)) {
Expand Down

0 comments on commit e7e9975

Please sign in to comment.