Skip to content

Commit

Permalink
chore: bump workerd to 1.20240329.0 (#5455)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrbbot committed Mar 31, 2024
1 parent 63216f1 commit d994066
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 67 deletions.
5 changes: 5 additions & 0 deletions .changeset/quick-insects-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"miniflare": minor
---

chore: bump `workerd` to [`1.20240329.0`](https://github.com/cloudflare/workerd/releases/tag/v1.20240329.0)
2 changes: 1 addition & 1 deletion fixtures/vitest-pool-workers-examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
},
"devDependencies": {
"@cloudflare/vitest-pool-workers": "workspace:*",
"@cloudflare/workers-types": "^4.20240320.1",
"@cloudflare/workers-types": "^4.20240329.0",
"@types/node": "20.8.3",
"jose": "^5.2.2",
"miniflare": "workspace:*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ it("consumes queue messages", async () => {
{
id: randomBytes(16).toString("hex"),
timestamp: new Date(1000),
attempts: 1,
body: { key: "/1", value: "one" },
},
{
id: randomBytes(16).toString("hex"),
timestamp: new Date(2000),
attempts: 1,
body: { key: "/2", value: "two" },
},
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ it("consumes queue messages", async () => {
{
id: randomBytes(16).toString("hex"),
timestamp: new Date(1000),
attempts: 1,
body: { key: "/1", value: "one" },
},
{
id: randomBytes(16).toString("hex"),
timestamp: new Date(2000),
attempts: 1,
body: { key: "/2", value: "two" },
},
];
Expand Down
4 changes: 2 additions & 2 deletions packages/miniflare/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@
"glob-to-regexp": "^0.4.1",
"stoppable": "^1.1.0",
"undici": "^5.28.2",
"workerd": "1.20240320.1",
"workerd": "1.20240329.0",
"ws": "^8.11.0",
"youch": "^3.2.2",
"zod": "^3.20.6"
},
"devDependencies": {
"@ava/typescript": "^4.0.0",
"@cloudflare/kv-asset-handler": "workspace:*",
"@cloudflare/workers-types": "^4.20240320.1",
"@cloudflare/workers-types": "^4.20240329.0",
"@microsoft/api-extractor": "^7.36.3",
"@types/debug": "^4.1.7",
"@types/estree": "^1.0.0",
Expand Down
4 changes: 4 additions & 0 deletions packages/miniflare/src/workers/node.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,7 @@ declare module "node:crypto" {

export function createHash(algorithm: string): Hash;
}

interface SymbolConstructor {
readonly dispose: unique symbol;
}
20 changes: 11 additions & 9 deletions packages/miniflare/src/workers/queues/broker.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ function serialise(msg: QueueMessage): QueueOutgoingMessage {
}
return {
id: msg.id,
timestamp: msg.timestamp,
timestamp: msg.timestamp.getTime(),
contentType: msg.body.contentType,
body: body.toString("base64"),
};
Expand All @@ -131,13 +131,17 @@ class QueueMessage {

constructor(
readonly id: string,
readonly timestamp: number,
readonly timestamp: Date,
readonly body: QueueBody
) {}

incrementFailedAttempts(): number {
return ++this.#failedAttempts;
}

get failedAttempts() {
return this.#failedAttempts;
}
}

function formatQueueResponse(
Expand Down Expand Up @@ -195,16 +199,14 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
maybeService !== undefined,
`Expected ${bindingName} service binding`
);
const messages = batch.map(({ id, timestamp, body }) => {
const messages = batch.map(({ id, timestamp, body, failedAttempts }) => {
const attempts = failedAttempts + 1;
if (body.contentType === "v8") {
return { id, timestamp, serializedBody: body.body };
return { id, timestamp, serializedBody: body.body, attempts };
} else {
return { id, timestamp, body: body.body };
return { id, timestamp, body: body.body, attempts };
}
});
// @ts-expect-error `Fetcher#queue()` types haven't been updated for
// `serializedBody` yet, and don't allow `number` for `timestamp`, even
// though that's permitted at runtime
return maybeService.queue(this.name, messages);
}

Expand Down Expand Up @@ -322,7 +324,7 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
for (const message of messages) {
const randomness = crypto.getRandomValues(new Uint8Array(16));
const id = message.id ?? Buffer.from(randomness).toString("hex");
const timestamp = message.timestamp ?? this.timers.now();
const timestamp = new Date(message.timestamp ?? this.timers.now());
const body = deserialise(message);
this.#messages.push(new QueueMessage(id, timestamp, body));
}
Expand Down
20 changes: 15 additions & 5 deletions packages/miniflare/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1175,8 +1175,8 @@ test("Miniflare: getWorker() allows dispatching events directly", async (t) => {

// Check `Fetcher#queue()`
let queueResult = await fetcher.queue("needy", [
{ id: "a", timestamp: new Date(1000), body: "a" },
{ id: "b", timestamp: new Date(2000), body: { b: 1 } },
{ id: "a", timestamp: new Date(1000), body: "a", attempts: 1 },
{ id: "b", timestamp: new Date(2000), body: { b: 1 }, attempts: 1 },
]);
t.deepEqual(queueResult, {
outcome: "ok",
Expand All @@ -1188,14 +1188,24 @@ test("Miniflare: getWorker() allows dispatching events directly", async (t) => {
retryMessages: [],
});
queueResult = await fetcher.queue("queue", [
{ id: "c", timestamp: new Date(3000), body: new Uint8Array([1, 2, 3]) },
{ id: "perfect", timestamp: new Date(4000), body: new Date(5000) },
{
id: "c",
timestamp: new Date(3000),
body: new Uint8Array([1, 2, 3]),
attempts: 1,
},
{
id: "perfect",
timestamp: new Date(4000),
body: new Date(5000),
attempts: 1,
},
]);
t.deepEqual(queueResult, {
outcome: "ok",
ackAll: false,
retryBatch: {
retry: false
retry: false,
},
explicitAcks: ["perfect"],
retryMessages: [],
Expand Down
87 changes: 63 additions & 24 deletions packages/miniflare/test/plugins/queues/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ import {

const StringArraySchema = z.string().array();
const MessageArraySchema = z
.object({ queue: z.string(), id: z.string(), body: z.string() })
.object({
queue: z.string(),
id: z.string(),
body: z.string(),
attempts: z.number(),
})
.array();

async function getControlStub(
Expand Down Expand Up @@ -294,7 +299,10 @@ function stripTimings(entries: LogEntry[]) {

test("retries messages", async (t) => {
let batches: z.infer<typeof MessageArraySchema>[] = [];
const bodies = () => batches.map((batch) => batch.map(({ body }) => body));
const bodiesAttempts = () =>
batches.map((batch) =>
batch.map(({ body, attempts }) => ({ body, attempts }))
);

let retryAll = false;
let errorAll = false;
Expand Down Expand Up @@ -326,7 +334,7 @@ test("retries messages", async (t) => {
async queue(batch, env, ctx) {
const res = await env.RETRY_FILTER.fetch("http://localhost", {
method: "POST",
body: JSON.stringify(batch.messages.map(({ id, body }) => ({ queue: batch.queue, id, body }))),
body: JSON.stringify(batch.messages.map(({ id, body, attempts }) => ({ queue: batch.queue, id, body, attempts }))),
});
const { retryAll, errorAll, retryMessages } = await res.json();
if (retryAll) {
Expand Down Expand Up @@ -379,7 +387,14 @@ test("retries messages", async (t) => {
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 2);
t.deepEqual(bodies(), [["msg1", "msg2", "msg3"], ["msg2"]]);
t.deepEqual(bodiesAttempts(), [
[
{ body: "msg1", attempts: 1 },
{ body: "msg2", attempts: 1 },
{ body: "msg3", attempts: 1 },
],
[{ body: "msg2", attempts: 2 }],
]);
batches = [];

// Check with explicit retry all
Expand Down Expand Up @@ -412,9 +427,17 @@ test("retries messages", async (t) => {
t.deepEqual(stripTimings(log.logs), [
[LogLevel.INFO, "QUEUE queue 3/3 (Xms)"],
]);
t.deepEqual(bodies(), [
["msg1", "msg2", "msg3"],
["msg1", "msg2", "msg3"],
t.deepEqual(bodiesAttempts(), [
[
{ body: "msg1", attempts: 1 },
{ body: "msg2", attempts: 1 },
{ body: "msg3", attempts: 1 },
],
[
{ body: "msg1", attempts: 2 },
{ body: "msg2", attempts: 2 },
{ body: "msg3", attempts: 2 },
],
]);
batches = [];

Expand Down Expand Up @@ -448,9 +471,17 @@ test("retries messages", async (t) => {
t.deepEqual(stripTimings(log.logs), [
[LogLevel.INFO, "QUEUE queue 3/3 (Xms)"],
]);
t.deepEqual(bodies(), [
["msg1", "msg2", "msg3"],
["msg1", "msg2", "msg3"],
t.deepEqual(bodiesAttempts(), [
[
{ body: "msg1", attempts: 1 },
{ body: "msg2", attempts: 1 },
{ body: "msg3", attempts: 1 },
],
[
{ body: "msg1", attempts: 2 },
{ body: "msg2", attempts: 2 },
{ body: "msg3", attempts: 2 },
],
]);
batches = [];

Expand Down Expand Up @@ -504,10 +535,18 @@ test("retries messages", async (t) => {
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 3);
t.deepEqual(bodies(), [
["msg1", "msg2", "msg3"],
["msg1", "msg2", "msg3"],
["msg3"],
t.deepEqual(bodiesAttempts(), [
[
{ body: "msg1", attempts: 1 },
{ body: "msg2", attempts: 1 },
{ body: "msg3", attempts: 1 },
],
[
{ body: "msg1", attempts: 2 },
{ body: "msg2", attempts: 2 },
{ body: "msg3", attempts: 2 },
],
[{ body: "msg3", attempts: 3 }],
]);
batches = [];
});
Expand Down Expand Up @@ -555,7 +594,7 @@ test("moves to dead letter queue", async (t) => {
async queue(batch, env, ctx) {
const res = await env.RETRY_FILTER.fetch("http://localhost", {
method: "POST",
body: JSON.stringify(batch.messages.map(({ id, body }) => ({ queue: batch.queue, id, body }))),
body: JSON.stringify(batch.messages.map(({ id, body, attempts }) => ({ queue: batch.queue, id, body, attempts }))),
});
const { retryMessages } = await res.json();
for (const message of batch.messages) {
Expand Down Expand Up @@ -616,15 +655,15 @@ test("moves to dead letter queue", async (t) => {
log.logs = [];
t.deepEqual(batches, [
[
{ queue: "bad", id: batches[0][0].id, body: "msg1" },
{ queue: "bad", id: batches[0][1].id, body: "msg2" },
{ queue: "bad", id: batches[0][2].id, body: "msg3" },
{ queue: "bad", id: batches[0][0].id, body: "msg1", attempts: 1 },
{ queue: "bad", id: batches[0][1].id, body: "msg2", attempts: 1 },
{ queue: "bad", id: batches[0][2].id, body: "msg3", attempts: 1 },
],
[
{ queue: "dlq", id: batches[0][1].id, body: "msg2" },
{ queue: "dlq", id: batches[0][2].id, body: "msg3" },
{ queue: "dlq", id: batches[0][1].id, body: "msg2", attempts: 1 },
{ queue: "dlq", id: batches[0][2].id, body: "msg3", attempts: 1 },
],
[{ queue: "bad", id: batches[0][1].id, body: "msg2" }],
[{ queue: "bad", id: batches[0][1].id, body: "msg2", attempts: 1 }],
]);

// Check rejects queue as own dead letter queue
Expand Down Expand Up @@ -663,7 +702,7 @@ test("operations permit strange queue names", async (t) => {
async queue(batch, env, ctx) {
await env.REPORTER.fetch("http://localhost", {
method: "POST",
body: JSON.stringify(batch.messages.map(({ id, body }) => ({ queue: batch.queue, id, body }))),
body: JSON.stringify(batch.messages.map(({ id, body, attempts }) => ({ queue: batch.queue, id, body, attempts }))),
});
}
}`,
Expand All @@ -676,8 +715,8 @@ test("operations permit strange queue names", async (t) => {
await object.waitForFakeTasks();
const batch = await promise;
t.deepEqual(batch, [
{ queue: id, id: batch[0].id, body: "msg1" },
{ queue: id, id: batch[1].id, body: "msg2" },
{ queue: id, id: batch[0].id, body: "msg1", attempts: 1 },
{ queue: id, id: batch[1].id, body: "msg2", attempts: 1 },
]);
});

Expand Down
2 changes: 1 addition & 1 deletion packages/vitest-pool-workers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"devDependencies": {
"@cloudflare/eslint-config-worker": "workspace:*",
"@cloudflare/workers-tsconfig": "workspace:*",
"@cloudflare/workers-types": "^4.20240320.1",
"@cloudflare/workers-types": "^4.20240329.0",
"@types/node": "20.8.3",
"capnp-ts": "^0.7.0",
"capnpc-ts": "^0.7.0",
Expand Down
16 changes: 16 additions & 0 deletions packages/vitest-pool-workers/src/worker/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class QueueMessage<Body = unknown> /* Message */ {
readonly id!: string;
readonly timestamp!: Date;
readonly body!: Body;
readonly attempts!: number;
[kRetry] = false;
[kAck] = false;

Expand All @@ -171,6 +172,16 @@ class QueueMessage<Body = unknown> /* Message */ {
);
}

let attempts: number;
// noinspection SuspiciousTypeOfGuard
if (typeof message.attempts === "number") {
attempts = message.attempts;
} else {
throw new TypeError(
"Incorrect type for the 'attempts' field on 'ServiceBindingQueueMessage': the provided value is not of type 'number'."
);
}

if ("serializedBody" in message) {
throw new TypeError(
"Cannot use `serializedBody` with `createMessageBatch()`"
Expand All @@ -195,6 +206,11 @@ class QueueMessage<Body = unknown> /* Message */ {
return body;
},
},
attempts: {
get() {
return attempts;
},
},
});
}

Expand Down
Loading

0 comments on commit d994066

Please sign in to comment.