diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index 632f34da277..e1fa519e5e5 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -445,6 +445,7 @@ type NodeToNodePeerSelectionActions (mode :: MuxMode) ntnAddr m a = (NodeToNodePeerConnectionHandle mode ntnAddr m a) m + data Interfaces ntnFd ntnAddr ntnVersion ntnVersionData ntcFd ntcAddr ntcVersion ntcVersionData resolver resolverError @@ -656,57 +657,22 @@ runM Interfaces -> throwIO (UnexpectedIPv6Address addr) Nothing -> pure () - lookupReqs <- case (cmIPv4Address, cmIPv6Address) of - (Just _, Nothing) -> return LookupReqAOnly - (Nothing, Just _) -> return LookupReqAAAAOnly - (Just _, Just _) -> return LookupReqAAndAAAA - _ -> - throwIO (NoSocket :: Failure RemoteAddress) - - -- control channel for the server; only required in - -- @'InitiatorResponderMode' :: 'MuxMode'@ - cmdInMode - <- case diffusionMode of - InitiatorOnlyDiffusionMode -> - -- action which we pass to connection handler - pure (HasInitiator CMDInInitiatorMode) - InitiatorAndResponderDiffusionMode -> do - -- we pass 'Server.newOutboundConnection serverControlChannel' to - -- connection handler - HasInitiatorResponder <$> - (CMDInInitiatorResponderMode - <$> Server.newControlChannel - <*> Server.newObservableStateVar ntnInbgovRng) - - localControlChannel <- Server.newControlChannel - localServerStateVar <- Server.newObservableStateVar ntcInbgovRng - - -- RNGs used for picking random peers from the ledger and for - -- demoting/promoting peers. - policyRngVar <- newTVarIO policyRng - - churnModeVar <- newTVarIO ChurnModeNormal - - peerSelectionTargetsVar <- newTVarIO $ daPeerSelectionTargets { - -- Start with a smaller number of active peers, the churn governor will increase - -- it to the configured value after a delay. - targetNumberOfActivePeers = - min 2 (targetNumberOfActivePeers daPeerSelectionTargets) - } - - let localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0 - - -- - -- local connection manager - -- - localThread :: Maybe (m Void) + -- + -- local connection manager + -- + let localThread :: Maybe (m Void) localThread = case daLocalAddress of Nothing -> Nothing Just localAddr -> Just $ withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr $ \localSocket -> do - let localConnectionHandler :: NodeToClientConnectionHandler + localControlChannel <- Server.newControlChannel + localServerStateVar <- Server.newObservableStateVar ntcInbgovRng + + let localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0 + + localConnectionHandler :: NodeToClientConnectionHandler ntcFd ntcAddr ntcVersion ntcVersionData m localConnectionHandler = makeConnectionHandler @@ -751,34 +717,69 @@ runM Interfaces ntcVersionData m) -> do - -- - -- run local server - -- + -- + -- run local server + -- - traceWith tracer . RunLocalServer - =<< Snocket.getLocalAddr diNtcSnocket localSocket - - Async.withAsync - (Server.run - ServerArguments { - serverSockets = localSocket :| [], - serverSnocket = diNtcSnocket, - serverTracer = dtLocalServerTracer, - serverTrTracer = nullTracer, -- TODO: issue #3320 - serverInboundGovernorTracer = dtLocalInboundGovernorTracer, - serverInboundIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT, - serverConnectionLimits = localConnectionLimits, - serverConnectionManager = localConnectionManager, - serverControlChannel = localControlChannel, - serverObservableStateVar = localServerStateVar - }) Async.wait + traceWith tracer . RunLocalServer + =<< Snocket.getLocalAddr diNtcSnocket localSocket + + Async.withAsync + (Server.run + ServerArguments { + serverSockets = localSocket :| [], + serverSnocket = diNtcSnocket, + serverTracer = dtLocalServerTracer, + serverTrTracer = nullTracer, -- TODO: issue #3320 + serverInboundGovernorTracer = dtLocalInboundGovernorTracer, + serverInboundIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT, + serverConnectionLimits = localConnectionLimits, + serverConnectionManager = localConnectionManager, + serverControlChannel = localControlChannel, + serverObservableStateVar = localServerStateVar + }) Async.wait - -- - -- remote connection manager - -- + -- + -- remote connection manager + -- + let remoteThread :: m Void + remoteThread = do + lookupReqs <- case (cmIPv4Address, cmIPv6Address) of + (Just _, Nothing) -> return LookupReqAOnly + (Nothing, Just _) -> return LookupReqAAAAOnly + (Just _, Just _) -> return LookupReqAAndAAAA + _ -> + throwIO (NoSocket :: Failure RemoteAddress) + + -- control channel for the server; only required in + -- @'InitiatorResponderMode' :: 'MuxMode'@ + cmdInMode + <- case diffusionMode of + InitiatorOnlyDiffusionMode -> + -- action which we pass to connection handler + pure (HasInitiator CMDInInitiatorMode) + InitiatorAndResponderDiffusionMode -> + -- we pass 'Server.newOutboundConnection serverControlChannel' to + -- connection handler + -- GR-FIXME: last comment? not understandable in context. DEI? + HasInitiatorResponder <$> + (CMDInInitiatorResponderMode + <$> Server.newControlChannel + <*> Server.newObservableStateVar ntnInbgovRng) + + -- RNGs used for picking random peers from the ledger and for + -- demoting/promoting peers. + policyRngVar <- newTVarIO policyRng + + churnModeVar <- newTVarIO ChurnModeNormal + + peerSelectionTargetsVar <- newTVarIO $ daPeerSelectionTargets { + -- Start with a smaller number of active peers, the churn governor will increase + -- it to the configured value after a delay. + targetNumberOfActivePeers = + min 2 (targetNumberOfActivePeers daPeerSelectionTargets) + } - remoteThread :: m Void - remoteThread = withLedgerPeers ledgerPeersRng diNtnToPeerAddr @@ -809,14 +810,11 @@ runM Interfaces cmAddressType = diNtnAddressType, cmSnocket = diNtnSnocket, connectionDataFlow = uncurry diNtnDataFlow, - cmPrunePolicy = - case cmdInMode of - HasInitiator CMDInInitiatorMode -> - -- Server is not running, it will not be able to - -- advise which connections to prune. It's also not - -- expected that the governor targets will be larger - -- than limits imposed by 'cmConnectionsLimits'. - simplePrunePolicy, + cmPrunePolicy = simplePrunePolicy, + -- Server is not running, it will not be able to + -- advise which connections to prune. It's also not + -- expected that the governor targets will be larger + -- than limits imposed by 'cmConnectionsLimits'. cmConnectionsLimits = daAcceptedConnectionsLimit, cmTimeWaitTimeout = daTimeWaitTimeout, cmOutboundIdleTimeout = daProtocolIdleTimeout @@ -882,35 +880,33 @@ runM Interfaces :: NodeToNodePeerSelectionActions InitiatorMode ntnAddr m Void) -> - Async.withAsync - (Governor.peerSelectionGovernor - dtTracePeerSelectionTracer - dtDebugPeerSelectionInitiatorTracer - dtTracePeerSelectionCounters - fuzzRng - peerSelectionActions - (Diffusion.Policies.simplePeerSelectionPolicy - policyRngVar (readTVar churnModeVar) daPeerMetrics)) - $ \governorThread -> Async.withAsync - (Governor.peerChurnGovernor - dtTracePeerSelectionTracer - daPeerMetrics - churnModeVar - churnRng - daBlockFetchMode - daPeerSelectionTargets - peerSelectionTargetsVar) - $ \churnGovernorThread -> - - -- wait for any thread to fail - snd <$> Async.waitAny - (maybeToList mbLocalPeerSelectionActionsThread - ++ [ governorThread - , ledgerPeerThread - , churnGovernorThread - ]) - + (Governor.peerSelectionGovernor + dtTracePeerSelectionTracer + dtDebugPeerSelectionInitiatorTracer + dtTracePeerSelectionCounters + fuzzRng + peerSelectionActions + (Diffusion.Policies.simplePeerSelectionPolicy + policyRngVar (readTVar churnModeVar) daPeerMetrics)) + $ \governorThread -> + Async.withAsync + (Governor.peerChurnGovernor + dtTracePeerSelectionTracer + daPeerMetrics + churnModeVar + churnRng + daBlockFetchMode + daPeerSelectionTargets + peerSelectionTargetsVar) + $ \churnGovernorThread -> + -- wait for any thread to fail + snd <$> Async.waitAny + (maybeToList mbLocalPeerSelectionActionsThread + ++ [ governorThread + , ledgerPeerThread + , churnGovernorThread + ]) -- InitiatorResponderMode -- @@ -935,10 +931,7 @@ runM Interfaces cmAddressType = diNtnAddressType, cmSnocket = diNtnSnocket, connectionDataFlow = uncurry diNtnDataFlow, - cmPrunePolicy = - case cmdInMode of - HasInitiatorResponder (CMDInInitiatorResponderMode _ serverStateVar) -> - Diffusion.Policies.prunePolicy serverStateVar, + cmPrunePolicy = Diffusion.Policies.prunePolicy observableStateVar, cmConnectionsLimits = daAcceptedConnectionsLimit, cmTimeWaitTimeout = daTimeWaitTimeout, cmOutboundIdleTimeout = daProtocolIdleTimeout @@ -967,6 +960,7 @@ runM Interfaces InitiatorResponderMode ntnFd ntnAddr ntnVersion m () ) -> do diInstallSigUSR1Handler connectionManager + -- -- peer state actions -- @@ -985,7 +979,6 @@ runM Interfaces $ \(peerStateActions :: NodeToNodePeerStateActions InitiatorResponderMode ntnAddr m ()) -> - -- -- Run peer selection (p2p governor) -- @@ -1193,7 +1186,6 @@ run tracers tracersExtra args argsExtra apps appsExtra = do } tracers tracersExtra args argsExtra apps appsExtra - -- -- Data flow -- @@ -1321,3 +1313,4 @@ withLocalSocket tracer getFileDescriptor sn localAddress k = -- pre-configured systemd socket Left sd -> k sd +