Skip to content

Commit

Permalink
outbound-governor: light peer sharing using readInboundPeers API
Browse files Browse the repository at this point in the history
This patch also update `PeerSelection` tests to cover inbound peers.
  • Loading branch information
coot committed May 6, 2024
1 parent c3aa384 commit 56e3c4a
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ traceNum TraceDemoteWarmBigLedgerPeerDone{} = 43
traceNum TraceDemoteHotBigLedgerPeers{} = 44
traceNum TraceDemoteHotBigLedgerPeerFailed{} = 45
traceNum TraceDemoteHotBigLedgerPeerDone{} = 46
traceNum TraceKnownInboundConnection{} = 47
traceNum TracePickInboundPeers{} = 47
traceNum TraceDemoteBigLedgerPeersAsynchronous{} = 48
traceNum TraceLedgerStateJudgementChanged{} = 49
traceNum TraceOnlyBootstrapPeers{} = 50
Expand Down Expand Up @@ -931,7 +931,7 @@ allTraceNames =
, (44, "TraceDemoteHotBigLedgerPeers")
, (45, "TraceDemoteHotBigLedgerPeerFailed")
, (46, "TraceDemoteHotBigLedgerPeerDone")
, (47, "TraceKnownInboundConnection")
, (47, "TracePickInboundPeers")
, (48, "TraceDemoteBigLedgerPeersAsynchronous")
, (49, "TraceLedgerStateJudgementChanged")
, (50, "TraceOnlyBootstrapPeers")
Expand Down Expand Up @@ -3657,6 +3657,7 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap
policyPickWarmPeersToPromote = \_ _ _ -> pickTrivially,
policyPickHotPeersToDemote = \_ _ _ -> pickTrivially,
policyPickWarmPeersToDemote = \_ _ _ -> pickTrivially,
policyPickInboundPeers = \_ _ _ -> pickTrivially,
policyFindPublicRootTimeout = 5,
policyMaxInProgressPeerShareReqs = 0,
policyPeerShareRetryTime = 0, -- seconds
Expand Down Expand Up @@ -3701,6 +3702,7 @@ prop_issue_3550 = prop_governor_target_established_below defaultMaxTime $
pickHotPeersToDemote = Script (PickSome (Set.fromList [PeerAddr 29]) :| []),
pickWarmPeersToDemote = Script (PickFirst :| []),
pickColdPeersToForget = Script (PickFirst :| []),
pickInboundPeers = Script (PickFirst :| []),
peerSharingFlag = PeerSharingEnabled,
useBootstrapPeers = Script ((DontUseBootstrapPeers, NoDelay) :| []),
useLedgerPeers = Script ((UseLedgerPeers Always, NoDelay) :| []),
Expand Down Expand Up @@ -3738,6 +3740,7 @@ prop_issue_3515 = prop_governor_nolivelock $
pickHotPeersToDemote = Script (PickFirst :| []),
pickWarmPeersToDemote = Script (PickFirst :| []),
pickColdPeersToForget = Script (PickFirst :| []),
pickInboundPeers = Script (PickFirst :| []),
peerSharingFlag = PeerSharingEnabled,
useBootstrapPeers = Script ((DontUseBootstrapPeers, NoDelay) :| []),
useLedgerPeers = Script ((UseLedgerPeers Always, NoDelay) :| []),
Expand Down Expand Up @@ -3775,6 +3778,7 @@ prop_issue_3494 = prop_governor_nofail $
pickHotPeersToDemote = Script (PickFirst :| []),
pickWarmPeersToDemote = Script (PickFirst :| []),
pickColdPeersToForget = Script (PickFirst :| []),
pickInboundPeers = Script (PickFirst :| []),
peerSharingFlag = PeerSharingEnabled,
useBootstrapPeers = Script ((DontUseBootstrapPeers, NoDelay) :| []),
useLedgerPeers = Script ((UseLedgerPeers Always, NoDelay) :| []),
Expand Down Expand Up @@ -3828,6 +3832,7 @@ prop_issue_3233 = prop_governor_nolivelock $
pickHotPeersToDemote = Script (PickFirst :| []),
pickWarmPeersToDemote = Script (PickFirst :| []),
pickColdPeersToForget = Script (PickFirst :| []),
pickInboundPeers = Script (PickFirst :| []),
peerSharingFlag = PeerSharingEnabled,
useBootstrapPeers = Script ((DontUseBootstrapPeers, NoDelay) :| []),
useLedgerPeers = Script ((UseLedgerPeers Always, NoDelay) :| []),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ data GovernorMockEnvironment = GovernorMockEnvironment {
pickHotPeersToDemote :: !(PickScript PeerAddr),
pickWarmPeersToDemote :: !(PickScript PeerAddr),
pickColdPeersToForget :: !(PickScript PeerAddr),
pickInboundPeers :: !(PickScript PeerAddr),
peerSharingFlag :: !PeerSharing,
useBootstrapPeers :: !(TimedScript UseBootstrapPeers),
useLedgerPeers :: !(TimedScript UseLedgerPeers),
Expand Down Expand Up @@ -592,21 +593,24 @@ mockPeerSelectionPolicy GovernorMockEnvironment {
pickWarmPeersToPromote,
pickHotPeersToDemote,
pickWarmPeersToDemote,
pickColdPeersToForget
pickColdPeersToForget,
pickInboundPeers
} = do
pickKnownPeersForPeerShareVar <- initScript' pickKnownPeersForPeerShare
pickColdPeersToPromoteVar <- initScript' pickColdPeersToPromote
pickWarmPeersToPromoteVar <- initScript' pickWarmPeersToPromote
pickHotPeersToDemoteVar <- initScript' pickHotPeersToDemote
pickWarmPeersToDemoteVar <- initScript' pickWarmPeersToDemote
pickColdPeersToForgetVar <- initScript' pickColdPeersToForget
pickInboundPeersVar <- initScript' pickInboundPeers
return PeerSelectionPolicy {
policyPickKnownPeersForPeerShare = \_ _ _ -> interpretPickScript pickKnownPeersForPeerShareVar,
policyPickColdPeersToPromote = \_ _ _ -> interpretPickScript pickColdPeersToPromoteVar,
policyPickWarmPeersToPromote = \_ _ _ -> interpretPickScript pickWarmPeersToPromoteVar,
policyPickHotPeersToDemote = \_ _ _ -> interpretPickScript pickHotPeersToDemoteVar,
policyPickWarmPeersToDemote = \_ _ _ -> interpretPickScript pickWarmPeersToDemoteVar,
policyPickColdPeersToForget = \_ _ _ -> interpretPickScript pickColdPeersToForgetVar,
policyPickInboundPeers = \_ _ _ -> interpretPickScript pickInboundPeersVar,
policyFindPublicRootTimeout = 5, -- seconds
policyMaxInProgressPeerShareReqs = 2,
policyPeerShareRetryTime = 3600, -- seconds
Expand Down Expand Up @@ -649,10 +653,10 @@ tracerTracePeerSelection = contramap f tracerTestTraceEvent
f a@(TraceBigLedgerPeersResults !_ !_ !_) = GovernorEvent a
f a@(TraceBigLedgerPeersFailure !_ !_ !_) = GovernorEvent a
f a@(TraceForgetBigLedgerPeers !_ !_ !_) = GovernorEvent a
f a@(TracePickInboundPeers !_ !_ !_ !_) = GovernorEvent a
f a@(TracePeerShareRequests !_ !_ !_ !_ !_) = GovernorEvent a
f a@(TracePeerShareResults !_) = GovernorEvent a
f a@(TracePeerShareResultsFiltered !_) = GovernorEvent a
f a@(TraceKnownInboundConnection !_ !_) = GovernorEvent a
f a@(TracePromoteColdPeers !_ !_ !_) = GovernorEvent a
f a@(TracePromoteColdLocalPeers !_ !_) = GovernorEvent a
f a@(TracePromoteColdFailed !_ !_ !_ !_ !_) = GovernorEvent a
Expand Down Expand Up @@ -790,6 +794,7 @@ instance Arbitrary GovernorMockEnvironment where
pickHotPeersToDemote <- arbitraryPickScript arbitrarySubsetOfPeers
pickWarmPeersToDemote <- arbitraryPickScript arbitrarySubsetOfPeers
pickColdPeersToForget <- arbitraryPickScript arbitrarySubsetOfPeers
pickInboundPeers <- arbitraryPickScript arbitrarySubsetOfPeers
peerSharingFlag <- arbitrary
useBootstrapPeers <- arbitrary
useLedgerPeers <- arbitrary
Expand Down Expand Up @@ -872,6 +877,7 @@ instance Arbitrary GovernorMockEnvironment where
pickHotPeersToDemote,
pickWarmPeersToDemote,
pickColdPeersToForget,
pickInboundPeers,
peerSharingFlag,
useBootstrapPeers,
useLedgerPeers,
Expand Down Expand Up @@ -914,6 +920,9 @@ instance Arbitrary GovernorMockEnvironment where
++ [ env { pickColdPeersToForget = pickColdPeersToForget' }
| pickColdPeersToForget' <- shrink pickColdPeersToForget
]
++ [ env { pickInboundPeers = pickInboundPeers' }
| pickInboundPeers' <- shrink pickInboundPeers
]
++ [ env { useBootstrapPeers = useBootstrapPeers' }
| useBootstrapPeers' <- shrink useBootstrapPeers
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -978,8 +978,8 @@ prop_peer_selection_trace_coverage defaultBearerInfo diffScript =
"TracePeerShareResults"
peerSelectionTraceMap TracePeerShareResultsFiltered {} =
"TracePeerShareResultsFiltered"
peerSelectionTraceMap (TraceKnownInboundConnection addr ps) =
"TraceKnownInboundConnection " ++ show addr ++ " " ++ show ps
peerSelectionTraceMap TracePickInboundPeers {} =
"TracePickInboundInboundPeers"
peerSelectionTraceMap TraceForgetColdPeers {} =
"TraceForgetColdPeers"
peerSelectionTraceMap TracePromoteColdPeers {} =
Expand Down
7 changes: 7 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ peerMetricsConfiguration = PeerMetricsConfiguration {
}


-- | Maximal number of light peers included at once.
--
maxInboundPeers :: Int
maxInboundPeers = 10


-- | Merge two dictionaries where values of the first one are obligatory, while
-- the second one are optional.
--
Expand Down Expand Up @@ -101,6 +107,7 @@ simplePeerSelectionPolicy rngVar getChurnMode metrics errorDelay = PeerSelection
policyPickKnownPeersForPeerShare = simplePromotionPolicy,
policyPickColdPeersToPromote = simplePromotionPolicy,
policyPickWarmPeersToPromote = simplePromotionPolicy,
policyPickInboundPeers = simplePromotionPolicy,

policyPickHotPeersToDemote = hotDemotionPolicy,
policyPickWarmPeersToDemote = warmDemotionPolicy,
Expand Down
23 changes: 13 additions & 10 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ module Ouroboros.Network.PeerSelection.Governor

import Data.Foldable (traverse_)
import Data.Hashable
import Data.Map.Strict (Map)
import Data.Set qualified as Set
import Data.Void (Void)

Expand Down Expand Up @@ -604,8 +605,9 @@ peerSelectionGovernorLoop tracer
evalGuardedDecisions :: Time
-> PeerSelectionState peeraddr peerconn
-> m (TimedDecision m peeraddr peerconn)
evalGuardedDecisions blockedAt st =
case guardedDecisions blockedAt st of
evalGuardedDecisions blockedAt st = do
inboundPeers <- readInboundPeers actions
case guardedDecisions blockedAt st inboundPeers of
GuardedSkip _ ->
-- impossible since guardedDecisions always has something to wait for
error "peerSelectionGovernorLoop: impossible: nothing to do"
Expand All @@ -626,8 +628,9 @@ peerSelectionGovernorLoop tracer

guardedDecisions :: Time
-> PeerSelectionState peeraddr peerconn
-> Map peeraddr PeerSharing
-> Guarded (STM m) (TimedDecision m peeraddr peerconn)
guardedDecisions blockedAt st =
guardedDecisions blockedAt st inboundPeers =
-- All the alternative potentially-blocking decisions.

-- The Governor needs to react to changes in the bootstrap peer flag,
Expand All @@ -653,13 +656,13 @@ peerSelectionGovernorLoop tracer
<> BigLedgerPeers.aboveTarget policy st

-- All the alternative non-blocking internal decisions.
<> RootPeers.belowTarget actions blockedAt st
<> KnownPeers.belowTarget actions policy st
<> KnownPeers.aboveTarget policy st
<> EstablishedPeers.belowTarget actions policy st
<> EstablishedPeers.aboveTarget actions policy st
<> ActivePeers.belowTarget actions policy st
<> ActivePeers.aboveTarget actions policy st
<> RootPeers.belowTarget actions blockedAt st
<> KnownPeers.belowTarget actions inboundPeers policy st
<> KnownPeers.aboveTarget policy st
<> EstablishedPeers.belowTarget actions policy st
<> EstablishedPeers.aboveTarget actions policy st
<> ActivePeers.belowTarget actions policy st
<> ActivePeers.aboveTarget actions policy st

-- There is no rootPeersAboveTarget since the roots target is one sided.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ jobPromoteColdPeer PeerSelectionActions {
x
)
(Set.singleton peeraddr)
$ KnownPeers.setSuccessfulConnectionFlag peeraddr
$ KnownPeers.setSuccessfulConnectionFlag (Set.singleton peeraddr)
$ KnownPeers.clearTepidFlag peeraddr $
KnownPeers.resetFailCount
peeraddr
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
Expand All @@ -11,6 +12,8 @@ module Ouroboros.Network.PeerSelection.Governor.KnownPeers

import Data.Hashable
import Data.List (sortBy)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Set qualified as Set
import GHC.Stack (HasCallStack)
Expand All @@ -23,6 +26,7 @@ import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI

import Ouroboros.Network.Diffusion.Policies qualified as Policies
import Ouroboros.Network.PeerSelection.Bootstrap (requiresBootstrapPeers)
import Ouroboros.Network.PeerSelection.Governor.Types
import Ouroboros.Network.PeerSelection.PeerAdvertise (PeerAdvertise (..))
Expand All @@ -38,24 +42,30 @@ import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharingAmount)
-- Known peers below target
--


-- | If we are below the target of /known peers/ we peer share (if we are above
-- the peer share request threshold).
-- | If we are below the target of /known peers/ we flip a coin to either get
-- new peers from:
--
-- * inbound connections; or
-- * peer share (if we are above the peer share request threshold).
--
-- It should be noted if the node is in bootstrap mode (i.e. in a sensitive
-- state) then this monitoring action will be disabled.
--
belowTarget :: (MonadAsync m, MonadTimer m, Ord peeraddr, Hashable peeraddr,
HasCallStack)
=> PeerSelectionActions peeraddr peerconn m
-> MkGuardedDecision peeraddr peerconn m
belowTarget actions
belowTarget
:: (MonadAsync m, MonadTimer m, Ord peeraddr, Hashable peeraddr)
=> PeerSelectionActions peeraddr peerconn m
-> Map peeraddr PeerSharing
-> MkGuardedDecision peeraddr peerconn m
belowTarget actions@PeerSelectionActions { peerSharing }
inboundPeers
policy@PeerSelectionPolicy {
policyMaxInProgressPeerShareReqs,
policyPickKnownPeersForPeerShare,
policyPickInboundPeers,
policyPeerShareRetryTime
}
st@PeerSelectionState {
knownPeers,
establishedPeers,
inProgressPeerShareReqs,
inProgressDemoteToCold,
Expand All @@ -66,8 +76,56 @@ belowTarget actions
bootstrapPeersFlag,
stdGen
}
-- Only start Peer Sharing request if PeerSharing was enabled
| PeerSharingEnabled <- peerSharing actions
--
-- Light peer sharing
--

| PeerSharingEnabled <- peerSharing
-- Are we under target for number of known peers?
, numKnownPeers < targetNumberOfKnownPeers

-- There are no peer share requests in-flight.
, inProgressPeerShareReqs <= 0

-- No inbound peers should be used when the node is using bootstrap peers.
, not (requiresBootstrapPeers bootstrapPeersFlag ledgerStateJudgement)

-- Use inbound peers either if it won the coin flip or if there are no
-- available peers to do peer sharing.
, useInboundPeers || Set.null availableForPeerShare

, let availablePeers = Map.keysSet inboundPeers
Set.\\ KnownPeers.toSet knownPeers
, not (Set.null availablePeers)

= Guarded Nothing $ do
selected <- pickUnknownPeers
st
policyPickInboundPeers
availablePeers
(Policies.maxInboundPeers `min` (targetNumberOfKnownPeers - numKnownPeers))
let selectedMap = inboundPeers `Map.restrictKeys` selected
return $ \_now -> Decision {
decisionTrace = [TracePickInboundPeers
targetNumberOfKnownPeers
numKnownPeers
selectedMap
availablePeers
],
decisionState = st { knownPeers = KnownPeers.setSuccessfulConnectionFlag selected
$ KnownPeers.insert
(Map.map (\ps -> (Just ps, Just DoAdvertisePeer)) selectedMap)
knownPeers,
stdGen = stdGen'
},
decisionJobs = []
}

--
-- Peer sharing
--

| PeerSharingEnabled <- peerSharing
-- Are we under target for number of known peers?
, numKnownPeers < targetNumberOfKnownPeers

Expand All @@ -78,9 +136,10 @@ belowTarget actions
-- We can only ask ones where we have not asked them within a certain time.
, not (Set.null availableForPeerShare)

-- No peer share requests should be issued when the node is in a sensitive
-- state
-- No peer share requests should be issued when the node is using bootstrap
-- peers.
, not (requiresBootstrapPeers bootstrapPeersFlag ledgerStateJudgement)

= Guarded Nothing $ do
-- Max selected should be <= numPeerShareReqsPossible
selectedForPeerShare <- pickPeers st
Expand All @@ -99,7 +158,7 @@ belowTarget actions
numPeersToReq :: PeerSharingAmount
!numPeersToReq = fromIntegral
$ min 255 (max 8 (objective `div` numPeerShareReqs))
(salt, stdGen') = random stdGen
(salt, stdGen'') = random stdGen'

return $ \now -> Decision {
decisionTrace = [TracePeerShareRequests
Expand All @@ -115,7 +174,7 @@ belowTarget actions
selectedForPeerShare
(addTime policyPeerShareRetryTime now)
establishedPeers,
stdGen = stdGen'
stdGen = stdGen''
},
decisionJobs =
[jobPeerShare actions policy objective salt numPeersToReq
Expand All @@ -132,12 +191,12 @@ belowTarget actions
| otherwise
= GuardedSkip Nothing
where
(useInboundPeers, stdGen') = random stdGen
PeerSelectionCounters {
numberOfKnownPeers = numKnownPeers
}
=
peerSelectionStateToCounters st

numPeerShareReqsPossible = policyMaxInProgressPeerShareReqs
- inProgressPeerShareReqs
-- Only peer which permit peersharing are available
Expand Down

0 comments on commit 56e3c4a

Please sign in to comment.