Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Realtime stream #1460

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d922e97
Add realtime streams
JonasVautherin May 15, 2023
8d34c6c
capnp/rpc.{h, c++}: improve documentation
JonasVautherin Feb 21, 2024
ef1eb0b
Assign id first, increment highCounter next
JonasVautherin Feb 21, 2024
bc78833
Remove lowBit() and isLow() helpers
JonasVautherin Feb 21, 2024
c3caec1
capnp/c++.capnp: add docs to realtime annotation
JonasVautherin Feb 21, 2024
20ae19e
capnp/rpc.c++: pass realtime promise to tasks instead of detaching an…
JonasVautherin Feb 22, 2024
f1f1f5f
capnp/rpc.c++: remove isTailCall from sendRealtimeInternal as it is a…
JonasVautherin Feb 22, 2024
a0ec5bc
capnp/rpc.c++: remove isTailCall from sendStreamingInternal as it is …
JonasVautherin Feb 22, 2024
01547bd
capnp/rpc.{h, c++}: add sendRealtime implementation to flow controllers
JonasVautherin Feb 22, 2024
604b9f0
capnp/rpc.c++: do NOT allow capabilities in realtime streams
JonasVautherin Mar 1, 2024
cd953b1
capnp/rpc.c++: only send Finish if noFinishNeeded is NOT set. Also,
JonasVautherin Mar 1, 2024
e0187dc
Add tests for realtime streaming
JonasVautherin Mar 5, 2024
b7055ee
Trying to redirect send() to sendStreaming() when it is a realtime st…
JonasVautherin Mar 2, 2024
bde8e84
Move realtime annotation from c++.capnp to stream.capnp
JonasVautherin Mar 6, 2024
d4612cc
Test that realtime streams throws instead of sending capabilities
JonasVautherin Mar 6, 2024
30f7151
Remove apparently superfluous include
JonasVautherin Mar 6, 2024
8370ea1
Remove diffs that are unrelated to my change
JonasVautherin Mar 6, 2024
e8c7e32
Trying to fix failing unit test
JonasVautherin May 22, 2024
6628bd8
Fix segmentation fault by attaching context
JonasVautherin May 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions c++/src/capnp/capability.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ class Capability::Client {
// `sendForPipeline()` method. The effect of setting `onlyPromisePipeline = true` when invoking
// `ClientHook::newCall()` is unspecified; it might cause the returned `Request` to support
// only pipelining even when `send()` is called, or it might not.

bool isRealtime = false;
// Hints that the call is a "realtime" call, meaning that it does not expect a response (i.e.
// a `Return` message). As far as the caller is concerned, the message may be dropped.
};

Request<AnyPointer, AnyPointer> typelessRequest(
Expand Down
7 changes: 6 additions & 1 deletion c++/src/capnp/compiler/capnpc-c++.c++
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ namespace {
static constexpr uint64_t NAMESPACE_ANNOTATION_ID = 0xb9c6f99ebf805f2cull;
static constexpr uint64_t NAME_ANNOTATION_ID = 0xf264a779fef191ceull;
static constexpr uint64_t ALLOW_CANCELLATION_ANNOTATION_ID = 0xac7096ff8cfc9dceull;
static constexpr uint64_t REALTIME_ANNOTATION_ID = 0xbbedf146e910b948ull;

bool hasDiscriminantValue(const schema::Field::Reader& reader) {
return reader.getDiscriminantValue() != schema::Field::NO_DISCRIMINANT;
Expand Down Expand Up @@ -2165,6 +2166,10 @@ private:
auto resultProto = resultSchema.getProto();

bool isStreaming = method.isStreaming();
bool isRealtime = false;
if (annotationValue(proto, REALTIME_ANNOTATION_ID) != nullptr) {
isRealtime = true;
}

auto implicitParamsReader = proto.getImplicitParameters();
auto implicitParamsBuilder = kj::heapArrayBuilder<CppTypeName>(implicitParamsReader.size());
Expand Down Expand Up @@ -2254,7 +2259,7 @@ private:
isStreaming
? kj::strTree(" return newStreamingCall<", paramType, ">(\n")
: kj::strTree(" return newCall<", paramType, ", ", resultType, ">(\n"),
" 0x", interfaceIdHex, "ull, ", methodId, ", sizeHint, {", noPromisePipelining, "});\n"
" 0x", interfaceIdHex, "ull, ", methodId, ", sizeHint, {", noPromisePipelining, ", false, ", isRealtime, "});\n"
"}\n");

bool allowCancellation = false;
Expand Down
2 changes: 1 addition & 1 deletion c++/src/capnp/persistent.capnp.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ template <typename SturdyRef, typename Owner>
CAPNP_AUTO_IF_MSVC(::capnp::Request<typename ::capnp::Persistent<SturdyRef, Owner>::SaveParams, typename ::capnp::Persistent<SturdyRef, Owner>::SaveResults>)
Persistent<SturdyRef, Owner>::Client::saveRequest(::kj::Maybe< ::capnp::MessageSize> sizeHint) {
return newCall<typename ::capnp::Persistent<SturdyRef, Owner>::SaveParams, typename ::capnp::Persistent<SturdyRef, Owner>::SaveResults>(
0xc8cb212fcd9f5691ull, 0, sizeHint, {false});
0xc8cb212fcd9f5691ull, 0, sizeHint, {false, false, false});
}
template <typename SturdyRef, typename Owner>
::kj::Promise<void> Persistent<SturdyRef, Owner>::Server::save(SaveContext) {
Expand Down
1 change: 1 addition & 0 deletions c++/src/capnp/rpc-prelude.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class RpcSystemBase {
~RpcSystemBase() noexcept(false);

void setTraceEncoder(kj::Function<kj::String(const kj::Exception&)> func);
int countQuestionsForTest();

kj::Promise<void> run();

Expand Down
4 changes: 4 additions & 0 deletions c++/src/capnp/rpc-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ public:
}));
}

void sendRealtime() override {
send();
}

size_t sizeInWords() override {
return message.sizeInWords();
}
Expand Down
200 changes: 200 additions & 0 deletions c++/src/capnp/rpc-twoparty-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,206 @@ KJ_TEST("Streaming over RPC then unwrap with CapabilitySet") {
promise.wait(waitScope);
}

KJ_TEST("Realtime streaming goes through") {
kj::EventLoop loop;
kj::WaitScope waitScope(loop);

auto pipe = kj::newTwoWayPipe();

auto ownServer = kj::heap<TestRealtimeStreamingImpl>();
test::TestRealtimeStreaming::Client serverCap(kj::mv(ownServer));

TwoPartyClient tpClient(*pipe.ends[0]);
TwoPartyClient tpServer(*pipe.ends[1], serverCap, rpc::twoparty::Side::SERVER);

auto cap = tpClient.bootstrap().castAs<test::TestRealtimeStreaming>();

// Send a few realtime requests
kj::Promise<void> promise = kj::READY_NOW;
for (uint i = 0; i < 5; i++) {
auto req = cap.doRealtimeStreamRequest();
req.setJ(i + 1);
promise = req.send();
KJ_ASSERT(promise.poll(waitScope));
}

// Send finishStream request
auto finishReq = cap.finishStreamRequest();
auto result = finishReq.send().wait(waitScope);
KJ_ASSERT(result.getTotalJ() == 15);
}

KJ_TEST("Realtime streaming throws instead of sending capabilities") {
kj::EventLoop loop;
kj::WaitScope waitScope(loop);

auto pipe = kj::newTwoWayPipe();

auto ownServer = kj::heap<TestRealtimeStreamingImpl>();
test::TestRealtimeStreaming::Client serverCap(kj::mv(ownServer));

TwoPartyClient tpClient(*pipe.ends[0]);
TwoPartyClient tpServer(*pipe.ends[1], serverCap, rpc::twoparty::Side::SERVER);

auto cap = tpClient.bootstrap().castAs<test::TestRealtimeStreaming>();

// Try to stream a capability and check that it throws
auto req = cap.doFailRealtimeStreamRequest();
req.setCap(cap);

kj::Maybe<kj::Exception> maybeException = kj::runCatchingExceptions([&]() {
auto ignored = req.send();
});

KJ_IF_MAYBE(e, maybeException) {
KJ_EXPECT(e->getType() == kj::Exception::Type::FAILED);
} else {
KJ_FAIL_EXPECT("should have thrown");
}
}

KJ_TEST("Realtime streaming is stopped by flow control") {
// This tests that realtime streaming is stopped by the flow controller when
// congestion is detected. Realtime messages are dropped in that case.

kj::EventLoop loop;
kj::WaitScope waitScope(loop);

auto pipe = kj::newTwoWayPipe();

size_t window = 1024;
size_t clientWritten = 0;
size_t serverWritten = 0;

pipe.ends[0] = kj::heap<MockSndbufStream>(kj::mv(pipe.ends[0]), window, clientWritten);
pipe.ends[1] = kj::heap<MockSndbufStream>(kj::mv(pipe.ends[1]), window, serverWritten);

auto ownServer = kj::heap<TestRealtimeStreamingImpl>();
auto& server = *ownServer;
test::TestRealtimeStreaming::Client serverCap(kj::mv(ownServer));

TwoPartyClient tpClient(*pipe.ends[0]);
TwoPartyClient tpServer(*pipe.ends[1], serverCap, rpc::twoparty::Side::SERVER);

auto cap = tpClient.bootstrap().castAs<test::TestRealtimeStreaming>();

// Send normal streaming requests until flow control kicks in
kj::Promise<void> promise = kj::READY_NOW;
uint count = 0;
while (promise.poll(waitScope)) {
promise.wait(waitScope);

auto req = cap.doStreamRequest();
req.setI(++count);
promise = req.send();
}

// Send a realtime request and check that its promise hangs
auto req = cap.doRealtimeStreamRequest();
req.setJ(100);
kj::Promise<void> realtimePromise = req.send();
KJ_ASSERT(!realtimePromise.poll(waitScope));

// Cause the last stream to finish on the server side, unlocking the flow
server.fulfillLast();
realtimePromise.wait(waitScope);

// Check that a realtime request now goes through
req = cap.doRealtimeStreamRequest();
req.setJ(3);
realtimePromise = req.send();
KJ_ASSERT(realtimePromise.poll(waitScope));

// Fulfill the remaining requests
server.setAutoFulfill(true);

// Send finishStream request
auto finishReq = cap.finishStreamRequest();
auto result = finishReq.send().wait(waitScope);

// Make sure that only the second realtime stream was sent (the first one
// should be dropped because of the congestion)
KJ_ASSERT(result.getTotalJ() == 3);
}

rpc::twoparty::VatId::Builder generateServerVatId() {
capnp::word scratch[4];
memset(&scratch, 0, sizeof(scratch));
capnp::MallocMessageBuilder message(scratch);
auto vatId = message.getRoot<rpc::twoparty::VatId>();
vatId.setSide(rpc::twoparty::Side::SERVER);

return vatId;
}

KJ_TEST("Realtime streaming does not leak question IDs when proxied") {
// Realtime calls are marked with an "isRealtime" hint. But if the capability
// is proxied, this information is lost and startCall() may end up just
// calling the normal "send()", which will leak question IDs. The
// RPC implementation needs to deal with that properly. This test checks that
// the question IDs are not being leaked in this situation.

kj::EventLoop loop;
kj::WaitScope waitScope(loop);

// Set up two two-party RPC connections in series. The middle node just proxies requests through.
auto frontPipe = kj::newTwoWayPipe();
auto backPipe = kj::newTwoWayPipe();

// Prepare the server S
auto ownServer = kj::heap<TestRealtimeStreamingImpl>();
test::TestRealtimeStreaming::Client serverCap(kj::mv(ownServer));
TwoPartyClient tpServer(*backPipe.ends[1], serverCap, rpc::twoparty::Side::SERVER);

// Prepare the internal client iC that connects to S
TwoPartyVatNetwork internalClientNetwork(*backPipe.ends[0], rpc::twoparty::Side::CLIENT);
auto rpcInternalClient = makeRpcClient(internalClientNetwork);
auto internalClient = rpcInternalClient.bootstrap(generateServerVatId());
auto internalCap = internalClient.castAs<test::TestRealtimeStreaming>();

// Prepare the proxy P
auto ownProxy = kj::heap<TestCapabilityProxyImpl>();
auto& proxy = *ownProxy;
test::TestCapabilityProxy::Client proxyCap(kj::mv(ownProxy));
TwoPartyClient tpProxy(*frontPipe.ends[1], proxyCap, rpc::twoparty::Side::SERVER);

// Prepare the client C that connects to P
TwoPartyVatNetwork clientNetwork(*frontPipe.ends[0], rpc::twoparty::Side::CLIENT);
auto rpcClient = makeRpcClient(clientNetwork);
auto client = rpcClient.bootstrap(generateServerVatId());
auto cap = client.castAs<test::TestCapabilityProxy>();

// Set iC into P, such that C can fetch it and start talking to S through P
proxy.cap = internalCap;

// Now we have a setup with [C <---> P <---> S] that we can use

// Have C fetch the capability of S through P
auto proxyReq = cap.getCapRequest();
auto proxiedCap = proxyReq
.send()
.wait(waitScope)
.getCap()
.castAs<test::TestRealtimeStreaming>();

// Send a few realtime requests
for (uint i = 0; i < 10; i++) {
auto req = proxiedCap.doRealtimeStreamRequest();
req.setJ(42);
req.send().wait(waitScope);
}

// Finish streaming
auto finishReq = proxiedCap
.finishStreamRequest()
.send()
.wait(waitScope);

// Check that the question IDs were not leaked
KJ_ASSERT(1 == rpcClient.countQuestionsForTest());
KJ_ASSERT(1 == rpcInternalClient.countQuestionsForTest());
}

KJ_TEST("promise cap resolves between starting request and sending it") {
kj::EventLoop loop;
kj::WaitScope waitScope(loop);
Expand Down
6 changes: 6 additions & 0 deletions c++/src/capnp/rpc-twoparty.c++
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ public:
.eagerlyEvaluate(nullptr);
}

void sendRealtime() override {
// This implementation does exactly the same as `send`, but we could imagine
// a VatNetwork implementation treating realtime messages differently.
send();
}

size_t sizeInWords() override {
return message.sizeInWords();
}
Expand Down