Skip to content

Commit

Permalink
Make the LoP follow the GSM states
Browse files Browse the repository at this point in the history
This also involves some fairly important changes to the leaky bucket and
to the leaky bucket API. These changes make the leaky bucket
significantly more robust.

Co-authored-by: Alexander Esgen <alexander.esgen@iohk.io>
  • Loading branch information
2 people authored and facundominguez committed May 6, 2024
1 parent c9e58eb commit 9fb3137
Show file tree
Hide file tree
Showing 12 changed files with 575 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import qualified Codec.CBOR.Decoding as CBOR
import Codec.CBOR.Encoding (Encoding)
import qualified Codec.CBOR.Encoding as CBOR
import Codec.CBOR.Read (DeserialiseFailure)
import Control.Concurrent.Class.MonadSTM.Strict.TVar as TVar.Unchecked
import qualified Control.Concurrent.Class.MonadSTM.Strict.TVar as TVar.Unchecked
import Control.Monad.Class.MonadTime.SI (MonadTime)
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.Tracer
Expand Down Expand Up @@ -571,6 +571,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
(contramap (TraceLabelPeer them) (Node.chainSyncClientTracer (getTracers kernel)))
(CsClient.defaultChainDbView (getChainDB kernel))
(getChainSyncHandles kernel)
(getGsmState kernel)
them
version
lopBucketConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
LedgerPeersConsensusInterface {
lpGetLatestSlot = getImmTipSlot kernel,
lpGetLedgerPeers = fromMaybe [] <$> getPeersFromCurrentLedger kernel (const True),
lpGetLedgerStateJudgement = getLedgerStateJudgement kernel
lpGetLedgerStateJudgement = GSM.gsmStateToLedgerJudgement <$> getGsmState kernel
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ module Ouroboros.Consensus.Node.GSM (
-- * Auxiliaries
, TraceGsmEvent (..)
, gsmStateToLedgerJudgement
, initializationLedgerJudgement
, initializationGsmState
-- * Constructors
, realDurationUntilTooOld
, realGsmEntryPoints
, realMarkerFileView
-- * Re-exported
, module Ouroboros.Consensus.Node.GsmState
) where

import qualified Cardano.Slotting.Slot as Slot
Expand All @@ -43,6 +45,7 @@ import qualified Ouroboros.Consensus.HardFork.Abstract as HardFork
import qualified Ouroboros.Consensus.HardFork.History as HardFork
import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry
import qualified Ouroboros.Consensus.Ledger.Basics as L
import Ouroboros.Consensus.Node.GsmState
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import Ouroboros.Consensus.Util.NormalForm.StrictTVar (StrictTVar)
import qualified Ouroboros.Consensus.Util.NormalForm.StrictTVar as StrictSTM
Expand Down Expand Up @@ -80,19 +83,6 @@ data CandidateVersusSelection =
-- ^ Whether the candidate is better than the selection
deriving (Eq, Show)

-- | Current state of the Genesis State Machine
data GsmState =
PreSyncing
-- ^ We are syncing, and the Honest Availability Assumption is not
-- satisfied.
|
Syncing
-- ^ We are syncing, and the Honest Availability Assumption is satisfied.
|
CaughtUp
-- ^ We are caught-up.
deriving (Eq, Show, Read)

data GsmView m upstreamPeer selection chainSyncState = GsmView {
antiThunderingHerd :: Maybe StdGen
-- ^ An initial seed used to randomly increase 'minCaughtUpDuration' by up
Expand Down Expand Up @@ -168,34 +158,34 @@ data GsmEntryPoints m = GsmEntryPoints {

-----

-- | Determine the initial 'LedgerStateJudgment'
-- | Determine the initial 'GsmState'
--
-- Also initializes the persistent marker file.
initializationLedgerJudgement ::
initializationGsmState ::
( L.GetTip (L.LedgerState blk)
, Monad m
)
=> m (L.LedgerState blk)
-> Maybe (WrapDurationUntilTooOld m blk)
-- ^ 'Nothing' if @blk@ has no age limit
-> MarkerFileView m
-> m LedgerStateJudgement
initializationLedgerJudgement
-> m GsmState
initializationGsmState
getCurrentLedger
mbDurationUntilTooOld
markerFileView
= do
wasCaughtUp <- hasMarkerFile markerFileView
if not wasCaughtUp then pure TooOld else do
if not wasCaughtUp then pure PreSyncing else do
case mbDurationUntilTooOld of
Nothing -> return YoungEnough
Nothing -> return CaughtUp
Just wd -> do
sno <- L.getTipSlot <$> getCurrentLedger
getDurationUntilTooOld wd sno >>= \case
After{} -> return YoungEnough
After{} -> return CaughtUp
Already -> do
removeMarkerFile markerFileView
return TooOld
return PreSyncing

-- | For 'LedgerStateJudgement' as used in the Diffusion layer, there is no
-- difference between 'PreSyncing' and 'Syncing'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ import Ouroboros.Consensus.Util.AnchoredFragment
(preferAnchoredCandidate)
import Ouroboros.Consensus.Util.EarlyExit
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.LeakyBucket
(atomicallyWithMonotonicTime)
import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM
Expand Down Expand Up @@ -128,9 +130,10 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
--
, getFetchMode :: STM m FetchMode

-- | The ledger judgement, used by diffusion.
-- | The GSM state, used by diffusion. A ledger judgement can be derived
-- from it with 'GSM.gsmStateToLedgerJudgement'.
--
, getLedgerStateJudgement :: STM m LedgerStateJudgement
, getGsmState :: STM m GSM.GsmState

-- | The kill handle and exposed state for each ChainSync client.
, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
Expand Down Expand Up @@ -201,7 +204,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, mempool
, peerSharingRegistry
, varChainSyncHandles
, varLedgerJudgement
, varGsmState
} = st

do let GsmNodeKernelArgs {..} = gsmArgs
Expand Down Expand Up @@ -237,13 +240,16 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, GSM.setCaughtUpPersistentMark = \upd ->
(if upd then GSM.touchMarkerFile else GSM.removeMarkerFile)
gsmMarkerFileView
, GSM.writeGsmState = \x -> atomically $ do
writeTVar varLedgerJudgement $ GSM.gsmStateToLedgerJudgement x
, GSM.writeGsmState = \gsmState ->
atomicallyWithMonotonicTime $ \time -> do
writeTVar varGsmState gsmState
handles <- readTVar varChainSyncHandles
traverse_ (($ time) . ($ gsmState) . cschGsmCallback) handles
, -- In the context of bootstrap peers, it is fine to always
-- return 'True' as all peers are trusted during syncing.
GSM.isHaaSatisfied = pure True
}
judgment <- readTVarIO varLedgerJudgement
judgment <- GSM.gsmStateToLedgerJudgement <$> readTVarIO varGsmState
void $ forkLinkedThread registry "NodeKernel.GSM" $ case judgment of
TooOld -> GSM.enterPreSyncing gsm
YoungEnough -> GSM.enterCaughtUp gsm
Expand Down Expand Up @@ -272,7 +278,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, getTopLevelConfig = cfg
, getFetchClientRegistry = fetchClientRegistry
, getFetchMode = readFetchMode blockFetchInterface
, getLedgerStateJudgement = readTVar varLedgerJudgement
, getGsmState = readTVar varGsmState
, getChainSyncHandles = varChainSyncHandles
, getPeerSharingRegistry = peerSharingRegistry
, getTracers = tracers
Expand Down Expand Up @@ -305,9 +311,9 @@ data InternalState m addrNTN addrNTC blk = IS {
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
, varGsmState :: StrictTVar m GSM.GsmState
, mempool :: Mempool m blk
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
, varLedgerJudgement :: StrictTVar m LedgerStateJudgement
}

initInternalState ::
Expand All @@ -324,9 +330,9 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
, mempoolCapacityOverride
, gsmArgs, getUseBootstrapPeers
} = do
varLedgerJudgement <- do
varGsmState <- do
let GsmNodeKernelArgs {..} = gsmArgs
j <- GSM.initializationLedgerJudgement
j <- GSM.initializationGsmState
(atomically $ ledgerState <$> ChainDB.getCurrentLedger chainDB)
gsmDurationUntilTooOld
gsmMarkerFileView
Expand All @@ -350,7 +356,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
btime
(ChainDB.getCurrentChain chainDB)
getUseBootstrapPeers
(readTVar varLedgerJudgement)
(GSM.gsmStateToLedgerJudgement <$> readTVar varGsmState)
blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
blockFetchInterface = BlockFetchClientInterface.mkBlockFetchConsensusInterface
(configBlock cfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
Consensus, bracketChainSyncClient, chainSyncClient)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck
import Ouroboros.Consensus.Node.GsmState (GsmState (Syncing))
import Ouroboros.Consensus.Util (ShowProxy)
import Ouroboros.Consensus.Util.IOLike (Exception (fromException),
IOLike, MonadCatch (try), StrictTVar)
Expand Down Expand Up @@ -141,11 +142,12 @@ runChainSyncClient
csjConfig
StateViewTracers {svtPeerSimulatorResultsTracer}
varHandles
channel = do
channel =
bracketChainSyncClient
nullTracer
chainDbView
varHandles
(pure Syncing)
peerId
(maxBound :: NodeToNodeVersion)
lopBucketConfig
Expand Down
2 changes: 2 additions & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ library
Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server
Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server
Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
Ouroboros.Consensus.Node.GsmState
Ouroboros.Consensus.Node.InitStorage
Ouroboros.Consensus.Node.NetworkProtocolVersion
Ouroboros.Consensus.Node.ProtocolInfo
Expand Down Expand Up @@ -588,6 +589,7 @@ test-suite infra-test
build-depends:
QuickCheck,
base,
io-classes,
io-sim,
mtl,
ouroboros-consensus:{ouroboros-consensus, unstable-consensus-testlib},
Expand Down

0 comments on commit 9fb3137

Please sign in to comment.