Skip to content

Commit

Permalink
inbound-governor: (re)added PublicInboundGovernorState
Browse files Browse the repository at this point in the history
`PublicInboundGovernorState` extracts information directly from
`InboundGovernorState` and exposes an `STM` action which allows to read
it.
  • Loading branch information
coot committed May 7, 2024
1 parent 4155800 commit 3849735
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 139 deletions.
29 changes: 13 additions & 16 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -272,22 +272,19 @@ withBidirectionalConnectionManager snocket makeBearer socket
(InResponderMode $ Just outgovInfoChannel)
$ \connectionManager -> do
serverAddr <- Snocket.getLocalAddr snocket socket
withAsync
(Server.run
ServerArguments {
serverSockets = socket :| [],
serverSnocket = snocket,
serverTracer = ("server",) `contramap` debugTracer, -- ServerTrace
serverTrTracer = nullTracer,
serverInboundGovernorTracer = ("inbound-governor",) `contramap` debugTracer,
serverConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0,
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just protocolIdleTimeout,
serverInboundInfoChannel = inbgovInfoChannel
}
)
(\thread -> link thread
>> k connectionManager serverAddr)
Server.with
ServerArguments {
serverSockets = socket :| [],
serverSnocket = snocket,
serverTracer = ("server",) `contramap` debugTracer, -- ServerTrace
serverTrTracer = nullTracer,
serverInboundGovernorTracer = ("inbound-governor",) `contramap` debugTracer,
serverConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0,
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just protocolIdleTimeout,
serverInboundInfoChannel = inbgovInfoChannel
}
(\_ _ -> k connectionManager serverAddr)
where
serverApplication :: LazySTM.TVar m [[Int]]
-> LazySTM.TVar m [[Int]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ data ConnectionId addr = ConnectionId {

-- | Order first by `remoteAddress` then by `localAddress`.
--
-- /Note:/ we relay on the fact that `remoteAddress` is an order
-- preserving map (which allows us to use `Map.mapKeysMonotonic` in some
-- cases).
--
instance Ord addr => Ord (ConnectionId addr) where
conn `compare` conn' =
remoteAddress conn `compare` remoteAddress conn'
Expand Down
110 changes: 78 additions & 32 deletions ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
--
module Ouroboros.Network.InboundGovernor
( -- * Run Inbound Protocol Governor
inboundGovernor
PublicInboundGovernorState (..)
, withInboundGovernor
-- * Trace
, InboundGovernorTrace (..)
, RemoteSt (..)
Expand Down Expand Up @@ -44,6 +45,7 @@ import Data.Cache
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Monoid.Synchronisation
import Data.OrdPSQ qualified as OrdPSQ
import Data.Void (Void)

import Network.Mux qualified as Mux
Expand All @@ -60,6 +62,14 @@ import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.Mux
import Ouroboros.Network.Server.RateLimiting


-- | Period of time after which a peer transitions from a fresh to a mature one,
-- see `igsMatureDuplexPeers` and `igsFreshDuplexPeers`.
--
inboundMaturePeerDelay :: DiffTime
inboundMaturePeerDelay = 15 * 60


-- | Run the server, which consists of the following components:
--
-- * /inbound governor/, it corresponds to p2p-governor on outbound side
Expand All @@ -75,7 +85,7 @@ import Ouroboros.Network.Server.RateLimiting
-- The first one is used in data diffusion for /Node-To-Node protocol/, while the
-- other is useful for running a server for the /Node-To-Client protocol/.
--
inboundGovernor :: forall (muxMode :: MuxMode) socket initiatorCtx peerAddr versionData versionNumber m a b.
withInboundGovernor :: forall (muxMode :: MuxMode) socket initiatorCtx peerAddr versionData versionNumber m a b x.
( Alternative (STM m)
, MonadAsync m
, MonadCatch m
Expand All @@ -93,42 +103,48 @@ inboundGovernor :: forall (muxMode :: MuxMode) socket initiatorCtx peerAddr vers
-> InboundGovernorInfoChannel muxMode initiatorCtx peerAddr versionData ByteString m a b
-> Maybe DiffTime -- protocol idle timeout
-> MuxConnectionManager muxMode socket initiatorCtx (ResponderContext peerAddr) peerAddr versionData versionNumber ByteString m a b
-> m Void
inboundGovernor trTracer tracer inboundInfoChannel
inboundIdleTimeout connectionManager = do
-> (Async m Void -> m (PublicInboundGovernorState peerAddr versionData) -> m x)
-> m x
withInboundGovernor trTracer tracer inboundInfoChannel
inboundIdleTimeout connectionManager k = do
-- State needs to be a TVar, otherwise, when catching the exception inside
-- the loop we do not have access to the most recent version of the state
-- and might be truncating transitions.
st <- newTVarIO emptyState
inboundGovernorLoop st
`catch`
(\(e :: SomeException) -> do
state <- readTVarIO st
_ <- Map.traverseWithKey
(\connId _ -> do
-- Remove the connection from the state so
-- mkRemoteTransitionTrace can create the correct state
-- transition to Nothing value.
let state' = unregisterConnection connId state
traceWith trTracer
(mkRemoteTransitionTrace connId state state')
)
(igsConnections state)
traceWith tracer (TrInboundGovernorError e)
throwIO e
)
withAsync
(inboundGovernorLoop st
`catch`
(\(e :: SomeException) -> do
state <- readTVarIO st
_ <- Map.traverseWithKey
(\connId _ -> do
-- Remove the connection from the state so
-- mkRemoteTransitionTrace can create the correct state
-- transition to Nothing value.
let state' = unregisterConnection connId state
traceWith trTracer
(mkRemoteTransitionTrace connId state state')
)
(igsConnections state)
traceWith tracer (TrInboundGovernorError e)
throwIO e
)
)
$ \thread -> k thread (mkPublicInboundGovernorState <$> readTVarIO st)
where
emptyState :: InboundGovernorState muxMode initiatorCtx peerAddr m a b
emptyState :: InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b
emptyState = InboundGovernorState {
igsConnections = Map.empty,
igsCountersCache = mempty
igsConnections = Map.empty,
igsMatureDuplexPeers = Map.empty,
igsFreshDuplexPeers = OrdPSQ.empty,
igsCountersCache = mempty
}

-- The inbound protocol governor recursive loop. The 'igsConnections' is
-- updated as we recurse.
--
inboundGovernorLoop
:: StrictTVar m (InboundGovernorState muxMode initiatorCtx peerAddr m a b)
:: StrictTVar m (InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b)
-> m Void
inboundGovernorLoop !st = do
state <- readTVarIO st
Expand All @@ -140,6 +156,8 @@ inboundGovernor trTracer tracer inboundInfoChannel
mkRemoteSt . csRemoteState
<$> igsConnections state

time <- getMonotonicTime

event
<- atomically $ runFirstToFinish $
Map.foldMapWithKey
Expand All @@ -155,7 +173,17 @@ inboundGovernor trTracer tracer inboundInfoChannel
)
(igsConnections state)
<> FirstToFinish (
NewConnection <$> InfoChannel.readMessage inboundInfoChannel)
NewConnection <$> InfoChannel.readMessage inboundInfoChannel
)
<> FirstToFinish (
-- move connections from
case OrdPSQ.atMostView
((-inboundMaturePeerDelay) `addTime` time)
(igsFreshDuplexPeers state) of
([], _) -> retry
(as, pq') -> let m = Map.fromList ((\(addr, _p, v) -> (addr, v)) <$> as)
in pure $ MaturedDuplexPeers m pq'
)
(mbConnId, state') <- case event of
NewConnection
-- new connection has been announced by either accept loop or
Expand All @@ -165,7 +193,11 @@ inboundGovernor trTracer tracer inboundInfoChannel
provenance
connId
csDataFlow
(Handle csMux muxBundle _ _)) -> do
Handle {
hMux = csMux,
hMuxBundle = muxBundle,
hVersionData = csVersionData
}) -> do

traceWith tracer (TrNewConnection provenance connId)
let responderContext = ResponderContext { rcConnectionId = connId }
Expand Down Expand Up @@ -243,6 +275,7 @@ inboundGovernor trTracer tracer inboundInfoChannel
connState = ConnectionState {
csMux,
csDataFlow,
csVersionData,
csMiniProtocolMap,
csCompletionMap,
csRemoteState
Expand All @@ -262,9 +295,16 @@ inboundGovernor trTracer tracer inboundInfoChannel
connId
(igsConnections state)


time' <- getMonotonicTime
-- update state and continue the recursive loop
let state' = state { igsConnections }
let state' = state {
igsConnections,
igsFreshDuplexPeers =
case csDataFlow of
Unidirectional -> igsFreshDuplexPeers state
Duplex -> OrdPSQ.insert (remoteAddress connId) time' csVersionData
(igsFreshDuplexPeers state)
}
return (Just connId, state')

MuxFinished connId merr -> do
Expand Down Expand Up @@ -449,6 +489,12 @@ inboundGovernor trTracer tracer inboundInfoChannel

return (Just connId, state')

MaturedDuplexPeers newMatureDuplexPeers igsFreshDuplexPeers ->
pure (Nothing, state { igsMatureDuplexPeers = newMatureDuplexPeers
<> igsMatureDuplexPeers state,
igsFreshDuplexPeers })


mask_ $ do
atomically $ writeTVar st state'
case mbConnId of
Expand Down Expand Up @@ -527,8 +573,8 @@ type RemoteTransitionTrace peerAddr = TransitionTrace' peerAddr (Maybe RemoteSt)

mkRemoteTransitionTrace :: Ord peerAddr
=> ConnectionId peerAddr
-> InboundGovernorState muxMode initiatorCtx peerAddr m a b
-> InboundGovernorState muxMode initiatorCtx peerAddr m a b
-> InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b
-> InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteTransitionTrace peerAddr
mkRemoteTransitionTrace connId fromState toState =
TransitionTrace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module Ouroboros.Network.InboundGovernor.Event

import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadThrow hiding (handle)

import Data.ByteString.Lazy (ByteString)
Expand All @@ -34,6 +35,7 @@ import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Monoid.Synchronisation
import Data.Set qualified as Set
import Data.OrdPSQ (OrdPSQ)

import Network.Mux qualified as Mux
import Network.Mux.Types (MiniProtocolDir (..), MiniProtocolStatus (..))
Expand Down Expand Up @@ -109,6 +111,11 @@ data Event (muxMode :: MuxMode) initiatorCtx peerAddr versionData m a b
--
| AwakeRemote !(ConnectionId peerAddr)

-- | Update `igsMatureDuplexPeers` and `igsFreshDuplexPeers`.
--
| MaturedDuplexPeers !(Map peerAddr versionData) -- ^ newly matured duplex peers
!(OrdPSQ peerAddr Time versionData) -- ^ queue of fresh duplex peers


--
-- STM transactions which detect 'Event's (signals)
Expand All @@ -120,7 +127,7 @@ data Event (muxMode :: MuxMode) initiatorCtx peerAddr versionData m a b
--
type EventSignal (muxMode :: MuxMode) initiatorCtx peerAddr versionData m a b =
ConnectionId peerAddr
-> ConnectionState muxMode initiatorCtx peerAddr m a b
-> ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> FirstToFinish (STM m) (Event muxMode initiatorCtx peerAddr versionData m a b)

-- | A mux stopped. If mux exited cleanly no error is attached.
Expand Down Expand Up @@ -255,7 +262,7 @@ firstPeerPromotedToHot
RemoteIdle {} -> mempty
where
-- only hot mini-protocols;
hotMiniProtocolStateMap :: ConnectionState muxMode pinitiatorCtx peerAddr m a b
hotMiniProtocolStateMap :: ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Map (MiniProtocolNum, MiniProtocolDir)
(STM m MiniProtocolStatus)
hotMiniProtocolStateMap ConnectionState { csMux, csMiniProtocolMap } =
Expand Down Expand Up @@ -300,7 +307,7 @@ firstPeerDemotedToWarm
_ -> mempty
where
-- only hot mini-protocols;
hotMiniProtocolStateMap :: ConnectionState muxMode initiatorCtx peerAddr m a b
hotMiniProtocolStateMap :: ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Map (MiniProtocolNum, MiniProtocolDir)
(STM m MiniProtocolStatus)
hotMiniProtocolStateMap ConnectionState { csMux, csMiniProtocolMap } =
Expand Down

0 comments on commit 3849735

Please sign in to comment.