Skip to content

Commit

Permalink
Merge pull request #4871 from IntersectMBO/coot/inbound-governor-patch
Browse files Browse the repository at this point in the history
inbound governor error handling
  • Loading branch information
coot committed May 7, 2024
2 parents 811b1de + 2416f75 commit c5e2110
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,47 +123,46 @@ withInboundGovernor :: forall (muxMode :: MuxMode) socket initiatorCtx peerAddr
-> m x
withInboundGovernor trTracer tracer debugTracer inboundInfoChannel
inboundIdleTimeout connectionManager k = do
-- State needs to be a TVar, otherwise, when catching the exception inside
-- the loop we do not have access to the most recent version of the state
-- and might be truncating transitions.
st <- newTVarIO emptyState
withAsync
(inboundGovernorLoop st
`catch`
(\(e :: SomeException) -> do
state <- readTVarIO st
_ <- Map.traverseWithKey
(\connId _ -> do
-- Remove the connection from the state so
-- mkRemoteTransitionTrace can create the correct state
-- transition to Nothing value.
let state' = unregisterConnection connId state
traceWith trTracer
(mkRemoteTransitionTrace connId state state')
)
(igsConnections state)
traceWith tracer (TrInboundGovernorError e)
throwIO e
)
)
$ \thread -> k thread (mkPublicInboundGovernorState <$> readTVarIO st)
var <- newTVarIO (mkPublicInboundGovernorState emptyState)
withAsync (inboundGovernorLoop var emptyState
`catch`
handleError var) $
\thread ->
k thread (readTVarIO var)
where
emptyState :: InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b
emptyState = InboundGovernorState {
igsConnections = Map.empty,
igsMatureDuplexPeers = Map.empty,
igsFreshDuplexPeers = OrdPSQ.empty,
igsCountersCache = mempty
}
igsConnections = Map.empty,
igsMatureDuplexPeers = Map.empty,
igsFreshDuplexPeers = OrdPSQ.empty,
igsCountersCache = mempty
}

-- trace final transition, mostly for testing purposes
handleError
:: StrictTVar m (PublicInboundGovernorState peerAddr versionData)
-> SomeException
-> m Void
handleError var e = do
PublicInboundGovernorState { remoteStateMap } <- readTVarIO var
_ <- Map.traverseWithKey
(\connId remoteSt ->
traceWith trTracer $
TransitionTrace (remoteAddress connId)
Transition { fromState = Just remoteSt,
toState = Nothing }
)
remoteStateMap
throwIO e

-- The inbound protocol governor recursive loop. The 'igsConnections' is
-- updated as we recurse.
--
inboundGovernorLoop
:: StrictTVar m (InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b)
:: StrictTVar m (PublicInboundGovernorState peerAddr versionData)
-> InboundGovernorState muxMode initiatorCtx peerAddr versionData m a b
-> m Void
inboundGovernorLoop !st = do
state <- readTVarIO st
inboundGovernorLoop var !state = do
mapTraceWithCache TrInboundGovernorCounters
tracer
(igsCountersCache state)
Expand Down Expand Up @@ -522,13 +521,13 @@ withInboundGovernor trTracer tracer debugTracer inboundInfoChannel


mask_ $ do
atomically $ writeTVar st state'
atomically $ writeTVar var (mkPublicInboundGovernorState state')
traceWith debugTracer (DebugInboundGovernor state')
case mbConnId of
Just cid -> traceWith trTracer (mkRemoteTransitionTrace cid state state')
Nothing -> pure ()

inboundGovernorLoop st
inboundGovernorLoop var state'


-- | Run a responder mini-protocol.
Expand Down Expand Up @@ -586,21 +585,6 @@ maturedPeers time freshPeers =
-- Trace
--

-- | Remote connection state tracked by inbound protocol governor.
--
data RemoteSt = RemoteWarmSt
| RemoteHotSt
| RemoteIdleSt
| RemoteColdSt
deriving (Eq, Show)


mkRemoteSt :: RemoteState m -> RemoteSt
mkRemoteSt RemoteWarm = RemoteWarmSt
mkRemoteSt RemoteHot = RemoteHotSt
mkRemoteSt (RemoteIdle _) = RemoteIdleSt
mkRemoteSt RemoteCold = RemoteColdSt


-- | 'Nothing' represents uninitialised state.
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ module Ouroboros.Network.InboundGovernor.State
, unregisterConnection
, updateMiniProtocol
, RemoteState (.., RemoteEstablished)
, RemoteSt (..)
, mkRemoteSt
, updateRemoteState
, mapRemoteState
, MiniProtocolData (..)
Expand All @@ -43,11 +45,20 @@ import Ouroboros.Network.Mux

-- | Public inbound governor state.
--
newtype PublicInboundGovernorState peerAddr versionData = PublicInboundGovernorState {
data PublicInboundGovernorState peerAddr versionData = PublicInboundGovernorState {
-- | A map of mature inbound duplex connections. These peers are used for
-- light peer sharing (e.g. bootstrapping peer sharing).
--
inboundDuplexPeers :: Map peerAddr versionData
inboundDuplexPeers :: !(Map peerAddr versionData),

-- | Map of `RemoteSt`.
--
-- It is lazy on purpose. It is created from `igsConnections`, so it
-- should not point to any thunks that should be GC-ed leading to a memory
-- leak.
--
remoteStateMap :: Map (ConnectionId peerAddr) RemoteSt

}


Expand All @@ -63,10 +74,11 @@ mkPublicInboundGovernorState
InboundGovernorState muxMode initatorCtx peerAddr versionData m a b
-> PublicInboundGovernorState peerAddr versionData
mkPublicInboundGovernorState
InboundGovernorState { igsMatureDuplexPeers }
InboundGovernorState { igsConnections, igsMatureDuplexPeers }
=
PublicInboundGovernorState {
inboundDuplexPeers = igsMatureDuplexPeers
inboundDuplexPeers = igsMatureDuplexPeers,
remoteStateMap = Map.map (mkRemoteSt . csRemoteState) igsConnections
}

-- | 'InboundGovernorState', which consist of pure part, and a mutable part.
Expand Down Expand Up @@ -315,3 +327,21 @@ mapRemoteState connId fn state =
connId
(igsConnections state)
}


-- | Remote connection state tracked by inbound protocol governor.
--
-- This type is used for tracing.
--
data RemoteSt = RemoteWarmSt
| RemoteHotSt
| RemoteIdleSt
| RemoteColdSt
deriving (Eq, Show)


mkRemoteSt :: RemoteState m -> RemoteSt
mkRemoteSt RemoteWarm = RemoteWarmSt
mkRemoteSt RemoteHot = RemoteHotSt
mkRemoteSt (RemoteIdle _) = RemoteIdleSt
mkRemoteSt RemoteCold = RemoteColdSt

0 comments on commit c5e2110

Please sign in to comment.