Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make promise throttle level consistent #240

Merged
merged 9 commits into from
Jul 5, 2023
Merged
50 changes: 40 additions & 10 deletions src/throttledPromiseAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ export type PromiseItem<T, O = T | undefined> = {
producer: (source: T, throttledPromise: ThrottledPromiseAll<T, O | undefined>) => Promise<O | undefined>;
};

type IndexedProducer<T, O = T> = PromiseItem<T, O> & {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal use type to add index to input and output.

Index is used to maintain result order to align results with Promise.all

index: number;
};

type IndexedResult<O> = {
index: number;
result: O | undefined;
};

/**
* A promise that throttles the number of promises running at a time.
*
Expand All @@ -39,7 +48,7 @@ export class ThrottledPromiseAll<T, O = T> {
private readonly concurrency: number;
private wait: Duration;
private timeout: NodeJS.Timeout | undefined;
readonly #results: Array<O | undefined> = [];
readonly #results: Array<IndexedResult<O> | undefined> = [];

/**
* Construct a new ThrottledPromiseAll.
Expand All @@ -56,7 +65,7 @@ export class ThrottledPromiseAll<T, O = T> {
* Returns the results of the promises that have been resolved.
*/
public get results(): Array<O | undefined> {
return this.#results;
return this.#results.sort((a, b) => (a?.index ?? 0) - (b?.index ?? 0)).map((r) => r?.result);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return the results in same order supplied by user

}

/**
Expand Down Expand Up @@ -109,7 +118,7 @@ export class ThrottledPromiseAll<T, O = T> {
await this.dequeue();
}
this.stop();
return this.#results;
return this.results;
} catch (e) {
this.stop();
throw e;
Expand All @@ -124,14 +133,35 @@ export class ThrottledPromiseAll<T, O = T> {
}

private async dequeue(): Promise<void> {
while (this.queue.length > 0) {
const next = this.queue.slice(0, this.concurrency);
this.queue.splice(0, this.concurrency);
// eslint-disable-next-line no-await-in-loop
const results = await Promise.all(
next.map((item) => item.producer(item.source, this).catch((e) => Promise.reject(e)))
const generator = function* (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes code a bit more concise

data: Array<PromiseItem<T, O | undefined>>
): Generator<PromiseItem<T, O | undefined> | undefined> {
while (data.length > 0) {
yield data.shift();
}
};
const concurrencyPool: Array<Promise<IndexedResult<O> | undefined>> = [];
const get = generator(this.queue);
let index = 0;
while (this.queue.length > 0 || concurrencyPool.length > 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stay here while there are things to do.

while (concurrencyPool.length < this.concurrency) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure there are always running promises at the concurrency level

const item = get.next().value as PromiseItem<T, O | undefined>;
if (!item) {
break;
}

const p: IndexedProducer<T, O> = { ...item, index: index++ };
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add in the index property to the supplied producer

concurrencyPool.push(
p
.producer(item.source, this)
.then((result) => ({ index: p.index, result }))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

capture the index in the result

.catch((e) => Promise.reject(e))
);
}
this.#results.push(
// eslint-disable-next-line no-await-in-loop
await Promise.race([concurrencyPool.shift(), ...concurrencyPool])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race until one finishes.

);
this.#results.push(...results);
}
}
}
11 changes: 7 additions & 4 deletions test/promiseQueue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
import { expect } from 'chai';
import { ThrottledPromiseAll } from '../src/throttledPromiseAll';
import { Duration } from '../src/duration';
import { ThrottledPromiseAll } from '../src';
import { Duration } from '../src';

describe('throttledPromiseAll', () => {
const numberProducer = (
Expand All @@ -19,7 +19,10 @@ describe('throttledPromiseAll', () => {
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 1 });
for (const i of [1, 2, 3, 4, 5]) {
// eslint-disable-next-line no-await-in-loop
throttledPromiseAll.add(i, numberProducer);
throttledPromiseAll.add(
i,
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), (5 - i) * 100))
);
}
await throttledPromiseAll.all();
const results = throttledPromiseAll.results as number[];
Expand Down Expand Up @@ -65,7 +68,7 @@ describe('throttledPromiseAll', () => {
});
throttledPromiseAll.add(
[1, 2, 3, 4, 5],
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), 10000))
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), 200))
);
await throttledPromiseAll.all();
} catch (e) {
Expand Down
Loading