From f9ed189eacc7c35fba6b636accdcb0de8756602c Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Fri, 16 Jul 2021 13:51:32 +0200 Subject: [PATCH] diffusion: run diffusion in an abstract monad Also introduce `DiffusionInterfaces` record which provides all the interfaces (e.g. snockets, dns resolver, etc) needed to run data diffusion. --- .../src/Ouroboros/Network/Diffusion.hs | 6 +- .../src/Ouroboros/Network/Diffusion/P2P.hs | 571 +++++++++++------- .../Ouroboros/Network/PeerSelection/Simple.hs | 62 +- 3 files changed, 375 insertions(+), 264 deletions(-) diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs index ba3aeb96cfa..b1e1334264e 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs @@ -121,7 +121,7 @@ newtype DiffusionTracers = (P2P.DiffusionTracersExtra RemoteAddress NodeToNodeVersion NodeToNodeVersionData LocalAddress NodeToClientVersion NodeToClientVersionData - IO)) + IOException IO)) RemoteAddress NodeToNodeVersion LocalAddress NodeToClientVersion IO) @@ -334,12 +334,12 @@ mkDiffusionTracersP2P -> Tracer IO (DebugPeerSelection - SockAddr (P2P.NodeToNodePeerConnectionHandle 'InitiatorMode SockAddr Void)) + SockAddr (P2P.NodeToNodePeerConnectionHandle 'InitiatorMode SockAddr IO Void)) -> Tracer IO (DebugPeerSelection SockAddr - (P2P.NodeToNodePeerConnectionHandle 'InitiatorResponderMode SockAddr ())) + (P2P.NodeToNodePeerConnectionHandle 'InitiatorResponderMode SockAddr IO ())) -> Tracer IO PeerSelectionCounters -> Tracer IO (PeerSelectionActionsTrace SockAddr) -> Tracer diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index 41a980ffe03..4ef28ea0f0d 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -1,13 +1,14 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE CPP #-} -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeApplications #-} #if !defined(mingw32_HOST_OS) #define POSIX @@ -20,19 +21,26 @@ module Ouroboros.Network.Diffusion.P2P , AcceptedConnectionsLimit (..) , DiffusionApplicationsExtra (..) , runDataDiffusion + , DiffusionInterfaces (..) + , runDataDiffusionM , NodeToNodePeerConnectionHandle ) where + +import Control.Applicative (Alternative) import qualified Control.Monad.Class.MonadAsync as Async +import Control.Monad.Class.MonadAsync (MonadAsync) import Control.Monad.Class.MonadFork import Control.Monad.Class.MonadSTM.Strict import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime +import Control.Monad.Class.MonadTimer import Control.Exception (IOException) import Control.Tracer (Tracer, nullTracer, traceWith) import Data.Foldable (asum) +import Data.Functor (void) import Data.IP (IP) import qualified Data.IP as IP import Data.List.NonEmpty (NonEmpty (..)) @@ -44,7 +52,7 @@ import Data.Typeable (Typeable) import Data.Void (Void) import Data.ByteString.Lazy (ByteString) import Data.Kind (Type) -import System.Random (newStdGen, split) +import System.Random (StdGen, newStdGen, split) #ifdef POSIX import qualified System.Posix.Signals as Signals #endif @@ -86,14 +94,13 @@ import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..)) import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..)) import Ouroboros.Network.PeerSelection.RootPeersDNS ( resolveDomainAddresses + , ioDNSActions , DomainAddress + , DNSActions , RelayAddress(..) , TraceLocalRootPeers(..) , TracePublicRootPeers(..) ) -import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions - ( ioDNSActions - ) import qualified Ouroboros.Network.PeerSelection.Governor as Governor import Ouroboros.Network.PeerSelection.Governor.Types ( TracePeerSelection (..) @@ -149,10 +156,11 @@ import Ouroboros.Network.Diffusion.Common -- | P2P DiffusionTracers Extras -- data DiffusionTracersExtra ntnAddr ntnVersion ntnVersionData - ntcAddr ntcVersion ntcVersionData m = + ntcAddr ntcVersion ntcVersionData + resolverError m = DiffusionTracersExtra { dtTraceLocalRootPeersTracer - :: Tracer m (TraceLocalRootPeers ntnAddr IOException) + :: Tracer m (TraceLocalRootPeers ntnAddr resolverError) , dtTracePublicRootPeersTracer :: Tracer m TracePublicRootPeers @@ -221,7 +229,7 @@ data DiffusionTracersExtra ntnAddr ntnVersion ntnVersionData nullTracers :: Applicative m => DiffusionTracersExtra ntnAddr ntnVersion ntnVersionData ntcAddr ntcVersion ntcVersionData - m + resolverError m nullTracers = DiffusionTracersExtra { dtTraceLocalRootPeersTracer = nullTracer @@ -368,17 +376,17 @@ data HasMuxMode (f :: MuxMode -> Type) where -- | Node-To-Node connection manager requires extra data when running in -- 'InitiatorResponderMode'. -- -data ConnectionManagerDataInMode peerAddr (mode :: MuxMode) where +data ConnectionManagerDataInMode peerAddr m (mode :: MuxMode) where CMDInInitiatorMode - :: ConnectionManagerDataInMode peerAddr InitiatorMode + :: ConnectionManagerDataInMode peerAddr m InitiatorMode CMDInInitiatorResponderMode - :: Server.ControlChannel IO + :: Server.ControlChannel m (Server.NewConnection peerAddr - (Handle InitiatorResponderMode peerAddr ByteString IO () ())) - -> StrictTVar IO Server.InboundGovernorObservableState - -> ConnectionManagerDataInMode peerAddr InitiatorResponderMode + (Handle InitiatorResponderMode peerAddr ByteString m () ())) + -> StrictTVar m Server.InboundGovernorObservableState + -> ConnectionManagerDataInMode peerAddr m InitiatorResponderMode -- @@ -387,44 +395,44 @@ data ConnectionManagerDataInMode peerAddr (mode :: MuxMode) where -- Node-To-Client diffusion is only used in 'ResponderMode'. -- -type NodeToClientHandle ntcAddr = - Handle ResponderMode ntcAddr ByteString IO Void () +type NodeToClientHandle ntcAddr m = + Handle ResponderMode ntcAddr ByteString m Void () type NodeToClientHandleError ntcVersion = HandleError ResponderMode ntcVersion type NodeToClientConnectionHandler - ntcFd ntcAddr ntcVersion ntcVersionData = + ntcFd ntcAddr ntcVersion ntcVersionData m = ConnectionHandler ResponderMode (ConnectionHandlerTrace ntcVersion ntcVersionData) ntcFd ntcAddr - (NodeToClientHandle ntcAddr) + (NodeToClientHandle ntcAddr m) (NodeToClientHandleError ntcVersion) (ntcVersion, ntcVersionData) - IO + m type NodeToClientConnectionManagerArguments - ntcFd ntcAddr ntcVersion ntcVersionData = + ntcFd ntcAddr ntcVersion ntcVersionData m = ConnectionManagerArguments (ConnectionHandlerTrace ntcVersion ntcVersionData) ntcFd ntcAddr - (NodeToClientHandle ntcAddr) + (NodeToClientHandle ntcAddr m) (NodeToClientHandleError ntcVersion) (ntcVersion, ntcVersionData) - IO + m type NodeToClientConnectionManager - ntcFd ntcAddr ntcVersion ntcVersionData = + ntcFd ntcAddr ntcVersion ntcVersionData m = ConnectionManager ResponderMode ntcFd ntcAddr - (NodeToClientHandle ntcAddr) + (NodeToClientHandle ntcAddr m) (NodeToClientHandleError ntcVersion) - IO + m -- -- Node-To-Node type aliases @@ -434,201 +442,202 @@ type NodeToClientConnectionManager type NodeToNodeHandle (mode :: MuxMode) - ntnAddr - a = - Handle mode ntnAddr ByteString IO () a + ntnAddr m a = + Handle mode ntnAddr ByteString m () a type NodeToNodeConnectionHandler (mode :: MuxMode) - ntnFd ntnAddr ntnVersion ntnVersionData - a = + ntnFd ntnAddr ntnVersion ntnVersionData m a = ConnectionHandler mode (ConnectionHandlerTrace ntnVersion ntnVersionData) ntnFd ntnAddr - (NodeToNodeHandle mode ntnAddr a) + (NodeToNodeHandle mode ntnAddr m a) (HandleError mode ntnVersion) (ntnVersion, ntnVersionData) - IO + m type NodeToNodeConnectionManagerArguments (mode :: MuxMode) - ntnFd ntnAddr ntnVersion ntnVersionData - a = + ntnFd ntnAddr ntnVersion ntnVersionData m a = ConnectionManagerArguments (ConnectionHandlerTrace ntnVersion ntnVersionData) ntnFd ntnAddr - (NodeToNodeHandle mode ntnAddr a) + (NodeToNodeHandle mode ntnAddr m a) (HandleError mode ntnVersion) (ntnVersion, ntnVersionData) - IO + m type NodeToNodeConnectionManager (mode :: MuxMode) - ntnFd ntnAddr ntnVersion - a = + ntnFd ntnAddr ntnVersion m a = ConnectionManager mode ntnFd ntnAddr - (NodeToNodeHandle mode ntnAddr a) + (NodeToNodeHandle mode ntnAddr m a) (HandleError mode ntnVersion) - IO + m -- -- Governor type aliases -- -type NodeToNodePeerConnectionHandle (mode :: MuxMode) ntnAddr a = +type NodeToNodePeerConnectionHandle (mode :: MuxMode) ntnAddr m a = PeerConnectionHandle mode ntnAddr ByteString - IO () a + m () a -type NodeToNodePeerStateActions (mode :: MuxMode) ntnAddr a = +type NodeToNodePeerStateActions (mode :: MuxMode) ntnAddr m a = Governor.PeerStateActions ntnAddr - (NodeToNodePeerConnectionHandle mode ntnAddr a) - IO + (NodeToNodePeerConnectionHandle mode ntnAddr m a) + m -type NodeToNodePeerSelectionActions (mode :: MuxMode) ntnAddr a = +type NodeToNodePeerSelectionActions (mode :: MuxMode) ntnAddr m a = Governor.PeerSelectionActions ntnAddr - (NodeToNodePeerConnectionHandle mode ntnAddr a) - IO + (NodeToNodePeerConnectionHandle mode ntnAddr m a) + m + +data DiffusionInterfaces ntnFd ntnAddr ntnVersion ntnVersionData + ntcFd ntcAddr ntcVersion ntcVersionData + resolver resolverError + m = + DiffusionInterfaces { + -- | node-to-node snocket + -- + diNtnSnocket + :: Snocket m ntnFd ntnAddr, --- | Main entry point for data diffusion service. It allows to: --- --- * connect to upstream peers; --- * accept connection from downstream peers, if run in --- 'InitiatorAndResponderDiffusionMode'. --- * runs a local service which allows to use node-to-client protocol to obtain --- information from the running system. This is used by 'cardano-cli' or --- a wallet and a like local services. --- -runDataDiffusion - :: DiffusionTracers (DiffusionTracersExtra - RemoteAddress NodeToNodeVersion NodeToNodeVersionData - LocalAddress NodeToClientVersion NodeToClientVersionData - IO) - RemoteAddress NodeToNodeVersion - LocalAddress NodeToClientVersion - IO - -> DiffusionArguments (DiffusionArgumentsExtra IO) - Socket RemoteAddress - LocalSocket LocalAddress - -> DiffusionApplications - (DiffusionApplicationsExtra RemoteAddress IO) - RemoteAddress NodeToNodeVersion NodeToNodeVersionData - LocalAddress NodeToClientVersion NodeToClientVersionData - IO - -> IO Void -runDataDiffusion tracers args apps = do - -- We run two services: for /node-to-node/ and /node-to-client/. The - -- naming convention is that we use /local/ prefix for /node-to-client/ - -- related terms, as this is a local only service running over a unix - -- socket / windows named pipe. - handle (\e -> traceWith (dtDiffusionInitializationTracer tracers) - (DiffusionErrored e) - >> throwIO e) - $ withIOManager $ \iocp -> do - let ntnSnocket :: SocketSnocket - ntnSnocket = Snocket.socketSnocket iocp + -- | node-to-node handshake configuration + -- + diNtnHandshakeArguments + :: HandshakeArguments (ConnectionId ntnAddr) ntnVersion ntnVersionData m, - ntcSnocket :: LocalSnocket - ntcSnocket = Snocket.localSnocket iocp + -- | node-to-node address type + -- + diNtnAddressType + :: ntnAddr -> Maybe AddressType, - ntnHandshakeArgs = - HandshakeArguments { - haHandshakeTracer = dtHandshakeTracer tracers, - haHandshakeCodec = NodeToNode.nodeToNodeHandshakeCodec, - haVersionDataCodec = - cborTermVersionDataCodec - NodeToNode.nodeToNodeCodecCBORTerm, - haAcceptVersion = acceptableVersion, - haTimeLimits = timeLimitsHandshake - } - ntcHandshakeArgs = - HandshakeArguments { - haHandshakeTracer = dtLocalHandshakeTracer tracers, - haHandshakeCodec = NodeToClient.nodeToClientHandshakeCodec, - haVersionDataCodec = - cborTermVersionDataCodec - NodeToClient.nodeToClientCodecCBORTerm, - haAcceptVersion = acceptableVersion, - haTimeLimits = noTimeLimitsHandshake - } - withTimeoutSerial $ \timeoutFn -> - let domainResolver :: [DomainAddress] - -> IO (Map DomainAddress (Set Socket.SockAddr)) - domainResolver = - resolveDomainAddresses - (dtTracePublicRootPeersTracer . dtExtra $ tracers) - timeoutFn - DNS.defaultResolvConf - ioDNSActions - in runDataDiffusionM - ntnSnocket - ntnHandshakeArgs - socketAddressType - nodeDataFlow - (curry IP.toSockAddr) - domainResolver - ntcSnocket - ntcHandshakeArgs - localSocketFileDescriptor - tracers args apps + -- | node-to-node data flow used by connection manager to classify + -- negotiated connections + -- + diNtnDataFlow + :: ntnVersion -> ntnVersionData -> DataFlow, + + -- | node-to-node peer address + -- + diNtnToPeerAddr + :: IP -> Socket.PortNumber -> ntnAddr, + + -- | node-to-node domain resolver + -- + diNtnDomainResolver + :: [DomainAddress] -> m (Map DomainAddress (Set ntnAddr)), + + -- | node-to-client snocket + -- + diNtcSnocket + :: Snocket m ntcFd ntcAddr, + -- | node-to-client handshake configuration + -- + diNtcHandshakeArguments + :: HandshakeArguments (ConnectionId ntcAddr) ntcVersion ntcVersionData m, + + -- | node-to-client file descriptor + -- + diNtcGetFileDescriptor + :: ntcFd -> m FileDescriptor, + + -- | diffusion pseudo random generator. It is split between various + -- components that need randomness, e.g. inbound governor, peer + -- selection, policies, etc. + -- + diRng + :: StdGen, + + -- | callback which is used to register @SIGUSR1@ signal handler. + diInstallSigUSR1Handler + :: forall mode x. + NodeToNodeConnectionManager mode ntnFd ntnAddr ntnVersion m x + -> m (), + + -- | diffusion dns actions + -- + diDnsActions + :: DNSActions resolver resolverError m + } runDataDiffusionM :: forall m ntnFd ntnAddr ntnVersion ntnVersionData - ntcFd ntcAddr ntcVersion ntcVersionData. - ( Monad m - , Typeable ntnAddr - , Ord ntnAddr - , Show ntnAddr - , Typeable ntnVersion - , Ord ntnVersion - , Show ntnVersion - , Typeable ntcAddr - , Ord ntcAddr - , Show ntcAddr - , Ord ntcVersion - , m ~ IO + ntcFd ntcAddr ntcVersion ntcVersionData + resolver resolverError. + ( Alternative m + , MonadAsync m + , MonadEvaluate m + , MonadFork m + , MonadLabelledSTM m + , MonadMask m + , MonadThrow (STM m) + , MonadTime m + , MonadTimer m + , Typeable ntnAddr + , Ord ntnAddr + , Show ntnAddr + , Typeable ntnVersion + , Ord ntnVersion + , Show ntnVersion + , Typeable ntcAddr + , Ord ntcAddr + , Show ntcAddr + , Ord ntcVersion + , Exception resolverError ) - => Snocket m ntnFd ntnAddr - -> HandshakeArguments (ConnectionId ntnAddr) ntnVersion ntnVersionData m - -> (ntnAddr -> Maybe AddressType) - -> (ntnVersion -> ntnVersionData -> DataFlow) - -> (IP -> Socket.PortNumber -> ntnAddr) - -> ([DomainAddress] -> m (Map DomainAddress (Set ntnAddr))) - -> Snocket m ntcFd ntcAddr - -> HandshakeArguments (ConnectionId ntcAddr) ntcVersion ntcVersionData m - -> (ntcFd -> m FileDescriptor) + => DiffusionInterfaces ntnFd ntnAddr ntnVersion ntnVersionData + ntcFd ntcAddr ntcVersion ntcVersionData + resolver resolverError + m + -- ^ interfaces -> DiffusionTracers (DiffusionTracersExtra ntnAddr ntnVersion ntnVersionData ntcAddr ntcVersion ntcVersionData - m) + resolverError m) ntnAddr ntnVersion ntcAddr ntcVersion m + -- ^ tracers -> DiffusionArguments (DiffusionArgumentsExtra m) ntnFd ntnAddr ntcFd ntcAddr + + -- ^ configuration -> DiffusionApplications (DiffusionApplicationsExtra ntnAddr m) ntnAddr ntnVersion ntnVersionData ntcAddr ntcVersion ntcVersionData m + -- ^ protocol handlers -> m Void -runDataDiffusionM ntnSnocket ntnHandshakeArgs - ntnAddressType ntnDataFlow - ntnToPeerAddr - domainResolver - ntcSnocket ntcHandshakeArgs - ntcGetFileDescriptor +runDataDiffusionM DiffusionInterfaces + { diNtnSnocket + , diNtnHandshakeArguments + , diNtnAddressType + , diNtnDataFlow + , diNtnToPeerAddr + , diNtnDomainResolver + , diNtcSnocket + , diNtcHandshakeArguments + , diNtcGetFileDescriptor + , diRng + , diInstallSigUSR1Handler + , diDnsActions + } + tracers DiffusionArguments { daIPv4Address @@ -645,6 +654,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs , daTimeWaitTimeout } } + DiffusionApplications { daApplicationInitiatorMode , daApplicationInitiatorResponderMode @@ -660,26 +670,26 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs } = -- TODO: this is wrong, the 'withTimeoutSerial' cannot be shared between -- concurrent threads! - withTimeoutSerial $ \timeout -> do + withTimeoutSerial $ \timeoutFn -> do -- Thread to which 'RethrowPolicy' will throw fatal exceptions. mainThreadId <- myThreadId cmIPv4Address - <- traverse (either (Snocket.getLocalAddr ntnSnocket) pure) + <- traverse (either (Snocket.getLocalAddr diNtnSnocket) pure) daIPv4Address case cmIPv4Address of - Just addr | Just IPv4Address <- ntnAddressType addr + Just addr | Just IPv4Address <- diNtnAddressType addr -> pure () | otherwise -> throwIO (UnexpectedIPv4Address addr) Nothing -> pure () cmIPv6Address - <- traverse (either (Snocket.getLocalAddr ntnSnocket) pure) + <- traverse (either (Snocket.getLocalAddr diNtnSnocket) pure) daIPv6Address case cmIPv6Address of - Just addr | Just IPv6Address <- ntnAddressType addr + Just addr | Just IPv6Address <- diNtnAddressType addr -> pure () | otherwise -> throwIO (UnexpectedIPv6Address addr) @@ -698,17 +708,13 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs HasInitiatorResponder <$> (CMDInInitiatorResponderMode <$> Server.newControlChannel - <*> Server.newObservableStateVarIO) + <*> Server.newObservableStateVar ntnInbgovRng) localControlChannel <- Server.newControlChannel - localServerStateVar <- Server.newObservableStateVarIO + localServerStateVar <- Server.newObservableStateVar ntcInbgovRng -- RNGs used for picking random peers from the ledger and for -- demoting/promoting peers. - rng <- newStdGen - let (ledgerPeersRng, rng') = split rng - (policyRng, rng'') = split rng' - (churnRng, fuzzRng) = split rng'' policyRngVar <- newTVarIO policyRng churnModeVar <- newTVarIO ChurnModeNormal @@ -739,16 +745,16 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs case daLocalAddress of Nothing -> Nothing Just localAddr -> - Just $ withLocalSocket tracer ntcGetFileDescriptor ntcSnocket localAddr + Just $ withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr $ \localSocket -> do let localConnectionHandler :: NodeToClientConnectionHandler - ntcFd ntcAddr ntcVersion ntcVersionData + ntcFd ntcAddr ntcVersion ntcVersionData m localConnectionHandler = makeConnectionHandler dtLocalMuxTracer SingResponderMode localMiniProtocolBundle - ntcHandshakeArgs + diNtcHandshakeArguments ( ( \ (OuroborosApplication apps) -> Bundle (WithHot apps) @@ -759,7 +765,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs localConnectionManagerArguments :: NodeToClientConnectionManagerArguments - ntcFd ntcAddr ntcVersion ntcVersionData + ntcFd ntcAddr ntcVersion ntcVersionData m localConnectionManagerArguments = ConnectionManagerArguments { cmTracer = dtLocalConnectionManagerTracer, @@ -768,7 +774,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs cmIPv4Address = Nothing, cmIPv6Address = Nothing, cmAddressType = const Nothing, - cmSnocket = ntcSnocket, + cmSnocket = diNtcSnocket, cmTimeWaitTimeout = local_TIME_WAIT_TIMEOUT, cmOutboundIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT, connectionDataFlow = uncurry localDataFlow, @@ -783,7 +789,8 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs classifyHandleError (InResponderMode localControlChannel) $ \(localConnectionManager :: NodeToClientConnectionManager - ntcFd ntcAddr ntcVersion ntcVersionData) + ntcFd ntcAddr ntcVersion + ntcVersionData m) -> do -- @@ -791,13 +798,13 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs -- traceWith tracer . RunLocalServer - =<< Snocket.getLocalAddr ntcSnocket localSocket + =<< Snocket.getLocalAddr diNtcSnocket localSocket Async.withAsync (Server.run ServerArguments { serverSockets = localSocket :| [], - serverSnocket = ntcSnocket, + serverSnocket = diNtcSnocket, serverTracer = dtLocalServerTracer, serverInboundGovernorTracer = dtLocalInboundGovernorTracer, serverInboundIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT, @@ -816,11 +823,11 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs Async.withAsync (runLedgerPeers ledgerPeersRng - ntnToPeerAddr + diNtnToPeerAddr dtLedgerPeersTracer daReadUseLedgerAfter daLedgerPeersCtx - domainResolver + diNtnDomainResolver (takeTMVar ledgerPeersReq) (putTMVar ledgerPeersRsp) ) @@ -834,7 +841,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs :: NodeToNodeConnectionManagerArguments InitiatorMode ntnFd ntnAddr ntnVersion ntnVersionData - Void + m Void connectionManagerArguments = ConnectionManagerArguments { cmTracer = dtConnectionManagerTracer, @@ -842,9 +849,9 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs cmMuxTracer = dtMuxTracer, cmIPv4Address, cmIPv6Address, - cmAddressType = ntnAddressType, - cmSnocket = ntnSnocket, - connectionDataFlow = uncurry ntnDataFlow, + cmAddressType = diNtnAddressType, + cmSnocket = diNtnSnocket, + connectionDataFlow = uncurry diNtnDataFlow, cmPrunePolicy = case cmdInMode of HasInitiator CMDInInitiatorMode -> @@ -862,13 +869,13 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs :: NodeToNodeConnectionHandler InitiatorMode ntnFd ntnAddr ntnVersion ntnVersionData - Void + m Void connectionHandler = makeConnectionHandler dtMuxTracer SingInitiatorMode miniProtocolBundleInitiatorMode - ntnHandshakeArgs + diNtnHandshakeArguments daApplicationInitiatorMode (mainThreadId, rethrowPolicy <> daRethrowPolicy) @@ -879,19 +886,9 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs NotInResponderMode $ \(connectionManager :: NodeToNodeConnectionManager - InitiatorMode ntnFd ntnAddr ntnVersion Void) + InitiatorMode ntnFd ntnAddr ntnVersion m Void) -> do -#ifdef POSIX - _ <- Signals.installHandler - Signals.sigUSR1 - (Signals.Catch - (do state <- readState connectionManager - traceWith dtConnectionManagerTracer - (TrState state) - ) - ) - Nothing -#endif + diInstallSigUSR1Handler connectionManager -- -- peer state actions @@ -901,7 +898,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs -- withPeerStateActions - timeout + timeoutFn PeerStateActionsArguments { spsTracer = dtPeerSelectionActionsTracer, spsDeactivateTimeout = Diffusion.Policies.deactivateTimeout, @@ -910,7 +907,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs spsConnectionManager = connectionManager } $ \(peerStateActions - :: NodeToNodePeerStateActions InitiatorMode ntnAddr Void) -> + :: NodeToNodePeerStateActions InitiatorMode ntnAddr m Void) -> -- -- Run peer selection (p2p governor) -- @@ -918,8 +915,9 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs withPeerSelectionActions dtTraceLocalRootPeersTracer dtTracePublicRootPeersTracer - ntnToPeerAddr - timeout + diNtnToPeerAddr + diDnsActions + timeoutFn (readTVar peerSelectionTargetsVar) daReadLocalRootPeers daReadPublicRootPeers @@ -929,7 +927,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs $ \mbLocalPeerRootProviderThread (peerSelectionActions :: NodeToNodePeerSelectionActions - InitiatorMode ntnAddr Void) -> + InitiatorMode ntnAddr m Void) -> Async.withAsync (Governor.peerSelectionGovernor @@ -971,7 +969,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs :: NodeToNodeConnectionManagerArguments InitiatorResponderMode ntnFd ntnAddr ntnVersion ntnVersionData - () + m () connectionManagerArguments = ConnectionManagerArguments { cmTracer = dtConnectionManagerTracer, @@ -979,9 +977,9 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs cmMuxTracer = dtMuxTracer, cmIPv4Address, cmIPv6Address, - cmAddressType = ntnAddressType, - cmSnocket = ntnSnocket, - connectionDataFlow = uncurry ntnDataFlow, + cmAddressType = diNtnAddressType, + cmSnocket = diNtnSnocket, + connectionDataFlow = uncurry diNtnDataFlow, cmPrunePolicy = case cmdInMode of HasInitiatorResponder (CMDInInitiatorResponderMode _ serverStateVar) -> @@ -995,13 +993,13 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs :: NodeToNodeConnectionHandler InitiatorResponderMode ntnFd ntnAddr ntnVersion ntnVersionData - () + m () connectionHandler = makeConnectionHandler dtMuxTracer SingInitiatorResponderMode miniProtocolBundleInitiatorResponderMode - ntnHandshakeArgs + diNtnHandshakeArguments daApplicationInitiatorResponderMode (mainThreadId, rethrowPolicy <> daRethrowPolicy) @@ -1012,19 +1010,9 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs (InResponderMode controlChannel) $ \(connectionManager :: NodeToNodeConnectionManager - InitiatorResponderMode ntnFd ntnAddr ntnVersion () + InitiatorResponderMode ntnFd ntnAddr ntnVersion m () ) -> do -#ifdef POSIX - _ <- Signals.installHandler - Signals.sigUSR1 - (Signals.Catch - (do state <- readState connectionManager - traceWith dtConnectionManagerTracer - (TrState state) - ) - ) - Nothing -#endif + diInstallSigUSR1Handler connectionManager -- -- peer state actions -- @@ -1033,7 +1021,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs -- withPeerStateActions - timeout + timeoutFn PeerStateActionsArguments { spsTracer = dtPeerSelectionActionsTracer, spsDeactivateTimeout = Diffusion.Policies.deactivateTimeout, @@ -1043,7 +1031,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs } $ \(peerStateActions :: NodeToNodePeerStateActions - InitiatorResponderMode ntnAddr ()) -> + InitiatorResponderMode ntnAddr m ()) -> -- -- Run peer selection (p2p governor) @@ -1052,8 +1040,9 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs withPeerSelectionActions dtTraceLocalRootPeersTracer dtTracePublicRootPeersTracer - ntnToPeerAddr - timeout + diNtnToPeerAddr + diDnsActions + timeoutFn (readTVar peerSelectionTargetsVar) daReadLocalRootPeers daReadPublicRootPeers @@ -1063,7 +1052,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs $ \mbLocalPeerRootProviderThread (peerSelectionActions :: NodeToNodePeerSelectionActions - InitiatorResponderMode ntnAddr ()) -> + InitiatorResponderMode ntnAddr m ()) -> Async.withAsync (Governor.peerSelectionGovernor @@ -1075,7 +1064,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs (Diffusion.Policies.simplePeerSelectionPolicy policyRngVar (readTVar churnModeVar) daPeerMetrics)) $ \governorThread -> - withSockets tracer ntnSnocket + withSockets tracer diNtnSnocket ( catMaybes [ daIPv4Address , daIPv6Address @@ -1090,7 +1079,7 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs (Server.run ServerArguments { serverSockets = sockets, - serverSnocket = ntnSnocket, + serverSnocket = diNtnSnocket, serverTracer = dtServerTracer, serverInboundGovernorTracer = dtInboundGovernorTracer, serverConnectionLimits = daAcceptedConnectionsLimit, @@ -1128,6 +1117,12 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs ) where + (ledgerPeersRng, rng1) = split diRng + (policyRng, rng2) = split rng1 + (churnRng, rng3) = split rng2 + (fuzzRng, rng4) = split rng3 + (ntnInbgovRng, ntcInbgovRng) = split rng4 + DiffusionTracers { dtMuxTracer , dtLocalMuxTracer @@ -1247,6 +1242,116 @@ runDataDiffusionM ntnSnocket ntnHandshakeArgs Just (_ :: IOManagerError) -> ShutdownNode Nothing -> mempty + +-- | Main entry point for data diffusion service. It allows to: +-- +-- * connect to upstream peers; +-- * accept connection from downstream peers, if run in +-- 'InitiatorAndResponderDiffusionMode'. +-- * runs a local service which allows to use node-to-client protocol to obtain +-- information from the running system. This is used by 'cardano-cli' or +-- a wallet and a like local services. +-- +runDataDiffusion + :: DiffusionTracers (DiffusionTracersExtra + RemoteAddress NodeToNodeVersion NodeToNodeVersionData + LocalAddress NodeToClientVersion NodeToClientVersionData + IOException IO) + RemoteAddress NodeToNodeVersion + LocalAddress NodeToClientVersion + IO + -> DiffusionArguments (DiffusionArgumentsExtra IO) + Socket RemoteAddress + LocalSocket LocalAddress + -> DiffusionApplications + (DiffusionApplicationsExtra RemoteAddress IO) + RemoteAddress NodeToNodeVersion NodeToNodeVersionData + LocalAddress NodeToClientVersion NodeToClientVersionData + IO + -> IO Void +runDataDiffusion tracers args apps = do + -- We run two services: for /node-to-node/ and /node-to-client/. The + -- naming convention is that we use /local/ prefix for /node-to-client/ + -- related terms, as this is a local only service running over a unix + -- socket / windows named pipe. + handle (\e -> traceWith (dtDiffusionInitializationTracer tracers) + (DiffusionErrored e) + >> throwIO e) + $ withIOManager $ \iocp -> do + let diNtnSnocket :: SocketSnocket + diNtnSnocket = Snocket.socketSnocket iocp + + diNtcSnocket :: LocalSnocket + diNtcSnocket = Snocket.localSnocket iocp + + diNtnHandshakeArguments = + HandshakeArguments { + haHandshakeTracer = dtHandshakeTracer tracers, + haHandshakeCodec = NodeToNode.nodeToNodeHandshakeCodec, + haVersionDataCodec = + cborTermVersionDataCodec + NodeToNode.nodeToNodeCodecCBORTerm, + haAcceptVersion = acceptableVersion, + haTimeLimits = timeLimitsHandshake + } + diNtcHandshakeArguments = + HandshakeArguments { + haHandshakeTracer = dtLocalHandshakeTracer tracers, + haHandshakeCodec = NodeToClient.nodeToClientHandshakeCodec, + haVersionDataCodec = + cborTermVersionDataCodec + NodeToClient.nodeToClientCodecCBORTerm, + haAcceptVersion = acceptableVersion, + haTimeLimits = noTimeLimitsHandshake + } + + diInstallSigUSR1Handler + :: forall mode x. + NodeToNodeConnectionManager mode Socket RemoteAddress NodeToNodeVersion IO x + -> IO () +#ifdef POSIX + diInstallSigUSR1Handler = \connectionManager -> + void $ Signals.installHandler + Signals.sigUSR1 + (Signals.Catch + (do state <- readState connectionManager + traceWith (dtConnectionManagerTracer . dtExtra $ tracers) + (TrState state) + ) + ) + Nothing +#else + diInstallSigUSR1Handler = \_ -> pure () +#endif + withTimeoutSerial $ \timeoutFn -> do + let diNtnDomainResolver :: [DomainAddress] + -> IO (Map DomainAddress (Set Socket.SockAddr)) + diNtnDomainResolver = + resolveDomainAddresses + (dtTracePublicRootPeersTracer . dtExtra $ tracers) + timeoutFn + DNS.defaultResolvConf + ioDNSActions + diRng <- newStdGen + runDataDiffusionM + DiffusionInterfaces { + diNtnSnocket, + diNtnHandshakeArguments, + diNtnAddressType = socketAddressType, + diNtnDataFlow = nodeDataFlow, + diNtnToPeerAddr = curry IP.toSockAddr, + diNtnDomainResolver, + + diNtcSnocket, + diNtcHandshakeArguments, + diNtcGetFileDescriptor = localSocketFileDescriptor, + + diRng, + diInstallSigUSR1Handler, + diDnsActions = ioDNSActions + } + tracers args apps + -- -- Data flow -- diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs index 914dc6e2d04..ce24e398031 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs @@ -14,10 +14,11 @@ module Ouroboros.Network.PeerSelection.Simple import Data.Foldable (toList) import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime +import Control.Monad.Class.MonadTimer import Control.Monad.Class.MonadSTM.Strict import Control.Tracer (Tracer) -import Control.Exception (IOException) import Data.Map (Map) import Data.Set (Set) @@ -34,31 +35,38 @@ import Ouroboros.Network.PeerSelection.RootPeersDNS withPeerSelectionActions - :: forall peeraddr peerconn a. - Ord peeraddr - => Tracer IO (TraceLocalRootPeers peeraddr IOException) - -> Tracer IO TracePublicRootPeers + :: forall peeraddr peerconn resolver exception m a. + ( MonadAsync m + , MonadDelay m + , MonadThrow m + , Ord peeraddr + , Exception exception + ) + => Tracer m (TraceLocalRootPeers peeraddr exception) + -> Tracer m TracePublicRootPeers -> (IP -> Socket.PortNumber -> peeraddr) - -> TimeoutFn IO - -> STM IO PeerSelectionTargets - -> STM IO [(Int, Map RelayAddress PeerAdvertise)] + -> DNSActions resolver exception m + -> TimeoutFn m + -> STM m PeerSelectionTargets + -> STM m [(Int, Map RelayAddress PeerAdvertise)] -- ^ local root peers - -> STM IO [RelayAddress] + -> STM m [RelayAddress] -- ^ public root peers - -> PeerStateActions peeraddr peerconn IO - -> (NumberOfPeers -> STM IO ()) - -> STM IO (Maybe (Set peeraddr, DiffTime)) - -> (Maybe (Async IO Void) - -> PeerSelectionActions peeraddr peerconn IO - -> IO a) + -> PeerStateActions peeraddr peerconn m + -> (NumberOfPeers -> STM m ()) + -> STM m (Maybe (Set peeraddr, DiffTime)) + -> (Maybe (Async m Void) + -> PeerSelectionActions peeraddr peerconn m + -> m a) -- ^ continuation, recieves a handle to the local roots peer provider thread -- (only if local root peers where non-empty). - -> IO a + -> m a withPeerSelectionActions localRootTracer publicRootTracer toPeerAddr - timeout + dnsActions + timeoutFn readTargets readLocalRootPeers readPublicRootPeers @@ -70,7 +78,7 @@ withPeerSelectionActions let peerSelectionActions = PeerSelectionActions { readPeerSelectionTargets = readTargets, readLocalRootPeers = toList <$> readTVar localRootsVar, - requestPublicRootPeers = requestLedgerPeers ioDNSActions, + requestPublicRootPeers = requestLedgerPeers, requestPeerGossip = \_ -> pure [], peerStateActions } @@ -78,34 +86,32 @@ withPeerSelectionActions (localRootPeersProvider localRootTracer toPeerAddr - timeout + timeoutFn DNS.defaultResolvConf localRootsVar readLocalRootPeers - ioDNSActions) + dnsActions) (\thread -> k (Just thread) peerSelectionActions) where -- We first try to get poublic root peers from the ledger, but if it fails -- (for example because the node hasn't synced far enough) we fall back -- to using the manually configured bootstrap root peers. - requestLedgerPeers :: DNSActions DNS.Resolver IOException IO - -> Int -> IO (Set peeraddr, DiffTime) - requestLedgerPeers dnsActions n = do + requestLedgerPeers :: Int -> m (Set peeraddr, DiffTime) + requestLedgerPeers n = do atomically $ reqLedgerPeers $ NumberOfPeers $ fromIntegral n peers_m <- atomically getLedgerPeers case peers_m of - Nothing -> requestPublicRootPeers dnsActions n + Nothing -> requestPublicRootPeers n Just peers -> return peers -- For each call we re-initialise the dns library which forces reading -- `/etc/resolv.conf`: -- https://github.com/input-output-hk/cardano-node/issues/731 - requestPublicRootPeers :: DNSActions DNS.Resolver IOException IO - -> Int -> IO (Set peeraddr, DiffTime) - requestPublicRootPeers dnsActions n = + requestPublicRootPeers :: Int -> m (Set peeraddr, DiffTime) + requestPublicRootPeers n = publicRootPeersProvider publicRootTracer toPeerAddr - timeout + timeoutFn DNS.defaultResolvConf readPublicRootPeers dnsActions