Skip to content

Commit

Permalink
Implement ChainSync jumping
Browse files Browse the repository at this point in the history
See the documentation in
ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/Jumping.hs
for an overview.
  • Loading branch information
Niols authored and facundominguez committed Apr 25, 2024
1 parent 3cc2168 commit 946bbcc
Show file tree
Hide file tree
Showing 16 changed files with 1,150 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,10 +541,11 @@ mkApps ::
-> ByteLimits bCS bBF bTX bKA
-> m ChainSyncTimeout
-> CsClient.ChainSyncLoPBucketConfig
-> CsClient.CSJConfig
-> ReportPeerMetrics m (ConnectionId addrNTN)
-> Handlers m addrNTN blk
-> Apps m addrNTN bCS bBF bTX bKA bPS NodeToNodeInitiatorResult ()
mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucketConfig ReportPeerMetrics {..} Handlers {..} =
mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucketConfig csjConfig ReportPeerMetrics {..} Handlers {..} =
Apps {..}
where
aChainSyncClient
Expand Down Expand Up @@ -573,6 +574,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
them
version
lopBucketConfig
csjConfig
$ \csState -> do
chainSyncTimeout <- genChainSyncTimeout
(r, trailing) <-
Expand All @@ -594,6 +596,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
, CsClient.idling = csvIdling csState
, CsClient.loPBucket = csvLoPBucket csState
, CsClient.setLatestSlot = csvSetLatestSlot csState
, CsClient.jumping = csvJumping csState
}
return (ChainSyncInitiatorResult r, trailing)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ import Ouroboros.Consensus.Fragment.InFuture (CheckInFuture,
import qualified Ouroboros.Consensus.Fragment.InFuture as InFuture
import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState (..))
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(ChainSyncLoPBucketConfig (..))
(CSJConfig (..), ChainSyncLoPBucketConfig (..))
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck
import qualified Ouroboros.Consensus.Network.NodeToClient as NTC
import qualified Ouroboros.Consensus.Network.NodeToNode as NTN
Expand Down Expand Up @@ -252,6 +252,9 @@ data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk
-- | See 'CsClient.ChainSyncLoPBucketConfig'
, llrnChainSyncLoPBucketConfig :: ChainSyncLoPBucketConfig

-- | See 'CsClient.CSJConfig'
, llrnCSJConfig :: CSJConfig

-- | How to run the data diffusion applications
--
-- 'run' will not return before this does.
Expand Down Expand Up @@ -519,6 +522,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
NTN.byteLimits
llrnChainSyncTimeout
llrnChainSyncLoPBucketConfig
llrnCSJConfig
(reportMetric Diffusion.peerMetricsConfiguration peerMetrics)
(NTN.mkHandlers nodeKernelArgs nodeKernel)

Expand Down Expand Up @@ -889,6 +893,7 @@ stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo
{ llrnBfcSalt
, llrnChainSyncTimeout = fromMaybe stdChainSyncTimeout srnChainSyncTimeout
, llrnChainSyncLoPBucketConfig = ChainSyncLoPBucketDisabled
, llrnCSJConfig = CSJDisabled
, llrnCustomiseHardForkBlockchainTimeArgs = id
, llrnGsmAntiThunderingHerd
, llrnKeepAliveRng
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
, idleTimeout = waitForever
})
CSClient.ChainSyncLoPBucketDisabled
CSClient.CSJDisabled
nullMetric
-- The purpose of this test is not testing protocols, so
-- returning constant empty list is fine if we have thorough
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ genChains genNumForks = do
-- ^ REVIEW: Do we want to generate those randomly? For now, the chosen
-- values carry no special meaning. Someone needs to think about what values
-- would make for interesting tests.
gtCSJParams = CSJParams $ fromIntegral $ scg `div` 2,
gtBlockTree = foldl' (flip BT.addBranch') (BT.mkTrunk goodChain) $ zipWith (genAdversarialFragment goodBlocks) [1..] alternativeChainSchemas,
gtSchedule = ()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ prop_serveAdversarialBranches = forAllGenesisTest
(genChains (QC.choose (1, 4)) `enrichedWith` genUniformSchedulePoints)

(defaultSchedulerConfig
{scTraceState = False, scTrace = False, scEnableLoE = True})
{ scTraceState = False
, scTrace = False
, scEnableLoE = True
, scEnableCSJ = True
})

-- We cannot shrink by removing points from the adversarial schedules.
-- Otherwise, the immutable tip could get stuck because a peer doesn't
Expand Down Expand Up @@ -190,6 +194,7 @@ prop_leashingAttackStalling =
{ scTrace = False
, scEnableLoE = True
, scEnableLoP = True
, scEnableCSJ = True
}

shrinkPeerSchedules
Expand Down Expand Up @@ -268,6 +273,7 @@ prop_leashingAttackTimeLimited =
, scEnableLoE = True
, scEnableLoP = True
, scEnableBlockFetchTimeouts = False
, scEnableCSJ = True
}

shrinkPeerSchedules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import Ouroboros.Consensus.Block (Header, Point)
import Ouroboros.Consensus.Config (TopLevelConfig (..))
import Ouroboros.Consensus.Ledger.SupportsProtocol
(LedgerSupportsProtocol)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client (ChainDbView,
ChainSyncClientHandle, ChainSyncLoPBucketConfig,
ChainSyncStateView (..), Consensus, bracketChainSyncClient,
chainSyncClient)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(CSJConfig (..), ChainDbView, ChainSyncClientHandle,
ChainSyncLoPBucketConfig, ChainSyncStateView (..),
Consensus, bracketChainSyncClient, chainSyncClient)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck
import Ouroboros.Consensus.Util (ShowProxy)
Expand Down Expand Up @@ -94,6 +94,7 @@ basicChainSyncClient
, CSClient.idling = csvIdling csState
, CSClient.loPBucket = csvLoPBucket csState
, CSClient.setLatestSlot = csvSetLatestSlot csState
, CSClient.jumping = csvJumping csState
}
where
dummyHeaderInFutureCheck ::
Expand All @@ -120,6 +121,8 @@ runChainSyncClient ::
-- ^ Timeouts for this client.
ChainSyncLoPBucketConfig ->
-- ^ Configuration for the LoP bucket.
CSJConfig ->
-- ^ Configuration for ChainSync Jumping
StateViewTracers blk m ->
-- ^ Tracers used to record information for the future 'StateView'.
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
Expand All @@ -135,6 +138,7 @@ runChainSyncClient
peerId
chainSyncTimeouts
lopBucketConfig
csjConfig
StateViewTracers {svtPeerSimulatorResultsTracer}
varHandles
channel = do
Expand All @@ -145,6 +149,7 @@ runChainSyncClient
peerId
(maxBound :: NodeToNodeVersion)
lopBucketConfig
csjConfig
$ \csState -> do
res <-
try $
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Test.Consensus.PeerSimulator.Run (
, runPointSchedule
) where

import Control.Monad (forM)
import Control.Monad.Class.MonadTime (MonadTime)
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.Tracer (Tracer (..), nullTracer, traceWith)
Expand All @@ -24,7 +25,8 @@ import Ouroboros.Consensus.Genesis.Governor (runGdd,
updateLoEFragGenesis)
import Ouroboros.Consensus.Ledger.SupportsProtocol
(LedgerSupportsProtocol)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client (ChainDbView,
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(CSJConfig (..), CSJEnabledConfig (..), ChainDbView,
ChainSyncClientHandle, ChainSyncLoPBucketConfig (..),
ChainSyncLoPBucketEnabledConfig (..), ChainSyncState (..),
viewChainSyncState)
Expand Down Expand Up @@ -57,7 +59,7 @@ import Test.Consensus.PeerSimulator.StateView
import Test.Consensus.PeerSimulator.Trace
import qualified Test.Consensus.PointSchedule as PointSchedule
import Test.Consensus.PointSchedule (BlockFetchTimeout,
GenesisTest (GenesisTest), GenesisTestFull,
CSJParams (..), GenesisTest (GenesisTest), GenesisTestFull,
LoPBucketParams (..), NodeState, PeersSchedule,
peersStatesRelative)
import Test.Consensus.PointSchedule.Peers (Peer (..), PeerId,
Expand Down Expand Up @@ -94,9 +96,13 @@ data SchedulerConfig =
-- governor (GDD).
, scEnableLoE :: Bool

-- | Whether to enable to LoP. The parameters of the LoP come from
-- | Whether to enable the LoP. The parameters of the LoP come from
-- 'GenesisTest'.
, scEnableLoP :: Bool

-- | Whether to enable ChainSync Jumping. The parameters come from
-- 'GenesisTest'.
, scEnableCSJ :: Bool
}

-- | Default scheduler config
Expand All @@ -109,7 +115,8 @@ defaultSchedulerConfig =
scTrace = True,
scTraceState = False,
scEnableLoE = False,
scEnableLoP = False
scEnableLoP = False,
scEnableCSJ = False
}

-- | Enable debug tracing during a scheduler test.
Expand All @@ -135,6 +142,7 @@ startChainSyncConnectionThread ::
ChainSyncResources m blk ->
ChainSyncTimeout ->
ChainSyncLoPBucketConfig ->
CSJConfig ->
StateViewTracers blk m ->
StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) ->
m (Thread m (), Thread m ())
Expand All @@ -148,14 +156,15 @@ startChainSyncConnectionThread
ChainSyncResources {csrServer}
chainSyncTimeouts_
chainSyncLoPBucketConfig
csjConfig
tracers
varHandles
= do
(clientChannel, serverChannel) <- createConnectedChannels
clientThread <-
forkLinkedThread registry ("ChainSyncClient" <> condense srPeerId) $
bracketSyncWithFetchClient fetchClientRegistry srPeerId $
ChainSync.runChainSyncClient tracer cfg chainDbView srPeerId chainSyncTimeouts_ chainSyncLoPBucketConfig tracers varHandles clientChannel
ChainSync.runChainSyncClient tracer cfg chainDbView srPeerId chainSyncTimeouts_ chainSyncLoPBucketConfig csjConfig tracers varHandles clientChannel
serverThread <-
forkLinkedThread registry ("ChainSyncServer" <> condense srPeerId) $
ChainSync.runChainSyncServer tracer srPeerId tracers csrServer serverChannel
Expand Down Expand Up @@ -219,10 +228,20 @@ dispatchTick tracer stateTracer chainDb varHandles peers (number, (duration, Pee
traceNewTick :: m ()
traceNewTick = do
currentChain <- atomically $ ChainDB.getCurrentChain chainDb
csState <- atomically $ do
(csState, jumpingStates) <- atomically $ do
m <- readTVar varHandles
traverse (readTVar . CSClient.cschState) (m Map.!? pid)
traceWith tracer $ TraceNewTick number duration (Peer pid state) currentChain (CSClient.csCandidate <$> csState)
csState <- traverse (readTVar . CSClient.cschState) (m Map.!? pid)
jumpingStates <- forM (Map.toList m) $ \(peer, h) -> do
st <- readTVar (CSClient.cschJumping h)
pure (peer, st)
pure (csState, jumpingStates)
traceWith tracer $ TraceNewTick
number
duration
(Peer pid state)
currentChain
(CSClient.csCandidate <$> csState)
jumpingStates

-- | Iterate over a 'PointSchedule', sending each tick to the associated peer in turn,
-- giving each peer a chunk of computation time, sequentially, until it satisfies the
Expand Down Expand Up @@ -283,7 +302,7 @@ runPointSchedule schedulerConfig genesisTest tracer0 =
-- the registry is closed and all threads related to the peer are
-- killed.
withRegistry $ \peerRegistry -> do
(csClient, csServer) <- startChainSyncConnectionThread peerRegistry tracer config chainDbView fetchClientRegistry prShared prChainSync chainSyncTimeouts_ chainSyncLoPBucketConfig stateViewTracers (psrHandles resources)
(csClient, csServer) <- startChainSyncConnectionThread peerRegistry tracer config chainDbView fetchClientRegistry prShared prChainSync chainSyncTimeouts_ chainSyncLoPBucketConfig csjConfig stateViewTracers (psrHandles resources)
BlockFetch.startKeepAliveThread peerRegistry fetchClientRegistry (srPeerId prShared)
(bfClient, bfServer) <- startBlockFetchConnectionThread peerRegistry tracer stateViewTracers fetchClientRegistry (pure Continue) prShared prBlockFetch blockFetchTimeouts_
waitAnyThread [csClient, csServer, bfClient, bfServer]
Expand Down Expand Up @@ -327,6 +346,7 @@ runPointSchedule schedulerConfig genesisTest tracer0 =
, gtChainSyncTimeouts
, gtBlockFetchTimeouts
, gtLoPBucketParams = LoPBucketParams { lbpCapacity, lbpRate }
, gtCSJParams = CSJParams { csjpJumpSize }
, gtForecastRange
, gtGenesisWindow
} = genesisTest
Expand All @@ -346,6 +366,11 @@ runPointSchedule schedulerConfig genesisTest tracer0 =
then ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig { csbcCapacity = lbpCapacity, csbcRate = lbpRate }
else ChainSyncLoPBucketDisabled

csjConfig =
if scEnableCSJ schedulerConfig
then CSJEnabled CSJEnabledConfig { csjcJumpSize = csjpJumpSize }
else CSJDisabled

blockFetchTimeouts_ =
if scEnableBlockFetchTimeouts schedulerConfig
then gtBlockFetchTimeouts
Expand Down

0 comments on commit 946bbcc

Please sign in to comment.