Skip to content

Commit

Permalink
Add create & delete commands for R2 event notifications (#5294)
Browse files Browse the repository at this point in the history
* Add command for adding event notification configs

* Add command for deleting event-notification cfgs

* Add changeset for event notification commands

* Expose event-types instead of actions.

For beta of this feature, we'll only expose the categories of actions we want to support.

* Add test coverage for eventNotificationHeaders fn

* Simplify eventNotificationHeaders unit test

Co-authored-by: Pete Bacon Darwin <pete@bacondarwin.com>

* Fix typo in argument description

* Update event notif. request body

EWC no longer supports including `bucketName` and `queue` in the request body, since these values are already present in the URL.

---------

Co-authored-by: Pete Bacon Darwin <pete@bacondarwin.com>
  • Loading branch information
mattdeboard and petebacondarwin committed Mar 21, 2024
1 parent a9b8f4a commit bdc121d
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 6 deletions.
7 changes: 7 additions & 0 deletions .changeset/breezy-berries-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"wrangler": minor
---

Add `event-notification` commands in support of event notifications for Cloudflare R2.

Included are commands for creating and deleting event notification configurations for individual buckets.
160 changes: 155 additions & 5 deletions packages/wrangler/src/__tests__/r2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ import * as fs from "node:fs";
import { rest } from "msw";
import prettyBytes from "pretty-bytes";
import { MAX_UPLOAD_SIZE } from "../r2/constants";
import { actionsForEventCategories } from "../r2/helpers";
import { mockAccountId, mockApiToken } from "./helpers/mock-account-id";
import { mockConsoleMethods } from "./helpers/mock-console";
import { useMockIsTTY } from "./helpers/mock-istty";
import { createFetchResult, msw, mswSuccessR2handlers } from "./helpers/msw";
import { runInTempDir } from "./helpers/run-in-tmp";
import { runWrangler } from "./helpers/run-wrangler";
import type { R2BucketInfo } from "../r2/helpers";
import type {
EWCRequestBody,
R2BucketInfo,
R2EventableOperation,
R2EventType,
} from "../r2/helpers";

describe("r2", () => {
const std = mockConsoleMethods();
Expand Down Expand Up @@ -36,10 +42,11 @@ describe("r2", () => {
Manage R2 buckets
Commands:
wrangler r2 bucket create <name> Create a new R2 bucket
wrangler r2 bucket list List R2 buckets
wrangler r2 bucket delete <name> Delete an R2 bucket
wrangler r2 bucket sippy Manage Sippy incremental migration on an R2 bucket
wrangler r2 bucket create <name> Create a new R2 bucket
wrangler r2 bucket list List R2 buckets
wrangler r2 bucket delete <name> Delete an R2 bucket
wrangler r2 bucket sippy Manage Sippy incremental migration on an R2 bucket
wrangler r2 bucket event-notification Manage event notifications for an R2 bucket
Flags:
-j, --experimental-json-config Experimental: Support wrangler.json [boolean]
Expand Down Expand Up @@ -537,6 +544,149 @@ describe("r2", () => {
);
});
});

describe("event-notification", () => {
describe("create", () => {
it("follows happy path as expected", async () => {
const eventTypes: R2EventType[] = ["object_create", "object_delete"];
const actions: R2EventableOperation[] = [];
const bucketName = "my-bucket";
const queue = "deadbeef-0123-4567-8910-abcdefabcdef";

const config: EWCRequestBody = {
rules: [
{
actions: eventTypes.reduce(
(acc, et) => acc.concat(actionsForEventCategories[et]),
actions
),
},
],
};
msw.use(
rest.put(
"*/accounts/:accountId/event_notifications/r2/:bucketName/configuration/queues/:queueUUID",
async (request, response, context) => {
const { accountId } = request.params;
expect(accountId).toEqual("some-account-id");
expect(await request.json()).toEqual({
...config,
// We fill in `prefix` & `suffix` with empty strings if not
// provided
rules: [{ ...config.rules[0], prefix: "", suffix: "" }],
});
expect(request.headers.get("authorization")).toEqual(
"Bearer some-api-token"
);
return response.once(context.json(createFetchResult({})));
}
)
);
await expect(
runWrangler(
`r2 bucket event-notification create ${bucketName} --queue ${queue} --event-types ${eventTypes.join(
" "
)}`
)
).resolves.toBe(undefined);
expect(std.out).toMatchInlineSnapshot(`
"Sending this configuration to \\"my-bucket\\":
{\\"rules\\":[{\\"prefix\\":\\"\\",\\"suffix\\":\\"\\",\\"actions\\":[\\"PutObject\\",\\"CompleteMultipartUpload\\",\\"CopyObject\\",\\"DeleteObject\\",\\"LifecycleDeletion\\"]}]}
Configuration created successfully!"
`);
});

it("errors if required options are not provided", async () => {
await expect(
runWrangler(
"r2 bucket event-notification create event-notification-test-001"
)
).rejects.toMatchInlineSnapshot(
`[Error: Missing required arguments: event-types, queue]`
);
expect(std.out).toMatchInlineSnapshot(`
"
wrangler r2 bucket event-notification create <bucket>
Create new event notification configuration for an R2 bucket
Positionals:
bucket The name of the bucket for which notifications will be emitted [string] [required]
Flags:
-j, --experimental-json-config Experimental: Support wrangler.json [boolean]
-c, --config Path to .toml configuration file [string]
-e, --env Environment to use for operations and .env files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]
Options:
--event-types, --event-type Specify the kinds of object events to emit notifications for. ex. '--event-types object_create object_delete' [array] [required] [choices: \\"object_create\\", \\"object_delete\\"]
--prefix only actions on objects with this prefix will emit notifications [string]
--suffix only actions on objects with this suffix will emit notifications [string]
--queue The ID of the queue to which event notifications will be sent. ex '--queue deadbeef-0123-4567-8910-abcdefgabcde' [string] [required]"
`);
});
});

describe("delete", () => {
it("follows happy path as expected", async () => {
const bucketName = "my-bucket";
const queue = "deadbeef-0123-4567-8910-abcdefabcdef";
msw.use(
rest.delete(
"*/accounts/:accountId/event_notifications/r2/:bucketName/configuration/queues/:queueUUID",
async (request, response, context) => {
const { accountId } = request.params;
expect(accountId).toEqual("some-account-id");
expect(request.headers.get("authorization")).toEqual(
"Bearer some-api-token"
);
return response.once(context.json(createFetchResult({})));
}
)
);
await expect(
runWrangler(
`r2 bucket event-notification delete ${bucketName} --queue ${queue}`
)
).resolves.toBe(undefined);
expect(std.out).toMatchInlineSnapshot(`
"Disabling event notifications for \\"my-bucket\\" to queue deadbeef-0123-4567-8910-abcdefabcdef...
Configuration deleted successfully!"
`);
});

it("errors if required options are not provided", async () => {
await expect(
runWrangler(
"r2 bucket event-notification delete event-notification-test-001"
)
).rejects.toMatchInlineSnapshot(
`[Error: Missing required argument: queue]`
);
expect(std.out).toMatchInlineSnapshot(`
"
wrangler r2 bucket event-notification delete <bucket>
Delete event notification configuration for an R2 bucket and queue
Positionals:
bucket The name of the bucket for which notifications will be emitted [string] [required]
Flags:
-j, --experimental-json-config Experimental: Support wrangler.json [boolean]
-c, --config Path to .toml configuration file [string]
-e, --env Environment to use for operations and .env files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]
Options:
--queue The ID of the queue that is configured to receive notifications. ex '--queue deadbeef-0123-4567-8910-abcdefgabcde' [string] [required]"
`);
});
});
});
});

describe("r2 object", () => {
Expand Down
24 changes: 24 additions & 0 deletions packages/wrangler/src/__tests__/r2/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { eventNotificationHeaders } from "../../r2/helpers";
import type { ApiCredentials } from "../../user";

describe("event notifications", () => {
test("auth email eventNotificationHeaders", () => {
const creds: ApiCredentials = {
authEmail: "test@example.com",
authKey: "some-big-secret",
};
const result = eventNotificationHeaders(creds);
expect(result).toMatchObject({
"X-Auth-Key": creds.authKey,
"X-Auth-Email": creds.authEmail,
});
});

test("API token eventNotificationHeaders", () => {
const creds: ApiCredentials = { apiToken: "some-api-token" };
const result = eventNotificationHeaders(creds);
expect(result).toMatchObject({
Authorization: `Bearer ${creds.apiToken}`,
});
});
});
104 changes: 104 additions & 0 deletions packages/wrangler/src/r2/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { fetchR2Objects } from "../cfetch/internal";
import { getLocalPersistencePath } from "../dev/get-local-persistence-path";
import { buildPersistOptions } from "../dev/miniflare";
import { UserError } from "../errors";
import { logger } from "../logger";
import type { ApiCredentials } from "../user";
import type { R2Bucket } from "@cloudflare/workers-types/experimental";
import type { ReplaceWorkersTypes } from "miniflare";
import type { Readable } from "node:stream";
Expand Down Expand Up @@ -317,3 +319,105 @@ export async function putR2Sippy(
{ method: "PUT", body: JSON.stringify(params), headers }
);
}

export const R2EventableOperations = [
"PutObject",
"DeleteObject",
"CompleteMultipartUpload",
"AbortMultipartUpload",
"CopyObject",
"LifecycleDeletion",
] as const;
export type R2EventableOperation = typeof R2EventableOperations[number];

export const actionsForEventCategories: Record<
"object_create" | "object_delete",
R2EventableOperation[]
> = {
object_create: ["PutObject", "CompleteMultipartUpload", "CopyObject"],
object_delete: ["DeleteObject", "LifecycleDeletion"],
};

export type R2EventType = keyof typeof actionsForEventCategories;
// This type captures the shape of the data expected by EWC API.
export type EWCRequestBody = {
// `jurisdiction` is included here for completeness, but until Queues
// supports jurisdictions, then this command will not send anything to do
// with jurisdictions.
jurisdiction?: string;
rules: Array<{
prefix?: string;
suffix?: string;
actions: R2EventableOperation[];
}>;
};

export function eventNotificationHeaders(
apiCredentials: ApiCredentials
): HeadersInit {
const headers: HeadersInit = {
"Content-Type": "application/json",
};

if ("apiToken" in apiCredentials) {
headers["Authorization"] = `Bearer ${apiCredentials.apiToken}`;
} else {
headers["X-Auth-Key"] = apiCredentials.authKey;
headers["X-Auth-Email"] = apiCredentials.authEmail;
}
return headers;
}
/** Construct & transmit notification configuration to EWC.
*
* On success, receive HTTP 200 response with a body like:
* { event_notification_detail_id: string }
*
* Possible status codes on failure:
* - 400 Bad Request - Either:
* - Uploaded configuration is invalid
* - Communication with either R2-gateway-worker or queue-broker-worker fails
* - 409 Conflict - A configuration between the bucket and queue already exists
* */
export async function putEventNotificationConfig(
apiCredentials: ApiCredentials,
accountId: string,
bucketName: string,
queueUUID: string,
eventTypes: R2EventType[],
prefix?: string,
suffix?: string
): Promise<{ event_notification_detail_id: string }> {
const headers = eventNotificationHeaders(apiCredentials);
let actions: R2EventableOperation[] = [];

for (const et of eventTypes) {
actions = actions.concat(actionsForEventCategories[et]);
}

const body: EWCRequestBody = {
rules: [{ prefix, suffix, actions }],
};
logger.log(
`Sending this configuration to "${bucketName}":\n${JSON.stringify(body)}`
);
return await fetchResult<{ event_notification_detail_id: string }>(
`/accounts/${accountId}/event_notifications/r2/${bucketName}/configuration/queues/${queueUUID}`,
{ method: "PUT", body: JSON.stringify(body), headers }
);
}

export async function deleteEventNotificationConfig(
apiCredentials: ApiCredentials,
accountId: string,
bucketName: string,
queueUUID: string
): Promise<null> {
const headers = eventNotificationHeaders(apiCredentials);
logger.log(
`Disabling event notifications for "${bucketName}" to queue ${queueUUID}...`
);
return await fetchResult<null>(
`/accounts/${accountId}/event_notifications/r2/${bucketName}/configuration/queues/${queueUUID}`,
{ method: "DELETE", headers }
);
}

0 comments on commit bdc121d

Please sign in to comment.