From 26da4393e76efb5bba36ccb7ae474e3029304f63 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Wed, 10 Apr 2024 18:09:48 +0200 Subject: [PATCH] churn: implemented explicit synchronisation Chrun now explicitly synchronises with outbound governor using `PeerSelectionCounters`. Each churn action can timeout. Co-authored-by: Armando Santos (@bolt12) Co-authored-by: Marcin Szamotulski (@coot) --- ouroboros-network/CHANGELOG.md | 3 + .../Test/Ouroboros/Network/PeerSelection.hs | 6 + .../Network/PeerSelection/MockEnvironment.hs | 4 + .../Test/Ouroboros/Network/Testnet.hs | 4 + .../Network/Diffusion/Configuration.hs | 2 +- .../src/Ouroboros/Network/Diffusion/P2P.hs | 7 +- .../Ouroboros/Network/Diffusion/Policies.hs | 8 + .../Ouroboros/Network/PeerSelection/Churn.hs | 563 +++++++++++++----- .../Network/PeerSelection/Governor.hs | 19 +- .../Network/PeerSelection/Governor/Types.hs | 35 +- 10 files changed, 484 insertions(+), 167 deletions(-) diff --git a/ouroboros-network/CHANGELOG.md b/ouroboros-network/CHANGELOG.md index ca86781061..626be97cc3 100644 --- a/ouroboros-network/CHANGELOG.md +++ b/ouroboros-network/CHANGELOG.md @@ -11,6 +11,9 @@ ### Non-Breaking changes +* Improved Churn governor by synchronizing according to the counters instead + of relying on `threadDelay`. + ## 0.14.0.0 -- 2024-04-04 ### Breaking changes diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection.hs index a052252c66..a82dcc6866 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection.hs @@ -709,6 +709,8 @@ traceNum TraceBootstrapPeersFlagChangedWhilstInSensitiveState = 51 traceNum TraceUseBootstrapPeersChanged {} = 52 traceNum TraceOutboundGovernorCriticalFailure {} = 53 traceNum TraceDebugState {} = 54 +traceNum TraceChurnAction {} = 55 +traceNum TraceChurnTimeout {} = 56 allTraceNames :: Map Int String allTraceNames = @@ -768,6 +770,8 @@ allTraceNames = , (52, "TraceUseBootstrapPeersChanged") , (53, "TraceOutboundGovernorCriticalFailure") , (54, "TraceDebugState") + , (55, "TraceChurnAction") + , (56, "TraceChurnTimeout") ] @@ -3333,10 +3337,12 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap (ioDNSActions LookupReqAAndAAAA) $ \requestPublicRootPeers -> do publicStateVar <- makePublicPeerSelectionStateVar debugVar <- newTVarIO $ emptyPeerSelectionState (mkStdGen 42) [] + countersVar <- newTVarIO $ emptyPeerSelectionCounters [] peerSelectionGovernor tracer tracer tracer -- TODO: #3182 Rng seed should come from quickcheck. (mkStdGen 42) + countersVar publicStateVar debugVar actions diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs index 623b15860b..83162ca658 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs @@ -211,6 +211,7 @@ governorAction mockEnv = do usbVar <- playTimedScript (contramap TraceEnvSetUseBootstrapPeers tracerMockEnv) (useBootstrapPeers mockEnv) debugVar <- StrictTVar.newTVarIO (emptyPeerSelectionState (mkStdGen 42) []) + countersVar <- StrictTVar.newTVarIO (emptyPeerSelectionCounters []) policy <- mockPeerSelectionPolicy mockEnv actions <- mockPeerSelectionActions tracerMockEnv mockEnv (readTVar usbVar) (readTVar lsjVar) policy exploreRaces -- explore races within the governor @@ -221,6 +222,7 @@ governorAction mockEnv = do tracerDebugPeerSelection tracerTracePeerSelectionCounters (mkStdGen 42) + countersVar publicStateVar debugVar actions @@ -655,6 +657,8 @@ tracerTracePeerSelection = contramap f tracerTestTraceEvent f a@(TraceUseBootstrapPeersChanged !_) = GovernorEvent a f a@(TraceOutboundGovernorCriticalFailure !_) = GovernorEvent a f a@(TraceDebugState !_ !_) = GovernorEvent a + f a@(TraceChurnAction !_) = GovernorEvent a + f a@(TraceChurnTimeout !_) = GovernorEvent a tracerDebugPeerSelection :: Tracer (IOSim s) (DebugPeerSelection PeerAddr) tracerDebugPeerSelection = GovernorDebug `contramap` tracerTestTraceEvent diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet.hs index 30e5b29256..6270689ead 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet.hs @@ -1065,6 +1065,10 @@ prop_peer_selection_trace_coverage defaultBearerInfo diffScript = "TraceOutboundGovernorCriticalFailure" peerSelectionTraceMap TraceDebugState {} = "TraceDebugState" + peerSelectionTraceMap TraceChurnAction {} = + "TraceChurnTimeout" + peerSelectionTraceMap TraceChurnTimeout {} = + "TraceChurnTimeout" eventsSeenNames = map peerSelectionTraceMap events diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/Configuration.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/Configuration.hs index 41066d516e..c71e30dddd 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/Configuration.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/Configuration.hs @@ -62,7 +62,7 @@ defaultNumBootstrapPeers = DefaultNumBootstrapPeers 30 defaultPeerSelectionTargets :: PeerSelectionTargets defaultPeerSelectionTargets = PeerSelectionTargets { - targetNumberOfRootPeers = 85, + targetNumberOfRootPeers = 60, targetNumberOfKnownPeers = 85, targetNumberOfEstablishedPeers = 40, targetNumberOfActivePeers = 15, diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index 0733da0940..cb536adf85 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -104,7 +104,8 @@ import Ouroboros.Network.PeerSelection.Governor.Types (ChurnMode (ChurnModeNormal), DebugPeerSelection (..), PeerSelectionActions, PeerSelectionCounters (..), PeerSelectionPolicy (..), PeerSelectionState, - TracePeerSelection (..), emptyPeerSelectionState) + TracePeerSelection (..), emptyPeerSelectionCounters, + emptyPeerSelectionState) #ifdef POSIX import Ouroboros.Network.PeerSelection.Governor.Types (makeDebugPeerSelectionState) @@ -807,6 +808,8 @@ runM Interfaces min 2 (targetNumberOfActivePeers daPeerSelectionTargets) } + countersVar <- newTVarIO (emptyPeerSelectionCounters []) + -- Design notes: -- - We split the following code into two parts: -- - Part (a): plumb data flow (in particular arguments and tracersr) @@ -996,6 +999,7 @@ runM Interfaces peerSelectionTracer dtTracePeerSelectionCounters fuzzRng + countersVar daPublicPeerSelectionVar dbgVar peerSelectionActions @@ -1015,6 +1019,7 @@ runM Interfaces daBlockFetchMode daPeerSelectionTargets peerSelectionTargetsVar + (readTVar countersVar) daReadUseBootstrapPeers -- diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs index 3821c3f752..bc75772b8d 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs @@ -54,6 +54,14 @@ minChainSyncTimeout = 135 maxChainSyncTimeout :: DiffTime maxChainSyncTimeout = 269 +-- | Churn timeouts after 60s trying to establish a connection. +-- +-- This doesn't mean the connection is terminated after it, just churns moves +-- on. +-- +churnEstablishConnectionTimeout :: DiffTime +churnEstablishConnectionTimeout = 60 + -- | Number of events tracked by 'PeerMetrics'. This corresponds to one hour of -- blocks on mainnet. diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Churn.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Churn.hs index 2b882bd428..1aad8ae5fc 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Churn.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Churn.hs @@ -1,5 +1,7 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} -- | This subsystem manages the discovery and selection of /upstream/ peers. @@ -9,18 +11,26 @@ module Ouroboros.Network.PeerSelection.Churn (peerChurnGovernor) where import Data.Void (Void) import Control.Concurrent.Class.MonadSTM.Strict +import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI import Control.Tracer (Tracer (..), traceWith) import System.Random +import Control.Applicative (Alternative) +import Data.Functor (($>)) +import Data.Monoid.Synchronisation (FirstToFinish (..)) import Ouroboros.Network.BlockFetch (FetchMode (..)) -import Ouroboros.Network.Diffusion.Policies (closeConnectionTimeout) +import Ouroboros.Network.Diffusion.Policies (churnEstablishConnectionTimeout, + closeConnectionTimeout, deactivateTimeout) import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers (..)) import Ouroboros.Network.PeerSelection.Governor.Types hiding (targets) import Ouroboros.Network.PeerSelection.PeerMetric +type ModifyPeerSelectionTargets = PeerSelectionTargets -> PeerSelectionTargets +type CheckPeerSelectionCounters = PeerSelectionCounters -> PeerSelectionTargets -> Bool + -- | Churn governor. -- -- At every churn interval decrease active peers for a short while (1s), so that @@ -30,8 +40,10 @@ import Ouroboros.Network.PeerSelection.PeerMetric -- root peers. -- peerChurnGovernor :: forall m peeraddr. - ( MonadSTM m - , MonadDelay m + ( MonadDelay m + , Alternative (STM m) + , MonadTimer m + , MonadCatch m ) => Tracer m (TracePeerSelection peeraddr) -> DiffTime @@ -39,17 +51,22 @@ peerChurnGovernor :: forall m peeraddr. -> DiffTime -- ^ the base for churn interval in the bulk sync mode. -> DiffTime - -- ^ the max peersharing timeout + -- ^ the timeout for outbound governor to find new (thus + -- cold) peers through peer sharing mechanism. -> PeerMetrics m peeraddr -> StrictTVar m ChurnMode -> StdGen -> STM m FetchMode -> PeerSelectionTargets + -- ^ base targets; set in a configuration file -> StrictTVar m PeerSelectionTargets + -> STM m PeerSelectionCounters -> STM m UseBootstrapPeers -> m Void -peerChurnGovernor tracer deadlineChurnInterval bulkChurnInterval psOverallTimeout - _metrics churnModeVar inRng getFetchMode base peerSelectionVar +peerChurnGovernor tracer + deadlineChurnInterval bulkChurnInterval requestPeersTimeout + _metrics churnModeVar inRng getFetchMode base + peerSelectionVar readCounters getUseBootstrapPeers = do -- Wait a while so that not only the closest peers have had the time -- to become warm. @@ -61,10 +78,10 @@ peerChurnGovernor tracer deadlineChurnInterval bulkChurnInterval psOverallTimeou (mode, ubp) <- atomically ((,) <$> updateChurnMode <*> getUseBootstrapPeers) atomically $ do - increaseActivePeers mode - increaseEstablishedPeers mode ubp + modifyTVar peerSelectionVar ( increaseActivePeers mode + . increaseEstablishedPeers mode ubp ) endTs0 <- getMonotonicTime - fuzzyDelay inRng (endTs0 `diffTime` startTs0) >>= go + fuzzyDelay inRng (endTs0 `diffTime` startTs0) >>= churnLoop where @@ -72,169 +89,402 @@ peerChurnGovernor tracer deadlineChurnInterval bulkChurnInterval psOverallTimeou updateChurnMode = do fm <- getFetchMode let mode = case fm of - FetchModeDeadline -> ChurnModeNormal - FetchModeBulkSync -> ChurnModeBulkSync + FetchModeDeadline -> ChurnModeNormal + FetchModeBulkSync -> ChurnModeBulkSync writeTVar churnModeVar mode return mode + -- | Update the targets to a given value, and block until they are reached. + -- The time we are blocked is limited by a timeout. + -- + updateTargets + :: ChurnAction + -- ^ churn actions for tracing + -> DiffTime + -- ^ timeout + -> ModifyPeerSelectionTargets + -- ^ update counters function + -> CheckPeerSelectionCounters + -- ^ check counters + -> m () + updateTargets churnAction timeoutDelay modifyTargets checkCounters = do + -- update targets, and return the new targets + targets <- atomically $ stateTVar peerSelectionVar ((\a -> (a, a)) . modifyTargets) + + -- create timeout and block on counters + bracketOnError (registerDelayCancellable timeoutDelay) + (\(_readTimeout, cancelTimeout) -> cancelTimeout) + (\( readTimeout, cancelTimeout) -> do + -- block until counters reached the targets, or the timeout fires + a <- atomically $ runFirstToFinish $ + FirstToFinish ((readCounters>>= check . flip checkCounters targets) $> True) + <> + FirstToFinish (readTimeout >>= \case TimeoutPending -> retry + _ -> pure False) + if a + then cancelTimeout + >> traceWith tracer (TraceChurnAction churnAction) + else traceWith tracer (TraceChurnTimeout churnAction) + ) + + -- + -- Functions to modify `PeerSelectionTargets` and check + -- `PeerSelectionCounters`. + -- + -- TODO: #3396 revisit the policy for genesis - increaseActivePeers :: ChurnMode -> STM m () - increaseActivePeers mode = - modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfActivePeers = - case mode of - ChurnModeNormal -> - targetNumberOfActivePeers base - ChurnModeBulkSync -> - min 2 (targetNumberOfActivePeers base) - }) - - decreaseActivePeers :: ChurnMode -> STM m () - decreaseActivePeers mode = - modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfActivePeers = - case mode of - ChurnModeNormal -> - decrease $ targetNumberOfActivePeers base - ChurnModeBulkSync -> - min 1 (targetNumberOfActivePeers base - 1) - }) - - increaseEstablishedPeers :: ChurnMode -> UseBootstrapPeers -> STM m () - increaseEstablishedPeers mode ubp = - modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfEstablishedPeers = - case (mode, ubp) of - (ChurnModeBulkSync, UseBootstrapPeers _) -> - min (targetNumberOfActivePeers targets + 1) - (targetNumberOfEstablishedPeers base) - _ -> targetNumberOfEstablishedPeers base - }) - - decreaseEstablished :: ChurnMode -> UseBootstrapPeers -> STM m () - decreaseEstablished mode ubp = - modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfEstablishedPeers = - case (mode, ubp) of - (ChurnModeBulkSync, UseBootstrapPeers _) -> - min (targetNumberOfActivePeers targets) (targetNumberOfEstablishedPeers base - 1) - _ -> decrease (targetNumberOfEstablishedPeers base - targetNumberOfActivePeers base) - + targetNumberOfActivePeers base - }) - - increaseActiveBigLedgerPeers :: ChurnMode -> STM m () - increaseActiveBigLedgerPeers mode = - modifyTVar peerSelectionVar (\targets -> targets { - -- TODO: when chain-skipping will be implemented and chain-sync client - -- will take into account big ledger peers, we don't need pattern - -- match on the churn mode, but use - -- `targetNumberOfActiveBigLedgerPeers` (issue #4609). - targetNumberOfActiveBigLedgerPeers = - case mode of - ChurnModeNormal -> - targetNumberOfActiveBigLedgerPeers base - ChurnModeBulkSync -> - min 1 (targetNumberOfActiveBigLedgerPeers base) - }) - - decreaseActiveBigLedgerPeers :: ChurnMode -> STM m () - decreaseActiveBigLedgerPeers mode = - modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfActiveBigLedgerPeers = - case mode of - ChurnModeNormal -> - decrease $ targetNumberOfActiveBigLedgerPeers base - ChurnModeBulkSync -> - min 1 (targetNumberOfActiveBigLedgerPeers base) - }) - - decreaseEstablishedBigLedgerPeers :: STM m () - decreaseEstablishedBigLedgerPeers = - modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfEstablishedBigLedgerPeers = - decrease (targetNumberOfEstablishedBigLedgerPeers base - - targetNumberOfActiveBigLedgerPeers base) - + targetNumberOfActiveBigLedgerPeers base - }) - - - - go :: StdGen -> m Void - go !rng = do + increaseActivePeers :: ChurnMode + -> ModifyPeerSelectionTargets + increaseActivePeers + mode targets + = + targets { + targetNumberOfActivePeers = + case mode of + ChurnModeNormal -> + targetNumberOfActivePeers base + ChurnModeBulkSync -> + min 2 (targetNumberOfActivePeers base) + } + + checkActivePeersIncreased :: CheckPeerSelectionCounters + checkActivePeersIncreased + PeerSelectionCounters { numberOfActivePeers } + PeerSelectionTargets { targetNumberOfActivePeers } + = + numberOfActivePeers >= targetNumberOfActivePeers + + + decreaseActivePeers :: ChurnMode -> ModifyPeerSelectionTargets + decreaseActivePeers mode targets = + targets { + targetNumberOfActivePeers = + case mode of + ChurnModeNormal -> + decrease $ targetNumberOfActivePeers base + ChurnModeBulkSync -> + min 1 (targetNumberOfActivePeers base - 1) + } + + checkActivePeersDecreased :: CheckPeerSelectionCounters + checkActivePeersDecreased + PeerSelectionCounters { numberOfActivePeers, numberOfActivePeersDemotions } + PeerSelectionTargets { targetNumberOfActivePeers } + = + numberOfActivePeers + - numberOfActivePeersDemotions + <= targetNumberOfActivePeers + + + increaseEstablishedPeers + :: ChurnMode -> UseBootstrapPeers + -> ModifyPeerSelectionTargets + increaseEstablishedPeers + mode ubp targets + = + targets { + targetNumberOfEstablishedPeers = + case (mode, ubp) of + (ChurnModeBulkSync, UseBootstrapPeers _) -> + min (targetNumberOfActivePeers targets + 1) + (targetNumberOfEstablishedPeers base) + _ -> targetNumberOfEstablishedPeers base + } + + checkEstablishedPeersIncreased :: CheckPeerSelectionCounters + checkEstablishedPeersIncreased + PeerSelectionCounters { numberOfEstablishedPeers, + numberOfColdPeersPromotions } + PeerSelectionTargets { targetNumberOfEstablishedPeers } + = + numberOfEstablishedPeers + + numberOfColdPeersPromotions + >= targetNumberOfEstablishedPeers + + + increaseEstablishedBigLedgerPeers + :: ModifyPeerSelectionTargets + increaseEstablishedBigLedgerPeers + targets + = + targets { targetNumberOfEstablishedBigLedgerPeers = targetNumberOfEstablishedBigLedgerPeers base } + + checkEstablishedBigLedgerPeersIncreased + :: CheckPeerSelectionCounters + checkEstablishedBigLedgerPeersIncreased + PeerSelectionCounters { numberOfEstablishedBigLedgerPeers } + PeerSelectionTargets { targetNumberOfEstablishedBigLedgerPeers } + = + numberOfEstablishedBigLedgerPeers >= targetNumberOfEstablishedBigLedgerPeers + + + decreaseEstablishedPeers + :: ChurnMode -> UseBootstrapPeers + -> ModifyPeerSelectionTargets + decreaseEstablishedPeers mode ubp targets = + targets { + targetNumberOfEstablishedPeers = + case (mode, ubp) of + (ChurnModeBulkSync, UseBootstrapPeers _) -> + min (targetNumberOfActivePeers targets) + (targetNumberOfEstablishedPeers base - 1) + _ -> decrease (targetNumberOfEstablishedPeers base - targetNumberOfActivePeers base) + + targetNumberOfActivePeers base + } + + checkEstablishedPeersDecreased + :: CheckPeerSelectionCounters + checkEstablishedPeersDecreased + PeerSelectionCounters { numberOfEstablishedPeers, + numberOfWarmPeersDemotions } + PeerSelectionTargets { targetNumberOfEstablishedPeers } + = + numberOfEstablishedPeers + - numberOfWarmPeersDemotions + <= targetNumberOfEstablishedPeers + + + increaseActiveBigLedgerPeers + :: ChurnMode -> ModifyPeerSelectionTargets + increaseActiveBigLedgerPeers mode targets = + targets { + -- TODO: when chain-skipping will be implemented and chain-sync client + -- will take into account big ledger peers, we don't need pattern + -- match on the churn mode, but use + -- `targetNumberOfActiveBigLedgerPeers` (issue #4609). + targetNumberOfActiveBigLedgerPeers = + case mode of + ChurnModeNormal -> + targetNumberOfActiveBigLedgerPeers base + ChurnModeBulkSync -> + min 1 (targetNumberOfActiveBigLedgerPeers base) + } + + checkActiveBigLedgerPeersIncreased + :: CheckPeerSelectionCounters + checkActiveBigLedgerPeersIncreased + PeerSelectionCounters { numberOfActiveBigLedgerPeers } + PeerSelectionTargets { targetNumberOfActiveBigLedgerPeers } + = + numberOfActiveBigLedgerPeers >= targetNumberOfActiveBigLedgerPeers + + + decreaseActiveBigLedgerPeers + :: ChurnMode + -> ModifyPeerSelectionTargets + decreaseActiveBigLedgerPeers mode targets = + targets { + targetNumberOfActiveBigLedgerPeers = + case mode of + ChurnModeNormal -> + decrease $ targetNumberOfActiveBigLedgerPeers base + ChurnModeBulkSync -> + min 1 (targetNumberOfActiveBigLedgerPeers base) + } + + checkActiveBigLedgerPeersDecreased + :: CheckPeerSelectionCounters + checkActiveBigLedgerPeersDecreased + PeerSelectionCounters { numberOfActiveBigLedgerPeers, + numberOfActiveBigLedgerPeersDemotions } + PeerSelectionTargets { targetNumberOfActiveBigLedgerPeers } + = + numberOfActiveBigLedgerPeers + - numberOfActiveBigLedgerPeersDemotions + <= targetNumberOfActiveBigLedgerPeers + + + decreaseEstablishedBigLedgerPeers + :: ModifyPeerSelectionTargets + decreaseEstablishedBigLedgerPeers targets = + targets { + targetNumberOfEstablishedBigLedgerPeers = + decrease (targetNumberOfEstablishedBigLedgerPeers base - + targetNumberOfActiveBigLedgerPeers base) + + targetNumberOfActiveBigLedgerPeers base + } + + checkEstablishedBigLedgerPeersDecreased + :: CheckPeerSelectionCounters + checkEstablishedBigLedgerPeersDecreased + PeerSelectionCounters { numberOfEstablishedBigLedgerPeers, + numberOfWarmBigLedgerPeersDemotions } + PeerSelectionTargets { targetNumberOfEstablishedBigLedgerPeers } + = + numberOfEstablishedBigLedgerPeers + - numberOfWarmBigLedgerPeersDemotions + <= targetNumberOfEstablishedBigLedgerPeers + + + decreaseKnownPeers + :: ModifyPeerSelectionTargets + decreaseKnownPeers targets = + targets { + targetNumberOfRootPeers = + decrease (targetNumberOfRootPeers base - targetNumberOfEstablishedPeers base) + + targetNumberOfEstablishedPeers base + , targetNumberOfKnownPeers = + decrease (targetNumberOfKnownPeers base - targetNumberOfEstablishedPeers base) + + targetNumberOfEstablishedPeers base + } + + checkKnownPeersDecreased + :: PeerSelectionCounters -> PeerSelectionTargets -> Bool + checkKnownPeersDecreased + PeerSelectionCounters { numberOfKnownPeers } + PeerSelectionTargets { targetNumberOfKnownPeers } + = + -- note: we are not checking target root peers, since it is a one-sided + -- target + numberOfKnownPeers <= targetNumberOfKnownPeers + + decreaseKnownBigLedgerPeers + :: ModifyPeerSelectionTargets + decreaseKnownBigLedgerPeers targets = + targets { + targetNumberOfKnownBigLedgerPeers = + decrease (targetNumberOfKnownBigLedgerPeers base - + targetNumberOfEstablishedBigLedgerPeers base) + + targetNumberOfEstablishedBigLedgerPeers base + } + + checkKnownBigLedgerPeersDecreased + :: PeerSelectionCounters -> PeerSelectionTargets -> Bool + checkKnownBigLedgerPeersDecreased + PeerSelectionCounters { numberOfKnownBigLedgerPeers } + PeerSelectionTargets { targetNumberOfKnownBigLedgerPeers } + = numberOfKnownBigLedgerPeers <= targetNumberOfKnownBigLedgerPeers + + + increaseKnownPeers + :: ModifyPeerSelectionTargets + increaseKnownPeers targets = + targets { + targetNumberOfRootPeers = targetNumberOfRootPeers base + , targetNumberOfKnownPeers = targetNumberOfKnownPeers base + } + + checkKnownPeersIncreased + :: CheckPeerSelectionCounters + checkKnownPeersIncreased + PeerSelectionCounters { numberOfRootPeers, + numberOfKnownPeers } + PeerSelectionTargets { targetNumberOfRootPeers, + targetNumberOfKnownPeers } + = + numberOfRootPeers >= targetNumberOfRootPeers + && numberOfKnownPeers >= targetNumberOfKnownPeers + + + increaseKnownBigLedgerPeers + :: ModifyPeerSelectionTargets + increaseKnownBigLedgerPeers targets = + targets { + targetNumberOfKnownBigLedgerPeers = targetNumberOfKnownBigLedgerPeers base + } + + checkKnownBigLedgerPeersIncreased + :: CheckPeerSelectionCounters + checkKnownBigLedgerPeersIncreased + PeerSelectionCounters { numberOfKnownBigLedgerPeers } + PeerSelectionTargets { targetNumberOfKnownBigLedgerPeers } + = + numberOfKnownBigLedgerPeers >= targetNumberOfKnownBigLedgerPeers + + + -- + -- Main loop + -- + + churnLoop :: StdGen -> m Void + churnLoop !rng = do startTs <- getMonotonicTime (churnMode, ubp) <- atomically ((,) <$> updateChurnMode <*> getUseBootstrapPeers) traceWith tracer $ TraceChurnMode churnMode - atomically $ do - -- Purge the worst active peer(s). - decreaseActivePeers churnMode + -- Purge the worst active peers. + updateTargets DecreasedActivePeers + deactivateTimeout -- chainsync might timeout after 5mins + (decreaseActivePeers churnMode) + checkActivePeersDecreased + + -- Pick new active peers. + updateTargets IncreasedActivePeers + shortTimeout + (increaseActivePeers churnMode) + checkActivePeersIncreased + + -- Purge the worst active big ledger peers. + updateTargets DecreasedActiveBigLedgerPeers + deactivateTimeout -- chainsync might timeout after 5mins + (decreaseActiveBigLedgerPeers churnMode) + (checkActiveBigLedgerPeersDecreased) + + -- Pick new active big ledger peers. + updateTargets IncreasedActiveBigLedgerPeers + shortTimeout + (increaseActiveBigLedgerPeers churnMode) + checkActiveBigLedgerPeersIncreased - -- Short delay, we may have no active peers right now - threadDelay 1 - - atomically $ do - -- Pick new active peer(s). - increaseActivePeers churnMode - - -- Purge the worst active big ledger peer(s). - decreaseActiveBigLedgerPeers churnMode + -- Forget the worst performing established peers. + updateTargets DecreasedEstablishedPeers + (1 + closeConnectionTimeout) + (decreaseEstablishedPeers churnMode ubp) + (checkEstablishedPeersDecreased) + + -- Forget the worst performing established big ledger peers. + updateTargets DecreasedEstablishedBigLedgerPeers + (1 + closeConnectionTimeout) + decreaseEstablishedBigLedgerPeers + checkEstablishedBigLedgerPeersDecreased + + -- Forget the worst performing known peers (root peers, ledger peers) + updateTargets DecreasedKnownPeers + shortTimeout + decreaseKnownPeers + checkKnownPeersDecreased - -- Short delay, we may have no active big ledger peers right now - threadDelay 1 + -- Pick new known peers + updateTargets IncreasedKnownPeers + (2 * requestPeersTimeout + shortTimeout) + increaseKnownPeers + checkKnownPeersIncreased + + -- Forget the worst performing known big ledger peers. + updateTargets DecreasedKnownBigLedgerPeers + shortTimeout + decreaseKnownBigLedgerPeers + checkKnownBigLedgerPeersDecreased + + -- Pick new known big ledger peers + updateTargets IncreasedKnownBigLedgerPeers + (2 * requestPeersTimeout + shortTimeout) + increaseKnownBigLedgerPeers + checkKnownBigLedgerPeersIncreased - -- Pick new active peer(s). - atomically $ increaseActiveBigLedgerPeers churnMode + -- Pick new non-active peers + updateTargets IncreasedEstablishedPeers + churnEstablishConnectionTimeout + (increaseEstablishedPeers churnMode ubp) + checkEstablishedPeersIncreased - -- Give the promotion process time to start - threadDelay 1 + -- Pick new non-active big ledger peers + updateTargets IncreasedEstablishedBigLedgerPeers + churnEstablishConnectionTimeout + increaseEstablishedBigLedgerPeers + checkEstablishedBigLedgerPeersIncreased - -- Forget the worst performing established peers. - atomically $ do - decreaseEstablished churnMode ubp - decreaseEstablishedBigLedgerPeers - - -- Give the governor time to properly demote them. - threadDelay $ 1 + closeConnectionTimeout - - -- Forget the worst performing known peers - atomically $ - modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfRootPeers = - decrease (targetNumberOfRootPeers base - targetNumberOfEstablishedPeers base) - + targetNumberOfEstablishedPeers base - , targetNumberOfKnownPeers = - decrease (targetNumberOfKnownPeers base - targetNumberOfEstablishedPeers base) - + targetNumberOfEstablishedPeers base - , targetNumberOfKnownBigLedgerPeers = - decrease (targetNumberOfKnownBigLedgerPeers base - - targetNumberOfEstablishedBigLedgerPeers base) - + targetNumberOfEstablishedBigLedgerPeers base - }) - - -- Forgetting cold peers should be quick - threadDelay 1 + endTs <- getMonotonicTime - -- Pick new known peers - atomically $ modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfRootPeers = targetNumberOfRootPeers base - , targetNumberOfKnownPeers = targetNumberOfKnownPeers base - , targetNumberOfKnownBigLedgerPeers = targetNumberOfKnownBigLedgerPeers base - }) + fuzzyDelay rng (endTs `diffTime` startTs) >>= churnLoop - -- Give the governor time to find some new peers - threadDelay $ 1 + psOverallTimeout - -- Pick new non-active peers - atomically $ do - increaseEstablishedPeers churnMode ubp - modifyTVar peerSelectionVar (\targets -> targets { - targetNumberOfEstablishedBigLedgerPeers = targetNumberOfEstablishedBigLedgerPeers base - }) - endTs <- getMonotonicTime + -- + -- Auxiliary functions and constants + -- - fuzzyDelay rng (endTs `diffTime` startTs) >>= go -- Randomly delay between churnInterval and churnInterval + maxFuzz seconds. fuzzyDelay :: StdGen -> DiffTime -> m StdGen @@ -244,6 +494,7 @@ peerChurnGovernor tracer deadlineChurnInterval bulkChurnInterval psOverallTimeou FetchModeDeadline -> longDelay rng execTime FetchModeBulkSync -> shortDelay rng execTime + fuzzyDelay' :: DiffTime -> Double -> StdGen -> DiffTime -> m StdGen fuzzyDelay' baseDelay maxFuzz rng execTime = do let (fuzz, rng') = randomR (0, maxFuzz) rng @@ -260,9 +511,9 @@ peerChurnGovernor tracer deadlineChurnInterval bulkChurnInterval psOverallTimeou shortDelay :: StdGen -> DiffTime -> m StdGen shortDelay = fuzzyDelay' bulkChurnInterval 60 + shortTimeout :: DiffTime + shortTimeout = 1 + -- Replace 20% or at least one peer every churnInterval. decrease :: Int -> Int decrease v = max 0 $ v - max 1 (v `div` 5) - - - diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs index dd7dac8ece..cb9f8ef89b 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs @@ -16,6 +16,7 @@ module Ouroboros.Network.PeerSelection.Governor , PeerSelectionActions (..) , PeerStateActions (..) , TracePeerSelection (..) + , ChurnAction (..) , DebugPeerSelection (..) , DebugPeerSelectionState (..) , peerSelectionGovernor @@ -29,6 +30,7 @@ module Ouroboros.Network.PeerSelection.Governor , PublicPeerSelectionState (..) , makePublicPeerSelectionStateVar , PeerSelectionCounters (..) + , emptyPeerSelectionCounters , nullPeerSelectionTargets , emptyPeerSelectionState , ChurnMode (..) @@ -447,12 +449,13 @@ peerSelectionGovernor :: ( Alternative (STM m) -> Tracer m (DebugPeerSelection peeraddr) -> Tracer m PeerSelectionCounters -> StdGen + -> StrictTVar m PeerSelectionCounters -> StrictTVar m (PublicPeerSelectionState peeraddr) -> StrictTVar m (PeerSelectionState peeraddr peerconn) -> PeerSelectionActions peeraddr peerconn m -> PeerSelectionPolicy peeraddr m -> m Void -peerSelectionGovernor tracer debugTracer countersTracer fuzzRng publicStateVar debugStateVar actions policy = +peerSelectionGovernor tracer debugTracer countersTracer fuzzRng countersVar publicStateVar debugStateVar actions policy = JobPool.withJobPool $ \jobPool -> do localPeers <- map (\(w, h, _) -> (w, h)) <$> atomically (readLocalRootPeers actions) @@ -460,6 +463,7 @@ peerSelectionGovernor tracer debugTracer countersTracer fuzzRng publicStateVar d tracer debugTracer countersTracer + countersVar publicStateVar debugStateVar actions @@ -495,6 +499,7 @@ peerSelectionGovernorLoop :: forall m peeraddr peerconn. => Tracer m (TracePeerSelection peeraddr) -> Tracer m (DebugPeerSelection peeraddr) -> Tracer m PeerSelectionCounters + -> StrictTVar m PeerSelectionCounters -> StrictTVar m (PublicPeerSelectionState peeraddr) -> StrictTVar m (PeerSelectionState peeraddr peerconn) -> PeerSelectionActions peeraddr peerconn m @@ -505,6 +510,7 @@ peerSelectionGovernorLoop :: forall m peeraddr peerconn. peerSelectionGovernorLoop tracer debugTracer countersTracer + countersVar publicStateVar debugStateVar actions @@ -552,11 +558,20 @@ peerSelectionGovernorLoop tracer let Decision { decisionTrace, decisionJobs, decisionState } = timedDecision now !newCounters = peerStateToCounters decisionState - traverse_ (traceWith tracer) decisionTrace + + atomically $ do + -- Update counters + withCacheA (countersCache decisionState) + newCounters + (writeTVar countersVar) + + -- Trace counters traceWithCache countersTracer (countersCache decisionState) newCounters + traverse_ (traceWith tracer) decisionTrace + mapM_ (JobPool.forkJob jobPool) decisionJobs loop (decisionState { countersCache = Cache newCounters }) dbgUpdateAt' diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs index ca77425081..565b7cfd3d 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs @@ -44,6 +44,7 @@ module Ouroboros.Network.PeerSelection.Governor.Types , PeerSharingResult (..) -- * Traces , TracePeerSelection (..) + , ChurnAction (..) , DebugPeerSelection (..) -- * Error types , BootstrapPeersCriticalTimeoutError (..) @@ -762,8 +763,7 @@ peerStateToCounters PeerSelectionState { knownPeers, inProgressDemoteWarm, inProgressDemoteHot } = PeerSelectionCounters { - numberOfRootPeers = LocalRootPeers.size localRootPeers - + PublicRootPeers.size publicRootPeers, + numberOfRootPeers = Set.size rootPeersSet, numberOfKnownPeers = Set.size knownPeersSet, numberOfColdPeersPromotions = Set.size $ inProgressPromoteCold Set.\\ bigLedgerSet, @@ -816,6 +816,10 @@ peerStateToCounters PeerSelectionState { knownPeers, establishedSet = EstablishedPeers.toSet establishedPeers bigLedgerSet = PublicRootPeers.getBigLedgerPeers publicRootPeers + -- root peers + rootPeersSet = PublicRootPeers.toSet publicRootPeers + <> localRootSet + -- non big ledger peers knownPeersSet = knownSet Set.\\ bigLedgerSet establishedPeersSet = establishedSet Set.\\ bigLedgerSet @@ -846,13 +850,11 @@ peerStateToCounters PeerSelectionState { knownPeers, activeBootstrapPeersSet = activePeersSet `Set.intersection` bootstrapSet -- shared peers - rootSet = PublicRootPeers.toSet publicRootPeers - <> localRootSet -- shared peers are not big ledger peers, hence we can use `knownPeersSet`, -- `establishedPeersSet` and `activePeersSet` below. - knownSharedPeersSet = knownPeersSet Set.\\ rootSet - establishedSharedPeersSet = establishedPeersSet Set.\\ rootSet - activeSharedPeersSet = activePeersSet Set.\\ rootSet + knownSharedPeersSet = knownPeersSet Set.\\ rootPeersSet + establishedSharedPeersSet = establishedPeersSet Set.\\ rootPeersSet + activeSharedPeersSet = activePeersSet Set.\\ rootPeersSet @@ -1364,6 +1366,9 @@ data TracePeerSelection peeraddr = | TraceChurnWait DiffTime | TraceChurnMode ChurnMode + | TraceChurnAction ChurnAction + | TraceChurnTimeout ChurnAction + | TraceLedgerStateJudgementChanged LedgerStateJudgement | TraceOnlyBootstrapPeers | TraceBootstrapPeersFlagChangedWhilstInSensitiveState @@ -1381,6 +1386,22 @@ data TracePeerSelection peeraddr = | TraceDebugState Time (DebugPeerSelectionState peeraddr) deriving Show + +data ChurnAction = DecreasedActivePeers + | IncreasedActivePeers + | DecreasedActiveBigLedgerPeers + | IncreasedActiveBigLedgerPeers + | DecreasedEstablishedPeers + | IncreasedEstablishedPeers + | IncreasedEstablishedBigLedgerPeers + | DecreasedEstablishedBigLedgerPeers + | DecreasedKnownPeers + | IncreasedKnownPeers + | DecreasedKnownBigLedgerPeers + | IncreasedKnownBigLedgerPeers + deriving (Eq, Show) + + data BootstrapPeersCriticalTimeoutError = BootstrapPeersCriticalTimeoutError deriving (Eq, Show)