diff --git a/docs/usage-pool.md b/docs/usage-pool.md index ac5d3f71..70151407 100644 --- a/docs/usage-pool.md +++ b/docs/usage-pool.md @@ -36,6 +36,7 @@ Note that `pool.queue()` will schedule a task to be run in a deferred way. It mi ```ts interface PoolOptions { concurrency?: number + maxQueuedJobs?: number name?: string size?: number } @@ -49,6 +50,7 @@ The first argument passed to the `Pool()` factory must be a function that spawns The second argument is optional and can either be the number of workers to spawn as a `number` or an options object (see `PoolOptions`): - `options.concurrency`: number of tasks to run simultaneously per worker, defaults to one +- `options.maxQueuedJobs`: maximum number of tasks to queue before throwing on `.queue()`, defaults to unlimited - `options.name`: give the pool a custom name to use in the debug log, so you can tell multiple pools apart when debugging - `options.size`: number of workers to spawn, defaults to the number of CPU cores diff --git a/src/master/pool.ts b/src/master/pool.ts index eef67e69..e0c49f1b 100644 --- a/src/master/pool.ts +++ b/src/master/pool.ts @@ -91,6 +91,9 @@ export interface PoolOptions { /** Maximum no. of tasks to run on one worker thread at a time. Defaults to one. */ concurrency?: number + /** Maximum no. of jobs to be queued for execution before throwing an error. */ + maxQueuedJobs?: number + /** Gives that pool a name to be used for debug logging, letting you distinguish between log output of different pools. */ name?: string @@ -267,6 +270,8 @@ class WorkerPool implements Pool { } public queue(taskFunction: TaskRunFunction) { + const { maxQueuedJobs = Infinity } = this.options + if (this.isClosing) { throw Error(`Cannot schedule pool tasks after terminate() has been called.`) } @@ -297,6 +302,14 @@ class WorkerPool implements Pool { } } + if (this.taskQueue.length >= maxQueuedJobs) { + throw Error( + "Maximum number of pool tasks queued. Refusing to queue another one.\n" + + "This usually happens for one of two reasons: We are either at peak " + + "workload right now or some tasks just won't finish, thus blocking the pool." + ) + } + this.debug(`Queueing task #${task.id}...`) this.taskQueue.push(task)