Skip to content

Commit 737c0f4

Browse files
NuroDevpetebacondarwindario-piotrowicz
authored
fix(miniflare): Fix Durable Object RPC calls from Node.js blocking the event loop (#11663)
* Add Miniflare logic to mark DO stubs as async * Added tests to check for blocking DO stubs * Added changeset * Minor `blockedOn` RPC method refactoring Co-authored-by: Pete Bacon Darwin <pete@bacondarwin.com> * Improved DO "complete when unblocked" RPC tests timeouts * Fixed DO RPC calls timeout test * Minor DO RPC calls timeout test tweaks * Updated `release` method to be synchronous * Updated changset description * Shorten commit sha links Co-authored-by: Dario Piotrowicz <dario@cloudflare.com> --------- Co-authored-by: Pete Bacon Darwin <pete@bacondarwin.com> Co-authored-by: Dario Piotrowicz <dario@cloudflare.com>
1 parent c04e718 commit 737c0f4

File tree

5 files changed

+109
-1
lines changed

5 files changed

+109
-1
lines changed

.changeset/hip-clouds-build.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"miniflare": patch
3+
---
4+
5+
Fix Durable Object RPC calls from Node.js blocking the event loop, preventing `Promise.race()` and timeouts from working correctly.
6+
7+
Previously, RPC calls from Node.js to Durable Objects would block the Node.js event loop, causing `Promise.race()` with timeouts to never resolve. This fix ensures that RPC calls properly yield control back to the event loop, allowing concurrent operations and timeouts to work as expected.

packages/miniflare/src/plugins/core/proxy/client.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
CoreHeaders,
1313
createHTTPReducers,
1414
createHTTPRevivers,
15+
isDurableObjectStub,
1516
isFetcherFetch,
1617
isR2ObjectWriteHttpMetadata,
1718
parseWithReadableStreams,
@@ -628,6 +629,10 @@ class ProxyStubHandler<T extends object>
628629
);
629630
if (
630631
knownAsync ||
632+
// Durable Object stub RPC calls should always be async to avoid blocking
633+
// the Node.js event loop with `Atomics.wait()`. This allows Promise.race()
634+
// and timeouts to work correctly when racing against DO method calls.
635+
isDurableObjectStub(targetName) ||
631636
// We assume every call with `ReadableStream`/`Blob` arguments is async.
632637
// Note that you can't consume `ReadableStream`/`Blob` synchronously: if
633638
// you tried a similar trick to `SynchronousFetcher`, blocking the main

packages/miniflare/src/workers/core/constants.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,11 @@ export function isR2ObjectWriteHttpMetadata(targetName: string, key: string) {
9797
export function isImagesInput(targetName: string, key: string) {
9898
return targetName === "ImagesBindingImpl" && key === "input";
9999
}
100+
101+
// Durable Object stub RPC calls should always be async to avoid blocking the
102+
// Node.js event loop. The internal names are "DurableObject" and "WorkerRpc".
103+
// https://github.com/cloudflare/workerd/blob/62b9ceee/src/workerd/api/actor.h#L86
104+
// https://github.com/cloudflare/workerd/blob/62b9ceee/src/workerd/api/worker-rpc.h#L30
105+
export function isDurableObjectStub(targetName: string) {
106+
return targetName === "DurableObject" || targetName === "WorkerRpc";
107+
}

packages/miniflare/src/workers/core/proxy.worker.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ export class ProxyServer implements DurableObject {
236236
let status = 200;
237237
let result: unknown;
238238
let unbufferedRest: ReadableStream | undefined;
239+
let rpcAwaited = false;
239240
if (opHeader === ProxyOps.GET) {
240241
// If no key header is specified, just return the target
241242
result = keyHeader === null ? target : target[keyHeader];
@@ -317,6 +318,8 @@ export class ProxyServer implements DurableObject {
317318
} else if (["RpcProperty", "RpcStub"].includes(func.constructor.name)) {
318319
// let's resolve RpcPromise instances right away (to support serialization)
319320
result = await func(...args);
321+
// Mark that we've awaited this RPC call, so we set the Promise header below
322+
rpcAwaited = true;
320323
} else {
321324
result = func.apply(target, args);
322325
}
@@ -334,7 +337,7 @@ export class ProxyServer implements DurableObject {
334337
}
335338

336339
const headers = new Headers();
337-
if (allowAsync && result instanceof Promise) {
340+
if (allowAsync && (result instanceof Promise || rpcAwaited)) {
338341
// Note we only resolve `Promise`s if we're allowing async operations.
339342
// Otherwise, we'll treat the `Promise` as a native target. This allows
340343
// us to use regular HTTP status/headers to indicate whether the `Promise`

packages/miniflare/test/plugins/do/index.spec.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,3 +603,88 @@ test("multiple workers with DO useSQLite true and undefined does not cause optio
603603
});
604604
});
605605
});
606+
607+
const BLOCKING_DO_SCRIPT = `
608+
import { DurableObject } from 'cloudflare:workers';
609+
610+
export class BlockingDO extends DurableObject {
611+
locks = new Map();
612+
613+
blockedOp(n, lock) {
614+
return new Promise((resolve) => {
615+
this.locks.set(lock, () => resolve(lock));
616+
}).then(() => n + 2);
617+
}
618+
619+
release(lock) {
620+
const releaseFn = this.locks.get(lock);
621+
if (releaseFn) {
622+
releaseFn();
623+
this.locks.delete(lock);
624+
}
625+
}
626+
}
627+
628+
export default {
629+
fetch() { return new Response("OK"); }
630+
}
631+
`;
632+
633+
test("Durable Object RPC calls do not block Node.js event loop", async (t) => {
634+
const mf = new Miniflare({
635+
durableObjects: { BLOCKING_DO: "BlockingDO" },
636+
modules: true,
637+
script: BLOCKING_DO_SCRIPT,
638+
});
639+
640+
t.teardown(() => mf.dispose());
641+
642+
const namespace = await mf.getDurableObjectNamespace("BLOCKING_DO");
643+
const stubId = namespace.idFromName("test");
644+
const stub = namespace.get(stubId) as unknown as {
645+
blockedOp: (n: number, lock: string) => Promise<number>;
646+
release: (lock: string) => Promise<void>;
647+
};
648+
649+
const blockedPromise = stub.blockedOp(5, "lock-1");
650+
651+
const raced = await Promise.race([
652+
blockedPromise.then((result) => ({ type: "resolved", result })),
653+
setTimeout(100).then(() => ({ type: "timeout" })),
654+
]);
655+
656+
// If the event loop wasn't blocked, the timeout should win
657+
t.deepEqual(raced, { type: "timeout" });
658+
});
659+
660+
test("Durable Object RPC calls complete when unblocked", async (t) => {
661+
const mf = new Miniflare({
662+
durableObjects: { BLOCKING_DO: "BlockingDO" },
663+
modules: true,
664+
script: BLOCKING_DO_SCRIPT,
665+
});
666+
667+
t.teardown(() => mf.dispose());
668+
669+
const namespace = await mf.getDurableObjectNamespace("BLOCKING_DO");
670+
const stubId = namespace.idFromName("test");
671+
const stub = namespace.get(stubId) as unknown as {
672+
blockedOp: (n: number, lock: string) => Promise<number>;
673+
release: (lock: string) => Promise<void>;
674+
};
675+
676+
const blockedPromise = stub.blockedOp(10, "lock-2");
677+
678+
// Race the blocked operation against a timeout, releasing the lock as part of the race.
679+
// The release should cause `blockedPromise` to resolve before the timeout.
680+
// Use a generous timeout (5s) to avoid flakiness in CI environments.
681+
const raced = await Promise.race([
682+
blockedPromise.then((result) => ({ type: "resolved", result })),
683+
stub
684+
.release("lock-2")
685+
.then(() => setTimeout(5_000))
686+
.then(() => ({ type: "timeout" })),
687+
]);
688+
689+
t.deepEqual(raced, { type: "resolved", result: 12 });
690+
});

0 commit comments

Comments
 (0)