From f97f9c7e799c9cf93bfac8f7501522d14ac56f78 Mon Sep 17 00:00:00 2001 From: Armando Santos Date: Fri, 4 Jun 2021 12:24:52 +0100 Subject: [PATCH] Separated Diffusion into P2P, NonP2P and Common - P2P is the p2p-master branch Diffusion module - NonP2P is the master branch Diffusion module - Common contains data types that are common to both versions - Diffusion.hs indexes both P2P and NonP2P modules - Uniforms DiffusionTracers to accomodate p2p types switching - Uniforms DiffusionApplication to accomodate p2p types switching - Uniforms DiffusionArguments to accomodate p2p types switching - Renamed P2P and NonP2P DTracers, DApps, DArgs to DApplicationsExtra and DArgumentsExtra (these types fill the p2p polymorphic types in DiffusionApplication, DiffusionArguments and DiffusionTracers - General long lines cleanup Diffusion.hs now offers the needed API to ouroboros-consensus in a way that it is possible to pick between P2P and NonP2P Added ErrorPolicy to other-modules --- ouroboros-consensus/ouroboros-consensus.cabal | 2 + ouroboros-network/ouroboros-network.cabal | 5 +- .../src/Ouroboros/Network/Diffusion.hs | 1662 ++++------------- .../src/Ouroboros/Network/Diffusion/Common.hs | 205 ++ .../src/Ouroboros/Network/Diffusion/NonP2P.hs | 502 +++++ .../src/Ouroboros/Network/Diffusion/P2P.hs | 1262 +++++++++++++ 6 files changed, 2346 insertions(+), 1292 deletions(-) create mode 100644 ouroboros-network/src/Ouroboros/Network/Diffusion/Common.hs create mode 100644 ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs create mode 100644 ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 556e5094703..3ac60d81239 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -238,6 +238,8 @@ library -- Strict wrapper around SOP Data.SOP.Strict + other-modules: Ouroboros.Consensus.Node.ErrorPolicy + default-language: Haskell2010 other-extensions: BangPatterns diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index 5f50eb44671..ff7602ec84c 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -128,7 +128,10 @@ library Ouroboros.Network.TxSubmission.Inbound Ouroboros.Network.TxSubmission.Mempool.Reader Ouroboros.Network.TxSubmission.Outbound - other-modules: Ouroboros.Network.PeerSelection.Governor.ActivePeers + other-modules: Ouroboros.Network.Diffusion.Common + Ouroboros.Network.Diffusion.P2P + Ouroboros.Network.Diffusion.NonP2P + Ouroboros.Network.PeerSelection.Governor.ActivePeers Ouroboros.Network.PeerSelection.Governor.EstablishedPeers Ouroboros.Network.PeerSelection.Governor.KnownPeers Ouroboros.Network.PeerSelection.Governor.Monitor diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs index 109ace31435..5ecab7cfa16 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs @@ -1,1320 +1,400 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE CPP #-} -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE DataKinds #-} {-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TypeApplications #-} - -#if !defined(mingw32_HOST_OS) -#define POSIX -#endif module Ouroboros.Network.Diffusion - ( DiffusionTracers (..) - , DiffusionArguments (..) - , AcceptedConnectionsLimit (..) - , DiffusionApplications (..) - , LedgerPeersConsensusInterface (..) - , OuroborosApplication (..) - , runDataDiffusion - -- * Constants for /node-to-client/ diffusion - , local_PROTOCOL_IDLE_TIMEOUT - , local_TIME_WAIT_TIMEOUT - -- * re-exports - , simpleSingletonVersions - , ConnectionId (..) - -- ** Tracers + ( DiffusionTracers , nullTracers + , mkDiffusionTracersNonP2P + , mkDiffusionTracersP2P + , DiffusionArguments + , getDiffusionArguments + , daDiffusionMode + , mkDiffusionArgumentsNonP2P + , mkDiffusionArgumentsP2P + , DiffusionApplications + , mkDiffusionApplicationsP2P + , mkDiffusionApplicationsNonP2P + , runDataDiffusion , DiffusionInitializationTracer(..) - , TraceLocalRootPeers (..) - , TracePublicRootPeers (..) - , TracePeerSelection (..) - , DebugPeerSelection (..) - , PeerSelectionActionsTrace (..) - , PeerSelectionCounters (..) - , ConnectionManagerTrace (..) - , ConnectionHandlerTrace (..) - , ConnectionManagerCounters (..) - , ServerTrace (..) - , InboundGovernorCounters (..) - , InboundGovernorTrace (..) + , DiffusionFailure ) where -import qualified Control.Monad.Class.MonadAsync as Async -import Control.Monad.Class.MonadFork -import Control.Monad.Class.MonadSTM.Strict -import Control.Monad.Class.MonadTime -import Control.Exception -import Control.Tracer (Tracer, nullTracer, traceWith) -import Data.Foldable (asum) -import Data.List.NonEmpty (NonEmpty (..)) -import qualified Data.List.NonEmpty as NonEmpty -import Data.Map (Map) -import Data.Maybe (catMaybes, maybeToList) -import Data.Set (Set) -import Data.Void (Void) +import Control.Monad.Class.MonadSTM (STM) import Data.ByteString.Lazy (ByteString) -import Data.Kind (Type) -import System.Random (newStdGen, split) -#ifdef POSIX -import qualified System.Posix.Signals as Signals -#endif - -import Network.Mux ( MiniProtocolBundle (..) - , MiniProtocolInfo (..) - , MiniProtocolDirection (..) - , MuxTrace (..) - , WithMuxBearer (..) - ) -import Network.Mux.Timeout (withTimeoutSerial) -import qualified Network.DNS as DNS -import Network.Socket (SockAddr (..), Socket, AddrInfo) -import qualified Network.Socket as Socket - -import Ouroboros.Network.Snocket ( FileDescriptor - , LocalAddress - , LocalSnocket - , LocalSocket (..) - , SocketSnocket - , localSocketFileDescriptor - ) -import qualified Ouroboros.Network.Snocket as Snocket - -import Ouroboros.Network.BlockFetch -import Ouroboros.Network.Protocol.Handshake -import Ouroboros.Network.Protocol.Handshake.Version -import Ouroboros.Network.Protocol.Handshake.Codec - +import Data.Void (Void) +import Data.Map.Strict (Map) +import Data.Time (DiffTime) +import Data.Functor (void) +import Control.Exception (IOException) +import Control.Tracer (Tracer) + +import Network.Mux (WithMuxBearer, MuxTrace) +import Network.Socket (SockAddr, Socket, AddrInfo) + +import Ouroboros.Network.PeerSelection.RootPeersDNS + ( TracePublicRootPeers + , TraceLocalRootPeers + ) +import Ouroboros.Network.PeerSelection.Governor.Types + ( TracePeerSelection + , DebugPeerSelection + , PeerSelectionCounters + ) import Ouroboros.Network.ConnectionManager.Types -#ifdef POSIX -import qualified Ouroboros.Network.ConnectionManager.Types as ConnectionManager -#endif -import Ouroboros.Network.ConnectionManager.Core + ( ConnectionManagerTrace + ) +import Ouroboros.Network.PeerSelection.PeerStateActions + ( PeerSelectionActionsTrace + ) import Ouroboros.Network.ConnectionHandler -import Ouroboros.Network.RethrowPolicy -import qualified Ouroboros.Network.Diffusion.Policies as Diffusion.Policies -import Ouroboros.Network.IOManager -import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..)) -import Ouroboros.Network.InboundGovernor.State (InboundGovernorCounters (..)) -import Ouroboros.Network.PeerSelection.RootPeersDNS ( resolveDomainAddresses - , RelayAddress(..) - , TraceLocalRootPeers(..) - , TracePublicRootPeers(..) - - - - - - - , ioDNSActions - ) -import qualified Ouroboros.Network.PeerSelection.Governor as Governor -import Ouroboros.Network.PeerSelection.Governor.Types ( ChurnMode (..) - , TracePeerSelection (..) - , DebugPeerSelection (..) - , PeerSelectionCounters (..) - ) -import Ouroboros.Network.PeerSelection.LedgerPeers ( LedgerPeersConsensusInterface (..) - , TraceLedgerPeers - , NumberOfPeers - , UseLedgerAfter (..) - , runLedgerPeers) + ( ConnectionHandlerTrace + ) +import Ouroboros.Network.Server2 + ( ServerTrace + ) +import Ouroboros.Network.InboundGovernor + ( InboundGovernorTrace + ) + +import Ouroboros.Network.NodeToNode + ( RemoteAddress + , NodeToNodeVersionData + , DiffusionMode + , NodeToNodeVersion + , MiniProtocolParameters + , AcceptedConnectionsLimit + , IPSubscriptionTarget + , DnsSubscriptionTarget + ) +import qualified Ouroboros.Network.NodeToNode as NTN +import Ouroboros.Network.NodeToClient + ( LocalAddress + , NodeToClientVersionData + , Versions + , ConnectionId + , NodeToClientVersion + ) +import qualified Ouroboros.Network.NodeToClient as NTC + +import Ouroboros.Network.RethrowPolicy (RethrowPolicy) +import Ouroboros.Network.BlockFetch (FetchMode) +import Ouroboros.Network.Mux + ( Bundle + , MiniProtocol + , MuxMode (..) + , OuroborosApplication + , ControlMessage + ) +import Ouroboros.Network.PeerSelection.LedgerPeers + ( LedgerPeersConsensusInterface + , TraceLedgerPeers + , RelayAddress + , UseLedgerAfter + ) import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics) -import Ouroboros.Network.PeerSelection.PeerStateActions ( PeerSelectionActionsTrace (..) - , PeerStateActionsArguments (..) - , PeerConnectionHandle - , withPeerStateActions - ) -import Ouroboros.Network.PeerSelection.Simple -import Ouroboros.Network.Server2 ( ServerArguments (..) - , ServerTrace (..) - ) -import qualified Ouroboros.Network.Server2 as Server -import Ouroboros.Network.Mux hiding (MiniProtocol (..)) -import Ouroboros.Network.MuxMode -import Ouroboros.Network.NodeToClient ( NodeToClientVersion (..) - , NodeToClientVersionData) -import qualified Ouroboros.Network.NodeToClient as NodeToClient -import Ouroboros.Network.NodeToNode ( ConnectionId (..) - , MiniProtocolParameters (..) - , NodeToNodeVersion (..) - , NodeToNodeVersionData (..) - , AcceptedConnectionsLimit (..) - , DiffusionMode (..) - , RemoteAddress - , chainSyncProtocolLimits - , blockFetchProtocolLimits - , txSubmissionProtocolLimits - , keepAliveProtocolLimits - , nodeToNodeHandshakeCodec - ) -import qualified Ouroboros.Network.NodeToNode as NodeToNode - - --- TODO: use LocalAddress where appropriate rather than 'path'. --- -data DiffusionInitializationTracer - = RunServer !(NonEmpty SockAddr) - | RunLocalServer !LocalAddress - | UsingSystemdSocket !FilePath - -- Rename as 'CreateLocalSocket' - | CreateSystemdSocketForSnocketPath !FilePath - | CreatedLocalSocket !FilePath - | ConfiguringLocalSocket !FilePath !FileDescriptor - | ListeningLocalSocket !FilePath !FileDescriptor - | LocalSocketUp !FilePath !FileDescriptor - -- Rename as 'CreateServerSocket' - | CreatingServerSocket !SockAddr - | ConfiguringServerSocket !SockAddr - | ListeningServerSocket !SockAddr - | ServerSocketUp !SockAddr - -- Rename as 'UnsupportedLocalSocketType' - | UnsupportedLocalSystemdSocket !SockAddr - -- Remove (this is impossible case), there's no systemd on Windows - | UnsupportedReadySocketCase - | DiffusionErrored SomeException - deriving Show - - -data DiffusionTracers = DiffusionTracers { - dtMuxTracer - :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace) - - -- | Handshake protocol tracer - , dtHandshakeTracer - :: Tracer IO NodeToNode.HandshakeTr - - , dtTraceLocalRootPeersTracer - :: Tracer IO (TraceLocalRootPeers IOException) - - , dtTracePublicRootPeersTracer - :: Tracer IO TracePublicRootPeers - - , dtTracePeerSelectionTracer - :: Tracer IO (TracePeerSelection SockAddr) - - , dtDebugPeerSelectionInitiatorTracer - :: Tracer IO (DebugPeerSelection - SockAddr - (NodeToNodePeerConnectionHandle InitiatorMode Void)) - - , dtDebugPeerSelectionInitiatorResponderTracer - :: Tracer IO (DebugPeerSelection - SockAddr - (NodeToNodePeerConnectionHandle InitiatorResponderMode ())) - - , dtTracePeerSelectionCounters - :: Tracer IO PeerSelectionCounters - - , dtPeerSelectionActionsTracer - :: Tracer IO (PeerSelectionActionsTrace SockAddr) - - , dtConnectionManagerTracer - :: Tracer IO (ConnectionManagerTrace - SockAddr - (ConnectionHandlerTrace NodeToNodeVersion NodeToNodeVersionData)) - - , dtServerTracer - :: Tracer IO (ServerTrace SockAddr) - - , dtInboundGovernorTracer - :: Tracer IO (InboundGovernorTrace SockAddr) - - -- - -- NodeToClient tracers - -- - - -- | Mux tracer for local clients - , dtLocalMuxTracer - :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace) - - -- | Handshake protocol tracer for local clients - , dtLocalHandshakeTracer - :: Tracer IO NodeToClient.HandshakeTr - - -- | Connection manager tracer for local clients - , dtLocalConnectionManagerTracer - :: Tracer IO (ConnectionManagerTrace - LocalAddress - (ConnectionHandlerTrace NodeToClientVersion NodeToClientVersionData)) - - -- | Server tracer for local clients - , dtLocalServerTracer - :: Tracer IO (ServerTrace LocalAddress) - - -- | Inbound protocol governor tracer for local clients - , dtLocalInboundGovernorTracer - :: Tracer IO (InboundGovernorTrace LocalAddress) - - -- | Diffusion initialisation tracer - , dtDiffusionInitializationTracer - :: Tracer IO DiffusionInitializationTracer - - -- | Ledger Peers tracer - , dtLedgerPeersTracer :: Tracer IO TraceLedgerPeers - } - -nullTracers :: DiffusionTracers -nullTracers = DiffusionTracers { - dtMuxTracer = nullTracer - , dtHandshakeTracer = nullTracer - , dtTraceLocalRootPeersTracer = nullTracer - , dtTracePublicRootPeersTracer = nullTracer - , dtTracePeerSelectionTracer = nullTracer - , dtDebugPeerSelectionInitiatorTracer = nullTracer - , dtDebugPeerSelectionInitiatorResponderTracer = nullTracer - , dtTracePeerSelectionCounters = nullTracer - , dtPeerSelectionActionsTracer = nullTracer - , dtConnectionManagerTracer = nullTracer - , dtServerTracer = nullTracer - , dtInboundGovernorTracer = nullTracer - , dtLocalMuxTracer = nullTracer - , dtLocalHandshakeTracer = nullTracer - , dtLocalConnectionManagerTracer = nullTracer - , dtLocalServerTracer = nullTracer - , dtLocalInboundGovernorTracer = nullTracer - , dtDiffusionInitializationTracer = nullTracer - , dtLedgerPeersTracer = nullTracer - } - --- | Network Node argumets +import qualified Ouroboros.Network.Diffusion.Common as Common +import Ouroboros.Network.Diffusion.Common + ( DiffusionInitializationTracer + , DiffusionFailure + , daDiffusionMode + , dtP2P + , daP2P + , dapP2P + ) +import qualified Ouroboros.Network.Diffusion.P2P as P2P +import qualified Ouroboros.Network.Diffusion.NonP2P as NonP2P + +-- | DiffusionTracers for either P2P or Non-P2P node -- -data DiffusionArguments m = DiffusionArguments { - daIPv4Address :: Maybe (Either Socket.Socket AddrInfo) - -- ^ an @IPv4@ socket ready to accept connections or an @IPv4@ addresses - , daIPv6Address :: Maybe (Either Socket.Socket AddrInfo) - -- ^ an @IPV4@ socket ready to accept connections or an @IPv6@ addresses - , daLocalAddress :: Maybe (Either Socket.Socket FilePath) - -- ^ an @AF_UNIX@ socket ready to accept connections or an @AF_UNIX@ - -- socket path - , daPeerSelectionTargets :: PeerSelectionTargets - -- ^ selection targets for the peer governor - - , daReadLocalRootPeers :: STM m [(Int, Map RelayAddress PeerAdvertise)] - , daReadPublicRootPeers :: STM m [RelayAddress] - , daReadUseLedgerAfter :: STM m UseLedgerAfter - - , daAcceptedConnectionsLimit :: AcceptedConnectionsLimit - -- ^ parameters for limiting number of accepted connections - , daDiffusionMode :: DiffusionMode - -- ^ run in initiator only mode - - , daProtocolIdleTimeout :: DiffTime - -- ^ Timeout which starts once all responder protocols are idle. If the - -- responders stay idle for duration of the timeout, the connection will - -- be demoted, if it wasn't used by the p2p-governor it will be closed. - -- - -- Applies to 'Unidirectional' as well as 'Duplex' /node-to-node/ - -- connections. - -- - -- See 'serverProtocolIdleTimeout'. - - , daTimeWaitTimeout :: DiffTime - -- ^ Time for which /node-to-node/ connections are kept in - -- 'TerminatingState', it should correspond to the OS configured @TCP@ - -- @TIME_WAIT@ timeout. - -- - -- This timeout will apply to after a connection has been closed, its - -- purpose is to be resilitent for delayed packets in the same way @TCP@ - -- is using @TIME_WAIT@. - } +newtype DiffusionTracers = + DiffusionTracers + (Common.DiffusionTracers + (Either NonP2P.DTracersExtra P2P.DTracersExtra)) - --- --- Constants +-- | Constructs null tracers for either P2P or NonP2P -- +nullTracers :: Either NonP2P.DTracersExtra P2P.DTracersExtra + -> DiffusionTracers +nullTracers p2pNullTracers = + DiffusionTracers (Common.nullTracers p2pNullTracers) --- | Protocol inactivity timeout for local (e.g. /node-to-client/) connections. +-- | DiffusionArguments for either P2P or Non-P2P node -- -local_PROTOCOL_IDLE_TIMEOUT :: DiffTime -local_PROTOCOL_IDLE_TIMEOUT = 2 -- 2 seconds - --- | Used to set 'cmWaitTimeout' for local (e.g. /node-to-client/) connections. --- -local_TIME_WAIT_TIMEOUT :: DiffTime -local_TIME_WAIT_TIMEOUT = 0 - - -socketAddressType :: Socket.SockAddr -> Maybe AddressType -socketAddressType Socket.SockAddrInet {} = Just IPv4Address -socketAddressType Socket.SockAddrInet6 {} = Just IPv6Address -socketAddressType addr = error ("socketAddressType: unexpected address " ++ show addr) - - --- | Combine two uni-directional 'MiniProtocolBundle's into one bi-directional --- one. --- -combineMiniProtocolBundles :: MiniProtocolBundle InitiatorMode - -> MiniProtocolBundle ResponderMode - -> MiniProtocolBundle InitiatorResponderMode -combineMiniProtocolBundles (MiniProtocolBundle initiators) - (MiniProtocolBundle responders) - = MiniProtocolBundle $ - [ MiniProtocolInfo { miniProtocolNum, miniProtocolLimits, miniProtocolDir = InitiatorDirection } - | MiniProtocolInfo { miniProtocolNum, miniProtocolLimits } <- initiators - ] - ++ [ MiniProtocolInfo { miniProtocolNum, miniProtocolLimits, miniProtocolDir = ResponderDirection } - | MiniProtocolInfo { miniProtocolNum, miniProtocolLimits } <- responders - ] - - --- TODO: we need initiator only mode for Deadalus, there's no reason why it --- should run a node-to-node server side. --- -data DiffusionApplications ntnAddr ntcAddr ntnVersionData ntcVersionData m = - DiffusionApplications { - - -- | NodeToNode initiator applications for initiator only mode. - -- - -- TODO: we should accept one or the other, but not both: - -- 'daApplicationInitiatorMode', 'daApplicationInitiatorResponderMode'. - -- - daApplicationInitiatorMode - :: Versions NodeToNodeVersion - ntnVersionData - (OuroborosBundle - InitiatorMode ntnAddr - ByteString m () Void) - - -- | NodeToNode initiator & responder applications for bidirectional mode. - -- - , daApplicationInitiatorResponderMode - :: Versions NodeToNodeVersion - ntnVersionData - (OuroborosBundle - InitiatorResponderMode ntnAddr - ByteString m () ()) - - - -- | NodeToClient responder application (server role) - -- - , daLocalResponderApplication - :: Versions NodeToClientVersion - ntcVersionData - (OuroborosApplication - ResponderMode ntcAddr - ByteString m Void ()) - -- | configuration of mini-protocol parameters; they impact size limits of - -- mux ingress queues. - -- - , daMiniProtocolParameters :: MiniProtocolParameters - - -- | /node-to-node/ rethrow policy - -- - , daRethrowPolicy :: RethrowPolicy - - -- | /node-to-client/ rethrow policy - -- - , daLocalRethrowPolicy :: RethrowPolicy - - , daLedgerPeersCtx :: LedgerPeersConsensusInterface m - -- ^ Interface used to get peers from the current ledger. - , daPeerMetrics :: PeerMetrics m ntnAddr - , daBlockFetchMode :: STM m FetchMode - -- ^ Used by churn-governor +newtype DiffusionArguments m = + DiffusionArguments + { getDiffusionArguments + :: Common.DiffusionArguments + (Either NonP2P.DArgumentsExtra (P2P.DArgumentsExtra m)) } - --- TODO: add a tracer for these misconfiguration -data DiffusionFailure = UnsupportedLocalSocketType - | UnsupportedReadySocket -- Windows only - | UnexpectedIPv4Address - | UnexpectedIPv6Address - | UnexpectedUnixAddress - | NoSocket - deriving (Eq, Show) - -instance Exception DiffusionFailure - - --- | Diffusion will always run initiator of node-to-node protocols, but in some --- configurations, i.e. 'InitiatorOnlyDiffusionMode', it will not run the --- responder side. This type allows to reflect this. +-- | DiffusionApplications for either P2P or Non-P2P node -- --- This is only used internally by 'runDataDiffusion'; This type allows to --- construct configuration upfront, before all services like connection manager --- or server are initialised \/ started. +newtype DiffusionApplications ntnAddr ntcAddr ntnVersionData ntcVersionData m = + DiffusionApplications + (Common.DiffusionApplications + (Either + NonP2P.DApplicationsExtra + (P2P.DApplicationsExtra ntnAddr m)) + ntnAddr + ntcAddr + ntnVersionData + ntcVersionData + m + ) + +-- | Construct a value of NonP2P DiffusionArguments data type. +-- 'ouroboros-consensus' needs access to this constructor so we export this +-- function in order to avoid exporting the P2P and NonP2P internal modules. -- --- This is an existential wrapper for the higher order type @f :: MuxMode -> --- Type@, like @'ConnectionManagerDataInMode' (mode :: MuxMode)@ below. --- -data HasMuxMode (f :: MuxMode -> Type) where - HasInitiator :: !(f InitiatorMode) - -> HasMuxMode f - - HasInitiatorResponder - :: !(f InitiatorResponderMode) - -> HasMuxMode f - --- | Node-To-Node connection manager requires extra data when running in --- 'InitiatorResponderMode'. +mkDiffusionArgumentsNonP2P + :: Maybe (Either Socket AddrInfo) + -> Maybe (Either Socket AddrInfo) + -> Maybe (Either Socket FilePath) + -> AcceptedConnectionsLimit + -> DiffusionMode + -> IPSubscriptionTarget + -> [DnsSubscriptionTarget] + -> DiffusionArguments m +mkDiffusionArgumentsNonP2P + a1 a2 a3 a4 a5 a6 + a7 = + DiffusionArguments + $ Common.DiffusionArguments + a1 a2 a3 a4 a5 + $ Left + $ NonP2P.DArgumentsExtra + a6 a7 + +-- | Construct a value of P2P DiffusionArguments data type. +-- 'ouroboros-consensus' needs access to this constructor so we export this +-- function in order to avoid exporting the P2P and NonP2P internal modules. -- -data ConnectionManagerDataInMode (mode :: MuxMode) where - CMDInInitiatorMode - :: ConnectionManagerDataInMode InitiatorMode - - CMDInInitiatorResponderMode - :: Server.ControlChannel IO - (Server.NewConnection - SockAddr - (Handle InitiatorResponderMode SockAddr ByteString IO () ())) - -> StrictTVar IO Server.InboundGovernorObservableState - -> ConnectionManagerDataInMode InitiatorResponderMode - - +mkDiffusionArgumentsP2P + :: Maybe (Either Socket AddrInfo) + -> Maybe (Either Socket AddrInfo) + -> Maybe (Either Socket FilePath) + -> AcceptedConnectionsLimit + -> DiffusionMode + -> NTN.PeerSelectionTargets + -> STM m [(Int, Map RelayAddress NTN.PeerAdvertise)] + -> STM m [RelayAddress] + -> STM m UseLedgerAfter + -> DiffTime + -> DiffTime + -> DiffusionArguments m +mkDiffusionArgumentsP2P + a1 a2 a3 a4 a5 a6 + a7 a8 a9 a10 a11 + = + DiffusionArguments + $ Common.DiffusionArguments + a1 a2 a3 a4 a5 + $ Right + $ P2P.DArgumentsExtra + a6 a7 a8 a9 a10 a11 + +-- | Construct a value of NonP2P DiffusionApplications data type. +-- 'ouroboros-consensus' needs access to this constructor so we export this +-- function in order to avoid exporting the P2P and NonP2P internal modules. -- --- Node-To-Client type aliases +mkDiffusionApplicationsNonP2P + :: Versions + NodeToNodeVersion + ntnVersionData + (Bundle + (ConnectionId ntnAddr + -> STM m ControlMessage + -> [MiniProtocol 'InitiatorMode ByteString m () Void])) + -> Versions + NodeToNodeVersion + ntnVersionData + (Bundle + (ConnectionId ntnAddr + -> STM m ControlMessage + -> [MiniProtocol 'InitiatorResponderMode ByteString m () ()])) + -> Versions + NodeToClientVersion + ntcVersionData + (OuroborosApplication 'ResponderMode ntcAddr ByteString m Void ()) + -> LedgerPeersConsensusInterface m + -> NTC.ErrorPolicies + -> DiffusionApplications + ntnAddr + ntcAddr + ntnVersionData + ntcVersionData + m +mkDiffusionApplicationsNonP2P + a1 a2 a3 a4 a5 = + DiffusionApplications + $ Common.DiffusionApplications + a1 a2 a3 a4 + $ Left + $ NonP2P.DApplicationsExtra + a5 + +-- | Construct a value of P2P DiffusionApplications data type. +-- 'ouroboros-consensus' needs access to this constructor so we export this +-- function in order to avoid exporting the P2P and NonP2P internal modules. -- --- Node-To-Client diffusion is only used in 'ResponderMode'. +mkDiffusionApplicationsP2P + :: Versions + NodeToNodeVersion + ntnVersionData + (Bundle + (ConnectionId ntnAddr + -> STM m ControlMessage + -> [MiniProtocol 'InitiatorMode ByteString m () Void])) + -> Versions + NodeToNodeVersion + ntnVersionData + (Bundle + (ConnectionId ntnAddr + -> STM m ControlMessage + -> [MiniProtocol 'InitiatorResponderMode ByteString m () ()])) + -> Versions + NodeToClientVersion + ntcVersionData + (OuroborosApplication 'ResponderMode ntcAddr ByteString m Void ()) + -> LedgerPeersConsensusInterface m + -> MiniProtocolParameters + -> RethrowPolicy + -> RethrowPolicy + -> PeerMetrics m ntnAddr + -> STM m FetchMode + -> DiffusionApplications + ntnAddr + ntcAddr + ntnVersionData + ntcVersionData + m +mkDiffusionApplicationsP2P + a1 a2 a3 a4 a5 a6 + a7 a8 a9 = + DiffusionApplications + $ Common.DiffusionApplications + a1 a2 a3 a4 + $ Right + $ P2P.DApplicationsExtra + a5 a6 a7 a8 a9 + +-- | Construct a value of NonP2P DiffusionTracers data type. +-- 'ouroboros-consensus' needs access to this constructor so we export this +-- function in order to avoid exporting the P2P and NonP2P internal modules. -- - -type NodeToClientHandle = - Handle ResponderMode LocalAddress ByteString IO Void () - -type NodeToClientHandleError = - HandleError ResponderMode NodeToClientVersion - -type NodeToClientConnectionHandler = - ConnectionHandler - ResponderMode - (ConnectionHandlerTrace NodeToClientVersion NodeToClientVersionData) - LocalSocket - LocalAddress - NodeToClientHandle - NodeToClientHandleError - (NodeToClientVersion, NodeToClientVersionData) - IO - -type NodeToClientConnectionManagerArguments = - ConnectionManagerArguments - (ConnectionHandlerTrace NodeToClientVersion NodeToClientVersionData) - LocalSocket - LocalAddress - NodeToClientHandle - NodeToClientHandleError - (NodeToClientVersion, NodeToClientVersionData) - IO - -type NodeToClientConnectionManager = - ConnectionManager - ResponderMode - LocalSocket - LocalAddress - NodeToClientHandle - NodeToClientHandleError - IO - +mkDiffusionTracersNonP2P + :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace) + -> Tracer IO NTN.HandshakeTr + -> Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace) + -> Tracer IO NTC.HandshakeTr + -> Tracer IO DiffusionInitializationTracer + -> Tracer IO TraceLedgerPeers + -> Tracer IO (NTN.WithIPList (NTC.SubscriptionTrace SockAddr)) + -> Tracer IO (NTN.WithDomainName (NTC.SubscriptionTrace SockAddr)) + -> Tracer IO (NTN.WithDomainName NTN.DnsTrace) + -> Tracer IO (NTC.WithAddr SockAddr NTC.ErrorPolicyTrace) + -> Tracer IO (NTC.WithAddr LocalAddress NTC.ErrorPolicyTrace) + -> Tracer IO NTN.AcceptConnectionsPolicyTrace + -> DiffusionTracers +mkDiffusionTracersNonP2P + a1 a2 a3 a4 a5 a6 a7 a8 a9 + a10 a11 a12 = + DiffusionTracers + $ Common.DiffusionTracers + a1 a2 a3 a4 a5 a6 + $ Left $ NonP2P.DTracersExtra + a7 a8 a9 a10 a11 a12 + +-- | Construct a value of P2P DiffusionTracers data type. +-- ouroboros-consensus needs access to this constructor so we export this +-- function in order to avoid exporting the P2P and NonP2P internal modules. -- --- Node-To-Node type aliases --- --- Node-To-Node diffusion runs in either 'InitiatorMode' or 'InitiatorResponderMode'. --- - -type NodeToNodeHandle (mode :: MuxMode) a = - Handle mode SockAddr ByteString IO () a - -type NodeToNodeHandleError (mode :: MuxMode) = - HandleError mode NodeToNodeVersion - -type NodeToNodeConnectionHandler (mode :: MuxMode) a = - ConnectionHandler - mode - (ConnectionHandlerTrace NodeToNodeVersion NodeToNodeVersionData) - Socket - SockAddr - (NodeToNodeHandle mode a) - (NodeToNodeHandleError mode) - (NodeToNodeVersion, NodeToNodeVersionData) - IO - -type NodeToNodeConnectionManagerArguments (mode :: MuxMode) a = - ConnectionManagerArguments - (ConnectionHandlerTrace NodeToNodeVersion NodeToNodeVersionData) - Socket - SockAddr - (NodeToNodeHandle mode a) - (NodeToNodeHandleError mode) - (NodeToNodeVersion, NodeToNodeVersionData) - IO - -type NodeToNodeConnectionManager (mode :: MuxMode) a = - ConnectionManager - mode - Socket - SockAddr - (NodeToNodeHandle mode a) - (NodeToNodeHandleError mode) - IO - --- --- Governor type aliases --- - -type NodeToNodePeerConnectionHandle (mode :: MuxMode) a = - PeerConnectionHandle - mode - SockAddr - ByteString - IO () a - -type NodeToNodePeerStateActions (mode :: MuxMode) a = - Governor.PeerStateActions - SockAddr - (NodeToNodePeerConnectionHandle mode a) - IO - -type NodeToNodePeerSelectionActions (mode :: MuxMode) a = - Governor.PeerSelectionActions - SockAddr - (NodeToNodePeerConnectionHandle mode a) - IO - - --- | Main entry point for data diffusion service. It allows to: --- --- * connect to upstream peers; --- * accept connection from downstream peers, if run in --- 'InitiatorAndResponderDiffusionMode'. --- * runs a local service which allows to use node-to-client protocol to obtain --- information from the running system. This is used by 'cardano-cli' or --- a wallet and a like local services. +mkDiffusionTracersP2P + :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace) + -> Tracer IO NTN.HandshakeTr + -> Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace) + -> Tracer IO NTC.HandshakeTr + -> Tracer IO DiffusionInitializationTracer + -> Tracer IO TraceLedgerPeers + -> Tracer IO (TraceLocalRootPeers IOException) + -> Tracer IO TracePublicRootPeers + -> Tracer IO (TracePeerSelection SockAddr) + -> Tracer + IO + (DebugPeerSelection + SockAddr (P2P.NodeToNodePeerConnectionHandle 'InitiatorMode Void)) + -> Tracer + IO + (DebugPeerSelection + SockAddr + (P2P.NodeToNodePeerConnectionHandle 'InitiatorResponderMode ())) + -> Tracer IO PeerSelectionCounters + -> Tracer IO (PeerSelectionActionsTrace SockAddr) + -> Tracer + IO + (ConnectionManagerTrace + SockAddr + (ConnectionHandlerTrace + NodeToNodeVersion NodeToNodeVersionData)) + -> Tracer IO (ServerTrace SockAddr) + -> Tracer IO (InboundGovernorTrace SockAddr) + -> Tracer + IO + (ConnectionManagerTrace + LocalAddress + (ConnectionHandlerTrace + NodeToClientVersion NodeToClientVersionData)) + -> Tracer IO (ServerTrace LocalAddress) + -> Tracer IO (InboundGovernorTrace LocalAddress) + -> DiffusionTracers +mkDiffusionTracersP2P + a1 a2 a3 a4 a5 a6 a7 a8 a9 + a10 a11 a12 a13 a14 a15 a16 + a17 a18 a19 = + DiffusionTracers + $ Common.DiffusionTracers + a1 a2 a3 a4 a5 a6 + $ Right + $ P2P.DTracersExtra + a7 a8 a9 a10 a11 a12 + a13 a14 a15 a16 a17 + a18 a19 + +-- | runDataDiffusion for either P2P or Non-P2P node -- runDataDiffusion - :: DiffusionTracers - -> DiffusionArguments IO - -> DiffusionApplications - RemoteAddress LocalAddress - NodeToNodeVersionData NodeToClientVersionData - IO - -> IO Void -runDataDiffusion tracers - DiffusionArguments { daIPv4Address - , daIPv6Address - , daLocalAddress - , daPeerSelectionTargets - , daReadLocalRootPeers - , daReadPublicRootPeers - , daReadUseLedgerAfter - , daAcceptedConnectionsLimit - , daDiffusionMode - , daProtocolIdleTimeout - , daTimeWaitTimeout - } - DiffusionApplications { daApplicationInitiatorMode - , daApplicationInitiatorResponderMode - , daLocalResponderApplication - , daRethrowPolicy - , daMiniProtocolParameters - , daLocalRethrowPolicy - , daLedgerPeersCtx - , daPeerMetrics - , daBlockFetchMode - } = - -- We run two services: for /node-to-node/ and /node-to-client/. The - -- naming convention is that we use /local/ prefix for /node-to-client/ - -- related terms, as this is a local only service running over a unix - -- socket / windows named pipe. - handle (\e -> traceWith tracer (DiffusionErrored e) - >> throwIO e) $ - withIOManager $ \iocp -> - withTimeoutSerial $ \timeout -> do - - -- Thread to which 'RethrowPolicy' will throw fatal exceptions. - mainThreadId <- myThreadId - - cmIPv4Address - <- traverse (either Socket.getSocketName (pure . Socket.addrAddress)) - daIPv4Address - case cmIPv4Address of - Just SockAddrInet {} -> pure () - Just SockAddrInet6 {} -> throwIO UnexpectedIPv6Address - Just SockAddrUnix {} -> throwIO UnexpectedUnixAddress - Nothing -> pure () - - cmIPv6Address - <- traverse (either Socket.getSocketName (pure . Socket.addrAddress)) - daIPv6Address - case cmIPv6Address of - Just SockAddrInet {} -> throwIO UnexpectedIPv4Address - Just SockAddrInet6 {} -> pure () - Just SockAddrUnix {} -> throwIO UnexpectedUnixAddress - Nothing -> pure () - - -- control channel for the server; only required in - -- @'InitiatorResponderMode' :: 'MuxMode'@ - cmdInMode - <- case daDiffusionMode of - InitiatorOnlyDiffusionMode -> - -- action which we pass to connection handler - pure (HasInitiator CMDInInitiatorMode) - InitiatorAndResponderDiffusionMode -> do - -- we pass 'Server.newOutboundConnection serverControlChannel' to - -- connection handler - HasInitiatorResponder <$> - (CMDInInitiatorResponderMode - <$> Server.newControlChannel - <*> Server.newObservableStateVarIO) - - localControlChannel <- Server.newControlChannel - localServerStateVar <- Server.newObservableStateVarIO - - -- RNGs used for picking random peers from the ledger and for - -- demoting/promoting peers. - rng <- newStdGen - let (ledgerPeersRng, rng') = split rng - (policyRng, rng'') = split rng' - (churnRng, fuzzRng) = split rng'' - policyRngVar <- newTVarIO policyRng - - churnModeVar <- newTVarIO ChurnModeNormal - - -- Request interface, supply the number of peers desired. - ledgerPeersReq <- newEmptyTMVarIO :: IO (StrictTMVar IO NumberOfPeers) - -- Response interface, returns a Set of peers. Nothing indicates that the - -- ledger hasn't caught up to `useLedgerAfter`. May return less than - -- the number of peers requested. - ledgerPeersRsp <- newEmptyTMVarIO :: IO (StrictTMVar IO (Maybe (Set SockAddr, DiffTime))) - - - peerSelectionTargetsVar <- newTVarIO $ daPeerSelectionTargets { - -- Start with a smaller number of active peers, the churn governor will increase - -- it to the configured value after a delay. - targetNumberOfActivePeers = min 2 (targetNumberOfActivePeers daPeerSelectionTargets) - } - - let -- snocket for remote communication. - snocket :: SocketSnocket - snocket = Snocket.socketSnocket iocp - - localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0 - - -- - -- local connection manager - -- - localThread :: Maybe (IO Void) - localThread = - case daLocalAddress of - Nothing -> Nothing - Just localAddr -> - Just $ withLocalSocket iocp tracer localAddr - $ \localSnocket localSocket -> do - let localConnectionHandler :: NodeToClientConnectionHandler - localConnectionHandler = - makeConnectionHandler - dtLocalMuxTracer - SingResponderMode - localMiniProtocolBundle - HandshakeArguments { - haHandshakeTracer = dtLocalHandshakeTracer, - haHandshakeCodec = NodeToClient.nodeToClientHandshakeCodec, - haVersionDataCodec = cborTermVersionDataCodec NodeToClient.nodeToClientCodecCBORTerm, - haVersions = - (\(OuroborosApplication apps) - -> Bundle - (WithHot apps) - (WithWarm (\_ _ -> [])) - (WithEstablished (\_ _ -> []))) - <$> daLocalResponderApplication, - haAcceptVersion = acceptableVersion, - haTimeLimits = noTimeLimitsHandshake - } - (mainThreadId, rethrowPolicy <> daLocalRethrowPolicy) - - localConnectionManagerArguments :: NodeToClientConnectionManagerArguments - localConnectionManagerArguments = - ConnectionManagerArguments { - cmTracer = dtLocalConnectionManagerTracer, - cmTrTracer = nullTracer, -- TODO - cmMuxTracer = dtLocalMuxTracer, - cmIPv4Address = Nothing, - cmIPv6Address = Nothing, - cmAddressType = const Nothing, - cmSnocket = localSnocket, - connectionDataFlow = uncurry localDataFlow, - cmPrunePolicy = Server.randomPrunePolicy localServerStateVar, - cmConnectionsLimits = localConnectionLimits, - cmTimeWaitTimeout = local_TIME_WAIT_TIMEOUT - } - - withConnectionManager - localConnectionManagerArguments - localConnectionHandler - classifyHandleError - (InResponderMode localControlChannel) - $ \(localConnectionManager :: NodeToClientConnectionManager) -> do - - -- - -- run local server - -- - - traceWith tracer . RunLocalServer - =<< Snocket.getLocalAddr localSnocket localSocket - - Async.withAsync - (Server.run - ServerArguments { - serverSockets = localSocket :| [], - serverSnocket = localSnocket, - serverTracer = dtLocalServerTracer, - serverInboundGovernorTracer = dtLocalInboundGovernorTracer, - serverControlChannel = localControlChannel, - serverConnectionLimits = localConnectionLimits, - serverConnectionManager = localConnectionManager, - serverObservableStateVar = localServerStateVar, - serverProtocolIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT - }) Async.wait - - -- - -- remote connection manager - -- - - remoteThread :: IO Void - remoteThread = - Async.withAsync - (runLedgerPeers - ledgerPeersRng - dtLedgerPeersTracer - daReadUseLedgerAfter - daLedgerPeersCtx - (resolveDomainAddresses - dtTracePublicRootPeersTracer - timeout - DNS.defaultResolvConf - ioDNSActions - ) - (takeTMVar ledgerPeersReq) - (putTMVar ledgerPeersRsp) - ) - $ \ledgerPeerThread -> - case cmdInMode of - -- InitiatorOnlyMode - -- - -- Run peer selection only - HasInitiator CMDInInitiatorMode -> do - let connectionManagerArguments :: NodeToNodeConnectionManagerArguments InitiatorMode Void - connectionManagerArguments = - ConnectionManagerArguments { - cmTracer = dtConnectionManagerTracer, - cmTrTracer = nullTracer, -- TODO - cmMuxTracer = dtMuxTracer, - cmIPv4Address, - cmIPv6Address, - cmAddressType = socketAddressType, - cmSnocket = snocket, - connectionDataFlow = uncurry nodeDataFlow, - cmPrunePolicy = - case cmdInMode of - HasInitiator CMDInInitiatorMode -> - -- Server is not running, it will not be able to - -- advise which connections to prune. It's also not - -- expected that the governor targets will be larger - -- than limits imposed by 'cmConnectionsLimits'. - simplePrunePolicy, - cmConnectionsLimits = daAcceptedConnectionsLimit, - cmTimeWaitTimeout = daTimeWaitTimeout - } - - connectionHandler :: NodeToNodeConnectionHandler InitiatorMode Void - connectionHandler = - makeConnectionHandler - dtMuxTracer - SingInitiatorMode - miniProtocolBundleInitiatorMode - HandshakeArguments { - haHandshakeTracer = dtHandshakeTracer, - haHandshakeCodec = nodeToNodeHandshakeCodec, - haVersionDataCodec = cborTermVersionDataCodec NodeToNode.nodeToNodeCodecCBORTerm, - haVersions = daApplicationInitiatorMode, - haAcceptVersion = acceptableVersion, - haTimeLimits = timeLimitsHandshake - } - (mainThreadId, rethrowPolicy <> daRethrowPolicy) - - withConnectionManager - connectionManagerArguments - connectionHandler - classifyHandleError - NotInResponderMode - $ \(connectionManager :: NodeToNodeConnectionManager InitiatorMode Void) -> do -#ifdef POSIX - _ <- Signals.installHandler - Signals.sigUSR1 - (Signals.Catch - (do state <- ConnectionManager.readState connectionManager - traceWith dtConnectionManagerTracer - (TrState state) - ) - ) - Nothing -#endif - - -- - -- peer state actions - -- - -- Peer state actions run a job pool in the background which - -- tracks threads forked by 'PeerStateActions' - -- - - withPeerStateActions - timeout - PeerStateActionsArguments { - spsTracer = dtPeerSelectionActionsTracer, - spsDeactivateTimeout = Diffusion.Policies.deactivateTimeout, - spsCloseConnectionTimeout = Diffusion.Policies.closeConnectionTimeout, - spsConnectionManager = connectionManager - } - $ \(peerStateActions :: NodeToNodePeerStateActions InitiatorMode Void) -> - -- - -- Run peer selection (p2p governor) - -- - - withPeerSelectionActions - dtTraceLocalRootPeersTracer - dtTracePublicRootPeersTracer - timeout - (readTVar peerSelectionTargetsVar) - daReadLocalRootPeers - daReadPublicRootPeers - peerStateActions - (putTMVar ledgerPeersReq) - (takeTMVar ledgerPeersRsp) - $ \mbLocalPeerRootProviderThread - (peerSelectionActions - :: NodeToNodePeerSelectionActions - InitiatorMode Void) -> - - Async.withAsync - (Governor.peerSelectionGovernor - dtTracePeerSelectionTracer - dtDebugPeerSelectionInitiatorTracer - dtTracePeerSelectionCounters - fuzzRng - peerSelectionActions - (Diffusion.Policies.simplePeerSelectionPolicy - policyRngVar (readTVar churnModeVar) daPeerMetrics)) - $ \governorThread -> - Async.withAsync - (Governor.peerChurnGovernor - dtTracePeerSelectionTracer - daPeerMetrics - churnModeVar - churnRng - daBlockFetchMode - daPeerSelectionTargets - peerSelectionTargetsVar) - $ \churnGovernorThread -> - - -- wait for any thread to fail - snd <$> Async.waitAny - (maybeToList mbLocalPeerRootProviderThread - ++ [ governorThread - , ledgerPeerThread - , churnGovernorThread - ]) - - - -- InitiatorResponderMode - -- - -- Run peer selection and the server. - -- - HasInitiatorResponder (CMDInInitiatorResponderMode controlChannel observableStateVar) -> do - let connectionManagerArguments :: NodeToNodeConnectionManagerArguments InitiatorResponderMode () - connectionManagerArguments = - ConnectionManagerArguments { - cmTracer = dtConnectionManagerTracer, - cmTrTracer = nullTracer, -- TODO - cmMuxTracer = dtMuxTracer, - cmIPv4Address, - cmIPv6Address, - cmAddressType = socketAddressType, - cmSnocket = snocket, - connectionDataFlow = uncurry nodeDataFlow, - cmPrunePolicy = - case cmdInMode of - HasInitiatorResponder (CMDInInitiatorResponderMode _ serverStateVar) -> - Server.randomPrunePolicy serverStateVar, - cmConnectionsLimits = daAcceptedConnectionsLimit, - cmTimeWaitTimeout = daTimeWaitTimeout - } - - connectionHandler :: NodeToNodeConnectionHandler InitiatorResponderMode () - connectionHandler = - makeConnectionHandler - dtMuxTracer - SingInitiatorResponderMode - miniProtocolBundleInitiatorResponderMode - HandshakeArguments { - haHandshakeTracer = dtHandshakeTracer, - haHandshakeCodec = nodeToNodeHandshakeCodec, - haVersionDataCodec = cborTermVersionDataCodec NodeToNode.nodeToNodeCodecCBORTerm, - haVersions = daApplicationInitiatorResponderMode, - haAcceptVersion = acceptableVersion, - haTimeLimits = timeLimitsHandshake - } - (mainThreadId, rethrowPolicy <> daRethrowPolicy) - - withConnectionManager - connectionManagerArguments - connectionHandler - classifyHandleError - (InResponderMode controlChannel) - $ \(connectionManager :: NodeToNodeConnectionManager InitiatorResponderMode ()) -> do -#ifdef POSIX - _ <- Signals.installHandler - Signals.sigUSR1 - (Signals.Catch - (do state <- ConnectionManager.readState connectionManager - traceWith dtConnectionManagerTracer - (TrState state) - ) - ) - Nothing -#endif - -- - -- peer state actions - -- - -- Peer state actions run a job pool in the background which - -- tracks threads forked by 'PeerStateActions' - -- - - withPeerStateActions - timeout - PeerStateActionsArguments { - spsTracer = dtPeerSelectionActionsTracer, - spsDeactivateTimeout = Diffusion.Policies.deactivateTimeout, - spsCloseConnectionTimeout = Diffusion.Policies.closeConnectionTimeout, - spsConnectionManager = connectionManager - } - $ \(peerStateActions - :: NodeToNodePeerStateActions - InitiatorResponderMode ()) -> - - -- - -- Run peer selection (p2p governor) - -- - - withPeerSelectionActions - dtTraceLocalRootPeersTracer - dtTracePublicRootPeersTracer - timeout - (readTVar peerSelectionTargetsVar) - daReadLocalRootPeers - daReadPublicRootPeers - peerStateActions - (putTMVar ledgerPeersReq) - (takeTMVar ledgerPeersRsp) - $ \mbLocalPeerRootProviderThread - (peerSelectionActions - :: NodeToNodePeerSelectionActions - InitiatorResponderMode ()) -> - - Async.withAsync - (Governor.peerSelectionGovernor - dtTracePeerSelectionTracer - dtDebugPeerSelectionInitiatorResponderTracer - dtTracePeerSelectionCounters - fuzzRng - peerSelectionActions - (Diffusion.Policies.simplePeerSelectionPolicy - policyRngVar (readTVar churnModeVar) daPeerMetrics)) - $ \governorThread -> do - let mkAddr :: AddrInfo -> (Socket.Family, SockAddr) - mkAddr addr = ( Socket.addrFamily addr - , Socket.addrAddress addr - ) - - withSockets tracer snocket - (catMaybes - [ fmap (fmap mkAddr) daIPv4Address - , fmap (fmap mkAddr) daIPv6Address - ]) - $ \sockets addresses -> do - -- - -- Run server - -- - traceWith tracer (RunServer addresses) - Async.withAsync - (Server.run - ServerArguments { - serverSockets = sockets, - serverSnocket = snocket, - serverTracer = dtServerTracer, - serverInboundGovernorTracer = dtInboundGovernorTracer, - serverControlChannel = controlChannel, - serverConnectionLimits = daAcceptedConnectionsLimit, - serverConnectionManager = connectionManager, - serverProtocolIdleTimeout = daProtocolIdleTimeout, - serverObservableStateVar = observableStateVar - }) - $ \serverThread -> - Async.withAsync - (Governor.peerChurnGovernor - dtTracePeerSelectionTracer - daPeerMetrics - churnModeVar - churnRng - daBlockFetchMode - daPeerSelectionTargets - peerSelectionTargetsVar) - $ \churnGovernorThread -> - - -- wait for any thread to fail - snd <$> Async.waitAny - (maybeToList mbLocalPeerRootProviderThread - ++ [ serverThread - , governorThread - , ledgerPeerThread - , churnGovernorThread - ]) - - Async.runConcurrently - $ asum - $ Async.Concurrently <$> - ( remoteThread - : maybeToList localThread - ) - - where - DiffusionTracers { dtMuxTracer - , dtHandshakeTracer - , dtTracePeerSelectionTracer - , dtDebugPeerSelectionInitiatorTracer - , dtDebugPeerSelectionInitiatorResponderTracer - , dtTracePeerSelectionCounters - , dtPeerSelectionActionsTracer - , dtTraceLocalRootPeersTracer - , dtTracePublicRootPeersTracer - , dtConnectionManagerTracer - , dtServerTracer - , dtInboundGovernorTracer - , dtLocalMuxTracer - , dtLocalHandshakeTracer - , dtLocalConnectionManagerTracer - , dtLocalServerTracer - , dtLocalInboundGovernorTracer - , dtLedgerPeersTracer - -- the tracer - , dtDiffusionInitializationTracer = tracer - } = tracers - - - miniProtocolBundleInitiatorResponderMode :: MiniProtocolBundle InitiatorResponderMode - miniProtocolBundleInitiatorResponderMode = - combineMiniProtocolBundles miniProtocolBundleInitiatorMode - miniProtocolBundleResponderMode - - -- node-to-node responder bundle; it is only used in combination with - -- the node-to-node initiator bundle defined below. - -- - miniProtocolBundleResponderMode :: MiniProtocolBundle ResponderMode - miniProtocolBundleResponderMode = MiniProtocolBundle - [ MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 2, - miniProtocolDir = ResponderDirectionOnly, - miniProtocolLimits = chainSyncProtocolLimits daMiniProtocolParameters - } - , MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 3, - miniProtocolDir = ResponderDirectionOnly, - miniProtocolLimits = blockFetchProtocolLimits daMiniProtocolParameters - } - , MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 4, - miniProtocolDir = ResponderDirectionOnly, - miniProtocolLimits = txSubmissionProtocolLimits daMiniProtocolParameters - } - , MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 8, - miniProtocolDir = ResponderDirectionOnly, - miniProtocolLimits = keepAliveProtocolLimits daMiniProtocolParameters - } - -- TODO: `tip-sample` protocol - ] - - -- node-to-node initiator bundle - miniProtocolBundleInitiatorMode :: MiniProtocolBundle InitiatorMode - miniProtocolBundleInitiatorMode = MiniProtocolBundle - [ MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 2, - miniProtocolDir = InitiatorDirectionOnly, - miniProtocolLimits = chainSyncProtocolLimits daMiniProtocolParameters - } - , MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 3, - miniProtocolDir = InitiatorDirectionOnly, - miniProtocolLimits = blockFetchProtocolLimits daMiniProtocolParameters - } - , MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 4, - miniProtocolDir = InitiatorDirectionOnly, - miniProtocolLimits = txSubmissionProtocolLimits daMiniProtocolParameters - } - , MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 8, - miniProtocolDir = InitiatorDirectionOnly, - miniProtocolLimits = keepAliveProtocolLimits daMiniProtocolParameters - } - -- TODO: `tip-sample` protocol - ] - - -- node-to-client protocol bundle - localMiniProtocolBundle :: MiniProtocolBundle ResponderMode - localMiniProtocolBundle = MiniProtocolBundle - [ MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 5, - miniProtocolDir = ResponderDirectionOnly, - miniProtocolLimits = maximumMiniProtocolLimits - } - , MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 6, - miniProtocolDir = ResponderDirectionOnly, - miniProtocolLimits = maximumMiniProtocolLimits - } - , MiniProtocolInfo { - miniProtocolNum = MiniProtocolNum 7, - miniProtocolDir = ResponderDirectionOnly, - miniProtocolLimits = maximumMiniProtocolLimits - } - ] - where - maximumMiniProtocolLimits :: MiniProtocolLimits - maximumMiniProtocolLimits = - MiniProtocolLimits { - maximumIngressQueue = 0xffffffff - } - - -- Only the 'IOManagerError's are fatal, all the other exceptions in the - -- networking code will only shutdown the bearer (see 'ShutdownPeer' why - -- this is so). - rethrowPolicy = - RethrowPolicy $ \_ctx err -> - case fromException err of - Just (_ :: IOManagerError) -> ShutdownNode - Nothing -> mempty - --- --- Data flow --- - --- | For Node-To-Node protocol, any connection which negotiated at least --- 'NodeToNodeV_7' version and did not declared 'InitiatorOnlyDiffusionMode' --- will run in 'Duplex' mode. All connections from lower versions or one that --- declared themselves as 'InitiatorOnly' will run in 'UnidirectionalMode' --- -nodeDataFlow :: NodeToNodeVersion - -> NodeToNodeVersionData - -> DataFlow -nodeDataFlow v NodeToNodeVersionData { diffusionMode = InitiatorAndResponderDiffusionMode } - | v >= NodeToNodeV_7 - = Duplex -nodeDataFlow _ _ = Unidirectional - - --- | For Node-To-Client protocol all connection are considered 'Unidrectional'. --- -localDataFlow :: NodeToClientVersion - -> NodeToClientVersionData - -> DataFlow -localDataFlow _ _ = Unidirectional - - --- --- Socket utility functions --- - -withSockets :: Tracer IO DiffusionInitializationTracer - -> SocketSnocket - -> [Either Socket.Socket (Socket.Family, SockAddr)] - -> (NonEmpty Socket.Socket -> NonEmpty Socket.SockAddr -> IO a) - -> IO a -withSockets tracer sn addresses k = go [] addresses - where - go !acc (a : as) = withSocket a (\sa -> go (sa : acc) as) - go [] [] = throw NoSocket - go !acc [] = - let acc' = NonEmpty.fromList (reverse acc) - in (k $! (fst <$> acc')) $! (snd <$> acc') - - withSocket :: Either Socket.Socket (Socket.Family, SockAddr) - -> ((Socket.Socket, Socket.SockAddr) -> IO a) - -> IO a - withSocket (Left sock) f = - bracket - (pure sock) - (Snocket.close sn) - $ \_sock -> do - !addr <- Socket.getSocketName sock - f (sock, addr) - withSocket (Right (fam, !addr)) f = - bracket - (do traceWith tracer (CreatingServerSocket addr) - Snocket.open sn (Snocket.SocketFamily fam)) - (Snocket.close sn) - $ \sock -> do - traceWith tracer $ ConfiguringServerSocket addr - Snocket.bind sn sock addr - traceWith tracer $ ListeningServerSocket addr - Snocket.listen sn sock - traceWith tracer $ ServerSocketUp addr - f (sock, addr) - - -withLocalSocket :: IOManager - -> Tracer IO DiffusionInitializationTracer - -> Either Socket.Socket FilePath - -> (LocalSnocket -> LocalSocket -> IO a) - -> IO a -withLocalSocket iocp tracer localAddress k = - bracket - ( - case localAddress of -#if defined(mingw32_HOST_OS) - -- Windows uses named pipes so can't take advantage of existing sockets - Left _ -> traceWith tracer UnsupportedReadySocketCase - >> throwIO UnsupportedReadySocket -#else - Left sd -> do - addr <- Socket.getSocketName sd - case addr of - (Socket.SockAddrUnix path) -> do - traceWith tracer (UsingSystemdSocket path) - return (Left ( Snocket.localSnocket iocp path - , (LocalSocket sd) - )) - _ -> do - traceWith tracer $ UnsupportedLocalSystemdSocket addr - throwIO UnsupportedLocalSocketType -#endif - Right addr -> do - let sn :: LocalSnocket - sn = Snocket.localSnocket iocp addr - traceWith tracer $ CreateSystemdSocketForSnocketPath addr - sd <- Snocket.open sn Snocket.LocalFamily - traceWith tracer $ CreatedLocalSocket addr - return (Right (sn, sd, addr)) - ) - -- We close the socket here, even if it was provided to us. - (\case - Left (sn, sd) -> Snocket.close sn sd - Right (sn, sd, _) -> Snocket.close sn sd) - $ \case - -- unconfigured socket - Right (sn, sd, addr) -> do - traceWith tracer . ConfiguringLocalSocket addr - =<< localSocketFileDescriptor sd - Snocket.bind sn sd (NodeToClient.LocalAddress addr) - traceWith tracer . ListeningLocalSocket addr - =<< localSocketFileDescriptor sd - Snocket.listen sn sd - traceWith tracer . LocalSocketUp addr - =<< localSocketFileDescriptor sd - k sn sd - - -- pre-configured systemd socket - Left (sn, sd) -> k sn sd + :: DiffusionTracers + -> DiffusionArguments IO + -> DiffusionApplications + RemoteAddress + LocalAddress + NodeToNodeVersionData + NodeToClientVersionData + IO + -> IO () +runDataDiffusion + (DiffusionTracers + tr@Common.DiffusionTracers + { dtP2P }) + (DiffusionArguments + dargs@Common.DiffusionArguments + { daP2P }) + (DiffusionApplications + dapps@Common.DiffusionApplications + { dapP2P }) = + case (dtP2P, daP2P, dapP2P) of + (Left t, Left da, Left dapp) -> + NonP2P.runDataDiffusion + (tr { dtP2P = t}) + (dargs { daP2P = da }) + (dapps { dapP2P = dapp }) + (Right t, Right da, Right dapp) -> + void + $ P2P.runDataDiffusion + (tr { dtP2P = t}) + (dargs { daP2P = da }) + (dapps { dapP2P = dapp }) + _ -> + error "Non-matching arguments, every argument should be on the same side!" diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/Common.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/Common.hs new file mode 100644 index 00000000000..b2d38655093 --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/Common.hs @@ -0,0 +1,205 @@ +-- Common things between P2P and NonP2P Diffusion modules +{-# LANGUAGE DataKinds #-} + +module Ouroboros.Network.Diffusion.Common + ( DiffusionInitializationTracer(..) + , DiffusionFailure(..) + , DiffusionTracers(..) + , nullTracers + , DiffusionArguments(..) + , DiffusionApplications(..) + ) where + +import Data.List.NonEmpty (NonEmpty) +import Data.ByteString.Lazy (ByteString) +import Data.Void (Void) + +import Control.Exception (SomeException, Exception) +import Control.Tracer (Tracer, nullTracer) + +import Network.Socket + ( SockAddr + , AddrInfo + , Socket + ) +import Network.Mux + ( WithMuxBearer + , MuxTrace + , MuxMode(..) + ) + +import Ouroboros.Network.Mux + ( OuroborosBundle + , OuroborosApplication + ) +import Ouroboros.Network.Snocket (FileDescriptor) +import Ouroboros.Network.PeerSelection.LedgerPeers + ( TraceLedgerPeers + , LedgerPeersConsensusInterface + ) +import Ouroboros.Network.NodeToNode + ( ConnectionId + , NodeToNodeVersion + , AcceptedConnectionsLimit + , DiffusionMode + ) +import qualified Ouroboros.Network.NodeToNode as NodeToNode +import Ouroboros.Network.NodeToClient + ( LocalAddress + , Versions + , NodeToClientVersion + ) +import qualified Ouroboros.Network.NodeToClient as NodeToClient + +-- TODO: use LocalAddress where appropriate rather than 'path'. +-- +data DiffusionInitializationTracer + = RunServer !(NonEmpty SockAddr) + | RunLocalServer !LocalAddress + | UsingSystemdSocket !FilePath + -- Rename as 'CreateLocalSocket' + | CreateSystemdSocketForSnocketPath !FilePath + | CreatedLocalSocket !FilePath + | ConfiguringLocalSocket !FilePath !FileDescriptor + | ListeningLocalSocket !FilePath !FileDescriptor + | LocalSocketUp !FilePath !FileDescriptor + -- Rename as 'CreateServerSocket' + | CreatingServerSocket !SockAddr + | ConfiguringServerSocket !SockAddr + | ListeningServerSocket !SockAddr + | ServerSocketUp !SockAddr + -- Rename as 'UnsupportedLocalSocketType' + | UnsupportedLocalSystemdSocket !SockAddr + -- Remove (this is impossible case), there's no systemd on Windows + | UnsupportedReadySocketCase + | DiffusionErrored SomeException + deriving Show + +-- TODO: add a tracer for these misconfiguration +data DiffusionFailure = UnsupportedLocalSocketType + | UnsupportedReadySocket -- Windows only + | UnexpectedIPv4Address + | UnexpectedIPv6Address + | UnexpectedUnixAddress + | NoSocket + deriving (Eq, Show) + +instance Exception DiffusionFailure + +-- | Common DiffusionTracers interface between P2P and NonP2P +-- +data DiffusionTracers p2p = DiffusionTracers { + -- | Mux tracer + dtMuxTracer + :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace) + + -- | Handshake protocol tracer + , dtHandshakeTracer + :: Tracer IO NodeToNode.HandshakeTr + + -- + -- NodeToClient tracers + -- + + -- | Mux tracer for local clients + , dtLocalMuxTracer + :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace) + + -- | Handshake protocol tracer for local clients + , dtLocalHandshakeTracer + :: Tracer IO NodeToClient.HandshakeTr + + -- | Diffusion initialisation tracer + , dtDiffusionInitializationTracer + :: Tracer IO DiffusionInitializationTracer + + -- | Ledger Peers tracer + , dtLedgerPeersTracer + :: Tracer IO TraceLedgerPeers + + -- | P2P or NonP2P DiffusionTracers + , dtP2P :: p2p + } + +nullTracers :: p2p -> DiffusionTracers p2p +nullTracers p2pNullTracers = DiffusionTracers { + dtMuxTracer = nullTracer + , dtHandshakeTracer = nullTracer + , dtLocalMuxTracer = nullTracer + , dtLocalHandshakeTracer = nullTracer + , dtDiffusionInitializationTracer = nullTracer + , dtLedgerPeersTracer = nullTracer + , dtP2P = p2pNullTracers + } + +-- | Common DiffusionArguments interface between P2P and NonP2P +-- +data DiffusionArguments p2p = DiffusionArguments { + -- | an @IPv4@ socket ready to accept connections or an @IPv4@ addresses + -- + daIPv4Address :: Maybe (Either Socket AddrInfo) + + -- | an @IPV4@ socket ready to accept connections or an @IPv6@ addresses + -- + , daIPv6Address :: Maybe (Either Socket AddrInfo) + + -- | an @AF_UNIX@ socket ready to accept connections or an @AF_UNIX@ + -- socket path + , daLocalAddress :: Maybe (Either Socket FilePath) + + -- | parameters for limiting number of accepted connections + -- + , daAcceptedConnectionsLimit :: AcceptedConnectionsLimit + + -- | run in initiator only mode + -- + , daDiffusionMode :: DiffusionMode + + -- | p2p polymorphic type argument to allow easy switching between + -- P2P and NonP2P DiffusionArguments Extras + -- + , daP2P :: p2p + } + +-- | Common DiffusionArguments interface between P2P and NonP2P +-- +data DiffusionApplications p2p ntnAddr ntcAddr ntnVersionData ntcVersionData m = + DiffusionApplications { + -- | NodeToNode initiator applications for initiator only mode. + -- + -- TODO: we should accept one or the other, but not both: + -- 'daApplicationInitiatorMode', 'daApplicationInitiatorResponderMode'. + -- + daApplicationInitiatorMode + :: Versions NodeToNodeVersion + ntnVersionData + (OuroborosBundle + InitiatorMode ntnAddr + ByteString m () Void) + + -- | NodeToNode initiator & responder applications for bidirectional mode. + -- + , daApplicationInitiatorResponderMode + :: Versions NodeToNodeVersion + ntnVersionData + (OuroborosBundle + InitiatorResponderMode ntnAddr + ByteString m () ()) + + -- | NodeToClient responder application (server role) + -- + , daLocalResponderApplication + :: Versions NodeToClientVersion + ntcVersionData + (OuroborosApplication + ResponderMode ntcAddr + ByteString m Void ()) + + -- | Interface used to get peers from the current ledger. + , daLedgerPeersCtx :: LedgerPeersConsensusInterface m + + -- | p2p polymorphic type argument to allow easy switching between + -- P2P and NonP2P DiffusionApplications Extras + -- + , dapP2P :: p2p + } diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs new file mode 100644 index 00000000000..7a8025c9d0f --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs @@ -0,0 +1,502 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE GADTs #-} + +module Ouroboros.Network.Diffusion.NonP2P + ( DTracersExtra (..) + , nullTracers + , DApplicationsExtra (..) + , DArgumentsExtra (..) + , runDataDiffusion + ) + where + +import qualified Control.Concurrent.Async as Async +import Control.Exception +import Control.Tracer (Tracer, traceWith, nullTracer) +import Data.Functor (void) +import Data.Maybe (maybeToList) +import Data.Foldable (asum) +import Data.Void (Void) + +import Network.Socket (SockAddr) +import qualified Network.Socket as Socket + +import Ouroboros.Network.Snocket + ( LocalAddress + , LocalSnocket + , LocalSocket (..) + , SocketSnocket + , localSocketFileDescriptor + ) +import qualified Ouroboros.Network.Snocket as Snocket + +import Ouroboros.Network.ErrorPolicy +import Ouroboros.Network.IOManager +import Ouroboros.Network.Mux +import Ouroboros.Network.NodeToClient (NodeToClientVersionData) +import qualified Ouroboros.Network.NodeToClient as NodeToClient +import Ouroboros.Network.NodeToNode + ( AcceptConnectionsPolicyTrace (..) + , DiffusionMode (..) + , RemoteAddress + , NodeToNodeVersionData + ) +import qualified Ouroboros.Network.NodeToNode as NodeToNode +import Ouroboros.Network.Socket + ( NetworkMutableState + , newNetworkMutableState + , cleanNetworkMutableState + , NetworkServerTracers (..) + ) +import Ouroboros.Network.Subscription.Ip +import Ouroboros.Network.Subscription.Dns +import Ouroboros.Network.Subscription.Worker (LocalAddresses (..)) +import Ouroboros.Network.Tracers +import qualified Ouroboros.Network.Diffusion.Common as Common + ( nullTracers ) +import Ouroboros.Network.Diffusion.Common + ( DiffusionTracers(..) + , DiffusionArguments(..) + , DiffusionApplications(..) + , DiffusionInitializationTracer(..) + , DiffusionFailure(..) + ) + +-- | NonP2P DiffusionTracers Extras +-- +data DTracersExtra = DTracersExtra { + -- | IP subscription tracer + -- + dtIpSubscriptionTracer + :: Tracer IO (WithIPList (SubscriptionTrace SockAddr)) + + -- | DNS subscription tracer + -- + , dtDnsSubscriptionTracer + :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr)) + + -- | DNS resolver tracer + -- + , dtDnsResolverTracer + :: Tracer IO (WithDomainName DnsTrace) + + , dtErrorPolicyTracer + :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace) + + , dtLocalErrorPolicyTracer + :: Tracer IO (WithAddr LocalAddress ErrorPolicyTrace) + + -- | Trace rate limiting of accepted connections + -- + , dtAcceptPolicyTracer + :: Tracer IO AcceptConnectionsPolicyTrace + } + +nullTracers :: DiffusionTracers DTracersExtra +nullTracers = Common.nullTracers nonP2PNullTracers + where + nonP2PNullTracers = + DTracersExtra { + dtIpSubscriptionTracer = nullTracer + , dtDnsSubscriptionTracer = nullTracer + , dtDnsResolverTracer = nullTracer + , dtErrorPolicyTracer = nullTracer + , dtLocalErrorPolicyTracer = nullTracer + , dtAcceptPolicyTracer = nullTracer + } + +-- | NonP2P DiffusionArguments Extras +-- +data DArgumentsExtra = DArgumentsExtra { + -- | ip subscription addresses + -- + daIpProducers :: IPSubscriptionTarget + + -- | list of domain names to subscribe to + -- + , daDnsProducers :: [DnsSubscriptionTarget] + } + +-- | NonP2P DiffusionApplications Extras +-- +newtype DApplicationsExtra = DApplicationsExtra { + -- | Error policies + -- + daErrorPolicies :: ErrorPolicies + } + +-- | Converts between OuroborosBundle and OuroborosApplication. +-- Useful for sharing the same DiffusionApplications modes. +-- +mkApp + :: OuroborosBundle mode addr bs m a b + -> OuroborosApplication mode addr bs m a b +mkApp bundle = + OuroborosApplication $ \connId controlMessageSTM -> + foldMap (\p -> p connId controlMessageSTM) bundle + +-- | Converts between OuroborosBundle and OuroborosApplication. +-- Converts from InitiatorResponderMode to ResponderMode. +-- +-- Useful for sharing the same DiffusionApplications modes. +-- +mkResponderApp + :: OuroborosBundle InitiatorResponderMode addr bs m a b + -> OuroborosApplication ResponderMode addr bs m Void b +mkResponderApp bundle = + OuroborosApplication $ \connId controlMessageSTM -> + foldMap (\p -> map f $ p connId controlMessageSTM) bundle + where + f :: MiniProtocol InitiatorResponderMode bs m a b + -> MiniProtocol ResponderMode bs m Void b + f MiniProtocol { miniProtocolNum + , miniProtocolLimits + , miniProtocolRun = InitiatorAndResponderProtocol _initiator + responder + } = + MiniProtocol { miniProtocolNum + , miniProtocolLimits + , miniProtocolRun = ResponderProtocolOnly responder + } + +runDataDiffusion + :: DiffusionTracers DTracersExtra + -> DiffusionArguments DArgumentsExtra + -> DiffusionApplications + DApplicationsExtra + RemoteAddress + LocalAddress + NodeToNodeVersionData + NodeToClientVersionData + IO + -> IO () +runDataDiffusion tracers + DiffusionArguments { daIPv4Address + , daIPv6Address + , daLocalAddress + , daAcceptedConnectionsLimit + , daDiffusionMode + , daP2P = DArgumentsExtra { + daIpProducers + , daDnsProducers + } + } + applications@DiffusionApplications + { dapP2P = DApplicationsExtra { daErrorPolicies } } = + traceException . withIOManager $ \iocp -> do + let -- snocket for remote communication. + snocket :: SocketSnocket + snocket = Snocket.socketSnocket iocp + addresses = maybeToList daIPv4Address + ++ maybeToList daIPv6Address + + -- networking mutable state + networkState <- newNetworkMutableState + networkLocalState <- newNetworkMutableState + + lias <- getInitiatorLocalAddresses snocket + + let + dnsSubActions = runDnsSubscriptionWorker snocket networkState lias + <$> daDnsProducers + + serverActions = case daDiffusionMode of + InitiatorAndResponderDiffusionMode -> + runServer snocket networkState . fmap Socket.addrAddress + <$> addresses + InitiatorOnlyDiffusionMode -> [] + + localServerAction = runLocalServer iocp networkLocalState + <$> maybeToList daLocalAddress + + actions = + [ -- clean state thread + cleanNetworkMutableState networkState + , -- clean local state thread + cleanNetworkMutableState networkLocalState + , -- fork ip subscription + runIpSubscriptionWorker snocket networkState lias + ] + -- fork dns subscriptions + ++ dnsSubActions + -- fork servers for remote peers + ++ serverActions + -- fork server for local clients + ++ localServerAction + + -- Runs all threads in parallel, using Async.Concurrently's Alternative instance + Async.runConcurrently $ asum $ Async.Concurrently <$> actions + + where + traceException :: IO a -> IO a + traceException f = catch f $ \(e :: SomeException) -> do + traceWith dtDiffusionInitializationTracer (DiffusionErrored e) + throwIO e + + DiffusionTracers { + dtMuxTracer + , dtLocalMuxTracer + , dtHandshakeTracer + , dtLocalHandshakeTracer + , dtDiffusionInitializationTracer + , dtP2P = DTracersExtra { + dtIpSubscriptionTracer + , dtDnsSubscriptionTracer + , dtDnsResolverTracer + , dtErrorPolicyTracer + , dtLocalErrorPolicyTracer + , dtAcceptPolicyTracer + } + } = tracers + + -- + -- We can't share portnumber with our server since we run separate + -- 'MuxInitiatorApplication' and 'MuxResponderApplication' + -- applications instead of a 'MuxInitiatorAndResponderApplication'. + -- This means we don't utilise full duplex connection. + getInitiatorLocalAddresses :: SocketSnocket -> IO (LocalAddresses SockAddr) + getInitiatorLocalAddresses sn = do + localIpv4 <- + case daIPv4Address of + Just (Right ipv4) -> do + return LocalAddresses + { laIpv4 = anyIPv4Addr (Socket.addrAddress ipv4) + , laIpv6 = Nothing + , laUnix = Nothing + } + + Just (Left ipv4Sock) -> do + ipv4Addrs <- Snocket.getLocalAddr sn ipv4Sock + return LocalAddresses + { laIpv4 = anyIPv4Addr ipv4Addrs + , laIpv6 = Nothing + , laUnix = Nothing + } + + Nothing -> do + return LocalAddresses + { laIpv4 = Nothing + , laIpv6 = Nothing + , laUnix = Nothing + } + + localIpv6 <- + case daIPv6Address of + Just (Right ipv6) -> do + return LocalAddresses + { laIpv4 = Nothing + , laIpv6 = anyIPv6Addr (Socket.addrAddress ipv6) + , laUnix = Nothing + } + + Just (Left ipv6Sock) -> do + ipv6Addrs <- Snocket.getLocalAddr sn ipv6Sock + return LocalAddresses + { laIpv4 = Nothing + , laIpv6 = anyIPv6Addr ipv6Addrs + , laUnix = Nothing + } + + Nothing -> do + return LocalAddresses + { laIpv4 = Nothing + , laIpv6 = Nothing + , laUnix = Nothing + } + + return (localIpv4 <> localIpv6) + where + -- Return an IPv4 address with an emphemeral portnumber if we use IPv4 + anyIPv4Addr :: SockAddr -> Maybe SockAddr + anyIPv4Addr Socket.SockAddrInet {} = Just (Socket.SockAddrInet 0 0) + anyIPv4Addr _ = Nothing + + -- Return an IPv6 address with an emphemeral portnumber if we use IPv6 + anyIPv6Addr :: SockAddr -> Maybe SockAddr + anyIPv6Addr Socket.SockAddrInet6 {} = + Just (Socket.SockAddrInet6 0 0 (0, 0, 0, 0) 0) + anyIPv6Addr _ = Nothing + + remoteErrorPolicy, localErrorPolicy :: ErrorPolicies + remoteErrorPolicy = NodeToNode.remoteNetworkErrorPolicy <> daErrorPolicies + localErrorPolicy = NodeToNode.localNetworkErrorPolicy <> daErrorPolicies + + runLocalServer :: IOManager + -> NetworkMutableState LocalAddress + -> Either Socket.Socket FilePath + -> IO () + runLocalServer iocp networkLocalState localAddress = + bracket + localServerInit + localServerCleanup + localServerBody + where + localServerInit :: IO (LocalSocket, LocalSnocket) + localServerInit = + case localAddress of +#if defined(mingw32_HOST_OS) + -- Windows uses named pipes so can't take advantage of existing sockets + Left _ -> do + traceWith dtDiffusionInitializationTracer UnsupportedReadySocketCase + throwIO UnsupportedReadySocket +#else + Left sd -> do + a <- Socket.getSocketName sd + case a of + (Socket.SockAddrUnix path) -> do + traceWith dtDiffusionInitializationTracer + $ UsingSystemdSocket path + return (LocalSocket sd, Snocket.localSnocket iocp path) + unsupportedAddr -> do + traceWith dtDiffusionInitializationTracer + $ UnsupportedLocalSystemdSocket unsupportedAddr + throwIO UnsupportedLocalSocketType +#endif + Right addr -> do + let sn = Snocket.localSnocket iocp addr + traceWith dtDiffusionInitializationTracer + $ CreateSystemdSocketForSnocketPath addr + sd <- Snocket.open + sn + (Snocket.addrFamily sn $ Snocket.localAddressFromPath addr) + traceWith dtDiffusionInitializationTracer + $ CreatedLocalSocket addr + return (sd, sn) + + -- We close the socket here, even if it was provided for us. + localServerCleanup :: (LocalSocket, LocalSnocket) -> IO () + localServerCleanup (sd, sn) = Snocket.close sn sd + + localServerBody :: (LocalSocket, LocalSnocket) -> IO () + localServerBody (sd, sn) = do + case localAddress of + -- If a socket was provided it should be ready to accept + Left _ -> pure () + Right path -> do + traceWith dtDiffusionInitializationTracer + . ConfiguringLocalSocket path + =<< localSocketFileDescriptor sd + + Snocket.bind sn sd $ Snocket.localAddressFromPath path + + traceWith dtDiffusionInitializationTracer + . ListeningLocalSocket path + =<< localSocketFileDescriptor sd + + Snocket.listen sn sd + + traceWith dtDiffusionInitializationTracer + . LocalSocketUp path + =<< localSocketFileDescriptor sd + + traceWith dtDiffusionInitializationTracer + . RunLocalServer =<< Snocket.getLocalAddr sn sd + + void $ NodeToClient.withServer + sn + (NetworkServerTracers + dtLocalMuxTracer + dtLocalHandshakeTracer + dtLocalErrorPolicyTracer + dtAcceptPolicyTracer) + networkLocalState + sd + (daLocalResponderApplication applications) + localErrorPolicy + + runServer :: SocketSnocket + -> NetworkMutableState SockAddr + -> Either Socket.Socket SockAddr + -> IO () + runServer sn networkState address = + bracket + ( + case address of + Left sd -> return sd + Right addr -> do + traceWith dtDiffusionInitializationTracer + $ CreatingServerSocket addr + Snocket.open sn (Snocket.addrFamily sn addr) + ) + (Snocket.close sn) -- We close the socket here, even if it was provided for us. + (\sd -> do + + addr <- case address of + -- If a socket was provided it should be ready to accept + Left _ -> Snocket.getLocalAddr sn sd + Right addr -> do + traceWith dtDiffusionInitializationTracer + $ ConfiguringServerSocket addr + Snocket.bind sn sd addr + traceWith dtDiffusionInitializationTracer + $ ListeningServerSocket addr + Snocket.listen sn sd + traceWith dtDiffusionInitializationTracer + $ ServerSocketUp addr + return addr + + traceWith dtDiffusionInitializationTracer $ RunServer (pure addr) + + void $ NodeToNode.withServer + sn + (NetworkServerTracers + dtMuxTracer + dtHandshakeTracer + dtErrorPolicyTracer + dtAcceptPolicyTracer) + networkState + daAcceptedConnectionsLimit + sd + (mkResponderApp <$> daApplicationInitiatorResponderMode applications) + remoteErrorPolicy + ) + runIpSubscriptionWorker :: SocketSnocket + -> NetworkMutableState SockAddr + -> LocalAddresses SockAddr + -> IO () + runIpSubscriptionWorker sn networkState la = + void + $ NodeToNode.ipSubscriptionWorker + sn + (NetworkSubscriptionTracers + dtMuxTracer + dtHandshakeTracer + dtErrorPolicyTracer + dtIpSubscriptionTracer) + networkState + SubscriptionParams + { spLocalAddresses = la + , spConnectionAttemptDelay = const Nothing + , spErrorPolicies = remoteErrorPolicy + , spSubscriptionTarget = daIpProducers + } + (mkApp <$> daApplicationInitiatorMode applications) + + runDnsSubscriptionWorker :: SocketSnocket + -> NetworkMutableState SockAddr + -> LocalAddresses SockAddr + -> DnsSubscriptionTarget + -> IO () + runDnsSubscriptionWorker sn networkState la dnsProducer = + void + $ NodeToNode.dnsSubscriptionWorker + sn + (NetworkDNSSubscriptionTracers + dtMuxTracer + dtHandshakeTracer + dtErrorPolicyTracer + dtDnsSubscriptionTracer + dtDnsResolverTracer) + networkState + SubscriptionParams + { spLocalAddresses = la + , spConnectionAttemptDelay = const Nothing + , spErrorPolicies = remoteErrorPolicy + , spSubscriptionTarget = dnsProducer + } + (mkApp <$> daApplicationInitiatorMode applications) diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs new file mode 100644 index 00000000000..7551f982640 --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -0,0 +1,1262 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} + +#if !defined(mingw32_HOST_OS) +#define POSIX +#endif + +module Ouroboros.Network.Diffusion.P2P + ( DTracersExtra (..) + , nullTracers + , DArgumentsExtra (..) + , AcceptedConnectionsLimit (..) + , DApplicationsExtra (..) + , runDataDiffusion + + , NodeToNodePeerConnectionHandle + ) + where + +import qualified Control.Monad.Class.MonadAsync as Async +import Control.Monad.Class.MonadFork +import Control.Monad.Class.MonadSTM.Strict +import Control.Monad.Class.MonadTime +import Control.Exception +import Control.Tracer (Tracer, nullTracer, traceWith) +import Data.Foldable (asum) +import Data.List.NonEmpty (NonEmpty (..)) +import qualified Data.List.NonEmpty as NonEmpty +import Data.Map (Map) +import Data.Maybe (catMaybes, maybeToList) +import Data.Set (Set) +import Data.Void (Void) +import Data.ByteString.Lazy (ByteString) +import Data.Kind (Type) +import System.Random (newStdGen, split) +#ifdef POSIX +import qualified System.Posix.Signals as Signals +#endif + +import Network.Mux + ( MiniProtocolBundle (..) + , MiniProtocolInfo (..) + , MiniProtocolDirection (..) + ) +import Network.Mux.Timeout (withTimeoutSerial) +import qualified Network.DNS as DNS +import Network.Socket (SockAddr (..), Socket, AddrInfo) +import qualified Network.Socket as Socket + +import Ouroboros.Network.Snocket + ( LocalAddress + , LocalSnocket + , LocalSocket (..) + , SocketSnocket + , localSocketFileDescriptor + ) +import qualified Ouroboros.Network.Snocket as Snocket + +import Ouroboros.Network.BlockFetch +import Ouroboros.Network.Protocol.Handshake +import Ouroboros.Network.Protocol.Handshake.Version +import Ouroboros.Network.Protocol.Handshake.Codec + +import Ouroboros.Network.ConnectionManager.Types +#ifdef POSIX +import qualified Ouroboros.Network.ConnectionManager.Types as ConnectionManager +#endif +import Ouroboros.Network.ConnectionManager.Core +import Ouroboros.Network.ConnectionHandler +import Ouroboros.Network.RethrowPolicy +import qualified Ouroboros.Network.Diffusion.Policies as Diffusion.Policies +import Ouroboros.Network.IOManager +import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..)) +import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..)) +import Ouroboros.Network.PeerSelection.RootPeersDNS + ( resolveDomainAddresses + , RelayAddress(..) + , TraceLocalRootPeers(..) + , TracePublicRootPeers(..) + ) +import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions + ( ioDNSActions + ) +import qualified Ouroboros.Network.PeerSelection.Governor as Governor +import Ouroboros.Network.PeerSelection.Governor.Types + ( TracePeerSelection (..) + , DebugPeerSelection (..) + , PeerSelectionCounters (..) + , ChurnMode (ChurnModeNormal) + ) +import Ouroboros.Network.PeerSelection.LedgerPeers + ( NumberOfPeers + , UseLedgerAfter (..) + , runLedgerPeers + ) +import Ouroboros.Network.PeerSelection.PeerStateActions + ( PeerSelectionActionsTrace (..) + , PeerStateActionsArguments (..) + , PeerConnectionHandle + , withPeerStateActions + ) +import Ouroboros.Network.PeerSelection.Simple +import Ouroboros.Network.Server2 + ( ServerArguments (..) + , ServerTrace (..) + ) +import qualified Ouroboros.Network.Server2 as Server +import Ouroboros.Network.Mux hiding (MiniProtocol (..)) +import Ouroboros.Network.MuxMode +import Ouroboros.Network.NodeToClient + ( NodeToClientVersion (..) + , NodeToClientVersionData + ) +import qualified Ouroboros.Network.NodeToClient as NodeToClient +import Ouroboros.Network.NodeToNode + ( MiniProtocolParameters (..) + , NodeToNodeVersion (..) + , NodeToNodeVersionData (..) + , AcceptedConnectionsLimit (..) + , DiffusionMode (..) + , RemoteAddress + , chainSyncProtocolLimits + , blockFetchProtocolLimits + , txSubmissionProtocolLimits + , keepAliveProtocolLimits + , nodeToNodeHandshakeCodec + ) +import qualified Ouroboros.Network.NodeToNode as NodeToNode +import qualified Ouroboros.Network.Diffusion.Common as Common + ( nullTracers ) +import Ouroboros.Network.Diffusion.Common + ( DiffusionTracers(..) + , DiffusionArguments(..) + , DiffusionApplications(..) + , DiffusionInitializationTracer(..) + , DiffusionFailure(..) + ) + +-- | P2P DiffusionTracers Extras +-- +data DTracersExtra = DTracersExtra { + dtTraceLocalRootPeersTracer + :: Tracer IO (TraceLocalRootPeers IOException) + + , dtTracePublicRootPeersTracer + :: Tracer IO TracePublicRootPeers + + , dtTracePeerSelectionTracer + :: Tracer IO (TracePeerSelection SockAddr) + + , dtDebugPeerSelectionInitiatorTracer + :: Tracer IO (DebugPeerSelection + SockAddr + (NodeToNodePeerConnectionHandle + InitiatorMode + Void)) + + , dtDebugPeerSelectionInitiatorResponderTracer + :: Tracer IO (DebugPeerSelection + SockAddr + (NodeToNodePeerConnectionHandle + InitiatorResponderMode + ())) + + , dtTracePeerSelectionCounters + :: Tracer IO PeerSelectionCounters + + , dtPeerSelectionActionsTracer + :: Tracer IO (PeerSelectionActionsTrace SockAddr) + + , dtConnectionManagerTracer + :: Tracer IO (ConnectionManagerTrace + SockAddr + (ConnectionHandlerTrace + NodeToNodeVersion + NodeToNodeVersionData)) + + , dtServerTracer + :: Tracer IO (ServerTrace SockAddr) + + , dtInboundGovernorTracer + :: Tracer IO (InboundGovernorTrace SockAddr) + + -- + -- NodeToClient tracers + -- + + -- | Connection manager tracer for local clients + , dtLocalConnectionManagerTracer + :: Tracer IO (ConnectionManagerTrace + LocalAddress + (ConnectionHandlerTrace + NodeToClientVersion + NodeToClientVersionData)) + + -- | Server tracer for local clients + , dtLocalServerTracer + :: Tracer IO (ServerTrace LocalAddress) + + -- | Inbound protocol governor tracer for local clients + , dtLocalInboundGovernorTracer + :: Tracer IO (InboundGovernorTrace LocalAddress) + } + +nullTracers :: DiffusionTracers DTracersExtra +nullTracers = Common.nullTracers p2pNullTracers + where + p2pNullTracers = + DTracersExtra { + dtTraceLocalRootPeersTracer = nullTracer + , dtTracePublicRootPeersTracer = nullTracer + , dtTracePeerSelectionTracer = nullTracer + , dtDebugPeerSelectionInitiatorTracer = nullTracer + , dtDebugPeerSelectionInitiatorResponderTracer = nullTracer + , dtTracePeerSelectionCounters = nullTracer + , dtPeerSelectionActionsTracer = nullTracer + , dtConnectionManagerTracer = nullTracer + , dtServerTracer = nullTracer + , dtInboundGovernorTracer = nullTracer + , dtLocalConnectionManagerTracer = nullTracer + , dtLocalServerTracer = nullTracer + , dtLocalInboundGovernorTracer = nullTracer + } + +-- | P2P DiffusionArguments Extras +-- +data DArgumentsExtra m = DArgumentsExtra { + -- | selection targets for the peer governor + -- + daPeerSelectionTargets :: PeerSelectionTargets + + , daReadLocalRootPeers :: STM m [(Int, Map RelayAddress PeerAdvertise)] + , daReadPublicRootPeers :: STM m [RelayAddress] + , daReadUseLedgerAfter :: STM m UseLedgerAfter + + -- | Timeout which starts once all responder protocols are idle. If the + -- responders stay idle for duration of the timeout, the connection will + -- be demoted, if it wasn't used by the p2p-governor it will be closed. + -- + -- Applies to 'Unidirectional' as well as 'Duplex' /node-to-node/ + -- connections. + -- + -- See 'serverProtocolIdleTimeout'. + -- + , daProtocolIdleTimeout :: DiffTime + + -- | Time for which /node-to-node/ connections are kept in + -- 'TerminatingState', it should correspond to the OS configured @TCP@ + -- @TIME_WAIT@ timeout. + -- + -- This timeout will apply to after a connection has been closed, its + -- purpose is to be resilitent for delayed packets in the same way @TCP@ + -- is using @TIME_WAIT@. + -- + , daTimeWaitTimeout :: DiffTime + } + +-- +-- Constants +-- + +-- | Protocol inactivity timeout for local (e.g. /node-to-client/) connections. +-- +local_PROTOCOL_IDLE_TIMEOUT :: DiffTime +local_PROTOCOL_IDLE_TIMEOUT = 2 -- 2 seconds + +-- | Used to set 'cmWaitTimeout' for local (e.g. /node-to-client/) connections. +-- +local_TIME_WAIT_TIMEOUT :: DiffTime +local_TIME_WAIT_TIMEOUT = 0 + + +socketAddressType :: Socket.SockAddr -> Maybe AddressType +socketAddressType Socket.SockAddrInet {} = Just IPv4Address +socketAddressType Socket.SockAddrInet6 {} = Just IPv6Address +socketAddressType addr = + error ("socketAddressType: unexpected address " ++ show addr) + + +-- | Combine two uni-directional 'MiniProtocolBundle's into one bi-directional +-- one. +-- +combineMiniProtocolBundles :: MiniProtocolBundle InitiatorMode + -> MiniProtocolBundle ResponderMode + -> MiniProtocolBundle InitiatorResponderMode +combineMiniProtocolBundles (MiniProtocolBundle initiators) + (MiniProtocolBundle responders) + = MiniProtocolBundle $ + [ MiniProtocolInfo + { miniProtocolNum + , miniProtocolLimits + , miniProtocolDir = InitiatorDirection + } + | MiniProtocolInfo { miniProtocolNum, miniProtocolLimits } <- initiators + ] + ++ [ MiniProtocolInfo + { miniProtocolNum + , miniProtocolLimits + , miniProtocolDir = ResponderDirection + } + | MiniProtocolInfo { miniProtocolNum, miniProtocolLimits } <- responders + ] + + +-- | P2P DiffusionApplications Extras +-- +-- TODO: we need initiator only mode for Deadalus, there's no reason why it +-- should run a node-to-node server side. +-- +data DApplicationsExtra ntnAddr m = + DApplicationsExtra { + -- | configuration of mini-protocol parameters; they impact size limits of + -- mux ingress queues. + -- + daMiniProtocolParameters :: MiniProtocolParameters + + -- | /node-to-node/ rethrow policy + -- + , daRethrowPolicy :: RethrowPolicy + + -- | /node-to-client/ rethrow policy + -- + , daLocalRethrowPolicy :: RethrowPolicy + + , daPeerMetrics :: PeerMetrics m ntnAddr + + -- | Used by churn-governor + -- + , daBlockFetchMode :: STM m FetchMode + } + +-- | Diffusion will always run initiator of node-to-node protocols, but in some +-- configurations, i.e. 'InitiatorOnlyDiffusionMode', it will not run the +-- responder side. This type allows to reflect this. +-- +-- This is only used internally by 'runDataDiffusion'; This type allows to +-- construct configuration upfront, before all services like connection manager +-- or server are initialised \/ started. +-- +-- This is an existential wrapper for the higher order type @f :: MuxMode -> +-- Type@, like @'ConnectionManagerDataInMode' (mode :: MuxMode)@ below. +-- +data HasMuxMode (f :: MuxMode -> Type) where + HasInitiator :: !(f InitiatorMode) + -> HasMuxMode f + + HasInitiatorResponder + :: !(f InitiatorResponderMode) + -> HasMuxMode f + +-- | Node-To-Node connection manager requires extra data when running in +-- 'InitiatorResponderMode'. +-- +data ConnectionManagerDataInMode (mode :: MuxMode) where + CMDInInitiatorMode + :: ConnectionManagerDataInMode InitiatorMode + + CMDInInitiatorResponderMode + :: Server.ControlChannel IO + (Server.NewConnection + SockAddr + (Handle InitiatorResponderMode SockAddr ByteString IO () ())) + -> StrictTVar IO Server.InboundGovernorObservableState + -> ConnectionManagerDataInMode InitiatorResponderMode + + +-- +-- Node-To-Client type aliases +-- +-- Node-To-Client diffusion is only used in 'ResponderMode'. +-- + +type NodeToClientHandle = + Handle ResponderMode LocalAddress ByteString IO Void () + +type NodeToClientHandleError = + HandleError ResponderMode NodeToClientVersion + +type NodeToClientConnectionHandler = + ConnectionHandler + ResponderMode + (ConnectionHandlerTrace NodeToClientVersion NodeToClientVersionData) + LocalSocket + LocalAddress + NodeToClientHandle + NodeToClientHandleError + (NodeToClientVersion, NodeToClientVersionData) + IO + +type NodeToClientConnectionManagerArguments = + ConnectionManagerArguments + (ConnectionHandlerTrace NodeToClientVersion NodeToClientVersionData) + LocalSocket + LocalAddress + NodeToClientHandle + NodeToClientHandleError + (NodeToClientVersion, NodeToClientVersionData) + IO + +type NodeToClientConnectionManager = + ConnectionManager + ResponderMode + LocalSocket + LocalAddress + NodeToClientHandle + NodeToClientHandleError + IO + +-- +-- Node-To-Node type aliases +-- +-- Node-To-Node diffusion runs in either 'InitiatorMode' or 'InitiatorResponderMode'. +-- + +type NodeToNodeHandle (mode :: MuxMode) a = + Handle mode SockAddr ByteString IO () a + +type NodeToNodeHandleError (mode :: MuxMode) = + HandleError mode NodeToNodeVersion + +type NodeToNodeConnectionHandler (mode :: MuxMode) a = + ConnectionHandler + mode + (ConnectionHandlerTrace NodeToNodeVersion NodeToNodeVersionData) + Socket + SockAddr + (NodeToNodeHandle mode a) + (NodeToNodeHandleError mode) + (NodeToNodeVersion, NodeToNodeVersionData) + IO + +type NodeToNodeConnectionManagerArguments (mode :: MuxMode) a = + ConnectionManagerArguments + (ConnectionHandlerTrace NodeToNodeVersion NodeToNodeVersionData) + Socket + SockAddr + (NodeToNodeHandle mode a) + (NodeToNodeHandleError mode) + (NodeToNodeVersion, NodeToNodeVersionData) + IO + +type NodeToNodeConnectionManager (mode :: MuxMode) a = + ConnectionManager + mode + Socket + SockAddr + (NodeToNodeHandle mode a) + (NodeToNodeHandleError mode) + IO + +-- +-- Governor type aliases +-- + +type NodeToNodePeerConnectionHandle (mode :: MuxMode) a = + PeerConnectionHandle + mode + SockAddr + ByteString + IO () a + +type NodeToNodePeerStateActions (mode :: MuxMode) a = + Governor.PeerStateActions + SockAddr + (NodeToNodePeerConnectionHandle mode a) + IO + +type NodeToNodePeerSelectionActions (mode :: MuxMode) a = + Governor.PeerSelectionActions + SockAddr + (NodeToNodePeerConnectionHandle mode a) + IO + +-- | Main entry point for data diffusion service. It allows to: +-- +-- * connect to upstream peers; +-- * accept connection from downstream peers, if run in +-- 'InitiatorAndResponderDiffusionMode'. +-- * runs a local service which allows to use node-to-client protocol to obtain +-- information from the running system. This is used by 'cardano-cli' or +-- a wallet and a like local services. +-- +runDataDiffusion + :: DiffusionTracers DTracersExtra + -> DiffusionArguments (DArgumentsExtra IO) + -> DiffusionApplications + (DApplicationsExtra RemoteAddress IO) + RemoteAddress + LocalAddress + NodeToNodeVersionData + NodeToClientVersionData + IO + -> IO Void +runDataDiffusion tracers + DiffusionArguments + { daIPv4Address + , daIPv6Address + , daLocalAddress + , daAcceptedConnectionsLimit + , daDiffusionMode + , daP2P = DArgumentsExtra + { daPeerSelectionTargets + , daReadLocalRootPeers + , daReadPublicRootPeers + , daReadUseLedgerAfter + , daProtocolIdleTimeout + , daTimeWaitTimeout + } + } + DiffusionApplications + { daApplicationInitiatorMode + , daApplicationInitiatorResponderMode + , daLocalResponderApplication + , daLedgerPeersCtx + , dapP2P = DApplicationsExtra + { daMiniProtocolParameters + , daRethrowPolicy + , daLocalRethrowPolicy + , daPeerMetrics + , daBlockFetchMode + } + } = + -- We run two services: for /node-to-node/ and /node-to-client/. The + -- naming convention is that we use /local/ prefix for /node-to-client/ + -- related terms, as this is a local only service running over a unix + -- socket / windows named pipe. + handle (\e -> traceWith tracer (DiffusionErrored e) + >> throwIO e) $ + withIOManager $ \iocp -> + withTimeoutSerial $ \timeout -> do + + -- Thread to which 'RethrowPolicy' will throw fatal exceptions. + mainThreadId <- myThreadId + + cmIPv4Address + <- traverse (either Socket.getSocketName (pure . Socket.addrAddress)) + daIPv4Address + case cmIPv4Address of + Just SockAddrInet {} -> pure () + Just SockAddrInet6 {} -> throwIO UnexpectedIPv6Address + Just SockAddrUnix {} -> throwIO UnexpectedUnixAddress + Nothing -> pure () + + cmIPv6Address + <- traverse (either Socket.getSocketName (pure . Socket.addrAddress)) + daIPv6Address + case cmIPv6Address of + Just SockAddrInet {} -> throwIO UnexpectedIPv4Address + Just SockAddrInet6 {} -> pure () + Just SockAddrUnix {} -> throwIO UnexpectedUnixAddress + Nothing -> pure () + + -- control channel for the server; only required in + -- @'InitiatorResponderMode' :: 'MuxMode'@ + cmdInMode + <- case daDiffusionMode of + InitiatorOnlyDiffusionMode -> + -- action which we pass to connection handler + pure (HasInitiator CMDInInitiatorMode) + InitiatorAndResponderDiffusionMode -> do + -- we pass 'Server.newOutboundConnection serverControlChannel' to + -- connection handler + HasInitiatorResponder <$> + (CMDInInitiatorResponderMode + <$> Server.newControlChannel + <*> Server.newObservableStateVarIO) + + localControlChannel <- Server.newControlChannel + localServerStateVar <- Server.newObservableStateVarIO + + -- RNGs used for picking random peers from the ledger and for + -- demoting/promoting peers. + rng <- newStdGen + let (ledgerPeersRng, rng') = split rng + (policyRng, rng'') = split rng' + (churnRng, fuzzRng) = split rng'' + policyRngVar <- newTVarIO policyRng + + churnModeVar <- newTVarIO ChurnModeNormal + + -- Request interface, supply the number of peers desired. + ledgerPeersReq <- newEmptyTMVarIO :: IO (StrictTMVar IO NumberOfPeers) + -- Response interface, returns a Set of peers. Nothing indicates that the + -- ledger hasn't caught up to `useLedgerAfter`. May return less than + -- the number of peers requested. + ledgerPeersRsp <- newEmptyTMVarIO + :: IO (StrictTMVar IO (Maybe (Set SockAddr, DiffTime))) + + + peerSelectionTargetsVar <- newTVarIO $ daPeerSelectionTargets { + -- Start with a smaller number of active peers, the churn governor will increase + -- it to the configured value after a delay. + targetNumberOfActivePeers = + min 2 (targetNumberOfActivePeers daPeerSelectionTargets) + } + + let -- snocket for remote communication. + snocket :: SocketSnocket + snocket = Snocket.socketSnocket iocp + + localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0 + + -- + -- local connection manager + -- + localThread :: Maybe (IO Void) + localThread = + case daLocalAddress of + Nothing -> Nothing + Just localAddr -> + Just $ withLocalSocket iocp tracer localAddr + $ \localSnocket localSocket -> do + let localConnectionHandler :: NodeToClientConnectionHandler + localConnectionHandler = + makeConnectionHandler + dtLocalMuxTracer + SingResponderMode + localMiniProtocolBundle + HandshakeArguments { + haHandshakeTracer = dtLocalHandshakeTracer, + haHandshakeCodec = + NodeToClient.nodeToClientHandshakeCodec, + haVersionDataCodec = + cborTermVersionDataCodec + NodeToClient.nodeToClientCodecCBORTerm, + haVersions = + (\(OuroborosApplication apps) + -> Bundle + (WithHot apps) + (WithWarm (\_ _ -> [])) + (WithEstablished (\_ _ -> []))) + <$> daLocalResponderApplication, + haAcceptVersion = acceptableVersion, + haTimeLimits = noTimeLimitsHandshake + } + (mainThreadId, rethrowPolicy <> daLocalRethrowPolicy) + + localConnectionManagerArguments + :: NodeToClientConnectionManagerArguments + localConnectionManagerArguments = + ConnectionManagerArguments { + cmTracer = dtLocalConnectionManagerTracer, + cmTrTracer = nullTracer, -- TODO + cmMuxTracer = dtLocalMuxTracer, + cmIPv4Address = Nothing, + cmIPv6Address = Nothing, + cmAddressType = const Nothing, + cmSnocket = localSnocket, + connectionDataFlow = uncurry localDataFlow, + cmPrunePolicy = Server.randomPrunePolicy + localServerStateVar, + cmConnectionsLimits = localConnectionLimits, + cmTimeWaitTimeout = local_TIME_WAIT_TIMEOUT + } + + withConnectionManager + localConnectionManagerArguments + localConnectionHandler + classifyHandleError + (InResponderMode localControlChannel) + $ \(localConnectionManager :: NodeToClientConnectionManager) + -> do + + -- + -- run local server + -- + + traceWith tracer . RunLocalServer + =<< Snocket.getLocalAddr localSnocket localSocket + + Async.withAsync + (Server.run + ServerArguments { + serverSockets = localSocket :| [], + serverSnocket = localSnocket, + serverTracer = dtLocalServerTracer, + serverInboundGovernorTracer = dtLocalInboundGovernorTracer, + serverControlChannel = localControlChannel, + serverConnectionLimits = localConnectionLimits, + serverConnectionManager = localConnectionManager, + serverObservableStateVar = localServerStateVar, + serverProtocolIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT + }) Async.wait + + -- + -- remote connection manager + -- + + remoteThread :: IO Void + remoteThread = + Async.withAsync + (runLedgerPeers + ledgerPeersRng + dtLedgerPeersTracer + daReadUseLedgerAfter + daLedgerPeersCtx + (resolveDomainAddresses + dtTracePublicRootPeersTracer + timeout + DNS.defaultResolvConf + ioDNSActions + ) + (takeTMVar ledgerPeersReq) + (putTMVar ledgerPeersRsp) + ) + $ \ledgerPeerThread -> + case cmdInMode of + -- InitiatorOnlyMode + -- + -- Run peer selection only + HasInitiator CMDInInitiatorMode -> do + let connectionManagerArguments + :: NodeToNodeConnectionManagerArguments InitiatorMode Void + connectionManagerArguments = + ConnectionManagerArguments { + cmTracer = dtConnectionManagerTracer, + cmTrTracer = nullTracer, -- TODO + cmMuxTracer = dtMuxTracer, + cmIPv4Address, + cmIPv6Address, + cmAddressType = socketAddressType, + cmSnocket = snocket, + connectionDataFlow = uncurry nodeDataFlow, + cmPrunePolicy = + case cmdInMode of + HasInitiator CMDInInitiatorMode -> + -- Server is not running, it will not be able to + -- advise which connections to prune. It's also not + -- expected that the governor targets will be larger + -- than limits imposed by 'cmConnectionsLimits'. + simplePrunePolicy, + cmConnectionsLimits = daAcceptedConnectionsLimit, + cmTimeWaitTimeout = daTimeWaitTimeout + } + + connectionHandler + :: NodeToNodeConnectionHandler InitiatorMode Void + connectionHandler = + makeConnectionHandler + dtMuxTracer + SingInitiatorMode + miniProtocolBundleInitiatorMode + HandshakeArguments { + haHandshakeTracer = dtHandshakeTracer, + haHandshakeCodec = nodeToNodeHandshakeCodec, + haVersionDataCodec = + cborTermVersionDataCodec + NodeToNode.nodeToNodeCodecCBORTerm, + haVersions = daApplicationInitiatorMode, + haAcceptVersion = acceptableVersion, + haTimeLimits = timeLimitsHandshake + } + (mainThreadId, rethrowPolicy <> daRethrowPolicy) + + withConnectionManager + connectionManagerArguments + connectionHandler + classifyHandleError + NotInResponderMode + $ \(connectionManager + :: NodeToNodeConnectionManager InitiatorMode Void) -> do +#ifdef POSIX + _ <- Signals.installHandler + Signals.sigUSR1 + (Signals.Catch + (do state <- ConnectionManager.readState connectionManager + traceWith dtConnectionManagerTracer + (TrState state) + ) + ) + Nothing +#endif + + -- + -- peer state actions + -- + -- Peer state actions run a job pool in the background which + -- tracks threads forked by 'PeerStateActions' + -- + + withPeerStateActions + timeout + PeerStateActionsArguments { + spsTracer = dtPeerSelectionActionsTracer, + spsDeactivateTimeout = Diffusion.Policies.deactivateTimeout, + spsCloseConnectionTimeout = + Diffusion.Policies.closeConnectionTimeout, + spsConnectionManager = connectionManager + } + $ \(peerStateActions + :: NodeToNodePeerStateActions InitiatorMode Void) -> + -- + -- Run peer selection (p2p governor) + -- + + withPeerSelectionActions + dtTraceLocalRootPeersTracer + dtTracePublicRootPeersTracer + timeout + (readTVar peerSelectionTargetsVar) + daReadLocalRootPeers + daReadPublicRootPeers + peerStateActions + (putTMVar ledgerPeersReq) + (takeTMVar ledgerPeersRsp) + $ \mbLocalPeerRootProviderThread + (peerSelectionActions + :: NodeToNodePeerSelectionActions + InitiatorMode Void) -> + + Async.withAsync + (Governor.peerSelectionGovernor + dtTracePeerSelectionTracer + dtDebugPeerSelectionInitiatorTracer + dtTracePeerSelectionCounters + fuzzRng + peerSelectionActions + (Diffusion.Policies.simplePeerSelectionPolicy + policyRngVar (readTVar churnModeVar) daPeerMetrics)) + $ \governorThread -> + Async.withAsync + (Governor.peerChurnGovernor + dtTracePeerSelectionTracer + daPeerMetrics + churnModeVar + churnRng + daBlockFetchMode + daPeerSelectionTargets + peerSelectionTargetsVar) + $ \churnGovernorThread -> + + -- wait for any thread to fail + snd <$> Async.waitAny + (maybeToList mbLocalPeerRootProviderThread + ++ [ governorThread + , ledgerPeerThread + , churnGovernorThread + ]) + + + -- InitiatorResponderMode + -- + -- Run peer selection and the server. + -- + HasInitiatorResponder + (CMDInInitiatorResponderMode controlChannel observableStateVar) -> do + let connectionManagerArguments + :: NodeToNodeConnectionManagerArguments + InitiatorResponderMode + () + connectionManagerArguments = + ConnectionManagerArguments { + cmTracer = dtConnectionManagerTracer, + cmTrTracer = nullTracer, -- TODO + cmMuxTracer = dtMuxTracer, + cmIPv4Address, + cmIPv6Address, + cmAddressType = socketAddressType, + cmSnocket = snocket, + connectionDataFlow = uncurry nodeDataFlow, + cmPrunePolicy = + case cmdInMode of + HasInitiatorResponder (CMDInInitiatorResponderMode _ serverStateVar) -> + Server.randomPrunePolicy serverStateVar, + cmConnectionsLimits = daAcceptedConnectionsLimit, + cmTimeWaitTimeout = daTimeWaitTimeout + } + + connectionHandler + :: NodeToNodeConnectionHandler + InitiatorResponderMode + () + connectionHandler = + makeConnectionHandler + dtMuxTracer + SingInitiatorResponderMode + miniProtocolBundleInitiatorResponderMode + HandshakeArguments { + haHandshakeTracer = dtHandshakeTracer, + haHandshakeCodec = nodeToNodeHandshakeCodec, + haVersionDataCodec = + cborTermVersionDataCodec + NodeToNode.nodeToNodeCodecCBORTerm, + haVersions = daApplicationInitiatorResponderMode, + haAcceptVersion = acceptableVersion, + haTimeLimits = timeLimitsHandshake + } + (mainThreadId, rethrowPolicy <> daRethrowPolicy) + + withConnectionManager + connectionManagerArguments + connectionHandler + classifyHandleError + (InResponderMode controlChannel) + $ \(connectionManager + :: NodeToNodeConnectionManager InitiatorResponderMode ()) -> do +#ifdef POSIX + _ <- Signals.installHandler + Signals.sigUSR1 + (Signals.Catch + (do state <- ConnectionManager.readState connectionManager + traceWith dtConnectionManagerTracer + (TrState state) + ) + ) + Nothing +#endif + -- + -- peer state actions + -- + -- Peer state actions run a job pool in the background which + -- tracks threads forked by 'PeerStateActions' + -- + + withPeerStateActions + timeout + PeerStateActionsArguments { + spsTracer = dtPeerSelectionActionsTracer, + spsDeactivateTimeout = Diffusion.Policies.deactivateTimeout, + spsCloseConnectionTimeout = + Diffusion.Policies.closeConnectionTimeout, + spsConnectionManager = connectionManager + } + $ \(peerStateActions + :: NodeToNodePeerStateActions + InitiatorResponderMode ()) -> + + -- + -- Run peer selection (p2p governor) + -- + + withPeerSelectionActions + dtTraceLocalRootPeersTracer + dtTracePublicRootPeersTracer + timeout + (readTVar peerSelectionTargetsVar) + daReadLocalRootPeers + daReadPublicRootPeers + peerStateActions + (putTMVar ledgerPeersReq) + (takeTMVar ledgerPeersRsp) + $ \mbLocalPeerRootProviderThread + (peerSelectionActions + :: NodeToNodePeerSelectionActions + InitiatorResponderMode ()) -> + + Async.withAsync + (Governor.peerSelectionGovernor + dtTracePeerSelectionTracer + dtDebugPeerSelectionInitiatorResponderTracer + dtTracePeerSelectionCounters + fuzzRng + peerSelectionActions + (Diffusion.Policies.simplePeerSelectionPolicy + policyRngVar (readTVar churnModeVar) daPeerMetrics)) + $ \governorThread -> do + let mkAddr :: AddrInfo -> (Socket.Family, SockAddr) + mkAddr addr = ( Socket.addrFamily addr + , Socket.addrAddress addr + ) + + withSockets tracer snocket + (catMaybes + [ fmap (fmap mkAddr) daIPv4Address + , fmap (fmap mkAddr) daIPv6Address + ]) + $ \sockets addresses -> do + -- + -- Run server + -- + traceWith tracer (RunServer addresses) + Async.withAsync + (Server.run + ServerArguments { + serverSockets = sockets, + serverSnocket = snocket, + serverTracer = dtServerTracer, + serverInboundGovernorTracer = dtInboundGovernorTracer, + serverControlChannel = controlChannel, + serverConnectionLimits = daAcceptedConnectionsLimit, + serverConnectionManager = connectionManager, + serverProtocolIdleTimeout = daProtocolIdleTimeout, + serverObservableStateVar = observableStateVar + }) + $ \serverThread -> + Async.withAsync + (Governor.peerChurnGovernor + dtTracePeerSelectionTracer + daPeerMetrics + churnModeVar + churnRng + daBlockFetchMode + daPeerSelectionTargets + peerSelectionTargetsVar) + $ \churnGovernorThread -> + + -- wait for any thread to fail + snd <$> Async.waitAny + (maybeToList mbLocalPeerRootProviderThread + ++ [ serverThread + , governorThread + , ledgerPeerThread + , churnGovernorThread + ]) + + Async.runConcurrently + $ asum + $ Async.Concurrently <$> + ( remoteThread + : maybeToList localThread + ) + + where + DiffusionTracers { + dtMuxTracer + , dtLocalMuxTracer + , dtHandshakeTracer + , dtLocalHandshakeTracer + , dtLedgerPeersTracer + -- the tracer + , dtDiffusionInitializationTracer = tracer + , dtP2P = DTracersExtra { + dtTracePeerSelectionTracer + , dtDebugPeerSelectionInitiatorTracer + , dtDebugPeerSelectionInitiatorResponderTracer + , dtTracePeerSelectionCounters + , dtPeerSelectionActionsTracer + , dtTraceLocalRootPeersTracer + , dtTracePublicRootPeersTracer + , dtConnectionManagerTracer + , dtServerTracer + , dtInboundGovernorTracer + , dtLocalConnectionManagerTracer + , dtLocalServerTracer + , dtLocalInboundGovernorTracer + } + } = tracers + + + miniProtocolBundleInitiatorResponderMode + :: MiniProtocolBundle InitiatorResponderMode + miniProtocolBundleInitiatorResponderMode = + combineMiniProtocolBundles miniProtocolBundleInitiatorMode + miniProtocolBundleResponderMode + + -- node-to-node responder bundle; it is only used in combination with + -- the node-to-node initiator bundle defined below. + -- + miniProtocolBundleResponderMode :: MiniProtocolBundle ResponderMode + miniProtocolBundleResponderMode = MiniProtocolBundle + [ MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 2, + miniProtocolDir = ResponderDirectionOnly, + miniProtocolLimits = chainSyncProtocolLimits daMiniProtocolParameters + } + , MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 3, + miniProtocolDir = ResponderDirectionOnly, + miniProtocolLimits = blockFetchProtocolLimits daMiniProtocolParameters + } + , MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 4, + miniProtocolDir = ResponderDirectionOnly, + miniProtocolLimits = txSubmissionProtocolLimits daMiniProtocolParameters + } + , MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 8, + miniProtocolDir = ResponderDirectionOnly, + miniProtocolLimits = keepAliveProtocolLimits daMiniProtocolParameters + } + -- TODO: `tip-sample` protocol + ] + + -- node-to-node initiator bundle + miniProtocolBundleInitiatorMode :: MiniProtocolBundle InitiatorMode + miniProtocolBundleInitiatorMode = MiniProtocolBundle + [ MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 2, + miniProtocolDir = InitiatorDirectionOnly, + miniProtocolLimits = chainSyncProtocolLimits daMiniProtocolParameters + } + , MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 3, + miniProtocolDir = InitiatorDirectionOnly, + miniProtocolLimits = blockFetchProtocolLimits daMiniProtocolParameters + } + , MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 4, + miniProtocolDir = InitiatorDirectionOnly, + miniProtocolLimits = txSubmissionProtocolLimits daMiniProtocolParameters + } + , MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 8, + miniProtocolDir = InitiatorDirectionOnly, + miniProtocolLimits = keepAliveProtocolLimits daMiniProtocolParameters + } + -- TODO: `tip-sample` protocol + ] + + -- node-to-client protocol bundle + localMiniProtocolBundle :: MiniProtocolBundle ResponderMode + localMiniProtocolBundle = MiniProtocolBundle + [ MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 5, + miniProtocolDir = ResponderDirectionOnly, + miniProtocolLimits = maximumMiniProtocolLimits + } + , MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 6, + miniProtocolDir = ResponderDirectionOnly, + miniProtocolLimits = maximumMiniProtocolLimits + } + , MiniProtocolInfo { + miniProtocolNum = MiniProtocolNum 7, + miniProtocolDir = ResponderDirectionOnly, + miniProtocolLimits = maximumMiniProtocolLimits + } + ] + where + maximumMiniProtocolLimits :: MiniProtocolLimits + maximumMiniProtocolLimits = + MiniProtocolLimits { + maximumIngressQueue = 0xffffffff + } + + -- Only the 'IOManagerError's are fatal, all the other exceptions in the + -- networking code will only shutdown the bearer (see 'ShutdownPeer' why + -- this is so). + rethrowPolicy = + RethrowPolicy $ \_ctx err -> + case fromException err of + Just (_ :: IOManagerError) -> ShutdownNode + Nothing -> mempty + +-- +-- Data flow +-- + +-- | For Node-To-Node protocol, any connection which negotiated at least +-- 'NodeToNodeV_7' version and did not declared 'InitiatorOnlyDiffusionMode' +-- will run in 'Duplex' mode. All connections from lower versions or one that +-- declared themselves as 'InitiatorOnly' will run in 'UnidirectionalMode' +-- +nodeDataFlow :: NodeToNodeVersion + -> NodeToNodeVersionData + -> DataFlow +nodeDataFlow v NodeToNodeVersionData { diffusionMode = InitiatorAndResponderDiffusionMode } + | v >= NodeToNodeV_7 + = Duplex +nodeDataFlow _ _ = Unidirectional + + +-- | For Node-To-Client protocol all connection are considered 'Unidrectional'. +-- +localDataFlow :: NodeToClientVersion + -> NodeToClientVersionData + -> DataFlow +localDataFlow _ _ = Unidirectional + + +-- +-- Socket utility functions +-- + +withSockets :: Tracer IO DiffusionInitializationTracer + -> SocketSnocket + -> [Either Socket.Socket (Socket.Family, SockAddr)] + -> (NonEmpty Socket.Socket -> NonEmpty Socket.SockAddr -> IO a) + -> IO a +withSockets tracer sn addresses k = go [] addresses + where + go !acc (a : as) = withSocket a (\sa -> go (sa : acc) as) + go [] [] = throw NoSocket + go !acc [] = + let acc' = NonEmpty.fromList (reverse acc) + in (k $! (fst <$> acc')) $! (snd <$> acc') + + withSocket :: Either Socket.Socket (Socket.Family, SockAddr) + -> ((Socket.Socket, Socket.SockAddr) -> IO a) + -> IO a + withSocket (Left sock) f = + bracket + (pure sock) + (Snocket.close sn) + $ \_sock -> do + !addr <- Socket.getSocketName sock + f (sock, addr) + withSocket (Right (fam, !addr)) f = + bracket + (do traceWith tracer (CreatingServerSocket addr) + Snocket.open sn (Snocket.SocketFamily fam)) + (Snocket.close sn) + $ \sock -> do + traceWith tracer $ ConfiguringServerSocket addr + Snocket.bind sn sock addr + traceWith tracer $ ListeningServerSocket addr + Snocket.listen sn sock + traceWith tracer $ ServerSocketUp addr + f (sock, addr) + + +withLocalSocket :: IOManager + -> Tracer IO DiffusionInitializationTracer + -> Either Socket.Socket FilePath + -> (LocalSnocket -> LocalSocket -> IO a) + -> IO a +withLocalSocket iocp tracer localAddress k = + bracket + ( + case localAddress of +#if defined(mingw32_HOST_OS) + -- Windows uses named pipes so can't take advantage of existing sockets + Left _ -> traceWith tracer UnsupportedReadySocketCase + >> throwIO UnsupportedReadySocket +#else + Left sd -> do + addr <- Socket.getSocketName sd + case addr of + (Socket.SockAddrUnix path) -> do + traceWith tracer (UsingSystemdSocket path) + return (Left ( Snocket.localSnocket iocp path + , LocalSocket sd + )) + _ -> do + traceWith tracer $ UnsupportedLocalSystemdSocket addr + throwIO UnsupportedLocalSocketType +#endif + Right addr -> do + let sn :: LocalSnocket + sn = Snocket.localSnocket iocp addr + traceWith tracer $ CreateSystemdSocketForSnocketPath addr + sd <- Snocket.open sn Snocket.LocalFamily + traceWith tracer $ CreatedLocalSocket addr + return (Right (sn, sd, addr)) + ) + -- We close the socket here, even if it was provided to us. + (\case + Left (sn, sd) -> Snocket.close sn sd + Right (sn, sd, _) -> Snocket.close sn sd) + $ \case + -- unconfigured socket + Right (sn, sd, addr) -> do + traceWith tracer . ConfiguringLocalSocket addr + =<< localSocketFileDescriptor sd + Snocket.bind sn sd (NodeToClient.LocalAddress addr) + traceWith tracer . ListeningLocalSocket addr + =<< localSocketFileDescriptor sd + Snocket.listen sn sd + traceWith tracer . LocalSocketUp addr + =<< localSocketFileDescriptor sd + k sn sd + + -- pre-configured systemd socket + Left (sn, sd) -> k sn sd