Skip to content

Commit

Permalink
[Miniflare 3] Add getFetcher() for dispatching fetch/scheduled/…
Browse files Browse the repository at this point in the history
…`queue` events (#708)

* Add `getFetcher()` for dispatching `fetch`/`scheduled`/`queue` events

This change adds back support for dispatching `scheduled` and `queue`
events directly. Miniflare 2 previously provided similar
`dispatchScheduled()` and `dispatchQueue()` methods, but these
implemented an inconsistent, non-standard API.

With `getFetcher()`, we're able to reuse magic proxy code to support
arbitrary Workers APIs. This is important for `queue()`, which
supports sending any structured serialisable as a message `body`.

`getFetcher()` also provides an idiomatic Miniflare API for dealing
with multiple workers, matching that provided by `getBindings()`.

* rename getFetcher to getWorker

---------

Co-authored-by: Rahul Sethi <5822355+RamIdeas@users.noreply.github.com>
  • Loading branch information
mrbbot and RamIdeas committed Oct 10, 2023
1 parent 7ec7fa3 commit ff17f0c
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 5 deletions.
12 changes: 12 additions & 0 deletions packages/miniflare/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,18 @@ defined at the top-level.
bindings, for all bindings in the Worker with the specified `workerName`. If
`workerName` is not specified, defaults to the entrypoint Worker.

- `getWorker(workerName?: string): Promise<Fetcher>`

Returns a `Promise` that resolves with a
[`Fetcher`](https://workers-types.pages.dev/experimental/#Fetcher) pointing to
the specified `workerName`. If `workerName` is not specified, defaults to the
entrypoint Worker. Note this `Fetcher` uses the experimental
[`service_binding_extra_handlers`](https://github.com/cloudflare/workerd/blob/1d9158af7ca1389474982c76ace9e248320bec77/src/workerd/io/compatibility-date.capnp#L290-L297)
compatibility flag to expose
[`scheduled()`](https://workers-types.pages.dev/experimental/#Fetcher.scheduled)
and [`queue()`](https://workers-types.pages.dev/experimental/#Fetcher.queue)
methods for dispatching `scheduled` and `queue` events.

- `getCaches(): Promise<CacheStorage>`

Returns a `Promise` that resolves with the
Expand Down
16 changes: 16 additions & 0 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type {
CacheStorage,
D1Database,
DurableObjectNamespace,
Fetcher,
KVNamespace,
Queue,
R2Bucket,
Expand Down Expand Up @@ -1399,6 +1400,21 @@ export class Miniflare {

return bindings as Env;
}
async getWorker(workerName?: string): Promise<ReplaceWorkersTypes<Fetcher>> {
const proxyClient = await this._getProxyClient();

// Find worker by name, defaulting to entrypoint worker if none specified
const workerIndex = this.#findAndAssertWorkerIndex(workerName);
const workerOpts = this.#workerOpts[workerIndex];
workerName = workerOpts.core.name ?? "";

// Get a `Fetcher` to that worker (NOTE: the `ProxyServer` Durable Object
// shares its `env` with Miniflare's entry worker, so has access to routes)
const bindingName = CoreBindings.SERVICE_USER_ROUTE_PREFIX + workerName;
const fetcher = proxyClient.env[bindingName];
assert(fetcher !== undefined);
return fetcher as ReplaceWorkersTypes<Fetcher>;
}

async #getProxy<T>(
pluginName: string,
Expand Down
3 changes: 1 addition & 2 deletions packages/miniflare/src/plugins/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,7 @@ export function getGlobalServices({
name: CoreBindings.DURABLE_OBJECT_NAMESPACE_PROXY,
durableObjectNamespace: { className: "ProxyServer" },
},
// Add `proxyBindings` here, they'll be added to the `ProxyServer` `env`.
// TODO(someday): consider making the proxy server a separate worker
// Add `proxyBindings` here, they'll be added to the `ProxyServer` `env`
...proxyBindings,
];
if (sharedOptions.upstream !== undefined) {
Expand Down
124 changes: 121 additions & 3 deletions packages/miniflare/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,112 @@ test("Miniflare: getBindings() returns all bindings", async (t) => {
};
t.throws(() => bindings.KV.get("key"), expectations);
});
test("Miniflare: getWorker() allows dispatching events directly", async (t) => {
const mf = new Miniflare({
modules: true,
script: `
let lastScheduledController;
let lastQueueBatch;
export default {
async fetch(request, env, ctx) {
const { pathname } = new URL(request.url);
if (pathname === "/scheduled") {
return Response.json({
scheduledTime: lastScheduledController?.scheduledTime,
cron: lastScheduledController?.cron,
});
} else if (pathname === "/queue") {
return Response.json({
queue: lastQueueBatch.queue,
messages: lastQueueBatch.messages.map((message) => ({
id: message.id,
timestamp: message.timestamp.getTime(),
body: message.body,
bodyType: message.body.constructor.name,
})),
});
} else {
return new Response(null, { status: 404 });
}
},
async scheduled(controller, env, ctx) {
lastScheduledController = controller;
if (controller.cron === "* * * * *") controller.noRetry();
},
async queue(batch, env, ctx) {
lastQueueBatch = batch;
if (batch.queue === "needy") batch.retryAll();
for (const message of batch.messages) {
if (message.id === "perfect") message.ack();
}
}
}`,
});
t.teardown(() => mf.dispose());
const fetcher = await mf.getWorker();

// Check `Fetcher#scheduled()` (implicitly testing `Fetcher#fetch()`)
let scheduledResult = await fetcher.scheduled({
cron: "* * * * *",
});
t.deepEqual(scheduledResult, { outcome: "ok", noRetry: true });
scheduledResult = await fetcher.scheduled({
scheduledTime: new Date(1000),
cron: "30 * * * *",
});
t.deepEqual(scheduledResult, { outcome: "ok", noRetry: false });

let res = await fetcher.fetch("http://localhost/scheduled");
const scheduledController = await res.json();
t.deepEqual(scheduledController, {
scheduledTime: 1000,
cron: "30 * * * *",
});

// Check `Fetcher#queue()`
let queueResult = await fetcher.queue("needy", [
{ id: "a", timestamp: new Date(1000), body: "a" },
{ id: "b", timestamp: new Date(2000), body: { b: 1 } },
]);
t.deepEqual(queueResult, {
outcome: "ok",
retryAll: true,
ackAll: false,
explicitRetries: [],
explicitAcks: [],
});
queueResult = await fetcher.queue("queue", [
{ id: "c", timestamp: new Date(3000), body: new Uint8Array([1, 2, 3]) },
{ id: "perfect", timestamp: new Date(4000), body: new Date(5000) },
]);
t.deepEqual(queueResult, {
outcome: "ok",
retryAll: false,
ackAll: false,
explicitRetries: [],
explicitAcks: ["perfect"],
});

res = await fetcher.fetch("http://localhost/queue");
const queueBatch = await res.json();
t.deepEqual(queueBatch, {
queue: "queue",
messages: [
{
id: "c",
timestamp: 3000,
body: { 0: 1, 1: 2, 2: 3 },
bodyType: "Uint8Array",
},
{
id: "perfect",
timestamp: 4000,
body: "1970-01-01T00:00:05.000Z",
bodyType: "Date",
},
],
});
});
test("Miniflare: getBindings() and friends return bindings for different workers", async (t) => {
const mf = new Miniflare({
workers: [
Expand All @@ -802,7 +908,7 @@ test("Miniflare: getBindings() and friends return bindings for different workers
modules: true,
script: `
export class DurableObject {}
export default { fetch() { return new Response(null, { status: 404 }); } }
export default { fetch() { return new Response("a"); } }
`,
d1Databases: ["DB"],
durableObjects: { DO: "DurableObject" },
Expand All @@ -811,14 +917,14 @@ test("Miniflare: getBindings() and friends return bindings for different workers
// 2nd worker unnamed, to validate that not specifying a name when
// getting bindings gives the entrypoint, not the unnamed worker
script:
'addEventListener("fetch", (event) => event.respondWith(new Response(null, { status: 404 })));',
'addEventListener("fetch", (event) => event.respondWith(new Response("unnamed")));',
kvNamespaces: ["KV"],
queueProducers: ["QUEUE"],
},
{
name: "b",
script:
'addEventListener("fetch", (event) => event.respondWith(new Response(null, { status: 404 })));',
'addEventListener("fetch", (event) => event.respondWith(new Response("b")));',
r2Buckets: ["BUCKET"],
},
],
Expand All @@ -837,6 +943,18 @@ test("Miniflare: getBindings() and friends return bindings for different workers
message: '"c" worker not found',
});

// Check `getWorker()`
let fetcher = await mf.getWorker();
t.is(await (await fetcher.fetch("http://localhost")).text(), "a");
fetcher = await mf.getWorker("");
t.is(await (await fetcher.fetch("http://localhost")).text(), "unnamed");
fetcher = await mf.getWorker("b");
t.is(await (await fetcher.fetch("http://localhost")).text(), "b");
await t.throwsAsync(() => mf.getWorker("c"), {
instanceOf: TypeError,
message: '"c" worker not found',
});

const unboundExpectations = (name: string): ThrowsExpectation<TypeError> => ({
instanceOf: TypeError,
message: `"${name}" unbound in "c" worker`,
Expand Down

0 comments on commit ff17f0c

Please sign in to comment.