Skip to content

Commit

Permalink
Have objectors send their last good intersection to servers so they k…
Browse files Browse the repository at this point in the history
…now which headers to send
  • Loading branch information
facundominguez committed Apr 29, 2024
1 parent 0c81dfd commit 5f1657c
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ traceSchedulerEventTestBlockWith setTickTime tracer0 _tracer = \case
traceJumpingState :: ChainSyncJumpingState m TestBlock -> String
traceJumpingState = \case
Dynamo lastJump -> "Dynamo " ++ terseWithOrigin show lastJump
Objector goodFragment badPoint -> unwords
Objector initState goodFragment badPoint -> unwords
[ "Objector"
, show initState
, tersePoint (castPoint $ headPoint goodFragment)
, tersePoint (castPoint badPoint)
]
Expand Down Expand Up @@ -386,6 +387,10 @@ traceChainSyncClientEventTestBlockWith pid tracer = \case
trace $ "Accepted jump to " ++ tersePoint (castPoint $ headPoint $ jTheirFragment ji)
TraceJumpResult (RejectedJump ji) ->
trace $ "Rejected jump to " ++ tersePoint (castPoint $ headPoint $ jTheirFragment ji)
TraceJumpResult (AcceptedGoodPointJump fragment) ->
trace $ "Accepted jump to good point: " ++ terseHFragment fragment
TraceJumpResult (RejectedGoodPointJump fragment) ->
trace $ "Rejected jump to good point: " ++ terseHFragment fragment
TraceJumpingWaitingForNextInstruction ->
trace "Waiting for next instruction from the jumping governor"
TraceJumpingInstructionIs instr ->
Expand All @@ -396,6 +401,7 @@ traceChainSyncClientEventTestBlockWith pid tracer = \case
showInstr :: Instruction TestBlock -> String
showInstr = \case
JumpTo ji -> "JumpTo " ++ tersePoint (castPoint $ headPoint $ jTheirFragment ji)
JumpToGoodPoint fragment -> "JumpToGoodPoint " ++ terseHFragment fragment
RunNormally -> "RunNormally"

traceChainSyncClientTerminationEventTestBlockWith ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,11 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv =
Jumping.JumpTo jumpInfo ->
continueWithState kis
$ drainThePipe n
$ offerJump mkPipelineDecision jumpInfo
$ offerJump mkPipelineDecision (Right jumpInfo)
Jumping.JumpToGoodPoint fragment ->
continueWithState kis
$ drainThePipe n
$ offerJump mkPipelineDecision (Left fragment)
Jumping.RunNormally -> do
lbResume loPBucket
continueWithState kis
Expand Down Expand Up @@ -1215,29 +1219,42 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv =

offerJump ::
MkPipelineDecision
-> JumpInfo blk
-> Either (AnchoredFragment (Header blk)) (JumpInfo blk)
-> Stateful m blk
(KnownIntersectionState blk)
(ClientPipelinedStIdle Z)
offerJump mkPipelineDecision jumpInfo = Stateful $ \kis -> do
let dynamoTipPt = castPoint $ AF.headPoint $ jTheirFragment jumpInfo
offerJump mkPipelineDecision jump = Stateful $ \kis -> do
let dynamoTipPt = castPoint $ AF.headPoint $ either id jTheirFragment jump
traceWith tracer $ TraceOfferJump dynamoTipPt
return $
SendMsgFindIntersect [dynamoTipPt] $
ClientPipelinedStIntersect {
recvMsgIntersectFound = \pt theirTip ->
if
| pt == dynamoTipPt -> do
Jumping.jgProcessJumpResult jumping $ Jumping.AcceptedJump jumpInfo
traceWith tracer $ TraceJumpResult $ Jumping.AcceptedJump jumpInfo
let kis' = combineJumpInfo kis jumpInfo
continueWithState kis' $ nextStep mkPipelineDecision Zero (Their theirTip)
case jump of
Right jumpInfo -> do
Jumping.jgProcessJumpResult jumping $ Jumping.AcceptedJump jumpInfo
traceWith tracer $ TraceJumpResult $ Jumping.AcceptedJump jumpInfo
let kis' = combineJumpInfo kis jumpInfo
continueWithState kis' $ nextStep mkPipelineDecision Zero (Their theirTip)
Left fragment -> do
Jumping.jgProcessJumpResult jumping $ Jumping.AcceptedGoodPointJump fragment
traceWith tracer $ TraceJumpResult $ Jumping.AcceptedGoodPointJump fragment
let kis' = combineJumpFragment kis fragment
continueWithState kis' $ nextStep mkPipelineDecision Zero (Their theirTip)
| otherwise -> throwIO InvalidJumpResponse
,
recvMsgIntersectNotFound = \theirTip -> do
Jumping.jgProcessJumpResult jumping $ Jumping.RejectedJump jumpInfo
traceWith tracer $ TraceJumpResult $ Jumping.RejectedJump jumpInfo
continueWithState kis $ nextStep mkPipelineDecision Zero (Their theirTip)
case jump of
Right jumpInfo -> do
Jumping.jgProcessJumpResult jumping $ Jumping.RejectedJump jumpInfo
traceWith tracer $ TraceJumpResult $ Jumping.RejectedJump jumpInfo
continueWithState kis $ nextStep mkPipelineDecision Zero (Their theirTip)
Left fragment -> do
Jumping.jgProcessJumpResult jumping $ Jumping.RejectedGoodPointJump fragment
traceWith tracer $ TraceJumpResult $ Jumping.RejectedGoodPointJump fragment
continueWithState kis $ nextStep mkPipelineDecision Zero (Their theirTip)
}
where
combineJumpInfo ::
Expand Down Expand Up @@ -1279,6 +1296,35 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv =
, kBestBlockNo = max (fromWithOrigin 0 $ AF.headBlockNo $ jTheirFragment ji) (kBestBlockNo kis)
}

combineJumpFragment ::
KnownIntersectionState blk
-> AnchoredFragment (Header blk)
-> KnownIntersectionState blk
combineJumpFragment kis@KnownIntersectionState{ourFrag} fragment =
let mRewoundHistory =
HeaderStateHistory.rewind
(AF.castPoint $ AF.headPoint fragment)
(theirHeaderStateHistory kis)
rewoundHistory =
-- When jumping to a good point, the history should be
-- rewindable because it was rewindable when the good point
-- was discovered.
fromMaybe (error "offerJump: cannot rewind history for a good point") mRewoundHistory
intersection =
case AF.intersect ourFrag fragment of
Just (po, _, _, _) -> castPoint $ AF.headPoint po
-- ourFrag should intersect with the good fragment, or otherwise
-- it would not intersect with it at the time the good
-- fragment was discovered.
Nothing -> error "offerJump: the fragment should have a valid intersection with the current selection"
in KnownIntersectionState
{ mostRecentIntersection = intersection
, ourFrag = ourFrag
, theirFrag = fragment
, theirHeaderStateHistory = rewoundHistory
, kBestBlockNo = max (fromWithOrigin 0 $ AF.headBlockNo fragment) (kBestBlockNo kis)
}

requestNext ::
KnownIntersectionState blk
-> MkPipelineDecision
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State
(ChainSyncClientHandle (..),
ChainSyncJumpingJumperState (..),
ChainSyncJumpingState (..), ChainSyncState (..),
JumpInfo (..))
JumpInfo (..), ObjectorInitState (..))
import Ouroboros.Consensus.Util.IOLike hiding (handle)
import qualified Ouroboros.Network.AnchoredFragment as AF

Expand Down Expand Up @@ -298,6 +298,10 @@ data Instruction blk
= RunNormally
| -- | Jump to the tip of the given fragment.
JumpTo !(JumpInfo blk)
| -- | Used to set the intersection of the servers of starting objectors.
-- Otherwise, the ChainSync server wouldn't know which headers to start
-- serving.
JumpToGoodPoint !(AF.AnchoredFragment (Header blk))
deriving (Generic)

deriving instance (HasHeader (Header blk), Eq (Header blk)) => Eq (Instruction blk)
Expand All @@ -307,6 +311,8 @@ instance (HasHeader (Header blk), Show (Header blk)) => Show (Instruction blk) w
RunNormally -> showString "RunNormally"
JumpTo jumpInfo ->
showParen (p > 10) $ showString "JumpTo " . shows (AF.headPoint $ jTheirFragment jumpInfo)
JumpToGoodPoint fragment ->
showParen (p > 10) $ showString "JumpToGoodPoint " . shows (AF.headPoint fragment)

deriving anyclass instance
( HasHeader blk,
Expand All @@ -318,6 +324,8 @@ deriving anyclass instance
data JumpResult blk
= AcceptedJump !(JumpInfo blk)
| RejectedJump !(JumpInfo blk)
| AcceptedGoodPointJump !(AF.AnchoredFragment (Header blk))
| RejectedGoodPointJump !(AF.AnchoredFragment (Header blk))
deriving (Generic)

deriving instance (HasHeader (Header blk), Eq (Header blk)) => Eq (JumpResult blk)
Expand All @@ -328,6 +336,10 @@ instance (HasHeader (Header blk), Show (Header blk)) => Show (JumpResult blk) wh
showParen (p > 10) $ showString "AcceptedJump " . shows (AF.headPoint $ jTheirFragment jumpInfo)
RejectedJump jumpInfo ->
showParen (p > 10) $ showString "RejectedJump " . shows (AF.headPoint $ jTheirFragment jumpInfo)
AcceptedGoodPointJump fragment ->
showParen (p > 10) $ showString "AcceptedGoodPointJump " . shows (AF.headPoint fragment)
RejectedGoodPointJump fragment ->
showParen (p > 10) $ showString "RejectedGoodPointJump " . shows (AF.headPoint fragment)

deriving anyclass instance
( HasHeader blk,
Expand Down Expand Up @@ -357,7 +369,9 @@ nextInstruction context = whenEnabled context RunNormally $
readTVar (cschJumping (handle context)) >>= \case
Disengaged -> pure RunNormally
Dynamo{} -> pure RunNormally
Objector{} -> pure RunNormally
Objector Starting goodFragment _ -> do
pure $ JumpToGoodPoint goodFragment
Objector Started _ _ -> pure RunNormally
Jumper nextJumpVar _ _ -> do
readTVar nextJumpVar >>= \case
Nothing -> retry
Expand All @@ -383,7 +397,7 @@ onRollForward :: forall m peer blk.
STM m ()
onRollForward context point = whenEnabled context () $
readTVar (cschJumping (handle context)) >>= \case
Objector _ badPoint
Objector _ _ badPoint
| badPoint == castPoint point -> do
disengage (handle context)
electNewObjector (stripContext context)
Expand Down Expand Up @@ -423,7 +437,7 @@ onRollBackward :: forall m peer blk.
STM m ()
onRollBackward context slot = whenEnabled context () $
readTVar (cschJumping (handle context)) >>= \case
Objector _ badPoint
Objector _ _ badPoint
| slot < pointSlot badPoint -> do
disengage (handle context)
electNewObjector (stripContext context)
Expand Down Expand Up @@ -479,7 +493,22 @@ processJumpResult context jumpResult = whenEnabled context () $
readTVar (cschJumping (handle context)) >>= \case
Dynamo _ -> pure ()
Disengaged -> pure ()
Objector{} -> pure ()
Objector Starting goodFragment badPoint ->
case jumpResult of
AcceptedGoodPointJump fragment -> do
writeTVar (cschJumping (handle context)) $
Objector Started goodFragment badPoint
updateChainSyncState (handle context) fragment
RejectedGoodPointJump{} ->
-- If the objector rejects a good point, it is a sign of a rollback
-- to earlier than the last jump.
disengage (handle context)

-- Not interesting in the objector state
AcceptedJump{} -> pure ()
RejectedJump{} -> pure ()

Objector Started _ _ -> pure ()
Jumper nextJumpVar goodFragment jumperState ->
case jumpResult of
AcceptedJump jumpInfo -> do
Expand All @@ -489,8 +518,7 @@ processJumpResult context jumpResult = whenEnabled context () $
-- The candidate fragments of jumpers don't grow otherwise, as only the
-- objector and the dynamo request further headers.
let fragment = jTheirFragment jumpInfo
modifyTVar (cschState (handle context)) $ \csState ->
csState {csCandidate = fragment, csLatestSlot = Just (AF.headSlot fragment) }
updateChainSyncState (handle context) fragment
writeTVar (cschJumpInfo (handle context)) $ Just jumpInfo
case jumperState of
LookingForIntersection badJumpInfo ->
Expand All @@ -517,9 +545,19 @@ processJumpResult context jumpResult = whenEnabled context () $
-- jumper is looking for an intersection, and such jumper only asks
-- for jumps that meet this condition.
lookForIntersection nextJumpVar goodFragment badJumpInfo

-- These aren't interesting in the case of jumpers.
AcceptedGoodPointJump{} -> pure ()
RejectedGoodPointJump{} -> pure ()
where
-- Avoid redundant constraint "HasHeader blk" reported by some ghc's
_ = getHeaderFields @blk

updateChainSyncState :: ChainSyncClientHandle m blk -> AF.AnchoredFragment (Header blk) -> STM m ()
updateChainSyncState handle fragment = do
modifyTVar (cschState handle) $ \csState ->
csState {csCandidate = fragment, csLatestSlot = Just (AF.headSlot fragment) }

-- | Given a good point (where we know we agree with the dynamo) and a bad
-- fragment (where we know the tip disagrees with the dynamo), either decide
-- that we know the intersection for sure (if the bad point is the successor
Expand Down Expand Up @@ -552,7 +590,7 @@ processJumpResult context jumpResult = whenEnabled context () $
findObjector (stripContext context) >>= \case
Nothing ->
-- There is no objector yet. Promote the jumper to objector.
writeTVar (cschJumping (handle context)) (Objector goodFragment badPoint)
writeTVar (cschJumping (handle context)) (Objector Starting goodFragment badPoint)
Just (objectorGoodFragment, objectorPoint, objectorHandle)
| pointSlot objectorPoint <= pointSlot badPoint ->
-- The objector's intersection is still old enough. Keep it.
Expand All @@ -563,7 +601,7 @@ processJumpResult context jumpResult = whenEnabled context () $
-- promote the jumper to objector.
newJumper Nothing objectorGoodFragment (FoundIntersection objectorPoint) >>=
writeTVar (cschJumping objectorHandle)
writeTVar (cschJumping (handle context)) (Objector goodFragment badPoint)
writeTVar (cschJumping (handle context)) (Objector Starting goodFragment badPoint)

updateJumpInfo ::
(MonadSTM m) =>
Expand Down Expand Up @@ -702,7 +740,7 @@ findObjector context = do
go [] = pure Nothing
go ((_, handle):xs) =
readTVar (cschJumping handle) >>= \case
Objector goodFragment badPoint -> pure $ Just (goodFragment, badPoint, handle)
Objector _ goodFragment badPoint -> pure $ Just (goodFragment, badPoint, handle)
_ -> go xs

-- | Look into all dissenting jumper and promote the one with the oldest
Expand All @@ -717,7 +755,7 @@ electNewObjector context = do
let sortedJumpers = sortOn (pointSlot . fst) dissentingJumpers
case sortedJumpers of
(badPoint, (goodFragment, handle)):_ ->
writeTVar (cschJumping handle) $ Objector goodFragment badPoint
writeTVar (cschJumping handle) $ Objector Starting goodFragment badPoint
_ ->
pure ()
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State (
, ChainSyncJumpingState (..)
, ChainSyncState (..)
, JumpInfo (..)
, ObjectorInitState (..)
) where

import Cardano.Slotting.Slot (SlotNo, WithOrigin)
Expand Down Expand Up @@ -86,6 +87,13 @@ deriving anyclass instance (
NoThunks (Header blk)
) => NoThunks (ChainSyncClientHandle m blk)

data ObjectorInitState
= -- | The objector still needs to set the intersection of the ChainSync
-- server before resuming retrieval of headers.
Starting
| Started
deriving (Generic, Show, NoThunks)

-- | State of a peer with respect to ChainSync jumping.
data ChainSyncJumpingState m blk
= -- | The dynamo, of which there is exactly one unless there are no peers,
Expand All @@ -101,6 +109,7 @@ data ChainSyncJumpingState m blk
-- that happened, we spun it up to let normal ChainSync and Genesis decide
-- which one to disconnect from.
Objector
ObjectorInitState
-- | The youngest point where the objector agrees with the dynamo.
!(AnchoredFragment (Header blk))
-- | The point where the objector dissented with the dynamo when it was a
Expand Down

0 comments on commit 5f1657c

Please sign in to comment.