// worker-rpc.c++
// Copyright (c) 2017-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
#include <workerd/api/worker-rpc.h>
#include <workerd/io/features.h>
#include <workerd/api/global-scope.h>
#include <workerd/api/actor-state.h>
#include <workerd/jsg/ser.h>
#include <capnp/membrane.h>
namespace workerd::api {
namespace {
// Fulfiller for a promised `rpc::JsValue::StreamSink::Client`. `callImpl()` stores one of these
// when a stream is encountered while serializing call params, and fulfills it right after the
// call has been sent.
using StreamSinkFulfiller = kj::Own<kj::PromiseFulfiller<rpc::JsValue::StreamSink::Client>>;
} // namespace
// Server-side implementation of the StreamSink RPC interface.
//
// Two parties touch each numbered slot: the stream sender invokes `startStream()` while
// serializing a stream, and the recipient invokes `setSlot()` while deserializing it, handing
// over the capability the stream should be delivered to. Either call may arrive first for any
// given slot; whichever arrives second completes the hand-off.
class StreamSinkImpl final: public rpc::JsValue::StreamSink::Server, public kj::Refcounted {
public:
  ~StreamSinkImpl() noexcept(false) {
    // Any fulfiller still parked in the table can never be completed now; reject it.
    for (auto& entry: table) {
      if (entry.is<StreamFulfiller>()) {
        entry.get<StreamFulfiller>()->reject(
            KJ_EXCEPTION(FAILED, "expected startStream() was never received"));
      }
    }
  }

  // Recipient side: provide the destination capability for slot `i`. If `startStream()` already
  // ran for this slot, its pending promise is fulfilled; otherwise the capability is stored until
  // `startStream()` arrives. Setting the same slot twice is an error.
  void setSlot(uint i, capnp::Capability::Client stream) {
    if (i >= table.size()) table.resize(i + 1);
    auto& slot = table[i];

    if (slot == nullptr) {
      // Nothing has touched this slot yet: just remember the capability.
      slot = kj::mv(stream);
      return;
    }

    KJ_SWITCH_ONEOF(slot) {
      KJ_CASE_ONEOF(existing, capnp::Capability::Client) {
        KJ_FAIL_REQUIRE("setSlot() tried to set the same slot twice", i);
      }
      KJ_CASE_ONEOF(fulfiller, StreamFulfiller) {
        fulfiller->fulfill(kj::mv(stream));
        slot = Consumed();
      }
      KJ_CASE_ONEOF(_, Consumed) {
        KJ_FAIL_REQUIRE("setSlot() tried to set the same slot twice", i);
      }
    }
  }

  // Sender side (RPC method): obtain the destination capability for the slot named by the
  // request's external index. If `setSlot()` has not run yet, a promise for the capability is
  // returned instead. Starting the same stream twice is an error.
  kj::Promise<void> startStream(StartStreamContext context) override {
    uint i = context.getParams().getExternalIndex();
    if (i >= table.size()) {
      // guard against ridiculous table allocation
      JSG_REQUIRE(i < 1024, Error, "Too many streams in one message.");
      table.resize(i + 1);
    }
    auto& slot = table[i];

    if (slot == nullptr) {
      // setSlot() hasn't happened yet: hand back a promise and park its fulfiller in the table.
      auto paf = kj::newPromiseAndFulfiller<capnp::Capability::Client>();
      slot = kj::mv(paf.fulfiller);
      context.getResults(capnp::MessageSize {4, 1}).setStream(kj::mv(paf.promise));
      return kj::READY_NOW;
    }

    KJ_SWITCH_ONEOF(slot) {
      KJ_CASE_ONEOF(stream, capnp::Capability::Client) {
        context.getResults(capnp::MessageSize {4, 1}).setStream(kj::mv(stream));
        slot = Consumed();
      }
      KJ_CASE_ONEOF(fulfiller, StreamFulfiller) {
        KJ_FAIL_REQUIRE("startStream() tried to start the same stream twice", i);
      }
      KJ_CASE_ONEOF(_, Consumed) {
        KJ_FAIL_REQUIRE("startStream() tried to start the same stream twice", i);
      }
    }
    return kj::READY_NOW;
  }

private:
  using StreamFulfiller = kj::Own<kj::PromiseFulfiller<capnp::Capability::Client>>;
  struct Consumed {};

  // Each slot starts out null (uninitialized). It becomes a Capability::Client if setSlot() is
  // called first, or a StreamFulfiller if startStream() is called first. It becomes `Consumed`
  // when the other method is called.
  //
  // HACK: Slots in the table take advantage of the little-known fact that OneOf has a "null"
  //   value, which is the value a OneOf has when default-initialized. This is useful because we
  //   don't want to explicitly initialize skipped slots. Maybe<OneOf> would be another option
  //   here, but would add 8 bytes to every slot just to store a boolean... feels bloated. There
  //   are only two methods in this class so I think it's OK.
  using Slot = kj::OneOf<capnp::Capability::Client, StreamFulfiller, Consumed>;
  kj::Vector<Slot> table;
};
// Registers a stream external. Issues a `startStream()` request on the StreamSink (creating the
// sink via `getStreamSinkFunc` the first time a stream is written), records `callback` as the
// builder for the external, and returns the pipelined stream capability.
capnp::Capability::Client RpcSerializerExternalHander::writeStream(BuilderCallback callback) {
  auto& sink = [this]() -> rpc::JsValue::StreamSink::Client& {
    KJ_IF_SOME(existing, streamSink) {
      return existing;
    }
    // First stream written, set up the StreamSink.
    return streamSink.emplace(getStreamSinkFunc());
  }();

  // The request is sent before `write()` appends the new external, so `externals.size()` is the
  // index that external is about to occupy.
  auto result = [&]() {
    auto req = sink.startStreamRequest(capnp::MessageSize {4, 0});
    req.setExternalIndex(externals.size());
    return req.send().getStream();
  }();

  write(kj::mv(callback));
  return result;
}
capnp::Orphan<capnp::List<rpc::JsValue::External>>
    RpcSerializerExternalHander::build(capnp::Orphanage orphanage) {
  // Allocate one list element per recorded external, then let each recorded callback fill in
  // its own element.
  const auto count = externals.size();
  auto list = orphanage.newOrphan<capnp::List<rpc::JsValue::External>>(count);
  auto builder = list.get();
  for (size_t index = 0; index < count; ++index) {
    externals[index](builder[index]);
  }
  return list;
}
RpcDeserializerExternalHander::~RpcDeserializerExternalHander() noexcept(false) {
  // Skip the sanity check when we're being destroyed as part of exception unwinding.
  if (unwindDetector.isUnwinding()) return;

  KJ_ASSERT(i == externals.size(), "deserialization did not consume all of the externals");
}
// Returns the next unread external and advances the read position.
rpc::JsValue::External::Reader RpcDeserializerExternalHander::read() {
  KJ_ASSERT(i < externals.size());
  auto next = externals[i];
  ++i;
  return next;
}
// Supplies the destination capability for the external most recently returned by `read()`.
// Creates the StreamSinkImpl (and the client capability wrapping it) if none exists yet.
void RpcDeserializerExternalHander::setLastStream(capnp::Capability::Client stream) {
  // `read()` post-increments `i`, so the external just read lives at index `i - 1`.
  const auto slot = i - 1;

  KJ_IF_SOME(sink, streamSink) {
    sink.setSlot(slot, kj::mv(stream));
    return;
  }

  auto created = kj::refcounted<StreamSinkImpl>();
  created->setSlot(slot, kj::mv(stream));
  streamSink = *created;
  streamSinkCap = rpc::JsValue::StreamSink::Client(kj::mv(created));
}
namespace {
// Serializes a JS value into an `rpc::JsValue`.
//
// `makeBuilder` is invoked once, with a capnp::MessageSize hint, and must return the
// rpc::JsValue::Builder to fill in. `getStreamSinkFunc` is forwarded to the external handler.
// Fails with a JS `Error` if the serialized bytes exceed MAX_JS_RPC_MESSAGE_SIZE.
template <typename Func>
void serializeJsValue(jsg::Lock& js, jsg::JsValue value, Func makeBuilder,
    RpcSerializerExternalHander::GetStreamSinkFunc getStreamSinkFunc) {
  RpcSerializerExternalHander externalHandler(kj::mv(getStreamSinkFunc));

  jsg::Serializer serializer(js, jsg::Serializer::Options {
    .version = 15,
    .omitHeader = false,
    .treatClassInstancesAsPlainObjects = false,
    .externalHandler = externalHandler,
  });
  serializer.write(js, value);

  kj::Array<const byte> data = serializer.release().data;
  JSG_ASSERT(data.size() <= MAX_JS_RPC_MESSAGE_SIZE, Error,
      "Serialized RPC arguments or return values are limited to 1MiB, but the size of this value "
      "was: ", data.size(), " bytes.");

  // Size hint: the serialized bytes rounded up to whole words, the JsValue struct itself, and
  // one External struct plus one capability per external.
  const auto externalCount = externalHandler.size();
  capnp::MessageSize hint {0, 0};
  hint.wordCount += (data.size() + sizeof(capnp::word) - 1) / sizeof(capnp::word);
  hint.wordCount += capnp::sizeInWords<rpc::JsValue>();
  hint.wordCount += externalCount * capnp::sizeInWords<rpc::JsValue::External>();
  hint.capCount += externalCount;

  rpc::JsValue::Builder builder = makeBuilder(hint);

  // TODO(perf): It would be nice if we could serialize directly into the capnp message to avoid
  //   a redundant copy of the bytes here. Maybe we could even cancel serialization early if it
  //   goes over the size limit.
  builder.setV8Serialized(data);

  if (externalCount > 0) {
    builder.adoptExternals(externalHandler.build(
        capnp::Orphanage::getForMessageContaining(builder)));
  }
}
// Everything produced by deserializeJsValue().
struct DeserializeResult {
// The deserialized JavaScript value.
jsg::JsValue value;
// The disposal group that was handed to the external handler while `value` was being read.
kj::Own<RpcStubDisposalGroup> disposalGroup;
// Whatever the external handler's getStreamSink() reported after deserialization.
kj::Maybe<rpc::JsValue::StreamSink::Client> streamSink;
};
// Deserializes an `rpc::JsValue` into a JS value.
//
// Returns the value together with the disposal group passed to the external handler and the
// external handler's stream sink (if any). `streamSink`, when given, is passed through to the
// external handler.
DeserializeResult deserializeJsValue(jsg::Lock& js, rpc::JsValue::Reader reader,
    kj::Maybe<StreamSinkImpl&> streamSink = kj::none) {
  auto disposalGroup = kj::heap<RpcStubDisposalGroup>();
  RpcDeserializerExternalHander externalHandler(reader.getExternals(), *disposalGroup, streamSink);

  jsg::Deserializer deserializer(js, reader.getV8Serialized(), kj::none, kj::none,
      jsg::Deserializer::Options {
    .version = 15,
    .readHeader = true,
    .externalHandler = externalHandler,
  });

  // Read the value first; the stream sink is only queried afterwards.
  auto value = deserializer.readValue(js);
  return {
    .value = value,
    .disposalGroup = kj::mv(disposalGroup),
    .streamSink = externalHandler.getStreamSink(),
  };
}
// Runs deserializeJsValue() on the call's result and then, if the result is an object other than
// a bare stub, installs a `dispose()` method on it which disposes all stubs it contains.
jsg::JsValue deserializeRpcReturnValue(jsg::Lock& js,
    rpc::JsRpcTarget::CallResults::Reader callResults,
    StreamSinkImpl& streamSink) {
  auto [ value, disposalGroup, _ ] = deserializeJsValue(js, callResults.getResult(), streamSink);

  if (callResults.hasCallPipeline()) {
    auto& ioContext = IoContext::current();
    disposalGroup->setCallPipeline(ioContext.addObject(
        kj::heap(callResults.getCallPipeline())));
  }

  KJ_IF_SOME(obj, value.tryCast<jsg::JsObject>()) {
    if (!obj.isInstanceOf<JsRpcStub>(js)) {
      // Add a dispose method to the return object that disposes the DisposalGroup.
      v8::Local<v8::Value> func = js.wrapSimpleFunction(js.v8Context(),
          [disposalGroup = kj::mv(disposalGroup)]
          (jsg::Lock&, const v8::FunctionCallbackInfo<v8::Value>&) mutable {
        disposalGroup->disposeAll();
      });
      obj.set(js, js.symbolDispose(), jsg::JsValue(func));
    } else {
      // We're returning a plain stub. We don't need to override its `dispose` method.
      disposalGroup->disownAll();
    }
  } else {
    // Result wasn't an object, so it must not contain any stubs.
    KJ_ASSERT(disposalGroup->empty());
  }

  return value;
}
// A membrane policy which detects when no capabilities are held any longer, at which point it
// fulfills a fulfiller. Concretely, the fulfiller is fulfilled from the destructor, i.e. when the
// last reference to this refcounted policy is dropped.
//
// TODO(cleanup): This is generally useful, should it be part of capnp?
class CompletionMembrane final: public capnp::MembranePolicy, public kj::Refcounted {
public:
// `doneFulfiller` is fulfilled when this policy object is destroyed.
explicit CompletionMembrane(kj::Own<kj::PromiseFulfiller<void>> doneFulfiller)
: doneFulfiller(kj::mv(doneFulfiller)) {}
~CompletionMembrane() noexcept(false) {
doneFulfiller->fulfill();
}
// Never redirects calls. NOTE(review): returning kj::none is understood to mean "forward the
// call unchanged" in capnp's membrane API -- confirm against capnp/membrane.h.
kj::Maybe<capnp::Capability::Client> inboundCall(
uint64_t interfaceId, uint16_t methodId, capnp::Capability::Client target) override {
return kj::none;
}
// Same as inboundCall(): never redirects.
kj::Maybe<capnp::Capability::Client> outboundCall(
uint64_t interfaceId, uint16_t methodId, capnp::Capability::Client target) override {
return kj::none;
}
// Returns a new reference to this same policy object.
kj::Own<MembranePolicy> addRef() override {
return kj::addRef(*this);
}
private:
kj::Own<kj::PromiseFulfiller<void>> doneFulfiller;
};
// A membrane policy which revokes when some Promise is fulfilled.
//
// TODO(cleanup): This is generally useful, should it be part of capnp?
class RevokerMembrane final: public capnp::MembranePolicy, public kj::Refcounted {
public:
// The given promise is forked so that every call to onRevoked() can hand out its own branch.
explicit RevokerMembrane(kj::Promise<void> promise)
: promise(promise.fork()) {}
// Never redirects calls. NOTE(review): returning kj::none is understood to mean "forward the
// call unchanged" in capnp's membrane API -- confirm against capnp/membrane.h.
kj::Maybe<capnp::Capability::Client> inboundCall(
uint64_t interfaceId, uint16_t methodId, capnp::Capability::Client target) override {
return kj::none;
}
// Same as inboundCall(): never redirects.
kj::Maybe<capnp::Capability::Client> outboundCall(
uint64_t interfaceId, uint16_t methodId, capnp::Capability::Client target) override {
return kj::none;
}
// Returns a new reference to this same policy object.
kj::Own<MembranePolicy> addRef() override {
return kj::addRef(*this);
}
// Returns a fresh branch of the forked revocation promise.
kj::Maybe<kj::Promise<void>> onRevoked() override {
return promise.addBranch();
}
private:
kj::ForkedPromise<void> promise;
};
// If `value` is an object whose `Symbol.dispose` property is a function, call that function with
// `value` as the receiver and no arguments. Otherwise do nothing.
void tryCallDisposeMethod(jsg::Lock& js, jsg::JsValue value) {
  js.withinHandleScope([&]() {
    KJ_IF_SOME(obj, value.tryCast<jsg::JsObject>()) {
      auto dispose = obj.get(js, js.symbolDispose());
      if (!dispose.isFunction()) return;

      auto disposeFn = v8::Local<v8::Value>(dispose).As<v8::Function>();
      jsg::check(disposeFn->Call(js.v8Context(), value, 0, nullptr));
    }
  });
}
} // namespace
// Constructs the promise in the Pending state, holding `pipeline`, and registers this object in
// the given WeakRef. The WeakRef must not already point at a JsRpcPromise.
JsRpcPromise::JsRpcPromise(jsg::JsRef<jsg::JsPromise> inner, kj::Own<WeakRef> weakRefParam,
IoOwn<rpc::JsRpcTarget::CallResults::Pipeline> pipeline)
: inner(kj::mv(inner)), weakRef(kj::mv(weakRefParam)), state(Pending { kj::mv(pipeline) }) {
KJ_REQUIRE(weakRef->ref == kj::none);
weakRef->ref = *this;
}
// Clears the WeakRef's pointer to this object (installed by the constructor).
JsRpcPromise::~JsRpcPromise() noexcept(false) {
weakRef->ref = kj::none;
}
// Records the final result of the RPC. If the promise is still Pending it transitions to
// Resolved, remembering the result and the current IoContext; otherwise the result is not
// stored and is disposed instead.
void JsRpcPromise::resolve(jsg::Lock& js, jsg::JsValue result) {
  if (!state.is<Pending>()) {
    // We'd better dispose this.
    tryCallDisposeMethod(js, result);
    return;
  }

  state = Resolved {
    .result = jsg::Value(js.v8Isolate, result),
    .ioCtx = IoContext::current().getWeakRef()
  };
}
// Moves the promise to the Disposed state and flags the WeakRef as disposed. If a result had
// already been recorded, its dispose method (if any) is invoked first.
void JsRpcPromise::dispose(jsg::Lock& js) {
  if (state.is<Resolved>()) {
    // Disposing the promise implies disposing the final result.
    auto& resolved = state.get<Resolved>();
    tryCallDisposeMethod(js, jsg::JsValue(resolved.result.getHandle(js)));
  }

  state = Disposed();
  weakRef->disposed = true;
}
// Forward declaration (internal linkage). Wraps a plain JS object in a one-off RPC target; see
// the comment at the call site in JsRpcPromise::getClientForOneCall() for the explanation.
static rpc::JsRpcTarget::Client makeJsRpcTargetForSingleLoopbackCall(
jsg::Lock& js, jsg::JsObject obj);
// Returns the capability that a call or property access made through this promise should be
// sent to. What is returned depends on the promise's state:
//   * Pending:  the call pipeline of the in-flight RPC.
//   * Resolved: the resolved stub's own client, or a one-off loopback target wrapping the
//               resolved plain object; throws a TypeError if the result is not an object.
//   * Disposed: a broken capability carrying a "used after being disposed" error.
// `path` is left untouched because this object is the root of the property path.
rpc::JsRpcTarget::Client JsRpcPromise::getClientForOneCall(
jsg::Lock& js, kj::Vector<kj::StringPtr>& path) {
// (Don't extend `path` because we're the root.)
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(pending, Pending) {
return pending.pipeline->getCallPipeline();
}
KJ_CASE_ONEOF(resolved, Resolved) {
IoContext::requireCurrentOrThrowJs(*resolved.ioCtx);
// A value was already returned, and we closed the original RPC pipeline. But the application
// kept the promise around and is still trying to pipeline on it. What do we do?
//
// A naive answer would be: We just return the actual value that was returned originally.
// Like if someone asked for `promise.foo.bar`, we just give them `returnValue.foo.bar`.
//
// That doesn't quite work, for a couple reasons:
// * If the caller is awaiting a property, they expect the result will have a `dispose()`
// method added to it, and that any stubs in the result will be independently disposable.
// This essentially means we need to clone the value so that we can dup() all the stubs and
// modify the result.
// * If the caller is trying to make a pipelined RPC call, they expect this call to go
// through all the usual RPC machinery. They do NOT expect that this is going to be a local
// call.
//
// The easiest way to make this all just work is... to actually wrap the value in a one-off
// RPC stub, and make a real RPC on it.
return js.withinHandleScope([&]() -> rpc::JsRpcTarget::Client {
auto value = jsg::JsValue(resolved.result.getHandle(js));
KJ_IF_SOME(obj, value.tryCast<jsg::JsObject>()) {
KJ_IF_SOME(stub, obj.tryUnwrapAs<JsRpcStub>(js)) {
// Oh, the return value is actually a stub itself. Just use it.
return stub->getClient();
} else {
// Must be a plain object.
return makeJsRpcTargetForSingleLoopbackCall(js, obj);
}
} else {
JSG_FAIL_REQUIRE(TypeError, "Can't pipeline on RPC that did not return an object.");
}
});
}
KJ_CASE_ONEOF(disposed, Disposed) {
return JSG_KJ_EXCEPTION(FAILED, Error, "RPC promise used after being disposed.");
}
}
KJ_UNREACHABLE;
}
// Asks the parent for the client, then appends this property's name to `path`. The parent is
// consulted first so that its own path components (if any) end up ahead of ours.
rpc::JsRpcTarget::Client JsRpcProperty::getClientForOneCall(
jsg::Lock& js, kj::Vector<kj::StringPtr>& path) {
auto result = parent->getClientForOneCall(js, path);
path.add(name);
return result;
}
namespace {
// Result of callImpl(): the pieces needed to build a JsRpcPromise, kept separate so that callers
// which only want the JS promise can take `promise` and drop the rest.
struct JsRpcPromiseAndPipleine {
// JS promise for the call's result.
jsg::JsPromise promise;
// Shared WeakRef which a JsRpcPromise constructed from this struct registers itself in.
kj::Own<JsRpcPromise::WeakRef> weakRef;
// Pipeline of the call (a broken pipeline if the call could not be made).
rpc::JsRpcTarget::CallResults::Pipeline pipeline;
// Consumes this struct, producing a JsRpcPromise. The pipeline is registered with the current
// IoContext.
jsg::Ref<JsRpcPromise> asJsRpcPromise(jsg::Lock& js) && {
return jsg::alloc<JsRpcPromise>(
jsg::JsRef<jsg::JsPromise>(js, promise), kj::mv(weakRef),
IoContext::current().addObject(kj::heap(kj::mv(pipeline))));
}
};
// Core implementation of making an RPC call, reusable for many cases below.
//
// `parent` supplies the capability to call and the property path leading to it. `name`, if
// given, is appended as the final path component. If `maybeArgs` is given the operation is a
// method call with those arguments; otherwise it is a property read.
//
// Returns the JS promise for the result, a WeakRef through which a JsRpcPromise (if one is
// created) learns of the result, and the call's pipeline.
//
// This function does not throw for ordinary failures: JS exceptions caught by `js.tryCatch()`
// and KJ exceptions are both turned into a rejected promise paired with a broken pipeline. Only
// `jsg::JsExceptionThrown` is rethrown.
JsRpcPromiseAndPipleine callImpl(
    jsg::Lock& js,
    JsRpcClientProvider& parent,
    kj::Maybe<const kj::String&> name,
    // If `maybeArgs` is provided, this is a call, otherwise it is a property access.
    kj::Maybe<const v8::FunctionCallbackInfo<v8::Value>&> maybeArgs) {
  // Note: We used to enforce that RPC methods had to be called with the correct `this`. That is,
  // we prevented people from doing:
  //
  //   let obj = {foo: someRpcStub.foo};
  //   obj.foo();
  //
  // This would throw "Illegal invocation", as is the norm when pulling methods of a native object.
  // That worked as long as RPC methods were implemented as `jsg::Function`. However, when we
  // switched to RPC methods being implemented as callable objects (JsRpcProperty), this became
  // impossible, because V8's SetCallAsFunctionHandler() arranges that `this` is bound to the
  // callable object itself, regardless of how it was invoked. So now we cannot detect the
  // situation above, because V8 never tells us about `obj` at all.
  //
  // Oh well. It's not a big deal. Just annoying that we have to forever support tearing RPC
  // methods off their source object, even if we change implementations to something where that's
  // less convenient.

  try {
    return js.tryCatch([&]() -> JsRpcPromiseAndPipleine {
      // `path` will be filled in with the path of property names leading from the stub represented
      // by `client` to the specific property / method that we're trying to invoke.
      kj::Vector<kj::StringPtr> path;
      auto client = parent.getClientForOneCall(js, path);

      auto& ioContext = IoContext::current();

      auto builder = client.callRequest();

      // This code here is slightly overcomplicated in order to avoid pushing anything to the
      // kj::Vector in the common case that the parent path is empty. I'm probably trying too hard
      // but oh well.
      if (path.empty()) {
        KJ_IF_SOME(n, name) {
          builder.setMethodName(n);
        } else {
          // No name and no path, must be directly calling a stub.
          builder.initMethodPath(0);
        }
      } else {
        auto pathBuilder = builder.initMethodPath(path.size() + (name != kj::none));
        for (auto i: kj::indices(path)) {
          pathBuilder.set(i, path[i]);
        }
        KJ_IF_SOME(n, name) {
          pathBuilder.set(path.size(), n);
        }
      }

      kj::Maybe<StreamSinkFulfiller> paramsStreamSinkFulfiller;

      KJ_IF_SOME(args, maybeArgs) {
        // This is a function call with arguments.
        kj::Vector<jsg::JsValue> argv(args.Length());
        for (int n = 0; n < args.Length(); n++) {
          argv.add(jsg::JsValue(args[n]));
        }

        // If we have arguments, serialize them.
        // Note that we may fail to serialize some element, in which case this will throw back to
        // JS.
        if (argv.size() > 0) {
          serializeJsValue(js, js.arr(argv.asPtr()), [&](capnp::MessageSize hint) {
            // TODO(perf): Actually use the size hint.
            return builder.getOperation().initCallWithArgs();
          }, [&]() -> rpc::JsValue::StreamSink::Client {
            // A stream was encountered in the params, so we must expect the response to contain
            // paramsStreamSink. But we don't have the response yet. So, we need to set up a
            // temporary promise client, which we hook to the response a little bit later.
            auto paf = kj::newPromiseAndFulfiller<rpc::JsValue::StreamSink::Client>();
            paramsStreamSinkFulfiller = kj::mv(paf.fulfiller);
            return kj::mv(paf.promise);
          });
        }
      } else {
        // This is a property access.
        builder.getOperation().setGetProperty();
      }

      // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until
      // after the call completes whether or not it will return any streams. If it's unused,
      // though, it should only be a couple allocations.
      auto resultStreamSink = kj::refcounted<StreamSinkImpl>();
      builder.setResultsStreamSink(kj::addRef(*resultStreamSink));

      auto callResult = builder.send();

      KJ_IF_SOME(ssf, paramsStreamSinkFulfiller) {
        ssf->fulfill(callResult.getParamsStreamSink());
      }

      // We need to arrange that our JsRpcPromise will be updated in-place with the final
      // settlement of this RPC promise. However, we can't actually construct the JsRpcPromise
      // until we have the final promise to give it. To resolve the cycle, we only create a
      // JsRpcPromise::WeakRef here, which is filled in later on to point at the JsRpcPromise, if
      // and when one is created.
      auto weakRef = kj::atomicRefcounted<JsRpcPromise::WeakRef>();

      auto jsPromise = ioContext.awaitIo(js, kj::mv(callResult),
          [weakRef = kj::atomicAddRef(*weakRef), resultStreamSink = kj::mv(resultStreamSink)]
          (jsg::Lock& js, capnp::Response<rpc::JsRpcTarget::CallResults> response) mutable
          -> jsg::Value {
        auto jsResult = deserializeRpcReturnValue(js, response, *resultStreamSink);

        if (weakRef->disposed) {
          // The promise was explicitly disposed before it even resolved. This means we must
          // dispose the returned object as well.
          tryCallDisposeMethod(js, jsResult);
        } else {
          KJ_IF_SOME(r, weakRef->ref) {
            r.resolve(js, jsResult);
          }
        }

        return jsg::Value(js.v8Isolate, jsResult);
      });

      return {
        .promise = jsg::JsPromise(js.wrapSimplePromise(kj::mv(jsPromise))),
        .weakRef = kj::mv(weakRef),
        .pipeline = kj::mv(callResult),
      };
    }, [&](jsg::Value error) -> JsRpcPromiseAndPipleine {
      // Probably a serialization error. Need to convert to an async error since we never throw
      // synchronously from async functions.
      auto jsError = jsg::JsValue(error.getHandle(js));
      auto pipeline = capnp::newBrokenPipeline(js.exceptionToKj(jsError));
      return {
        .promise = js.rejectedJsPromise(jsError),
        .weakRef = kj::atomicRefcounted<JsRpcPromise::WeakRef>(),
        .pipeline = rpc::JsRpcTarget::CallResults::Pipeline(
            capnp::AnyPointer::Pipeline(kj::mv(pipeline)))
      };
    });
  } catch (jsg::JsExceptionThrown&) {
    // This must be a termination exception, or we would have caught it above.
    throw;
  } catch (...) {
    // Catch KJ exceptions and make them async, since we don't want async calls to throw
    // synchronously.
    auto e = kj::getCaughtExceptionAsKj();
    auto pipeline = capnp::newBrokenPipeline(kj::cp(e));
    return {
      .promise = jsg::JsPromise(js.wrapSimplePromise(js.rejectedPromise<jsg::Value>(kj::mv(e)))),
      .weakRef = kj::atomicRefcounted<JsRpcPromise::WeakRef>(),
      .pipeline = rpc::JsRpcTarget::CallResults::Pipeline(
          capnp::AnyPointer::Pipeline(kj::mv(pipeline)))
    };
  }
}
} // namespace
// Invokes this property as a method on the remote side, passing `args`.
jsg::Ref<JsRpcPromise> JsRpcProperty::call(const v8::FunctionCallbackInfo<v8::Value>& args) {
  auto& js = jsg::Lock::from(args.GetIsolate());
  auto outcome = callImpl(js, *parent, name, args);
  return kj::mv(outcome).asJsRpcPromise(js);
}
// Invokes the stub itself as a function (no method name), passing `args`.
jsg::Ref<JsRpcPromise> JsRpcStub::call(const v8::FunctionCallbackInfo<v8::Value>& args) {
  auto& js = jsg::Lock::from(args.GetIsolate());
  auto outcome = callImpl(js, *this, kj::none, args);
  return kj::mv(outcome).asJsRpcPromise(js);
}
// Invokes the promised value as a function (no method name), passing `args`.
jsg::Ref<JsRpcPromise> JsRpcPromise::call(const v8::FunctionCallbackInfo<v8::Value>& args) {
  auto& js = jsg::Lock::from(args.GetIsolate());
  auto outcome = callImpl(js, *this, kj::none, args);
  return kj::mv(outcome).asJsRpcPromise(js);
}
namespace {
// Shared implementation of `then()`: chains `handler` (and `errorHandler`, when supplied) onto
// `promise` and returns the resulting promise.
jsg::JsValue thenImpl(jsg::Lock& js, v8::Local<v8::Promise> promise,
    v8::Local<v8::Function> handler, jsg::Optional<v8::Local<v8::Function>> errorHandler) {
  // Note that we intentionally propagate any exception from promise->Then() synchronously since
  // if V8's native Promise threw synchronously from `then()`, we might as well too. Anyway it's
  // probably a termination exception.
  auto context = js.v8Context();

  KJ_IF_SOME(onRejected, errorHandler) {
    return jsg::JsPromise(jsg::check(promise->Then(context, handler, onRejected)));
  }
  return jsg::JsPromise(jsg::check(promise->Then(context, handler)));
}
// Shared implementation of `catch()`: chains `errorHandler` onto `promise` and returns the
// resulting promise.
jsg::JsValue catchImpl(jsg::Lock& js, v8::Local<v8::Promise> promise,
    v8::Local<v8::Function> errorHandler) {
  auto chained = jsg::check(promise->Catch(js.v8Context(), errorHandler));
  return jsg::JsPromise(chained);
}
// Shared implementation of `finally()`: looks up the promise's `finally` property from JS and
// calls it with `onFinally` as the sole argument, returning whatever that call returns.
jsg::JsValue finallyImpl(jsg::Lock& js, v8::Local<v8::Promise> promise,
    v8::Local<v8::Function> onFinally) {
  // HACK: `finally()` is not exposed as a C++ API, so we have to manually read it from JS.
  jsg::JsObject obj(promise);
  auto func = obj.get(js, "finally");
  KJ_ASSERT(func.isFunction());

  // v8::Function::Call() takes its arguments as a pointer to an array of Local<Value>; here the
  // "array" is the single handle `param`.
  v8::Local<v8::Value> param = onFinally;
  return jsg::JsValue(jsg::check(v8::Local<v8::Value>(func).As<v8::Function>()
      ->Call(js.v8Context(), obj, 1, &param)));
}
} // namespace
// Treats `then()` on a property as a property read: performs the RPC (callImpl() with no args)
// and chains the handlers onto the resulting promise. Only the JS promise is kept; the rest of
// callImpl()'s result (WeakRef and pipeline) is dropped at the end of the first statement.
jsg::JsValue JsRpcProperty::then(jsg::Lock& js, v8::Local<v8::Function> handler,
jsg::Optional<v8::Local<v8::Function>> errorHandler) {
auto promise = callImpl(js, *parent, name, kj::none).promise;
return thenImpl(js, promise, handler, errorHandler);
}
// Like then(), but only attaches an error handler: performs the property-read RPC and chains
// `errorHandler` onto the resulting promise.
jsg::JsValue JsRpcProperty::catch_(jsg::Lock& js, v8::Local<v8::Function> errorHandler) {
auto promise = callImpl(js, *parent, name, kj::none).promise;
return catchImpl(js, promise, errorHandler);
}
// Performs the property-read RPC and attaches `onFinally` to the resulting promise.
jsg::JsValue JsRpcProperty::finally(jsg::Lock& js, v8::Local<v8::Function> onFinally) {
auto promise = callImpl(js, *parent, name, kj::none).promise;
return finallyImpl(js, promise, onFinally);
}
// Chains the handlers onto the wrapped JS promise.
jsg::JsValue JsRpcPromise::then(jsg::Lock& js, v8::Local<v8::Function> handler,
    jsg::Optional<v8::Local<v8::Function>> errorHandler) {
  auto promise = inner.getHandle(js);
  return thenImpl(js, promise, handler, errorHandler);
}
// Chains `errorHandler` onto the wrapped JS promise.
jsg::JsValue JsRpcPromise::catch_(jsg::Lock& js, v8::Local<v8::Function> errorHandler) {
  auto promise = inner.getHandle(js);
  return catchImpl(js, promise, errorHandler);
}
// Attaches `onFinally` to the wrapped JS promise.
jsg::JsValue JsRpcPromise::finally(jsg::Lock& js, v8::Local<v8::Function> onFinally) {
  auto promise = inner.getHandle(js);
  return finallyImpl(js, promise, onFinally);
}
// Returns a child JsRpcProperty named `name` whose parent is this property. No RPC is made here.
kj::Maybe<jsg::Ref<JsRpcProperty>> JsRpcProperty::getProperty(jsg::Lock& js, kj::String name) {
return jsg::alloc<JsRpcProperty>(JSG_THIS, kj::mv(name));
}
// Returns a JsRpcProperty named `name` whose parent is this promise. No RPC is made here.
kj::Maybe<jsg::Ref<JsRpcProperty>> JsRpcPromise::getProperty(jsg::Lock& js, kj::String name) {
return jsg::alloc<JsRpcProperty>(JSG_THIS, kj::mv(name));
}
// Constructs a stub around `capnpClient` and enrolls it in `disposalGroup`'s list.
JsRpcStub::JsRpcStub(
IoOwn<rpc::JsRpcTarget::Client> capnpClient, RpcStubDisposalGroup& disposalGroup)
: capnpClient(kj::mv(capnpClient)),
disposalGroup(disposalGroup) {
disposalGroup.list.add(*this);
}
// Unlinks the stub from its disposal group (if it still has one). If the capability was never
// disposed, hands it off to the I/O context and, when an IoContext is current, logs a one-time
// warning about the leak.
JsRpcStub::~JsRpcStub() noexcept(false) {
KJ_IF_SOME(d, disposalGroup) {
d.list.remove(*this);
}
KJ_IF_SOME(c, capnpClient) {
// The app failed to dispose the stub; it leaked. We'd rather not make GC observable, so we
// must pass the capnp capability off to the I/O context to be dropped when the I/O context
// itself shuts down.
kj::mv(c).deferGcToContext();
// In preview, let's try to warn the developer about the problem.
//
// TODO(cleanup): Instead of logging this warning at GC time, it would be better if we logged
// it at the time that the client is destroyed, i.e. when the IoContext is torn down,
// which is usually sooner (and more deterministic). But logging a warning during
// IoContext tear-down is problematic since logWarningOnce() is a method on
// IoContext...
if (IoContext::hasCurrent()) {
IoContext::current().logWarningOnce(kj::str(
"An RPC stub was not disposed properly. You must call dispose() on all stubs in order to "
"let the other side know that you are no longer using them. You cannot rely on "
"the garbage collector for this because it may take arbitrarily long before actually "
"collecting unreachable objects. As a shortcut, calling dispose() on the result of "
"an RPC call disposes all stubs within it."));
}
}
}
// Destroys the group. Outside of garbage collection every member stub is disposed; during
// garbage collection the stubs are merely released from the group, and any `callPipeline` is
// handed to the I/O context instead of being dropped immediately.
RpcStubDisposalGroup::~RpcStubDisposalGroup() noexcept(false) {
  if (!jsg::isInGcDestructor()) {
    // If we're destroying the RpcStubDisposalGroup NOT as a result of GC, this probably means
    // one of:
    // * This is the disposal group for an incoming RPC call, and that call completed. The group
    //   was attached to the completion continuation, which executed, and is now being destroyed.
    //   This is the normal completion case, and we should dispose all the param stubs.
    // * An exception was thrown in the RPC implementation before stubs could be passed to
    //   JavaScript in the first place, resulting in the disposal group being destroyed during
    //   exception unwind. The stubs should be disposed proactively since they were never
    //   received.
    disposeAll();
    return;
  }

  // If the disposal group was dropped as a result of garbage collection, we should NOT actually
  // dispose any stubs. In particular:
  // * If an application never invokes dispose() on an RPC result and the result is GC'd, the
  //   app could still be holding onto stubs that came from that result. We don't want to
  //   dispose those unexpectedly.
  // * If an incoming RPC call does something like `await new Promise(() => {})` to hang
  //   forever, the promise reaction can be GC'd even though the call didn't really complete.
  //   We don't want to dispose param stubs in this case.
  disownAll();

  // If we have a `callPipeline`, it means we called an RPC that returned an object, and that
  // object had a dispose method defined on the server side. We don't want it to observe GC,
  // so we'll defer dropping the pipeline until the IoContext is destroyed.
  //
  // (We don't do this as part of disownAll() because the one other call site of disownAll()
  // is only invoked in cases where there shouldn't be a `callPipeline` anyway...)
  KJ_IF_SOME(c, callPipeline) {
    kj::mv(c).deferGcToContext();

    // In preview, let's try to warn the developer about the problem.
    //
    // TODO(cleanup): Same comment as in ~JsRpcStub().
    if (IoContext::hasCurrent()) {
      IoContext::current().logWarningOnce(kj::str(
          "An RPC result was not disposed properly. One of the RPC calls you made expects you "
          "to call dispose() on the return value, but you didn't do so. You cannot rely on "
          "the garbage collector for this because it may take arbitrarily long before actually "
          "collecting unreachable objects."));
    }
  }
}
// Returns the underlying capability, or a broken capability carrying a "used after being
// disposed" error if the stub no longer holds one.
rpc::JsRpcTarget::Client JsRpcStub::getClient() {
  KJ_IF_SOME(client, capnpClient) {
    return *client;
  }

  // TODO(soon): Improve the error message to describe why it was disposed.
  return JSG_KJ_EXCEPTION(FAILED, Error, "RPC stub used after being disposed.");
}
// A stub is the root of any property path, so this simply returns the stub's client and leaves
// `path` untouched.
rpc::JsRpcTarget::Client JsRpcStub::getClientForOneCall(
jsg::Lock& js, kj::Vector<kj::StringPtr>& path) {
// (Don't extend `path` because we're the root.)
return getClient();
}
// Creates a second stub that wraps its own heap-allocated copy of this stub's client. The copy
// is registered with the current IoContext via addObject().
jsg::Ref<JsRpcStub> JsRpcStub::dup() {
  // Look up the IoContext before fetching the client, matching the evaluation order of the
  // equivalent single nested expression.
  auto& ioContext = IoContext::current();
  auto clientCopy = kj::heap(getClient());
  return jsg::alloc<JsRpcStub>(ioContext.addObject(kj::mv(clientCopy)));
}
void JsRpcStub::dispose() {
capnpClient = kj::none;
KJ_IF_SOME(d, disposalGroup) {
d.list.remove(*this);
disposalGroup = kj::none;
}
}
// Detaches every stub from this group WITHOUT disposing it: each stub's back-reference to the
// group is cleared and the stub is unlinked from `list`. The stubs' clients are left untouched.
void RpcStubDisposalGroup::disownAll() {
  // NOTE(review): this removes the current element while iterating. That is believed to be safe
  // because kj::List iterators fetch the next link ahead of time -- confirm against kj/list.h.
  for (auto& member: list) {
    member.disposalGroup = kj::none;
    list.remove(member);
  }
}
// Disposes every stub tracked by this group, then drops the group's `callPipeline`.
void RpcStubDisposalGroup::disposeAll() {
  for (auto& member: list) {
    // JsRpcStub::dispose() also unlinks the stub from `list`, so the list drains as we go.
    member.dispose();
  }

  callPipeline = kj::none;

  // Sanity check: every stub should have removed itself from the list during dispose().
  KJ_ASSERT(list.empty());
}
// Looks up the property `name` on this stub, yielding a JsRpcProperty rooted at the stub.
//
// Returns kj::none for the name "then"; every other name produces a property object.
kj::Maybe<jsg::Ref<JsRpcProperty>> JsRpcStub::getRpcMethod(
    jsg::Lock& js, kj::String name) {
  if (name == "then"_kj) {
    // Exposing `then` would make JavaScript treat the stub as a thenable (a custom Promise), so
    // a Promise resolving to this stub would try to chain with it. That is never what the
    // application wants, so pretend the property does not exist.
    return kj::none;
  } else {
    return jsg::alloc<JsRpcProperty>(JSG_THIS, kj::mv(name));
  }
}
// Serializes this stub by writing its client into the RPC external table.
//
// Throws a DOMDataCloneError if the serializer has no external handler, or if that handler is
// not the RPC serializer's handler. On success the stub itself is disposed.
void JsRpcStub::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
  // Only the RPC serializer knows how to carry capabilities; reject every other destination.
  auto& handler = JSG_REQUIRE_NONNULL(serializer.getExternalHandler(), DOMDataCloneError,
      "Remote RPC references can only be serialized for RPC.");
  auto externalHandler = dynamic_cast<RpcSerializerExternalHander*>(&handler);
  JSG_REQUIRE(externalHandler != nullptr, DOMDataCloneError,
      "Remote RPC references can only be serialized for RPC.");

  // Grab the client now; the callback places it into the external slot when it is invoked.
  auto cap = getClient();
  externalHandler->write(
      [cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable {
    builder.setRpcTarget(kj::mv(cap));
  });

  // Sending a stub over RPC implicitly disposes the stub. The application can explicitly .dup() it
  // if this is undesired.
  dispose();
}
// Reconstructs a stub from the next entry of the RPC external table.
//
// Fails (via KJ_REQUIRE) if the deserializer has no RPC external handler, or if the entry read
// from the table is not an RPC target. The new stub is created with the handler's disposal
// group.
jsg::Ref<JsRpcStub> JsRpcStub::deserialize(
    jsg::Lock& js, rpc::SerializationTag tag, jsg::Deserializer& deserializer) {
  auto& handler = KJ_REQUIRE_NONNULL(deserializer.getExternalHandler(),
      "got JsRpcStub on non-RPC serialized object?");
  auto externalHandler = dynamic_cast<RpcDeserializerExternalHander*>(&handler);
  KJ_REQUIRE(externalHandler != nullptr, "got JsRpcStub on non-RPC serialized object?");

  // Consume the external-table slot corresponding to this serialized stub.
  auto reader = externalHandler->read();
  KJ_REQUIRE(reader.isRpcTarget(), "external table slot type doesn't match serialization tag");

  // Register a heap copy of the received capability with the current IoContext.
  auto& ioctx = IoContext::current();
  auto ownedClient = ioctx.addObject(kj::heap(reader.getRpcTarget()));

  return jsg::alloc<JsRpcStub>(kj::mv(ownedClient), externalHandler->getDisposalGroup());
}
// Decides whether a JavaScript function should be treated as a plain function for RPC purposes.
//
// JsRpcProperty and JsRpcPromise are technically callable, but they must not be treated as plain
// functions: these types need special handling (if we decide to let them be passed over RPC at
// all). Every other function qualifies.
static bool isFunctionForRpc(jsg::Lock& js, v8::Local<v8::Function> func) {
  jsg::JsObject callable(func);
  return !callable.isInstanceOf<JsRpcProperty>(js) && !callable.isInstanceOf<JsRpcPromise>(js);
}
// Overload for arbitrary JS values: non-functions are rejected outright; functions are handed to
// the v8::Function overload for the actual decision.
static bool isFunctionForRpc(jsg::Lock& js, jsg::JsValue value) {
  return value.isFunction() &&
      isFunctionForRpc(js, v8::Local<v8::Value>(value).As<v8::Function>());
}
// Convenience overload: wraps a raw V8 value handle in a jsg::JsValue and defers to the JsValue
// overload.
static inline bool isFunctionForRpc(jsg::Lock& js, v8::Local<v8::Value> val) {
  jsg::JsValue wrapped(val);
  return isFunctionForRpc(js, wrapped);
}
// Convenience overload: converts a jsg::JsObject to a jsg::JsValue and defers to the JsValue
// overload.
static inline bool isFunctionForRpc(jsg::Lock& js, jsg::JsObject val) {
  jsg::JsValue asValue(val);
  return isFunctionForRpc(js, asValue);
}
// `makeCallPipeline()` has a somewhat complicated result type: it is exactly one of the
// alternatives declared in this namespace, combined into `Result`.
namespace MakeCallPipeline {

// Alternative 1: the value is an object, which may have stubs inside it.
struct Object {
  // Client associated with the object.
  // NOTE(review): presumably this is what gets installed as the callPipeline -- confirm against
  // the definition of makeCallPipeline().
  rpc::JsRpcTarget::Client cap;

  // True when the value was a plain JavaScript object which had a custom dispose() method.
  bool hasDispose;
};

// Alternative 2: the value was something that should serialize to a single stub (e.g. it was an
// RpcTarget, a plain function, or already a stub). The callPipeline should simply be a copy of
// that stub.
struct SingleStub {};

// Alternative 3: the value is not a type that supports pipelining. It may still be serializable,
// and it could even contain stubs (e.g. in a Map).
struct NonPipelinable {};

using Result = kj::OneOf<Object, SingleStub, NonPipelinable>;

}  // namespace MakeCallPipeline
// Create the callPipeline for a call result.
//
// `value` is the JavaScript value to build the pipeline for. The returned
// MakeCallPipeline::Result is one of `Object`, `SingleStub`, or `NonPipelinable`; see the
// comments on those types for what each alternative means.
//
// Defined later in this file.
static MakeCallPipeline::Result makeCallPipeline(jsg::Lock& js, jsg::JsValue value);
// Callee-side implementation of JsRpcTarget.
//
// Most of the implementation is in this base class. There are subclasses specializing for the case
// of a top-level entrypoint vs. a transient object introduced by a previous RPC in the same
// session.
class JsRpcTargetBase: public rpc::JsRpcTarget::Server {
public:
JsRpcTargetBase(IoContext& ctx)
: weakIoContext(ctx.getWeakRef()) {}
struct EnvCtx {
v8::Local<v8::Value> env;
jsg::JsObject ctx;
};
struct TargetInfo {
// The object on which the RPC method should be invoked.
jsg::JsObject target;
// If `env` and `ctx` need to be delivered as arguments to the method, these are the values
// to deliver.
kj::Maybe<EnvCtx> envCtx;
bool allowInstanceProperties;
};
// Get the object on which the method is to be invoked. This is virtual so that we can have
// separate subclasses handling the case of an entrypoint vs. a transient RPC object.
virtual TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) = 0;
// Handles the delivery of JS RPC method calls.
kj::Promise<void> call(CallContext callContext) override {
IoContext& ctx = JSG_REQUIRE_NONNULL(weakIoContext->tryGet(), Error,
"The destination object for this RPC no longer exists.");
// HACK: Cap'n Proto call contexts are documented as being pointer-like types where the backing
// object's lifetime is that of the RPC call, but in reality they are refcounted under the
// hood. Since we'll be executing the call in the JS microtask queue, we have no ability to
// actually cancel execution if a cancellation arrives over RPC, and at the end of that
// execution we're going to access the call context to write the results. We could invent some
// complicated way to skip initializing results in the case the call has been canceled, but
// it's easier and safer to just grab a refcount on the call context object itself, which
// fully protects us. So... do that.
auto ownCallContext = capnp::CallContextHook::from(callContext).addRef();
// Try to execute the requested method.
auto promise = ctx.run(
[this, &ctx, callContext, ownCallContext = kj::mv(ownCallContext), ownThis = thisCap()]
(Worker::Lock& lock) mutable -> kj::Promise<void> {
jsg::Lock& js = lock;
auto targetInfo = getTargetInfo(lock, ctx);
auto params = callContext.getParams();
// We will try to get the function, if we can't we'll throw an error to the client.
auto [propHandle, thisArg, methodNameForErrors] = tryGetProperty(
lock, targetInfo.target, params, targetInfo.allowInstanceProperties);
auto op = params.getOperation();
auto handleResult = [&](InvocationResult&& invocationResult) {
// Given a handle for the result, if it's a promise, await the promise, then serialize the
// final result for return.
kj::Maybe<kj::Own<kj::PromiseFulfiller<rpc::JsRpcTarget::Client>>> callPipelineFulfiller;
// We need another ref to this fulfiller for the error callback. It can rely on being
// destroyed at the same time as the success callback.
kj::Maybe<kj::PromiseFulfiller<rpc::JsRpcTarget::Client>&> callPipelineFulfillerRef;
KJ_IF_SOME(ss, invocationResult.streamSink) {
// Since we have a StreamSink, it's important that we hook up the pipeline for that
// immediately. Annoyingly, that also means we need to hook up a pipeline for
// callPipeline, which we don't actually have yet, so we need to promise-ify it.
auto paf = kj::newPromiseAndFulfiller<rpc::JsRpcTarget::Client>();
callPipelineFulfillerRef = *paf.fulfiller;
callPipelineFulfiller = kj::mv(paf.fulfiller);
capnp::PipelineBuilder<rpc::JsRpcTarget::CallResults> builder(16);
builder.setCallPipeline(kj::mv(paf.promise));
builder.setParamsStreamSink(ss);
callContext.setPipeline(builder.build());
}
auto result = ctx.awaitJs(js, js.toPromise(invocationResult.returnValue)
.then(js, ctx.addFunctor(
[callContext, ownCallContext = kj::mv(ownCallContext),
paramDisposalGroup = kj::mv(invocationResult.paramDisposalGroup),
paramsStreamSink = kj::mv(invocationResult.streamSink),
resultStreamSink = params.getResultsStreamSink(),
callPipelineFulfiller = kj::mv(callPipelineFulfiller)]
(jsg::Lock& js, jsg::Value value) mutable {
jsg::JsValue resultValue(value.getHandle(js));
// Call makeCallPipeline before serializing because it may need to extract the disposer.