Skip to content

Commit

Permalink
Merge #4385
Browse files Browse the repository at this point in the history
4385: Fix PeerStateActions bugs and update testing r=coot a=coot


# Description

This PR fixes three bugs (two in production code, one in testing)

* fixed a `hot -> warm -> hot` cycle; we used to forget the reason for `hot -> warm` demotion and then use `0` promtion delay.
* fixed exception handling in `deactivatePeerConnection` (part of
  `PeerStateActions`), which can result in a tight loop (eventually broken by mux shutdown)

* fixed a deadlock of chain-sync & block-fetch in `sim-net`: we used two
  different `FetchClientRegistry` which blocked `chain-sync` to start.

This PR also includes a test which can reproduce the first bug and various
smaller code changes.



Co-authored-by: Marcin Szamotulski <coot@coot.me>
  • Loading branch information
iohk-bors[bot] and coot committed Feb 23, 2023
2 parents d3ce895 + 426e2bd commit 6d5a968
Show file tree
Hide file tree
Showing 17 changed files with 775 additions and 345 deletions.
15 changes: 7 additions & 8 deletions network-mux/src/Network/Mux.hs
Expand Up @@ -282,9 +282,9 @@ miniProtocolJob tracer egressQueue
MiniProtocolNum a -> "prtcl-" ++ show a)
w <- newTVarIO BL.empty
let chan = muxChannel tracer egressQueue (Wanton w)
miniProtocolNum miniProtocolDirEnum
miniProtocolIngressQueue
(result, remainder) <- protocolAction chan
miniProtocolNum miniProtocolDirEnum
miniProtocolIngressQueue
(result, remainder) <- protocolAction chan
traceWith tracer (MuxTraceTerminating miniProtocolNum miniProtocolDirEnum)
atomically $ do
-- The Wanton w is the SDUs that are queued but not yet sent for this job.
Expand Down Expand Up @@ -413,9 +413,7 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
>> return True
_ -> writeTVar muxStatus (MuxFailed e)
>> return False
if r
then return ()
else throwIO e
unless r (throwIO e)

EventControlCmd (CmdStartProtocolThread
StartEagerly
Expand Down Expand Up @@ -455,7 +453,7 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
go monitorCtx'

EventControlCmd CmdShutdown -> do
traceWith tracer MuxTraceShutdown
traceWith tracer MuxTraceStopping
atomically $ writeTVar muxStatus MuxStopping
JobPool.cancelGroup jobpool MiniProtocolJob
-- wait for 2 seconds before the egress queue is drained
Expand All @@ -464,6 +462,7 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
tryPeekTBQueue egressQueue
>>= check . isNothing
atomically $ writeTVar muxStatus MuxStopped
traceWith tracer MuxTraceStopped
-- by exiting the 'monitor' loop we let the job pool kill demuxer and
-- muxer threads

Expand Down Expand Up @@ -685,7 +684,7 @@ runMiniProtocol Mux { muxMiniProtocols, muxControlCmdQueue , muxStatus}
case st of
MuxReady -> readTMVar completionVar
MuxStopping -> readTMVar completionVar
<|> return (Left $ toException (MuxError (MuxShutdown Nothing) "Mux stoping"))
<|> return (Left $ toException (MuxError (MuxShutdown Nothing) "Mux stopping"))
MuxStopped -> readTMVar completionVar
<|> return (Left $ toException (MuxError (MuxShutdown Nothing) "Mux stopped"))
MuxFailed e -> readTMVar completionVar
Expand Down
6 changes: 4 additions & 2 deletions network-mux/src/Network/Mux/Trace.hs
Expand Up @@ -153,7 +153,8 @@ data MuxTrace =
| MuxTraceStartOnDemand MiniProtocolNum MiniProtocolDir
| MuxTraceStartedOnDemand MiniProtocolNum MiniProtocolDir
| MuxTraceTerminating MiniProtocolNum MiniProtocolDir
| MuxTraceShutdown
| MuxTraceStopping
| MuxTraceStopped
| MuxTraceTCPInfo StructTCPInfo Word16

instance Show MuxTrace where
Expand Down Expand Up @@ -191,7 +192,8 @@ instance Show MuxTrace where
show (MuxTraceStartOnDemand mid dir) = printf "Preparing to start (%s) in %s" (show mid) (show dir)
show (MuxTraceStartedOnDemand mid dir) = printf "Started on demand (%s) in %s" (show mid) (show dir)
show (MuxTraceTerminating mid dir) = printf "Terminating (%s) in %s" (show mid) (show dir)
show MuxTraceShutdown = "Mux shutdown"
show MuxTraceStopping = "Mux stopping"
show MuxTraceStopped = "Mux stoppped"
#ifdef os_HOST_linux
show (MuxTraceTCPInfo StructTCPInfo
{ tcpi_snd_mss, tcpi_rcv_mss, tcpi_lost, tcpi_retrans
Expand Down
Expand Up @@ -837,7 +837,6 @@ data ConnectionManagerTrace peerAddr handlerTrace
| TrShutdown
| TrConnectionExists Provenance peerAddr AbstractState
| TrForbiddenConnection (ConnectionId peerAddr)
| TrImpossibleConnection (ConnectionId peerAddr)
| TrConnectionFailure (ConnectionId peerAddr)
| TrConnectionNotFound Provenance peerAddr
| TrForbiddenOperation peerAddr AbstractState
Expand Down
Expand Up @@ -280,8 +280,6 @@ connectionManagerTraceMap (TrConnectionExists p _ as) =
"TrConnectionExists " ++ show p ++ " " ++ show as
connectionManagerTraceMap (TrForbiddenConnection _) =
"TrForbiddenConnection"
connectionManagerTraceMap (TrImpossibleConnection _) =
"TrImpossibleConnection"
connectionManagerTraceMap (TrConnectionFailure _) =
"TrConnectionFailure"
connectionManagerTraceMap (TrConnectionNotFound p _) =
Expand Down
Expand Up @@ -33,7 +33,7 @@ import Ouroboros.Network.Protocol.ChainSync.Server
data Client header point tip m t = Client
{ rollbackward :: point -> tip -> m (Either t (Client header point tip m t))
, rollforward :: header -> m (Either t (Client header point tip m t))
, points :: [point] -> m (Client header point tip m t)
, points :: [point] -> m (Either t (Client header point tip m t))
}

-- | A client which doesn't do anything and never ends. Used with
Expand All @@ -43,7 +43,7 @@ pureClient :: Applicative m => Client header point tip m void
pureClient = Client
{ rollbackward = \_ _ -> pure (Right pureClient)
, rollforward = \_ -> pure (Right pureClient)
, points = \_ -> pure pureClient
, points = \_ -> pure (Right pureClient)
}

controlledClient :: MonadSTM m
Expand All @@ -64,7 +64,7 @@ controlledClient controlMessageSTM = go
Continue -> pure (Right go)
Quiesce -> error "Ouroboros.Network.Protocol.ChainSync.Examples.controlledClient: unexpected Quiesce"
Terminate -> pure (Left ())
, points = \_ -> pure go
, points = \_ -> pure (Right go)
}


Expand All @@ -80,7 +80,7 @@ chainSyncClientExample :: forall header tip m a.
-> Client header (Point header) tip m a
-> ChainSyncClient header (Point header) tip m a
chainSyncClientExample chainvar client = ChainSyncClient $
initialise <$> getChainPoints
either SendMsgDone initialise <$> getChainPoints
where
initialise :: ([Point header], Client header (Point header) tip m a)
-> ClientStIdle header (Point header) tip m a
Expand Down Expand Up @@ -126,11 +126,13 @@ chainSyncClientExample chainvar client = ChainSyncClient $
Right client'' -> requestNext client''
}

getChainPoints :: m ([Point header], Client header (Point header) tip m a)
getChainPoints :: m (Either a ([Point header], Client header (Point header) tip m a))
getChainPoints = do
pts <- Chain.selectPoints recentOffsets <$> atomically (readTVar chainvar)
client' <- points client pts
pure (pts, client')
choice <- points client pts
pure $ case choice of
Left a -> Left a
Right client' -> Right (pts, client')

addBlock :: header -> m ()
addBlock b = atomically $ do
Expand Down
Expand Up @@ -41,7 +41,7 @@ chainSyncClientPipelined
-> Client header (Point header) (Tip header) m a
-> ChainSyncClientPipelined header (Point header) (Tip header) m a
chainSyncClientPipelined mkPipelineDecision0 chainvar =
ChainSyncClientPipelined . fmap initialise . getChainPoints
ChainSyncClientPipelined . fmap (either SendMsgDone initialise) . getChainPoints
where
initialise :: ([Point header], Client header (Point header) (Tip header) m a)
-> ClientPipelinedStIdle Z header (Point header) (Tip header) m a
Expand Down Expand Up @@ -160,11 +160,13 @@ chainSyncClientPipelined mkPipelineDecision0 chainvar =


getChainPoints :: Client header (Point header) (Tip header) m a
-> m ([Point header], Client header (Point header) (Tip header) m a)
-> m (Either a ([Point header], Client header (Point header) (Tip header) m a))
getChainPoints client = do
pts <- Chain.selectPoints recentOffsets <$> atomically (readTVar chainvar)
client' <- points client pts
pure (pts, client')
choice <- points client pts
pure $ case choice of
Left a -> Left a
Right client' -> Right (pts, client')

addBlock :: header -> m ()
addBlock b = atomically $ do
Expand Down
Expand Up @@ -124,7 +124,7 @@ testClient doneVar tip =
atomically $ writeTVar doneVar True
return $ Left ()
else return $ Right (testClient doneVar tip),
ChainSyncExamples.points = \_ -> return (testClient doneVar tip)
ChainSyncExamples.points = \_ -> return (Right $ testClient doneVar tip)
}

-- | An experiment in which the client has a fork of the server chain. The
Expand Down
2 changes: 1 addition & 1 deletion ouroboros-network/src/Ouroboros/Network/BlockFetch.hs
Expand Up @@ -138,7 +138,7 @@ data BlockFetchConfiguration =
-- | Maximum requests in flight per each peer.
bfcMaxRequestsInflight :: !Word,

-- | Desired intervall between calls to fetchLogicIteration
-- | Desired interval between calls to fetchLogicIteration
bfcDecisionLoopInterval :: !DiffTime,

-- | Salt used when comparing peers
Expand Down
Expand Up @@ -168,7 +168,12 @@ connections PeerSelectionActions{
Map.keysSet (EstablishedPeers.toMap establishedPeers'))
Decision {
decisionTrace = [ TraceDemoteLocalAsynchronous localDemotions
, TraceDemoteAsynchronous nonLocalDemotions],
| not $ null localDemotions
]
<> [ TraceDemoteAsynchronous nonLocalDemotions
| not $ null nonLocalDemotions
],

decisionJobs = [],
decisionState = st {
activePeers = activePeers',
Expand Down

0 comments on commit 6d5a968

Please sign in to comment.