Skip to content

Commit

Permalink
Add jwt to R2 rpc, introduce R2CrossAccount
Browse files Browse the repository at this point in the history
To allow greater flexibility in how R2 RPC operations can be performed,
introducing the jwt as optional attribute. Also refactored the way
R2 paths are built and passed to the doR2HTTP(verb)Request functions,
since we expect to support greater variety of paths.

Introduced concept of R2CrossAccount, which will leverage the new
underlying flexibility to perform cross account operations with proper
auth.
  • Loading branch information
OilyLime authored and pull[bot] committed Aug 31, 2023
1 parent 58adbd2 commit fe71ff3
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 39 deletions.
23 changes: 15 additions & 8 deletions src/workerd/api/r2-admin.c++
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@

namespace workerd::api::public_beta {
jsg::Ref<R2Bucket> R2Admin::get(jsg::Lock& js, kj::String bucketName) {
return jsg::alloc<R2Bucket>(featureFlags, subrequestChannel, kj::mv(bucketName),
R2Bucket::friend_tag_t{});
KJ_IF_MAYBE(a, adminAccount) {
auto& j = KJ_ASSERT_NONNULL(jwt, "adminAccount without corresponding jwt");
return jsg::alloc<R2Bucket>(featureFlags, subrequestChannel, kj::mv(bucketName),
kj::str(*a), kj::str(j), R2Bucket::friend_tag_t{});
}
return jsg::alloc<R2Bucket>(featureFlags, subrequestChannel, kj::mv(bucketName), R2Bucket::friend_tag_t{});
}

jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(jsg::Lock& js, kj::String name,
Expand All @@ -33,9 +37,10 @@ jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(jsg::Lock& js, kj::String name,
createBucketBuilder.setBucket(name);

auto requestJson = json.encode(requestBuilder);

kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, nullptr);
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr,
kj::mv(requestJson), nullptr);
kj::mv(requestJson), path, jwt);

return context.awaitIo(kj::mv(promise),
[this, subrequestChannel = subrequestChannel, name = kj::mv(name), &errorType]
Expand Down Expand Up @@ -72,8 +77,9 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
}

auto requestJson = json.encode(requestBuilder);

auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), nullptr);
kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, nullptr);
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path, jwt);

return context.awaitIo(js, kj::mv(promise),
[this, &retrievedBucketType, &errorType](jsg::Lock& js, R2Result r2Result) mutable {
Expand Down Expand Up @@ -126,9 +132,10 @@ jsg::Promise<void> R2Admin::delete_(jsg::Lock& js, kj::String name,
deleteBucketBuilder.setBucket(name);

auto requestJson = json.encode(requestBuilder);

kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, nullptr);
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr,
kj::mv(requestJson), nullptr);
kj::mv(requestJson), path, jwt);

return context.awaitIo(kj::mv(promise), [&errorType](R2Result r2Result) mutable {
r2Result.throwIfError("deleteBucket", errorType);
Expand Down
24 changes: 24 additions & 0 deletions src/workerd/api/r2-admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,17 @@
#include <workerd/jsg/jsg.h>
#include <workerd/api/http.h>

namespace edgeworker::api {
class R2CrossAccount;
}

namespace workerd::api::public_beta {

class R2Admin: public jsg::Object {

struct friend_tag_t {};
// A friend tag that grants access to an internal constructor for the R2CrossAccount binding

// A capability to an R2 Admin interface.

struct FeatureFlags: public R2Bucket::FeatureFlags {
Expand All @@ -23,6 +31,18 @@ class R2Admin: public jsg::Object {
// `subrequestChannel` is what to pass to IoContext::getHttpClient() to get an HttpClient
// representing this namespace.

R2Admin(FeatureFlags featureFlags,
uint subrequestChannel,
kj::String account,
kj::String jwt,
friend_tag_t)
: featureFlags(featureFlags),
subrequestChannel(subrequestChannel),
adminAccount(kj::mv(account)),
jwt(kj::mv(jwt)) {}
// This constructor is intended to be used by the R2CrossAccount binding, which has access to the
// friend_tag

struct ListOptions {
jsg::Optional<int> limit;
jsg::Optional<kj::String> cursor;
Expand Down Expand Up @@ -80,6 +100,10 @@ class R2Admin: public jsg::Object {
private:
R2Bucket::FeatureFlags featureFlags;
uint subrequestChannel;
kj::Maybe<kj::String> adminAccount;
kj::Maybe<kj::String> jwt;

friend class edgeworker::api::R2CrossAccount;
};

#define EW_R2_PUBLIC_BETA_ADMIN_ISOLATE_TYPES \
Expand Down
48 changes: 32 additions & 16 deletions src/workerd/api/r2-bucket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::head(
headBuilder.setObject(name);

auto requestJson = json.encode(requestBuilder);

auto bucket = adminBucket.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), kj::mv(bucket));
kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, adminBucket);
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path, jwt);

return context.awaitIo(kj::mv(promise), [&errorType](R2Result r2Result) {
return parseObjectMetadata<HeadResult>("head", r2Result, errorType);
Expand Down Expand Up @@ -359,9 +359,9 @@ R2Bucket::get(jsg::Lock& js, kj::String name, jsg::Optional<GetOptions> options,
initGetOptions(js, getBuilder, *o);
}
auto requestJson = json.encode(requestBuilder);

auto bucket = adminBucket.map([](const auto& b) { return kj::str(b); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), kj::mv(bucket));
kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, adminBucket);
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path, jwt);

return context.awaitIo(kj::mv(promise), [&context, &errorType](R2Result r2Result)
-> kj::OneOf<kj::Maybe<jsg::Ref<GetResult>>, jsg::Ref<HeadResult>> {
Expand Down Expand Up @@ -547,11 +547,12 @@ R2Bucket::put(jsg::Lock& js, kj::String name, kj::Maybe<R2PutValue> value,
}

auto requestJson = json.encode(requestBuilder);
auto bucket = adminBucket.map([](auto&& s) { return kj::str(s); });

cancelReader.cancel();
kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, adminBucket);
auto promise = doR2HTTPPutRequest(js, kj::mv(client), kj::mv(value), nullptr,
kj::mv(requestJson), kj::mv(bucket));
kj::mv(requestJson), path, jwt);

return context.awaitIo(js, kj::mv(promise),
[sentHttpMetadata = kj::mv(sentHttpMetadata),
Expand Down Expand Up @@ -634,10 +635,10 @@ jsg::Promise<jsg::Ref<R2MultipartUpload>> R2Bucket::createMultipartUpload(jsg::L
}

auto requestJson = json.encode(requestBuilder);
auto bucket = adminBucket.map([](auto&& s) { return kj::str(s); });

kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, adminBucket);
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr, kj::mv(requestJson),
kj::mv(bucket));
path, jwt);

return context.awaitIo(js, kj::mv(promise),
[&errorType, key=kj::mv(key), this] (jsg::Lock& js, R2Result r2Result) mutable {
Expand Down Expand Up @@ -688,10 +689,10 @@ jsg::Promise<void> R2Bucket::delete_(jsg::Lock& js, kj::OneOf<kj::String, kj::Ar

auto requestJson = json.encode(requestBuilder);

auto bucket = adminBucket.map([](auto&& s) { return kj::str(s); });

kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, adminBucket);
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr, kj::mv(requestJson),
kj::mv(bucket));
path, jwt);

return context.awaitIo(js, kj::mv(promise), [&errorType](jsg::Lock& js, R2Result r) {
if (r.objectNotFound()) {
Expand Down Expand Up @@ -789,8 +790,9 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(

auto requestJson = json.encode(requestBuilder);

auto bucket = adminBucket.map([](auto&& s) { return kj::str(s); });
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), kj::mv(bucket));
kj::StringPtr components[2];
auto path = fillR2Path(components, adminAccount, adminBucket);
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path, jwt);

return context.awaitIo(kj::mv(promise),
[expectedOptionalFields = expectedOptionalFields.releaseAsArray(), &errorType]
Expand Down Expand Up @@ -1089,4 +1091,18 @@ kj::Maybe<jsg::Ref<R2Bucket::HeadResult>> parseHeadResultWrapper(
return parseObjectMetadata<R2Bucket::HeadResult>(action, r2Result, errorType);
}

kj::ArrayPtr<kj::StringPtr> fillR2Path(kj::StringPtr pathStorage[2], const kj::Maybe<kj::String>& account, const kj::Maybe<kj::String>& bucket) {
int numComponents = 0;

KJ_IF_MAYBE(a, account) {
pathStorage[numComponents++] = *a;
}
KJ_IF_MAYBE(b, bucket) {
pathStorage[numComponents++] = *b;
}

return kj::arrayPtr(pathStorage, numComponents);
}


} // namespace workerd::api::public_beta
6 changes: 6 additions & 0 deletions src/workerd/api/r2-bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
namespace workerd::api::public_beta {

kj::Array<kj::byte> cloneByteArray(const kj::Array<kj::byte>& arr);
kj::ArrayPtr<kj::StringPtr> fillR2Path(kj::StringPtr pathStorage[2], const kj::Maybe<kj::String>& admin, const kj::Maybe<kj::String>& bucket);

class R2MultipartUpload;

Expand All @@ -39,6 +40,9 @@ class R2Bucket: public jsg::Object {
explicit R2Bucket(FeatureFlags featureFlags, uint clientIndex, kj::String bucket, friend_tag_t)
: featureFlags(featureFlags), clientIndex(clientIndex), adminBucket(kj::mv(bucket)) {}

explicit R2Bucket(FeatureFlags featureFlags, uint clientIndex, kj::String bucket, kj::String account, kj::String jwt, friend_tag_t)
: featureFlags(featureFlags), clientIndex(clientIndex), adminBucket(kj::mv(bucket)), adminAccount(kj::mv(account)), jwt(kj::mv(jwt)) {}

struct Range {
jsg::Optional<double> offset;
jsg::Optional<double> length;
Expand Down Expand Up @@ -403,6 +407,8 @@ class R2Bucket: public jsg::Object {
FeatureFlags featureFlags;
uint clientIndex;
kj::Maybe<kj::String> adminBucket;
kj::Maybe<kj::String> adminAccount;
kj::Maybe<kj::String> jwt;

friend class R2Admin;
friend class R2MultipartUpload;
Expand Down
16 changes: 9 additions & 7 deletions src/workerd/api/r2-multipart.c++
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ jsg::Promise<R2MultipartUpload::UploadedPart> R2MultipartUpload::uploadPart(
auto requestJson = json.encode(requestBuilder);
auto bucket = this->bucket->adminBucket.map([](auto&& s) { return kj::str(s); });

kj::StringPtr components[2];
auto path = fillR2Path(components, nullptr, this->bucket->adminBucket);
auto promise = doR2HTTPPutRequest(js, kj::mv(client), kj::mv(value), nullptr,
kj::mv(requestJson), kj::mv(bucket));
kj::mv(requestJson), path, nullptr);

return context.awaitIo(js, kj::mv(promise),
[&errorType, partNumber]
Expand Down Expand Up @@ -98,10 +100,10 @@ jsg::Promise<jsg::Ref<R2Bucket::HeadResult>> R2MultipartUpload::complete(

auto requestJson = json.encode(requestBuilder);

auto bucket = this->bucket->adminBucket.map([](auto&& s) { return kj::str(s); });

kj::StringPtr components[2];
auto path = fillR2Path(components, nullptr, this->bucket->adminBucket);
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr, kj::mv(requestJson),
kj::mv(bucket));
path, nullptr);

return context.awaitIo(js, kj::mv(promise),
[&errorType]
Expand Down Expand Up @@ -134,10 +136,10 @@ jsg::Promise<void> R2MultipartUpload::abort(jsg::Lock& js, const jsg::TypeHandle

auto requestJson = json.encode(requestBuilder);

auto bucket = this->bucket->adminBucket.map([](auto&& s) { return kj::str(s); });

kj::StringPtr components[2];
auto path = fillR2Path(components, nullptr, this->bucket->adminBucket);
auto promise = doR2HTTPPutRequest(js, kj::mv(client), nullptr, nullptr, kj::mv(requestJson),
kj::mv(bucket));
path, nullptr);

return context.awaitIo(js, kj::mv(promise), [&errorType](jsg::Lock& js, R2Result r) {
if (r.objectNotFound()) {
Expand Down
18 changes: 12 additions & 6 deletions src/workerd/api/r2-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,22 @@ void R2Result::throwIfError(kj::StringPtr action,
}

kj::Promise<R2Result> doR2HTTPGetRequest(kj::Own<kj::HttpClient> client,
kj::String metadataPayload, kj::Maybe<kj::String> path) {
kj::String metadataPayload, kj::ArrayPtr<kj::StringPtr> path, kj::Maybe<kj::StringPtr> jwt) {
auto& context = IoContext::current();
kj::Url url;
url.scheme = kj::str("https");
url.host = kj::str("fake-host");
KJ_IF_MAYBE(p, path) {
url.path.add(kj::mv(*p));
for (const auto &p : path) {
url.path.add(kj::str(p));
}

auto& headerIds = context.getHeaderIds();

auto requestHeaders = kj::HttpHeaders(context.getHeaderTable());
requestHeaders.set(headerIds.cfBlobRequest, kj::mv(metadataPayload));
KJ_IF_MAYBE(j, jwt) {
requestHeaders.set(headerIds.authorization, kj::str("Bearer ", *j));
}
return client->request(
kj::HttpMethod::GET, url.toString(kj::Url::Context::HTTP_PROXY_REQUEST),
requestHeaders)
Expand Down Expand Up @@ -130,16 +133,16 @@ kj::Promise<R2Result> doR2HTTPGetRequest(kj::Own<kj::HttpClient> client,

kj::Promise<R2Result> doR2HTTPPutRequest(jsg::Lock& js, kj::Own<kj::HttpClient> client,
kj::Maybe<R2PutValue> supportedBody, kj::Maybe<uint64_t> streamSize, kj::String metadataPayload,
kj::Maybe<kj::String> path) {
kj::ArrayPtr<kj::StringPtr> path, kj::Maybe<kj::StringPtr> jwt) {
// NOTE: A lot of code here is duplicated with kv.c++. Maybe it can be refactored to be more
// reusable?
auto& context = IoContext::current();
auto headers = kj::HttpHeaders(context.getHeaderTable());
kj::Url url;
url.scheme = kj::str("https");
url.host = kj::str("fake-host");
KJ_IF_MAYBE(p, path) {
url.path.add(kj::mv(*p));
for (const auto &p : path) {
url.path.add(kj::str(p));
}

kj::Maybe<uint64_t> expectedBodySize;
Expand Down Expand Up @@ -177,6 +180,9 @@ kj::Promise<R2Result> doR2HTTPPutRequest(jsg::Lock& js, kj::Own<kj::HttpClient>
}

headers.set(context.getHeaderIds().cfBlobMetadataSize, kj::str(metadataPayload.size()));
KJ_IF_MAYBE(j, jwt) {
headers.set(context.getHeaderIds().authorization, kj::str("Bearer ", *j));
}

auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

Expand Down
6 changes: 4 additions & 2 deletions src/workerd/api/r2-rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ struct R2Result {
kj::Promise<R2Result> doR2HTTPGetRequest(
kj::Own<kj::HttpClient> client,
kj::String metadataPayload,
kj::Maybe<kj::String> path);
kj::ArrayPtr<kj::StringPtr> path,
kj::Maybe<kj::StringPtr> jwt);

kj::Promise<R2Result> doR2HTTPPutRequest(
jsg::Lock& js,
Expand All @@ -82,6 +83,7 @@ kj::Promise<R2Result> doR2HTTPPutRequest(
kj::Maybe<uint64_t> streamSize,
// Deprecated. For internal beta API only.
kj::String metadataPayload,
kj::Maybe<kj::String> path);
kj::ArrayPtr<kj::StringPtr> path,
kj::Maybe<kj::StringPtr> jwt);

} // namespace workerd::api
1 change: 1 addition & 0 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ ThreadContext::HeaderIdBundle::HeaderIdBundle(kj::HttpHeaderTable::Builder& buil
cfR2ErrorHeader(builder.add("CF-R2-Error")),
cfBlobMetadataSize(builder.add("CF-R2-Metadata-Size")),
cfBlobRequest(builder.add("CF-R2-Request")),
authorization(builder.add("Authorization")),
secWebSocketProtocol(builder.add("Sec-WebSocket-Protocol")) {}

ThreadContext::ThreadContext(
Expand Down
1 change: 1 addition & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class ThreadContext {
const kj::HttpHeaderId cfR2ErrorHeader; // used by R2 binding implementation
const kj::HttpHeaderId cfBlobMetadataSize; // used by R2 binding implementation
const kj::HttpHeaderId cfBlobRequest; // used by R2 binding implementation
const kj::HttpHeaderId authorization; // used by R2 binding implementation
const kj::HttpHeaderId secWebSocketProtocol;
};

Expand Down

0 comments on commit fe71ff3

Please sign in to comment.