Skip to content

Commit

Permalink
fix: wait for running items before end resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
wdavidw committed Dec 22, 2023
1 parent cc02fa7 commit f2212fe
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Multiple items (arrays) are merged. Muliple options (objects) are merged as well
- `call`
Execute one or several items and return a promise with the resolved value. Unless the `fluent` option is `false`, it is also possible to chain additional functions.
- `end(error|options)`
Close the scheduler. No further items are allowed to register with `call`, or an error is thrown. It returns a promise that resolves once all previously scheduled items are resolved. When `end` is called and each is in paused state, all paused items are resolved with `undefined` or an error if any.
Close the scheduler. No further items are allowed to register with `call`, or the returned promise is rejected. It returns a promise that resolves once all previously scheduled items resolve. When `end` is called and the scheduler is in paused state, all paused items are resolved with `undefined` or an error if any.
Available options:
- `error`
Reject the returned promise and every registered item that is not yet executed with an error. All scheduled items not yet executed are resolved with an error. In `relax` mode, only the promise returned by `end` is rejected with an error.
Expand Down
23 changes: 17 additions & 6 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,20 @@ export default function () {
running: 0,
count: 0,
stack: [],
stack_running: [],
close_item: undefined,
};
const internal = {
pump: function () {
detach(async function () {
if (state.closed && state.close_item && state.stack_running.length === 0) {
if (state.error && !options.relax) {
state.close_item.reject(state.error);
} else {
state.close_item.resolve();
}
return;
}
if (!state.stack.length) {
return;
}
Expand All @@ -105,12 +115,9 @@ export default function () {
}
const item = state.stack.shift();
if (item.type === "END") {
if (state.stack.length !== 0) console.error("EACH_INVALID_STATE");
if (state.error && !options.relax) {
item.reject(state.error);
} else {
item.resolve();
}
// Place the item on the side to ensure that running item are completed.
state.close_item = item;
internal.pump();
return;
} else if (item.type === "ERROR") {
state.error = item.value;
Expand All @@ -129,10 +136,14 @@ export default function () {
: typeof item.handler === "function"
? await item.handler.call()
: await item.handler;
const position = state.stack_running.map(i => i === item ? i : -1).filter(i => i !== -1);
state.stack_running.splice(position, 1);
state.running--;
item.resolve.call(null, result);
internal.pump();
} catch (error) {
const position = state.stack_running.map(i => i === item ? i : -1).filter(i => i !== -1);
state.stack_running.splice(position, 1);
state.running--;
state.error = error;
item.reject.call(null, error);
Expand Down

0 comments on commit f2212fe

Please sign in to comment.