Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/native-context-facet-bootstrap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"agents": patch
---

Clear request, WebSocket, and email native context handles when switching Agent instances and suppress protocol broadcasts during sub-agent facet bootstrap.
76 changes: 58 additions & 18 deletions packages/agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -885,16 +885,25 @@ function withAgentContext<T extends (...args: any[]) => any>(
...args: Parameters<T>
) => ReturnType<T> {
return function (...args: Parameters<T>): ReturnType<T> {
const { connection, request, email, agent } = getCurrentAgent();
const { agent } = getCurrentAgent();

if (agent === this) {
// already wrapped, so we can just call the method
return method.apply(this, args);
}
// not wrapped, so we need to wrap it
return agentContext.run({ agent: this, connection, request, email }, () => {
return method.apply(this, args);
});
// Crossing to a different Agent must not carry native I/O handles
// from the previous request/WebSocket/email turn into the new DO.
return agentContext.run(
{
agent: this,
connection: undefined,
request: undefined,
email: undefined
},
() => {
return method.apply(this, args);
}
);
};
}

Expand Down Expand Up @@ -951,6 +960,14 @@ export class Agent<
/** True when this agent runs as a facet (sub-agent) inside a parent. */
private _isFacet = false;

/**
* True only while the internal facet bootstrap RPC runs startup.
* Startup may happen while the parent is handling a WebSocket
* message, so protocol broadcasts must not touch any ambient
* parent-owned WebSocket handles during this window.
*/
private _suppressProtocolBroadcasts = false;

/**
* Ancestor chain, root-first. Empty for top-level DOs; populated at
* facet init time from the parent's own `selfPath`. Exposed publicly
Expand Down Expand Up @@ -1844,6 +1861,8 @@ export class Agent<
* @param excludeIds Additional connection IDs to exclude (e.g. the source)
*/
private _broadcastProtocol(msg: string, excludeIds: string[] = []) {
if (this._suppressProtocolBroadcasts) return;

const exclude = [...excludeIds];
for (const conn of this.getConnections()) {
if (!this.isConnectionProtocolEnabled(conn)) {
Expand Down Expand Up @@ -4675,10 +4694,10 @@ export class Agent<
* We set `_isFacet` eagerly (before `__unsafe_ensureInitialized`
* runs `onStart()`) so any code that legitimately branches on it
* — e.g. skipping parent-owned alarms in schedule guards — sees
* the flag during the first `onStart()` run. Broadcast paths no
* longer special-case facets, since facets can be directly
* addressed via sub-agent routing and have their own WebSocket
* connections.
* the flag during the first `onStart()` run. Protocol broadcasts are
* suppressed only during this bootstrap window; afterward, facets can
* broadcast to their own WebSocket clients reached via sub-agent
* routing.
*
* The facet's name (and `this.name` getter) is handled entirely by
* partyserver via `ctx.id.name`, which is populated because the
Expand Down Expand Up @@ -4718,8 +4737,15 @@ export class Agent<
this.ctx.storage.put("cf_agents_parent_path", parentPath)
]);
// Fire onStart() now since this RPC bypasses Server.fetch(),
// which is the entry point that normally triggers it.
await this.__unsafe_ensureInitialized();
// which is the entry point that normally triggers it. Suppress
// protocol broadcasts only during startup so bootstrap cannot touch
// parent-owned WebSocket handles if the parent is inside onMessage().
this._suppressProtocolBroadcasts = true;
try {
await this.__unsafe_ensureInitialized();
} finally {
this._suppressProtocolBroadcasts = false;
}
}

/**
Expand Down Expand Up @@ -5807,14 +5833,28 @@ export class Agent<
// inside the child's isolate. Avoids the cross-DO I/O error that
// the previous `stub.fetch(req)` path triggered by handing a
// parent-owned Request across the isolate boundary.
await (
stub as unknown as {
_cf_initAsFacet(
name: string,
parentPath: ReadonlyArray<{ className: string; name: string }>
): Promise<void>;
//
// The parent may be inside a WebSocket/message request context here.
// Clear native context handles before the child facet RPC so workerd
// never sees parent-owned I/O attached to child initialization.
await agentContext.run(
{
agent: this,
connection: undefined,
request: undefined,
email: undefined
},
async () => {
await (
stub as unknown as {
_cf_initAsFacet(
name: string,
parentPath: ReadonlyArray<{ className: string; name: string }>
): Promise<void>;
}
)._cf_initAsFacet(name, childParentPath);
}
)._cf_initAsFacet(name, childParentPath);
);

// Record in the parent's sub-agent registry so `hasSubAgent` /
// `listSubAgents` reflect the spawn. Idempotent.
Expand Down
39 changes: 34 additions & 5 deletions packages/agents/src/tests/agents/sub-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,10 @@ class UnexportedSubAgent extends Agent {
}

// ── SubAgent: Broadcast/state regression cases ─────────────────────
// Exercises the broadcast paths that used to throw cross-DO I/O
// before `_isFacet` guards were added to `_broadcastProtocol()` and
// `broadcast()`. On a facet these calls should no-op, not throw.
// Exercises broadcast paths on facets. Startup protocol broadcasts are
// suppressed during bootstrap to avoid parent-owned WebSocket handles,
// but normal facet broadcasts after bootstrap must still reach the
// facet's own WebSocket clients.

type BroadcastState = { count: number; lastMsg: string };

Expand All @@ -754,8 +755,8 @@ export class BroadcastSubAgent extends Agent<Cloudflare.Env, BroadcastState> {

/**
* Calls `this.setState(...)` from a facet RPC. `setState` drives
* `_broadcastProtocol()` internally, so this exercises the
* `_isFacet` early-return guard there.
* `_broadcastProtocol()` internally, so this exercises facet state
* sync after bootstrap.
*/
trySetState(count: number, msg: string): string {
try {
Expand Down Expand Up @@ -789,6 +790,34 @@ export class BroadcastSubAgent extends Agent<Cloudflare.Env, BroadcastState> {
// ── Parent Agent that manages sub-agents ────────────────────────────

export class TestSubAgentParent extends Agent {
async onMessage(
connection: { send(message: string): void },
message: string | ArrayBuffer
): Promise<void> {
const text =
typeof message === "string" ? message : new TextDecoder().decode(message);
if (text !== "spawn-sub-agent") return;

try {
const result = await this.subAgentPing(`ws-${crypto.randomUUID()}`);
connection.send(
JSON.stringify({
type: "sub-agent-result",
ok: true,
result
})
);
} catch (error) {
connection.send(
JSON.stringify({
type: "sub-agent-result",
ok: false,
error: error instanceof Error ? error.message : String(error)
})
);
}
}

/** Called by child facets via `parentAgent()` to verify the lookup works. */
async getOwnName(): Promise<string> {
return this.name;
Expand Down
118 changes: 107 additions & 11 deletions packages/agents/src/tests/sub-agent.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,51 @@
import { env } from "cloudflare:workers";
import { env, exports } from "cloudflare:workers";
import { runDurableObjectAlarm } from "cloudflare:test";
import { describe, expect, it } from "vitest";
import { getAgentByName } from "../index";
import { MessageType } from "../types";

function uniqueName() {
return `sub-agent-test-${Math.random().toString(36).slice(2)}`;
}

async function connectWS(path: string): Promise<WebSocket> {
const res = await exports.default.fetch(`http://example.com${path}`, {
headers: { Upgrade: "websocket" }
});
expect(res.status).toBe(101);
const ws = res.webSocket as WebSocket;
expect(ws).toBeDefined();
ws.accept();
return ws;
}

function waitForJsonMessage<T>(
ws: WebSocket,
predicate: (data: T) => boolean,
timeoutMs = 5000
): Promise<T> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
ws.removeEventListener("message", handler);
reject(new Error(`waitForJsonMessage timed out after ${timeoutMs}ms`));
}, timeoutMs);

const handler = (event: MessageEvent) => {
try {
const data = JSON.parse(event.data as string) as T;
if (predicate(data)) {
clearTimeout(timer);
ws.removeEventListener("message", handler);
resolve(data);
}
} catch {
// Ignore non-JSON protocol frames.
}
};
ws.addEventListener("message", handler);
});
}

async function expectRootKeepAliveRefCount(
agent: { getRootKeepAliveRefCount(): Promise<number> },
expected: number
Expand Down Expand Up @@ -1156,13 +1195,37 @@ describe("SubAgent", () => {
expect(error).toBe("");
});

// ── Regression: cross-DO I/O on broadcast paths ─────────────────────
it("should spawn a sub-agent from a WebSocket onMessage turn", async () => {
const name = uniqueName();
const ws = await connectWS(`/agents/test-sub-agent-parent/${name}`);
try {
const resultPromise = waitForJsonMessage<{
type: string;
ok: boolean;
result?: string;
error?: string;
}>(ws, (data) => data.type === "sub-agent-result");

ws.send("spawn-sub-agent");

const message = await resultPromise;
expect(message).toEqual({
type: "sub-agent-result",
ok: true,
result: "pong"
});
} finally {
ws.close();
}
});

// ── Regression: cross-DO I/O on bootstrap broadcast paths ───────────
// Sub-agents share their parent's process but have their own isolate.
// On production, iterating the connection registry or sending through
// a parent-owned WebSocket from a facet throws "Cannot perform I/O on
// behalf of a different Durable Object". The Agent base class guards
// every broadcast path with `_isFacet` — these tests pin the guards
// in place so they cannot be regressed away.
// a parent-owned WebSocket during facet bootstrap throws "Cannot
// perform I/O on behalf of a different Durable Object". Startup
// protocol broadcasts are suppressed, but normal facet broadcasts
// after bootstrap must still reach the facet's own WebSocket clients.

describe("broadcast paths on facets", () => {
it("should initialize a facet without throwing on first onStart", async () => {
Expand All @@ -1178,7 +1241,7 @@ describe("SubAgent", () => {
expect(ok).toBe(true);
});

it("should no-op when a sub-agent calls this.broadcast(...)", async () => {
it("should not throw when a sub-agent calls this.broadcast(...)", async () => {
const name = uniqueName();
const agent = await getAgentByName(env.TestSubAgentParent, name);

Expand All @@ -1189,18 +1252,51 @@ describe("SubAgent", () => {
expect(error).toBe("");
});

it("should persist state but skip broadcast when setState is called in a sub-agent", async () => {
it("should persist state when setState is called in a sub-agent", async () => {
const name = uniqueName();
const agent = await getAgentByName(env.TestSubAgentParent, name);

// setState drives `_broadcastProtocol()` under the hood. On a
// facet the broadcast must be skipped, but the state mutation
// itself must still succeed (SQL + in-memory update).
const result = await agent.subAgentTrySetState("stateful", 42, "ping");
expect(result.error).toBe("");
expect(result.persistedCount).toBe(42);
expect(result.persistedMsg).toBe("ping");
});

it("should broadcast state updates to WebSocket clients connected directly to a sub-agent", async () => {
const parentName = uniqueName();
const childName = uniqueName();
const ws = await connectWS(
`/agents/test-sub-agent-parent/${parentName}/sub/broadcast-sub-agent/${childName}`
);
try {
await waitForJsonMessage<{
type: MessageType;
state?: { count: number; lastMsg: string };
}>(
ws,
(data) =>
data.type === MessageType.CF_AGENT_STATE && data.state?.count === 0
);

const stateUpdatePromise = waitForJsonMessage<{
type: MessageType;
state?: { count: number; lastMsg: string };
}>(
ws,
(data) =>
data.type === MessageType.CF_AGENT_STATE && data.state?.count === 42
);

const parent = await getAgentByName(env.TestSubAgentParent, parentName);
const result = await parent.subAgentTrySetState(childName, 42, "ping");
expect(result.error).toBe("");

const update = await stateUpdatePromise;
expect(update.state).toEqual({ count: 42, lastMsg: "ping" });
} finally {
ws.close();
}
});
});

// ── parentPath / selfPath / hasSubAgent / listSubAgents ────────────
Expand Down
Loading
Loading