Skip to content

Commit

Permalink
inbound-governor: wakeup inbound governor on regular intervals
Browse files Browse the repository at this point in the history
To push peers that become mature, we need to wakeup the inbound governor
on regular intervals.  Long inactivity periods could happen on nodes
which start at the edge of network, when they have very few inbound
connections.  The inbound governor is woken up every 30s.
  • Loading branch information
coot committed May 6, 2024
1 parent 00527d3 commit 2a8c78b
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Monoid.Synchronisation
import Data.OrdPSQ as OrdPSQ
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Void (Void)

import Network.Mux qualified as Mux
Expand All @@ -71,6 +73,13 @@ inboundMaturePeerDelay :: DiffTime
inboundMaturePeerDelay = 15 * 60


-- | Every 10s we wake up the inbound governor. This is to give a chance to
-- mark some of the inbound connections as mature.
--
inactionTimeout :: DiffTime
inactionTimeout = 30


-- | Run the server, which consists of the following components:
--
-- * /inbound governor/, it corresponds to p2p-governor on outbound side
Expand Down Expand Up @@ -159,10 +168,20 @@ withInboundGovernor trTracer tracer debugTracer inboundInfoChannel
<$> igsConnections state

time <- getMonotonicTime
inactivityVar <- registerDelay inactionTimeout

event
<- atomically $ runFirstToFinish $
Map.foldMapWithKey
FirstToFinish (
-- mark connections as mature
case OrdPSQ.atMostView
((-inboundMaturePeerDelay) `addTime` time)
(igsFreshDuplexPeers state) of
([], _) -> retry
(as, pq') -> let m = Map.fromList ((\(addr, _p, v) -> (addr, v)) <$> as)
in pure $ MaturedDuplexPeers m pq'
)
<> Map.foldMapWithKey
( firstMuxToFinish
<> firstMiniProtocolToFinish
<> firstPeerPromotedToWarm
Expand All @@ -177,14 +196,10 @@ withInboundGovernor trTracer tracer debugTracer inboundInfoChannel
<> FirstToFinish (
NewConnection <$> InfoChannel.readMessage inboundInfoChannel
)
<> FirstToFinish (
-- move connections from
case OrdPSQ.atMostView
((-inboundMaturePeerDelay) `addTime` time)
(igsFreshDuplexPeers state) of
([], _) -> retry
(as, pq') -> let m = Map.fromList ((\(addr, _p, v) -> (addr, v)) <$> as)
in pure $ MaturedDuplexPeers m pq'
<> FirstToFinish (
-- spin the inbound governor loop; it will re-run with new
-- time, which allows to make some peers mature.
LazySTM.readTVar inactivityVar >>= check >> pure InactivityTimeout
)
(mbConnId, state') <- case event of
NewConnection
Expand Down Expand Up @@ -491,11 +506,17 @@ withInboundGovernor trTracer tracer debugTracer inboundInfoChannel

return (Just connId, state')

MaturedDuplexPeers newMatureDuplexPeers igsFreshDuplexPeers ->
MaturedDuplexPeers newMatureDuplexPeers igsFreshDuplexPeers -> do
traceWith tracer $ TrMaturedConnections (Map.keysSet newMatureDuplexPeers)
(Set.fromList $ OrdPSQ.keys igsFreshDuplexPeers)
pure (Nothing, state { igsMatureDuplexPeers = newMatureDuplexPeers
<> igsMatureDuplexPeers state,
igsFreshDuplexPeers })

InactivityTimeout -> do
traceWith tracer $ TrInactive ((\(a,b,_) -> (a,b)) <$> OrdPSQ.toList (igsFreshDuplexPeers state))
pure (Nothing, state)


mask_ $ do
atomically $ writeTVar st state'
Expand Down Expand Up @@ -616,6 +637,8 @@ data InboundGovernorTrace peerAddr
| TrUnexpectedlyFalseAssertion !(IGAssertionLocation peerAddr)
-- ^ This case is unexpected at call site.
| TrInboundGovernorError !SomeException
| TrMaturedConnections !(Set peerAddr) !(Set peerAddr)
| TrInactive ![(peerAddr, Time)]
deriving Show


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ data Event (muxMode :: MuxMode) initiatorCtx peerAddr versionData m a b
| MaturedDuplexPeers !(Map peerAddr versionData) -- ^ newly matured duplex peers
!(OrdPSQ peerAddr Time versionData) -- ^ queue of fresh duplex peers

| InactivityTimeout


--
-- STM transactions which detect 'Event's (signals)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ inboundGovernorTraceMap (IG.TrUnexpectedlyFalseAssertion _) =
"TrUnexpectedlyFalseAssertion"
inboundGovernorTraceMap (TrInboundGovernorError se) =
"TrInboundGovernorError " ++ show se
inboundGovernorTraceMap TrMaturedConnections {} =
"TrMaturedConnections"
inboundGovernorTraceMap TrInactive {} =
"TrMaturedConnections"


serverTraceMap :: Show ntnAddr => ServerTrace ntnAddr -> String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3303,7 +3303,14 @@ unit_peer_sharing =
sim = diffusionSimulation (toBearerInfo absNoAttenuation)
script
iosimTracer
trace = take 125000
-- We need roughly 1200 because:
-- * first peer sharing request will be issued after
-- `policyPeerSharAcitvationDelay = 300`
-- * this request will not bring any new peers, because non of the peers
-- are yet mature
-- * inbound connections become mature at 900s (15 mins)
-- * next peer share request happens after 900s, e.g. around 1200s.
trace = takeWhile (\(t,_,_,_) -> t < Time 1250)
. traceEvents
$ runSimTrace sim

Expand All @@ -3322,11 +3329,8 @@ unit_peer_sharing =
events' =
Trace.toList
. splitWithNameTrace
. Trace.fromList ()
. fmap snd
. Trace.toList
. fmap (\(WithTime t (WithName name b))
-> (t, WithName name (WithTime t b)))
-> (WithName name (WithTime t b)))
. withTimeNameTraceEvents
@DiffusionTestTrace
@NtNAddr
Expand Down

0 comments on commit 2a8c78b

Please sign in to comment.