Skip to content

Commit

Permalink
RPC: Rename 'Restore' to 'Bootstrap' and redefine it to have nothing …
Browse files Browse the repository at this point in the history
…to do with SturdyRefs.

The 'objectId' field is now deprecated. Long-term, each vat will export no more than one "bootstrap interface" which can be obtained via 'Bootstrap'. Restoring SturdyRefs will be accomplished through higher-level interfaces specific to the VatNetwork in use.

See comments for 'Bootstrap' in rpc.capnp for more discussion.
  • Loading branch information
kentonv committed Nov 4, 2014
1 parent 46c48df commit 9a32fa7
Show file tree
Hide file tree
Showing 20 changed files with 908 additions and 1,162 deletions.
2 changes: 1 addition & 1 deletion c++/samples/calculator-client.c++
Expand Up @@ -48,7 +48,7 @@ int main(int argc, const char* argv[]) {
}

capnp::EzRpcClient client(argv[1]);
Calculator::Client calculator = client.importCap<Calculator>("calculator");
Calculator::Client calculator = client.getMain<Calculator>();

// Keep an eye on `waitScope`. Whenever you see it used is a place where we
// stop and wait for the server to respond. If a line of code does not use
Expand Down
3 changes: 1 addition & 2 deletions c++/samples/calculator-server.c++
Expand Up @@ -197,8 +197,7 @@ int main(int argc, const char* argv[]) {
}

// Set up a server.
capnp::EzRpcServer server(argv[1]);
server.exportCap("calculator", kj::heap<CalculatorImpl>());
capnp::EzRpcServer server(kj::heap<CalculatorImpl>(), argv[1]);

// Write the port number to stdout, in case it was chosen automatically.
auto& waitScope = server.getWaitScope();
Expand Down
17 changes: 17 additions & 0 deletions c++/src/capnp/ez-rpc-test.c++
Expand Up @@ -28,6 +28,23 @@ namespace _ {
namespace {

TEST(EzRpc, Basic) {
int callCount = 0;
EzRpcServer server(kj::heap<TestInterfaceImpl>(callCount), "localhost");

EzRpcClient client("localhost", server.getPort().wait(server.getWaitScope()));

auto cap = client.getMain<test::TestInterface>();
auto request = cap.fooRequest();
request.setI(123);
request.setJ(true);

EXPECT_EQ(0, callCount);
auto response = request.send().wait(server.getWaitScope());
EXPECT_EQ("foo", response.getX());
EXPECT_EQ(1, callCount);
}

TEST(EzRpc, DeprecatedNames) {
EzRpcServer server("localhost");
int callCount = 0;
server.exportCap("cap1", kj::heap<TestInterfaceImpl>(callCount));
Expand Down
98 changes: 75 additions & 23 deletions c++/src/capnp/ez-rpc.c++
Expand Up @@ -78,22 +78,37 @@ struct EzRpcClient::Impl {
struct ClientContext {
kj::Own<kj::AsyncIoStream> stream;
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::SturdyRefHostId> rpcSystem;
RpcSystem<rpc::twoparty::VatId> rpcSystem;

ClientContext(kj::Own<kj::AsyncIoStream>&& stream, ReaderOptions readerOpts)
: stream(kj::mv(stream)),
network(*this->stream, rpc::twoparty::Side::CLIENT, readerOpts),
rpcSystem(makeRpcClient(network)) {}

Capability::Client getMain() {
word scratch[4];
memset(scratch, 0, sizeof(scratch));
MallocMessageBuilder message(scratch);
auto hostId = message.getRoot<rpc::twoparty::VatId>();
hostId.setSide(rpc::twoparty::Side::SERVER);
return rpcSystem.bootstrap(hostId);
}

Capability::Client restore(kj::StringPtr name) {
word scratch[64];
memset(scratch, 0, sizeof(scratch));
MallocMessageBuilder message(scratch);
auto root = message.getRoot<rpc::SturdyRef>();
auto hostId = root.getHostId().getAs<rpc::twoparty::SturdyRefHostId>();

auto hostIdOrphan = message.getOrphanage().newOrphan<rpc::twoparty::VatId>();
auto hostId = hostIdOrphan.get();
hostId.setSide(rpc::twoparty::Side::SERVER);
root.getObjectId().setAs<Text>(name);
return rpcSystem.restore(hostId, root.getObjectId());

auto objectId = message.getRoot<AnyPointer>();
objectId.setAs<Text>(name);
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
return rpcSystem.restore(hostId, objectId);
#pragma GCC diagnostic pop
}
};

Expand Down Expand Up @@ -143,6 +158,16 @@ EzRpcClient::EzRpcClient(int socketFd, ReaderOptions readerOpts)

EzRpcClient::~EzRpcClient() noexcept(false) {}

Capability::Client EzRpcClient::getMain() {
KJ_IF_MAYBE(client, impl->clientContext) {
return client->get()->getMain();
} else {
return impl->setupPromise.addBranch().then([this]() {
return KJ_ASSERT_NONNULL(impl->clientContext)->getMain();
});
}
}

Capability::Client EzRpcClient::importCap(kj::StringPtr name) {
KJ_IF_MAYBE(client, impl->clientContext) {
return client->get()->restore(name);
Expand All @@ -168,7 +193,9 @@ kj::LowLevelAsyncIoProvider& EzRpcClient::getLowLevelIoProvider() {

// =======================================================================================

struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskSet::ErrorHandler {
struct EzRpcServer::Impl final: public SturdyRefRestorer<AnyPointer>,
public kj::TaskSet::ErrorHandler {
Capability::Client mainInterface;
kj::Own<EzRpcContext> context;

struct ExportedCap {
Expand All @@ -195,17 +222,22 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
struct ServerContext {
kj::Own<kj::AsyncIoStream> stream;
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::SturdyRefHostId> rpcSystem;
RpcSystem<rpc::twoparty::VatId> rpcSystem;

ServerContext(kj::Own<kj::AsyncIoStream>&& stream, SturdyRefRestorer<Text>& restorer,
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
ServerContext(kj::Own<kj::AsyncIoStream>&& stream, SturdyRefRestorer<AnyPointer>& restorer,
ReaderOptions readerOpts)
: stream(kj::mv(stream)),
network(*this->stream, rpc::twoparty::Side::SERVER, readerOpts),
rpcSystem(makeRpcServer(network, restorer)) {}
#pragma GCC diagnostic pop
};

Impl(kj::StringPtr bindAddress, uint defaultPort, ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()), portPromise(nullptr), tasks(*this) {
Impl(Capability::Client mainInterface, kj::StringPtr bindAddress, uint defaultPort,
ReaderOptions readerOpts)
: mainInterface(kj::mv(mainInterface)),
context(EzRpcContext::getThreadLocal()), portPromise(nullptr), tasks(*this) {
auto paf = kj::newPromiseAndFulfiller<uint>();
portPromise = paf.promise.fork();

Expand All @@ -219,16 +251,19 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
})));
}

Impl(struct sockaddr* bindAddress, uint addrSize, ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()), portPromise(nullptr), tasks(*this) {
Impl(Capability::Client mainInterface, struct sockaddr* bindAddress, uint addrSize,
ReaderOptions readerOpts)
: mainInterface(kj::mv(mainInterface)),
context(EzRpcContext::getThreadLocal()), portPromise(nullptr), tasks(*this) {
auto listener = context->getIoProvider().getNetwork()
.getSockaddr(bindAddress, addrSize)->listen();
portPromise = kj::Promise<uint>(listener->getPort()).fork();
acceptLoop(kj::mv(listener), readerOpts);
}

Impl(int socketFd, uint port, ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()),
Impl(Capability::Client mainInterface, int socketFd, uint port, ReaderOptions readerOpts)
: mainInterface(kj::mv(mainInterface)),
context(EzRpcContext::getThreadLocal()),
portPromise(kj::Promise<uint>(port).fork()),
tasks(*this) {
acceptLoop(context->getLowLevelIoProvider().wrapListenSocketFd(socketFd), readerOpts);
Expand All @@ -249,13 +284,18 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
})));
}

Capability::Client restore(Text::Reader name) override {
auto iter = exportMap.find(name);
if (iter == exportMap.end()) {
KJ_FAIL_REQUIRE("Server exports no such capability.", name) { break; }
return nullptr;
Capability::Client restore(AnyPointer::Reader objectId) override {
if (objectId.isNull()) {
return mainInterface;
} else {
return iter->second.cap;
auto name = objectId.getAs<Text>();
auto iter = exportMap.find(name);
if (iter == exportMap.end()) {
KJ_FAIL_REQUIRE("Server exports no such capability.", name) { break; }
return nullptr;
} else {
return iter->second.cap;
}
}
}

Expand All @@ -264,16 +304,28 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
}
};

EzRpcServer::EzRpcServer(Capability::Client mainInterface, kj::StringPtr bindAddress,
uint defaultPort, ReaderOptions readerOpts)
: impl(kj::heap<Impl>(kj::mv(mainInterface), bindAddress, defaultPort, readerOpts)) {}

EzRpcServer::EzRpcServer(Capability::Client mainInterface, struct sockaddr* bindAddress,
uint addrSize, ReaderOptions readerOpts)
: impl(kj::heap<Impl>(kj::mv(mainInterface), bindAddress, addrSize, readerOpts)) {}

EzRpcServer::EzRpcServer(Capability::Client mainInterface, int socketFd, uint port,
ReaderOptions readerOpts)
: impl(kj::heap<Impl>(kj::mv(mainInterface), socketFd, port, readerOpts)) {}

EzRpcServer::EzRpcServer(kj::StringPtr bindAddress, uint defaultPort,
ReaderOptions readerOpts)
: impl(kj::heap<Impl>(bindAddress, defaultPort, readerOpts)) {}
: EzRpcServer(nullptr, bindAddress, defaultPort, readerOpts) {}

EzRpcServer::EzRpcServer(struct sockaddr* bindAddress, uint addrSize,
ReaderOptions readerOpts)
: impl(kj::heap<Impl>(bindAddress, addrSize, readerOpts)) {}
: EzRpcServer(nullptr, bindAddress, addrSize, readerOpts) {}

EzRpcServer::EzRpcServer(int socketFd, uint port, ReaderOptions readerOpts)
: impl(kj::heap<Impl>(socketFd, port, readerOpts)) {}
: EzRpcServer(nullptr, socketFd, port, readerOpts) {}

EzRpcServer::~EzRpcServer() noexcept(false) {}

Expand Down
49 changes: 35 additions & 14 deletions c++/src/capnp/ez-rpc.h
Expand Up @@ -44,7 +44,7 @@ class EzRpcClient {
// // C++ client
// int main() {
// capnp::EzRpcClient client("localhost:3456");
// Adder::Client adder = client.importCap<Adder>("adder");
// Adder::Client adder = client.getMain<Adder>();
// auto request = adder.addRequest();
// request.setLeft(12);
// request.setRight(34);
Expand All @@ -64,8 +64,7 @@ class EzRpcClient {
// };
//
// int main() {
// capnp::EzRpcServer server("*:3456");
// server.exportCap("adder", kj::heap<AdderImpl>());
// capnp::EzRpcServer server(kj::heap<AdderImpl>(), "*:3456");
// kj::NEVER_DONE.wait(server.getWaitScope());
// }
//
Expand Down Expand Up @@ -110,11 +109,6 @@ class EzRpcClient {
// your protocol to send large data blobs in multiple small chunks -- this is much better for
// both security and performance. See `ReaderOptions` in `message.h` for more details.

// You should only need to set this if you are receiving errors about messages being too large or
// too deep in normal operation, and should consider restructuring your protocol to use simpler
// or smaller messages if this is an issue for you.


EzRpcClient(const struct sockaddr* serverAddress, uint addrSize,
ReaderOptions readerOpts = ReaderOptions());
// Like the above constructor, but connects to an already-resolved socket address. Any address
Expand All @@ -127,10 +121,22 @@ class EzRpcClient {
~EzRpcClient() noexcept(false);

template <typename Type>
typename Type::Client importCap(kj::StringPtr name);
Capability::Client importCap(kj::StringPtr name);
typename Type::Client getMain();
Capability::Client getMain();
// Get the server's main (aka "bootstrap") interface.

template <typename Type>
typename Type::Client importCap(kj::StringPtr name)
KJ_DEPRECATED("Change your server to export a main interface, then use getMain() instead.");
Capability::Client importCap(kj::StringPtr name)
KJ_DEPRECATED("Change your server to export a main interface, then use getMain() instead.");
// ** DEPRECATED **
//
// Ask the sever for the capability with the given name. You may specify a type to automatically
// down-cast to that type. It is up to you to specify the correct expected type.
//
// Named interfaces are deprecated. The new preferred usage pattern is for the server to export
// a "main" interface which itself has methods for getting any other interfaces.

kj::WaitScope& getWaitScope();
// Get the `WaitScope` for the client's `EventLoop`, which allows you to synchronously wait on
Expand All @@ -153,8 +159,8 @@ class EzRpcServer {
// The server counterpart to `EzRpcClient`. See `EzRpcClient` for an example.

public:
explicit EzRpcServer(kj::StringPtr bindAddress, uint defaultPort = 0,
ReaderOptions readerOpts = ReaderOptions());
explicit EzRpcServer(Capability::Client mainInterface, kj::StringPtr bindAddress,
uint defaultPort = 0, ReaderOptions readerOpts = ReaderOptions());
// Construct a new `EzRpcServer` that binds to the given address. An address of "*" means to
// bind to all local addresses.
//
Expand All @@ -175,16 +181,26 @@ class EzRpcServer {
// your protocol to send large data blobs in multiple small chunks -- this is much better for
// both security and performance. See `ReaderOptions` in `message.h` for more details.

EzRpcServer(struct sockaddr* bindAddress, uint addrSize,
EzRpcServer(Capability::Client mainInterface, struct sockaddr* bindAddress, uint addrSize,
ReaderOptions readerOpts = ReaderOptions());
// Like the above constructor, but binds to an already-resolved socket address. Any address
// format supported by `kj::Network` in `kj/async-io.h` is accepted.

EzRpcServer(int socketFd, uint port, ReaderOptions readerOpts = ReaderOptions());
EzRpcServer(Capability::Client mainInterface, int socketFd, uint port,
ReaderOptions readerOpts = ReaderOptions());
// Create a server on top of an already-listening socket (i.e. one on which accept() may be
// called). `port` is returned by `getPort()` -- it serves no other purpose.
// `readerOpts` acts as in the other two above constructors.

explicit EzRpcServer(kj::StringPtr bindAddress, uint defaultPort = 0,
ReaderOptions readerOpts = ReaderOptions())
KJ_DEPRECATED("Please specify a main interface for your server.");
EzRpcServer(struct sockaddr* bindAddress, uint addrSize,
ReaderOptions readerOpts = ReaderOptions())
KJ_DEPRECATED("Please specify a main interface for your server.");
EzRpcServer(int socketFd, uint port, ReaderOptions readerOpts = ReaderOptions())
KJ_DEPRECATED("Please specify a main interface for your server.");

~EzRpcServer() noexcept(false);

void exportCap(kj::StringPtr name, Capability::Client cap);
Expand Down Expand Up @@ -219,6 +235,11 @@ class EzRpcServer {
// =======================================================================================
// inline implementation details

template <typename Type>
inline typename Type::Client EzRpcClient::getMain() {
return getMain().castAs<Type>();
}

template <typename Type>
inline typename Type::Client EzRpcClient::importCap(kj::StringPtr name) {
return importCap(name).castAs<Type>();
Expand Down
15 changes: 13 additions & 2 deletions c++/src/capnp/layout.c++
Expand Up @@ -2156,11 +2156,21 @@ bool PointerBuilder::isNull() {
}

void PointerBuilder::transferFrom(PointerBuilder other) {
if (!pointer->isNull()) {
WireHelpers::zeroObject(segment, pointer);
memset(pointer, 0, sizeof(*pointer));
}
WireHelpers::transferPointer(segment, pointer, other.segment, other.pointer);
}

void PointerBuilder::copyFrom(PointerReader other) {
WireHelpers::copyPointer(segment, pointer, other.segment, other.pointer, other.nestingLimit);
if (!pointer->isNull()) {
WireHelpers::zeroObject(segment, pointer);
memset(pointer, 0, sizeof(*pointer));
}
if (other.pointer != nullptr) {
WireHelpers::copyPointer(segment, pointer, other.segment, other.pointer, other.nestingLimit);
}
}

PointerReader PointerBuilder::asReader() const {
Expand Down Expand Up @@ -2220,7 +2230,8 @@ const word* PointerReader::getUnchecked() const {
}

MessageSizeCounts PointerReader::targetSize() const {
return WireHelpers::totalSize(segment, pointer, nestingLimit);
return pointer == nullptr ? MessageSizeCounts { 0 * WORDS, 0 }
: WireHelpers::totalSize(segment, pointer, nestingLimit);
}

bool PointerReader::isNull() const {
Expand Down
14 changes: 6 additions & 8 deletions c++/src/capnp/rpc-prelude.h
Expand Up @@ -53,13 +53,9 @@ class VatNetworkBase {
public:
virtual kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) = 0;
virtual kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() = 0;
virtual void baseIntroduceTo(Connection& recipient,
AnyPointer::Builder sendToRecipient, AnyPointer::Builder sendToTarget) = 0;
virtual ConnectionAndProvisionId baseConnectToIntroduced(AnyPointer::Reader capId) = 0;
virtual kj::Own<Connection> baseAcceptIntroducedConnection(AnyPointer::Reader recipientId) = 0;
};
virtual kj::Maybe<kj::Own<Connection>> baseConnectToRefHost(_::StructReader hostId) = 0;
virtual kj::Promise<kj::Own<Connection>> baseAcceptConnectionAsRefHost() = 0;
virtual kj::Maybe<kj::Own<Connection>> baseConnect(_::StructReader vatId) = 0;
virtual kj::Promise<kj::Own<Connection>> baseAccept() = 0;
};

class SturdyRefRestorerBase {
Expand All @@ -69,15 +65,17 @@ class SturdyRefRestorerBase {

class RpcSystemBase {
public:
RpcSystemBase(VatNetworkBase& network, kj::Maybe<SturdyRefRestorerBase&> restorer);
RpcSystemBase(VatNetworkBase& network, kj::Maybe<Capability::Client> bootstrapInterface);
RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer);
RpcSystemBase(RpcSystemBase&& other) noexcept;
~RpcSystemBase() noexcept(false);

private:
class Impl;
kj::Own<Impl> impl;

Capability::Client baseRestore(_::StructReader hostId, AnyPointer::Reader objectId);
Capability::Client baseBootstrap(_::StructReader vatId);
Capability::Client baseRestore(_::StructReader vatId, AnyPointer::Reader objectId);
// TODO(someday): Maybe define a public API called `TypelessStruct` so we don't have to rely
// on `_::StructReader` here?

Expand Down

0 comments on commit 9a32fa7

Please sign in to comment.