Skip to content

Commit

Permalink
Various improvements to the ChainSync client
Browse files Browse the repository at this point in the history
Co-authored-by: Nicolas BACQUEY <nicolas.bacquey@tweag.io>
Co-authored-by: Torsten Schmits <git@tryp.io>
  • Loading branch information
3 people authored and amesgen committed Mar 28, 2024
1 parent 7ace045 commit a31843a
Show file tree
Hide file tree
Showing 14 changed files with 384 additions and 339 deletions.
Expand Up @@ -56,6 +56,8 @@ import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(ChainSyncStateView (..))
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
import Ouroboros.Consensus.Node.ExitPolicy
Expand Down Expand Up @@ -569,13 +571,11 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
CsClient.bracketChainSyncClient
(contramap (TraceLabelPeer them) (Node.chainSyncClientTracer (getTracers kernel)))
(CsClient.defaultChainDbView (getChainDB kernel))
(getNodeCandidates kernel)
(getNodeIdlers kernel)
(getChainSyncHandles kernel)
them
version
lopBucketConfig
$ \varCandidate (startIdling, stopIdling) (pauseLoPBucket, resumeLoPBucket, grantLoPToken) setTheirTip setLatestSlot -> do
$ \csState -> do
chainSyncTimeout <- genChainSyncTimeout
(r, trailing) <-
runPipelinedPeerWithLimits
Expand All @@ -592,14 +592,10 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
CsClient.version
, CsClient.controlMessageSTM
, CsClient.headerMetricsTracer = TraceLabelPeer them `contramap` reportHeader
, CsClient.varCandidate
, CsClient.startIdling
, CsClient.stopIdling
, CsClient.pauseLoPBucket
, CsClient.resumeLoPBucket
, CsClient.grantLoPToken
, CsClient.setTheirTip
, CsClient.setLatestSlot
, CsClient.setCandidate = csvSetCandidate csState
, CsClient.idling = csvIdling csState
, CsClient.loPBucket = csvLoPBucket csState
, CsClient.setLatestSlot = csvSetLatestSlot csState
}
return (ChainSyncInitiatorResult r, trailing)

Expand Down
Expand Up @@ -37,7 +37,6 @@ import qualified Control.Monad.Class.MonadTimer.SI as SI
import Control.Tracer (Tracer, traceWith)
import Data.Functor ((<&>))
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Time (NominalDiffTime)
import qualified Ouroboros.Consensus.BlockchainTime.WallClock.Types as Clock
import qualified Ouroboros.Consensus.HardFork.Abstract as HardFork
Expand Down Expand Up @@ -94,7 +93,7 @@ data GsmState =
-- ^ We are caught-up.
deriving (Eq, Show, Read)

data GsmView m upstreamPeer selection candidate = GsmView {
data GsmView m upstreamPeer selection chainSyncState = GsmView {
antiThunderingHerd :: Maybe StdGen
-- ^ An initial seed used to randomly increase 'minCaughtUpDuration' by up
-- to 15% every transition from Syncing to CaughtUp, in order to avoid a
Expand All @@ -103,7 +102,9 @@ data GsmView m upstreamPeer selection candidate = GsmView {
-- 'Nothing' should only be used for testing.
,
candidateOverSelection ::
selection -> candidate -> CandidateVersusSelection
selection -> chainSyncState -> CandidateVersusSelection
,
peerIsIdle :: chainSyncState -> Bool
,
durationUntilTooOld :: Maybe (selection -> m DurationFromNow)
-- ^ How long from now until the selection will be so old that the node
Expand All @@ -115,13 +116,10 @@ data GsmView m upstreamPeer selection candidate = GsmView {
-- ^ Whether the two selections are equivalent for the purpose of the
-- Genesis State Machine
,
getChainSyncCandidates ::
STM m (Map.Map upstreamPeer (StrictTVar m candidate))
-- ^ The latest candidates from the upstream ChainSync peers
,
getChainSyncIdlers :: STM m (Set.Set upstreamPeer)
-- ^ The ChainSync peers whose latest message claimed that they have no
-- subsequent headers
getChainSyncStates ::
STM m (Map.Map upstreamPeer (StrictTVar m chainSyncState))
-- ^ The current ChainSync state with the latest candidates from the
-- upstream peers
,
getCurrentSelection :: STM m selection
-- ^ The node's current selection
Expand Down Expand Up @@ -224,7 +222,6 @@ gsmStateToLedgerJudgement = \case
realGsmEntryPoints :: forall m upstreamPeer selection tracedSelection candidate.
( SI.MonadDelay m
, SI.MonadTimer m
, Eq upstreamPeer
)
=> (selection -> tracedSelection, Tracer m (TraceGsmEvent tracedSelection))
-> GsmView m upstreamPeer selection candidate
Expand All @@ -241,14 +238,14 @@ realGsmEntryPoints tracerArgs gsmView = GsmEntryPoints {
antiThunderingHerd
,
candidateOverSelection
,
peerIsIdle
,
durationUntilTooOld
,
equivalent
,
getChainSyncCandidates
,
getChainSyncIdlers
getChainSyncStates
,
getCurrentSelection
,
Expand Down Expand Up @@ -374,12 +371,11 @@ realGsmEntryPoints tracerArgs gsmView = GsmEntryPoints {
blockUntilCaughtUp :: STM m (TraceGsmEvent tracedSelection)
blockUntilCaughtUp = do
-- STAGE 1: all ChainSync clients report no subsequent headers
idlers <- getChainSyncIdlers
varsCandidate <- getChainSyncCandidates
varsState <- getChainSyncStates
states <- traverse StrictSTM.readTVar varsState
check $
0 < Map.size varsCandidate
&& Set.size idlers == Map.size varsCandidate
&& idlers == Map.keysSet varsCandidate
not (Map.null states)
&& all peerIsIdle states

-- STAGE 2: no candidate is better than the node's current
-- selection
Expand All @@ -392,14 +388,14 @@ realGsmEntryPoints tracerArgs gsmView = GsmEntryPoints {
-- block; general Praos reasoning ensures that won't take particularly
-- long.
selection <- getCurrentSelection
candidates <- traverse StrictSTM.readTVar varsCandidate
candidates <- traverse StrictSTM.readTVar varsState
let ok candidate =
WhetherCandidateIsBetter False
== candidateOverSelection selection candidate
check $ all ok candidates

pure $ GsmEventEnterCaughtUp
(Set.size idlers)
(Map.size states)
(cnvSelection selection)

-- STAGE 3: the previous stages weren't so slow that the idler
Expand Down
Expand Up @@ -44,7 +44,6 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.Maybe (isJust, mapMaybe)
import Data.Proxy
import Data.Set (Set)
import qualified Data.Text as Text
import Data.Void (Void)
import Ouroboros.Consensus.Block hiding (blockMatchesHeader)
Expand All @@ -61,7 +60,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Mempool
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(ChainSyncClientHandle)
(ChainSyncClientHandle (..), ChainSyncState (..),
viewChainSyncState)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
(SomeHeaderInFutureCheck)
import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..))
Expand Down Expand Up @@ -128,14 +128,8 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
--
, getLedgerStateJudgement :: STM m LedgerStateJudgement

-- | Read the current candidates
, getNodeCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk))))

-- | Read the set of peers that have claimed to have no subsequent
-- headers beyond their current candidate
, getNodeIdlers :: StrictTVar m (Set (ConnectionId addrNTN))

, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
-- | The kill handle and exposed state for each ChainSync client.
, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))

-- | Read the current peer sharing registry, used for interacting with
-- the PeerSharing protocol
Expand Down Expand Up @@ -195,8 +189,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, fetchClientRegistry
, mempool
, peerSharingRegistry
, varCandidates
, varIdlers
, varChainSyncHandles
, varLedgerJudgement
} = st

Expand All @@ -208,23 +201,23 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers

let gsm = GSM.realGsmEntryPoints gsmTracerArgs GSM.GsmView
{ GSM.antiThunderingHerd = Just gsmAntiThunderingHerd
, GSM.candidateOverSelection = \(headers, _lst) candidate ->
case AF.intersectionPoint headers candidate of
, GSM.candidateOverSelection = \(headers, _lst) state ->
case AF.intersectionPoint headers (csCandidate state) of
Nothing -> GSM.CandidateDoesNotIntersect
Just{} ->
GSM.WhetherCandidateIsBetter
$ -- precondition requires intersection
preferAnchoredCandidate
(configBlock cfg)
headers
candidate
(csCandidate state)
, GSM.peerIsIdle = csIdling
, GSM.durationUntilTooOld =
gsmDurationUntilTooOld
<&> \wd (_headers, lst) ->
GSM.getDurationUntilTooOld wd (getTipSlot lst)
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
, GSM.getChainSyncCandidates = readTVar varCandidates
, GSM.getChainSyncIdlers = readTVar varIdlers
, GSM.getChainSyncStates = fmap cschState <$> readTVar varChainSyncHandles
, GSM.getCurrentSelection = do
headers <- ChainDB.getCurrentChain chainDB
extLedgerState <- ChainDB.getCurrentLedger chainDB
Expand Down Expand Up @@ -257,18 +250,14 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
fetchClientRegistry
blockFetchConfiguration

varChainSyncHandles <- newTVarIO mempty

return NodeKernel
{ getChainDB = chainDB
, getMempool = mempool
, getTopLevelConfig = cfg
, getFetchClientRegistry = fetchClientRegistry
, getFetchMode = readFetchMode blockFetchInterface
, getLedgerStateJudgement = readTVar varLedgerJudgement
, getNodeCandidates = varCandidates
, getChainSyncHandles = varChainSyncHandles
, getNodeIdlers = varIdlers
, getPeerSharingRegistry = peerSharingRegistry
, getTracers = tracers
, setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a
Expand Down Expand Up @@ -298,8 +287,7 @@ data InternalState m addrNTN addrNTC blk = IS {
, chainDB :: ChainDB m blk
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m
, varCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk))))
, varIdlers :: StrictTVar m (Set (ConnectionId addrNTN))
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
, mempool :: Mempool m blk
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
, varLedgerJudgement :: StrictTVar m LedgerStateJudgement
Expand Down Expand Up @@ -327,8 +315,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
gsmMarkerFileView
newTVarIO j

varCandidates <- newTVarIO mempty
varIdlers <- newTVarIO mempty
varChainSyncHandles <- newTVarIO mempty
mempool <- openMempool registry
(chainDBLedgerInterface chainDB)
(configLedger cfg)
Expand All @@ -339,7 +326,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
fetchClientRegistry <- newFetchClientRegistry

let getCandidates :: STM m (Map (ConnectionId addrNTN) (AnchoredFragment (Header blk)))
getCandidates = readTVar varCandidates >>= traverse readTVar
getCandidates = viewChainSyncState varChainSyncHandles csCandidate

slotForgeTimeOracle <- BlockFetchClientInterface.initSlotForgeTimeOracle cfg chainDB
let readFetchMode = BlockFetchClientInterface.readFetchModeDefault
Expand Down

0 comments on commit a31843a

Please sign in to comment.