Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement maxQueuedJobs pool option #188

Merged
merged 1 commit into from
Dec 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/usage-pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Note that `pool.queue()` will schedule a task to be run in a deferred way. It mi
```ts
interface PoolOptions {
concurrency?: number
maxQueuedJobs?: number
name?: string
size?: number
}
Expand All @@ -49,6 +50,7 @@ The first argument passed to the `Pool()` factory must be a function that spawns
The second argument is optional and can either be the number of workers to spawn as a `number` or an options object (see `PoolOptions`):

- `options.concurrency`: number of tasks to run simultaneously per worker, defaults to one
- `options.maxQueuedJobs`: maximum number of tasks to queue before throwing on `.queue()`, defaults to unlimited
- `options.name`: give the pool a custom name to use in the debug log, so you can tell multiple pools apart when debugging
- `options.size`: number of workers to spawn, defaults to the number of CPU cores

Expand Down
13 changes: 13 additions & 0 deletions src/master/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ export interface PoolOptions {
/** Maximum no. of tasks to run on one worker thread at a time. Defaults to one. */
concurrency?: number

/** Maximum no. of jobs to be queued for execution before throwing an error. */
maxQueuedJobs?: number

/** Gives that pool a name to be used for debug logging, letting you distinguish between log output of different pools. */
name?: string

Expand Down Expand Up @@ -267,6 +270,8 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
}

public queue(taskFunction: TaskRunFunction<ThreadType, any>) {
const { maxQueuedJobs = Infinity } = this.options

if (this.isClosing) {
throw Error(`Cannot schedule pool tasks after terminate() has been called.`)
}
Expand Down Expand Up @@ -297,6 +302,14 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
}
}

if (this.taskQueue.length >= maxQueuedJobs) {
throw Error(
"Maximum number of pool tasks queued. Refusing to queue another one.\n" +
"This usually happens for one of two reasons: We are either at peak " +
"workload right now or some tasks just won't finish, thus blocking the pool."
)
}

this.debug(`Queueing task #${task.id}...`)
this.taskQueue.push(task)

Expand Down