From 9a46e03f013cc6f1e2d38d47f9bf002626b6bd95 Mon Sep 17 00:00:00 2001 From: Pedro Leal Date: Mon, 15 Apr 2024 21:08:01 +0100 Subject: [PATCH] feat(wrangler): updated Queues client to use the new 'by-id' endpoints (#5477) --- .changeset/famous-vans-sin.md | 5 + .../wrangler/src/__tests__/deploy.test.ts | 390 +++++++-- .../wrangler/src/__tests__/queues.test.ts | 743 +++++++++++++++--- packages/wrangler/src/__tests__/r2.test.ts | 42 +- packages/wrangler/src/deploy/deploy.ts | 118 ++- .../cli/commands/consumer/http-pull/add.ts | 34 +- .../cli/commands/consumer/worker/add.ts | 30 +- .../cli/commands/consumer/worker/remove.ts | 4 +- packages/wrangler/src/queues/client.ts | 402 +++++++--- packages/wrangler/src/r2/helpers.ts | 4 +- packages/wrangler/src/r2/notification.ts | 4 +- packages/wrangler/src/triggers/deploy.ts | 4 +- packages/wrangler/src/versions/upload.ts | 30 +- 13 files changed, 1335 insertions(+), 475 deletions(-) create mode 100644 .changeset/famous-vans-sin.md diff --git a/.changeset/famous-vans-sin.md b/.changeset/famous-vans-sin.md new file mode 100644 index 00000000000..724302c18ae --- /dev/null +++ b/.changeset/famous-vans-sin.md @@ -0,0 +1,5 @@ +--- +"wrangler": minor +--- + +feature: Changed Queues client to use the new QueueId and ConsumerId-based endpoints. diff --git a/packages/wrangler/src/__tests__/deploy.test.ts b/packages/wrangler/src/__tests__/deploy.test.ts index da1af0576fd..08396f00317 100644 --- a/packages/wrangler/src/__tests__/deploy.test.ts +++ b/packages/wrangler/src/__tests__/deploy.test.ts @@ -54,7 +54,7 @@ import type { KVNamespaceInfo } from "../kv/helpers"; import type { PostQueueBody, PostTypedConsumerBody, - PutConsumerBody, + QueueResponse, } from "../queues/client"; import type { RestRequest } from "msw"; @@ -9051,8 +9051,18 @@ export default{ }, ], }); - mockGetQueue(queueName, queueId); - mockPutQueue(queueId, { + const existingQueue = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); + mockPutQueueById(queueId, { queue_name: queueName, settings: {}, }); @@ -9089,8 +9099,18 @@ export default{ await fs.promises.writeFile("index.js", `export default {};`); mockSubDomainRequest(); mockUploadWorkerRequest(); - mockGetQueue("queue1", queueId); - mockPutQueue(queueId, { + const existingQueue = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); + mockPutQueueById(queueId, { queue_name: queueName, settings: { delivery_delay: 10, @@ -9113,7 +9133,61 @@ export default{ `); }); - it("should update queue consumers on deploy", async () => { + it("should post worker queue consumers on deploy", async () => { + writeWranglerToml({ + queues: { + consumers: [ + { + queue: queueName, + dead_letter_queue: "myDLQ", + max_batch_size: 5, + max_batch_timeout: 3, + max_retries: 10, + retry_delay: 5, + }, + ], + }, + }); + await fs.promises.writeFile("index.js", `export default {};`); + mockSubDomainRequest(); + mockUploadWorkerRequest(); + const existingQueue: QueueResponse = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 0, + consumers_total_count: 0, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); + mockPostConsumerById(queueId, { + dead_letter_queue: "myDLQ", + type: "worker", + script_name: "test-name", + settings: { + batch_size: 5, + max_retries: 10, + max_wait_time_ms: 3000, + retry_delay: 5, + }, + }); + await runWrangler("deploy index.js"); + expect(std.out).toMatchInlineSnapshot(` + "Total Upload: xx KiB / gzip: xx KiB + Uploaded test-name (TIMINGS) + Published test-name (TIMINGS) + https://test-name.test-sub-domain.workers.dev + Consumer for queue1 + Current Deployment ID: Galaxy-Class + + + NOTE: \\"Deployment ID\\" in this output will be changed to \\"Version ID\\" in a future version of Wrangler. To learn more visit: https://developers.cloudflare.com/workers/configuration/versions-and-deployments" + `); + }); + + it("should update worker queue consumers on deploy", async () => { writeWranglerToml({ queues: { consumers: [ @@ -9131,9 +9205,29 @@ export default{ await fs.promises.writeFile("index.js", `export default {};`); mockSubDomainRequest(); mockUploadWorkerRequest(); - mockGetQueue(queueName); - mockPutQueueConsumer(queueName, "test-name", { + const expectedConsumerId = "consumerId"; + const existingQueue: QueueResponse = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [ + { + script: "test-name", + consumer_id: expectedConsumerId, + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 1, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); + mockPutQueueConsumerById(queueId, queueName, expectedConsumerId, { dead_letter_queue: "myDLQ", + type: "worker", + script_name: "test-name", settings: { batch_size: 5, max_retries: 10, @@ -9155,6 +9249,73 @@ export default{ `); }); + it("should update worker (service) queue consumers with default environment on deploy", async () => { + writeWranglerToml({ + queues: { + consumers: [ + { + queue: queueName, + dead_letter_queue: "myDLQ", + max_batch_size: 5, + max_batch_timeout: 3, + max_retries: 10, + retry_delay: 5, + }, + ], + }, + }); + await fs.promises.writeFile("index.js", `export default {};`); + mockSubDomainRequest(); + mockUploadWorkerRequest(); + const expectedConsumerId = "consumerId"; + const expectedConsumerName = "test-name"; + const expectedEnvironment = "production"; + const existingQueue: QueueResponse = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [ + { + service: expectedConsumerName, + environment: "production", + consumer_id: expectedConsumerId, + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 1, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); + mockGetServiceByName(expectedConsumerName, expectedEnvironment); + mockPutQueueConsumerById(queueId, queueName, expectedConsumerId, { + dead_letter_queue: "myDLQ", + type: "worker", + script_name: "test-name", + settings: { + batch_size: 5, + max_retries: 10, + max_wait_time_ms: 3000, + retry_delay: 5, + }, + }); + + await runWrangler("deploy index.js"); + expect(std.out).toMatchInlineSnapshot(` + "Total Upload: xx KiB / gzip: xx KiB + Uploaded test-name (TIMINGS) + Published test-name (TIMINGS) + https://test-name.test-sub-domain.workers.dev + Consumer for queue1 + Current Deployment ID: Galaxy-Class + + + NOTE: \\"Deployment ID\\" in this output will be changed to \\"Version ID\\" in a future version of Wrangler. To learn more visit: https://developers.cloudflare.com/workers/configuration/versions-and-deployments" + `); + }); + it("should post queue http consumers on deploy", async () => { writeWranglerToml({ queues: { @@ -9174,7 +9335,17 @@ export default{ await fs.promises.writeFile("index.js", `export default {};`); mockSubDomainRequest(); mockUploadWorkerRequest(); - mockGetQueue(queueName, queueId); + const existingQueue: QueueResponse = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 0, + consumers_total_count: 0, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); mockPostQueueHTTPConsumer(queueId, { type: "http_pull", dead_letter_queue: "myDLQ", @@ -9213,28 +9384,24 @@ export default{ await fs.promises.writeFile("index.js", `export default {};`); mockSubDomainRequest(); mockUploadWorkerRequest(); - msw.use( - rest.get( - `*/accounts/:accountId/workers/queues/queue1`, - (req, res, ctx) => { - expect(req.params.accountId).toEqual("some-account-id"); - return res( - ctx.json({ - success: true, - errors: [], - messages: [], - result: { - queue: queueName, - queue_id: queueId, - consumers: [ - { type: "http_pull", consumer_id: "queue1-consumer-id" }, - ], - }, - }) - ); - } - ) - ); + const existingQueue: QueueResponse = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [ + { + type: "http_pull", + consumer_id: "queue1-consumer-id", + settings: {}, + }, + ], + producers_total_count: 0, + consumers_total_count: 0, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); + msw.use( rest.put( `*/accounts/:accountId/queues/:queueId/consumers/:consumerId`, @@ -9285,9 +9452,29 @@ export default{ await fs.promises.writeFile("index.js", `export default {};`); mockSubDomainRequest(); mockUploadWorkerRequest(); - mockGetQueue(queueName); - mockPutQueueConsumer(queueName, "test-name", { + const consumerId = "consumer-id"; + const existingQueue: QueueResponse = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [ + { + type: "worker", + script: "test-name", + consumer_id: consumerId, + settings: {}, + }, + ], + producers_total_count: 0, + consumers_total_count: 0, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); + mockPutQueueConsumerById(queueId, queueName, consumerId, { dead_letter_queue: "myDLQ", + type: "worker", + script_name: "test-name", settings: { batch_size: 5, max_retries: 10, @@ -9327,15 +9514,37 @@ export default{ await fs.promises.writeFile("index.js", `export default {};`); mockSubDomainRequest(); mockUploadWorkerRequest(); - mockGetQueue(queueName); - mockPutQueueConsumer(queueName, "test-name", { + + const consumerId = "consumer-id"; + const existingQueue: QueueResponse = { + queue_id: queueId, + queue_name: queueName, + created_on: "", + producers: [], + consumers: [ + { + type: "worker", + script: "test-name", + consumer_id: consumerId, + settings: {}, + }, + ], + producers_total_count: 0, + consumers_total_count: 0, + modified_on: "", + }; + mockGetQueueByName(queueName, existingQueue); + mockPutQueueConsumerById(queueId, queueName, consumerId, { dead_letter_queue: "myDLQ", + type: "worker", + script_name: "test-name", settings: { batch_size: 5, max_retries: 10, max_wait_time_ms: 3000, }, }); + await runWrangler("deploy index.js"); expect(std.out).toMatchInlineSnapshot(` "Total Upload: xx KiB / gzip: xx KiB @@ -9368,7 +9577,7 @@ export default{ await fs.promises.writeFile("index.js", `export default {};`); mockSubDomainRequest(); mockUploadWorkerRequest(); - mockGetQueueMissing("queue1"); + mockGetQueueByName(queueName, null); await expect( runWrangler("deploy index.js") @@ -9387,7 +9596,7 @@ export default{ await fs.promises.writeFile("index.js", `export default {};`); mockSubDomainRequest(); mockUploadWorkerRequest(); - mockGetQueueMissing(queueName); + mockGetQueueByName(queueName, null); await expect( runWrangler("deploy index.js") @@ -10310,23 +10519,25 @@ function mockServiceScriptData(options: { } } -function mockGetQueue(expectedQueueName: string, expectedQueueId?: string) { +function mockGetQueueByName(queueName: string, queue: QueueResponse | null) { const requests = { count: 0 }; msw.use( rest.get( - `*/accounts/:accountId/workers/queues/${expectedQueueName}`, - (req, res, ctx) => { - expect(req.params.accountId).toEqual("some-account-id"); + "*/accounts/:accountId/queues?*", + async (request, response, context) => { requests.count += 1; - return res( - ctx.json({ + expect(await request.text()).toEqual(""); + if (queue) { + const nameParam = request.url.searchParams.getAll("name"); + expect(nameParam.length).toBeGreaterThan(0); + expect(nameParam[0]).toEqual(queueName); + } + return response( + context.json({ success: true, errors: [], messages: [], - result: { - queue_name: expectedQueueName, - queue_id: expectedQueueId, - }, + result: queue ? [queue] : [], }) ); } @@ -10335,43 +10546,46 @@ function mockGetQueue(expectedQueueName: string, expectedQueueId?: string) { return requests; } -function mockGetQueueMissing(expectedQueueName: string) { +function mockGetServiceByName(serviceName: string, defaultEnvironment: string) { const requests = { count: 0 }; + const resource = `*/accounts/:accountId/workers/services/:serviceName`; msw.use( - rest.get( - `*/accounts/:accountId/workers/queues/${expectedQueueName}`, - (req, res, ctx) => { - requests.count += 1; - expect(req.params.accountId).toEqual("some-account-id"); + rest.get(resource, async (request, response, context) => { + requests.count += 1; + expect(request.params.accountId).toEqual("some-account-id"); + expect(request.params.serviceName).toEqual(serviceName); - return res( - ctx.json({ - success: false, - errors: [ - { - code: 11000, - message: "workers.api.error.queue_not_found", + return response( + context.json({ + success: true, + errors: [], + messages: [], + result: { + id: serviceName, + default_environment: { + environment: defaultEnvironment, + script: { + last_deployed_from: "wrangler", }, - ], - messages: [], - result: null, - }) - ); - } - ) + }, + }, + }) + ); + }) ); return requests; } -function mockPutQueueConsumer( +function mockPutQueueConsumerById( + expectedQueueId: string, expectedQueueName: string, - expectedConsumerName: string, - expectedBody: PutConsumerBody + expectedConsumerId: string, + expectedBody: PostTypedConsumerBody ) { const requests = { count: 0 }; msw.use( rest.put( - `*/accounts/:accountId/workers/queues/${expectedQueueName}/consumers/${expectedConsumerName}`, + `*/accounts/:accountId/queues/${expectedQueueId}/consumers/${expectedConsumerId}`, async (req, res, ctx) => { const body = await req.json(); expect(req.params.accountId).toEqual("some-account-id"); @@ -10382,7 +10596,34 @@ function mockPutQueueConsumer( success: true, errors: [], messages: [], - result: { queue: expectedQueueName }, + result: { queue_name: expectedQueueName }, + }) + ); + } + ) + ); + return requests; +} + +function mockPostConsumerById( + expectedQueueId: string, + expectedBody: PostTypedConsumerBody +) { + const requests = { count: 0 }; + msw.use( + rest.post( + "*/accounts/:accountId/queues/:queueId/consumers", + async (request, response, context) => { + requests.count += 1; + expect(request.params.queueId).toEqual(expectedQueueId); + expect(request.params.accountId).toEqual("some-account-id"); + expect(await request.json()).toEqual(expectedBody); + return response.once( + context.json({ + success: true, + errors: [], + messages: [], + result: {}, }) ); } @@ -10391,7 +10632,10 @@ function mockPutQueueConsumer( return requests; } -function mockPutQueue(expectedQueueId: string, expectedBody: PostQueueBody) { +function mockPutQueueById( + expectedQueueId: string, + expectedBody: PostQueueBody +) { const requests = { count: 0 }; msw.use( rest.put(`*/accounts/:accountId/queues/:queueId`, async (req, res, ctx) => { diff --git a/packages/wrangler/src/__tests__/queues.test.ts b/packages/wrangler/src/__tests__/queues.test.ts index 9ca0a8e7e38..a261fa611e8 100644 --- a/packages/wrangler/src/__tests__/queues.test.ts +++ b/packages/wrangler/src/__tests__/queues.test.ts @@ -4,11 +4,7 @@ import { mockConsoleMethods } from "./helpers/mock-console"; import { msw } from "./helpers/msw"; import { runInTempDir } from "./helpers/run-in-tmp"; import { runWrangler } from "./helpers/run-wrangler"; -import type { - PostConsumerBody, - PostTypedConsumerBody, - QueueResponse, -} from "../queues/client"; +import type { PostTypedConsumerBody, QueueResponse } from "../queues/client"; describe("wrangler", () => { mockAccountId(); @@ -17,6 +13,10 @@ describe("wrangler", () => { const std = mockConsoleMethods(); describe("queues", () => { + const expectedQueueId = "queueId"; + const expectedConsumerId = "consumerId"; + const expectedQueueName = "testQueue"; + it("should show the correct help text", async () => { await runWrangler("queues --help"); expect(std.err).toMatchInlineSnapshot(`""`); @@ -40,12 +40,42 @@ describe("wrangler", () => { `); }); + function mockGetQueueByNameRequest( + queueName: string, + queue: QueueResponse | null + ) { + const requests = { count: 0 }; + msw.use( + rest.get( + "*/accounts/:accountId/queues?*", + async (request, response, context) => { + requests.count += 1; + if (queue) { + const nameParam = request.url.searchParams.getAll("name"); + expect(nameParam.length).toBeGreaterThan(0); + expect(nameParam[0]).toEqual(queueName); + } + expect(await request.text()).toEqual(""); + return response.once( + context.json({ + success: true, + errors: [], + messages: [], + result: queue ? [queue] : [], + }) + ); + } + ) + ); + return requests; + } + describe("list", () => { function mockListRequest(queues: QueueResponse[], page: number) { const requests = { count: 0 }; msw.use( rest.get( - "*/accounts/:accountId/workers/queues?*", + "*/accounts/:accountId/queues?*", async (request, response, context) => { requests.count += 1; const query = request.url.searchParams; @@ -163,14 +193,14 @@ describe("wrangler", () => { describe("create", () => { function mockCreateRequest( - expectedQueueName: string, + queueName: string, queueSettings: { delivery_delay?: number } | undefined = undefined ) { const requests = { count: 0 }; msw.use( rest.post( - "*/accounts/:accountId/workers/queues", + "*/accounts/:accountId/queues", async (request, response, context) => { requests.count += 1; @@ -180,7 +210,7 @@ describe("wrangler", () => { delivery_delay: number; }; }; - expect(body.queue_name).toEqual(expectedQueueName); + expect(body.queue_name).toEqual(queueName); expect(body.settings).toEqual(queueSettings); return response.once( context.json({ @@ -188,7 +218,7 @@ describe("wrangler", () => { errors: [], messages: [], result: { - queue_name: expectedQueueName, + queue_name: queueName, created_on: "01-01-2001", modified_on: "01-01-2001", }, @@ -237,7 +267,7 @@ describe("wrangler", () => { const queueName = "testQueue"; msw.use( rest.post( - "*/accounts/:accountId/workers/queues", + "*/accounts/:accountId/queues", async (request, response, context) => { expect(request.params.accountId).toEqual("some-account-id"); return response.once( @@ -261,7 +291,7 @@ describe("wrangler", () => { "Creating queue testQueue. Queues is not currently enabled on this account. Go to https://dash.cloudflare.com/some-account-id/workers/queues to enable it. - X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/workers/queues) failed. + X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/queues) failed. workers.api.error.unauthorized [code: 10023] @@ -298,14 +328,14 @@ describe("wrangler", () => { }); describe("delete", () => { - function mockDeleteRequest(expectedQueueName: string) { + function mockDeleteRequest(queueId: string) { const requests = { count: 0 }; msw.use( rest.delete( - "*/accounts/:accountId/workers/queues/:queueName", + "*/accounts/:accountId/queues/:queueId", async (request, response, context) => { requests.count += 1; - expect(request.params.queueName).toEqual(expectedQueueName); + expect(request.params.queueId).toEqual(queueId); expect(request.params.accountId).toEqual("some-account-id"); return response.once( context.json({ @@ -342,13 +372,46 @@ describe("wrangler", () => { }); it("should delete a queue", async () => { - const requests = mockDeleteRequest("testQueue"); + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); + + const deleteRequest = mockDeleteRequest(expectedQueueId); await runWrangler("queues delete testQueue"); expect(std.out).toMatchInlineSnapshot(` "Deleting queue testQueue. Deleted queue testQueue." `); - expect(requests.count).toEqual(1); + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(1); + }); + + it("should show error when a queue doesn't exist", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + null + ); + + const deleteRequest = mockDeleteRequest(expectedQueueId); + await runWrangler(); + await expect( + runWrangler("queues delete testQueue") + ).rejects.toThrowErrorMatchingInlineSnapshot( + `"Queue \\"testQueue\\" does not exist. To create it, run: wrangler queues create testQueue"` + ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(0); }); }); @@ -379,16 +442,16 @@ describe("wrangler", () => { describe("add", () => { function mockPostRequest( - expectedQueueName: string, - expectedBody: PostConsumerBody + queueName: string, + expectedBody: PostTypedConsumerBody ) { const requests = { count: 0 }; msw.use( rest.post( - "*/accounts/:accountId/workers/queues/:queueName/consumers", + "*/accounts/:accountId/queues/:queueName/consumers", async (request, response, context) => { requests.count += 1; - expect(request.params.queueName).toEqual(expectedQueueName); + expect(request.params.queueName).toEqual(queueName); expect(request.params.accountId).toEqual("some-account-id"); expect(await request.json()).toEqual(expectedBody); return response.once( @@ -435,8 +498,23 @@ describe("wrangler", () => { }); it("should add a consumer using defaults", async () => { - const expectedBody: PostConsumerBody = { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); + + const expectedBody: PostTypedConsumerBody = { script_name: "testScript", + type: "worker", environment_name: "", settings: { batch_size: undefined, @@ -447,8 +525,12 @@ describe("wrangler", () => { }, dead_letter_queue: undefined, }; - mockPostRequest("testQueue", expectedBody); + const postRequest = mockPostRequest(expectedQueueId, expectedBody); await runWrangler("queues consumer add testQueue testScript"); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(postRequest.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` "Adding consumer to queue testQueue. Added consumer to queue testQueue." @@ -456,8 +538,23 @@ describe("wrangler", () => { }); it("should add a consumer using custom values", async () => { - const expectedBody: PostConsumerBody = { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); + + const expectedBody: PostTypedConsumerBody = { script_name: "testScript", + type: "worker", environment_name: "myEnv", settings: { batch_size: 20, @@ -468,11 +565,15 @@ describe("wrangler", () => { }, dead_letter_queue: "myDLQ", }; - mockPostRequest("testQueue", expectedBody); + const postRequest = mockPostRequest(expectedQueueId, expectedBody); await runWrangler( "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=10" ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(postRequest.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` "Adding consumer to queue testQueue. Added consumer to queue testQueue." @@ -480,8 +581,9 @@ describe("wrangler", () => { }); it("should show an error when two retry delays are set", async () => { - const expectedBody: PostConsumerBody = { + const expectedBody: PostTypedConsumerBody = { script_name: "testScript", + type: "worker", environment_name: "myEnv", settings: { batch_size: 20, @@ -505,11 +607,43 @@ describe("wrangler", () => { expect(requests.count).toEqual(0); }); - it("should show link to dash when not enabled", async () => { - const queueName = "testQueue"; + it("should show an error when queue does not exist", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + null + ); + const expectedBody: PostTypedConsumerBody = { + script_name: "testScript", + type: "worker", + environment_name: "myEnv", + settings: { + batch_size: 20, + max_retries: 3, + max_wait_time_ms: 10 * 1000, + max_concurrency: 3, + retry_delay: 0, + }, + dead_letter_queue: "myDLQ", + }; + const postRequest = mockPostRequest(expectedQueueId, expectedBody); + + await expect( + runWrangler( + "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ" + ) + ).rejects.toThrowErrorMatchingInlineSnapshot( + `"Queue \\"testQueue\\" does not exist. To create it, run: wrangler queues create testQueue"` + ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(postRequest.count).toEqual(0); + }); + + xit("should show link to dash when not enabled", async () => { + const queueName = "testQueueId"; msw.use( rest.post( - "*/accounts/:accountId/workers/queues/:queueName/consumers", + "*/accounts/:accountId/queues/:testQueueId/consumers", async (request, response, context) => { expect(request.params.queueName).toEqual(queueName); expect(request.params.accountId).toEqual("some-account-id"); @@ -538,7 +672,7 @@ describe("wrangler", () => { "Adding consumer to queue testQueue. Queues is not currently enabled on this account. Go to https://dash.cloudflare.com/some-account-id/workers/queues to enable it. - X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/workers/queues/testQueue/consumers) failed. + X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/queues/testQueue/consumers) failed. workers.api.error.unauthorized [code: 10023] @@ -551,24 +685,16 @@ describe("wrangler", () => { }); describe("delete", () => { - function mockDeleteRequest( - expectedQueueName: string, - expectedScriptName: string, - expectedEnvName?: string - ) { + function mockDeleteRequest(queueId: string, consumerId: string) { const requests = { count: 0 }; - let resource = `accounts/:accountId/workers/queues/:expectedQueueName/consumers/:expectedScriptName`; - if (expectedEnvName !== undefined) { - resource += `/environments/:expectedEnvName`; - } + const resource = `accounts/:accountId/queues/:expectedQueueId/consumers/:expectedConsumerId`; + msw.use( rest.delete(`*/${resource}`, async (request, response, context) => { requests.count++; expect(request.params.accountId).toBe("some-account-id"); - expect(request.params.expectedQueueName).toBe(expectedQueueName); - expect(request.params.expectedScriptName).toBe( - expectedScriptName - ); + expect(request.params.expectedQueueId).toBe(queueId); + expect(request.params.expectedConsumerId).toBe(consumerId); return response.once( context.status(200), context.json({ @@ -584,6 +710,34 @@ describe("wrangler", () => { return requests; } + function mockServiceRequest(serviceName: string, defaultEnv: string) { + const requests = { count: 0 }; + const resource = `accounts/:accountId/workers/services/:serviceName`; + + msw.use( + rest.get(`*/${resource}`, async (request, response, context) => { + requests.count++; + expect(request.params.accountId).toBe("some-account-id"); + expect(request.params.serviceName).toBe(serviceName); + return response.once( + context.status(200), + context.json({ + success: true, + errors: [], + messages: [], + result: { + id: serviceName, + default_environment: { + environment: defaultEnv, + }, + }, + }) + ); + }) + ); + return requests; + } + it("should show the correct help text", async () => { await runWrangler("queues consumer remove --help"); expect(std.err).toMatchInlineSnapshot(`""`); @@ -605,32 +759,367 @@ describe("wrangler", () => { `); }); - it("should delete a consumer with no --env", async () => { - const requests = mockDeleteRequest("testQueue", "testScript"); - await runWrangler("queues consumer remove testQueue testScript"); + it("should show an error when queue does not exist", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + null + ); + const postRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerId + ); + + await expect( + runWrangler( + "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ" + ) + ).rejects.toThrowErrorMatchingInlineSnapshot( + `"Queue \\"testQueue\\" does not exist. To create it, run: wrangler queues create testQueue"` + ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(postRequest.count).toEqual(0); + }); + + describe("when script consumers are in use", () => { + it("should delete the correct consumer", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + consumer_id: expectedConsumerId, + script: "testScript", + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); - expect(requests.count).toEqual(1); - expect(std.out).toMatchInlineSnapshot(` + const deleteRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerId + ); + await runWrangler("queues consumer remove testQueue testScript"); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` "Removing consumer from queue testQueue. Removed consumer from queue testQueue." `); + }); + + it("should show error when deleting a non-existing consumer", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + consumer_id: expectedConsumerId, + script: "testScriptTwo", + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); + + const deleteRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerId + ); + await expect( + runWrangler("queues consumer remove testQueue testScript") + ).rejects.toThrowErrorMatchingInlineSnapshot( + `"No worker consumer 'testScript' exists for queue testQueue"` + ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(0); + }); }); - it("should delete a consumer with --env", async () => { - const requests = mockDeleteRequest( - "testQueue", - "testScript", - "myEnv" - ); - await runWrangler( - "queues consumer remove testQueue testScript --env myEnv" - ); - - expect(requests.count).toEqual(1); - expect(std.out).toMatchInlineSnapshot(` + describe("when service consumers are in use", () => { + it("should delete a consumer with env set", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + consumer_id: expectedConsumerId, + service: "testScript", + environment: "myEnv", + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); + + const deleteRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerId + ); + await runWrangler( + "queues consumer remove testQueue testScript --env myEnv" + ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` "Removing consumer from queue testQueue. Removed consumer from queue testQueue." `); + }); + + it("should show error when deleting a non-matching environment", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + consumer_id: expectedConsumerId, + service: "testScriptTwo", + environment: "randomEnvironment", + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); + + const deleteRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerId + ); + await expect( + runWrangler( + "queues consumer remove testQueue testScript --env anotherEnvironment" + ) + ).rejects.toThrowErrorMatchingInlineSnapshot( + `"No worker consumer 'testScript' exists for queue testQueue"` + ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(0); + }); + + it("should delete a consumer without env set", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + consumer_id: expectedConsumerId, + service: "testScript", + environment: "myEnv", + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 1, + modified_on: "", + } + ); + + const serviceRequest = mockServiceRequest("testScript", "myEnv"); + const deleteRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerId + ); + + await runWrangler("queues consumer remove testQueue testScript"); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(1); + expect(serviceRequest.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` + "Removing consumer from queue testQueue. + Removed consumer from queue testQueue." + `); + }); + + describe("when multiple consumers are set", () => { + it("should delete default environment consumer without env set", async () => { + const expectedDefaultEnvironment = "staging"; + const expectedConsumerIdToDelete = "consumer-id-staging"; + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + consumer_id: expectedConsumerIdToDelete, + service: "testScript", + environment: "staging", + type: "worker", + settings: {}, + }, + { + consumer_id: expectedConsumerId, + service: "testScript", + environment: "production", + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 2, + modified_on: "", + } + ); + + const serviceRequest = mockServiceRequest( + "testScript", + expectedDefaultEnvironment + ); + const deleteRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerIdToDelete + ); + await runWrangler("queues consumer remove testQueue testScript"); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(serviceRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` + "Removing consumer from queue testQueue. + Removed consumer from queue testQueue." + `); + }); + + it("should delete matching consumer with env set", async () => { + const expectedConsumerIdToDelete = "consumer-id-staging"; + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + consumer_id: expectedConsumerIdToDelete, + service: "testScript", + environment: "staging", + type: "worker", + settings: {}, + }, + { + consumer_id: expectedConsumerId, + service: "testScript", + environment: "consumer-id-production", + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 2, + modified_on: "", + } + ); + + const deleteRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerIdToDelete + ); + await runWrangler( + "queues consumer remove testQueue testScript --env staging" + ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` + "Removing consumer from queue testQueue. + Removed consumer from queue testQueue." + `); + }); + + it("should show error when deleting on a non-matching environment", async () => { + const expectedConsumerIdToDelete = "consumer-id-staging"; + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + consumer_id: expectedConsumerIdToDelete, + service: "testScript", + environment: "staging", + type: "worker", + settings: {}, + }, + { + consumer_id: expectedConsumerId, + service: "testScript", + environment: "production", + type: "worker", + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 2, + modified_on: "", + } + ); + + const deleteRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerId + ); + await expect( + runWrangler( + "queues consumer remove testQueue testScript --env anotherEnvironment" + ) + ).rejects.toThrowErrorMatchingInlineSnapshot( + `"No worker consumer 'testScript' exists for queue testQueue"` + ); + + expect(queueNameResolveRequest.count).toEqual(1); + expect(deleteRequest.count).toEqual(0); + }); + }); }); }); }); @@ -659,32 +1148,8 @@ describe("wrangler", () => { }); describe("add", () => { - function mockGetQueueRequest(expectedQueueName: string) { - const requests = { count: 0 }; - msw.use( - rest.get( - "*/accounts/:accountId/workers/queues/:queueName", - async (request, response, context) => { - requests.count += 1; - expect(request.params.queueName).toEqual(expectedQueueName); - expect(request.params.accountId).toEqual("some-account-id"); - return response.once( - context.json({ - success: true, - errors: [], - messages: [], - result: { - queue_id: "fake-queue-id", - }, - }) - ); - } - ) - ); - return requests; - } function mockPostRequest( - expectedQueueId: string, + queueId: string, expectedBody: PostTypedConsumerBody ) { const requests = { count: 0 }; @@ -693,7 +1158,7 @@ describe("wrangler", () => { "*/accounts/:accountId/queues/:queueId/consumers", async (request, response, context) => { requests.count += 1; - expect(request.params.queueId).toEqual(expectedQueueId); + expect(request.params.queueId).toEqual(queueId); expect(request.params.accountId).toEqual("some-account-id"); expect(await request.json()).toEqual(expectedBody); return response.once( @@ -738,6 +1203,20 @@ describe("wrangler", () => { }); it("should add a consumer using defaults", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); + const expectedBody: PostTypedConsumerBody = { type: "http_pull", settings: { @@ -748,10 +1227,11 @@ describe("wrangler", () => { }, dead_letter_queue: undefined, }; - mockPostRequest("fake-queue-id", expectedBody); - mockGetQueueRequest("testQueue"); + const postRequest = mockPostRequest(expectedQueueId, expectedBody); await runWrangler("queues consumer http add testQueue"); + expect(queueNameResolveRequest.count).toEqual(1); + expect(postRequest.count).toEqual(1); expect(std.out).toMatchInlineSnapshot(` "Adding consumer to queue testQueue. Added consumer to queue testQueue." @@ -759,6 +1239,20 @@ describe("wrangler", () => { }); it("should add a consumer using custom values", async () => { + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + } + ); + const expectedBody: PostTypedConsumerBody = { type: "http_pull", settings: { @@ -769,12 +1263,13 @@ describe("wrangler", () => { }, dead_letter_queue: "myDLQ", }; - mockPostRequest("fake-queue-id", expectedBody); - mockGetQueueRequest("testQueue"); + const postRequest = mockPostRequest(expectedQueueId, expectedBody); await runWrangler( "queues consumer http add testQueue --batch-size 20 --message-retries 3 --visibility-timeout-secs 6 --retry-delay-secs 3 --dead-letter-queue myDLQ" ); + expect(queueNameResolveRequest.count).toEqual(1); + expect(postRequest.count).toEqual(1); expect(std.out).toMatchInlineSnapshot(` "Adding consumer to queue testQueue. Added consumer to queue testQueue." @@ -783,47 +1278,15 @@ describe("wrangler", () => { }); describe("delete", () => { - function mockGetQueueRequest(expectedQueueName: string) { - const requests = { count: 0 }; - msw.use( - rest.get( - "*/accounts/:accountId/workers/queues/:queueName", - async (request, response, context) => { - requests.count += 1; - expect(request.params.queueName).toEqual(expectedQueueName); - expect(request.params.accountId).toEqual("some-account-id"); - return response.once( - context.json({ - success: true, - errors: [], - messages: [], - result: { - queue_id: "fake-queue-id", - consumers: [ - { consumer_id: "fake-consumer-id", type: "http_pull" }, - ], - }, - }) - ); - } - ) - ); - return requests; - } - function mockDeleteRequest( - expectedQueueId: string, - expectedConsumerId: string - ) { + function mockDeleteRequest(queueId: string, consumerId: string) { const requests = { count: 0 }; const resource = `accounts/:accountId/queues/:expectedQueueId/consumers/:expectedConsumerId`; msw.use( rest.delete(`*/${resource}`, async (request, response, context) => { requests.count++; expect(request.params.accountId).toBe("some-account-id"); - expect(request.params.expectedQueueId).toBe(expectedQueueId); - expect(request.params.expectedConsumerId).toBe( - expectedConsumerId - ); + expect(request.params.expectedQueueId).toBe(queueId); + expect(request.params.expectedConsumerId).toBe(consumerId); return response.once( context.status(200), context.json({ @@ -860,14 +1323,34 @@ describe("wrangler", () => { }); it("should delete a pull consumer", async () => { - mockGetQueueRequest("testQueue"); - const requests = mockDeleteRequest( - "fake-queue-id", - "fake-consumer-id" + const queueNameResolveRequest = mockGetQueueByNameRequest( + expectedQueueName, + { + queue_id: expectedQueueId, + queue_name: expectedQueueName, + created_on: "", + producers: [], + consumers: [ + { + type: "http_pull", + consumer_id: expectedConsumerId, + settings: {}, + }, + ], + producers_total_count: 1, + consumers_total_count: 1, + modified_on: "", + } + ); + + const postRequest = mockDeleteRequest( + expectedQueueId, + expectedConsumerId ); await runWrangler("queues consumer http remove testQueue"); - expect(requests.count).toEqual(1); + expect(postRequest.count).toEqual(1); + expect(queueNameResolveRequest.count).toEqual(1); expect(std.out).toMatchInlineSnapshot(` "Removing consumer from queue testQueue. Removed consumer from queue testQueue." diff --git a/packages/wrangler/src/__tests__/r2.test.ts b/packages/wrangler/src/__tests__/r2.test.ts index c2993060073..5e017077181 100644 --- a/packages/wrangler/src/__tests__/r2.test.ts +++ b/packages/wrangler/src/__tests__/r2.test.ts @@ -797,11 +797,13 @@ describe("r2", () => { } ), rest.get( - "*/accounts/:accountId/workers/queues/:queueName", + "*/accounts/:accountId/queues?*", async (request, response, context) => { - const { accountId, queueName } = request.params; + const { accountId } = request.params; + const nameParams = request.url.searchParams.getAll("name"); + expect(accountId).toEqual("some-account-id"); - expect(queueName).toEqual(queue); + expect(nameParams[0]).toEqual(queue); expect(request.headers.get("authorization")).toEqual( "Bearer some-api-token" ); @@ -810,7 +812,18 @@ describe("r2", () => { success: true, errors: [], messages: [], - result: {}, + result: [ + { + queue_id: "queue-id", + queue_name: queue, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + }, + ], }) ); } @@ -877,11 +890,13 @@ describe("r2", () => { } ), rest.get( - "*/accounts/:accountId/workers/queues/:queueName", + "*/accounts/:accountId/queues?*", async (request, response, context) => { - const { accountId, queueName } = request.params; + const { accountId } = request.params; + const nameParams = request.url.searchParams.getAll("name"); + expect(accountId).toEqual("some-account-id"); - expect(queueName).toEqual(queue); + expect(nameParams[0]).toEqual(queue); expect(request.headers.get("authorization")).toEqual( "Bearer some-api-token" ); @@ -890,7 +905,18 @@ describe("r2", () => { success: true, errors: [], messages: [], - result: {}, + result: [ + { + queue_id: "queue-id", + queue_name: queue, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + }, + ], }) ); } diff --git a/packages/wrangler/src/deploy/deploy.ts b/packages/wrangler/src/deploy/deploy.ts index 04cd4ff69a3..03eec85e5cb 100644 --- a/packages/wrangler/src/deploy/deploy.ts +++ b/packages/wrangler/src/deploy/deploy.ts @@ -30,11 +30,12 @@ import { isNavigatorDefined } from "../navigator-user-agent"; import { APIError, ParseError } from "../parse"; import { getWranglerTmpDir } from "../paths"; import { + ensureQueuesExistByConfig, getQueue, - postTypedConsumer, + postConsumer, putConsumer, + putConsumerById, putQueue, - putTypedConsumer, } from "../queues/client"; import { syncAssets } from "../sites"; import { @@ -44,7 +45,6 @@ import { import triggersDeploy from "../triggers/deploy"; import { logVersionIdChange } from "../utils/deployment-id-version-id-change"; import { getZoneForRoute } from "../zones"; -import type { FetchError } from "../cfetch"; import type { Config } from "../config"; import type { CustomDomainRoute, @@ -59,11 +59,7 @@ import type { CfPlacement, CfWorkerInit, } from "../deployment-bundle/worker"; -import type { - PostQueueBody, - PostTypedConsumerBody, - PutConsumerBody, -} from "../queues/client"; +import type { PostQueueBody, PostTypedConsumerBody } from "../queues/client"; import type { AssetPaths } from "../sites"; import type { RetrieveSourceMapFunction } from "../sourcemap"; @@ -701,7 +697,7 @@ See https://developers.cloudflare.com/workers/platform/compatibility-dates for m printBindings({ ...withoutStaticAssets, vars: maskedVars }); if (!props.dryRun) { - await ensureQueuesExist(config); + await ensureQueuesExistByConfig(config); // Upload the script so it has time to propagate. // We can also now tell whether available_on_subdomain is set @@ -1002,47 +998,21 @@ export function isAuthenticationError(e: unknown): e is ParseError { return e instanceof ParseError && (e as { code?: number }).code === 10000; } -export async function ensureQueuesExist(config: Config) { - const producers = (config.queues.producers || []).map( - (producer) => producer.queue - ); - const consumers = (config.queues.consumers || []).map( - (consumer) => consumer.queue - ); - - const queueNames = producers.concat(consumers); - for (const queue of queueNames) { - try { - await getQueue(config, queue); - } catch (err) { - const queueErr = err as FetchError; - if (queueErr.code === 11000) { - // queue_not_found - throw new UserError( - `Queue "${queue}" does not exist. To create it, run: wrangler queues create ${queue}` - ); - } - throw err; - } - } -} - export async function updateQueueProducers( config: Config ): Promise[]> { const producers = config.queues.producers || []; const updateProducers: Promise[] = []; for (const producer of producers) { - const queue = await getQueue(config, producer.queue); const body: PostQueueBody = { - queue_name: queue.queue_name, + queue_name: producer.queue, settings: { delivery_delay: producer.delivery_delay, }, }; updateProducers.push( - putQueue(config, queue.queue_id, body).then(() => [ + putQueue(config, producer.queue, body).then(() => [ `Producer for ${producer.queue}`, ]) ); @@ -1057,22 +1027,24 @@ export async function updateQueueConsumers( const consumers = config.queues.consumers || []; const updateConsumers: Promise[] = []; for (const consumer of consumers) { + const queue = await getQueue(config, consumer.queue); + if (consumer.type === "http_pull") { - const queue = await getQueue(config, consumer.queue); + const body: PostTypedConsumerBody = { + type: consumer.type, + dead_letter_queue: consumer.dead_letter_queue, + settings: { + batch_size: consumer.max_batch_size, + max_retries: consumer.max_retries, + visibility_timeout_ms: consumer.visibility_timeout_ms, + retry_delay: consumer.retry_delay, + }, + }; + const existingConsumer = queue.consumers && queue.consumers[0]; if (existingConsumer) { - const body: PostTypedConsumerBody = { - type: consumer.type, - dead_letter_queue: consumer.dead_letter_queue, - settings: { - batch_size: consumer.max_batch_size, - max_retries: consumer.max_retries, - visibility_timeout_ms: consumer.visibility_timeout_ms, - retry_delay: consumer.retry_delay, - }, - }; updateConsumers.push( - putTypedConsumer( + putConsumerById( config, queue.queue_id, existingConsumer.consumer_id, @@ -1081,25 +1053,23 @@ export async function updateQueueConsumers( ); continue; } - - const body: PostTypedConsumerBody = { - type: consumer.type, - dead_letter_queue: consumer.dead_letter_queue, - settings: { - batch_size: consumer.max_batch_size, - max_retries: consumer.max_retries, - visibility_timeout_ms: consumer.visibility_timeout_ms, - retry_delay: consumer.retry_delay, - }, - }; updateConsumers.push( - postTypedConsumer(config, consumer.queue, body).then(() => [ + postConsumer(config, consumer.queue, body).then(() => [ `Consumer for ${consumer.queue}`, ]) ); } else { - const body: PutConsumerBody = { + if (config.name === undefined) { + // TODO: how can we reliably get the current script name? + throw new UserError( + "Script name is required to update queue consumers" + ); + } + + const body: PostTypedConsumerBody = { + type: "worker", dead_letter_queue: consumer.dead_letter_queue, + script_name: config.name, settings: { batch_size: consumer.max_batch_size, max_retries: consumer.max_retries, @@ -1111,18 +1081,24 @@ export async function updateQueueConsumers( }, }; - if (config.name === undefined) { - // TODO: how can we reliably get the current script name? - throw new UserError( - "Script name is required to update queue consumers" + // Current script already assigned to queue? + const existingConsumer = + queue.consumers.filter( + (c) => c.script === config.name || c.service === config.name + ).length > 0; + const envName = undefined; // TODO: script environment for wrangler deploy? + if (existingConsumer) { + updateConsumers.push( + putConsumer(config, consumer.queue, config.name, envName, body).then( + () => [`Consumer for ${consumer.queue}`] + ) ); + continue; } - const scriptName = config.name; - const envName = undefined; // TODO: script environment for wrangler deploy? updateConsumers.push( - putConsumer(config, consumer.queue, scriptName, envName, body).then( - () => [`Consumer for ${consumer.queue}`] - ) + postConsumer(config, consumer.queue, body).then(() => [ + `Consumer for ${consumer.queue}`, + ]) ); } } diff --git a/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts index 8a80d4e53d6..808999dadfc 100644 --- a/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts +++ b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts @@ -1,7 +1,7 @@ import { readConfig } from "../../../../../config"; import { CommandLineArgsError } from "../../../../../index"; import { logger } from "../../../../../logger"; -import { postTypedConsumer } from "../../../../client"; +import { postConsumer } from "../../../../client"; import type { CommonYargsArgv, StrictYargsOptionsToInterface, @@ -40,18 +40,10 @@ export function options(yargs: CommonYargsArgv) { }); } -export async function handler( +function createBody( args: StrictYargsOptionsToInterface -) { - const config = readConfig(args.config, args); - - if (Array.isArray(args.retryDelaySecs)) { - throw new CommandLineArgsError( - `Cannot specify --retry-delay-secs multiple times` - ); - } - - const postTypedConsumerBody: PostTypedConsumerBody = { +): PostTypedConsumerBody { + return { type: "http_pull", settings: { batch_size: args.batchSize, @@ -63,8 +55,22 @@ export async function handler( }, dead_letter_queue: args.deadLetterQueue, }; - logger.log(`Adding consumer to queue ${args.queueName}.`); +} - await postTypedConsumer(config, args.queueName, postTypedConsumerBody); +export async function handler( + args: StrictYargsOptionsToInterface +) { + const config = readConfig(args.config, args); + + if (Array.isArray(args.retryDelaySecs)) { + throw new CommandLineArgsError( + `Cannot specify --retry-delay-secs multiple times` + ); + } + + const body = createBody(args); + + logger.log(`Adding consumer to queue ${args.queueName}.`); + await postConsumer(config, args.queueName, body); logger.log(`Added consumer to queue ${args.queueName}.`); } diff --git a/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts b/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts index 5c02e86314d..9850913ce2d 100644 --- a/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts +++ b/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts @@ -6,7 +6,7 @@ import type { CommonYargsArgv, StrictYargsOptionsToInterface, } from "../../../../../yargs-types"; -import type { PostConsumerBody } from "../../../../client"; +import type { PostTypedConsumerBody } from "../../../../client"; export function options(yargs: CommonYargsArgv) { return yargs @@ -50,21 +50,14 @@ export function options(yargs: CommonYargsArgv) { }); } -export async function handler( +function createBody( args: StrictYargsOptionsToInterface -) { - const config = readConfig(args.config, args); - - if (Array.isArray(args.retryDelaySecs)) { - throw new CommandLineArgsError( - `Cannot specify --retry-delay-secs multiple times` - ); - } - - const body: PostConsumerBody = { +): PostTypedConsumerBody { + return { script_name: args.scriptName, // TODO(soon) is this still the correct usage of the environment? environment_name: args.env ?? "", // API expects empty string as default + type: "worker", settings: { batch_size: args.batchSize, max_retries: args.messageRetries, @@ -76,6 +69,19 @@ export async function handler( }, dead_letter_queue: args.deadLetterQueue, }; +} +export async function handler( + args: StrictYargsOptionsToInterface +) { + const config = readConfig(args.config, args); + + if (Array.isArray(args.retryDelaySecs)) { + throw new CommandLineArgsError( + `Cannot specify --retry-delay-secs multiple times` + ); + } + + const body = createBody(args); logger.log(`Adding consumer to queue ${args.queueName}.`); await postConsumer(config, args.queueName, body); diff --git a/packages/wrangler/src/queues/cli/commands/consumer/worker/remove.ts b/packages/wrangler/src/queues/cli/commands/consumer/worker/remove.ts index 2ffdef797e4..a90c0b88d2b 100644 --- a/packages/wrangler/src/queues/cli/commands/consumer/worker/remove.ts +++ b/packages/wrangler/src/queues/cli/commands/consumer/worker/remove.ts @@ -1,6 +1,6 @@ import { readConfig } from "../../../../../config"; import { logger } from "../../../../../logger"; -import { deleteConsumer } from "../../../../client"; +import { deleteWorkerConsumer } from "../../../../client"; import type { CommonYargsArgv, StrictYargsOptionsToInterface, @@ -26,6 +26,6 @@ export async function handler( const config = readConfig(args.config, args); logger.log(`Removing consumer from queue ${args.queueName}.`); - await deleteConsumer(config, args.queueName, args.scriptName, args.env); + await deleteWorkerConsumer(config, args.queueName, args.scriptName, args.env); logger.log(`Removed consumer from queue ${args.queueName}.`); } diff --git a/packages/wrangler/src/queues/client.ts b/packages/wrangler/src/queues/client.ts index 198436dc68f..d4a624719e9 100644 --- a/packages/wrangler/src/queues/client.ts +++ b/packages/wrangler/src/queues/client.ts @@ -1,24 +1,22 @@ -import { fetchResult } from "../cfetch"; +import { URLSearchParams } from "node:url"; +import { fetchPagedListResult, fetchResult } from "../cfetch"; import { type Config } from "../config"; import { UserError } from "../errors"; +import { logger } from "../logger"; import { requireAuth } from "../user"; -export async function createQueue( - config: Config, - body: PostQueueBody -): Promise { - const accountId = await requireAuth(config); - return await fetchResult(`/accounts/${accountId}/workers/queues`, { - method: "POST", - body: JSON.stringify(body), - }); -} - export interface PostQueueBody { queue_name: string; settings?: QueueSettings; } +export interface WorkerService { + id: string; + default_environment: { + environment: string; + }; +} + export interface QueueSettings { delivery_delay?: number; } @@ -31,6 +29,18 @@ export interface PostQueueResponse { modified_on: string; } +export interface QueueResponse { + queue_id: string; + queue_name: string; + created_on: string; + modified_on: string; + producers: ScriptReference[]; + producers_total_count: number; + consumers: Consumer[]; + consumers_total_count: number; + settings?: QueueSettings; +} + export interface ScriptReference { namespace?: string; script?: string; @@ -46,208 +56,338 @@ export type Consumer = ScriptReference & { type: string; }; -export interface QueueResponse { - queue_id: string; +export interface TypedConsumerResponse extends Consumer { queue_name: string; created_on: string; - modified_on: string; - producers: ScriptReference[]; - producers_total_count: number; - consumers: Consumer[]; - consumers_total_count: number; - settings?: QueueSettings; +} + +export interface PostTypedConsumerBody { + type: string; + script_name?: string; + environment_name?: string; + settings: ConsumerSettings; + dead_letter_queue?: string; +} + +export interface ConsumerSettings { + batch_size?: number; + max_retries?: number; + max_wait_time_ms?: number; + max_concurrency?: number | null; + visibility_timeout_ms?: number; + retry_delay?: number; +} + +const queuesUrl = (accountId: string, queueId?: string): string => { + let url = `/accounts/${accountId}/queues`; + if (queueId) { + url += `/${queueId}`; + } + return url; +}; + +const queueConsumersUrl = ( + accountId: string, + queueId: string, + consumerId?: string +): string => { + let url = `${queuesUrl(accountId, queueId)}/consumers`; + if (consumerId) { + url += `/${consumerId}`; + } + return url; +}; + +export async function createQueue( + config: Config, + body: PostQueueBody +): Promise { + const accountId = await requireAuth(config); + return fetchResult(queuesUrl(accountId), { + method: "POST", + body: JSON.stringify(body), + }); } export async function deleteQueue( config: Config, queueName: string +): Promise { + const queue = await getQueue(config, queueName); + return deleteQueueById(config, queue.queue_id); +} + +export async function deleteQueueById( + config: Config, + queueId: string ): Promise { const accountId = await requireAuth(config); - return await fetchResult( - `/accounts/${accountId}/workers/queues/${queueName}`, - { - method: "DELETE", - } - ); + return fetchResult(queuesUrl(accountId, queueId), { + method: "DELETE", + }); } // TODO(soon) show detailed queue response export async function listQueues( config: Config, - page?: number + page?: number, + name?: string ): Promise { page = page ?? 1; const accountId = await requireAuth(config); - return await fetchResult( - `/accounts/${accountId}/workers/queues`, - {}, - new URLSearchParams({ page: page.toString() }) - ); + const params = new URLSearchParams({ page: page.toString() }); + + if (name) { + params.append("name", name); + } + + return fetchResult(queuesUrl(accountId), {}, params); +} + +export async function listAllQueues( + config: Config, + queueNames: string[] +): Promise { + const accountId = await requireAuth(config); + const params = new URLSearchParams(); + + queueNames.forEach((e) => { + params.append("name", e); + }); + + return fetchPagedListResult(queuesUrl(accountId), {}, params); } export async function getQueue( config: Config, queueName: string ): Promise { - const accountId = await requireAuth(config); - return await fetchResult( - `/accounts/${accountId}/workers/queues/${queueName}`, - {} + const queues = await listQueues(config, 1, queueName); + if (queues.length === 0) { + throw new UserError( + `Queue "${queueName}" does not exist. To create it, run: wrangler queues create ${queueName}` + ); + } + return queues[0]; +} + +export async function ensureQueuesExistByConfig(config: Config) { + const producers = (config.queues.producers || []).map( + (producer) => producer.queue + ); + const consumers = (config.queues.consumers || []).map( + (consumer) => consumer.queue ); + + const queueNames = producers.concat(consumers); + await ensureQueuesExist(config, queueNames); } -export async function getQueueByID( +async function ensureQueuesExist(config: Config, queueNames: string[]) { + if (queueNames.length > 0) { + const existingQueues = (await listAllQueues(config, queueNames)).map( + (q) => q.queue_name + ); + + if (queueNames.length !== existingQueues.length) { + const queueSet = new Set(existingQueues); + + for (const queue of queueNames) { + if (!queueSet.has(queue)) { + throw new UserError( + `Queue "${queue}" does not exist. To create it, run: wrangler queues create ${queue}` + ); + } + } + } + } +} + +export async function getQueueById( config: Pick, - queueID: string + queueId: string ): Promise { const accountId = await requireAuth(config); - return await fetchResult(`/accounts/${accountId}/queues/${queueID}`, {}); + return fetchResult(queuesUrl(accountId, queueId), {}); } -export async function postConsumer( +export async function putQueue( config: Config, queueName: string, - body: PostConsumerBody -): Promise { + body: PostQueueBody +): Promise { + const queue = await getQueue(config, queueName); + return putQueueById(config, queue.queue_id, body); +} + +export async function putQueueById( + config: Config, + queueId: string, + body: PostQueueBody +): Promise { const accountId = await requireAuth(config); - return fetchResult( - `/accounts/${accountId}/workers/queues/${queueName}/consumers`, - { - method: "POST", - body: JSON.stringify(body), - } - ); + return fetchResult(queuesUrl(accountId, queueId), { + method: "PUT", + body: JSON.stringify(body), + }); } -export async function postTypedConsumer( +export async function postConsumer( config: Config, queueName: string, body: PostTypedConsumerBody ): Promise { - const accountId = await requireAuth(config); const queue = await getQueue(config, queueName); - return fetchResult( - `/accounts/${accountId}/queues/${queue.queue_id}/consumers`, - { - method: "POST", - body: JSON.stringify(body), - } - ); + return postConsumerById(config, queue.queue_id, body); } -export async function putTypedConsumer( +export async function postConsumerById( config: Config, queueId: string, - consumerId: string, body: PostTypedConsumerBody ): Promise { const accountId = await requireAuth(config); - - return fetchResult( - `/accounts/${accountId}/queues/${queueId}/consumers/${consumerId}`, - { - method: "PUT", - body: JSON.stringify(body), - } - ); + return fetchResult(queueConsumersUrl(accountId, queueId), { + method: "POST", + body: JSON.stringify(body), + }); } -export async function putQueue( +export async function putConsumerById( config: Config, queueId: string, - body: PostQueueBody -): Promise { + consumerId: string, + body: PostTypedConsumerBody +): Promise { const accountId = await requireAuth(config); - return fetchResult(`/accounts/${accountId}/queues/${queueId}/`, { + return fetchResult(queueConsumersUrl(accountId, queueId, consumerId), { method: "PUT", body: JSON.stringify(body), }); } -export interface TypedConsumerResponse extends Consumer { - queue_name: string; - created_on: string; +export async function putConsumer( + config: Config, + queueName: string, + scriptName: string, + envName: string | undefined, + body: PostTypedConsumerBody +): Promise { + const queue = await getQueue(config, queueName); + const targetConsumer = await resolveWorkerConsumerByName( + config, + scriptName, + envName, + queue + ); + return putConsumerById( + config, + queue.queue_id, + targetConsumer.consumer_id, + body + ); } -export interface PostTypedConsumerBody extends PutConsumerBody { - type: string; - script_name?: string; - environment_name?: string; -} +async function resolveWorkerConsumerByName( + config: Config, + consumerName: string, + envName: string | undefined, + queue: QueueResponse +): Promise { + const queueName = queue.queue_name; + const consumers = queue.consumers.filter( + (c) => + c.type === "worker" && + (c.script === consumerName || c.service === consumerName) + ); -export interface PutConsumerBody { - settings: ConsumerSettings; - dead_letter_queue?: string; -} + if (consumers.length === 0) { + throw new UserError( + `No worker consumer '${consumerName}' exists for queue ${queue.queue_name}` + ); + } -export interface PostConsumerBody extends PutConsumerBody { - script_name: string; - environment_name: string; -} + // If more than a consumer with the same name is found, it should be + // a service+environment combination + if (consumers.length > 1) { + const targetEnv = + envName ?? (await getDefaultService(config, consumerName)); + const targetConsumers = consumers.filter( + (c) => c.environment === targetEnv + ); -export interface ConsumerSettings { - batch_size?: number; - max_retries?: number; - max_wait_time_ms?: number; - max_concurrency?: number | null; - visibility_timeout_ms?: number; - retry_delay?: number; + if (targetConsumers.length === 0) { + throw new UserError( + `No worker consumer '${consumerName}' exists for queue ${queueName}` + ); + } + return consumers[0]; + } + + if (consumers[0].service) { + const targetEnv = + envName ?? (await getDefaultService(config, consumerName)); + if (targetEnv != consumers[0].environment) { + throw new UserError( + `No worker consumer '${consumerName}' exists for queue ${queueName}` + ); + } + } + return consumers[0]; } -export interface ConsumerResponse extends PostConsumerBody { - queue_name: string; - script_name: string; - environment_name: string; - settings: ConsumerSettings; - dead_letter_queue?: string; +export async function deleteConsumerById( + config: Config, + queueId: string, + consumerId: string +): Promise { + const accountId = await requireAuth(config); + return fetchResult(queueConsumersUrl(accountId, queueId, consumerId), { + method: "DELETE", + }); } export async function deletePullConsumer( config: Config, queueName: string ): Promise { - const accountId = await requireAuth(config); const queue = await getQueue(config, queueName); const consumer = queue.consumers[0]; if (consumer?.type !== "http_pull") { throw new UserError(`No http_pull consumer exists for queue ${queueName}`); } - const resource = `/accounts/${accountId}/queues/${queue.queue_id}/consumers/${consumer.consumer_id}`; - return fetchResult(resource, { - method: "DELETE", - }); + return deleteConsumerById(config, queue.queue_id, consumer.consumer_id); } -export async function deleteConsumer( +async function getDefaultService( config: Config, - queueName: string, - scriptName: string, - envName?: string -): Promise { + serviceName: string +): Promise { const accountId = await requireAuth(config); - let resource = `/accounts/${accountId}/workers/queues/${queueName}/consumers/${scriptName}`; - if (envName !== undefined) { - resource += `/environments/${envName}`; - } - return fetchResult(resource, { - method: "DELETE", - }); + const service = await fetchResult( + `/accounts/${accountId}/workers/services/${serviceName}`, + { + method: "GET", + } + ); + + logger.info(service); + + return service.default_environment.environment; } -export async function putConsumer( +export async function deleteWorkerConsumer( config: Config, queueName: string, scriptName: string, - envName: string | undefined, - body: PutConsumerBody -): Promise { - const accountId = await requireAuth(config); - let resource = `/accounts/${accountId}/workers/queues/${queueName}/consumers/${scriptName}`; - if (envName !== undefined) { - resource += `/environments/${envName}`; - } - return fetchResult(resource, { - method: "PUT", - body: JSON.stringify(body), - }); + envName?: string +): Promise { + const queue = await getQueue(config, queueName); + const targetConsumer = await resolveWorkerConsumerByName( + config, + scriptName, + envName, + queue + ); + return deleteConsumerById(config, queue.queue_id, targetConsumer.consumer_id); } diff --git a/packages/wrangler/src/r2/helpers.ts b/packages/wrangler/src/r2/helpers.ts index 8474c869dfa..a86ff74a3ef 100644 --- a/packages/wrangler/src/r2/helpers.ts +++ b/packages/wrangler/src/r2/helpers.ts @@ -7,7 +7,7 @@ import { UserError } from "../errors"; import { logger } from "../logger"; import { getQueue } from "../queues/client"; import type { Config } from "../config"; -import type { getQueueByID } from "../queues/client"; +import type { getQueueById } from "../queues/client"; import type { ApiCredentials } from "../user"; import type { R2Bucket } from "@cloudflare/workers-types/experimental"; import type { ReplaceWorkersTypes } from "miniflare"; @@ -427,7 +427,7 @@ export async function tableFromNotificationGetResponse( response: GetNotificationConfigResponse[BucketName], // We're injecting this parameter because it makes testing easier, // relative to mocking. - queueIdentifier: typeof getQueueByID + queueIdentifier: typeof getQueueById ): Promise< { queue_name: string; diff --git a/packages/wrangler/src/r2/notification.ts b/packages/wrangler/src/r2/notification.ts index 8a7e2eee103..dc6237b171f 100644 --- a/packages/wrangler/src/r2/notification.ts +++ b/packages/wrangler/src/r2/notification.ts @@ -1,6 +1,6 @@ import { readConfig } from "../config"; import { logger } from "../logger"; -import { getQueueByID } from "../queues/client"; +import { getQueueById } from "../queues/client"; import { printWranglerBanner } from "../update-check"; import { requireApiToken, requireAuth } from "../user"; import { @@ -39,7 +39,7 @@ export async function GetHandler( const tableOutput = await tableFromNotificationGetResponse( config, resp[args.bucket], - getQueueByID + getQueueById ); logger.table(tableOutput); } diff --git a/packages/wrangler/src/triggers/deploy.ts b/packages/wrangler/src/triggers/deploy.ts index 0e9fe00f97f..b6681cc4e91 100644 --- a/packages/wrangler/src/triggers/deploy.ts +++ b/packages/wrangler/src/triggers/deploy.ts @@ -1,7 +1,6 @@ import chalk from "chalk"; import { fetchListResult, fetchResult } from "../cfetch"; import { - ensureQueuesExist, formatTime, publishCustomDomains, publishRoutes, @@ -12,6 +11,7 @@ import { } from "../deploy/deploy"; import { UserError } from "../errors"; import { logger } from "../logger"; +import { ensureQueuesExistByConfig } from "../queues/client"; import { getWorkersDevSubdomain } from "../routes"; import { getZoneForRoute } from "../zones"; import type { Config } from "../config"; @@ -79,7 +79,7 @@ export default async function triggersDeploy(props: Props): Promise { }>(`${workerUrl}/subdomain`); if (!props.dryRun) { - await ensureQueuesExist(config); + await ensureQueuesExistByConfig(config); } if (props.dryRun) { diff --git a/packages/wrangler/src/versions/upload.ts b/packages/wrangler/src/versions/upload.ts index f0a86ea5ac4..95f7ea46733 100644 --- a/packages/wrangler/src/versions/upload.ts +++ b/packages/wrangler/src/versions/upload.ts @@ -28,12 +28,11 @@ import { getMetricsUsageHeaders } from "../metrics"; import { isNavigatorDefined } from "../navigator-user-agent"; import { ParseError } from "../parse"; import { getWranglerTmpDir } from "../paths"; -import { getQueue } from "../queues/client"; +import { ensureQueuesExistByConfig } from "../queues/client"; import { getSourceMappedString, maybeRetrieveFileSourceMap, } from "../sourcemap"; -import type { FetchError } from "../cfetch"; import type { Config } from "../config"; import type { Rule } from "../config/environment"; import type { Entry } from "../deployment-bundle/entry"; @@ -430,7 +429,7 @@ See https://developers.cloudflare.com/workers/platform/compatibility-dates for m printBindings({ ...withoutStaticAssets, vars: maskedVars }); if (!props.dryRun) { - await ensureQueuesExist(config); + await ensureQueuesExistByConfig(config); // Upload the script so it has time to propagate. // We can also now tell whether available_on_subdomain is set @@ -557,31 +556,6 @@ export function isAuthenticationError(e: unknown): e is ParseError { return e instanceof ParseError && (e as { code?: number }).code === 10000; } -async function ensureQueuesExist(config: Config) { - const producers = (config.queues.producers || []).map( - (producer) => producer.queue - ); - const consumers = (config.queues.consumers || []).map( - (consumer) => consumer.queue - ); - - const queueNames = producers.concat(consumers); - for (const queue of queueNames) { - try { - await getQueue(config, queue); - } catch (err) { - const queueErr = err as FetchError; - if (queueErr.code === 11000) { - // queue_not_found - throw new UserError( - `Queue "${queue}" does not exist. To create it, run: wrangler queues create ${queue}` - ); - } - throw err; - } - } -} - async function noBundleWorker( entry: Entry, rules: Rule[],