/
coda_net2.ml
1560 lines (1385 loc) · 50.7 KB
/
coda_net2.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
open Core
open Async
open Async_unix
open Deferred.Let_syntax
open Pipe_lib
open Network_peer
(** Plain record describing a peer, with derived yojson converters: a [host]
    string, a [libp2p_port] and a [peer_id] string. It is turned into a
    [Peer.t] by [peer_of_peer_info]. *)
type peer_info = {libp2p_port: int; host: string; peer_id: string}
[@@deriving yojson]
(** Build a [Peer.t] from a [peer_info]: the host string is parsed with
    [Unix.Inet_addr.of_string] and the peer id string is wrapped with
    [Peer.Id.unsafe_of_string]. *)
let peer_of_peer_info ({host; libp2p_port; peer_id} : peer_info) =
  let addr = Unix.Inet_addr.of_string host in
  let id = Peer.Id.unsafe_of_string peer_id in
  Peer.create addr ~libp2p_port ~peer_id:id
(** Decode a JSON string value holding base64 data.

    Returns the decoded bytes for a [`String] value, an error prefixed with
    ["invalid base64: "] when decoding fails, and ["expected a string"] for
    any other value. *)
let of_b64_data = function
  | `String encoded ->
      Base64.decode encoded
      |> Result.map_error ~f:(fun (`Msg reason) ->
             Error.of_string ("invalid base64: " ^ reason) )
  | _ ->
      Or_error.error_string "expected a string"
(** Encode [raw] as padded base64. *)
let to_b64_data (raw : string) : string = Base64.encode_string ~pad:true raw
(** [to_int_res x] extracts the integer carried by the JSON value [x].

    Returns [Ok i] for [`Int i] and the error ["needed an int"] for every
    other value, including [`Null].

    The value is matched directly instead of going through
    [Yojson.Safe.Util.to_int_option]: that function raises [Type_error] for
    anything that is neither an int nor null, which would escape the
    [Or_error] result this function is meant to provide. *)
let to_int_res (x : Yojson.Safe.t) : int Or_error.t =
  match x with
  | `Int i ->
      Ok i
  | _ ->
      Or_error.error_string "needed an int"
(* Versioned keypair record: [secret] and [public] are strings and [peer_id]
   is a [Peer.Id.Stable.V1.t]. V1 is the only version declared here, so
   [to_latest] is the identity. The [Keypair] module further down includes
   this module and adds the operations on it. *)
module Keypair0 = struct
[%%versioned
module Stable = struct
module V1 = struct
type t = {secret: string; public: string; peer_id: Peer.Id.Stable.V1.t}
let to_latest = Fn.id
end
end]
end
(* Lifecycle of a stream. The only place that assigns a stream's state after
   creation is [Helper.advance_stream_state]; [Helper.stream_state_invariant]
   spells out which pipes must be closed in each state. A printer is derived
   with [show] and used when logging invariant violations. *)
type stream_state =
| FullyOpen (** Streams start in this state. Both sides can still write *)
| HalfClosed of [`Us | `Them]
(** Streams move from [FullyOpen] to [HalfClosed `Us] when the write pipe is closed. Streams move from [FullyOpen] to [HalfClosed `Them] when [Stream.reset] is called or the remote host closes their write stream. *)
| FullyClosed
(** Streams move from [HalfClosed peer] to FullyClosed once the party that isn't peer has their "close write" event. Once a stream is FullyClosed, its resources are released. *)
[@@deriving show]
(* Placeholder used as the message type parameter of the subscriptions kept
   in [Helper.t.subscriptions], where the real message type of each
   subscription is not tracked by the type system. *)
type erased_magic = [`Be_very_careful_to_be_type_safe]
(** Conversion of structured log records emitted by the Go helper into
    [Logger.Message] values. *)
module Go_log = struct
  (** Map a Go level name onto our logger levels. Names that are not listed
      explicitly are treated as [Spam]. *)
  let ours_of_go = function
    | "error" | "panic" | "fatal" ->
        Logger.Level.Error
    | "warn" ->
        Logger.Level.Warn
    | "info" ->
        Logger.Level.Info
    | "debug" ->
        Logger.Level.Debug
    | _ ->
        (* there should be no other levels. *)
        Logger.Level.Spam

  (** One JSON log record. The module name is read from the ["logger"] key
      and [error] defaults to the empty string when absent. *)
  type record =
    { ts: string
    ; module_: string [@key "logger"]
    ; level: string
    ; msg: string
    ; error: string [@default ""] }
  [@@deriving of_yojson]

  (** Convert a parsed record into a logger message. The timestamp is parsed
      with [Time.of_string]; a non-empty [error] is attached as the
      ["go_error"] metadata entry. *)
  let record_to_message (r : record) =
    let metadata =
      if String.is_empty r.error then String.Map.empty
      else String.Map.singleton "go_error" (`String r.error)
    in
    let source =
      Logger.Source.create
        ~module_:(sprintf "Libp2p_helper.Go.%s" r.module_)
        ~location:"(not tracked)"
    in
    let timestamp = Time.of_string r.ts in
    let level = ours_of_go r.level in
    Logger.Message.
      { timestamp
      ; level
      ; source= Some source
      ; message= r.msg
      ; metadata
      ; event_id= None }
end
module Helper = struct
(* Mutually recursive state types for one helper subprocess:
   - [t] is the connection to the subprocess plus the tables of everything
     in flight;
   - ['a subscription] is one pubsub subscription;
   - [stream] is one stream and its state machine;
   - [protocol_handler] is the callback registered for one protocol name.
   Several field names are shared between these records (see the warning 30
   note at the end of the enclosing module). *)
type t =
{ subprocess: Child_processes.t
; conf_dir: string
(* Requests sent by [do_rpc] that have not been answered yet, keyed by the
   request seqno; [handle_response] fills and removes the ivar. *)
; outstanding_requests: (int, Yojson.Safe.t Or_error.t Ivar.t) Hashtbl.t
(**
seqno is used to assign unique IDs to our outbound requests and index the
tables below.
The helper can also generate sequence numbers- but they are not the same space
of sequence numbers!
In general, if a message contains a seqno/idx, the response should contain the
same seqno/idx.
Some types would make it harder to misuse these integers.
*)
; mutable seqno: int
; logger: Logger.t
; me_keypair: Keypair0.t Ivar.t
(* Looked up by subscription idx when publish/validate upcalls arrive. *)
; subscriptions: (int, erased_magic subscription) Hashtbl.t
(* Looked up by stream idx; entries are removed by [advance_stream_state]
   once both sides have closed. *)
; streams: (int, stream) Hashtbl.t
(* Looked up by protocol name when an incomingStream upcall arrives. *)
; protocol_handlers: (string, protocol_handler) Hashtbl.t
; mutable banned_ips: Unix.Inet_addr.t list
(* Called from the discoveredPeer upcall with the peer id and multiaddrs. *)
; mutable new_peer_callback: (string -> string list -> unit) option
(* When set, [do_rpc] refuses to send and returns an error instead. *)
; mutable finished: bool }
and 'a subscription =
{ net: t
; topic: string
; idx: int
(* Messages arriving for a closed subscription are logged and dropped. *)
; mutable closed: bool
; validator: 'a Envelope.Incoming.t -> bool Deferred.t
; encode: 'a -> string
(* What to do, besides logging, when [decode] fails on an incoming message. *)
; on_decode_failure:
[`Ignore | `Call of string Envelope.Incoming.t -> Error.t -> unit]
; decode: string -> 'a Or_error.t
; write_pipe:
( 'a Envelope.Incoming.t
, Strict_pipe.synchronous
, unit Deferred.t )
Strict_pipe.Writer.t
; read_pipe: 'a Envelope.Incoming.t Strict_pipe.Reader.t }
and stream =
{ net: t
; idx: int
; mutable state: stream_state
(* [state_lock] and [state_wait] serialise updates of [state]; they are
   taken and released inside [advance_stream_state]. *)
; mutable state_lock: bool
; state_wait: unit Async.Condition.t
; protocol: string
; peer: Peer.t
(* [incoming_w] receives the data of incomingStreamMsg upcalls and is closed
   when the remote side finishes writing. *)
; incoming_r: string Pipe.Reader.t
; incoming_w: string Pipe.Writer.t
(* [outgoing_r] is drained by the loop started in [make_stream], which
   forwards each message to the helper. *)
; outgoing_r: string Pipe.Reader.t
; outgoing_w: string Pipe.Writer.t }
and protocol_handler =
{ net: t
; protocol_name: string
(* Incoming streams for a closed handler are ignored. *)
; mutable closed: bool
(* Policy applied when [f] raises. *)
; on_handler_error: [`Raise | `Ignore | `Call of stream -> exn -> unit]
; f: stream -> unit Deferred.t }
(* Description of one RPC understood by the helper: how to serialise its
   input to JSON, how to parse its output from JSON, and the method [name]
   sent on the wire (see [do_rpc]). *)
module type Rpc = sig
type input [@@deriving to_yojson]
type output [@@deriving of_yojson]
val name : string
end
(* First-class module form of [Rpc], exposing the input and output types as
   type parameters so that [do_rpc] can be typed per call. *)
type ('a, 'b) rpc = (module Rpc with type input = 'a and type output = 'b)
(** Opaque byte payload whose JSON representation is a padded base64
    string. *)
module Data : sig
  type t [@@deriving yojson]

  val pack_data : string -> t

  val to_string : t -> string
end = struct
  type t = string

  (* Serialise as a JSON string holding the padded base64 encoding. *)
  let to_yojson raw = `String (Base64.encode_string ~pad:true raw)

  (* Accept only JSON strings; any exception raised while decoding the
     base64 text is turned into its human-readable message. *)
  let of_yojson = function
    | `String b64 -> (
      match Base64.decode_exn b64 with
      | decoded ->
          Ok decoded
      | exception exn ->
          Error (Error.to_string_hum (Error.of_exn exn)) )
    | _ ->
        Error "expected a string"

  let pack_data (raw : string) : t = raw

  let to_string (data : t) : string = data
end
(* Catalogue of the RPCs sent to the helper. Each submodule follows the
   [Rpc] signature: an [input] type serialised to JSON, an [output] type
   parsed from the reply, and the wire [name] of the method. Most RPCs
   reply with a plain string; callers in this file compare it against a
   fixed success message for the method. *)
module Rpcs = struct
(* Shared by RPCs that take no argument: the input is [unit] and is sent as
   an empty JSON object. *)
module No_input = struct
type input = unit
let input_to_yojson () = `Assoc []
end
(* [data] is a plain string here; [make_stream] passes it already encoded
   with [to_b64_data]. *)
module Send_stream_msg = struct
type input = {stream_idx: int; data: string} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "sendStreamMsg"
end
module Close_stream = struct
type input = {stream_idx: int} [@@deriving yojson]
type output = string [@@deriving yojson]
(* This RPC remains unused, see below for the commented out
Close_stream usage *)
let[@warning "-32"] name = "closeStream"
end
module Remove_stream_handler = struct
type input = {protocol: string} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "removeStreamHandler"
end
(* [Keypair.random] decodes [sk] and [pk] from base64. *)
module Generate_keypair = struct
include No_input
type output = {sk: string; pk: string; peer_id: string}
[@@deriving yojson]
let name = "generateKeypair"
end
module Publish = struct
type input = {topic: string; data: Data.t} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "publish"
end
module Subscribe = struct
type input = {topic: string; subscription_idx: int} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "subscribe"
end
module Unsubscribe = struct
type input = {subscription_idx: int} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "unsubscribe"
end
module Configure = struct
type input =
{ privk: string
; statedir: string
; ifaces: string list
; external_maddr: string
; network_id: string
; unsafe_no_trust_ip: bool
; gossip_type: string
; seed_peers: string list }
[@@deriving yojson]
type output = string [@@deriving yojson]
let name = "configure"
end
module Listen = struct
type input = {iface: string} [@@deriving yojson]
type output = string list [@@deriving yojson]
let name = "listen"
end
module Listening_addrs = struct
include No_input
type output = string list [@@deriving yojson]
let name = "listeningAddrs"
end
module Reset_stream = struct
type input = {idx: int} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "resetStream"
end
module Add_stream_handler = struct
type input = {protocol: string} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "addStreamHandler"
end
module Open_stream = struct
type input = {peer: string; protocol: string} [@@deriving yojson]
type output = {stream_idx: int; peer: peer_info} [@@deriving yojson]
let name = "openStream"
end
(* Sent by the validate upcall handler; [seqno] is the one carried by the
   upcall being answered. *)
module Validation_complete = struct
type input = {seqno: int; is_valid: bool} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "validationComplete"
end
module Add_peer = struct
type input = {multiaddr: string} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "addPeer"
end
module Begin_advertising = struct
include No_input
type output = string [@@deriving yojson]
let name = "beginAdvertising"
end
module List_peers = struct
include No_input
type output = peer_info list [@@deriving yojson]
let name = "listPeers"
end
module Find_peer = struct
type input = {peer_id: string} [@@deriving yojson]
type output = peer_info [@@deriving yojson]
let name = "findPeer"
end
module Set_gater_config = struct
type input = {banned_ips: string list ; banned_peers: string list; trusted_peers: string list; trusted_ips: string list; isolate: bool} [@@deriving yojson]
type output = string [@@deriving yojson]
let name = "setGaterConfig"
end
end
(** Return the current sequence number for our side of the connection and
    advance the counter by one. *)
let genseq t =
  let current = t.seqno in
  t.seqno <- current + 1 ;
  current
(** [do_rpc t rpc body] serialises [body] to JSON as described by [rpc],
    writes it as one line on the helper's stdin and returns a deferred that
    is determined once the matching reply has been handled.

    The request is registered in [t.outstanding_requests] under a fresh
    seqno before it is written. At most the first 2048 characters of the
    line are logged. The reply is parsed with the RPC's [output_of_yojson];
    a parse failure becomes an [Error].

    If [t.finished] is set or the helper's stdin is already closed, nothing
    is sent and an error is returned immediately. *)
let do_rpc (type a b) t (rpc : (a, b) rpc) (body : a) : b Deferred.Or_error.t
    =
  let module M = (val rpc) in
  if t.finished || Writer.is_closed (Child_processes.stdin t.subprocess) then
    Deferred.Or_error.errorf "helper process already exited (doing RPC %s)"
      (M.input_to_yojson body |> Yojson.Safe.to_string)
  else
    let reply = Ivar.create () in
    let seqno = genseq t in
    Hashtbl.add_exn t.outstanding_requests ~key:seqno ~data:reply ;
    let line =
      `Assoc
        [ ("seqno", `Int seqno)
        ; ("method", `String M.name)
        ; ("body", M.input_to_yojson body) ]
      |> Yojson.Safe.to_string
    in
    let logged_prefix =
      String.slice line 0 (Int.min (String.length line) 2048)
    in
    [%log' spam t.logger] "sending line to libp2p_helper: $line"
      ~metadata:[("line", `String logged_prefix)] ;
    Writer.write_line (Child_processes.stdin t.subprocess) line ;
    let parse_output json =
      M.output_of_yojson json |> Result.map_error ~f:Error.of_string
    in
    Ivar.read reply >>| fun res_json -> Or_error.bind res_json ~f:parse_output
(** Check that the closed/open status of the two writer pipes of [stream]
    agrees with its recorded state: the outgoing writer tracks our side and
    the incoming writer tracks the remote side. The observed status is
    logged at trace level. Returns [true] when the state is consistent. *)
let stream_state_invariant stream logger =
  let us_closed = Pipe.is_closed stream.outgoing_w in
  let them_closed = Pipe.is_closed stream.incoming_w in
  let negation closed = if closed then "" else "not " in
  [%log trace] "%sus_closed && %sthem_closed" (negation us_closed)
    (negation them_closed) ;
  let expect ~us ~them =
    Bool.equal us_closed us && Bool.equal them_closed them
  in
  match stream.state with
  | FullyOpen ->
      expect ~us:false ~them:false
  | HalfClosed `Us ->
      expect ~us:true ~them:false
  | HalfClosed `Them ->
      expect ~us:false ~them:true
  | FullyClosed ->
      expect ~us:true ~them:true
(** Advance the stream_state automaton of [stream] after [who_closed]
    ([`Us] or [`Them]) closed their writing side, closing pipes as
    necessary.

    Updates are serialised per stream with the [state_lock] flag and the
    [state_wait] condition: a caller that finds the lock taken waits on the
    condition and retries. The lock is released in the [Monitor.protect]
    finaliser, so it is dropped even when the update raises.

    Transitions:
    - [FullyOpen] becomes [HalfClosed who_closed];
    - [HalfClosed other] becomes [FullyClosed] when the other party closes,
      and the stream is removed from [net.streams];
    - a repeated close by the same party, or any close of a [FullyClosed]
      stream, is logged as an error and leaves the state unchanged.
      Previously a repeated close in [HalfClosed] still moved the stream to
      [FullyClosed] without releasing it, which broke the invariant checked
      below and left the stream in the table.

    After the transition [stream_state_invariant] is evaluated and a
    violation is logged as an error. *)
let advance_stream_state net (stream : stream) who_closed =
  let name_participant = function
    | `Us ->
        "the local host"
    | `Them ->
        "the remote host"
  in
  let rec acquire_lock () =
    if not stream.state_lock then (
      stream.state_lock <- true ;
      Deferred.unit )
    else
      let%bind () = Async.Condition.wait stream.state_wait in
      acquire_lock ()
  in
  let%bind () = acquire_lock () in
  let old_state = stream.state in
  Monitor.protect
    ~finally:(fun () ->
      stream.state_lock <- false ;
      Async.Condition.signal stream.state_wait () ;
      Deferred.unit )
    (fun () ->
      let%map () =
        match who_closed with
        | `Us ->
            (* FIXME related to https://github.com/libp2p/go-libp2p-circuit/issues/18
               "preemptive" or half-closing a stream doesn't actually seem supported:
               after closing it we can't read anymore.*)
            (*
               match%map
                 do_rpc net (module Rpcs.Close_stream) {stream_idx= stream.idx}
               with
               | Ok "closeStream success" ->
                   ()
               | Ok v ->
                   failwithf "helper broke RPC protocol: closeStream got %s" v
                     ()
               | Error e ->
                   Error.raise e )
            *)
            Deferred.unit
        | `Them ->
            (* Helper notified us that the Go side closed its write pipe. *)
            Pipe.close stream.incoming_w ;
            Deferred.unit
      in
      (* Log the repeated close and return the state unchanged. At the point
         this is called [stream.state] still holds [old_state]. *)
      let double_close () =
        [%log' error net.logger]
          "stream with index $index closed twice by $party"
          ~metadata:
            [ ("index", `Int stream.idx)
            ; ("party", `String (name_participant who_closed)) ] ;
        stream.state
      in
      (* replace with [%derive.eq : [`Us|`Them]] when it is supported.*)
      let us_them_eq a b =
        match (a, b) with `Us, `Us | `Them, `Them -> true | _, _ -> false
      in
      (* Forget the stream once both sides are done with it. *)
      let release () =
        match Hashtbl.find_and_remove net.streams stream.idx with
        | Some _ ->
            ()
        | None ->
            [%log' error net.logger]
              "tried to release stream $idx but it was already gone"
              ~metadata:[("idx", `Int stream.idx)]
      in
      stream.state
      <- ( match old_state with
         | FullyOpen ->
             HalfClosed who_closed
         | HalfClosed other ->
             if us_them_eq other who_closed then double_close ()
             else (
               release () ;
               FullyClosed )
         | FullyClosed ->
             double_close () ) ;
      (* TODO: maybe we can check some invariants on the Go side too? *)
      if not (stream_state_invariant stream net.logger) then
        [%log' error net.logger]
          "after $who_closed closed the stream, stream state invariant \
           broke (previous state: $old_stream_state)"
          ~metadata:
            [ ("who_closed", `String (name_participant who_closed))
            ; ("old_stream_state", `String (show_stream_state old_state)) ]
      )
(** Create the bookkeeping record for a stream, whether it was opened by us
    or by the remote peer, and start the background task that forwards
    everything written to the outgoing pipe to the helper.

    Each outgoing message is sent base64-encoded through the
    [sendStreamMsg] RPC. An RPC error is logged and closes the outgoing
    writer. Once the outgoing pipe has been fully consumed, our side of the
    stream is marked closed with [advance_stream_state]. The new stream is
    returned; registering it in [net.streams] is left to the caller. *)
let make_stream net idx protocol remote_peer_info =
  let incoming_r, incoming_w = Pipe.create () in
  let outgoing_r, outgoing_w = Pipe.create () in
  let peer = peer_of_peer_info remote_peer_info in
  let stream =
    { net
    ; idx
    ; state= FullyOpen
    ; state_lock= false
    ; state_wait= Async.Condition.create ()
    ; peer
    ; protocol
    ; incoming_r
    ; incoming_w
    ; outgoing_r
    ; outgoing_w }
  in
  (* Forward one outgoing message to the helper. *)
  let send_one msg =
    match%map
      do_rpc net
        (module Rpcs.Send_stream_msg)
        {stream_idx= idx; data= to_b64_data msg}
    with
    | Ok "sendStreamMsg success" ->
        ()
    | Ok v ->
        failwithf "helper broke RPC protocol: sendStreamMsg got %s" v ()
    | Error e ->
        [%log' error net.logger]
          "error sending message on stream $idx: $error"
          ~metadata:
            [("idx", `Int idx); ("error", `String (Error.to_string_hum e))] ;
        Pipe.close outgoing_w
  in
  don't_wait_for
    ( Pipe.iter outgoing_r ~f:send_one
    >>= fun () -> advance_stream_state net stream `Us ) ;
  stream
(** Handle one RPC reply [v] from the helper and determine the deferred of
    the request it answers.

    The reply carries a ["seqno"] and either an ["error"] or a ["success"]
    member. A seqno of 0 is not matched against any request: its error
    member is only logged. Otherwise the pending ivar registered under that
    seqno is removed and filled; a reply for which no request is pending
    yields an error. *)
let handle_response t v =
  let open Yojson.Safe.Util in
  let open Or_error.Let_syntax in
  let%bind seq = member "seqno" v |> to_int_res in
  let err = member "error" v in
  let res = member "success" v in
  if Int.equal seq 0 then (
    [%log' error t.logger] "important info from helper: %s"
      (Yojson.Safe.to_string err) ;
    Ok () )
  else
    let fill_result =
      match (err, res) with
      | `Null, payload ->
          Ok payload
      | failure, `Null ->
          Or_error.errorf "RPC #%d failed: %s" seq
            (Yojson.Safe.to_string failure)
      | _, _ ->
          Or_error.errorf "unexpected response to RPC #%d: %s" seq
            (Yojson.Safe.to_string v)
    in
    match Hashtbl.find_and_remove t.outstanding_requests seq with
    | None ->
        Or_error.errorf "spurious reply to RPC #%d: %s" seq
          (Yojson.Safe.to_string v)
    | Some pending ->
        Ivar.fill pending fill_result ;
        Ok ()
(** JSON shapes of the upcalls sent by the helper.
An upcall is like an RPC from the helper to us. Each submodule describes one
kind of upcall as a record with derived yojson converters; every record
carries the [upcall] name it was sent under. The dispatch on that name and
the handling itself live in [handle_upcall]. *)
module Upcall = struct
(* A message published on one of our subscriptions. *)
module Publish = struct
type t =
{ upcall: string
; subscription_idx: int
; sender: peer_info option
; data: Data.t }
[@@deriving yojson]
end
(* A request to validate a message; the verdict is sent back with the
   validationComplete RPC using the same [seqno]. *)
module Validate = struct
type t =
{ sender: peer_info option
; data: Data.t
; seqno: int
; upcall: string
; subscription_idx: int }
[@@deriving yojson]
end
module Stream_lost = struct
type t = {upcall: string; stream_idx: int; reason: string}
[@@deriving yojson]
end
module Stream_read_complete = struct
type t = {upcall: string; stream_idx: int} [@@deriving yojson]
end
module Incoming_stream_msg = struct
type t = {upcall: string; stream_idx: int; data: Data.t}
[@@deriving yojson]
end
module Incoming_stream = struct
type t =
{upcall: string; peer: peer_info; stream_idx: int; protocol: string}
[@@deriving yojson]
end
module Discovered_peer = struct
type t = {upcall: string; peer_id: string; multiaddrs: string list}
[@@deriving yojson]
end
(* Lift the string-error result produced by the derived [of_yojson]
   functions into an [Or_error.t], prefixing the message. *)
let or_error (t : ('a, string) Result.t) =
match t with
| Ok a ->
Ok a
| Error s ->
Or_error.errorf !"Error converting from json: %s" s
end
(** [lookup_peerid net peer_id] asks the helper, through the [findPeer] RPC,
    for the peer with the given id string.

    On success the reply is converted with [peer_of_peer_info], so the peer
    is built exactly as everywhere else in this file (in particular the id
    goes through [Peer.Id.unsafe_of_string] rather than being passed as a
    raw string). RPC errors are returned unchanged. *)
let lookup_peerid net peer_id =
  do_rpc net (module Rpcs.Find_peer) {peer_id}
  >>| Or_error.map ~f:peer_of_peer_info
(** Parse the upcall [v] received from the helper and perform it.

    The kind of upcall is read from the ["upcall"] member of [v] with
    [Yojson.Safe.Util.to_string]; the rest of the object is parsed with the
    matching [Upcall] record. The result is [Ok ()] when the upcall was
    accepted (asynchronous work may still be running in the background) and
    an error when the payload cannot be parsed, refers to an unknown
    subscription, stream or protocol, or names an unknown upcall.

    Handled upcalls:
    - ["publish"]: decode the message and write it to the subscription pipe;
    - ["validate"]: decode, run the subscription validator and report the
      verdict with the [validationComplete] RPC;
    - ["incomingStream"]: create and register a stream and run the protocol
      handler on it. The stream is only created once an open handler has
      been found, so that rejected streams no longer leave behind pipes and
      a forwarding task that nothing will ever close;
    - ["discoveredPeer"]: invoke [new_peer_callback] if one is set;
    - ["incomingStreamMsg"]: push the data onto the stream's incoming pipe;
    - ["streamLost"]: logged only;
    - ["streamReadComplete"]: advance the stream state for the remote side. *)
let handle_upcall t v =
  let open Yojson.Safe.Util in
  let open Or_error.Let_syntax in
  let open Upcall in
  (* Build the envelope for [data]: messages without a sender, or whose
     sender is 127.0.0.1 with port 0, are treated as local. *)
  let wrap sender data =
    match sender with
    | Some sender ->
        if
          String.equal sender.host "127.0.0.1"
          && Int.equal sender.libp2p_port 0
        then Envelope.Incoming.local data
        else
          Envelope.Incoming.wrap_peer ~sender:(peer_of_peer_info sender)
            ~data
    | None ->
        Envelope.Incoming.local data
  in
  match member "upcall" v |> to_string with
  (* Message published on one of our subscriptions *)
  | "publish" -> (
      let%bind m = Publish.of_yojson v |> or_error in
      let _me =
        Ivar.peek t.me_keypair
        |> Option.value_exn
             ~message:
               "How did we receive pubsub before configuring our keypair?"
      in
      (*if
          Option.fold m.sender ~init:false ~f:(fun _ sender ->
              Peer.Id.equal sender.peer_id me.peer_id )
        then (
          [%log trace]
            "not handling published message originated from me";
          (* elide messages that we sent *) return () )
        else*)
      let idx = m.subscription_idx in
      let data = m.data in
      match Hashtbl.find t.subscriptions idx with
      | Some sub ->
          if not sub.closed then (
            let raw_data = Data.to_string data in
            let decoded = sub.decode raw_data in
            match decoded with
            | Ok data ->
                (* TAKE CARE: doing anything with the return
                   value here except ignore is UNSOUND because
                   write_pipe has a cast type. We don't remember
                   what the original 'return was. *)
                Strict_pipe.Writer.write sub.write_pipe (wrap m.sender data)
                |> ignore
            | Error e ->
                ( match sub.on_decode_failure with
                | `Ignore ->
                    ()
                | `Call f ->
                    f (wrap m.sender raw_data) e ) ;
                [%log' error t.logger]
                  "failed to decode message published on subscription \
                   $topic ($idx): $error"
                  ~metadata:
                    [ ("topic", `String sub.topic)
                    ; ("idx", `Int idx)
                    ; ("error", `String (Error.to_string_hum e)) ] ;
                ()
                (* TODO: add sender to Publish.t and include it here. *)
                (* TODO: think about exposing the PeerID of the originator as well? *)
            )
          else
            [%log' debug t.logger]
              "received msg for subscription $sub after unsubscribe, was it \
               still in the stdout pipe?"
              ~metadata:[("sub", `Int idx)] ;
          Ok ()
      | None ->
          Or_error.errorf
            "message published with inactive subsubscription %d" idx )
  (* Validate a message received on a subscription *)
  | "validate" -> (
      let%bind m = Validate.of_yojson v |> or_error in
      let idx = m.subscription_idx in
      let seqno = m.seqno in
      match Hashtbl.find t.subscriptions idx with
      | Some sub ->
          (* Validation runs in the background; the upcall itself is
             acknowledged immediately. *)
          (let open Deferred.Let_syntax in
          let raw_data = Data.to_string m.data in
          let decoded = sub.decode raw_data in
          let%bind is_valid =
            match decoded with
            | Ok data ->
                sub.validator (wrap m.sender data)
            | Error e ->
                ( match sub.on_decode_failure with
                | `Ignore ->
                    ()
                | `Call f ->
                    f (wrap m.sender raw_data) e ) ;
                [%log' error t.logger]
                  "failed to decode message published on subscription \
                   $topic ($idx): $error"
                  ~metadata:
                    [ ("topic", `String sub.topic)
                    ; ("idx", `Int idx)
                    ; ("error", `String (Error.to_string_hum e)) ] ;
                (* A message that cannot be decoded is reported invalid. *)
                return false
          in
          match%map
            do_rpc t (module Rpcs.Validation_complete) {seqno; is_valid}
          with
          | Ok "validationComplete success" ->
              ()
          | Ok v ->
              failwithf
                "helper broke RPC protocol: validationComplete got %s" v ()
          | Error e ->
              [%log' error t.logger]
                "error during validationComplete, ignoring and continuing: \
                 $error"
                ~metadata:[("error", `String (Error.to_string_hum e))])
          |> don't_wait_for ;
          Ok ()
      | None ->
          Or_error.errorf
            "asked to validate message for unregistered subscription idx %d"
            idx )
  (* A new inbound stream was opened *)
  | "incomingStream" -> (
      let%bind m = Incoming_stream.of_yojson v |> or_error in
      let stream_idx = m.stream_idx in
      let protocol = m.protocol in
      match Hashtbl.find t.protocol_handlers protocol with
      | Some ph ->
          if not ph.closed then (
            (* Only now allocate the pipes and start the forwarding task:
               streams that are rejected below never need them. *)
            let stream = make_stream t stream_idx protocol m.peer in
            Hashtbl.add_exn t.streams ~key:stream_idx ~data:stream ;
            don't_wait_for
              (let open Deferred.Let_syntax in
              (* Call the protocol handler. If it throws an exception,
                 handle it according to [on_handler_error]. Mimics
                 [Tcp.Server.create]. See [handle_protocol] doc comment.
              *)
              match%map
                Monitor.try_with ~extract_exn:true (fun () -> ph.f stream)
              with
              | Ok () ->
                  ()
              | Error e -> (
                try
                  match ph.on_handler_error with
                  | `Raise ->
                      raise e
                  | `Ignore ->
                      ()
                  | `Call f ->
                      f stream e
                with handler_exn ->
                  (* The error policy itself raised: retire the handler,
                     ask the helper to drop it, then re-raise. *)
                  ph.closed <- true ;
                  don't_wait_for
                    ( do_rpc t (module Rpcs.Remove_stream_handler) {protocol}
                    >>| fun _ -> Hashtbl.remove t.protocol_handlers protocol
                    ) ;
                  raise handler_exn )) ;
            Ok () )
          else
            (* silently ignore new streams for closed protocol handlers.
               these are buffered stream open RPCs that were enqueued before
               our close went into effect. *)
            Ok ()
      | None ->
          (* TODO: punish *)
          Or_error.errorf "incoming stream for protocol we don't know about?"
      )
  | "discoveredPeer" ->
      let%map p = Discovered_peer.of_yojson v |> or_error in
      Option.iter t.new_peer_callback ~f:(fun cb -> cb p.peer_id p.multiaddrs)
  (* Received a message on some stream *)
  | "incomingStreamMsg" -> (
      let%bind m = Incoming_stream_msg.of_yojson v |> or_error in
      match Hashtbl.find t.streams m.stream_idx with
      | Some {incoming_w; _} ->
          don't_wait_for
            (Pipe.write_if_open incoming_w (Data.to_string m.data)) ;
          Ok ()
      | None ->
          Or_error.errorf
            "incoming stream message for stream we don't know about?" )
  (* Stream was reset, either by the remote peer or an error on our end. *)
  | "streamLost" ->
      let%bind m = Stream_lost.of_yojson v |> or_error in
      let stream_idx = m.stream_idx in
      [%log' trace t.logger]
        "Encountered error while reading stream $idx: $error"
        ~metadata:[("error", `String m.reason); ("idx", `Int stream_idx)] ;
      Ok ()
  (* The remote peer closed its write end of one of our streams *)
  | "streamReadComplete" -> (
      let%bind m = Stream_read_complete.of_yojson v |> or_error in
      let stream_idx = m.stream_idx in
      match Hashtbl.find t.streams stream_idx with
      | Some stream ->
          advance_stream_state t stream `Them |> don't_wait_for ;
          Ok ()
      | None ->
          Or_error.errorf
            "streamReadComplete for stream we don't know about %d" stream_idx
      )
  | s ->
      Or_error.errorf "unknown upcall %s" s
end [@(* Warning 30 is about field labels being defined in multiple types.
It means more disambiguation has to happen sometimes but it doesn't
seem to be a big deal. *)
warning
"-30"]
(* Handle on a running helper; an alias for [Helper.t]. *)
type net = Helper.t
(** Keypairs: generation through the helper and a textual format. *)
module Keypair = struct
  include Keypair0

  (** Ask the helper for a fresh keypair with the [generateKeypair] RPC.
      Raises if the RPC fails or if the returned keys are not valid
      base64. *)
  let random net =
    match%map Helper.do_rpc net (module Helper.Rpcs.Generate_keypair) () with
    | Error e ->
        failwithf "other RPC error generateKeypair: %s" (Error.to_string_hum e)
          ()
    | Ok {sk; pk; peer_id} ->
        of_b64_data (`String sk)
        |> Or_error.bind ~f:(fun secret ->
               of_b64_data (`String pk)
               |> Or_error.map ~f:(fun public ->
                      ( {secret; public; peer_id= Peer.Id.unsafe_of_string peer_id}
                        : t ) ) )
        |> Or_error.ok_exn

  (** The secret key, base64-encoded. *)
  let secret_key_base64 (keypair : t) = to_b64_data keypair.secret

  (** Render as [secret,public,peer_id], with both keys base64-encoded. *)
  let to_string (keypair : t) =
    let fields =
      [ to_b64_data keypair.secret
      ; to_b64_data keypair.public
      ; Peer.Id.to_string keypair.peer_id ]
    in
    String.concat ~sep:"," fields

  (** Parse the output of [to_string]. The three fields may be separated by
      semicolons or by commas; the semicolon form is tried first and the
      comma form is the result whenever it fails. *)
  let of_string s =
    let parse_with_sep sep =
      match String.split s ~on:sep with
      | [secret_b64; public_b64; peer_id] ->
          let open Or_error.Let_syntax in
          let%map secret = of_b64_data (`String secret_b64)
          and public = of_b64_data (`String public_b64) in
          ({secret; public; peer_id= Peer.Id.unsafe_of_string peer_id} : t)
      | _ ->
          Or_error.errorf "%s is not a valid Keypair.to_string output" s
    in
    match parse_with_sep ';' with
    | Ok _ as parsed ->
        parsed
    | Error _ ->
        parse_with_sep ','

  (** The peer id stored in the keypair. *)
  let to_peer_id (keypair : t) = keypair.peer_id
end
(** Multiaddrs, represented directly by their string form. *)
module Multiaddr = struct
  type t = string

  (** The string form of the address; no conversion is performed. *)
  let to_string (maddr : t) : string = maddr

  (** Use the string as an address as is; no validation is performed. *)
  let of_string (text : string) : t = text
end
(* A peer id paired with a list of multiaddrs.
   NOTE(review): presumably the addresses at which a newly discovered peer
   can be reached - confirm against the code that constructs this record. *)
type discovered_peer = {id: Peer.Id.t; maddrs: Multiaddr.t list}
module Pubsub = struct
(** Publish [data] on [topic] through the helper's [publish] RPC.

    An RPC error is logged and otherwise ignored; a reply other than the
    expected success message raises. *)
let publish (net : Helper.t) ~topic ~data =
  let payload = Helper.Data.pack_data data in
  let on_reply = function
    | Ok "publish success" ->
        ()
    | Ok v ->
        failwithf "helper broke RPC protocol: publish got %s" v ()
    | Error e ->
        [%log' error net.logger]
          "error while publishing message on $topic: $err"
          ~metadata:
            [("topic", `String topic); ("err", `String (Error.to_string_hum e))]
  in
  Helper.do_rpc net (module Helper.Rpcs.Publish) {topic; data= payload}
  >>| on_reply
(** Operations on a single pubsub subscription. *)
module Subscription = struct
  (* Re-export of [Helper.subscription] so that the fields can be used
     unqualified in this module. *)
  type 'a t = 'a Helper.subscription =
    { net: Helper.t
    ; topic: string
    ; idx: int
    ; mutable closed: bool
    ; validator: 'a Envelope.Incoming.t -> bool Deferred.t
    ; encode: 'a -> string
    ; on_decode_failure:
        [`Ignore | `Call of string Envelope.Incoming.t -> Error.t -> unit]
    ; decode: string -> 'a Or_error.t
    ; write_pipe:
        ( 'a Envelope.Incoming.t
        , Strict_pipe.synchronous
        , unit Deferred.t )
        Strict_pipe.Writer.t
    ; read_pipe: 'a Envelope.Incoming.t Strict_pipe.Reader.t }

  (** Encode [message] with the subscription's encoder and publish it on
      the subscription's topic. *)
  let publish {net; topic; encode; _} message =
    let data = encode message in
    publish net ~topic ~data

  (** Mark the subscription closed, close its write pipe and tell the
      helper to unsubscribe. Calling it again on a closed subscription
      returns an error; a reply other than the expected success message
      raises. *)
  let unsubscribe ({net; idx; write_pipe; _} as t) =
    if t.closed then Deferred.Or_error.error_string "already unsubscribed"
    else (
      t.closed <- true ;
      Strict_pipe.Writer.close write_pipe ;
      match%map
        Helper.do_rpc net
          (module Helper.Rpcs.Unsubscribe)
          {subscription_idx= idx}
      with
      | Error e ->
          Error e
      | Ok "unsubscribe success" ->
          Ok ()
      | Ok v ->
          failwithf "helper broke RPC protocol: unsubscribe got %s" v () )

  (** The reader end on which received messages are delivered. *)
  let message_pipe (subscription : _ t) = subscription.read_pipe
end
let subscribe_raw (net : net) (topic : string) ~should_forward_message
~encode ~decode ~on_decode_failure =
let subscription_idx = Helper.genseq net in
let read_pipe, write_pipe =
Strict_pipe.(
create ~name:(sprintf "subscription to topic «%s»" topic) Synchronous)
in