Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/kernel/runtime-dynamic-worker/src/executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ describe("buildExecutorModule", () => {
it("catches errors and returns them", () => {
const module = buildExecutorModule("async () => 42", 5000);
expect(module).toContain("catch (err)");
expect(module).toContain("error: err.message");
expect(module).toContain("const __serializeThrownError = (err) =>");
expect(module).toContain("if (!data.ok)");
expect(module).toContain("error: __serializeThrownError(err)");
});
});
190 changes: 171 additions & 19 deletions packages/kernel/runtime-dynamic-worker/src/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
* DynamicWorkerExecutor — runs sandboxed code in an isolated Cloudflare
* Worker via the WorkerLoader binding.
*
* Tool calls are dispatched over Workers RPC: the host creates a
* Tool calls are dispatched over Workers RPC: the host creates a
* `ToolDispatcher` (an `RpcTarget`) that bridges back to the
* `SandboxToolInvoker` from codemode-core, and passes it to the dynamic
* worker's `evaluate()` entrypoint.
* `SandboxToolInvoker` from codemode-core, and passes it to the
* dynamic worker's `evaluate()` entrypoint.
*/

import { RpcTarget } from "cloudflare:workers";
import * as Cause from "effect/Cause";
import * as Data from "effect/Data";
import * as Effect from "effect/Effect";

Expand Down Expand Up @@ -50,20 +51,169 @@ export type DynamicWorkerExecutorOptions = {
readonly modules?: Record<string, string>;
};

export type SerializedWorkerErrorValue = unknown;

export type SerializedWorkerError = {
readonly kind: "fail" | "die" | "interrupt" | "mixed" | "unknown";
readonly message: string;
readonly primary: SerializedWorkerErrorValue | null;
readonly failures: ReadonlyArray<SerializedWorkerErrorValue>;
readonly defects: ReadonlyArray<SerializedWorkerErrorValue>;
readonly interrupted: boolean;
};

type WorkerRpcSuccess = {
readonly ok: true;
readonly result: unknown;
};

type WorkerRpcFailure = {
readonly ok: false;
readonly error: SerializedWorkerError;
};

type WorkerRpcResponse = WorkerRpcSuccess | WorkerRpcFailure;

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

const DEFAULT_TIMEOUT_MS = 5 * 60_000;
const ENTRY_MODULE = "executor.js";

const normalizeErrorObject = (error: Error) => ({
__type: "Error" as const,
name: error.name,
message: error.message,
...(typeof error.stack === "string" && error.stack.length > 0 ? { stack: error.stack } : {}),
});

const isNormalizedErrorObject = (
value: unknown,
): value is { readonly __type: "Error"; readonly message: string } =>
typeof value === "object" &&
value !== null &&
"__type" in value &&
value.__type === "Error" &&
"message" in value &&
typeof value.message === "string";

const serializeWorkerErrorValue = (value: unknown): SerializedWorkerErrorValue => {
if (value instanceof Error) {
return normalizeErrorObject(value);
}

if (
value === null ||
typeof value === "string" ||
typeof value === "number" ||
typeof value === "boolean"
) {
return value;
}

try {
return JSON.parse(JSON.stringify(value)) as SerializedWorkerErrorValue;
Comment thread
RhysSullivan marked this conversation as resolved.
} catch {
return String(value);
}
};

const renderTransportMessage = (value: unknown): string => {
if (typeof value === "string") {
return value;
}

if (isNormalizedErrorObject(value)) {
return value.message;
}

if (value instanceof Error) {
return value.message;
}

if (typeof value === "object" && value !== null && "message" in value && typeof value.message === "string") {
return value.message;
}

if (typeof value === "object" && value !== null) {
try {
return JSON.stringify(value);
} catch {
return String(value);
}
}

if (typeof value === "undefined") {
return "Unknown error";
}

return String(value);
};

export const serializeWorkerCause = (cause: Cause.Cause<unknown>): SerializedWorkerError => {
const failures = Array.from(Cause.failures(cause), serializeWorkerErrorValue);
const defects = Array.from(Cause.defects(cause), serializeWorkerErrorValue);
const interrupted = Cause.isInterrupted(cause);
const primary = failures[0] ?? defects[0] ?? null;
const kind =
failures.length > 0 && defects.length > 0
? "mixed"
: failures.length > 0
? "fail"
: defects.length > 0
? "die"
: interrupted
? "interrupt"
: "unknown";

return {
kind,
message:
primary !== null
? renderTransportMessage(primary)
: interrupted
? "Interrupted"
: "Unknown error",
primary,
failures,
defects,
interrupted,
};
};

export const renderWorkerError = (error: SerializedWorkerError): string => {
if (isNormalizedErrorObject(error.primary)) {
return error.primary.message;
}

if (typeof error.primary === "string") {
return error.primary;
}

if (typeof error.primary === "object" && error.primary !== null) {
try {
return JSON.stringify(error.primary);
} catch {
return error.message;
}
}

return error.message;
};

const encodeWorkerRpcResponse = (response: WorkerRpcResponse): string => JSON.stringify(response);

export const decodeWorkerRpcResponse = (raw: string): WorkerRpcResponse =>
JSON.parse(raw) as WorkerRpcResponse;

// ---------------------------------------------------------------------------
// ToolDispatcher — bridges RPC calls back to SandboxToolInvoker
// ---------------------------------------------------------------------------

/**
* An `RpcTarget` passed to the dynamic Worker so that sandboxed code can
* invoke tools on the host. The dynamic worker calls
* invoke tools on the host. The dynamic worker calls
* `__dispatcher.call(path, argsJson)` over Workers RPC.
*/
export class ToolDispatcher extends RpcTarget {
Expand All @@ -79,19 +229,20 @@ export class ToolDispatcher extends RpcTarget {

return Effect.runPromise(
this.#invoker.invoke({ path, args }).pipe(
Effect.map((value) => JSON.stringify({ result: value })),
Effect.map(
(value): WorkerRpcResponse => ({
ok: true,
result: value,
}),
),
Effect.sandbox,
Effect.catchAll((cause) =>
Effect.succeed(
JSON.stringify({
error:
cause instanceof Error
? cause.message
: typeof cause === "object" && cause !== null && "message" in cause
? String((cause as { message: unknown }).message)
: String(cause),
}),
),
Effect.succeed<WorkerRpcResponse>({
ok: false,
error: serializeWorkerCause(cause),
}),
),
Effect.map(encodeWorkerRpcResponse),
),
);
}
Expand Down Expand Up @@ -128,16 +279,17 @@ const evaluate = async (
const entrypoint = worker.getEntrypoint() as unknown as {
evaluate(dispatcher: ToolDispatcher): Promise<{
result: unknown;
error?: string;
error?: SerializedWorkerError;
logs?: string[];
}>;
};

const response = await entrypoint.evaluate(dispatcher);
const error = response.error ? renderWorkerError(response.error) : undefined;

return {
result: response.error ? null : response.result,
error: response.error,
result: error ? null : response.result,
error,
logs: response.logs,
};
};
Expand All @@ -155,7 +307,7 @@ const runInDynamicWorker = (
try: () => evaluate(options, code, toolInvoker),
catch: (cause) =>
new DynamicWorkerExecutionError({
message: cause instanceof Error ? cause.message : String(cause),
message: renderTransportMessage(serializeWorkerErrorValue(cause)),
}),
});

Expand Down
Loading
Loading