Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib/init/tools/run-commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export async function runCommands(
message: `Command failed: ${command.original}`,
data: {
exitCode: result.exitCode,
stdout: result.stdout.slice(0, 500),
stderr: result.stderr.slice(0, 500),
cwd: payload.cwd,
},
Expand Down
60 changes: 36 additions & 24 deletions src/lib/init/wizard-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ async function preamble(

const MAX_RESUME_RETRIES = 3;
const RETRY_BACKOFF_MS = [2000, 4000, 8000];
const RUN_STATE_RECOVERY_BACKOFF_MS = [0, 250, 750, 1500];
const RUN_STATE_RECOVERY_TIMEOUT_MS = 10_000;

type ResumeRetryArgs = {
run: {
Expand All @@ -454,11 +456,13 @@ type ResumeRetryArgs = {
};

/**
* Detect Mastra's "step not suspended" 500 — means the server already
* Detect Mastra's "not suspended" 500 — means the server already
* processed this step (our previous request succeeded but the response was
* dropped before we received it). The MastraClientError message embeds the
* server body, e.g.:
* "HTTP error! status: 500 - {"error":"This workflow step 'X' was not suspended..."}"
* or:
* "HTTP error! status: 500 - {"error":"This workflow run was not suspended"}"
*/
function isStepAlreadyAdvancedError(err: unknown): boolean {
return err instanceof Error && err.message.includes("was not suspended");
Expand All @@ -473,31 +477,39 @@ async function tryRecoverCurrentRunState(
workflow: ResumeRetryArgs["workflow"],
runId: string
): Promise<WorkflowRunResult | null> {
try {
const raw = await withTimeout(
workflow.runById(runId, {
fields: ["steps", "activeStepsPath", "result"],
}),
API_TIMEOUT_MS,
"Run state recovery"
);
// runById returns activeStepsPath (Record<stepId, executionPath>) but
// not suspended (string[][]). The main loop reads result.suspended to
// find the active step; without it, stepId falls back to "unknown" and
// extractSuspendPayload iterates all steps — picking the first with any
// suspendPayload, which could be a completed step with stale D1 data.
// Derive suspended from the activeStepsPath keys so the lookup is
// deterministic: those keys are exactly the currently-active step IDs.
const state = raw as Record<string, unknown>;
if (!state.suspended && state.activeStepsPath) {
state.suspended = Object.keys(
state.activeStepsPath as Record<string, unknown>
).map((id) => [id]);
for (const delayMs of RUN_STATE_RECOVERY_BACKOFF_MS) {
if (delayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
try {
const raw = await withTimeout(
workflow.runById(runId, {
fields: ["steps", "activeStepsPath", "result"],
}),
RUN_STATE_RECOVERY_TIMEOUT_MS,
"Run state recovery"
);
// runById returns activeStepsPath (Record<stepId, executionPath>) but
// not suspended (string[][]). The main loop reads result.suspended to
// find the active step; without it, stepId falls back to "unknown" and
// extractSuspendPayload iterates all steps — picking the first with any
// suspendPayload, which could be a completed step with stale D1 data.
// Derive suspended from the activeStepsPath keys so the lookup is
// deterministic: those keys are exactly the currently-active step IDs.
const state = raw as Record<string, unknown>;
if (!state.suspended && state.activeStepsPath) {
state.suspended = Object.keys(
state.activeStepsPath as Record<string, unknown>
).map((id) => [id]);
}
return assertWorkflowResult(state);
} catch {
// Mastra/D1 can briefly return a not-yet-readable or intermediate run
// state immediately after rejecting a stale resume. Poll a few times
// before surfacing the original 500 to the user.
}
return assertWorkflowResult(state);
} catch {
return null;
}
return null;
}

// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: retry loop branches across transient errors, stale-step recovery, and backoff
Expand Down
39 changes: 35 additions & 4 deletions test/lib/init/wizard-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,13 @@ describe("runWizard — resumeWithRetry stale-step recovery", () => {
);
}

function staleRunError(): Error {
return new Error(
"HTTP error! status: 500 - " +
JSON.stringify({ error: "This workflow run was not suspended" })
);
}

test("recovers when server has already advanced to the next step", async () => {
mockStartResult = {
status: "suspended",
Expand Down Expand Up @@ -1005,13 +1012,37 @@ describe("runWizard — resumeWithRetry stale-step recovery", () => {
expect(resumeCount).toBe(1);
});

test("throws immediately when stale-step error occurs and runById fails", async () => {
test("recovers from run-level not-suspended errors after transient runById failure", async () => {
mockStartResult = {
status: "suspended",
suspended: [["tool-step"]],
steps: { "tool-step": { suspendPayload: toolPayload } },
};
runByIdMock
.mockRejectedValueOnce(new Error("D1 snapshot not ready"))
.mockResolvedValueOnce({ status: "success" });

let resumeCount = 0;
makeStaleStepRun(() => {
resumeCount += 1;
return Promise.reject(staleRunError());
});

await runWizard(makeOptions());

expect(formatResultSpy).toHaveBeenCalled();
expect(runByIdMock).toHaveBeenCalledTimes(2);
expect(resumeCount).toBe(1);
});

test("throws when stale-step error occurs and runById keeps failing", async () => {
mockStartResult = {
status: "suspended",
suspended: [["tool-step"]],
steps: { "tool-step": { suspendPayload: toolPayload } },
};
// runById is unreachable — recovery fails, wizard throws without retrying.
// runById is unreachable — recovery fails, wizard throws without retrying
// the stale resume request.
mockRunByIdResult = new Error("runById network error");

let resumeCount = 0;
Expand All @@ -1022,9 +1053,9 @@ describe("runWizard — resumeWithRetry stale-step recovery", () => {

await expect(runWizard(makeOptions())).rejects.toThrow(WizardError);

// Threw immediately after recovery failed — no futile retries of the stale step.
// Threw after recovery polling failed — no futile retries of the stale step.
expect(resumeCount).toBe(1);
expect(runByIdMock).toHaveBeenCalledTimes(1);
expect(runByIdMock).toHaveBeenCalledTimes(4);
});
});

Expand Down
3 changes: 2 additions & 1 deletion test/lib/run-commands.mocked.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type { RunCommandsPayload } from "../../src/lib/init/types.js";
type Breadcrumb = {
level: string;
message: string;
data: { exitCode: number; stderr: string; cwd: string };
data: { exitCode: number; stdout: string; stderr: string; cwd: string };
};

const { breadcrumbs } = vi.hoisted(() => ({
Expand Down Expand Up @@ -60,6 +60,7 @@ describe("runCommands breadcrumb on failure", () => {
expect(crumb.level).toBe("error");
expect(crumb.message).toContain("ls");
expect(crumb.data.exitCode).not.toBe(0);
expect(typeof crumb.data.stdout).toBe("string");
expect(typeof crumb.data.stderr).toBe("string");
expect(crumb.data.cwd).toBe("/tmp");
});
Expand Down
Loading