Skip to content

Commit

Permalink
Preserve the candidate fragment when demoting objectors
Browse files Browse the repository at this point in the history
  • Loading branch information
facundominguez committed Apr 23, 2024
1 parent f6cfcff commit 5631b42
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,17 @@ traceSchedulerEventTestBlockWith setTickTime tracer0 _tracer = \case
traceJumpingState :: ChainSyncJumpingState m TestBlock -> String
traceJumpingState = \case
Dynamo restart lastJump -> "Dynamo " ++ show restart ++ " " ++ terseWithOrigin show lastJump
Objector restart badPoint ->
"Objector " ++ show restart ++ " " ++ tersePoint (castPoint badPoint)
Objector restart badFragment ->
"Objector " ++ show restart ++ " " ++ tersePoint (castPoint $ headPoint badFragment)
Disengaged -> "Disengaged"
Jumper _ goodPoint st ->
"Jumper _ " ++ tersePoint (castPoint goodPoint) ++ " " ++ traceJumperState st

traceJumperState :: ChainSyncJumpingJumperState TestBlock -> String
traceJumperState = \case
Happy -> "Happy"
FoundIntersection point ->
"(FoundIntersection " ++ (tersePoint $ castPoint point) ++ ")"
FoundIntersection badFragment ->
"(FoundIntersection " ++ (tersePoint $ castPoint $ headPoint badFragment) ++ ")"
LookingForIntersection fragment ->
"(LookingForIntersection " ++ tersePoint (castPoint $ headPoint fragment) ++ ")"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,14 @@ import qualified Ouroboros.Network.AnchoredFragment as AF
-- | Hooks for ChainSync jumping.
data Jumping m blk = Jumping
{ -- | Get the next instruction to execute, which can be either to run normal
-- ChainSync or to jump to a given point. When the peer is a jumper and
-- there is no jump request, 'jgNextInstruction' blocks until a jump request
-- is made.
-- ChainSync, restart ChainSync, or jump to a given point. When the peer is
-- a jumper and there is no jump request, 'jgNextInstruction' blocks until a
-- jump request is made.
--
-- Restarting ChainSync is a device to rebuild the internal ChainSync state
-- that depends on the candidate fragment (like the header state history).
-- Restarts are requested when a client transitions from being a jumper
-- to being an objector or dynamo.
jgNextInstruction :: !(m (Instruction blk)),

-- | To be called whenever the peer claims to have no more headers.
Expand Down Expand Up @@ -165,7 +170,11 @@ mkJumping ::
PeerContext m peer blk ->
Jumping m blk
mkJumping peerContext = Jumping
{ jgNextInstruction = atomically $ nextInstruction peerContext
{ jgNextInstruction = do
-- Demoted objectors might need to set their candidate fragment before
-- blocking for the next jump.
atomically $ resetCandidateFragmentOfDemotedObjector (handle peerContext)
atomically $ nextInstruction peerContext
, jgOnAwaitReply = atomically $ onAwaitReply peerContext
, jgOnRollForward = atomically . onRollForward peerContext
, jgOnRollBackward = atomically . onRollBackward peerContext
Expand Down Expand Up @@ -274,6 +283,37 @@ nextInstruction context = whenEnabled context RunNormally $
writeTVar nextJumpVar (AF.Empty AF.AnchorGenesis)
pure $ JumpTo fragment


-- A demoted objector might end up with a shorter candidate fragment than
-- the last jump. This is because the objector restarts chain sync and might
-- not download sufficient headers before being demoted. Here we correct the
-- candidate fragment before blocking for the next jump.
resetCandidateFragmentOfDemotedObjector :: forall m blk.
( MonadSTM m,
HasHeader blk,
HasHeader (Header blk)
) =>
ChainSyncClientHandle m blk ->
STM m ()
resetCandidateFragmentOfDemotedObjector h = do
readTVar (cschJumping h) >>= \case
Jumper _ _ (FoundIntersection badFragment) -> do
csState <- readTVar (cschState h)
let goodFragment = AF.dropNewest 1 badFragment
when (AF.headSlot (csCandidate csState) < AF.headSlot goodFragment) $
writeTVar (cschState h) $
csState
{ csCandidate = goodFragment
, csLatestSlot =
max
(Just $ AF.headSlot badFragment)
(csLatestSlot csState)
}
_ -> pure ()
where
-- Avoid redundant constraint "HasHeader blk" reported by some ghc's
_ = getHeaderFields @blk

-- | This function is called when we receive a 'MsgRollForward' message.
--
-- We request jumpers to jump here if the next header received by the dynamo is
Expand All @@ -292,8 +332,8 @@ onRollForward :: forall m peer blk.
STM m ()
onRollForward context point = whenEnabled context () $
readTVar (cschJumping (handle context)) >>= \case
Objector _ badPoint
| badPoint == castPoint point -> do
Objector _ badFragment
| AF.headPoint badFragment == castPoint point -> do
disengage (handle context)
electNewObjector (stripContext context)
| otherwise -> pure ()
Expand Down Expand Up @@ -333,8 +373,8 @@ onRollBackward :: forall m peer blk.
STM m ()
onRollBackward context slot = whenEnabled context () $
readTVar (cschJumping (handle context)) >>= \case
Objector _ badPoint
| slot < pointSlot badPoint -> do
Objector _ badFragment
| slot < AF.headSlot badFragment -> do
disengage (handle context)
electNewObjector (stripContext context)
| otherwise -> pure ()
Expand Down Expand Up @@ -447,7 +487,7 @@ processJumpResult context jumpResult = whenEnabled context () $
-- intersection is the good point.
writeTVar (cschJumping (handle context)) $
Jumper nextJumpVar goodPoint $
FoundIntersection $ AF.headPoint badFragment
FoundIntersection badFragment
demoteObjector (stripContext context)
electNewObjector (stripContext context)
else do
Expand Down Expand Up @@ -578,6 +618,7 @@ findM p (x : xs) = p x >>= \case
--- | If there is an objector, demote it back to being a jumper.
demoteObjector ::
( MonadSTM m,
HasHeader blk,
HasHeader (Header blk),
NoThunks (Header blk)
) =>
Expand All @@ -589,8 +630,16 @@ demoteObjector context = do
forM_ mObjector $ \(_peer, handle) ->
readTVar (cschJumping handle) >>= \case
Objector _ badFragment -> do
newJumper (AF.Empty AF.AnchorGenesis) (FoundIntersection badFragment) >>=
writeTVar (cschJumping handle)
-- Demoting an objector requires setting its candidate fragment to at
-- least the last accepted jump. Because the objector might be in the
-- middle of restarting ChainSync, we also check that the candidate
-- fragment is sufficiently advanced before asking for the next CSJ
-- instruction.
resetCandidateFragmentOfDemotedObjector handle
nextJumpVar <- newTVar (AF.Empty AF.AnchorGenesis)
let goodFragment = AF.dropNewest 1 badFragment
writeTVar (cschJumping handle) $
Jumper nextJumpVar (AF.headPoint goodFragment) (FoundIntersection badFragment)
_ -> pure ()
where
findObjector =
Expand All @@ -602,24 +651,26 @@ demoteObjector context = do
--- | Look into all objector candidates and promote the one with the oldest
--- intersection with the dynamo as the new objector.
electNewObjector ::
( MonadSTM m ) =>
( MonadSTM m,
HasHeader (Header blk)
) =>
Context m peer blk ->
STM m ()
electNewObjector context = do
peerStates <- Map.toList <$> readTVar (handlesVar context)
dissentingJumpers <- collectDissentingJumpers peerStates
let sortedJumpers = sortOn (pointSlot . fst) dissentingJumpers
let sortedJumpers = sortOn (AF.headSlot . fst) dissentingJumpers
case sortedJumpers of
[] ->
pure ()
(badPoint, handle):_ ->
writeTVar (cschJumping handle) $ Objector ToRestart badPoint
(badFragment, handle):_ ->
writeTVar (cschJumping handle) $ Objector ToRestart badFragment
where
collectDissentingJumpers peerStates =
fmap catMaybes $
forM peerStates $ \(_, handle) ->
readTVar (cschJumping handle) >>= \case
Jumper _ _ (FoundIntersection badPoint) ->
pure $ Just (badPoint, handle)
Jumper _ _ (FoundIntersection badFragment) ->
pure $ Just (badFragment, handle)
_ ->
pure Nothing
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ data ChainSyncJumpingState m blk
-- which one to disconnect from.
Objector
Restart
-- | The point where the objector dissented with the dynamo when it was a
-- jumper.
!(Point (Header blk))
-- | The fragment with whose tip the objector dissented with the dynamo
-- when it was a jumper.
!(AnchoredFragment (Header blk))
| -- | Headers continued to be downloaded from 'Disengaged' peers. They
-- are not requested to jump, nor elected as dynamos or objectors.
Disengaged
Expand Down Expand Up @@ -135,9 +135,9 @@ data ChainSyncJumpingJumperState blk
LookingForIntersection !(AnchoredFragment (Header blk))
| -- | The jumper disagrees with the dynamo and we have determined the latest
-- point where dynamo and jumper agree. This point is stored in the 'Jumper'
-- constructor of 'ChainSyncJumpingState'. This constructor carries the
-- oldest point of disagreement.
FoundIntersection !(Point (Header blk))
-- constructor of 'ChainSyncJumpingState'. This constructor carries a
-- fragment whose tip is the oldest point of disagreement.
FoundIntersection !(AnchoredFragment (Header blk))
deriving (Generic)

deriving anyclass instance (HasHeader blk, NoThunks (Header blk)) => NoThunks (ChainSyncJumpingJumperState blk)

0 comments on commit 5631b42

Please sign in to comment.