Implement support for JS-backed WritableStreams in JSRPC
jasnell committed May 3, 2024
1 parent a105553 commit ad36a3d
Showing 3 changed files with 300 additions and 17 deletions.
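For orientation, here is a condensed sketch of the usage this commit enables, distilled from the tests added below: a purely JavaScript-backed WritableStream (one created with the WritableStream constructor rather than backed by a native WritableStreamSink) can now be passed over JSRPC, with the remote side's writes delivered to the local write()/close() algorithms. The service method body and the exact chunks written are assumptions inferred from the expected "foo, bar, baz!" result in the test; only the writeToStream name and the overall flow come from the test file.

// Receiving side: an RPC entrypoint that writes into a stream it was handed.
// (Sketch only; mirrors MyService.writeToStream in js-rpc-test.js.)
import { WorkerEntrypoint } from 'cloudflare:workers';

export class MyService extends WorkerEntrypoint {
  async writeToStream(stream) {
    const writer = stream.getWriter();
    const enc = new TextEncoder();
    await writer.write(enc.encode('foo, '));
    await writer.write(enc.encode('bar, '));
    await writer.write(enc.encode('baz!'));
    await writer.close();
  }
}

// Sending side: a JS-backed WritableStream, which previously failed to
// serialize, now travels over the RPC boundary.
const { promise, resolve } = Promise.withResolvers();
const dec = new TextDecoder();
let result = '';
const writable = new WritableStream({
  write(chunk) { result += dec.decode(chunk, { stream: true }); },
  close() { result += dec.decode(); resolve(); },
});
await env.MyService.writeToStream(writable);
await promise;  // result === 'foo, bar, baz!'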
2 changes: 1 addition & 1 deletion src/workerd/api/streams/standard.c++
@@ -3457,7 +3457,7 @@ void WritableStreamJsController::releaseWriter(
}

kj::Maybe<kj::Own<WritableStreamSink>> WritableStreamJsController::removeSink(jsg::Lock& js) {
-  KJ_UNIMPLEMENTED("WritableStreamJsController::removeSink is not implemented");
+  return kj::none;
}
void WritableStreamJsController::detach(jsg::Lock& js) {
KJ_UNIMPLEMENTED("WritableStreamJsController::detach is not implemented");
217 changes: 202 additions & 15 deletions src/workerd/api/streams/writable.c++
@@ -369,6 +369,173 @@ private:
}
};

// In order to support JavaScript-backed WritableStreams that do not have a backing
// WritableStreamSink, we need an alternative version of the WritableStreamRpcAdapter
// that will arrange to acquire the isolate lock when necessary to perform writes
directly on the WritableStreamController. Note that this approach is necessarily
a lot slower.
class WritableStreamJsRpcAdapter final: public capnp::ExplicitEndOutputStream {
public:
WritableStreamJsRpcAdapter(IoContext& context, jsg::Ref<WritableStreamDefaultWriter> writer)
: context(context), writer(kj::mv(writer)) {}

~WritableStreamJsRpcAdapter() noexcept(false) {
weakRef->invalidate();
doneFulfiller->fulfill();

// If the stream was not explicitly ended and the writer still exists at this point,
// then we should trigger calling the abort algorithm on the stream. Sadly, there's a
// bit of an incompatibility between kj::AsyncOutputStream and the standard definition of
// WritableStream in that AsyncOutputStream has no specific way to explicitly signal that
// the stream is being aborted, or even being closed due to a particular reason. On the
// remote side, because it is using a WritableStreamSink implementation, when that side
// is aborted, all it does is record the reason and drop the stream. It does not propagate
// the reason back to this side. So, we have to do the best we can here. Our assumption
// is that once the stream is dropped, if it has not been explicitly ended and the writer
// still exists, then the writer should be aborted. This is not perfect because we
// cannot propagate the actual reason why it was aborted.
//
// Note also that there is no guarantee that the abort will actually run if the context
// is being torn down. Some WritableStream implementations might use the abort algorithm
// to clean things up or perform logging in the case of an error. Care needs to be taken
// in this situation or the user code might end up with bugs. Need to see if there's a
// better solution.
//
// TODO(someday): If the remote end can be updated to propagate the abort, then we can
// hopefully improve the situation here.
if (!ended) {
KJ_IF_SOME(writer, this->writer) {
context.addTask(context.run(
[writer=kj::mv(writer), exception=cancellationException()]
(Worker::Lock& lock) mutable {
jsg::Lock& js = lock;
auto ex = js.exceptionToJs(kj::mv(exception));
return IoContext::current().awaitJs(lock,
writer->abort(lock, ex.getHandle(js)));
}));
}
}
}

// Returns a promise that resolves when the stream is dropped. If the promise is canceled before
// that, the stream is revoked.
kj::Promise<void> waitForCompletionOrRevoke() {
auto paf = kj::newPromiseAndFulfiller<void>();
doneFulfiller = kj::mv(paf.fulfiller);

return paf.promise.attach(kj::defer([weakRef = weakRef->addRef()]() mutable {
KJ_IF_SOME(obj, weakRef->tryGet()) {
// Stream is still alive, revoke it.
if (!obj.canceler.isEmpty()) {
obj.canceler.cancel(cancellationException());
}
}
}));
}

kj::Promise<void> write(const void* buffer, size_t size) override {
if (size == 0) return kj::READY_NOW;
return canceler.wrap(context.run([this, buffer, size](Worker::Lock& lock) mutable {
jsg::Lock& js = lock;
// Sadly, we have to allocate and copy here. Our received buffer is only guaranteed
// to live until the returned promise is resolved, but the application code may
// hold onto the ArrayBuffer for longer. We need to make sure that the backing store
// for the ArrayBuffer remains valid.
auto source = KJ_ASSERT_NONNULL(jsg::BufferSource::tryAlloc(js, size));
auto ptr = source.asArrayPtr();
memcpy(ptr.begin(), buffer, size);
return context.awaitJs(js, getInner().write(js, source.getHandle(js)));
}));
}

kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
size_t amount = 0;
for (auto& piece : pieces) {
amount += piece.size();
}
if (amount == 0) return kj::READY_NOW;
return canceler.wrap(context.run([this, amount, pieces](Worker::Lock& lock) mutable {
jsg::Lock& js = lock;

// Sadly, we have to allocate and copy here. Our received set of buffers is only
// guaranteed to live until the returned promise is resolved, but the application code
// may hold onto the ArrayBuffer for longer. We need to make sure that the backing store
// for the ArrayBuffer remains valid.
auto source = KJ_ASSERT_NONNULL(jsg::BufferSource::tryAlloc(js, amount));
auto ptr = source.asArrayPtr();
for (auto& piece : pieces) {
if (piece.size() == 0) continue;
KJ_ASSERT(piece.size() <= ptr.size());
memcpy(ptr.begin(), piece.begin(), piece.size());
ptr = ptr.slice(piece.size());
}

return context.awaitJs(js, getInner().write(js, source.getHandle(js)));
}));
}

// TODO(perf): We can't properly implement tryPumpFrom(), which means that Cap'n Proto will
// be unable to perform path shortening if the underlying stream turns out to be another capnp
// stream. This isn't a huge deal, but might be nice to enable someday. It may require
// significant refactoring of streams.

kj::Promise<void> whenWriteDisconnected() override {
// TODO(soon): We might be able to support this by following the writer.closed promise,
// which becomes resolved when the writer is used to close the stream, or rejects when
// the stream has errored. However, currently, we don't have an easy way to do this for
// a couple of reasons.
//
// 1. The Writer's getClosed() method returns a jsg::MemoizedIdentity<jsg::Promise<void>>.
// jsg::MemoizedIdentity lazily converts the jsg::Promise into a v8::Promise once it
// passes through the type wrapper. It does not give us any way to consistently get
// at the underlying jsg::Promise<void> or the mapped v8::Promise. We would need to
// capture a TypeHandler in here and convert each time to one or the other, then
// attach our continuation. It's doable but a bit of a pain.
//
// 2. The contract of whenWriteDisconnected says that it indicates when the stream will
// start to reject with DISCONNECTED exceptions. WritableStream makes no such distinction.
// We would need to determine if all errors are treated equally here. If the closed
// promise rejects, do we just go ahead and resolve this promise? Or do we need to
// filter for specific kinds of errors? If we are filtering, what do we do if the
// closed promise rejects with a different error? Is the whenWriteDisconnected promise
// ever expected to reject?
//
// For now, let's handle this the same way as WritableStreamRpcAdapter and just return a
// promise that never resolves.
return kj::NEVER_DONE;
}

kj::Promise<void> end() override {
ended = true;
return canceler.wrap(context.run([this](Worker::Lock& lock) mutable {
return context.awaitJs(lock, getInner().close(lock));
}));
}

private:
IoContext& context;
kj::Maybe<jsg::Ref<WritableStreamDefaultWriter>> writer;
kj::Canceler canceler;
kj::Own<kj::PromiseFulfiller<void>> doneFulfiller;
kj::Own<WeakRef<WritableStreamJsRpcAdapter>> weakRef =
kj::refcounted<WeakRef<WritableStreamJsRpcAdapter>>(
kj::Badge<WritableStreamJsRpcAdapter>(), *this);
bool ended = false;

WritableStreamDefaultWriter& getInner() {
KJ_IF_SOME(inner, writer) {
return *inner;
}
kj::throwFatalException(cancellationException());
}

static kj::Exception cancellationException() {
return JSG_KJ_EXCEPTION(DISCONNECTED, Error,
"WritableStream received over RPC was disconnected because the remote execution context "
"has ended.");
}
};

} // namespace
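One consequence of the adapter's write() path documented above is worth spelling out: every chunk is copied into a freshly allocated BufferSource before it is handed to JavaScript, so the JS sink may retain chunks after its write() algorithm returns without aliasing the RPC transport's buffers. A consumer-side sketch of that guarantee (the variable names and the byteLength accounting are illustrative, not from the commit):

// Sketch: retaining chunks is safe because WritableStreamJsRpcAdapter copies
// each incoming buffer into a fresh ArrayBuffer-backed allocation before
// dispatching it to this write() algorithm.
const retained = [];
const writable = new WritableStream({
  write(chunk) {
    // Holding onto `chunk` past this call is fine; it does not point into
    // memory the RPC layer will reuse for the next write.
    retained.push(chunk);
  },
  close() {
    const total = retained.reduce((sum, c) => sum + c.byteLength, 0);
    console.log(`received ${retained.length} chunks, ${total} bytes total`);
  },
});
// e.g. await env.MyService.writeToStream(writable);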

void WritableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
@@ -388,24 +555,44 @@ void WritableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
  // TODO(soon): Support JS-backed WritableStreams. Currently this only supports native streams
  // and IdentityTransformStream, since only they are backed by WritableStreamSink.

-  // NOTE: We're counting on `removeSink()`, to check that the stream is not locked and other
-  // common checks. It's important we don't modify the WritableStream before this call.
-  auto sink = removeSink(js);
-  auto encoding = sink->disownEncodingResponsibility();
-  auto wrapper = kj::heap<WritableStreamRpcAdapter>(kj::mv(sink));
-  // Make sure this stream will be revoked if the IoContext ends.
-  ioctx.addTask(wrapper->waitForCompletionOrRevoke().attach(ioctx.registerPendingEvent()));
-  auto capnpStream = ioctx.getByteStreamFactory().kjToCapnp(kj::mv(wrapper));
-  externalHandler->write(
-      [capnpStream = kj::mv(capnpStream), encoding]
-      (rpc::JsValue::External::Builder builder) mutable {
-    auto ws = builder.initWritableStream();
-    ws.setByteStream(kj::mv(capnpStream));
-    ws.setEncoding(encoding);
-  });
+  KJ_IF_SOME(sink, getController().removeSink(js)) {
+    // NOTE: We're counting on `removeSink()`, to check that the stream is not locked and other
+    // common checks. It's important we don't modify the WritableStream before this call.
+    auto encoding = sink->disownEncodingResponsibility();
+    auto wrapper = kj::heap<WritableStreamRpcAdapter>(kj::mv(sink));
+
+    // Make sure this stream will be revoked if the IoContext ends.
+    ioctx.addTask(wrapper->waitForCompletionOrRevoke().attach(ioctx.registerPendingEvent()));
+
+    auto capnpStream = ioctx.getByteStreamFactory().kjToCapnp(kj::mv(wrapper));
+
+    externalHandler->write(
+        [capnpStream = kj::mv(capnpStream), encoding]
+        (rpc::JsValue::External::Builder builder) mutable {
+      auto ws = builder.initWritableStream();
+      ws.setByteStream(kj::mv(capnpStream));
+      ws.setEncoding(encoding);
+    });
+  } else {
+    // TODO(soon): Support disownEncodingResponsibility with JS-backed streams
+
+    // NOTE: We're counting on `getWriter()` to check that the stream is not locked and other
+    // common checks. It's important we don't modify the WritableStream before this call.
+    auto wrapper = kj::heap<WritableStreamJsRpcAdapter>(ioctx, getWriter(js));
+
+    // Make sure this stream will be revoked if the IoContext ends.
+    ioctx.addTask(wrapper->waitForCompletionOrRevoke().attach(ioctx.registerPendingEvent()));
+
+    auto capnpStream = ioctx.getByteStreamFactory().kjToCapnp(kj::mv(wrapper));
+
+    externalHandler->write(
+        [capnpStream = kj::mv(capnpStream)]
+        (rpc::JsValue::External::Builder builder) mutable {
+      auto ws = builder.initWritableStream();
+      ws.setByteStream(kj::mv(capnpStream));
+      ws.setEncoding(StreamEncoding::IDENTITY);
+    });
+  }
}
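The else branch above leans on getWriter() to enforce the usual checks, most importantly that the stream is not already locked. A sketch of what that implies for callers, assuming the lock violation surfaces as a rejection (or throw) from the RPC call; the commit's tests do not pin down the exact error type:

import assert from 'node:assert';

// Sketch: a locked JS-backed WritableStream cannot be sent over JSRPC,
// because serialization calls getWriter(), which throws on a locked stream.
const writable = new WritableStream({ write(chunk) {} });
const writer = writable.getWriter();  // locks the stream

// Assumed behavior: the call rejects rather than silently sending the stream.
await assert.rejects(async () => {
  await env.MyService.writeToStream(writable);
});

writer.releaseLock();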

jsg::Ref<WritableStream> WritableStream::deserialize(
98 changes: 97 additions & 1 deletion src/workerd/api/tests/js-rpc-test.js
@@ -293,6 +293,30 @@ export class MyService extends WorkerEntrypoint {
await writer.close();
}

async writeToStreamExpectingError(stream) {
// TODO(now): In this test, we write to a stream that we expect to error
// immediately. Ideally, the writes would throw or at least there would be
// some signal that the remote stream has failed. Currently it does not
// appear that the error signal is propagating back here correctly. Both
// writer.write() calls resolve successfully and we get no signal that the
// stream failed.
let writer = stream.getWriter();
let enc = new TextEncoder();
await writer.write(enc.encode("foo, "));
await writer.write(enc.encode("bar, "));
}

async writeToStreamAbort(stream) {
// In this test, aborting the stream should propagate back to the remote
// side, causing the stream to be errored and the abort algorithm to be
// called with the provided error. Unfortunately the current implementation
// does not allow for that. The reason passed to abort is cached locally and
// is never communicated to the remote. Instead, the remote side will end up
// with a generic disconnect error. Sad face.
let writer = stream.getWriter();
writer.abort(new Error('boom'));
}

async readFromStream(stream) {
return await new Response(stream).text();
}
@@ -1015,7 +1039,79 @@ export let streams = {
await promise;
}

// TODO(someday): Test JS-backed WritableStream, when it actually works.
{
const dec = new TextDecoder();
let result = '';
const { promise, resolve } = Promise.withResolvers();
const writable = new WritableStream({
write(chunk) {
result += dec.decode(chunk, {stream: true});
},
close() {
result += dec.decode();
resolve();
},
});
const p1 = env.MyService.writeToStream(writable);
await promise;
assert.strictEqual(result, "foo, bar, baz!");
await p1;
}

{
// In this test, the remote side writes a chunk to the stream below, which throws
// an error. Ideally the error would propagate back to the calling side so that the
// remote knows the stream failed and can no longer be written to. The call to
// writeToStreamExpectingError should throw because the error should be propagated
// through the round trip.
const dec = new TextDecoder();
let result = '';
const writable = new WritableStream({
write(chunk) {
throw new Error('boom');
},
});
// TODO(now): This works in one test case (js-rpc-test), but not another
// (js-rpc-socket-test). Need to understand why.
// The writeToStreamExpectingError calls write twice. The first call to write should
// error the stream and should cause the subsequent write to fail also. However, both
// writes on the remote side succeed. What we should end up with is
// writeToStreamExpectingError throwing an error that matches the error thrown in the
// write algorithm.
// The error thrown in the write IS being propagated up to the capnp stream created
// in WritableStream::serialize, but it does not get propagated back to the remote
// side as far as I can tell. At least, the writes happening on that side are never
// errored and the remote stream is not errored.
try {
await env.MyService.writeToStreamExpectingError(writable);
throw new Error('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'boom');
}
}

{
// In this test, the remote side aborts the writable stream it receives.
// The abort should propagate such that the abort algorithm is called and the
// writeToStreamAbort call should succeed. The error passed on to abort(reason)
// should be the error that was given on the remote side when abort is called,
// but we currently do not propagate the abort reason through. What ends up
// happening is that the local stream is dropped with a generic cancelation
// error.
const dec = new TextDecoder();
const { promise, resolve } = Promise.withResolvers();
const writable = new WritableStream({
write(chunk) {},
abort(reason) {
resolve(reason);
}
});
await env.MyService.writeToStreamAbort(writable);
const reason = await promise;
assert.strictEqual(reason.message,
'WritableStream received over RPC was disconnected because the remote execution ' +
'context has ended.');
}

// TODO(someday): Is there any way to construct an encoded WritableStream? Only system
// streams can be encoded, but there's no API that returns an encoded WritableStream I think.
Expand Down
