Solve unhandled promise rejection error by handling lists earlier
benjie committed Jun 29, 2023
1 parent 531a62f commit cadc31c
Showing 1 changed file with 143 additions and 126 deletions.
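
For context, the hazard named in the commit title can be illustrated with a minimal sketch (hypothetical code, not part of this diff): if a rejected promise sits in a results array and no handler is attached until a later tick, Node reports an unhandledRejection, which can terminate the process. Attaching a handler in the same tick in which the result is produced — the approach this commit adopts in executeBucket.ts — avoids that.

// Minimal illustration of the hazard; hypothetical, not part of executeBucket.ts.
const results: Array<Promise<unknown>> = [
  Promise.reject(new Error("step failed")), // a step "completes" with a rejection
];

// Hazard: deferring the handler to a later macrotask leaves the rejection
// unhandled at the end of this tick, e.g.:
//   setTimeout(() => results[0].catch(() => {}), 0);

// Same-tick handling: convert each settlement into a plain value or error
// value immediately, so no rejection is ever left without a handler.
const settled = results.map((p) =>
  p.then(
    (value) => ({ ok: true as const, value }),
    (error: unknown) => ({ ok: false as const, error }),
  ),
);
void settled;

The diff below applies the same idea by pushing each completed step's index into indexesPendingLoopOver and looping over those results before yielding to the event loop.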
269 changes: 143 additions & 126 deletions grafast/grafast/src/engine/executeBucket.ts
@@ -110,6 +110,13 @@ export function executeBucket(
return;
}
const phase = phases[phaseIndex];
const { _allSteps } = phase;
/**
* To ensure we don't enter a situation where an "unhandled" promise
* rejection causes Node to exit, we must process each completed step during the
* same tick in which it completes.
*/
let indexesPendingLoopOver: Array<number> = [];

let executePromises:
| PromiseLike<GrafastResultsList<any> | GrafastResultStreamList<any>>[]
@@ -136,6 +143,7 @@
const r = newGrafastError(timeoutError, step.id);
const result = arrayOfLength(bucket.size, r);
results[normalStepIndex] = result;
indexesPendingLoopOver.push(normalStepIndex);
bucket.hasErrors = true;
}
}
@@ -167,11 +175,13 @@
}
} else {
results[normalStepIndex] = r;
indexesPendingLoopOver.push(normalStepIndex);
}
} catch (e) {
const r = newGrafastError(e, step.id);
const result = arrayOfLength(bucket.size, r);
results[normalStepIndex] = result;
indexesPendingLoopOver.push(normalStepIndex);
bucket.hasErrors = true;
}
}
@@ -189,48 +199,41 @@
return nextPhase(phaseIndex + 1);
};

const loopOverResults = () => {
const { _allSteps } = phase;
const allStepsLength = _allSteps.length;
const executedLength = results.length;

if (isDev) {
assert.strictEqual(
executedLength,
phase.normalSteps?.length ?? 0,
"Expected only and all normalSteps to have executed",
);
}
/**
* Loops over (and resets) the indexesPendingLoopOver array, ensuring that
* all errors are handled.
*/
const loopOverResults = (): PromiseOrDirect<undefined | unknown> => {
if (indexesPendingLoopOver.length === 0) return;
const indexesToProcess = indexesPendingLoopOver;
indexesPendingLoopOver = [];

// Validate executed steps
for (
let allStepsIndex = 0;
allStepsIndex < executedLength;
allStepsIndex++
) {
for (const allStepsIndex of indexesToProcess) {
const result = results[allStepsIndex];
if (!Array.isArray(result)) {
const finishedStep = _allSteps[allStepsIndex];
throw new Error(
`Result from ${finishedStep} should be an array, instead received ${inspect(
result,
{ colors: true },
)}`,
);
}
const resultLength = result.length;
const finishedStep = _allSteps[allStepsIndex];
const resultLength = result?.length;
if (resultLength !== size) {
const finishedStep = _allSteps[allStepsIndex];
if (!Array.isArray(result)) {
throw new Error(
`Result from ${finishedStep} should be an array, instead received ${inspect(
result,
{ colors: true },
)}`,
);
}
throw new Error(
`Result array from ${finishedStep} should have length ${size}, instead it had length ${result.length}`,
);
}
bucket.store.set(finishedStep.id, arrayOfLength(size));
}

// Need to complete promises, check for errors, etc.
// **DO NOT THROW, DO NOT ALLOW AN ERROR TO BE RAISED!**
// **USE DEFENSIVE PROGRAMMING HERE!**

/** PROMISES ADDED HERE MUST NOT REJECT */
let promises: PromiseLike<void>[] | undefined;
let pendingPromises: PromiseLike<any>[] | undefined;
let pendingPromiseIndexes:
@@ -242,10 +245,6 @@
}>
| undefined;

for (const step of _allSteps) {
bucket.store.set(step.id, arrayOfLength(size));
}

const success = (
finishedStep: ExecutableStep,
finalResult: any[],
@@ -342,91 +341,7 @@
}
};

const runSyncSteps = () => {
if (!phase.unbatchedSyncAndSafeSteps) {
return next();
}
const extras: ExecutionExtra[] = [];
for (
let allStepsIndex = executedLength;
allStepsIndex < allStepsLength;
allStepsIndex++
) {
const step = _allSteps[allStepsIndex];
const meta =
step.metaKey !== undefined
? metaByMetaKey[step.metaKey]
: undefined;
extras[allStepsIndex] = {
stopTime,
meta,
eventEmitter,
_bucket: bucket,
_requestContext: requestContext,
};
}
outerLoop: for (let dataIndex = 0; dataIndex < size; dataIndex++) {
if (sideEffectPlanIdsWithErrors) {
for (const depId of sideEffectPlanIdsWithErrors) {
const depVal = bucket.store.get(depId)![dataIndex];
if (isGrafastError(depVal)) {
for (
let allStepsIndex = executedLength;
allStepsIndex < allStepsLength;
allStepsIndex++
) {
const step = _allSteps[
allStepsIndex
] as UnbatchedExecutableStep;
const storeEntry = bucket.store.get(step.id)!;
storeEntry[dataIndex] = depVal;
}
continue outerLoop;
}
}
}

stepLoop: for (
let allStepsIndex = executedLength;
allStepsIndex < allStepsLength;
allStepsIndex++
) {
const step = _allSteps[allStepsIndex] as UnbatchedExecutableStep;
const storeEntry = bucket.store.get(step.id)!;
try {
const deps: any = [];
for (const $dep of step.dependencies) {
const depVal = bucket.store.get($dep.id)![dataIndex];
if (bucket.hasErrors && isGrafastError(depVal)) {
storeEntry[dataIndex] = depVal;
continue stepLoop;
}
deps.push(depVal);
}
storeEntry[dataIndex] = step.unbatchedExecute(
extras[allStepsIndex],
...deps,
);
} catch (e) {
bucket.hasErrors = true;
storeEntry[dataIndex] = newGrafastError(e, step.id);
}
}
}
return next();
};

const awaitPromises = async () => {
// This _should not_ throw.
await Promise.all(promises!);
return runSyncSteps();
};

for (
let allStepsIndex = 0;
allStepsIndex < executedLength;
allStepsIndex++
) {
for (const allStepsIndex of indexesToProcess) {
const step = _allSteps[allStepsIndex];
const result = results[allStepsIndex]!;
const storeEntry = bucket.store.get(step.id)!;
@@ -477,7 +392,7 @@
if (bucket.hasErrors && sideEffectPlanIds) {
handleSideEffectPlanIds();
}
return promises ? awaitPromises() : runSyncSteps();
return promises ? Promise.all(promises) : undefined;
})
.then(null, (e) => {
// THIS SHOULD NEVER HAPPEN!
@@ -507,20 +422,122 @@
if (bucket.hasErrors && sideEffectPlanIds) {
handleSideEffectPlanIds();
}
return promises ? awaitPromises() : runSyncSteps();
return promises ? Promise.all(promises) : undefined;
}
};

const runSyncSteps = () => {
const executedLength = results.length;
if (isDev) {
assert.strictEqual(
executedLength,
phase.normalSteps?.length ?? 0,
"Expected only and all normalSteps to have executed",
);
}
if (!phase.unbatchedSyncAndSafeSteps) {
return next();
}
const allStepsLength = _allSteps.length;
const extras: ExecutionExtra[] = [];
for (
let allStepsIndex = executedLength;
allStepsIndex < allStepsLength;
allStepsIndex++
) {
const step = _allSteps[allStepsIndex];
const meta =
step.metaKey !== undefined ? metaByMetaKey[step.metaKey] : undefined;
extras[allStepsIndex] = {
stopTime,
meta,
eventEmitter,
_bucket: bucket,
_requestContext: requestContext,
};
bucket.store.set(step.id, arrayOfLength(size));
}
outerLoop: for (let dataIndex = 0; dataIndex < size; dataIndex++) {
if (sideEffectPlanIdsWithErrors) {
for (const depId of sideEffectPlanIdsWithErrors) {
const depVal = bucket.store.get(depId)![dataIndex];
if (isGrafastError(depVal)) {
for (
let allStepsIndex = executedLength;
allStepsIndex < allStepsLength;
allStepsIndex++
) {
const step = _allSteps[
allStepsIndex
] as UnbatchedExecutableStep;
const storeEntry = bucket.store.get(step.id)!;
storeEntry[dataIndex] = depVal;
}
continue outerLoop;
}
}
}

stepLoop: for (
let allStepsIndex = executedLength;
allStepsIndex < allStepsLength;
allStepsIndex++
) {
const step = _allSteps[allStepsIndex] as UnbatchedExecutableStep;
const storeEntry = bucket.store.get(step.id)!;
try {
const deps: any = [];
for (const $dep of step.dependencies) {
const depVal = bucket.store.get($dep.id)![dataIndex];
if (bucket.hasErrors && isGrafastError(depVal)) {
storeEntry[dataIndex] = depVal;
continue stepLoop;
}
deps.push(depVal);
}
storeEntry[dataIndex] = step.unbatchedExecute(
extras[allStepsIndex],
...deps,
);
} catch (e) {
bucket.hasErrors = true;
storeEntry[dataIndex] = newGrafastError(e, step.id);
}
}
}
return next();
};

if (executePromises !== null) {
return Promise.all(executePromises).then((promiseResults) => {
for (let i = 0, l = promiseResults.length; i < l; i++) {
const index = executePromiseResultIndex![i];
results[index] = promiseResults[i];
const processedPromises: PromiseLike<any>[] = [];
if (indexesPendingLoopOver.length > 0) {
// This **must be done in the same tick**
const promiseOrNot = loopOverResults();
if (isPromiseLike(promiseOrNot)) {
processedPromises.push(promiseOrNot);
}
return loopOverResults();
});
}
for (let i = 0, l = executePromises.length; i < l; i++) {
const executePromise = executePromises[i];
const index = executePromiseResultIndex![i];
processedPromises.push(
executePromise.then((promiseResult) => {
results[index] = promiseResult;
indexesPendingLoopOver.push(index);
// We must loop over the results in the same tick in which the
// promise resolved.
return loopOverResults();
}),
);
}
return Promise.all(processedPromises).then(runSyncSteps);
} else {
return loopOverResults();
const promiseOrNot = loopOverResults();
if (isPromiseLike(promiseOrNot)) {
return promiseOrNot.then(runSyncSteps);
} else {
return runSyncSteps();
}
}
};
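
In outline, the change above replaces "await all execute promises, then loop over the results" with "loop over each result in the tick its promise settles, then await everything before the sync steps run". Below is a simplified sketch of that control flow with hypothetical parameter names; the real code also tracks indexesPendingLoopOver, bucket.hasErrors, and the store writes shown in the diff.

// Simplified sketch of the new control flow; hypothetical, not the actual implementation.
async function executePhase(
  executePromises: Array<PromiseLike<unknown[]>>,
  executePromiseResultIndex: number[],
  results: Array<unknown[] | undefined>,
  loopOverResults: () => PromiseLike<unknown> | undefined,
  runSyncSteps: () => unknown,
) {
  const processedPromises = executePromises.map((executePromise, i) =>
    executePromise.then((promiseResult) => {
      // Record and process the result in the same tick in which the promise
      // resolved, so any error entries gain handlers immediately.
      results[executePromiseResultIndex[i]] = promiseResult;
      return loopOverResults();
    }),
  );
  // Only once every batched result has been processed do the unbatched,
  // synchronous steps run.
  await Promise.all(processedPromises);
  return runSyncSteps();
}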

