Skip to content

Commit

Permalink
add worker and workerConcurrency config options
Browse files Browse the repository at this point in the history
  • Loading branch information
zisismaras committed Sep 19, 2019
1 parent 146cb7d commit ffb7c0a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
10 changes: 10 additions & 0 deletions src/runner/parseConfig.ts
Expand Up @@ -117,6 +117,16 @@ export type Config = {
* Sets the port of the internal devTools protocol server, default is 9730. Use 0 for a random port.
*/
protocolPort?: number
/**
* Sets the number of workers to use.
* Defaults to "auto" which will spawn as many workers as needed by the current pipeline (not more than the total system thread count).
*/
workers?: number
/**
* Sets the number of steps each worker can execute at the same time.
* Defaults to 1. It should only be increased if there are more parallel steps than the worker count and the steps are mostly I/O bound.
*/
workerConcurrency?: number
},
/**
* Execute the steps in a serial manner by passing each step's output to next one's input.
Expand Down
19 changes: 16 additions & 3 deletions src/runner/runner.ts
Expand Up @@ -150,14 +150,27 @@ export async function run(projectFolder: string, config: Config, options: {
}

//launch pipeproc
const stepCount = steps.length <= 4 ? 1 : countSteps(steps) - 3;
const workers = stepCount > cpus().length ? cpus().length : stepCount;
opLog.info(`using workers: ${workers}`);
let workers: number;
if (config.config && config.config.workers && config.config.workers > 0) {
workers = config.config.workers;
} else {
const stepCount = steps.length <= 4 ? 1 : countSteps(steps) - 3;
workers = stepCount > cpus().length ? cpus().length : stepCount;
}
let workerConcurrency: number;
if (config.config && config.config.workerConcurrency && config.config.workerConcurrency > 0) {
workerConcurrency = config.config.workerConcurrency;
opLog.info(`using workers: ${workers} (concurrency: ${workerConcurrency})`);
} else {
workerConcurrency = 1;
opLog.info(`using workers: ${workers}`);
}
const waiter = opLog.waiter("initializing");
await pipeprocClient.spawn({
socket: `ipc://${pathResolve(storeProjectFolder, "run.sock")}`,
location: getPipeprocFolder(storeProjectFolder),
workers: workers,
workerConcurrency: workerConcurrency,
workerRestartAfter: 100
});
const SIGINT = "SIGINT";
Expand Down

0 comments on commit ffb7c0a

Please sign in to comment.