-
Notifications
You must be signed in to change notification settings - Fork 86
/
P2P.hs
1260 lines (1132 loc) · 48.5 KB
/
P2P.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
#if !defined(mingw32_HOST_OS)
#define POSIX
#endif
-- | This module is expected to be imported qualified (it will clash
-- with the "Ouroboros.Network.Diffusion.NonP2P").
--
module Ouroboros.Network.Diffusion.P2P
( TracersExtra (..)
, nullTracers
, ArgumentsExtra (..)
, AcceptedConnectionsLimit (..)
, ApplicationsExtra (..)
, run
, Interfaces (..)
, runM
, NodeToNodePeerConnectionHandle
-- * Re-exports
, AbstractTransitionTrace
, RemoteTransitionTrace
) where
import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar (MonadMVar)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (IOException)
import Control.Monad.Class.MonadAsync (Async, MonadAsync)
import Control.Monad.Class.MonadAsync qualified as Async
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Monad.Fix (MonadFix)
import Control.Tracer (Tracer, contramap, nullTracer, traceWith)
import Data.ByteString.Lazy (ByteString)
import Data.Foldable (asum)
import Data.Hashable (Hashable)
import Data.IP (IP)
import Data.IP qualified as IP
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map (Map)
import Data.Maybe (catMaybes, maybeToList)
import Data.Typeable (Typeable)
import Data.Void (Void)
import System.Exit (ExitCode)
import System.Random (StdGen, newStdGen, split)
#ifdef POSIX
import System.Posix.Signals qualified as Signals
#endif
import Network.Socket (Socket)
import Network.Socket qualified as Socket
import Network.Mux as Mx (MakeBearer)
import Ouroboros.Network.Snocket (FileDescriptor, LocalAddress,
LocalSocket (..), Snocket, localSocketFileDescriptor,
makeLocalBearer, makeSocketBearer)
import Ouroboros.Network.Snocket qualified as Snocket
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.ConnectionId
import Ouroboros.Network.Context (ExpandedInitiatorContext, ResponderContext)
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec
import Ouroboros.Network.Protocol.Handshake.Version
import Ouroboros.Network.Socket (configureSocket, configureSystemdSocket)
import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionManager.Core
import Ouroboros.Network.ConnectionManager.InformationChannel
(InformationChannel (..), newInformationChannel)
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Diffusion.Common hiding (nullTracers)
import Ouroboros.Network.Diffusion.Policies qualified as Diffusion.Policies
import Ouroboros.Network.Diffusion.Utils
import Ouroboros.Network.ExitPolicy
import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..),
RemoteTransitionTrace)
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux hiding (MiniProtocol (..))
import Ouroboros.Network.MuxMode
import Ouroboros.Network.NodeToClient (NodeToClientVersion (..),
NodeToClientVersionData)
import Ouroboros.Network.NodeToClient qualified as NodeToClient
import Ouroboros.Network.NodeToNode (AcceptedConnectionsLimit (..),
DiffusionMode (..), NodeToNodeVersion (..),
NodeToNodeVersionData (..), RemoteAddress)
import Ouroboros.Network.NodeToNode qualified as NodeToNode
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers)
import Ouroboros.Network.PeerSelection.Governor qualified as Governor
import Ouroboros.Network.PeerSelection.Governor.Types
(ChurnMode (ChurnModeNormal), DebugPeerSelection (..),
PeerSelectionActions, PeerSelectionCounters,
PeerSelectionPolicy (..), PeerSelectionState,
TracePeerSelection (..), emptyPeerSelectionCounters,
emptyPeerSelectionState)
#ifdef POSIX
import Ouroboros.Network.PeerSelection.Governor.Types
(makeDebugPeerSelectionState)
#endif
import Ouroboros.Network.PeerSelection.LedgerPeers (TraceLedgerPeers,
WithLedgerPeersArgs (..))
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerPeersConsensusInterface (..), UseLedgerPeers)
#ifdef POSIX
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics,
fetchynessBlocks, upstreamyness)
#else
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics)
#endif
import Ouroboros.Network.PeerSelection.PeerSelectionActions
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.PeerSelection.PeerStateActions (PeerConnectionHandle,
PeerSelectionActionsTrace (..), PeerStateActionsArguments (..),
pchPeerSharing, withPeerStateActions)
import Ouroboros.Network.PeerSelection.PeerTrustable (PeerTrustable)
import Ouroboros.Network.PeerSelection.RelayAccessPoint (RelayAccessPoint)
import Ouroboros.Network.PeerSelection.RootPeersDNS
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions (DNSActions,
DNSLookupType (..), ioDNSActions)
import Ouroboros.Network.PeerSelection.RootPeersDNS.LocalRootPeers
(TraceLocalRootPeers)
import Ouroboros.Network.PeerSelection.RootPeersDNS.PublicRootPeers
(TracePublicRootPeers)
import Ouroboros.Network.PeerSelection.State.LocalRootPeers (HotValency,
WarmValency)
import Ouroboros.Network.PeerSharing (PeerSharingRegistry (..))
import Ouroboros.Network.RethrowPolicy
import Ouroboros.Network.Server2 (ServerArguments (..), ServerTrace (..))
import Ouroboros.Network.Server2 qualified as Server
-- | Tracers which are specific to the P2P diffusion.
--
-- The record first lists the /node-to-node/ tracers; the @dtLocal*@ fields at
-- its end are the tracers for local (/node-to-client/) connections.
--
data TracersExtra ntnAddr ntnVersion ntnVersionData
                  ntcAddr ntcVersion ntcVersionData
                  resolverError m =
    TracersExtra {
        -- | Local root peers tracer
        dtTraceLocalRootPeersTracer
          :: Tracer m (TraceLocalRootPeers ntnAddr resolverError)

        -- | Public root peers tracer
      , dtTracePublicRootPeersTracer
          :: Tracer m TracePublicRootPeers

        -- | Ledger Peers tracer
      , dtTraceLedgerPeersTracer
          :: Tracer m TraceLedgerPeers

        -- | Peer selection tracer
      , dtTracePeerSelectionTracer
          :: Tracer m (TracePeerSelection ntnAddr)

        -- | 'DebugPeerSelection' tracer; judging by its name it is meant for
        -- the initiator only mode (its use site is not part of this record).
      , dtDebugPeerSelectionInitiatorTracer
          :: Tracer m (DebugPeerSelection ntnAddr)

        -- TODO: can be unified with the previous one

        -- | 'DebugPeerSelection' tracer; judging by its name it is meant for
        -- the initiator and responder mode.
      , dtDebugPeerSelectionInitiatorResponderTracer
          :: Tracer m (DebugPeerSelection ntnAddr)

        -- | Tracer of 'PeerSelectionCounters'
      , dtTracePeerSelectionCounters
          :: Tracer m PeerSelectionCounters

        -- | Tracer of 'Governor.ChurnCounters'
      , dtTraceChurnCounters
          :: Tracer m Governor.ChurnCounters

        -- | Tracer of 'PeerSelectionActionsTrace' events
      , dtPeerSelectionActionsTracer
          :: Tracer m (PeerSelectionActionsTrace ntnAddr ntnVersion)

        -- | Connection manager tracer
      , dtConnectionManagerTracer
          :: Tracer m (ConnectionManagerTrace
                        ntnAddr
                        (ConnectionHandlerTrace
                          ntnVersion
                          ntnVersionData))

        -- | Connection manager transition tracer
      , dtConnectionManagerTransitionTracer
          :: Tracer m (AbstractTransitionTrace ntnAddr)

        -- | Server tracer
      , dtServerTracer
          :: Tracer m (ServerTrace ntnAddr)

        -- | Inbound governor tracer
      , dtInboundGovernorTracer
          :: Tracer m (InboundGovernorTrace ntnAddr)

        -- | Inbound governor transition tracer
      , dtInboundGovernorTransitionTracer
          :: Tracer m (RemoteTransitionTrace ntnAddr)

        --
        -- NodeToClient tracers
        --

        -- | Connection manager tracer for local clients
      , dtLocalConnectionManagerTracer
          :: Tracer m (ConnectionManagerTrace
                        ntcAddr
                        (ConnectionHandlerTrace
                          ntcVersion
                          ntcVersionData))

        -- | Server tracer for local clients
      , dtLocalServerTracer
          :: Tracer m (ServerTrace ntcAddr)

        -- | Inbound protocol governor tracer for local clients
      , dtLocalInboundGovernorTracer
          :: Tracer m (InboundGovernorTrace ntcAddr)
      }
-- | 'TracersExtra' in which every field is 'nullTracer'.
--
-- The fields are listed in the same order as in the declaration of
-- 'TracersExtra'.
--
nullTracers
  :: Applicative m
  => TracersExtra ntnAddr ntnVersion ntnVersionData
                  ntcAddr ntcVersion ntcVersionData
                  resolverError m
nullTracers = TracersExtra
  { dtTraceLocalRootPeersTracer                  = nullTracer
  , dtTracePublicRootPeersTracer                 = nullTracer
  , dtTraceLedgerPeersTracer                     = nullTracer
  , dtTracePeerSelectionTracer                   = nullTracer
  , dtDebugPeerSelectionInitiatorTracer          = nullTracer
  , dtDebugPeerSelectionInitiatorResponderTracer = nullTracer
  , dtTracePeerSelectionCounters                 = nullTracer
  , dtTraceChurnCounters                         = nullTracer
  , dtPeerSelectionActionsTracer                 = nullTracer
  , dtConnectionManagerTracer                    = nullTracer
  , dtConnectionManagerTransitionTracer          = nullTracer
  , dtServerTracer                               = nullTracer
  , dtInboundGovernorTracer                      = nullTracer
  , dtInboundGovernorTransitionTracer            = nullTracer
    -- node-to-client tracers
  , dtLocalConnectionManagerTracer               = nullTracer
  , dtLocalServerTracer                          = nullTracer
  , dtLocalInboundGovernorTracer                 = nullTracer
  }
-- | Arguments which are specific to the P2P diffusion.
--
data ArgumentsExtra m =
    ArgumentsExtra {
        -- | Selection targets for the peer governor.
        --
        daPeerSelectionTargets
          :: PeerSelectionTargets

        -- | STM action which reads the local root peers: a list of triples,
        -- each consisting of a 'HotValency', a 'WarmValency' and a map from
        -- 'RelayAccessPoint's to 'PeerAdvertise' and 'PeerTrustable' values.
        --
      , daReadLocalRootPeers
          :: STM m [( HotValency
                    , WarmValency
                    , Map RelayAccessPoint (PeerAdvertise, PeerTrustable)
                    )]

        -- | STM action which reads the public root peers together with their
        -- 'PeerAdvertise' values.
        --
      , daReadPublicRootPeers
          :: STM m (Map RelayAccessPoint PeerAdvertise)

        -- | STM action which reads the 'UseBootstrapPeers' value.
        --
      , daReadUseBootstrapPeers
          :: STM m UseBootstrapPeers

        -- | Peer's own PeerSharing value.
        --
        -- This value comes from the node's configuration file and is static.
        --
      , daOwnPeerSharing
          :: PeerSharing

        -- | STM action which reads the 'UseLedgerPeers' value.
        --
      , daReadUseLedgerPeers
          :: STM m UseLedgerPeers

        -- | Timeout which starts once all responder protocols are idle.  If
        -- the responders stay idle for the duration of the timeout, the
        -- connection will be demoted; if it wasn't used by the p2p-governor
        -- it will be closed.
        --
        -- Applies to 'Unidirectional' as well as 'Duplex' /node-to-node/
        -- connections.
        --
        -- See 'serverProtocolIdleTimeout'.
        --
      , daProtocolIdleTimeout
          :: DiffTime

        -- | Time for which /node-to-node/ connections are kept in
        -- 'TerminatingState'; it should correspond to the OS configured
        -- @TCP@ @TIME_WAIT@ timeout.
        --
        -- This timeout applies after a connection has been closed.  Its
        -- purpose is to be resilient to delayed packets, in the same way
        -- @TCP@ uses @TIME_WAIT@.
        --
      , daTimeWaitTimeout
          :: DiffTime

        -- | Churn interval between churn events in deadline mode.  A small
        -- fuzz is added (max 10 minutes) so that not all nodes churn at the
        -- same time.
        --
        -- By default it is set to 3300 seconds.
        --
      , daDeadlineChurnInterval
          :: DiffTime

        -- | Churn interval between churn events in bulk sync mode.  A small
        -- fuzz is added (max 1 minute) so that not all nodes churn at the
        -- same time.
        --
        -- By default it is set to 300 seconds.
        --
      , daBulkChurnInterval
          :: DiffTime
      }
--
-- Constants
--
-- | Protocol inactivity timeout for local (e.g. /node-to-client/) connections.
--
-- 'runM' passes it as 'cmOutboundIdleTimeout' to the connection manager which
-- serves local connections.
--
local_PROTOCOL_IDLE_TIMEOUT :: DiffTime
local_PROTOCOL_IDLE_TIMEOUT = 2 -- 2 seconds
-- | Used to set 'cmTimeWaitTimeout' for local (e.g. /node-to-client/)
-- connections.
--
local_TIME_WAIT_TIMEOUT :: DiffTime
local_TIME_WAIT_TIMEOUT = 0
-- | Classify a socket address as an IPv4 or an IPv6 address.  Unix socket
-- addresses have no 'AddressType', for them 'Nothing' is returned.
--
socketAddressType :: Socket.SockAddr -> Maybe AddressType
socketAddressType sockAddr =
    case sockAddr of
      Socket.SockAddrInet  {} -> Just IPv4Address
      Socket.SockAddrInet6 {} -> Just IPv6Address
      Socket.SockAddrUnix  {} -> Nothing
-- | Applications' arguments which are specific to the P2P diffusion.
--
-- TODO: we need initiator only mode for Daedalus, there's no reason why it
-- should run a node-to-node server side.
--
data ApplicationsExtra ntnAddr m a =
    ApplicationsExtra {
        -- | /node-to-node/ rethrow policy
        --
        daRethrowPolicy
          :: RethrowPolicy

        -- | /node-to-node/ return policy
        --
      , daReturnPolicy
          :: ReturnPolicy a

        -- | /node-to-client/ rethrow policy
        --
      , daLocalRethrowPolicy
          :: RethrowPolicy

        -- | 'PeerMetrics' used by peer selection policy (see
        -- 'simplePeerSelectionPolicy')
        --
      , daPeerMetrics
          :: PeerMetrics m ntnAddr

        -- | STM action reading the current 'FetchMode'; used by
        -- churn-governor
        --
      , daBlockFetchMode
          :: STM m FetchMode

        -- | Registry used for peer sharing protocol
        --
      , daPeerSharingRegistry
          :: PeerSharingRegistry ntnAddr m
      }
--
-- Node-To-Client type aliases
--
-- Node-To-Client diffusion is only used in 'ResponderMode'.
--
-- | Handle of a /node-to-client/ connection: 'HandleWithMinimalCtx'
-- specialised to 'ResponderMode' and lazy 'ByteString's.
--
type NodeToClientHandle ntcAddr versionData m =
    HandleWithMinimalCtx
      ResponderMode
      ntcAddr
      versionData
      ByteString
      m
      Void
      ()
-- | Handle error of a /node-to-client/ connection: 'HandleError' specialised
-- to 'ResponderMode'.
--
type NodeToClientHandleError ntcVersion = HandleError ResponderMode ntcVersion
-- | 'ConnectionHandler' of /node-to-client/ connections; it is specialised to
-- 'ResponderMode', 'NodeToClientHandle' and 'NodeToClientHandleError'.
--
type NodeToClientConnectionHandler ntcFd ntcAddr ntcVersion ntcVersionData m =
    ConnectionHandler
      ResponderMode
      (ConnectionHandlerTrace ntcVersion ntcVersionData)
      ntcFd ntcAddr
      (NodeToClientHandle ntcAddr ntcVersionData m)
      (NodeToClientHandleError ntcVersion)
      (ntcVersion, ntcVersionData)
      m
-- | 'ConnectionManagerArguments' of the connection manager which serves
-- /node-to-client/ connections; it is specialised to 'NodeToClientHandle'
-- and 'NodeToClientHandleError'.
--
type NodeToClientConnectionManagerArguments ntcFd ntcAddr ntcVersion ntcVersionData m =
    ConnectionManagerArguments
      (ConnectionHandlerTrace ntcVersion ntcVersionData)
      ntcFd ntcAddr
      (NodeToClientHandle ntcAddr ntcVersionData m)
      (NodeToClientHandleError ntcVersion)
      ntcVersion ntcVersionData
      m
--
-- Node-To-Node type aliases
--
-- Node-To-Node diffusion runs in either 'InitiatorMode' or 'InitiatorResponderMode'.
--
-- | Handle of a /node-to-node/ connection: 'HandleWithExpandedCtx'
-- specialised to lazy 'ByteString's; the mux mode is left as a parameter.
--
type NodeToNodeHandle (mode :: MuxMode) ntnAddr ntnVersionData m a b =
    HandleWithExpandedCtx
      mode
      ntnAddr
      ntnVersionData
      ByteString
      m a b
-- | 'ConnectionManager' of /node-to-node/ connections; it is specialised to
-- 'NodeToNodeHandle' and 'HandleError' of the given mux mode.
--
type NodeToNodeConnectionManager (mode :: MuxMode)
                                 ntnFd ntnAddr ntnVersionData ntnVersion
                                 m a b =
    ConnectionManager
      mode
      ntnFd ntnAddr
      (NodeToNodeHandle mode ntnAddr ntnVersionData m a b)
      (HandleError mode ntnVersion)
      m
--
-- Governor type aliases
--
-- | 'PeerConnectionHandle' of /node-to-node/ connections; the responder
-- context is 'ResponderContext' and the bytes are lazy 'ByteString's.
--
type NodeToNodePeerConnectionHandle (mode :: MuxMode)
                                    ntnAddr ntnVersionData
                                    m a b =
    PeerConnectionHandle
      mode
      (ResponderContext ntnAddr)
      ntnAddr ntnVersionData
      ByteString
      m a b
-- | 'PeerSelectionActions' whose peer connections are
-- 'NodeToNodePeerConnectionHandle's.
--
type NodeToNodePeerSelectionActions (mode :: MuxMode)
                                    ntnAddr ntnVersionData
                                    m a b =
    PeerSelectionActions
      ntnAddr
      (NodeToNodePeerConnectionHandle mode ntnAddr ntnVersionData m a b)
      m
-- | Interfaces on which 'runM' depends: snockets, bearers, socket
-- configuration, handshake arguments and a few other callbacks, both for
-- /node-to-node/ and /node-to-client/ connections.
--
data Interfaces ntnFd ntnAddr ntnVersion ntnVersionData
                ntcFd ntcAddr ntcVersion ntcVersionData
                resolver resolverError
                m =
    Interfaces {
        -- | node-to-node snocket
        --
        diNtnSnocket
          :: Snocket m ntnFd ntnAddr

        -- | node-to-node 'Mx.MakeBearer' callback
        --
      , diNtnBearer
          :: Mx.MakeBearer m ntnFd

        -- | node-to-node socket configuration
        --
      , diNtnConfigureSocket
          :: ntnFd -> Maybe ntnAddr -> m ()

        -- | node-to-node systemd socket configuration
        --
      , diNtnConfigureSystemdSocket
          :: ntnFd -> ntnAddr -> m ()

        -- | node-to-node handshake configuration
        --
      , diNtnHandshakeArguments
          :: HandshakeArguments (ConnectionId ntnAddr) ntnVersion ntnVersionData m

        -- | node-to-node address type
        --
      , diNtnAddressType
          :: ntnAddr -> Maybe AddressType

        -- | node-to-node data flow used by connection manager to classify
        -- negotiated connections
        --
      , diNtnDataFlow
          :: ntnVersion -> ntnVersionData -> DataFlow

        -- | remote side peer sharing information used by peer selection
        -- governor to decide which peers are available for performing peer
        -- sharing
        --
      , diNtnPeerSharing
          :: ntnVersionData -> PeerSharing

        -- | node-to-node peer address
        --
      , diNtnToPeerAddr
          :: IP -> Socket.PortNumber -> ntnAddr

        -- | node-to-client snocket
        --
      , diNtcSnocket
          :: Snocket m ntcFd ntcAddr

        -- | node-to-client 'Mx.MakeBearer' callback
        --
      , diNtcBearer
          :: Mx.MakeBearer m ntcFd

        -- | node-to-client handshake configuration
        --
      , diNtcHandshakeArguments
          :: HandshakeArguments (ConnectionId ntcAddr) ntcVersion ntcVersionData m

        -- | node-to-client file descriptor
        --
      , diNtcGetFileDescriptor
          :: ntcFd -> m FileDescriptor

        -- | diffusion pseudo random generator.  It is split between various
        -- components that need randomness, e.g. inbound governor, peer
        -- selection, policies, etc.
        --
      , diRng
          :: StdGen

        -- | callback which is used to register @SIGUSR1@ signal handler.
        --
      , diInstallSigUSR1Handler
          :: forall mode x y.
             NodeToNodeConnectionManager mode ntnFd ntnAddr ntnVersionData ntnVersion m x y
          -> StrictTVar m
               (PeerSelectionState ntnAddr
                 (NodeToNodePeerConnectionHandle
                   mode ntnAddr ntnVersionData m x y))
          -> PeerMetrics m ntnAddr
          -> m ()

        -- | diffusion dns actions
        --
      , diDnsActions
          :: DNSLookupType -> DNSActions resolver resolverError m
      }
runM
:: forall m ntnFd ntnAddr ntnVersion ntnVersionData
ntcFd ntcAddr ntcVersion ntcVersionData
resolver resolverError a.
( Alternative (STM m)
, MonadAsync m
, MonadDelay m
, MonadEvaluate m
, MonadFix m
, MonadFork m
, MonadLabelledSTM m
, MonadTraceSTM m
, MonadMask m
, MonadThrow (STM m)
, MonadTime m
, MonadTimer m
, MonadMVar m
, Typeable ntnAddr
, Ord ntnAddr
, Show ntnAddr
, Hashable ntnAddr
, Typeable ntnVersion
, Ord ntnVersion
, Show ntnVersion
, Show ntnVersionData
, Typeable ntcAddr
, Ord ntcAddr
, Show ntcAddr
, Ord ntcVersion
, Exception resolverError
)
=> -- | interfaces
Interfaces ntnFd ntnAddr ntnVersion ntnVersionData
ntcFd ntcAddr ntcVersion ntcVersionData
resolver resolverError
m
-> -- | tracers
Tracers ntnAddr ntnVersion
ntcAddr ntcVersion
m
-> -- | p2p tracers
TracersExtra ntnAddr ntnVersion ntnVersionData
ntcAddr ntcVersion ntcVersionData
resolverError m
-> -- | configuration
Arguments m ntnFd ntnAddr
ntcFd ntcAddr
-> -- | p2p configuration
ArgumentsExtra m
-> -- | protocol handlers
Applications ntnAddr ntnVersion ntnVersionData
ntcAddr ntcVersion ntcVersionData
m a
-> -- | p2p protocol handlers
ApplicationsExtra ntnAddr m a
-> m Void
runM Interfaces
{ diNtnSnocket
, diNtnBearer
, diNtnConfigureSocket
, diNtnConfigureSystemdSocket
, diNtnHandshakeArguments
, diNtnAddressType
, diNtnDataFlow
, diNtnPeerSharing
, diNtnToPeerAddr
, diNtcSnocket
, diNtcBearer
, diNtcHandshakeArguments
, diNtcGetFileDescriptor
, diRng
, diInstallSigUSR1Handler
, diDnsActions
}
Tracers
{ dtMuxTracer
, dtLocalMuxTracer
, dtDiffusionTracer = tracer
}
TracersExtra
{ dtTracePeerSelectionTracer
, dtTraceChurnCounters
, dtDebugPeerSelectionInitiatorTracer
, dtDebugPeerSelectionInitiatorResponderTracer
, dtTracePeerSelectionCounters
, dtPeerSelectionActionsTracer
, dtTraceLocalRootPeersTracer
, dtTracePublicRootPeersTracer
, dtTraceLedgerPeersTracer
, dtConnectionManagerTracer
, dtConnectionManagerTransitionTracer
, dtServerTracer
, dtInboundGovernorTracer
, dtInboundGovernorTransitionTracer
, dtLocalConnectionManagerTracer
, dtLocalServerTracer
, dtLocalInboundGovernorTracer
}
Arguments
{ daIPv4Address
, daIPv6Address
, daLocalAddress
, daAcceptedConnectionsLimit
, daMode = diffusionMode
, daPublicPeerSelectionVar
}
ArgumentsExtra
{ daPeerSelectionTargets
, daReadLocalRootPeers
, daReadPublicRootPeers
, daReadUseBootstrapPeers
, daOwnPeerSharing
, daReadUseLedgerPeers
, daProtocolIdleTimeout
, daTimeWaitTimeout
, daDeadlineChurnInterval
, daBulkChurnInterval
}
Applications
{ daApplicationInitiatorMode
, daApplicationInitiatorResponderMode
, daLocalResponderApplication
, daLedgerPeersCtx =
daLedgerPeersCtx@LedgerPeersConsensusInterface
{ lpGetLedgerStateJudgement }
}
ApplicationsExtra
{ daRethrowPolicy
, daLocalRethrowPolicy
, daReturnPolicy
, daPeerMetrics
, daBlockFetchMode
, daPeerSharingRegistry
}
= do
-- Thread to which 'RethrowPolicy' will throw fatal exceptions.
mainThreadId <- myThreadId
Async.runConcurrently
$ asum
$ Async.Concurrently <$>
( mkRemoteThread mainThreadId
: maybeToList (mkLocalThread mainThreadId <$> daLocalAddress)
)
where
(ledgerPeersRng, rng1) = split diRng
(policyRng, rng2) = split rng1
(churnRng, rng3) = split rng2
(fuzzRng, rng4) = split rng3
(ntnInbgovRng, ntcInbgovRng) = split rng4
-- Only the 'IOManagerError's are fatal, all the other exceptions in the
-- networking code will only shutdown the bearer (see 'ShutdownPeer' why
-- this is so).
rethrowPolicy =
RethrowPolicy $ \_ctx err ->
case fromException err of
Just (_ :: IOManagerError) -> ShutdownNode
Nothing -> mempty
-- | mkLocalThread - create local connection manager
mkLocalThread :: ThreadId m -> Either ntcFd ntcAddr -> m Void
mkLocalThread mainThreadId localAddr =
withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr
$ \localSocket -> do
localInbInfoChannel <- newInformationChannel
localServerStateVar <- Server.newObservableStateVar ntcInbgovRng
let localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0
localConnectionHandler :: NodeToClientConnectionHandler
ntcFd ntcAddr ntcVersion ntcVersionData m
localConnectionHandler =
makeConnectionHandler
dtLocalMuxTracer
SingResponderMode
diNtcHandshakeArguments
( ( \ (OuroborosApplication apps)
-> TemperatureBundle
(WithHot apps)
(WithWarm [])
(WithEstablished [])
) <$> daLocalResponderApplication )
(mainThreadId, rethrowPolicy <> daLocalRethrowPolicy)
localConnectionManagerArguments
:: NodeToClientConnectionManagerArguments
ntcFd ntcAddr ntcVersion ntcVersionData m
localConnectionManagerArguments =
ConnectionManagerArguments {
cmTracer = dtLocalConnectionManagerTracer,
cmTrTracer = nullTracer, -- TODO: issue #3320
cmMuxTracer = dtLocalMuxTracer,
cmIPv4Address = Nothing,
cmIPv6Address = Nothing,
cmAddressType = const Nothing,
cmSnocket = diNtcSnocket,
cmMakeBearer = diNtcBearer,
cmConfigureSocket = \_ _ -> return (),
cmTimeWaitTimeout = local_TIME_WAIT_TIMEOUT,
cmOutboundIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT,
connectionDataFlow = localDataFlow,
cmPrunePolicy = Diffusion.Policies.prunePolicy
localServerStateVar,
cmConnectionsLimits = localConnectionLimits,
-- The local thread does not start an Outbound Governor,
-- so it doesn't matter what we put here:
-- 'PeerSharingDisabled' is set for all connections.
cmGetPeerSharing = \_ -> PeerSharingDisabled
}
withConnectionManager
localConnectionManagerArguments
localConnectionHandler
daOwnPeerSharing
classifyHandleError
(InResponderMode localInbInfoChannel)
(InResponderMode Nothing)
$ \localConnectionManager-> do
--
-- run local server
--
traceWith tracer . RunLocalServer
=<< Snocket.getLocalAddr diNtcSnocket localSocket
Async.withAsync
(Server.run
ServerArguments {
serverSockets = localSocket :| [],
serverSnocket = diNtcSnocket,
serverTracer = dtLocalServerTracer,
serverTrTracer = nullTracer, -- TODO: issue #3320
serverInboundGovernorTracer = dtLocalInboundGovernorTracer,
serverInboundIdleTimeout = Nothing,
serverConnectionLimits = localConnectionLimits,
serverConnectionManager = localConnectionManager,
serverInboundInfoChannel = localInbInfoChannel,
serverObservableStateVar = localServerStateVar
}) Async.wait
-- | mkRemoteThread - create remote connection manager
mkRemoteThread :: ThreadId m -> m Void
mkRemoteThread mainThreadId = do
let
exitPolicy :: ExitPolicy a
exitPolicy = stdExitPolicy daReturnPolicy
cmIPv4Address
<- traverse (either (Snocket.getLocalAddr diNtnSnocket) pure)
daIPv4Address
case cmIPv4Address of
Just addr | Just IPv4Address <- diNtnAddressType addr
-> pure ()
| otherwise
-> throwIO (UnexpectedIPv4Address addr)
Nothing -> pure ()
cmIPv6Address
<- traverse (either (Snocket.getLocalAddr diNtnSnocket) pure)
daIPv6Address
case cmIPv6Address of
Just addr | Just IPv6Address <- diNtnAddressType addr
-> pure ()
| otherwise
-> throwIO (UnexpectedIPv6Address addr)
Nothing -> pure ()
lookupReqs <- case (cmIPv4Address, cmIPv6Address) of
(Just _ , Nothing) -> return LookupReqAOnly
(Nothing, Just _ ) -> return LookupReqAAAAOnly
(Just _ , Just _ ) -> return LookupReqAAndAAAA
(Nothing, Nothing) -> throwIO NoSocket
-- RNGs used for picking random peers from the ledger and for
-- demoting/promoting peers.
policyRngVar <- newTVarIO policyRng
churnModeVar <- newTVarIO ChurnModeNormal
peerSelectionTargetsVar <- newTVarIO $ daPeerSelectionTargets {
-- Start with a smaller number of active peers, the churn governor
-- will increase it to the configured value after a delay.
targetNumberOfActivePeers =
min 2 (targetNumberOfActivePeers daPeerSelectionTargets)
}
countersVar <- newTVarIO emptyPeerSelectionCounters
-- Design notes:
-- - We split the following code into two parts:
-- - Part (a): plumb data flow (in particular arguments and tracers)
-- and define common functions as a sequence of 'let's in which we
-- define needed 'withXXX' functions (and similar) which
-- - are used in Part (b),
-- - handle the plumbing of tracers, and
-- - capture commonalities between the two cases.
--
-- - Part (b): capturing the major control-flow of runM:
-- in particular, two different case alternatives in which is captured
-- the monadic flow of the program stripped down to its essence:
--- ```
-- <setup...>
-- case diffusionMode of
-- InitiatorOnlyDiffusionMode -> ...
-- InitiatorAndResponderDiffusionMode -> ...
-- ```
--
-- Part (a): plumb data flow and define common functions
--
let connectionManagerArguments'
:: forall handle handleError.
PrunePolicy ntnAddr (STM m)
-> ConnectionManagerArguments
(ConnectionHandlerTrace ntnVersion ntnVersionData)
ntnFd ntnAddr handle handleError ntnVersion ntnVersionData m
connectionManagerArguments' prunePolicy =
ConnectionManagerArguments {
cmTracer = dtConnectionManagerTracer,
cmTrTracer =
fmap abstractState
`contramap` dtConnectionManagerTransitionTracer,
cmMuxTracer = dtMuxTracer,
cmIPv4Address,
cmIPv6Address,
cmAddressType = diNtnAddressType,
cmSnocket = diNtnSnocket,
cmMakeBearer = diNtnBearer,
cmConfigureSocket = diNtnConfigureSocket,
connectionDataFlow = diNtnDataFlow,
cmPrunePolicy = prunePolicy,
cmConnectionsLimits = daAcceptedConnectionsLimit,
cmTimeWaitTimeout = daTimeWaitTimeout,
cmOutboundIdleTimeout = daProtocolIdleTimeout,
cmGetPeerSharing = diNtnPeerSharing
}
let peerSelectionPolicy = Diffusion.Policies.simplePeerSelectionPolicy
policyRngVar (readTVar churnModeVar)
daPeerMetrics (epErrorDelay exitPolicy)
let makeConnectionHandler'
:: forall muxMode socket initiatorCtx responderCtx b c.
SingMuxMode muxMode
-> Versions ntnVersion ntnVersionData
(OuroborosBundle muxMode initiatorCtx responderCtx ByteString m b c)
-> MuxConnectionHandler
muxMode socket initiatorCtx responderCtx ntnAddr
ntnVersion ntnVersionData ByteString m b c
makeConnectionHandler' muxMode versions =
makeConnectionHandler
dtMuxTracer
muxMode
diNtnHandshakeArguments
versions
(mainThreadId, rethrowPolicy <> daRethrowPolicy)
-- | Capture the two variations (InitiatorMode,InitiatorResponderMode) of
-- withConnectionManager:
withConnectionManagerInitiatorOnlyMode =
withConnectionManager
(connectionManagerArguments' simplePrunePolicy)
-- Server is not running, it will not be able to
-- advise which connections to prune. It's also not
-- expected that the governor targets will be larger
-- than limits imposed by 'cmConnectionsLimits'.
(makeConnectionHandler'
SingInitiatorMode
daApplicationInitiatorMode)
daOwnPeerSharing
classifyHandleError
NotInResponderMode
NotInResponderMode
withConnectionManagerInitiatorAndResponderMode
inbndInfoChannel outbndInfoChannel observableStateVar =
withConnectionManager
(connectionManagerArguments'
$ Diffusion.Policies.prunePolicy observableStateVar)
(makeConnectionHandler'
SingInitiatorResponderMode
daApplicationInitiatorResponderMode)
daOwnPeerSharing
classifyHandleError
(InResponderMode inbndInfoChannel)
(if daOwnPeerSharing /= PeerSharingDisabled
then InResponderMode (Just outbndInfoChannel)
else InResponderMode Nothing)
--
-- peer state actions
--
-- Peer state actions run a job pool in the background which
-- tracks threads forked by 'PeerStateActions'
--
let -- | parameterized version of 'withPeerStateActions'
withPeerStateActions'
:: forall (muxMode :: MuxMode) responderCtx socket b c.
HasInitiator muxMode ~ True
=> MuxConnectionManager
muxMode socket (ExpandedInitiatorContext ntnAddr m)
responderCtx ntnAddr ntnVersionData ntnVersion
ByteString m a b
-> (Governor.PeerStateActions
ntnAddr
(PeerConnectionHandle muxMode responderCtx ntnAddr
ntnVersionData ByteString m a b)
m
-> m c)
-> m c
withPeerStateActions' connectionManager =
withPeerStateActions
PeerStateActionsArguments {
spsTracer = dtPeerSelectionActionsTracer,
spsDeactivateTimeout = Diffusion.Policies.deactivateTimeout,
spsCloseConnectionTimeout =
Diffusion.Policies.closeConnectionTimeout,
spsConnectionManager = connectionManager,
spsExitPolicy = exitPolicy
}
dnsSemaphore <- newLedgerAndPublicRootDNSSemaphore
--
-- Run peer selection (p2p governor)
--
let withPeerSelectionActions'
:: forall muxMode responderCtx peerAddr bytes a1 b c.
PeerSelectionActionsDiffusionMode ntnAddr (PeerConnectionHandle muxMode responderCtx peerAddr ntnVersionData bytes m a1 b) m
-> ( (Async m Void, Async m Void)
-> PeerSelectionActions
ntnAddr
(PeerConnectionHandle
muxMode responderCtx peerAddr ntnVersionData bytes m a1 b)
m
-> m c)
-- ^ continuation, receives a handle to the local roots peer provider thread
-- (only if local root peers were non-empty).
-> m c
withPeerSelectionActions' =
withPeerSelectionActions PeerActionsDNS {
paToPeerAddr = diNtnToPeerAddr,
paDnsActions = diDnsActions lookupReqs,
paDnsSemaphore = dnsSemaphore }
PeerSelectionActionsArgs {
psLocalRootPeersTracer = dtTraceLocalRootPeersTracer,
psPublicRootPeersTracer = dtTracePublicRootPeersTracer,
psReadTargets = readTVar peerSelectionTargetsVar,
psJudgement = lpGetLedgerStateJudgement,
psReadLocalRootPeers = daReadLocalRootPeers,
psReadPublicRootPeers = daReadPublicRootPeers,
psReadUseBootstrapPeers = daReadUseBootstrapPeers,
psPeerSharing = daOwnPeerSharing,
psPeerConnToPeerSharing = pchPeerSharing diNtnPeerSharing,
psReadPeerSharingController = readTVar (getPeerSharingRegistry daPeerSharingRegistry) }
WithLedgerPeersArgs {
wlpRng = ledgerPeersRng,
wlpConsensusInterface = daLedgerPeersCtx,
wlpTracer = dtTraceLedgerPeersTracer,
wlpGetUseLedgerPeers = daReadUseLedgerPeers }
peerSelectionGovernor'
:: forall (muxMode :: MuxMode) b.
Tracer m (DebugPeerSelection ntnAddr)
-> StrictTVar m (PeerSelectionState ntnAddr
(NodeToNodePeerConnectionHandle
muxMode ntnAddr ntnVersionData m a b))
-> NodeToNodePeerSelectionActions muxMode ntnAddr ntnVersionData m a b
-> m Void