From a0cb4d285967a407cefc2542e11390feaea12a05 Mon Sep 17 00:00:00 2001 From: Christian Dupuis Date: Fri, 7 Dec 2018 20:19:32 +0100 Subject: [PATCH] Introduce promise pool to limit number of concurrent executions --- index.ts | 3 ++ lib/operations/common/repoUtils.ts | 41 ++++++++----------- lib/util/pool.ts | 39 ++++++++++++++++++ package-lock.json | 5 +++ package.json | 1 + test/util/pool.test.ts | 64 ++++++++++++++++++++++++++++++ 6 files changed, 128 insertions(+), 25 deletions(-) create mode 100644 lib/util/pool.ts create mode 100644 test/util/pool.test.ts diff --git a/index.ts b/index.ts index bf8806d8d..aeeed9e4d 100644 --- a/index.ts +++ b/index.ts @@ -251,6 +251,9 @@ export { doWithRetry, RetryOptions, } from "./lib/util/retry"; +export { + executeAll, +} from "./lib/util/pool"; export * from "./lib/util/spawn"; export { Maker, diff --git a/lib/operations/common/repoUtils.ts b/lib/operations/common/repoUtils.ts index 105ba123a..e257c3235 100644 --- a/lib/operations/common/repoUtils.ts +++ b/lib/operations/common/repoUtils.ts @@ -1,7 +1,7 @@ -import * as _ from "lodash"; import { HandlerContext } from "../../HandlerContext"; import { Project } from "../../project/Project"; import { logger } from "../../util/logger"; +import { executeAll } from "../../util/pool"; import { defaultRepoLoader } from "./defaultRepoLoader"; import { ProjectOperationCredentials } from "./ProjectOperationCredentials"; import { @@ -12,11 +12,6 @@ import { RepoFinder } from "./repoFinder"; import { RepoRef } from "./RepoId"; import { RepoLoader } from "./repoLoader"; -/** - * Specify how many repos should be edited concurrently at max - */ -export let EditAllChunkSize = 5; - /** * Perform an action against all the given repos. * Skip over repos that cannot be loaded, logging a warning. @@ -37,25 +32,21 @@ export async function doWithAllRepos(ctx: HandlerContext, repoFilter: RepoFilter = AllRepos, repoLoader: RepoLoader = defaultRepoLoader(credentials)): Promise { - const allIds = await relevantRepos(ctx, repoFinder, repoFilter); - const idChunks = _.chunk(allIds , EditAllChunkSize); - const results: R[] = []; - for (const ids of idChunks) { - results.push(...(await Promise.all(ids.map(id => - repoLoader(id) - .catch(err => { - logger.warn("Unable to load repo %s/%s: %s", id.owner, id.repo, err); - logger.debug(err.stack); - return undefined; - }) - .then(p => { - if (p) { - return action(p, parameters); - } - }))) - .then(proms => proms.filter(prom => prom)))); - } - return results; + const ids = await relevantRepos(ctx, repoFinder, repoFilter); + const promises = ids.map(id => + () => repoLoader(id) + .catch(err => { + logger.warn("Unable to load repo %s/%s: %s", id.owner, id.repo, err); + logger.debug(err.stack); + return undefined; + }) + .then(p => { + if (p) { + return action(p, parameters); + } + })); + + return (await executeAll(promises)).filter(result => result); } export function relevantRepos(ctx: HandlerContext, diff --git a/lib/util/pool.ts b/lib/util/pool.ts new file mode 100644 index 000000000..42697d3ff --- /dev/null +++ b/lib/util/pool.ts @@ -0,0 +1,39 @@ +import { configurationValue } from "../configuration"; + +function concurrentDefault(): number { + return configurationValue("pool.concurrent", 5); +} + +/** + * Execute all provided promises with a max concurrency + * Results will be in the same order as the provided promises; if one promise rejects, + * execution is stopped and the returned promise is rejected as well. + * @param promises all promises to execute + * @param concurrent the max number of concurrent promise executions + */ +export async function executeAll(promises: Array<() => Promise>, + concurrent: number = concurrentDefault()): Promise { + let index = 0; + const results: any[] = []; + const producer = () => { + if (index < promises.length) { + const promise = promises[index](); + results[index] = promise; + index++; + return promise; + } else { + return null; + } + }; + + const PromisePool = require("es6-promise-pool"); + const pool = new PromisePool(producer, concurrent); + + pool.addEventListener("fulfilled", (event: any) => { + results[results.indexOf(event.data.promise)] = event.data.result; + }); + + await pool.start(); // start only returns a promise; not an [] of results + + return results; +} diff --git a/package-lock.json b/package-lock.json index bd7f60daf..0cb5ae6b4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2486,6 +2486,11 @@ "resolved": "https://registry.npmjs.org/es6-promise/-/es6-promise-4.2.5.tgz", "integrity": "sha512-n6wvpdE43VFtJq+lUDYDBFUwV8TZbuGXLV4D6wKafg13ldznKsyEvatubnmUe31zcvelSzOHF+XbaT+Bl9ObDg==" }, + "es6-promise-pool": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/es6-promise-pool/-/es6-promise-pool-2.5.0.tgz", + "integrity": "sha1-FHxhKza0fxBQJ/nSv1SlmKmdnMs=" + }, "es6-promisify": { "version": "5.0.0", "resolved": "http://registry.npmjs.org/es6-promisify/-/es6-promisify-5.0.0.tgz", diff --git a/package.json b/package.json index 8c491be8c..d21532ad1 100644 --- a/package.json +++ b/package.json @@ -73,6 +73,7 @@ "cluster": "^0.7.7", "cors": "^2.8.5", "cross-spawn": "^6.0.5", + "es6-promise-pool": "^2.5.0", "express": "^4.16.4", "express-session": "^1.15.6", "find-up": "^3.0.0", diff --git a/test/util/pool.test.ts b/test/util/pool.test.ts new file mode 100644 index 000000000..6c52825f3 --- /dev/null +++ b/test/util/pool.test.ts @@ -0,0 +1,64 @@ +/* + * Copyright © 2018 Atomist, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +import * as assert from "power-assert"; +import { executeAll } from "../../lib/util/pool"; + +describe("pool", () => { + + describe("executeAll", () => { + + it("should preserve order", async () => { + const results = await executeAll([() => { + return new Promise(resolve => { + setTimeout(() => resolve("1"), 1000); + }); + }, () => { + return new Promise(resolve => { + setTimeout(() => resolve("2"), 500); + }); + }, () => { + return new Promise(resolve => { + setTimeout(() => resolve("3"), 100); + }); + }], 1); + assert.deepStrictEqual(results, ["1", "2", "3"]); + }); + + it("should reject with results and errors", async () => { + try { + await executeAll([() => { + return new Promise(resolve => { + setTimeout(() => resolve("1"), 1000); + }); + }, () => { + return new Promise((resolve, reject) => { + reject(new Error("2")); + }); + }, () => { + return new Promise(resolve => { + setTimeout(() => resolve("3"), 100); + }); + }], 1); + } catch (err) { + assert.strictEqual(err.message, "2"); + } + }); + + }); + +});