Skip to content

Commit

Permalink
Auto merge pull request #417 from atomist/automation-client
Browse files Browse the repository at this point in the history
Introduce promise pool to limit number of concurrent executions
  • Loading branch information
atomist-bot committed Dec 8, 2018
2 parents ad1f472 + a0cb4d2 commit 445874d
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 25 deletions.
3 changes: 3 additions & 0 deletions index.ts
Expand Up @@ -251,6 +251,9 @@ export {
doWithRetry,
RetryOptions,
} from "./lib/util/retry";
export {
executeAll,
} from "./lib/util/pool";
export * from "./lib/util/spawn";
export {
Maker,
Expand Down
41 changes: 16 additions & 25 deletions lib/operations/common/repoUtils.ts
@@ -1,7 +1,7 @@
import * as _ from "lodash";
import { HandlerContext } from "../../HandlerContext";
import { Project } from "../../project/Project";
import { logger } from "../../util/logger";
import { executeAll } from "../../util/pool";
import { defaultRepoLoader } from "./defaultRepoLoader";
import { ProjectOperationCredentials } from "./ProjectOperationCredentials";
import {
Expand All @@ -12,11 +12,6 @@ import { RepoFinder } from "./repoFinder";
import { RepoRef } from "./RepoId";
import { RepoLoader } from "./repoLoader";

/**
* Specify how many repos should be edited concurrently at max
*/
export let EditAllChunkSize = 5;

/**
* Perform an action against all the given repos.
* Skip over repos that cannot be loaded, logging a warning.
Expand All @@ -37,25 +32,21 @@ export async function doWithAllRepos<R, P>(ctx: HandlerContext,
repoFilter: RepoFilter = AllRepos,
repoLoader: RepoLoader =
defaultRepoLoader(credentials)): Promise<R[]> {
const allIds = await relevantRepos(ctx, repoFinder, repoFilter);
const idChunks = _.chunk(allIds , EditAllChunkSize);
const results: R[] = [];
for (const ids of idChunks) {
results.push(...(await Promise.all(ids.map(id =>
repoLoader(id)
.catch(err => {
logger.warn("Unable to load repo %s/%s: %s", id.owner, id.repo, err);
logger.debug(err.stack);
return undefined;
})
.then(p => {
if (p) {
return action(p, parameters);
}
})))
.then(proms => proms.filter(prom => prom))));
}
return results;
const ids = await relevantRepos(ctx, repoFinder, repoFilter);
const promises = ids.map(id =>
() => repoLoader(id)
.catch(err => {
logger.warn("Unable to load repo %s/%s: %s", id.owner, id.repo, err);
logger.debug(err.stack);
return undefined;
})
.then(p => {
if (p) {
return action(p, parameters);
}
}));

return (await executeAll<R>(promises)).filter(result => result);
}

export function relevantRepos(ctx: HandlerContext,
Expand Down
39 changes: 39 additions & 0 deletions lib/util/pool.ts
@@ -0,0 +1,39 @@
import { configurationValue } from "../configuration";

function concurrentDefault(): number {
return configurationValue<number>("pool.concurrent", 5);
}

/**
* Execute all provided promises with a max concurrency
* Results will be in the same order as the provided promises; if one promise rejects,
* execution is stopped and the returned promise is rejected as well.
* @param promises all promises to execute
* @param concurrent the max number of concurrent promise executions
*/
export async function executeAll<T>(promises: Array<() => Promise<T>>,
concurrent: number = concurrentDefault()): Promise<T[]> {
let index = 0;
const results: any[] = [];
const producer = () => {
if (index < promises.length) {
const promise = promises[index]();
results[index] = promise;
index++;
return promise;
} else {
return null;
}
};

const PromisePool = require("es6-promise-pool");
const pool = new PromisePool(producer, concurrent);

pool.addEventListener("fulfilled", (event: any) => {
results[results.indexOf(event.data.promise)] = event.data.result;
});

await pool.start(); // start only returns a promise; not an [] of results

return results;
}
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -73,6 +73,7 @@
"cluster": "^0.7.7",
"cors": "^2.8.5",
"cross-spawn": "^6.0.5",
"es6-promise-pool": "^2.5.0",
"express": "^4.16.4",
"express-session": "^1.15.6",
"find-up": "^3.0.0",
Expand Down
64 changes: 64 additions & 0 deletions test/util/pool.test.ts
@@ -0,0 +1,64 @@
/*
* Copyright © 2018 Atomist, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import * as assert from "power-assert";
import { executeAll } from "../../lib/util/pool";

describe("pool", () => {

describe("executeAll", () => {

it("should preserve order", async () => {
const results = await executeAll([() => {
return new Promise<string>(resolve => {
setTimeout(() => resolve("1"), 1000);
});
}, () => {
return new Promise<string>(resolve => {
setTimeout(() => resolve("2"), 500);
});
}, () => {
return new Promise<string>(resolve => {
setTimeout(() => resolve("3"), 100);
});
}], 1);
assert.deepStrictEqual(results, ["1", "2", "3"]);
});

it("should reject with results and errors", async () => {
try {
await executeAll([() => {
return new Promise<string>(resolve => {
setTimeout(() => resolve("1"), 1000);
});
}, () => {
return new Promise<string>((resolve, reject) => {
reject(new Error("2"));
});
}, () => {
return new Promise<string>(resolve => {
setTimeout(() => resolve("3"), 100);
});
}], 1);
} catch (err) {
assert.strictEqual(err.message, "2");
}
});

});

});

0 comments on commit 445874d

Please sign in to comment.