Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Miniflare 3] Add getFetcher() for dispatching fetch/scheduled/queue events #708

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/miniflare/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,18 @@ defined at the top-level.
bindings, for all bindings in the Worker with the specified `workerName`. If
`workerName` is not specified, defaults to the entrypoint Worker.

- `getFetcher(workerName?: string): Promise<Fetcher>`

Returns a `Promise` that resolves with a
[`Fetcher`](https://workers-types.pages.dev/experimental/#Fetcher) pointing to
the specified `workerName`. If `workerName` is not specified, defaults to the
entrypoint Worker. Note this `Fetcher` uses the experimental
[`service_binding_extra_handlers`](https://github.com/cloudflare/workerd/blob/1d9158af7ca1389474982c76ace9e248320bec77/src/workerd/io/compatibility-date.capnp#L290-L297)
compatibility flag to expose
[`scheduled()`](https://workers-types.pages.dev/experimental/#Fetcher.scheduled)
and [`queue()`](https://workers-types.pages.dev/experimental/#Fetcher.queue)
methods for dispatching `scheduled` and `queue` events.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be nice to add an example here, with a <details>.


- `getCaches(): Promise<CacheStorage>`

Returns a `Promise` that resolves with the
Expand Down
16 changes: 16 additions & 0 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type {
CacheStorage,
D1Database,
DurableObjectNamespace,
Fetcher,
KVNamespace,
Queue,
R2Bucket,
Expand Down Expand Up @@ -1399,6 +1400,21 @@ export class Miniflare {

return bindings as Env;
}
async getFetcher(workerName?: string): Promise<ReplaceWorkersTypes<Fetcher>> {
mrbbot marked this conversation as resolved.
Show resolved Hide resolved
const proxyClient = await this._getProxyClient();

// Find worker by name, defaulting to entrypoint worker if none specified
const workerIndex = this.#findAndAssertWorkerIndex(workerName);
const workerOpts = this.#workerOpts[workerIndex];
workerName = workerOpts.core.name ?? "";

// Get a `Fetcher` to that worker (NOTE: the `ProxyServer` Durable Object
// shares its `env` with Miniflare's entry worker, so has access to routes)
const bindingName = CoreBindings.SERVICE_USER_ROUTE_PREFIX + workerName;
const fetcher = proxyClient.env[bindingName];
assert(fetcher !== undefined);
return fetcher as ReplaceWorkersTypes<Fetcher>;
}

async #getProxy<T>(
pluginName: string,
Expand Down
3 changes: 1 addition & 2 deletions packages/miniflare/src/plugins/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,7 @@ export function getGlobalServices({
name: CoreBindings.DURABLE_OBJECT_NAMESPACE_PROXY,
durableObjectNamespace: { className: "ProxyServer" },
},
// Add `proxyBindings` here, they'll be added to the `ProxyServer` `env`.
// TODO(someday): consider making the proxy server a separate worker
// Add `proxyBindings` here, they'll be added to the `ProxyServer` `env`
...proxyBindings,
];
if (sharedOptions.upstream !== undefined) {
Expand Down
124 changes: 121 additions & 3 deletions packages/miniflare/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,112 @@ test("Miniflare: getBindings() returns all bindings", async (t) => {
};
t.throws(() => bindings.KV.get("key"), expectations);
});
test("Miniflare: getFetcher() allows dispatching events directly", async (t) => {
const mf = new Miniflare({
modules: true,
script: `
let lastScheduledController;
let lastQueueBatch;
export default {
async fetch(request, env, ctx) {
const { pathname } = new URL(request.url);
if (pathname === "/scheduled") {
return Response.json({
scheduledTime: lastScheduledController?.scheduledTime,
cron: lastScheduledController?.cron,
});
} else if (pathname === "/queue") {
return Response.json({
queue: lastQueueBatch.queue,
messages: lastQueueBatch.messages.map((message) => ({
id: message.id,
timestamp: message.timestamp.getTime(),
body: message.body,
bodyType: message.body.constructor.name,
})),
});
} else {
return new Response(null, { status: 404 });
}
},
async scheduled(controller, env, ctx) {
lastScheduledController = controller;
if (controller.cron === "* * * * *") controller.noRetry();
},
async queue(batch, env, ctx) {
lastQueueBatch = batch;
if (batch.queue === "needy") batch.retryAll();
for (const message of batch.messages) {
if (message.id === "perfect") message.ack();
}
}
}`,
});
t.teardown(() => mf.dispose());
const fetcher = await mf.getFetcher();

// Check `Fetcher#scheduled()` (implicitly testing `Fetcher#fetch()`)
let scheduledResult = await fetcher.scheduled({
cron: "* * * * *",
});
t.deepEqual(scheduledResult, { outcome: "ok", noRetry: true });
scheduledResult = await fetcher.scheduled({
scheduledTime: new Date(1000),
cron: "30 * * * *",
});
t.deepEqual(scheduledResult, { outcome: "ok", noRetry: false });

let res = await fetcher.fetch("http://localhost/scheduled");
const scheduledController = await res.json();
t.deepEqual(scheduledController, {
scheduledTime: 1000,
cron: "30 * * * *",
});

// Check `Fetcher#queue()`
let queueResult = await fetcher.queue("needy", [
{ id: "a", timestamp: new Date(1000), body: "a" },
{ id: "b", timestamp: new Date(2000), body: { b: 1 } },
]);
t.deepEqual(queueResult, {
outcome: "ok",
retryAll: true,
ackAll: false,
explicitRetries: [],
explicitAcks: [],
});
queueResult = await fetcher.queue("queue", [
{ id: "c", timestamp: new Date(3000), body: new Uint8Array([1, 2, 3]) },
{ id: "perfect", timestamp: new Date(4000), body: new Date(5000) },
]);
t.deepEqual(queueResult, {
outcome: "ok",
retryAll: false,
ackAll: false,
explicitRetries: [],
explicitAcks: ["perfect"],
});

res = await fetcher.fetch("http://localhost/queue");
const queueBatch = await res.json();
t.deepEqual(queueBatch, {
queue: "queue",
messages: [
{
id: "c",
timestamp: 3000,
body: { 0: 1, 1: 2, 2: 3 },
bodyType: "Uint8Array",
},
{
id: "perfect",
timestamp: 4000,
body: "1970-01-01T00:00:05.000Z",
bodyType: "Date",
},
],
});
});
test("Miniflare: getBindings() and friends return bindings for different workers", async (t) => {
const mf = new Miniflare({
workers: [
Expand All @@ -802,7 +908,7 @@ test("Miniflare: getBindings() and friends return bindings for different workers
modules: true,
script: `
export class DurableObject {}
export default { fetch() { return new Response(null, { status: 404 }); } }
export default { fetch() { return new Response("a"); } }
`,
d1Databases: ["DB"],
durableObjects: { DO: "DurableObject" },
Expand All @@ -811,14 +917,14 @@ test("Miniflare: getBindings() and friends return bindings for different workers
// 2nd worker unnamed, to validate that not specifying a name when
// getting bindings gives the entrypoint, not the unnamed worker
script:
'addEventListener("fetch", (event) => event.respondWith(new Response(null, { status: 404 })));',
'addEventListener("fetch", (event) => event.respondWith(new Response("unnamed")));',
kvNamespaces: ["KV"],
queueProducers: ["QUEUE"],
},
{
name: "b",
script:
'addEventListener("fetch", (event) => event.respondWith(new Response(null, { status: 404 })));',
'addEventListener("fetch", (event) => event.respondWith(new Response("b")));',
r2Buckets: ["BUCKET"],
},
],
Expand All @@ -837,6 +943,18 @@ test("Miniflare: getBindings() and friends return bindings for different workers
message: '"c" worker not found',
});

// Check `getFetcher()`
let fetcher = await mf.getFetcher();
t.is(await (await fetcher.fetch("http://localhost")).text(), "a");
fetcher = await mf.getFetcher("");
t.is(await (await fetcher.fetch("http://localhost")).text(), "unnamed");
fetcher = await mf.getFetcher("b");
t.is(await (await fetcher.fetch("http://localhost")).text(), "b");
await t.throwsAsync(() => mf.getFetcher("c"), {
instanceOf: TypeError,
message: '"c" worker not found',
});

const unboundExpectations = (name: string): ThrowsExpectation<TypeError> => ({
instanceOf: TypeError,
message: `"${name}" unbound in "c" worker`,
Expand Down