Skip to content

Commit

Permalink
wrangler support Queues http-pull consumer config and cli (#5231)
Browse files Browse the repository at this point in the history
* MQ-307 Add typed consumer types and client api

* MQ-307 Update queue consumer config to allow type, support http pull consumers

* MQ-307 Make queue consumer type optional in config

* MQ-307 Update for PR feedback

* MQ-307 Add changeset

* MQ-307 Update changeset to be minor bump

* MQ-307 PR feedback: wrangler deploy updates an existing http-pull consumer

* Use new Queues config endpoints for pull consumers

* Add retry delay option for Queues http pull consumers

* Fix config api field

* Retry delay is always in seconds

* Update snapshot
  • Loading branch information
w-kuhn committed Mar 21, 2024
1 parent 248a318 commit e88ad44
Show file tree
Hide file tree
Showing 15 changed files with 726 additions and 48 deletions.
7 changes: 7 additions & 0 deletions .changeset/fuzzy-mangos-sparkle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"wrangler": minor
---

feature: Add support for configuring HTTP Pull consumers for Queues

HTTP Pull consumers can be used to pull messages from queues via https request.
140 changes: 137 additions & 3 deletions packages/wrangler/src/__tests__/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import writeWranglerToml from "./helpers/write-wrangler-toml";
import type { Config } from "../config";
import type { CustomDomain, CustomDomainChangeset } from "../deploy/deploy";
import type { KVNamespaceInfo } from "../kv/helpers";
import type { PutConsumerBody } from "../queues/client";
import type { PostTypedConsumerBody, PutConsumerBody } from "../queues/client";
import type { RestRequest } from "msw";

describe("deploy", () => {
Expand Down Expand Up @@ -8643,6 +8643,112 @@ export default{
`);
});

it("should post queue http consumers on deploy", async () => {
writeWranglerToml({
queues: {
consumers: [
{
queue: "queue1",
type: "http_pull",
dead_letter_queue: "myDLQ",
max_batch_size: 5,
visibility_timeout_ms: 4000,
max_retries: 10,
retry_delay: 1,
},
],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
mockGetQueue("queue1", "queue1-queue-id");
mockPostQueueHTTPConsumer("queue1-queue-id", {
type: "http_pull",
dead_letter_queue: "myDLQ",
settings: {
batch_size: 5,
max_retries: 10,
visibility_timeout_ms: 4000,
retry_delay: 1,
},
});
await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"Total Upload: xx KiB / gzip: xx KiB
Uploaded test-name (TIMINGS)
Published test-name (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Consumer for queue1
Current Deployment ID: Galaxy-Class"
`);
});

it("should update queue http consumers when one already exists for queue", async () => {
writeWranglerToml({
queues: {
consumers: [
{
queue: "queue1",
type: "http_pull",
},
],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
msw.use(
rest.get(
`*/accounts/:accountId/workers/queues/queue1`,
(req, res, ctx) => {
expect(req.params.accountId).toEqual("some-account-id");
return res(
ctx.json({
success: true,
errors: [],
messages: [],
result: {
queue: "queue1",
queue_id: "queue1-queue-id",
consumers: [
{ type: "http_pull", consumer_id: "queue1-consumer-id" },
],
},
})
);
}
)
);
msw.use(
rest.put(
`*/accounts/:accountId/queues/:queueId/consumers/:consumerId`,
async (req, res, ctx) => {
expect(req.params.queueId).toEqual("queue1-queue-id");
expect(req.params.consumerId).toEqual("queue1-consumer-id");
expect(req.params.accountId).toEqual("some-account-id");
return res(
ctx.json({
success: true,
errors: [],
messages: [],
result: null,
})
);
}
)
);
await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"Total Upload: xx KiB / gzip: xx KiB
Uploaded test-name (TIMINGS)
Published test-name (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Consumer for queue1
Current Deployment ID: Galaxy-Class"
`);
});

it("should support queue consumer concurrency with a max concurrency specified", async () => {
writeWranglerToml({
queues: {
Expand Down Expand Up @@ -9541,7 +9647,7 @@ function mockServiceScriptData(options: {
}
}

function mockGetQueue(expectedQueueName: string) {
function mockGetQueue(expectedQueueName: string, expectedQueueId?: string) {
const requests = { count: 0 };
msw.use(
rest.get(
Expand All @@ -9554,7 +9660,7 @@ function mockGetQueue(expectedQueueName: string) {
success: true,
errors: [],
messages: [],
result: { queue: expectedQueueName },
result: { queue: expectedQueueName, queue_id: expectedQueueId },
})
);
}
Expand Down Expand Up @@ -9619,6 +9725,34 @@ function mockPutQueueConsumer(
return requests;
}

function mockPostQueueHTTPConsumer(
expectedQueueId: string,
expectedBody: PostTypedConsumerBody
) {
const requests = { count: 0 };
msw.use(
rest.post(
`*/accounts/:accountId/queues/:queueId/consumers`,
async (req, res, ctx) => {
const body = await req.json();
expect(req.params.queueId).toEqual(expectedQueueId);
expect(req.params.accountId).toEqual("some-account-id");
expect(body).toEqual(expectedBody);
requests.count += 1;
return res(
ctx.json({
success: true,
errors: [],
messages: [],
result: {},
})
);
}
)
);
return requests;
}

// MSW FormData & Blob polyfills to test FormData requests
function mockFormDataToString(this: FormData) {
const entries = [];
Expand Down

0 comments on commit e88ad44

Please sign in to comment.