-
Notifications
You must be signed in to change notification settings - Fork 199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Miniflare 3] Implement Queues #566
Conversation
|
d23c667
to
b6ce395
Compare
for (const [queueName, consumer] of queueConsumers) { | ||
if (consumer.deadLetterQueue !== undefined) { | ||
// Check the dead letter queue isn't configured to be the queue itself | ||
// (NOTE: Queues *does* permit DLQ cycles between multiple queues, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Queues does permit dead-letter queue cycles between multiple queues, but is this something we want to block in Miniflare? Are developers ever likely to need this?
else if (acked > 0) colour = yellow; | ||
else colour = red; | ||
|
||
let message = `${bold("QUEUE")} ${queueName} ${colour(`${acked}/${total}`)}`; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this log include anything else?
enqueueOn: QueueEnqueueOn, | ||
consumer: QueueConsumer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, we wouldn't need to pass enqueueOn
and consumer
each time. The problem is that gateways are created once and then reused across the lifetime of a Miniflare
instance. If the consumer ever changes from for example a setOptions()
call, we need to respect that. I wonder if there's a better way of doing this... 🤔
queueProducers: z | ||
.union([z.record(z.string()), z.string().array()]) | ||
.optional(), | ||
queueConsumers: z | ||
.union([z.record(QueueConsumerOptionsSchema), z.string().array()]) | ||
.optional(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a slightly different API to Miniflare 2...
miniflare/packages/queues/src/plugin.ts
Lines 16 to 32 in e3673a5
export interface BindingOptions { | |
name: string; | |
queueName: string; | |
} | |
export interface ConsumerOptions { | |
queueName: string; | |
maxBatchSize?: number; | |
maxWaitMs?: number; | |
maxRetries?: number; | |
deadLetterQueue?: string; | |
} | |
export interface QueuesOptions { | |
queueBindings?: BindingOptions[]; | |
queueConsumers?: (string | ConsumerOptions)[]; | |
} |
...but I think it more closely matches the rest of Miniflare's options. Queues was previously marked as experimental, so I think this change is ok, but open to suggestions.
export const _QUEUES_COMPATIBLE_V8_VERSION = | ||
semiver(process.versions.v8, "10.0.29") >= 0; | ||
|
||
function assertCompatibleV8Version() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, we'd just bump Miniflare 3's minimum supported Node version to 18, but I'm not sure if we want to do that for Wrangler 3 too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's required for Queues...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! How does this work with Wrangler? i.e. if we don't have both workers available as source
// @ts-expect-error "devalue" is ESM-only, so TypeScript requires us to use the | ||
// `*.mts` extension, or set `"type": "module"` in our `package.json`. We can't | ||
// do the first as ambient types from `@cloudflare/workers-types` don't seem to | ||
// work, and the second would affect all consumers of `miniflare`. | ||
import { unflatten } from "devalue"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://devblogs.microsoft.com/typescript/announcing-typescript-5-0-beta/#moduleresolution-bundler should fix this if the Typescript version can be upgraded to 5?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh nice! Doesn't look like api-extractor
supports TypeScript 5 yet, but we should definitely switch to this when we upgrade. Alternatively, we just scrap API extractor and ship multiple *.d.ts
files for our single bundled *.js
file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would that look like? What's the benefit of api extractor over that approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
api-extractor
just means we have a single index.d.ts
file corresponding to our index.js
bundle. Without it, the structure of our src
folder would make it to the final bundle as TypeScript definitions file. Essentially, I think you'd be able to do stuff like import { ... } from "@miniflare/tre/plugins/core"
as plugins/core/index.d.ts
would exist, but the code for it would be in the index.js
bundle, so it wouldn't actually work.
Are you referring to the bundling of Miniflare into Wrangler here? I think the esbuild plugin for building Workers should alleviate this issue. |
No, I was wondering how this works if Wrangler only has access to one user Worker—do the queue producer and consumer have to be defined in the same script? |
Ah, yeah they would have to be. We could probably do something similar to remote Durable Objects by defining a fake queue handler that proxies to another process which then sends the messages to the correct handler. |
Squashing... |
Hmmm, that Windows test failure on Node 20 looks like a real issue... 🙁 Time to debug... |
Potential fix here: cloudflare/workerd#644 |
I think I'm going to merge this with broken tests... The fix for this will require a |
This PR adds support for Queues to Miniflare 3. It builds on cloudflare/workerd#541, cloudflare/workerd#543, and cloudflare/workerd#545 (thanks @a-robinson!).
cloudflare/workerd#545 adds a
queue()
method on service bindings to dispatchqueue
events to Workers. It expects regular, de-serialised JavaScript objects, but the Queue producer bindings from cloudflare/workerd#541 send V8-serialised objects instead. Unfortunately,workerd
doesn't expose the V8 (de)serialiser in Workers yet, so we need to re-serialise all messages into a format we can deserialise in a Worker. We usedevalue
for this. This is the first time we're using annpm
dependency in an internal Miniflare Worker. To support this, and because it's something we've been wanting to do for a while, the core entrypoint Worker has been moved to a separate TypeScript file, which is built as an additional entrypoint withesbuild
, and type-checked under@cloudflare/workers-types
. This is a separate commit to aid reviewing. We'll migrate other internal Workers to this format soon.Another minor issue is that
workerd
uses V8 serialisation version 15 when sending messages. This is only supported by V8 versions 10.0.29 and above. For reference, the V8 versions associated with notable Node versions are:This means that Queues will only be supported in Miniflare 3 if you're using Node 18 or above. I think this is ok, as I'm pretty sure Queues is still in beta, and it was previously marked experimental in Miniflare 2.
I've added some other questions that would be good to get opinions on too. 🙂