Skip to content

Commit 00a51e6

Browse files
committed
feat(orchestration): add GraphRuntime with execute, stream, resume, and conditional edges
1 parent bc5d1c2 commit 00a51e6

3 files changed

Lines changed: 799 additions & 0 deletions

File tree

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
/**
2+
* @file graph-runtime.test.ts
3+
* @description Integration tests for `GraphRuntime`.
4+
*
5+
* Covers:
6+
* 1. Linear graph end-to-end — START→a→b→END with mock executor returns defined output.
7+
* 2. Streaming events — correct event types emitted in causal order.
8+
* 3. Checkpoints saved — graph with `checkpointPolicy='every_node'` creates checkpoint entries.
9+
* 4. Conditional edges — routing to 'b' or 'c' based on `state.scratch.goToB`.
10+
* 5. Resume from checkpoint — fork a checkpoint with patched state, resume, verify completion.
11+
*/
12+
13+
import { describe, it, expect, vi } from 'vitest';
14+
import { GraphRuntime } from '../runtime/GraphRuntime.js';
15+
import { NodeExecutor } from '../runtime/NodeExecutor.js';
16+
import { InMemoryCheckpointStore } from '../checkpoint/InMemoryCheckpointStore.js';
17+
import type { CompiledExecutionGraph, GraphNode, GraphState } from '../ir/types.js';
18+
import { START, END } from '../ir/types.js';
19+
import type { NodeExecutionResult } from '../runtime/NodeExecutor.js';
20+
21+
// ---------------------------------------------------------------------------
22+
// Test helpers
23+
// ---------------------------------------------------------------------------
24+
25+
/**
26+
* Build a minimal `GraphNode` with sensible defaults.
27+
*
28+
* @param id - Unique node identifier.
29+
* @param overrides - Optional field overrides.
30+
*/
31+
function makeNode(id: string, overrides: Partial<GraphNode> = {}): GraphNode {
32+
return {
33+
id,
34+
type: 'gmi',
35+
executorConfig: { type: 'gmi', instructions: `node-${id}` },
36+
executionMode: 'single_turn',
37+
effectClass: 'pure',
38+
checkpoint: 'none',
39+
...overrides,
40+
};
41+
}
42+
43+
/**
44+
* Build a `CompiledExecutionGraph` with START→[nodeIds in order]→END edges.
45+
*
46+
* The helper adds static edges START→first, last→END, and consecutive node→node edges.
47+
*
48+
* @param id - Graph identifier.
49+
* @param nodes - Array of nodes to include (order determines static edge chain).
50+
* @param options - Optional overrides for the graph-level fields.
51+
*/
52+
function makeLinearGraph(
53+
id: string,
54+
nodes: GraphNode[],
55+
options: Partial<CompiledExecutionGraph> = {},
56+
): CompiledExecutionGraph {
57+
const edges = nodes.map((n, i) => ({
58+
id: `e${i}`,
59+
source: i === 0 ? START : nodes[i - 1]!.id,
60+
target: n.id,
61+
type: 'static' as const,
62+
}));
63+
edges.push({
64+
id: `e${nodes.length}`,
65+
source: nodes[nodes.length - 1]!.id,
66+
target: END,
67+
type: 'static' as const,
68+
});
69+
70+
return {
71+
id,
72+
name: id,
73+
nodes,
74+
edges,
75+
stateSchema: { input: {}, scratch: {}, artifacts: {} },
76+
reducers: {},
77+
checkpointPolicy: 'explicit',
78+
memoryConsistency: 'live',
79+
...options,
80+
};
81+
}
82+
83+
/**
84+
* Create a `NodeExecutor` whose `execute()` is fully controlled by the supplied mock.
85+
*
86+
* @param mockFn - `vi.fn()` (or similar) that replaces `execute()`.
87+
*/
88+
function makeExecutorWithMock(
89+
mockFn: (node: GraphNode, state: Partial<GraphState>) => Promise<NodeExecutionResult>,
90+
): NodeExecutor {
91+
const executor = new NodeExecutor({});
92+
// Replace the public method directly so we don't need to subclass.
93+
executor.execute = mockFn as typeof executor.execute;
94+
return executor;
95+
}
96+
97+
// ---------------------------------------------------------------------------
98+
// Tests
99+
// ---------------------------------------------------------------------------
100+
101+
describe('GraphRuntime', () => {
102+
// ── 1. Linear graph end-to-end ─────────────────────────────────────────────
103+
104+
it('executes a linear START→a→b→END graph and returns defined output', async () => {
105+
const store = new InMemoryCheckpointStore();
106+
const executeMock = vi.fn().mockResolvedValue({
107+
success: true,
108+
output: 'ok',
109+
artifactsUpdate: { result: 'final' },
110+
} satisfies NodeExecutionResult);
111+
112+
const runtime = new GraphRuntime({
113+
checkpointStore: store,
114+
nodeExecutor: makeExecutorWithMock(executeMock),
115+
});
116+
117+
const graph = makeLinearGraph('g-linear', [makeNode('a'), makeNode('b')]);
118+
const result = await runtime.invoke(graph, { query: 'hello' });
119+
120+
// Both nodes were executed.
121+
expect(executeMock).toHaveBeenCalledTimes(2);
122+
// Final output should be defined (artifacts object).
123+
expect(result).toBeDefined();
124+
});
125+
126+
// ── 2. Streaming events in correct order ───────────────────────────────────
127+
128+
it('streams events in correct causal order for a linear graph', async () => {
129+
const store = new InMemoryCheckpointStore();
130+
const executeMock = vi.fn().mockResolvedValue({
131+
success: true,
132+
output: 'step-output',
133+
} satisfies NodeExecutionResult);
134+
135+
const runtime = new GraphRuntime({
136+
checkpointStore: store,
137+
nodeExecutor: makeExecutorWithMock(executeMock),
138+
});
139+
140+
const graph = makeLinearGraph('g-events', [makeNode('a'), makeNode('b')]);
141+
const events: string[] = [];
142+
143+
for await (const event of runtime.stream(graph, {})) {
144+
events.push(event.type);
145+
}
146+
147+
// Verify run_start appears first and run_end appears last.
148+
expect(events[0]).toBe('run_start');
149+
expect(events[events.length - 1]).toBe('run_end');
150+
151+
// node_start and node_end must each appear twice (once per node).
152+
expect(events.filter(t => t === 'node_start')).toHaveLength(2);
153+
expect(events.filter(t => t === 'node_end')).toHaveLength(2);
154+
155+
// Every node_start must be immediately followed by node_end (linear graph, no checkpoints).
156+
for (let i = 0; i < events.length; i++) {
157+
if (events[i] === 'node_start') {
158+
expect(events[i + 1]).toBe('node_end');
159+
}
160+
}
161+
});
162+
163+
// ── 3. Checkpoints saved ───────────────────────────────────────────────────
164+
165+
it('saves checkpoints after every node when checkpointPolicy is every_node', async () => {
166+
const store = new InMemoryCheckpointStore();
167+
const executeMock = vi.fn().mockResolvedValue({
168+
success: true,
169+
output: 'cp-output',
170+
} satisfies NodeExecutionResult);
171+
172+
const runtime = new GraphRuntime({
173+
checkpointStore: store,
174+
nodeExecutor: makeExecutorWithMock(executeMock),
175+
});
176+
177+
const graph = makeLinearGraph(
178+
'g-checkpoint',
179+
[makeNode('a'), makeNode('b')],
180+
{ checkpointPolicy: 'every_node' },
181+
);
182+
183+
await runtime.invoke(graph, {});
184+
185+
// There should be at least one checkpoint per node (a + b = 2).
186+
const checkpoints = await store.list('g-checkpoint');
187+
expect(checkpoints.length).toBeGreaterThanOrEqual(2);
188+
});
189+
190+
// ── 4. Conditional edges ───────────────────────────────────────────────────
191+
192+
it('routes to node b when condition fn returns b based on scratch.goToB', async () => {
193+
const store = new InMemoryCheckpointStore();
194+
195+
/**
196+
* Node 'a' sets `scratch.goToB = true`.
197+
* Nodes 'b' and 'c' are passive — they just return success.
198+
*/
199+
const executeMock = vi.fn().mockImplementation(
200+
async (node: GraphNode): Promise<NodeExecutionResult> => {
201+
if (node.id === 'a') {
202+
return { success: true, output: 'a-done', scratchUpdate: { goToB: true } };
203+
}
204+
return { success: true, output: `${node.id}-done` };
205+
},
206+
);
207+
208+
const runtime = new GraphRuntime({
209+
checkpointStore: store,
210+
nodeExecutor: makeExecutorWithMock(executeMock),
211+
});
212+
213+
const nodeA = makeNode('a');
214+
const nodeB = makeNode('b');
215+
const nodeC = makeNode('c');
216+
217+
/**
218+
* Graph topology:
219+
* START ──static──► a ──conditional──► b (if goToB)
220+
* └──conditional──► c (if !goToB)
221+
* b ──static──► END
222+
* c ──static──► END
223+
*/
224+
const graph: CompiledExecutionGraph = {
225+
id: 'g-conditional',
226+
name: 'conditional-test',
227+
nodes: [nodeA, nodeB, nodeC],
228+
edges: [
229+
{ id: 'e0', source: START, target: 'a', type: 'static' },
230+
{
231+
id: 'e1',
232+
source: 'a',
233+
target: 'b',
234+
type: 'conditional',
235+
condition: {
236+
type: 'function',
237+
fn: (state: GraphState) =>
238+
(state.scratch as Record<string, unknown>).goToB ? 'b' : 'c',
239+
},
240+
},
241+
{
242+
id: 'e2',
243+
source: 'a',
244+
target: 'c',
245+
type: 'conditional',
246+
condition: {
247+
type: 'function',
248+
fn: (state: GraphState) =>
249+
(state.scratch as Record<string, unknown>).goToB ? 'b' : 'c',
250+
},
251+
},
252+
{ id: 'e3', source: 'b', target: END, type: 'static' },
253+
{ id: 'e4', source: 'c', target: END, type: 'static' },
254+
],
255+
stateSchema: { input: {}, scratch: {}, artifacts: {} },
256+
reducers: {},
257+
checkpointPolicy: 'explicit',
258+
memoryConsistency: 'live',
259+
};
260+
261+
const visitedIds: string[] = [];
262+
for await (const event of runtime.stream(graph, {})) {
263+
if (event.type === 'node_start') visitedIds.push(event.nodeId);
264+
}
265+
266+
// Node 'a' and 'b' should have run; 'c' should have been skipped.
267+
expect(visitedIds).toContain('a');
268+
expect(visitedIds).toContain('b');
269+
expect(visitedIds).not.toContain('c');
270+
});
271+
272+
// ── 5. Resume from checkpoint ──────────────────────────────────────────────
273+
274+
it('resumes a run from a forked checkpoint and completes successfully', async () => {
275+
const store = new InMemoryCheckpointStore();
276+
277+
/**
278+
* Node 'a' runs first, then 'b'. We'll interrupt after 'a', fork the checkpoint,
279+
* and resume — verifying 'b' executes on resume.
280+
*/
281+
const executeMock = vi.fn().mockResolvedValue({
282+
success: true,
283+
output: 'resume-output',
284+
} satisfies NodeExecutionResult);
285+
286+
const runtime = new GraphRuntime({
287+
checkpointStore: store,
288+
nodeExecutor: makeExecutorWithMock(executeMock),
289+
});
290+
291+
const graph = makeLinearGraph(
292+
'g-resume',
293+
[makeNode('a'), makeNode('b')],
294+
{ checkpointPolicy: 'every_node' },
295+
);
296+
297+
// First: run to completion so checkpoints are created.
298+
await runtime.invoke(graph, { seed: 42 });
299+
300+
// Find the checkpoint for node 'a'.
301+
const allCheckpoints = await store.list('g-resume');
302+
expect(allCheckpoints.length).toBeGreaterThan(0);
303+
304+
const cpForA = allCheckpoints.find(cp => cp.nodeId === 'a');
305+
expect(cpForA).toBeDefined();
306+
307+
// Fork the checkpoint — this simulates restarting from after node 'a'.
308+
const forkedRunId = await store.fork(cpForA!.id);
309+
310+
// Reset mock call count to measure only the resumed execution.
311+
executeMock.mockClear();
312+
313+
// Resume the forked run.
314+
const resumeResult = await runtime.resume(graph, forkedRunId);
315+
316+
// The resume should complete without throwing.
317+
expect(resumeResult).toBeDefined();
318+
});
319+
});

0 commit comments

Comments
 (0)