diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index 48be056f393..d9577bcbcda 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -3457,7 +3457,7 @@ void WritableStreamJsController::releaseWriter( } kj::Maybe> WritableStreamJsController::removeSink(jsg::Lock& js) { - KJ_UNIMPLEMENTED("WritableStreamJsController::removeSink is not implemented"); + return kj::none; } void WritableStreamJsController::detach(jsg::Lock& js) { KJ_UNIMPLEMENTED("WritableStreamJsController::detach is not implemented"); diff --git a/src/workerd/api/streams/writable.c++ b/src/workerd/api/streams/writable.c++ index 8f32cbb5c42..d533b14cc78 100644 --- a/src/workerd/api/streams/writable.c++ +++ b/src/workerd/api/streams/writable.c++ @@ -369,6 +369,173 @@ private: } }; +// In order to support JavaScript-backed WritableStreams that do not have a backing +// WritableStreamSink, we need an alternative version of the WritableStreamRpcAdapter +// that will arrange to acquire the isolate lock when necessary to perform writes +// directly on the WritableStreamController. Note that this approach is necessarily +// a lot slower, since every write must take the isolate lock and copy its data. +class WritableStreamJsRpcAdapter final: public capnp::ExplicitEndOutputStream { +public: + WritableStreamJsRpcAdapter(IoContext& context, jsg::Ref writer) + : context(context), writer(kj::mv(writer)) {} + + ~WritableStreamJsRpcAdapter() noexcept(false) { + weakRef->invalidate(); + doneFulfiller->fulfill(); + + // If the stream was not explicitly ended and the writer still exists at this point, + // then we should trigger calling the abort algorithm on the stream. Sadly, there's a + // bit of an incompatibility with kj::AsyncOutputStream and the standard definition of + // WritableStream in that AsyncOutputStream has no specific way to explicitly signal that + // the stream is being aborted, or even being closed due to a particular reason. 
On the + // remote side, because it is using a WritableStreamSink implementation, when that side + // is aborted, all it does is record the reason and drop the stream. It does not propagate + // the reason back to this side. So, we have to do the best we can here. Our assumption + // is that once the stream is dropped, if it has not been explicitly ended and the writer + // still exists, then the writer should be aborted. This is not perfect because we + // cannot propagate the actual reason why it was aborted. + // + // Note also that there is no guarantee that the abort will actually run if the context + // is being torn down. Some WritableStream implementations might use the abort algorithm + // to clean things up or perform logging in the case of an error. Care needs to be taken + // in this situation or the user code might end up with bugs. Need to see if there's a + // better solution. + // + // TODO(someday): If the remote end can be updated to propagate the abort, then we can + // hopefully improve the situation here. + if (!ended) { + KJ_IF_SOME(writer, this->writer) { + context.addTask(context.run( + [writer=kj::mv(writer), exception=cancellationException()] + (Worker::Lock& lock) mutable { + jsg::Lock& js = lock; + auto ex = js.exceptionToJs(kj::mv(exception)); + return IoContext::current().awaitJs(lock, + writer->abort(lock, ex.getHandle(js))); + })); + } + } + } + + // Returns a promise that resolves when the stream is dropped. If the promise is canceled before + // that, the stream is revoked. + kj::Promise waitForCompletionOrRevoke() { + auto paf = kj::newPromiseAndFulfiller(); + doneFulfiller = kj::mv(paf.fulfiller); + + return paf.promise.attach(kj::defer([weakRef = weakRef->addRef()]() mutable { + KJ_IF_SOME(obj, weakRef->tryGet()) { + // Stream is still alive, revoke it. 
+ if (!obj.canceler.isEmpty()) { + obj.canceler.cancel(cancellationException()); + } + } + })); + } + + kj::Promise write(const void* buffer, size_t size) override { + if (size == 0) return kj::READY_NOW; + return canceler.wrap(context.run([this, buffer, size](Worker::Lock& lock) mutable { + jsg::Lock& js = lock; + // Sadly, we have to allocate and copy here. Our received buffer is only guaranteed + // to live until the returned promise is resolved, but the application code may + // hold onto the ArrayBuffer for longer. We need to make sure that the backing store + // for the ArrayBuffer remains valid. + auto source = KJ_ASSERT_NONNULL(jsg::BufferSource::tryAlloc(js, size)); + auto ptr = source.asArrayPtr(); + memcpy(ptr.begin(), buffer, size); + return context.awaitJs(js, getInner().write(js, source.getHandle(js))); + })); + } + + kj::Promise write(kj::ArrayPtr> pieces) override { + size_t amount = 0;  // size_t, not int: piece sizes are size_t and the total feeds tryAlloc(). + for (auto& piece : pieces) { + amount += piece.size(); + } + if (amount == 0) return kj::READY_NOW; + return canceler.wrap(context.run([this, amount, pieces](Worker::Lock& lock) mutable { + jsg::Lock& js = lock; + + // Sadly, we have to allocate and copy here. Our received set of buffers are only + // guaranteed to live until the returned promise is resolved, but the application code + // may hold onto the ArrayBuffer for longer. We need to make sure that the backing store + // for the ArrayBuffer remains valid. 
+ auto source = KJ_ASSERT_NONNULL(jsg::BufferSource::tryAlloc(js, amount)); + auto ptr = source.asArrayPtr(); + for (auto& piece : pieces) { + if (piece.size() == 0) continue; + KJ_ASSERT(piece.size() <= ptr.size()); + memcpy(ptr.begin(), piece.begin(), piece.size()); + ptr = ptr.slice(piece.size()); + } + + return context.awaitJs(js, getInner().write(js, source.getHandle(js))); + })); + } + + // TODO(perf): We can't properly implement tryPumpFrom(), which means that Cap'n Proto will + // be unable to perform path shortening if the underlying stream turns out to be another capnp + // stream. This isn't a huge deal, but might be nice to enable someday. It may require + // significant refactoring of streams. + + kj::Promise whenWriteDisconnected() override { + // TODO(soon): We might be able to support this by following the writer.closed promise, + // which becomes resolved when the writer is used to close the stream, or rejects when + // the stream has errored. However, currently, we don't have an easy way to do this for + // a couple of reasons. + // + // 1. The Writer's getClosed() method returns a jsg::MemoizedIdentity<jsg::Promise<void>>. + // jsg::MemoizedIdentity lazily converts the jsg::Promise into a v8::Promise once it + // passes through the type wrapper. It does not give us any way to consistently get + // at the underlying jsg::Promise or the mapped v8::Promise. We would need to + // capture a TypeHandler in here and convert each time to one or the other, then + // attach our continuation. It's doable but a bit of a pain. + // + // 2. The contract of whenWriteDisconnected says that it indicates when the stream will + // start to reject with DISCONNECTED exceptions. There is no distinction with WritableStream. + // We would need to determine if all errors are treated equally here. If the closed + // promise rejects, do we just go ahead and resolve this promise? Or do we need to + // filter for specific kinds of errors? 
If we are filtering, what do we do if the + // closed promise rejects with a different error? Is the whenWriteDisconnected promise + // ever expected to reject? + // + // For now, let's handle this the same as WritableStreamRpcAdapter and just return a + // never done. + return kj::NEVER_DONE; + } + + kj::Promise end() override { + ended = true; + return canceler.wrap(context.run([this](Worker::Lock& lock) mutable { + return context.awaitJs(lock, getInner().close(lock)); + })); + } + +private: + IoContext& context; + kj::Maybe> writer; + kj::Canceler canceler; + kj::Own> doneFulfiller; + kj::Own> weakRef = + kj::refcounted>( + kj::Badge(), *this); + bool ended = false; + + WritableStreamDefaultWriter& getInner() { + KJ_IF_SOME(inner, writer) { + return *inner; + } + kj::throwFatalException(cancellationException()); + } + + static kj::Exception cancellationException() { + return JSG_KJ_EXCEPTION(DISCONNECTED, Error, + "WritableStream received over RPC was disconnected because the remote execution context " + "has endeded."); + } +}; + } // namespace void WritableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) { @@ -388,24 +555,44 @@ void WritableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) { // TODO(soon): Support JS-backed WritableStreams. Currently this only supports native streams // and IdentityTransformStream, since only they are backed by WritableStreamSink. - // NOTE: We're counting on `removeSink()`, to check that the stream is not locked and other - // common checks. It's important we don't modify the WritableStream before this call. - auto sink = removeSink(js); - auto encoding = sink->disownEncodingResponsibility(); - auto wrapper = kj::heap(kj::mv(sink)); + KJ_IF_SOME(sink, getController().removeSink(js)) { + // NOTE: We're counting on `removeSink()`, to check that the stream is not locked and other + // common checks. It's important we don't modify the WritableStream before this call. 
+ auto encoding = sink->disownEncodingResponsibility(); + auto wrapper = kj::heap(kj::mv(sink)); - // Make sure this stream will be revoked if the IoContext ends. - ioctx.addTask(wrapper->waitForCompletionOrRevoke().attach(ioctx.registerPendingEvent())); + // Make sure this stream will be revoked if the IoContext ends. + ioctx.addTask(wrapper->waitForCompletionOrRevoke().attach(ioctx.registerPendingEvent())); - auto capnpStream = ioctx.getByteStreamFactory().kjToCapnp(kj::mv(wrapper)); + auto capnpStream = ioctx.getByteStreamFactory().kjToCapnp(kj::mv(wrapper)); - externalHandler->write( - [capnpStream = kj::mv(capnpStream), encoding] - (rpc::JsValue::External::Builder builder) mutable { - auto ws = builder.initWritableStream(); - ws.setByteStream(kj::mv(capnpStream)); - ws.setEncoding(encoding); - }); + externalHandler->write( + [capnpStream = kj::mv(capnpStream), encoding] + (rpc::JsValue::External::Builder builder) mutable { + auto ws = builder.initWritableStream(); + ws.setByteStream(kj::mv(capnpStream)); + ws.setEncoding(encoding); + }); + } else { + // TODO(soon): Support disownEncodingResponsibility with JS-backed streams + + // NOTE: We're counting on `getWriter()` to check that the stream is not locked and other + // common checks. It's important we don't modify the WritableStream before this call. + auto wrapper = kj::heap(ioctx, getWriter(js)); + + // Make sure this stream will be revoked if the IoContext ends. 
+ ioctx.addTask(wrapper->waitForCompletionOrRevoke().attach(ioctx.registerPendingEvent())); + + auto capnpStream = ioctx.getByteStreamFactory().kjToCapnp(kj::mv(wrapper)); + + externalHandler->write( + [capnpStream = kj::mv(capnpStream)] + (rpc::JsValue::External::Builder builder) mutable { + auto ws = builder.initWritableStream(); + ws.setByteStream(kj::mv(capnpStream)); + ws.setEncoding(StreamEncoding::IDENTITY); + }); + } } jsg::Ref WritableStream::deserialize( diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index f06a4a90a03..1a65b0d5608 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -293,6 +293,30 @@ export class MyService extends WorkerEntrypoint { await writer.close(); } + async writeToStreamExpectingError(stream) { + // TODO(now): In this test, we write to a stream that we expect to error + // immediately. Ideally, the writes would throw or at least there would be + // some signal that the remote stream has failed. Currently it does not + // appear that the error signal is propagating back here correctly. Both + // writer.write() calls resolve successfully and we get no signal that the + // stream failed. + let writer = stream.getWriter(); + let enc = new TextEncoder(); + await writer.write(enc.encode("foo, ")); + await writer.write(enc.encode("bar, ")); + } + + async writeToStreamAbort(stream) { + // In this test, aborting the stream should propagate back to the remote + // side, causing the stream to be errored and the abort algorithm to be + // called with the provided error. Unfortunately the current implementation + // does not allow for that. The reason passed to abort is cached locally and + // is never communicated to the remote. Instead, the remote side will end up + // with a generic disconnect error. Sad face. 
+ let writer = stream.getWriter(); + writer.abort(new Error('boom')); + } + async readFromStream(stream) { return await new Response(stream).text(); } @@ -1015,7 +1039,79 @@ export let streams = { await promise; } - // TODO(someday): Test JS-backed WritableStream, when it actually works. + { + const dec = new TextDecoder(); + let result = ''; + const { promise, resolve } = Promise.withResolvers(); + const writable = new WritableStream({ + write(chunk) { + result += dec.decode(chunk, {stream: true}); + }, + close() { + result += dec.decode(); + resolve(); + }, + }); + const p1 = env.MyService.writeToStream(writable); + await promise; + assert.strictEqual(result, "foo, bar, baz!"); + await p1; + } + + { + // In this test, the remote side writes a chunk to the stream below, which throws + // an error. Ideally the error would propagate back to the calling side so that the + // remote knows the stream failed and can no longer be written to. The call to + // writeToStreamExpectingError should throw because the error should be propagated + // through the round trip. + const dec = new TextDecoder(); + let result = ''; + const writable = new WritableStream({ + write(chunk) { + throw new Error('boom'); + }, + }); + // TODO(now): This works in one test case (js-rpc-test), but not another + // (js-rpc-socket-test). Need to understand why. + // The writeToStreamExpectingError calls write twice. The first call to write should + // error the stream and should cause the subsequent write to fail also. However, both + // writes on the remote side succeed. What we should end up with is + // writeToStreamExpectingError throwing an error that matches the error thrown in the + // write algorithm. + // The error thrown in the write IS being propagated up to the capnp stream created + // in WritableStream::serialize, but it does not get propagated back to the remote + // side as far as I can tell. 
At least, the writes happening on that side are never + // errored and the remote stream is not errored. + try { + await env.MyService.writeToStreamExpectingError(writable); + throw new Error('should have thrown'); + } catch (err) { + assert.strictEqual(err.message, 'boom'); + } + } + + { + // In this test, the remote side aborts the writable stream it receives. + // The abort should propagate such that the abort algorithm is called and the + // writeToStreamAbort call should succeed. The error passed on to abort(reason) + // should be the error that was given on the remote side when abort is called, + // but we currently do not propagate the abort reason through. What ends up + // happening is that the local stream is dropped with a generic cancellation + // error. + const dec = new TextDecoder(); + const { promise, resolve } = Promise.withResolvers(); + const writable = new WritableStream({ + write(chunk) {}, + abort(reason) { + resolve(reason); + } + }); + await env.MyService.writeToStreamAbort(writable); + const reason = await promise; + assert.strictEqual(reason.message, + 'WritableStream received over RPC was disconnected because the remote execution ' + + 'context has endeded.'); + } // TODO(someday): Is there any way to construct an encoded WritableStream? Only system // streams can be encoded, but there's no API that returns an encoded WritableStream I think.