Skip to content

Commit

Permalink
Bundle resources allocated by bracketChainSyncClient
Browse files Browse the repository at this point in the history
and provide a specialized interface to ChainSync
  • Loading branch information
tek committed Mar 27, 2024
1 parent 9a26d22 commit 9701a57
Show file tree
Hide file tree
Showing 14 changed files with 373 additions and 334 deletions.
Expand Up @@ -56,6 +56,8 @@ import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(ChainSyncStateView (..))
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
import Ouroboros.Consensus.Node.ExitPolicy
Expand Down Expand Up @@ -569,13 +571,11 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
CsClient.bracketChainSyncClient
(contramap (TraceLabelPeer them) (Node.chainSyncClientTracer (getTracers kernel)))
(CsClient.defaultChainDbView (getChainDB kernel))
(getNodeCandidates kernel)
(getNodeIdlers kernel)
(getChainSyncHandles kernel)
them
version
lopBucketConfig
$ \varCandidate (startIdling, stopIdling) (pauseLoPBucket, resumeLoPBucket, grantLoPToken) setTheirTip setLatestSlot -> do
$ \csState -> do
chainSyncTimeout <- genChainSyncTimeout
(r, trailing) <-
runPipelinedPeerWithLimits
Expand All @@ -592,14 +592,10 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
CsClient.version
, CsClient.controlMessageSTM
, CsClient.headerMetricsTracer = TraceLabelPeer them `contramap` reportHeader
, CsClient.varCandidate
, CsClient.startIdling
, CsClient.stopIdling
, CsClient.pauseLoPBucket
, CsClient.resumeLoPBucket
, CsClient.grantLoPToken
, CsClient.setTheirTip
, CsClient.setLatestSlot
, CsClient.setCandidate = csvSetCandidate csState
, CsClient.idling = csvIdling csState
, CsClient.loPBucket = csvLoPBucket csState
, CsClient.setLatestSlot = csvSetLatestSlot csState
}
return (ChainSyncInitiatorResult r, trailing)

Expand Down
Expand Up @@ -34,7 +34,6 @@ import qualified Control.Monad.Class.MonadTimer.SI as SI
import Control.Tracer (Tracer, traceWith)
import Data.Functor ((<&>))
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Time (NominalDiffTime)
import qualified Ouroboros.Consensus.BlockchainTime.WallClock.Types as Clock
import qualified Ouroboros.Consensus.HardFork.Abstract as HardFork
Expand Down Expand Up @@ -78,7 +77,7 @@ data CandidateVersusSelection =
-- ^ Whether the candidate is better than the selection
deriving (Eq, Show)

data GsmView m upstreamPeer selection candidate = GsmView {
data GsmView m upstreamPeer selection chainSyncState = GsmView {
antiThunderingHerd :: Maybe StdGen
-- ^ An initial seed used to randomly increase 'minCaughtUpDuration' by up
-- to 15% every transition from OnlyBootstrap to CaughtUp, in order to
Expand All @@ -87,7 +86,9 @@ data GsmView m upstreamPeer selection candidate = GsmView {
-- 'Nothing' should only be used for testing.
,
candidateOverSelection ::
selection -> candidate -> CandidateVersusSelection
selection -> chainSyncState -> CandidateVersusSelection
,
peerIsIdle :: chainSyncState -> Bool
,
durationUntilTooOld :: Maybe (selection -> m DurationFromNow)
-- ^ How long from now until the selection will be so old that the node
Expand All @@ -99,13 +100,10 @@ data GsmView m upstreamPeer selection candidate = GsmView {
-- ^ Whether the two selections are equivalent for the purpose of the
-- Genesis State Machine
,
getChainSyncCandidates ::
STM m (Map.Map upstreamPeer (StrictTVar m candidate))
-- ^ The latest candidates from the upstream ChainSync peers
,
getChainSyncIdlers :: STM m (Set.Set upstreamPeer)
-- ^ The ChainSync peers whose latest message claimed that they have no
-- subsequent headers
getChainSyncStates ::
STM m (Map.Map upstreamPeer (StrictTVar m chainSyncState))
-- ^ The current ChainSync state with the latest candidates from the
-- upstream peers
,
getCurrentSelection :: STM m selection
-- ^ The node's current selection
Expand Down Expand Up @@ -187,7 +185,6 @@ initializationLedgerJudgement
realGsmEntryPoints :: forall m upstreamPeer selection tracedSelection candidate.
( SI.MonadDelay m
, SI.MonadTimer m
, Eq upstreamPeer
)
=> (selection -> tracedSelection, Tracer m (TraceGsmEvent tracedSelection))
-> GsmView m upstreamPeer selection candidate
Expand All @@ -204,14 +201,14 @@ realGsmEntryPoints tracerArgs gsmView = GsmEntryPoints {
antiThunderingHerd
,
candidateOverSelection
,
peerIsIdle
,
durationUntilTooOld
,
equivalent
,
getChainSyncCandidates
,
getChainSyncIdlers
getChainSyncStates
,
getCurrentSelection
,
Expand Down Expand Up @@ -314,12 +311,11 @@ realGsmEntryPoints tracerArgs gsmView = GsmEntryPoints {
blockUntilCaughtUp :: m (TraceGsmEvent tracedSelection)
blockUntilCaughtUp = atomically $ do
-- STAGE 1: all ChainSync clients report no subsequent headers
idlers <- getChainSyncIdlers
varsCandidate <- getChainSyncCandidates
varsState <- getChainSyncStates
states <- traverse StrictSTM.readTVar varsState
check $
0 < Map.size varsCandidate
&& Set.size idlers == Map.size varsCandidate
&& idlers == Map.keysSet varsCandidate
not (Map.null states)
&& all peerIsIdle states

-- STAGE 2: no candidate is better than the node's current
-- selection
Expand All @@ -332,14 +328,14 @@ realGsmEntryPoints tracerArgs gsmView = GsmEntryPoints {
-- block; general Praos reasoning ensures that won't take particularly
-- long.
selection <- getCurrentSelection
candidates <- traverse StrictSTM.readTVar varsCandidate
candidates <- traverse StrictSTM.readTVar varsState
let ok candidate =
WhetherCandidateIsBetter False
== candidateOverSelection selection candidate
check $ all ok candidates

pure $ GsmEventEnterCaughtUp
(Set.size idlers)
(Map.size states)
(cnvSelection selection)

-- STAGE 3: the previous stages weren't so slow that the idler
Expand Down
Expand Up @@ -44,7 +44,6 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.Maybe (isJust, mapMaybe)
import Data.Proxy
import Data.Set (Set)
import qualified Data.Text as Text
import Data.Void (Void)
import Ouroboros.Consensus.Block hiding (blockMatchesHeader)
Expand All @@ -61,7 +60,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Mempool
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(ChainSyncClientHandle)
(ChainSyncClientHandle (..), ChainSyncState (..),
viewChainSyncState)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
(SomeHeaderInFutureCheck)
import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..))
Expand Down Expand Up @@ -128,14 +128,8 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
--
, getLedgerStateJudgement :: STM m LedgerStateJudgement

-- | Read the current candidates
, getNodeCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk))))

-- | Read the set of peers that have claimed to have no subsequent
-- headers beyond their current candidate
, getNodeIdlers :: StrictTVar m (Set (ConnectionId addrNTN))

, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
-- | The kill handle and exposed state for each ChainSync client.
, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))

-- | Read the current peer sharing registry, used for interacting with
-- the PeerSharing protocol
Expand Down Expand Up @@ -195,8 +189,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, fetchClientRegistry
, mempool
, peerSharingRegistry
, varCandidates
, varIdlers
, varChainSyncHandles
, varLedgerJudgement
} = st

Expand All @@ -208,23 +201,23 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers

let gsm = GSM.realGsmEntryPoints gsmTracerArgs GSM.GsmView
{ GSM.antiThunderingHerd = Just gsmAntiThunderingHerd
, GSM.candidateOverSelection = \(headers, _lst) candidate ->
case AF.intersectionPoint headers candidate of
, GSM.candidateOverSelection = \(headers, _lst) state ->
case AF.intersectionPoint headers (csCandidate state) of
Nothing -> GSM.CandidateDoesNotIntersect
Just{} ->
GSM.WhetherCandidateIsBetter
$ -- precondition requires intersection
preferAnchoredCandidate
(configBlock cfg)
headers
candidate
(csCandidate state)
, GSM.peerIsIdle = csIdling
, GSM.durationUntilTooOld =
gsmDurationUntilTooOld
<&> \wd (_headers, lst) ->
GSM.getDurationUntilTooOld wd (getTipSlot lst)
, GSM.equivalent = (==) `on` (AF.headPoint . fst)
, GSM.getChainSyncCandidates = readTVar varCandidates
, GSM.getChainSyncIdlers = readTVar varIdlers
, GSM.getChainSyncStates = fmap cschState <$> readTVar varChainSyncHandles
, GSM.getCurrentSelection = do
headers <- ChainDB.getCurrentChain chainDB
extLedgerState <- ChainDB.getCurrentLedger chainDB
Expand Down Expand Up @@ -254,18 +247,14 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
fetchClientRegistry
blockFetchConfiguration

varChainSyncHandles <- newTVarIO mempty

return NodeKernel
{ getChainDB = chainDB
, getMempool = mempool
, getTopLevelConfig = cfg
, getFetchClientRegistry = fetchClientRegistry
, getFetchMode = readFetchMode blockFetchInterface
, getLedgerStateJudgement = readTVar varLedgerJudgement
, getNodeCandidates = varCandidates
, getChainSyncHandles = varChainSyncHandles
, getNodeIdlers = varIdlers
, getPeerSharingRegistry = peerSharingRegistry
, getTracers = tracers
, setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a
Expand Down Expand Up @@ -295,8 +284,7 @@ data InternalState m addrNTN addrNTC blk = IS {
, chainDB :: ChainDB m blk
, blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m
, fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m
, varCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk))))
, varIdlers :: StrictTVar m (Set (ConnectionId addrNTN))
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk))
, mempool :: Mempool m blk
, peerSharingRegistry :: PeerSharingRegistry addrNTN m
, varLedgerJudgement :: StrictTVar m LedgerStateJudgement
Expand Down Expand Up @@ -324,8 +312,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
gsmMarkerFileView
newTVarIO j

varCandidates <- newTVarIO mempty
varIdlers <- newTVarIO mempty
varChainSyncHandles <- newTVarIO mempty
mempool <- openMempool registry
(chainDBLedgerInterface chainDB)
(configLedger cfg)
Expand All @@ -336,7 +323,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
fetchClientRegistry <- newFetchClientRegistry

let getCandidates :: STM m (Map (ConnectionId addrNTN) (AnchoredFragment (Header blk)))
getCandidates = readTVar varCandidates >>= traverse readTVar
getCandidates = viewChainSyncState varChainSyncHandles csCandidate

slotForgeTimeOracle <- BlockFetchClientInterface.initSlotForgeTimeOracle cfg chainDB
let readFetchMode = BlockFetchClientInterface.readFetchModeDefault
Expand Down
Expand Up @@ -70,8 +70,7 @@ semantics ::
semantics vars cmd = pre $ case cmd of
Disconnect peer -> do
atomically $ do
modifyTVar varCandidates $ Map.delete peer
modifyTVar varIdlers $ Set.delete peer
modifyTVar varStates $ Map.delete peer
pure Unit
ExtendSelection sdel -> do
atomically $ do
Expand All @@ -81,34 +80,32 @@ semantics vars cmd = pre $ case cmd of
ModifyCandidate peer bdel -> do
atomically $ do

modifyTVar varIdlers $ Set.delete peer

v <- (Map.! peer) <$> readTVar varCandidates
Candidate b <- readTVar v
writeTVar v $! Candidate (b + bdel)
v <- (Map.! peer) <$> readTVar varStates
Candidate b <- psCandidate <$> readTVar v
writeTVar v $! PeerState (Candidate (b + bdel)) (Idling False)

pure Unit
NewCandidate peer bdel -> do
atomically $ do
Selection b _s <- readTVar varSelection
v <- newTVar $! Candidate (b + bdel)
modifyTVar varCandidates $ Map.insert peer v
v <- newTVar $! PeerState (Candidate (b + bdel)) (Idling False)
modifyTVar varStates $ Map.insert peer v
pure Unit
ReadJudgment -> do
fmap ReadThisJudgment $ atomically $ readTVar varJudgment
ReadMarker -> do
fmap ReadThisMarker $ atomically $ readTVar varMarker
StartIdling peer -> do
atomically $ modifyTVar varIdlers $ Set.insert peer
StartIdling peer -> atomically $ do
v <- (Map.! peer) <$> readTVar varStates
modifyTVar v $ \ (PeerState c _) -> PeerState c (Idling True)
pure Unit
TimePasses dur -> do
SI.threadDelay (0.1 * fromIntegral dur)
pure Unit
where
Vars
varSelection
varCandidates
varIdlers
varStates
varJudgment
varMarker
varEvents
Expand Down Expand Up @@ -172,8 +169,7 @@ prop_sequential1 ::
prop_sequential1 j0 cmds = runSimQC $ do
-- these variables are part of the 'GSM.GsmView'
varSelection <- newTVarIO (mSelection $ initModel j0)
varCandidates <- newTVarIO Map.empty
varIdlers <- newTVarIO Set.empty
varStates <- newTVarIO Map.empty
varJudgment <- newTVarIO j0
varMarker <- newTVarIO (toMarker j0)

Expand All @@ -186,8 +182,7 @@ prop_sequential1 j0 cmds = runSimQC $ do
let vars =
Vars
varSelection
varCandidates
varIdlers
varStates
varJudgment
varMarker
varEvents
Expand All @@ -200,15 +195,15 @@ prop_sequential1 j0 cmds = runSimQC $ do
let gsm = GSM.realGsmEntryPoints (id, tracer) GSM.GsmView {
GSM.antiThunderingHerd = Nothing
,
GSM.candidateOverSelection = candidateOverSelection
GSM.candidateOverSelection = \ s (PeerState c _) -> candidateOverSelection s c
,
GSM.peerIsIdle = isIdling
,
GSM.durationUntilTooOld = Just durationUntilTooOld
,
GSM.equivalent = (==) -- unsound, but harmless in this test
,
GSM.getChainSyncCandidates = readTVar varCandidates
,
GSM.getChainSyncIdlers = readTVar varIdlers
GSM.getChainSyncStates = readTVar varStates
,
GSM.getCurrentSelection = readTVar varSelection
,
Expand Down Expand Up @@ -386,17 +381,25 @@ push (EvRecorder var) ev = do
now <- SI.getMonotonicTime
atomically $ modifyTVar var $ (:) (now, ev)

isIdling :: PeerState -> Bool
isIdling (PeerState {psIdling = Idling i}) = i

-----

-- | merely a tidy bundle of arguments
data Vars m = Vars
(StrictTVar m Selection)
(StrictTVar m (Map.Map UpstreamPeer (StrictTVar m Candidate)))
(StrictTVar m (Set.Set UpstreamPeer))
(StrictTVar m (Map.Map UpstreamPeer (StrictTVar m PeerState)))
(StrictTVar m LedgerStateJudgement)
(StrictTVar m MarkerState)
(EvRecorder m)

newtype Idling = Idling Bool
deriving (Eq, Ord, Show)

data PeerState = PeerState { psCandidate :: !Candidate, psIdling :: !Idling }
deriving (Eq, Ord, Show)

-----

-- | a straight-forwardtrivial alias
Expand Down

0 comments on commit 9701a57

Please sign in to comment.