Skip to content

Commit 15656c8

Browse files
committed
fix(orchestration): stop on node failure, persist skippedNodes in checkpoints, fix branch resume
1 parent ea7de63 commit 15656c8

6 files changed

Lines changed: 366 additions & 38 deletions

File tree

src/orchestration/__tests__/checkpoint-store.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ describe('InMemoryCheckpointStore', () => {
6262
expect(loaded!.runId).toBe('run-1');
6363
});
6464

65+
// -------------------------------------------------------------------------
66+
it('get returns a checkpoint by exact checkpoint id', async () => {
67+
const cp = makeCheckpoint({ id: 'cp-direct', runId: 'run-1' });
68+
await store.save(cp);
69+
70+
const loaded = await store.get('cp-direct');
71+
expect(loaded).not.toBeNull();
72+
expect(loaded!.id).toBe('cp-direct');
73+
});
74+
6575
// -------------------------------------------------------------------------
6676
it('load returns null for an unknown runId', async () => {
6777
const result = await store.load('run-missing');

src/orchestration/__tests__/graph-runtime.test.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,132 @@ describe('GraphRuntime', () => {
316316
// The resume should complete without throwing.
317317
expect(resumeResult).toBeDefined();
318318
});
319+
320+
it('accepts an exact checkpoint id in resume()', async () => {
321+
const store = new InMemoryCheckpointStore();
322+
const executeMock = vi.fn().mockResolvedValue({
323+
success: true,
324+
output: 'resume-output',
325+
} satisfies NodeExecutionResult);
326+
327+
const runtime = new GraphRuntime({
328+
checkpointStore: store,
329+
nodeExecutor: makeExecutorWithMock(executeMock),
330+
});
331+
332+
const graph = makeLinearGraph(
333+
'g-resume-checkpoint-id',
334+
[makeNode('a'), makeNode('b')],
335+
{ checkpointPolicy: 'every_node' },
336+
);
337+
338+
await runtime.invoke(graph, { seed: 7 });
339+
const checkpoints = await store.list('g-resume-checkpoint-id');
340+
const checkpointForA = checkpoints.find((cp) => cp.nodeId === 'a');
341+
expect(checkpointForA).toBeDefined();
342+
343+
executeMock.mockClear();
344+
const resumeResult = await runtime.resume(graph, checkpointForA!.id);
345+
346+
expect(resumeResult).toBeDefined();
347+
expect(executeMock).toHaveBeenCalled();
348+
});
349+
350+
it('halts on node failure and emits error/interruption events', async () => {
351+
const store = new InMemoryCheckpointStore();
352+
const executeMock = vi.fn().mockImplementation(async (node: GraphNode): Promise<NodeExecutionResult> => {
353+
if (node.id === 'a') {
354+
return { success: false, error: 'boom' };
355+
}
356+
return { success: true, output: `${node.id}-done` };
357+
});
358+
359+
const runtime = new GraphRuntime({
360+
checkpointStore: store,
361+
nodeExecutor: makeExecutorWithMock(executeMock),
362+
});
363+
364+
const graph = makeLinearGraph('g-failure', [makeNode('a'), makeNode('b')]);
365+
const events = [];
366+
for await (const event of runtime.stream(graph, {})) {
367+
events.push(event);
368+
}
369+
370+
expect(executeMock).toHaveBeenCalledTimes(1);
371+
expect(events.some((event) => event.type === 'error')).toBe(true);
372+
expect(events.some((event) => event.type === 'interrupt')).toBe(true);
373+
expect(events.some((event) => event.type === 'run_end')).toBe(true);
374+
expect(events.some((event) => event.type === 'node_start' && event.nodeId === 'b')).toBe(false);
375+
});
376+
377+
it('persists skipped conditional branches so resume does not execute the bypassed arm', async () => {
378+
const store = new InMemoryCheckpointStore();
379+
const executeMock = vi.fn().mockImplementation(async (node: GraphNode): Promise<NodeExecutionResult> => {
380+
if (node.id === 'a') {
381+
return { success: true, output: 'a-done', scratchUpdate: { goToB: true } };
382+
}
383+
return { success: true, output: `${node.id}-done` };
384+
});
385+
386+
const runtime = new GraphRuntime({
387+
checkpointStore: store,
388+
nodeExecutor: makeExecutorWithMock(executeMock),
389+
});
390+
391+
const nodeA = makeNode('a');
392+
const nodeB = makeNode('b');
393+
const nodeC = makeNode('c');
394+
395+
const graph: CompiledExecutionGraph = {
396+
id: 'g-conditional-resume',
397+
name: 'conditional-resume-test',
398+
nodes: [nodeA, nodeB, nodeC],
399+
edges: [
400+
{ id: 'e0', source: START, target: 'a', type: 'static' },
401+
{
402+
id: 'e1',
403+
source: 'a',
404+
target: 'b',
405+
type: 'conditional',
406+
condition: {
407+
type: 'function',
408+
fn: (state: GraphState) =>
409+
(state.scratch as Record<string, unknown>).goToB ? 'b' : 'c',
410+
},
411+
},
412+
{
413+
id: 'e2',
414+
source: 'a',
415+
target: 'c',
416+
type: 'conditional',
417+
condition: {
418+
type: 'function',
419+
fn: (state: GraphState) =>
420+
(state.scratch as Record<string, unknown>).goToB ? 'b' : 'c',
421+
},
422+
},
423+
{ id: 'e3', source: 'b', target: END, type: 'static' },
424+
{ id: 'e4', source: 'c', target: END, type: 'static' },
425+
],
426+
stateSchema: { input: {}, scratch: {}, artifacts: {} },
427+
reducers: {},
428+
checkpointPolicy: 'every_node',
429+
memoryConsistency: 'snapshot',
430+
};
431+
432+
await runtime.invoke(graph, {});
433+
434+
const checkpoints = await store.list('g-conditional-resume');
435+
const checkpointForA = checkpoints.find((cp) => cp.nodeId === 'a');
436+
expect(checkpointForA).toBeDefined();
437+
438+
const forkedRunId = await store.fork(checkpointForA!.id);
439+
executeMock.mockClear();
440+
441+
await runtime.resume(graph, forkedRunId);
442+
443+
const executedNodeIds = executeMock.mock.calls.map(([node]) => (node as GraphNode).id);
444+
expect(executedNodeIds).toContain('b');
445+
expect(executedNodeIds).not.toContain('c');
446+
});
319447
});

src/orchestration/__tests__/integration.test.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* 3. mission: compile → invoke lifecycle
1313
* 4. Checkpoint time-travel: fork with modified state
1414
* 5. Streaming emits correct event sequence
15-
* 6. Error handling: node failure with graceful continuation
15+
* 6. Error handling: node failure halts the run with explicit error events
1616
*/
1717

1818
import { describe, it, expect, vi } from 'vitest';
@@ -160,15 +160,14 @@ describe('E2E Integration — AgentGraph lifecycle', () => {
160160
scratch: z.object({}),
161161
artifacts: z.object({ answer: z.string().optional() }),
162162
})
163-
.addNode('step1', toolNode('search'))
164-
.addNode('step2', toolNode('summarize'))
163+
.addNode('step1', gmiNode({ instructions: 'Search for the answer.' }))
164+
.addNode('step2', gmiNode({ instructions: 'Summarize the answer.' }))
165165
.addEdge(START, 'step1')
166166
.addEdge('step1', 'step2')
167167
.addEdge('step2', END)
168168
.compile();
169169

170170
const result = await graph.invoke({ query: 'test' });
171-
// Default NodeExecutor returns a stub — result should be defined (empty artifacts object)
172171
expect(result).toBeDefined();
173172
});
174173

@@ -178,8 +177,8 @@ describe('E2E Integration — AgentGraph lifecycle', () => {
178177
scratch: z.object({}),
179178
artifacts: z.object({}),
180179
})
181-
.addNode('a', toolNode('tool_a'))
182-
.addNode('b', toolNode('tool_b'))
180+
.addNode('a', gmiNode({ instructions: 'Step A' }))
181+
.addNode('b', gmiNode({ instructions: 'Step B' }))
183182
.addEdge(START, 'a')
184183
.addEdge('a', 'b')
185184
.addEdge('b', END)
@@ -469,7 +468,7 @@ describe('E2E Integration — streaming event sequence', () => {
469468
// ---------------------------------------------------------------------------
470469

471470
describe('E2E Integration — error handling', () => {
472-
it('invoke resolves even when node executor returns success:false', async () => {
471+
it('halts the run when node executor returns success:false', async () => {
473472
const executor = {
474473
execute: vi.fn().mockResolvedValue({
475474
success: false,
@@ -480,9 +479,13 @@ describe('E2E Integration — error handling', () => {
480479
const store = new InMemoryCheckpointStore();
481480
const runtime = new GraphRuntime({ checkpointStore: store, nodeExecutor: executor as any });
482481

483-
const graph = makeLinearGraph('error-graph', [makeNode('failing-node')]);
484-
// The runtime does not throw on node success:false — it returns final artifacts
485-
await expect(runtime.invoke(graph, {})).resolves.toBeDefined();
482+
const graph = makeLinearGraph('error-graph', [makeNode('failing-node'), makeNode('downstream-node')]);
483+
const events = await collectEvents(runtime.stream(graph, {}));
484+
485+
expect(executor.execute).toHaveBeenCalledTimes(1);
486+
expect(events.some((event) => event.type === 'error')).toBe(true);
487+
expect(events.some((event) => event.type === 'interrupt')).toBe(true);
488+
expect(events.some((event) => event.type === 'node_start' && (event as any).nodeId === 'downstream-node')).toBe(false);
486489
});
487490

488491
it('resume throws when no checkpoint exists for runId', async () => {

src/orchestration/checkpoint/ICheckpointStore.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@ export interface Checkpoint {
8989
/** Ordered list of node ids that had completed execution when this checkpoint was taken. */
9090
visitedNodes: string[];
9191

92+
/**
93+
* Ordered list of node ids that were explicitly bypassed by routing decisions
94+
* (for example, the non-selected arm of a conditional branch).
95+
*
96+
* Persisting this list is required for correct resume semantics on branched
97+
* graphs: otherwise a resumed run cannot distinguish "not run yet" from
98+
* "intentionally skipped" and may stall on dead branches.
99+
*/
100+
skippedNodes?: string[];
101+
92102
/** Ids of edges that had been emitted but whose target nodes had not yet started. */
93103
pendingEdges: string[];
94104
}
@@ -115,6 +125,14 @@ export interface ICheckpointStore {
115125
*/
116126
save(checkpoint: Checkpoint): Promise<void>;
117127

128+
/**
129+
* Load a checkpoint by its unique checkpoint identifier.
130+
*
131+
* @param checkpointId - The exact checkpoint id assigned at save-time.
132+
* @returns The matching checkpoint, or `null` when none exists.
133+
*/
134+
get(checkpointId: string): Promise<Checkpoint | null>;
135+
118136
/**
119137
* Load a checkpoint for the given `runId`.
120138
*

src/orchestration/checkpoint/InMemoryCheckpointStore.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ export class InMemoryCheckpointStore implements ICheckpointStore {
6262
this._checkpoints.set(checkpoint.id, checkpoint);
6363
}
6464

65+
/**
66+
* Load a checkpoint by its unique checkpoint id.
67+
*
68+
* {@inheritDoc ICheckpointStore.get}
69+
*/
70+
async get(checkpointId: string): Promise<Checkpoint | null> {
71+
return this._checkpoints.get(checkpointId) ?? null;
72+
}
73+
6574
/**
6675
* Load a checkpoint for the given `runId`.
6776
*

0 commit comments

Comments
 (0)