Skip to content

Commit

Permalink
Set jumpers to jump as soon as the dynamo changes
Browse files Browse the repository at this point in the history
  • Loading branch information
facundominguez committed Apr 18, 2024
1 parent 5e2ce8d commit e561506
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 30 deletions.
Expand Up @@ -32,14 +32,15 @@ import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import GHC.Generics (Generic)
import Ouroboros.Consensus.Block (GenesisWindow (unGenesisWindow),
HasHeader, Header, castPoint, pointSlot,
HasHeader, Header, Point(..), castPoint, pointSlot,
succWithOrigin)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State
(ChainSyncClientHandle (..), ChainSyncJumpingState (..),
ChainSyncState (..))
import Ouroboros.Consensus.Util.IOLike hiding (handle)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
headPoint, headSlot)
AnchoredSeq(Empty), Anchor(AnchorGenesis), headPoint,
headSlot)

-- | A context for ChainSync jumping, pointing for some data.
--
Expand Down Expand Up @@ -116,12 +117,13 @@ nextInstruction context =
readTVar (cschJumping (handle context)) >>= \case
Dynamo lastJumpSlot -> maybeSetNextJump lastJumpSlot >> pure RunNormally
Objector _ -> pure RunNormally
Jumper nextJumpVar ->
readTVar nextJumpVar >>= \case
Nothing -> retry
Just fragment -> do
writeTVar nextJumpVar Nothing
pure $ JumpTo fragment
Jumper nextJumpVar -> do
fragment <- readTVar nextJumpVar
if headPoint fragment == GenesisPoint then
retry
else do
writeTVar nextJumpVar (Empty AnchorGenesis)
pure $ JumpTo fragment
where
-- | We are the dynamo. When the tip of our candidate fragment is 'jumpSize'
-- slots younger than the last jump, set jumpers to jump to it.
Expand All @@ -131,14 +133,10 @@ nextInstruction context =
handles <- readTVar (handlesVar context)
forM_ (Map.elems handles) $ \ChainSyncClientHandle{cschJumping = cschJumping'} ->
readTVar cschJumping' >>= \case
Jumper nextJumpVar -> writeTVar nextJumpVar $ Just dynamoFragment
Jumper nextJumpVar -> writeTVar nextJumpVar dynamoFragment
_ -> pure ()
writeTVar (cschJumping (handle context)) $ Dynamo (headSlot dynamoFragment)

-- | The CSJ dynamo offers an empty genesis window in the chain it is serving.
data DynamoOffersEmptyGenesisWindow = DynamoOffersEmptyGenesisWindow
deriving (Exception, Show)

-- | This function is called when we receive a 'MsgRollForward' message.
--
-- When a dynamo rolls forward, we might need to jump to the candidate
Expand All @@ -152,7 +150,6 @@ data DynamoOffersEmptyGenesisWindow = DynamoOffersEmptyGenesisWindow
--
onRollForward ::
( MonadSTM m,
MonadThrow (STM m),
HasHeader blk,
HasHeader (Header blk)
) =>
Expand All @@ -167,17 +164,16 @@ onRollForward context slot =
| slot >= succWithOrigin lastJumpSlot + genesisWindowSlot -> do
fragment <- csCandidate <$> readTVar (cschState (handle context))
let csTipPoint = headPoint fragment
if pointSlot csTipPoint > lastJumpSlot
then setJumps fragment
else throwSTM DynamoOffersEmptyGenesisWindow
when (pointSlot csTipPoint > lastJumpSlot) $
setJumps fragment
| otherwise -> pure ()
where
genesisWindowSlot = SlotNo (unGenesisWindow (genesisWindow context))
setJumps fragment = do
handles <- readTVar (handlesVar context)
forM_ (Map.elems handles) $ \h ->
readTVar (cschJumping h) >>= \case
Jumper nextJumpVar -> writeTVar nextJumpVar $ Just fragment
Jumper nextJumpVar -> writeTVar nextJumpVar fragment
_ -> pure ()

-- | This function is called when we receive a 'MsgAwaitReply' message.
Expand Down Expand Up @@ -250,9 +246,10 @@ newJumper ::
HasHeader blk,
NoThunks (Header blk)
) =>
AnchoredFragment (Header blk) ->
STM m (ChainSyncJumpingState m blk)
newJumper = do
nextJumpVar <- newTVar Nothing
newJumper nextJump = do
nextJumpVar <- newTVar nextJump
pure $ Jumper nextJumpVar

-- | Register a new ChainSync client to a context, returning a 'PeerContext' for
Expand All @@ -270,12 +267,12 @@ registerClient ::
(StrictTVar m (ChainSyncJumpingState m blk) -> ChainSyncClientHandle m blk) ->
STM m (PeerContext m peer blk)
registerClient context peer mkHandle = do
cschJumping <-
newTVar
=<< ( getDynamo (handlesVar context) >>= \case
Nothing -> pure $ Dynamo Origin
Just _ -> newJumper
)
csjState <- getDynamo (handlesVar context) >>= \case
Nothing -> pure $ Dynamo Origin
Just handle -> do
fragment <- csCandidate <$> readTVar (cschState handle)
newJumper fragment
cschJumping <- newTVar csjState
let handle = mkHandle cschJumping
modifyTVar (handlesVar context) $ Map.insert peer handle
pure $ context {peer, handle}
Expand Down Expand Up @@ -316,8 +313,9 @@ electNewDynamo context = do
Just (dynId, dynamo) -> do
writeTVar (cschJumping dynamo) $ Dynamo Origin
forM_ peerStates $ \(peer, st) ->
when (peer /= dynId) $
writeTVar (cschJumping st) =<< newJumper
when (peer /= dynId) $ do
fragment <- csCandidate <$> readTVar (cschState dynamo)
writeTVar (cschJumping st) =<< newJumper fragment
where
findNonIdling [] = pure Nothing
findNonIdling ((peer, st) : rest) = do
Expand Down
Expand Up @@ -91,8 +91,8 @@ data ChainSyncJumpingState m blk
-- they become candidates to be the objector. See
-- 'ChainSyncJumpingJumperState' for more details.
Jumper
-- | A TVar containing the next jump to be executed, if there is one.
!(StrictTVar m (Maybe (AnchoredFragment (Header blk))))
-- | A TVar containing the next jump to be executed.
!(StrictTVar m (AnchoredFragment (Header blk)))
deriving (Generic)

deriving anyclass instance (IOLike m, HasHeader blk, NoThunks (Header blk)) => NoThunks (ChainSyncJumpingState m blk)

0 comments on commit e561506

Please sign in to comment.