Skip to content

Commit

Permalink
inbound-governor: (re)added PublicInboundGovernorState
Browse files Browse the repository at this point in the history
`PublicInboundGovernorState` extracts information directly from
`InboundGovernorState` and exposes an `STM` action which allows to read
it.
  • Loading branch information
coot committed Apr 30, 2024
1 parent d602e78 commit 98ca7c0
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 139 deletions.
29 changes: 13 additions & 16 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -272,22 +272,19 @@ withBidirectionalConnectionManager snocket makeBearer socket
(InResponderMode $ Just outgovInfoChannel)
$ \connectionManager -> do
serverAddr <- Snocket.getLocalAddr snocket socket
withAsync
(Server.run
ServerArguments {
serverSockets = socket :| [],
serverSnocket = snocket,
serverTracer = ("server",) `contramap` debugTracer, -- ServerTrace
serverTrTracer = nullTracer,
serverInboundGovernorTracer = ("inbound-governor",) `contramap` debugTracer,
serverConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0,
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just protocolIdleTimeout,
serverInboundInfoChannel = inbgovInfoChannel
}
)
(\thread -> link thread
>> k connectionManager serverAddr)
Server.with
ServerArguments {
serverSockets = socket :| [],
serverSnocket = snocket,
serverTracer = ("server",) `contramap` debugTracer, -- ServerTrace
serverTrTracer = nullTracer,
serverInboundGovernorTracer = ("inbound-governor",) `contramap` debugTracer,
serverConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0,
serverConnectionManager = connectionManager,
serverInboundIdleTimeout = Just protocolIdleTimeout,
serverInboundInfoChannel = inbgovInfoChannel
}
(\_ _ -> k connectionManager serverAddr)
where
serverApplication :: LazySTM.TVar m [[Int]]
-> LazySTM.TVar m [[Int]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ data ConnectionId addr = ConnectionId {

-- | Order first by `remoteAddress` then by `localAddress`.
--
-- /Note:/ we relay on the fact that `remoteAddress` is an order
-- preserving map (which allows us to use `Map.mapKeysMonotonic` in some
-- cases).
--
instance Ord addr => Ord (ConnectionId addr) where
conn `compare` conn' =
remoteAddress conn `compare` remoteAddress conn'
Expand Down
110 changes: 78 additions & 32 deletions ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
--
module Ouroboros.Network.InboundGovernor
( -- * Run Inbound Protocol Governor
inboundGovernor
PublicInboundGovernorState (..)
, withInboundGovernor
-- * Trace
, InboundGovernorTrace (..)
, RemoteSt (..)
Expand Down Expand Up @@ -44,6 +45,7 @@ import Data.Cache
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Monoid.Synchronisation
import Data.OrdPSQ as OrdPSQ
import Data.Void (Void)

import Network.Mux qualified as Mux
Expand All @@ -60,6 +62,14 @@ import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.Mux
import Ouroboros.Network.Server.RateLimiting


-- | Period of time after which a peer transitions from a fresh to a mature one,
-- see `igsMatureDuplexPeers` and `igsFreshDuplexPeers`.
--
inboundMaturePeerDelay :: DiffTime
inboundMaturePeerDelay = 15 * 60


-- | Run the server, which consists of the following components:
--
-- * /inbound governor/, it corresponds to p2p-governor on outbound side
Expand All @@ -75,7 +85,7 @@ import Ouroboros.Network.Server.RateLimiting
-- The first one is used in data diffusion for /Node-To-Node protocol/, while the
-- other is useful for running a server for the /Node-To-Client protocol/.
--
inboundGovernor :: forall (muxMode :: MuxMode) socket initiatorCtx peerAddr versionData versionNumber m a b.
withInboundGovernor :: forall (muxMode :: MuxMode) socket initiatorCtx peerAddr versionData versionNumber m a b x.
( Alternative (STM m)
, MonadAsync m
, MonadCatch m
Expand All @@ -93,42 +103,48 @@ inboundGovernor :: forall (muxMode :: MuxMode) socket initiatorCtx peerAddr vers
-> InboundGovernorInfoChannel muxMode initiatorCtx peerAddr versionData ByteString m a b
-> Maybe DiffTime -- protocol idle timeout
-> MuxConnectionManager muxMode socket initiatorCtx (ResponderContext peerAddr) peerAddr versionData versionNumber ByteString m a b
-> m Void
inboundGovernor trTracer tracer inboundInfoChannel
inboundIdleTimeout connectionManager = do
-> (Async m Void -> m (PublicInboundGovernorState peerAddr versionData) -> m x)
-> m x
withInboundGovernor trTracer tracer inboundInfoChannel
inboundIdleTimeout connectionManager k = do
-- State needs to be a TVar, otherwise, when catching the exception inside
-- the loop we do not have access to the most recent version of the state
-- and might be truncating transitions.
st <- newTVarIO emptyState
inboundGovernorLoop st
`catch`
(\(e :: SomeException) -> do
state <- readTVarIO st
_ <- Map.traverseWithKey
(\connId _ -> do
-- Remove the connection from the state so
-- mkRemoteTransitionTrace can create the correct state
-- transition to Nothing value.
let state' = unregisterConnection connId state
traceWith trTracer
(mkRemoteTransitionTrace connId state state')
)
(igsConnections state)
traceWith tracer (TrInboundGovernorError e)
throwIO e
)
withAsync
(inboundGovernorLoop st
`catch`
(\(e :: SomeException) -> do
state <- readTVarIO st
_ <- Map.traverseWithKey
(\connId _ -> do
-- Remove the connection from the state so
-- mkRemoteTransitionTrace can create the correct state
-- transition to Nothing value.
let state' = unregisterConnection connId state
traceWith trTracer
(mkRemoteTransitionTrace connId state state')
)
(igsConnections state)
traceWith tracer (TrInboundGovernorError e)
throwIO e
)
)
$ \thread -> k thread (mkPublicInboundGovernorState <$> readTVarIO st)
where
emptyState :: InboundGovernorState muxMode initiatorCtx peerAddr m a b
emptyState :: InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b
emptyState = InboundGovernorState {
igsConnections = Map.empty,
igsCountersCache = mempty
igsConnections = Map.empty,
igsMatureDuplexPeers = Map.empty,
igsFreshDuplexPeers = OrdPSQ.empty,
igsCountersCache = mempty
}

-- The inbound protocol governor recursive loop. The 'igsConnections' is
-- updated as we recurse.
--
inboundGovernorLoop
:: StrictTVar m (InboundGovernorState muxMode initiatorCtx peerAddr m a b)
:: StrictTVar m (InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b)
-> m Void
inboundGovernorLoop !st = do
state <- readTVarIO st
Expand All @@ -140,6 +156,8 @@ inboundGovernor trTracer tracer inboundInfoChannel
mkRemoteSt . csRemoteState
<$> igsConnections state

time <- getMonotonicTime

event
<- atomically $ runFirstToFinish $
Map.foldMapWithKey
Expand All @@ -155,7 +173,17 @@ inboundGovernor trTracer tracer inboundInfoChannel
)
(igsConnections state)
<> FirstToFinish (
NewConnection <$> InfoChannel.readMessage inboundInfoChannel)
NewConnection <$> InfoChannel.readMessage inboundInfoChannel
)
<> FirstToFinish (
-- move connections from
case OrdPSQ.atMostView
(inboundMaturePeerDelay `addTime` time)
(igsFreshDuplexPeers state) of
([], _) -> retry
(as, pq') -> let m = Map.fromList ((\(addr, _p, v) -> (addr, v)) <$> as)
in pure $ MaturedDuplexPeers m pq'
)
(mbConnId, state') <- case event of
NewConnection
-- new connection has been announced by either accept loop or
Expand All @@ -165,7 +193,11 @@ inboundGovernor trTracer tracer inboundInfoChannel
provenance
connId
csDataFlow
(Handle csMux muxBundle _ _)) -> do
Handle {
hMux = csMux,
hMuxBundle = muxBundle,
hVersionData = csVersionData
}) -> do

traceWith tracer (TrNewConnection provenance connId)
let responderContext = ResponderContext { rcConnectionId = connId }
Expand Down Expand Up @@ -243,6 +275,7 @@ inboundGovernor trTracer tracer inboundInfoChannel
connState = ConnectionState {
csMux,
csDataFlow,
csVersionData,
csMiniProtocolMap,
csCompletionMap,
csRemoteState
Expand All @@ -262,9 +295,16 @@ inboundGovernor trTracer tracer inboundInfoChannel
connId
(igsConnections state)


time' <- getMonotonicTime
-- update state and continue the recursive loop
let state' = state { igsConnections }
let state' = state {
igsConnections,
igsFreshDuplexPeers =
case csDataFlow of
Unidirectional -> igsFreshDuplexPeers state
Duplex -> OrdPSQ.insert (remoteAddress connId) time' csVersionData
(igsFreshDuplexPeers state)
}
return (Just connId, state')

MuxFinished connId merr -> do
Expand Down Expand Up @@ -449,6 +489,12 @@ inboundGovernor trTracer tracer inboundInfoChannel

return (Just connId, state')

MaturedDuplexPeers newMatureDuplexPeers igsFreshDuplexPeers ->
pure (Nothing, state { igsMatureDuplexPeers = newMatureDuplexPeers
<> igsMatureDuplexPeers state,
igsFreshDuplexPeers })


mask_ $ do
atomically $ writeTVar st state'
case mbConnId of
Expand Down Expand Up @@ -527,8 +573,8 @@ type RemoteTransitionTrace peerAddr = TransitionTrace' peerAddr (Maybe RemoteSt)

mkRemoteTransitionTrace :: Ord peerAddr
=> ConnectionId peerAddr
-> InboundGovernorState muxMode initiatorCtx peerAddr m a b
-> InboundGovernorState muxMode initiatorCtx peerAddr m a b
-> InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b
-> InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteTransitionTrace peerAddr
mkRemoteTransitionTrace connId fromState toState =
TransitionTrace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ module Ouroboros.Network.InboundGovernor.Event
import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow hiding (handle)
import Control.Monad.Class.MonadTime.SI

import Data.ByteString.Lazy (ByteString)
import Data.Functor (($>))
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Monoid.Synchronisation
import Data.OrdPSQ (OrdPSQ)
import Data.Set qualified as Set

import Network.Mux qualified as Mux
Expand Down Expand Up @@ -109,6 +111,11 @@ data Event (muxMode :: MuxMode) initiatorCtx peerAddr versionData m a b
--
| AwakeRemote !(ConnectionId peerAddr)

-- | Update `igsMatureDuplexPeers` and `igsFreshDuplexPeers`.
--
| MaturedDuplexPeers !(Map peerAddr versionData) -- ^ newly matured duplex peers
!(OrdPSQ peerAddr Time versionData) -- ^ queue of fresh duplex peers


--
-- STM transactions which detect 'Event's (signals)
Expand All @@ -120,7 +127,7 @@ data Event (muxMode :: MuxMode) initiatorCtx peerAddr versionData m a b
--
type EventSignal (muxMode :: MuxMode) initiatorCtx peerAddr versionData m a b =
ConnectionId peerAddr
-> ConnectionState muxMode initiatorCtx peerAddr m a b
-> ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> FirstToFinish (STM m) (Event muxMode initiatorCtx peerAddr versionData m a b)

-- | A mux stopped. If mux exited cleanly no error is attached.
Expand Down Expand Up @@ -255,7 +262,7 @@ firstPeerPromotedToHot
RemoteIdle {} -> mempty
where
-- only hot mini-protocols;
hotMiniProtocolStateMap :: ConnectionState muxMode pinitiatorCtx peerAddr m a b
hotMiniProtocolStateMap :: ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Map (MiniProtocolNum, MiniProtocolDir)
(STM m MiniProtocolStatus)
hotMiniProtocolStateMap ConnectionState { csMux, csMiniProtocolMap } =
Expand Down Expand Up @@ -300,7 +307,7 @@ firstPeerDemotedToWarm
_ -> mempty
where
-- only hot mini-protocols;
hotMiniProtocolStateMap :: ConnectionState muxMode initiatorCtx peerAddr m a b
hotMiniProtocolStateMap :: ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Map (MiniProtocolNum, MiniProtocolDir)
(STM m MiniProtocolStatus)
hotMiniProtocolStateMap ConnectionState { csMux, csMiniProtocolMap } =
Expand Down

0 comments on commit 98ca7c0

Please sign in to comment.