Skip to content

Commit

Permalink
Make the LoP follow the GSM states
Browse files Browse the repository at this point in the history
This also involves some fairly important changes to the leaky bucket and
to the leaky bucket API. These changes make the leaky bucket
significantly more robust.

Co-authored-by: Alexander Esgen <alexander.esgen@iohk.io>
  • Loading branch information
Niols and amesgen committed May 6, 2024
1 parent f6764e7 commit e13e081
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import qualified Codec.CBOR.Decoding as CBOR
import Codec.CBOR.Encoding (Encoding)
import qualified Codec.CBOR.Encoding as CBOR
import Codec.CBOR.Read (DeserialiseFailure)
import Control.Concurrent.Class.MonadSTM.Strict.TVar as TVar.Unchecked
import qualified Control.Concurrent.Class.MonadSTM.Strict.TVar as TVar.Unchecked
import Control.Monad.Class.MonadTime.SI (MonadTime)
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.Tracer
Expand Down Expand Up @@ -571,6 +571,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
(contramap (TraceLabelPeer them) (Node.chainSyncClientTracer (getTracers kernel)))
(CsClient.defaultChainDbView (getChainDB kernel))
(getChainSyncHandles kernel)
(getGsmState kernel)
them
version
lopBucketConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
LedgerPeersConsensusInterface {
lpGetLatestSlot = getImmTipSlot kernel,
lpGetLedgerPeers = fromMaybe [] <$> getPeersFromCurrentLedger kernel (const True),
lpGetLedgerStateJudgement = getLedgerStateJudgement kernel
lpGetLedgerStateJudgement = GSM.gsmStateToLedgerJudgement <$> getGsmState kernel
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ module Ouroboros.Consensus.Node.GSM (
-- * Auxiliaries
, TraceGsmEvent (..)
, gsmStateToLedgerJudgement
, initializationLedgerJudgement
, initializationGsmState
-- * Constructors
, realDurationUntilTooOld
, realGsmEntryPoints
, realMarkerFileView
-- * Re-exported
, module Ouroboros.Consensus.Node.GsmState
) where

import qualified Cardano.Slotting.Slot as Slot
Expand All @@ -43,6 +45,7 @@ import qualified Ouroboros.Consensus.HardFork.Abstract as HardFork
import qualified Ouroboros.Consensus.HardFork.History as HardFork
import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry
import qualified Ouroboros.Consensus.Ledger.Basics as L
import Ouroboros.Consensus.Node.GsmState
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import Ouroboros.Consensus.Util.NormalForm.StrictTVar (StrictTVar)
import qualified Ouroboros.Consensus.Util.NormalForm.StrictTVar as StrictSTM
Expand Down Expand Up @@ -80,19 +83,6 @@ data CandidateVersusSelection =
-- ^ Whether the candidate is better than the selection
deriving (Eq, Show)

-- | Current state of the Genesis State Machine
data GsmState =
PreSyncing
-- ^ We are syncing, and the Honest Availability Assumption is not
-- satisfied.
|
Syncing
-- ^ We are syncing, and the Honest Availability Assumption is satisfied.
|
CaughtUp
-- ^ We are caught-up.
deriving (Eq, Show, Read)

data GsmView m upstreamPeer selection chainSyncState = GsmView {
antiThunderingHerd :: Maybe StdGen
-- ^ An initial seed used to randomly increase 'minCaughtUpDuration' by up
Expand Down Expand Up @@ -168,34 +158,34 @@ data GsmEntryPoints m = GsmEntryPoints {

-----

-- | Determine the initial 'LedgerStateJudgment'
-- | Determine the initial 'GsmState'
--
-- Also initializes the persistent marker file.
initializationLedgerJudgement ::
initializationGsmState ::
( L.GetTip (L.LedgerState blk)
, Monad m
)
=> m (L.LedgerState blk)
-> Maybe (WrapDurationUntilTooOld m blk)
-- ^ 'Nothing' if @blk@ has no age limit
-> MarkerFileView m
-> m LedgerStateJudgement
initializationLedgerJudgement
-> m GsmState
initializationGsmState
getCurrentLedger
mbDurationUntilTooOld
markerFileView
= do
wasCaughtUp <- hasMarkerFile markerFileView
if not wasCaughtUp then pure TooOld else do
if not wasCaughtUp then pure PreSyncing else do
case mbDurationUntilTooOld of
Nothing -> return YoungEnough
Nothing -> return CaughtUp
Just wd -> do
sno <- L.getTipSlot <$> getCurrentLedger
getDurationUntilTooOld wd sno >>= \case
After{} -> return YoungEnough
After{} -> return CaughtUp
Already -> do
removeMarkerFile markerFileView
return TooOld
return PreSyncing

-- | For 'LedgerStateJudgement' as used in the Diffusion layer, there is no
-- difference between 'PreSyncing' and 'Syncing'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ import Ouroboros.Consensus.Util.AnchoredFragment
(preferAnchoredCandidate)
import Ouroboros.Consensus.Util.EarlyExit
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.LeakyBucket
(atomicallyWithMonotonicTime)
import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM
Expand Down Expand Up @@ -128,9 +130,10 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
--
, getFetchMode :: STM m FetchMode

-- | The ledger judgement, used by diffusion.
-- | The GSM state, used by diffusion. A ledger judgement can be derived
-- from it with 'GSM.gsmStateToLedgerJudgement'.
--
, getLedgerStateJudgement :: STM m LedgerStateJudgement
, getGsmState :: STM m GSM.GsmState

-- | The kill handle and exposed state for each ChainSync client.
, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
Expand Down Expand Up @@ -201,7 +204,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, mempool
, peerSharingRegistry
, varChainSyncHandles
, varLedgerJudgement
, varGsmState
} = st

do let GsmNodeKernelArgs {..} = gsmArgs
Expand Down Expand Up @@ -237,13 +240,16 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, GSM.setCaughtUpPersistentMark = \upd ->
(if upd then GSM.touchMarkerFile else GSM.removeMarkerFile)
gsmMarkerFileView
, GSM.writeGsmState = \x -> atomically $ do
writeTVar varLedgerJudgement $ GSM.gsmStateToLedgerJudgement x
, GSM.writeGsmState = \gsmState ->
atomicallyWithMonotonicTime $ \time -> do
writeTVar varGsmState gsmState
handles <- readTVar varChainSyncHandles
traverse_ (($ time) . ($ gsmState) . cschGsmCallback) handles
, -- In the context of bootstrap peers, it is fine to always
-- return 'True' as all peers are trusted during syncing.
GSM.isHaaSatisfied = pure True
}
judgment <- readTVarIO varLedgerJudgement
judgment <- GSM.gsmStateToLedgerJudgement <$> readTVarIO varGsmState
void $ forkLinkedThread registry "NodeKernel.GSM" $ case judgment of
TooOld -> GSM.enterPreSyncing gsm
YoungEnough -> GSM.enterCaughtUp gsm
Expand Down Expand Up @@ -272,7 +278,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, getTopLevelConfig = cfg
, getFetchClientRegistry = fetchClientRegistry
, getFetchMode = readFetchMode blockFetchInterface
, getLedgerStateJudgement = readTVar varLedgerJudgement
, getGsmState = readTVar varGsmState
, getChainSyncHandles = varChainSyncHandles
, getPeerSharingRegistry = peerSharingRegistry
, getTracers = tracers
Expand Down Expand Up @@ -305,9 +311,9 @@ data InternalState m addrNTN addrNTC blk = IS {
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
, varGsmState :: StrictTVar m GSM.GsmState
, mempool :: Mempool m blk
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
, varLedgerJudgement :: StrictTVar m LedgerStateJudgement
}

initInternalState ::
Expand All @@ -324,9 +330,9 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
, mempoolCapacityOverride
, gsmArgs, getUseBootstrapPeers
} = do
varLedgerJudgement <- do
varGsmState <- do
let GsmNodeKernelArgs {..} = gsmArgs
j <- GSM.initializationLedgerJudgement
j <- GSM.initializationGsmState
(atomically $ ledgerState <$> ChainDB.getCurrentLedger chainDB)
gsmDurationUntilTooOld
gsmMarkerFileView
Expand All @@ -350,7 +356,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
btime
(ChainDB.getCurrentChain chainDB)
getUseBootstrapPeers
(readTVar varLedgerJudgement)
(GSM.gsmStateToLedgerJudgement <$> readTVar varGsmState)
blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
blockFetchInterface = BlockFetchClientInterface.mkBlockFetchConsensusInterface
(configBlock cfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
Consensus, bracketChainSyncClient, chainSyncClient)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck
import Ouroboros.Consensus.Node.GsmState (GsmState (Syncing))
import Ouroboros.Consensus.Util (ShowProxy)
import Ouroboros.Consensus.Util.IOLike (Exception (fromException),
IOLike, MonadCatch (try), StrictTVar)
Expand Down Expand Up @@ -141,11 +142,12 @@ runChainSyncClient
csjConfig
StateViewTracers {svtPeerSimulatorResultsTracer}
varHandles
channel = do
channel =
bracketChainSyncClient
nullTracer
chainDbView
varHandles
(pure Syncing)
peerId
(maxBound :: NodeToNodeVersion)
lopBucketConfig
Expand Down
1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ library
Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server
Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server
Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
Ouroboros.Consensus.Node.GsmState
Ouroboros.Consensus.Node.InitStorage
Ouroboros.Consensus.Node.NetworkProtocolVersion
Ouroboros.Consensus.Node.ProtocolInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as Jumping
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State
import Ouroboros.Consensus.Node.GsmState (GsmState (..))
import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB (ChainDB,
Expand All @@ -113,6 +114,8 @@ import Ouroboros.Consensus.Util.Assert (assertWithMsg)
import Ouroboros.Consensus.Util.EarlyExit (WithEarlyExit, exitEarly)
import qualified Ouroboros.Consensus.Util.EarlyExit as EarlyExit
import Ouroboros.Consensus.Util.IOLike hiding (handle)
import Ouroboros.Consensus.Util.LeakyBucket
(atomicallyWithMonotonicTime)
import qualified Ouroboros.Consensus.Util.LeakyBucket as LeakyBucket
import Ouroboros.Consensus.Util.STM (Fingerprint, Watcher (..),
WithFingerprint (..), withWatcher)
Expand Down Expand Up @@ -313,7 +316,8 @@ deriving anyclass instance (
NoThunks (Header blk)
) => NoThunks (ChainSyncStateView m blk)

bracketChainSyncClient :: forall m peer blk a.
bracketChainSyncClient ::
forall m peer blk a.
( IOLike m
, Ord peer
, LedgerSupportsProtocol blk
Expand All @@ -323,6 +327,8 @@ bracketChainSyncClient :: forall m peer blk a.
-> StrictTVar m (Map peer (ChainSyncClientHandle m blk))
-- ^ The kill handle and states for each peer. We need the whole map because
-- we (de)register nodes (@peer@) in it.
-> STM m GsmState
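-- ^ An 'STM' action yielding the current GSM state. It is only read when the
-- client registers its handle, to choose the initial LoP bucket
-- configuration; later GSM state changes arrive via 'cschGsmCallback'.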
-> peer
-> NodeToNodeVersion
-> ChainSyncLoPBucketConfig
Expand All @@ -333,19 +339,21 @@ bracketChainSyncClient
tracer
ChainDbView { getIsInvalidBlock }
varHandles
getGsmState
peer
version
csBucketConfig
csjConfig
body
= mkChainSyncClientHandleState >>= \csHandleState ->
withCSJCallbacks csHandleState csjConfig $ \csjCallbacks ->
=
LeakyBucket.execAgainstBucket'
$ \lopBucket ->
mkChainSyncClientHandleState >>= \csHandleState ->
withCSJCallbacks lopBucket csHandleState csjConfig $ \csjCallbacks ->
withWatcher
"ChainSync.Client.rejectInvalidBlocks"
(invalidBlockWatcher csHandleState)
$ LeakyBucket.execAgainstBucket lopBucketConfig
$ \lopBucket ->
body ChainSyncStateView {
$ body ChainSyncStateView {
csvSetCandidate =
modifyTVar csHandleState . \ c s -> s {csCandidate = c}
, csvSetLatestSlot =
Expand All @@ -355,9 +363,9 @@ bracketChainSyncClient
, idlingStop = atomically $ modifyTVar csHandleState $ \ s -> s {csIdling = False}
}
, csvLoPBucket = LoPBucket {
lbPause = LeakyBucket.setPaused lopBucket True
, lbResume = LeakyBucket.setPaused lopBucket False
, lbGrantToken = void $ LeakyBucket.fill lopBucket 1
lbPause = LeakyBucket.setPaused' lopBucket True
, lbResume = LeakyBucket.setPaused' lopBucket False
, lbGrantToken = void $ LeakyBucket.fill' lopBucket 1
}
, csvJumping = csjCallbacks
}
Expand All @@ -370,34 +378,43 @@ bracketChainSyncClient
}

withCSJCallbacks ::
LeakyBucket.Handlers m ->
StrictTVar m (ChainSyncState blk) ->
CSJConfig ->
(Jumping.Jumping m blk -> m a) ->
m a
withCSJCallbacks cschState CSJDisabled f = do
withCSJCallbacks lopBucket cschState CSJDisabled f = do
tid <- myThreadId
cschJumpInfo <- newTVarIO Nothing
cschJumping <- newTVarIO (Disengaged DisengagedDone)
let handle = ChainSyncClientHandle {
cschGDDKill = throwTo tid DensityTooLow
, cschGsmCallback = updateLopBucketConfig lopBucket
, cschState
, cschJumping
, cschJumpInfo
}
insertHandle = atomically $ modifyTVar varHandles $ Map.insert peer handle
insertHandle = atomicallyWithMonotonicTime $ \time -> do
initialGsmState <- getGsmState
updateLopBucketConfig lopBucket initialGsmState time
modifyTVar varHandles $ Map.insert peer handle
deleteHandle = atomically $ modifyTVar varHandles $ Map.delete peer
bracket_ insertHandle deleteHandle $ f Jumping.noJumping

withCSJCallbacks csHandleState (CSJEnabled csjEnabledConfig) f =
bracket (acquireContext csHandleState csjEnabledConfig) releaseContext $ \peerContext ->
withCSJCallbacks lopBucket csHandleState (CSJEnabled csjEnabledConfig) f =
bracket (acquireContext lopBucket csHandleState csjEnabledConfig) releaseContext $ \peerContext ->
f $ Jumping.mkJumping peerContext
acquireContext cschState (CSJEnabledConfig jumpSize) = do

acquireContext lopBucket cschState (CSJEnabledConfig jumpSize) = do
tid <- myThreadId
atomically $ do
atomicallyWithMonotonicTime $ \time -> do
initialGsmState <- getGsmState
updateLopBucketConfig lopBucket initialGsmState time
cschJumpInfo <- newTVar Nothing
context <- Jumping.makeContext varHandles jumpSize
Jumping.registerClient context peer cschState $ \cschJumping -> ChainSyncClientHandle
{ cschGDDKill = throwTo tid DensityTooLow
, cschGsmCallback = updateLopBucketConfig lopBucket
, cschState
, cschJumping
, cschJumpInfo
Expand All @@ -409,19 +426,33 @@ bracketChainSyncClient
invalidBlockRejector
tracer version getIsInvalidBlock (csCandidate <$> readTVar varState)

-- | Reconfigure the LoP bucket so that it matches the given GSM state.
--
-- Whatever level and configuration the bucket had before are discarded and
-- replaced by the configuration that 'lopBucketConfig' selects for
-- @gsmState@.
--
-- NOTE: The new level is currently the maximal capacity of the new
-- configuration; maybe we want to change that later.
updateLopBucketConfig :: LeakyBucket.Handlers m -> GsmState -> Time -> STM m ()
updateLopBucketConfig lopBucket gsmState =
    LeakyBucket.updateConfig lopBucket (const (LeakyBucket.capacity newConfig, newConfig))
  where
    -- The bucket configuration appropriate for the target GSM state.
    newConfig = lopBucketConfig gsmState

-- | The leaky bucket configuration to use for the LoP in the given GSM
-- state: the configured capacity and rate when the bucket is enabled and we
-- are 'Syncing', and 'LeakyBucket.dummyConfig' in every other case.
lopBucketConfig :: LeakyBucket.Config m
lopBucketConfig =
case csBucketConfig of
ChainSyncLoPBucketDisabled -> LeakyBucket.dummyConfig
ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig {csbcCapacity, csbcRate} ->
lopBucketConfig :: GsmState -> LeakyBucket.Config m
lopBucketConfig gsmState =
case (gsmState, csBucketConfig) of
(Syncing, ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig {csbcCapacity, csbcRate}) ->
LeakyBucket.Config
{ capacity = fromInteger $ csbcCapacity,
rate = csbcRate,
onEmpty = throwIO EmptyBucket,
fillOnOverflow = True
}
-- NOTE: If we decide to slow the bucket down when “almost caught-up”,
-- we should add a state to the GSM and corresponding configuration
-- fields and a bucket config here.
(_, ChainSyncLoPBucketDisabled) -> LeakyBucket.dummyConfig
(PreSyncing, ChainSyncLoPBucketEnabled _) -> LeakyBucket.dummyConfig
(CaughtUp, ChainSyncLoPBucketEnabled _) -> LeakyBucket.dummyConfig

-- Our task: after connecting to an upstream node, try to maintain an
-- up-to-date header-only fragment representing their chain. We maintain
Expand Down

0 comments on commit e13e081

Please sign in to comment.