Skip to content

Commit eac5cf7

Browse files
authored
[Workflows/vitest-pool-workers] Add a test handler to get the result of a workflow instance (#11648)
* [Workflows/vitest-pool-workers] Add Workflows result testing utils * [Workflows/vitest-pool-workers] Add a test handler to get the result of a workflow instance
1 parent b2769bf commit eac5cf7

File tree

9 files changed

+191
-17
lines changed

9 files changed

+191
-17
lines changed

.changeset/wicked-cities-attend.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
---
2+
"@cloudflare/vitest-pool-workers": minor
3+
"@cloudflare/workflows-shared": minor
4+
"miniflare": minor
5+
---
6+
7+
Add Workflows test handlers in vitest-pool-workers to get the Workflow instance output and error:
8+
9+
- `getOutput()`: Returns the output of the successfully completed Workflow instance.
10+
- `getError()`: Returns the error information of the errored Workflow instance.
11+
12+
Example:
13+
14+
```ts
15+
// First wait for the workflow instance to complete:
16+
await expect(
17+
instance.waitForStatus({ status: "complete" })
18+
).resolves.not.toThrow();
19+
20+
// Then, get its output
21+
const output = await instance.getOutput();
22+
23+
// Or for errored workflow instances, get their error:
24+
await expect(
25+
instance.waitForStatus({ status: "errored" })
26+
).resolves.not.toThrow();
27+
const error = await instance.getError();
28+
```

fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ it("workflow should be able to reach the end and be successful", async () => {
2626
);
2727
await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();
2828

29+
const output = await instance.getOutput();
30+
expect(output).toEqual({ status: "auto_approved" });
31+
2932
// DISPOSE: ensured by `await using`
3033
});
3134

@@ -52,6 +55,9 @@ it("workflow batch should be able to reach the end and be successful", async ()
5255
await expect(
5356
instance.waitForStatus(STATUS_COMPLETE)
5457
).resolves.not.toThrow();
58+
59+
const output = await instance.getOutput();
60+
expect(output).toEqual({ status: "auto_approved" });
5561
}
5662
} finally {
5763
// DISPOSE:

fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ it("should mock a non-violation score and complete", async () => {
3434

3535
await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();
3636

37+
const output = await instance.getOutput();
38+
expect(output).toEqual({ status: "auto_approved" });
39+
3740
// DISPOSE: ensured by `await using`
3841
});
3942

@@ -70,6 +73,9 @@ it("should mock the violation score calculation to fail 2 times and then complet
7073
await expect(
7174
instance.waitForStatus(STATUS_COMPLETE)
7275
).resolves.not.toThrow();
76+
77+
const output = await instance.getOutput();
78+
expect(output).toEqual({ status: "auto_approved" });
7379
} finally {
7480
// DISPOSE:
7581
// Workflow introspector should be disposed the end of each test, if no `await using` dyntax is used
@@ -102,6 +108,9 @@ it("should mock a violation score and complete", async () => {
102108
).toEqual({ status: "auto_rejected" });
103109

104110
await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();
111+
112+
const output = await instance.getOutput();
113+
expect(output).toEqual({ status: "auto_rejected" });
105114
});
106115

107116
it("should be reviewed, accepted and complete", async () => {
@@ -132,6 +141,9 @@ it("should be reviewed, accepted and complete", async () => {
132141
).toEqual({ status: "moderated", decision: "approve" });
133142

134143
await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();
144+
145+
const output = await instance.getOutput();
146+
expect(output).toEqual({ decision: "approve", status: "moderated" });
135147
});
136148

137149
it("should force human review to timeout and error", async () => {
@@ -156,4 +168,8 @@ it("should force human review to timeout and error", async () => {
156168
);
157169

158170
await expect(instance.waitForStatus(STATUS_ERROR)).resolves.not.toThrow();
171+
172+
const error = await instance.getError();
173+
expect(error.name).toEqual("Error");
174+
expect(error.message).toContain("Execution timed out");
159175
});

packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ class WorkflowImpl implements Workflow {
5454
async unsafeWaitForStatus(instanceId: string, status: string): Promise<void> {
5555
return await this.binding.unsafeWaitForStatus(instanceId, status);
5656
}
57+
58+
public async unsafeGetOutputOrError(
59+
instanceId: string,
60+
isOutput: boolean
61+
): Promise<unknown> {
62+
return this.binding.unsafeGetOutputOrError(instanceId, isOutput);
63+
}
5764
}
5865

5966
class InstanceImpl implements WorkflowInstance {

packages/vitest-pool-workers/src/worker/workflows.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,17 @@ class WorkflowInstanceIntrospectorHandle
103103
await this.#workflow.unsafeWaitForStatus(this.#instanceId, status);
104104
}
105105

106+
async getOutput(): Promise<unknown> {
107+
return await this.#workflow.unsafeGetOutputOrError(this.#instanceId, true);
108+
}
109+
110+
async getError(): Promise<{ name: string; message: string }> {
111+
return (await this.#workflow.unsafeGetOutputOrError(
112+
this.#instanceId,
113+
false
114+
)) as { name: string; message: string };
115+
}
116+
106117
async dispose(): Promise<void> {
107118
await this.#workflow.unsafeAbort(this.#instanceId, "Instance dispose");
108119
}

packages/vitest-pool-workers/types/cloudflare-test.d.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ declare module "cloudflare:test" {
165165
* // 3. ASSERTION
166166
* await instance.waitForStatus("complete");
167167
*
168+
* const output = await instance.getOutput();
169+
* expect(output).toEqual({ success: true });
170+
*
168171
* // 4. DISPOSE is implicit and automatic here.
169172
* });
170173
*/
@@ -211,6 +214,49 @@ declare module "cloudflare:test" {
211214
*/
212215
waitForStatus(status: InstanceStatus["status"]): Promise<void>;
213216

217+
/**
218+
* Retrieves the output value returned by the Workflow instance upon successful completion.
219+
*
220+
* This method should only be called after the Workflow instance has completed successfully.
221+
* It's recommended to use `waitForStatus("complete")` before calling this method.
222+
*
223+
* @example
224+
* ```ts
225+
* it('my workflow test', async () => {
226+
* await using instance = await introspectWorkflowInstance(env.MY_WORKFLOW, "123");
227+
* await env.MY_WORKFLOW.create({ id: "123" });
228+
*
229+
* await instance.waitForStatus("complete");
230+
*
231+
* const output = await instance.getOutput();
232+
* expect(output).toEqual({ success: true });
233+
* });
234+
* ```
235+
*/
236+
getOutput(): Promise<unknown>;
237+
238+
/**
239+
* Retrieves the error information from a failed Workflow instance.
240+
*
241+
* This method should only be called after the Workflow instance has failed.
242+
* It's recommended to use `waitForStatus("errored")` before calling this method.
243+
*
244+
* @example
245+
* ```ts
246+
* it('my workflow test', async () => {
247+
* await using instance = await introspectWorkflowInstance(env.MY_WORKFLOW, "123");
248+
* await env.MY_WORKFLOW.create({ id: "123" });
249+
*
250+
* await instance.waitForStatus("errored");
251+
*
252+
* const error = await instance.getError();
253+
* expect(error.name).toBe("Error");
254+
* expect(error.message).toContain("some error");
255+
* });
256+
* ```
257+
*/
258+
getError(): Promise<{ name: string; message: string }>;
259+
214260
/**
215261
* Disposes the Workflow instance introspector.
216262
*
@@ -433,6 +479,9 @@ declare module "cloudflare:test" {
433479
* const instances = introspector.get();
434480
* for(const instance of instances) {
435481
* await instance.waitForStatus("complete");
482+
*
483+
* const output = await instance.getOutput();
484+
* expect(output).toEqual({ success: true });
436485
* }
437486
*
438487
* // 4. DISPOSE is implicit and automatic here.

packages/workflows-shared/src/binding.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,15 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> {
135135
const stub = this.env.ENGINE.get(stubId);
136136
return await stub.waitForStatus(status);
137137
}
138+
139+
public async unsafeGetOutputOrError(
140+
instanceId: string,
141+
isOutput: boolean
142+
): Promise<unknown> {
143+
const stubId = this.env.ENGINE.idFromName(instanceId);
144+
const stub = this.env.ENGINE.get(stubId);
145+
return await stub.getOutputOrError(isOutput);
146+
}
138147
}
139148

140149
export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
@@ -167,17 +176,13 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
167176
public async status(): Promise<
168177
InstanceStatus & { __LOCAL_DEV_STEP_OUTPUTS: unknown[] }
169178
> {
170-
const status = await this.stub.getStatus(0, this.id);
179+
const status = await this.stub.getStatus();
171180

172181
// NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise<EngineLogs>
173182
using logs = await (this.stub.readLogs() as unknown as Promise<
174183
EngineLogs & Disposable
175184
>);
176185

177-
const workflowSuccessEvent = logs.logs
178-
.filter((log) => log.event === InstanceEvent.WORKFLOW_SUCCESS)
179-
.at(0);
180-
181186
const filteredLogs = logs.logs.filter(
182187
(log) =>
183188
log.event === InstanceEvent.STEP_SUCCESS ||
@@ -191,15 +196,19 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
191196
);
192197

193198
const workflowOutput =
194-
workflowSuccessEvent !== undefined
195-
? workflowSuccessEvent.metadata.result
196-
: null;
199+
logs.logs.find((log) => log.event === InstanceEvent.WORKFLOW_SUCCESS)
200+
?.metadata.result ?? null;
201+
202+
const workflowError = logs.logs.find(
203+
(log) => log.event === InstanceEvent.WORKFLOW_FAILURE
204+
)?.metadata.error;
197205

198206
return {
199207
status: instanceStatusName(status),
200208
__LOCAL_DEV_STEP_OUTPUTS: stepOutputs,
201209
output: workflowOutput,
202-
}; // output, error
210+
error: workflowError,
211+
};
203212
}
204213

205214
public async sendEvent(args: {

packages/workflows-shared/src/engine.ts

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ export type Log = {
6060
group: string | null;
6161
target: string | null;
6262
metadata: {
63-
result: unknown;
64-
payload: unknown;
63+
result: string;
64+
payload: string;
65+
error: { name: string; message: string };
6566
};
6667
};
6768

@@ -167,10 +168,29 @@ export class Engine extends DurableObject<Env> {
167168
};
168169
}
169170

170-
async getStatus(
171-
_accountId: number,
172-
_instanceId: string
173-
): Promise<InstanceStatus> {
171+
readLogsFromEvent(eventType: InstanceEvent): EngineLogs {
172+
const logs = [
173+
...this.ctx.storage.sql.exec<{
174+
event: InstanceEvent;
175+
groupKey: string | null;
176+
target: string | null;
177+
metadata: string;
178+
}>(
179+
"SELECT event, groupKey, target, metadata FROM states WHERE event = ?",
180+
eventType
181+
),
182+
];
183+
184+
return {
185+
logs: logs.map((log) => ({
186+
...log,
187+
metadata: JSON.parse(log.metadata),
188+
group: log.groupKey,
189+
})),
190+
};
191+
}
192+
193+
async getStatus(): Promise<InstanceStatus> {
174194
if (this.accountId === undefined) {
175195
// Engine could have restarted, so we try to restore from its state
176196
const metadata =
@@ -353,6 +373,34 @@ export class Engine extends DurableObject<Env> {
353373
}
354374
}
355375

376+
async getOutputOrError(isOutput: boolean): Promise<unknown> {
377+
const status = await this.getStatus();
378+
379+
if (isOutput) {
380+
if (status !== InstanceStatus.Complete) {
381+
throw new Error(
382+
`Cannot retrieve output: Workflow instance is in status "${instanceStatusName(status)}" but must be "complete" to have an output available`
383+
);
384+
}
385+
const logs = this.readLogsFromEvent(InstanceEvent.WORKFLOW_SUCCESS).logs;
386+
return logs.at(0)?.metadata.result;
387+
} else {
388+
if (status !== InstanceStatus.Errored) {
389+
throw new Error(
390+
`Cannot retrieve error: Workflow instance is in status "${instanceStatusName(status)}" but must be "errored" to have error information available`
391+
);
392+
}
393+
const logs = this.readLogsFromEvent(InstanceEvent.WORKFLOW_FAILURE).logs;
394+
const log = logs.at(0);
395+
if (!log?.metadata.error) {
396+
throw new Error(
397+
"Cannot retrieve error: No workflow instance failure log found"
398+
);
399+
}
400+
return log.metadata.error;
401+
}
402+
}
403+
356404
async abort(_reason: string) {
357405
// TODO: Maybe don't actually kill but instead check a flag and return early if true
358406
}
@@ -497,7 +545,7 @@ export class Engine extends DurableObject<Env> {
497545
this.instanceId = instance.id;
498546
this.workflowName = workflow.name;
499547

500-
const status = await this.getStatus(accountId, instance.id);
548+
const status = await this.getStatus();
501549
if (
502550
[
503551
InstanceStatus.Errored, // TODO (WOR-85): Remove this once upgrade story is done

packages/workflows-shared/tests/engine.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ describe("Engine", () => {
252252
const restartedStub = env.ENGINE.get(engineId);
253253

254254
const status = await runInDurableObject(restartedStub, (engine) => {
255-
return engine.getStatus(accountId, instanceId);
255+
return engine.getStatus();
256256
});
257257

258258
expect(status).toBe(InstanceStatus.Running);

0 commit comments

Comments
 (0)