Skip to content

Commit

Permalink
Integrate [ouroboros-network #4795](IntersectMBO/ouroboros-network#4795)
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 authored and amesgen committed Mar 28, 2024
1 parent 8c485b2 commit 17cc286
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 36 deletions.
Expand Up @@ -142,7 +142,7 @@ library
, nothunks
, ouroboros-consensus ^>=0.16
, ouroboros-consensus-protocol ^>=0.7
, ouroboros-network-api ^>=0.7
, ouroboros-network-api ^>=0.7.1
, serialise ^>=0.2
, small-steps
, sop-core ^>=0.5
Expand Down
Expand Up @@ -68,11 +68,11 @@ library
, Ouroboros.Consensus.Node.NetworkProtocolVersion

build-depends:
, base >=4.14 && <4.20
, bytestring >=0.10 && <0.13
, base >=4.14 && <4.20
, bytestring >=0.10 && <0.13
, cardano-slotting
, cborg ^>=0.2.2
, containers >=0.5 && <0.7
, containers >=0.5 && <0.7
, contra-tracer
, deepseq
, filepath
Expand All @@ -81,10 +81,10 @@ library
, io-classes ^>=1.4.1
, mtl
, ouroboros-consensus ^>=0.16
, ouroboros-network ^>=0.12
, ouroboros-network-api ^>=0.7
, ouroboros-network-framework ^>=0.11.1
, ouroboros-network-protocols ^>=0.8
, ouroboros-network ^>=0.13
, ouroboros-network-api ^>=0.7.1
, ouroboros-network-framework ^>=0.12
, ouroboros-network-protocols ^>=0.8.1
, random
, safe-wild-cards ^>=1.0
, serialise ^>=0.2
Expand Down
Expand Up @@ -48,6 +48,7 @@ import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.Tracer
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BSL
import Data.Hashable (Hashable)
import Data.Int (Int64)
import Data.Map.Strict (Map)
import Data.Void (Void)
Expand Down Expand Up @@ -110,8 +111,7 @@ import Ouroboros.Network.Protocol.PeerSharing.Codec
codecPeerSharingId, timeLimitsPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server
(PeerSharingServer, peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing,
PeerSharingAmount)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.Protocol.TxSubmission2.Client
import Ouroboros.Network.Protocol.TxSubmission2.Codec
import Ouroboros.Network.Protocol.TxSubmission2.Server
Expand Down Expand Up @@ -204,16 +204,14 @@ mkHandlers ::
, HasTxId (GenTx blk)
, LedgerSupportsProtocol blk
, Ord addrNTN
, Hashable addrNTN
)
=> NodeKernelArgs m addrNTN addrNTC blk
-> NodeKernel m addrNTN addrNTC blk
-> (PeerSharingAmount -> m [addrNTN])
-- ^ Peer Sharing result computation callback
-> Handlers m addrNTN blk
mkHandlers
NodeKernelArgs {chainSyncFutureCheck, keepAliveRng, miniProtocolParameters}
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers}
computePeers =
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers, getPeerSharingAPI} =
Handlers {
hChainSyncClient = \peer _isBigLedgerpeer dynEnv ->
CsClient.chainSyncClient
Expand Down Expand Up @@ -257,7 +255,7 @@ mkHandlers
, hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng
, hKeepAliveServer = \_version _peer -> keepAliveServer
, hPeerSharingClient = \_version controlMessageSTM _peer -> peerSharingClient controlMessageSTM
, hPeerSharingServer = \_version _peer -> peerSharingServer computePeers
, hPeerSharingServer = \_version _peer -> peerSharingServer getPeerSharingAPI
}

{-------------------------------------------------------------------------------
Expand Down
Expand Up @@ -133,15 +133,14 @@ import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)
import Ouroboros.Network.PeerSelection.PeerSharing.Codec
(decodeRemoteAddress, encodeRemoteAddress)
import Ouroboros.Network.Protocol.Limits (shortWait)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharingAmount)
import Ouroboros.Network.RethrowPolicy
import qualified SafeWildCards
import System.Exit (ExitCode (..))
import System.FilePath ((</>))
import System.FS.API (SomeHasFS (..))
import System.FS.API.Types
import System.FS.IO (ioHasFS)
import System.Random (StdGen, newStdGen, randomIO, randomRIO)
import System.Random (StdGen, newStdGen, randomIO, randomRIO, split)

{-------------------------------------------------------------------------------
The arguments to the Consensus Layer node functionality
Expand Down Expand Up @@ -503,8 +502,6 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
-> (NodeToNodeVersion -> addrNTN -> CBOR.Encoding)
-> (NodeToNodeVersion -> forall s . CBOR.Decoder s addrNTN)
-> BlockNodeToNodeVersion blk
-> (PeerSharingAmount -> m [addrNTN])
-- ^ Peer Sharing result computation callback
-> NTN.Apps m
addrNTN
ByteString
Expand All @@ -514,7 +511,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
ByteString
NodeToNodeInitiatorResult
()
mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics encAddrNTN decAddrNTN version computePeers =
mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics encAddrNTN decAddrNTN version =
NTN.mkApps
nodeKernel
rnTraceNTN
Expand All @@ -523,7 +520,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
llrnChainSyncTimeout
llrnChainSyncLoPBucketConfig
(reportMetric Diffusion.peerMetricsConfiguration peerMetrics)
(NTN.mkHandlers nodeKernelArgs nodeKernel computePeers)
(NTN.mkHandlers nodeKernelArgs nodeKernel)

mkNodeToClientApps
:: NodeKernelArgs m addrNTN (ConnectionId addrNTC) blk
Expand All @@ -542,8 +539,6 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
:: NetworkP2PMode p2p
-> MiniProtocolParameters
-> ( BlockNodeToNodeVersion blk
-- Peer Sharing result computation callback
-> (PeerSharingAmount -> m [addrNTN])
-> NTN.Apps
m
addrNTN
Expand Down Expand Up @@ -606,16 +601,16 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
-- Initiator side won't start responder side of Peer
-- Sharing protocol so we give a dummy implementation
-- here.
$ ntnApps blockVersion (error "impossible happened!"))
$ ntnApps blockVersion)
| (version, blockVersion) <- Map.toList llrnNodeToNodeVersions
],
Diffusion.daApplicationInitiatorResponderMode = \computePeers ->
Diffusion.daApplicationInitiatorResponderMode =
combineVersions
[ simpleSingletonVersions
version
llrnVersionDataNTN
(NTN.initiatorAndResponder miniProtocolParams version rnPeerSharing
$ ntnApps blockVersion computePeers)
$ ntnApps blockVersion)
| (version, blockVersion) <- Map.toList llrnNodeToNodeVersions
],
Diffusion.daLocalResponderApplication =
Expand Down Expand Up @@ -746,7 +741,7 @@ mkNodeKernelArgs
registry
bfcSalt
gsmAntiThunderingHerd
keepAliveRng
rng
cfg
tracers
btime
Expand All @@ -757,6 +752,7 @@ mkNodeKernelArgs
gsmMarkerFileView
getUseBootstrapPeers
= do
let (kaRng, psRng) = split rng
return NodeKernelArgs
{ tracers
, registry
Expand All @@ -769,14 +765,15 @@ mkNodeKernelArgs
, mempoolCapacityOverride = NoMempoolCapacityBytesOverride
, miniProtocolParameters = defaultMiniProtocolParameters
, blockFetchConfiguration = defaultBlockFetchConfiguration
, keepAliveRng
, gsmArgs = GsmNodeKernelArgs {
gsmAntiThunderingHerd
, gsmDurationUntilTooOld
, gsmMarkerFileView
, gsmMinCaughtUpDuration = maxCaughtUpAge
}
, getUseBootstrapPeers
, keepAliveRng = kaRng
, peerSharingRng = psRng
}
where
defaultBlockFetchConfiguration :: BlockFetchConfiguration
Expand Down
Expand Up @@ -90,8 +90,10 @@ import Ouroboros.Network.NodeToNode (ConnectionId,
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerStateJudgement (..))
import Ouroboros.Network.PeerSharing (PeerSharingRegistry,
newPeerSharingRegistry)
import Ouroboros.Network.PeerSharing (PeerSharingAPI,
PeerSharingRegistry, newPeerSharingAPI,
newPeerSharingRegistry, ps_POLICY_PEER_SHARE_MAX_PEERS,
ps_POLICY_PEER_SHARE_STICKY_TIME)
import Ouroboros.Network.TxSubmission.Inbound
(TxSubmissionMempoolWriter)
import qualified Ouroboros.Network.TxSubmission.Inbound as Inbound
Expand Down Expand Up @@ -144,7 +146,9 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
--
-- When set with the empty list '[]' block forging will be disabled.
--
, setBlockForging :: [BlockForging m blk] -> m ()
, setBlockForging :: [BlockForging m blk] -> m ()

, getPeerSharingAPI :: PeerSharingAPI addrNTN StdGen m
}

-- | Arguments required when initializing a node
Expand All @@ -163,6 +167,7 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs {
, keepAliveRng :: StdGen
, gsmArgs :: GsmNodeKernelArgs m blk
, getUseBootstrapPeers :: STM m UseBootstrapPeers
, peerSharingRng :: StdGen
}

initNodeKernel ::
Expand All @@ -180,6 +185,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, chainDB, initChainDB
, blockFetchConfiguration
, gsmArgs
, peerSharingRng
} = do
-- using a lazy 'TVar', 'BlockForging' does not have a 'NoThunks' instance.
blockForgingVar :: LazySTM.TMVar m [BlockForging m blk] <- LazySTM.newTMVarIO []
Expand Down Expand Up @@ -240,6 +246,10 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
TooOld -> GSM.enterPreSyncing gsm
YoungEnough -> GSM.enterCaughtUp gsm

peerSharingAPI <- newPeerSharingAPI peerSharingRng
ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS

void $ forkLinkedThread registry "NodeKernel.blockForging" $
blockForgingController st (LazySTM.takeTMVar blockForgingVar)

Expand All @@ -265,6 +275,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
, getPeerSharingRegistry = peerSharingRegistry
, getTracers = tracers
, setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a
, getPeerSharingAPI = peerSharingAPI
}
where
blockForgingController :: InternalState m remotePeer localPeer blk
Expand Down
Expand Up @@ -115,7 +115,7 @@ import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.Protocol.TxSubmission2.Type
import qualified System.FS.Sim.MockFS as Mock
import System.FS.Sim.MockFS (MockFS)
import System.Random (mkStdGen)
import System.Random (mkStdGen, split)
import Test.ThreadNet.TxGen
import Test.ThreadNet.Util.NodeJoinPlan
import Test.ThreadNet.Util.NodeRestarts
Expand Down Expand Up @@ -969,8 +969,9 @@ runThreadNetwork systemTime ThreadNetworkArgs
, hfbtMaxClockRewind = secondsToNominalDiffTime 0
}

let kaRng = case seed of
let rng = case seed of
Seed s -> mkStdGen s
(kaRng, psRng) = split rng
let nodeKernelArgs = NodeKernelArgs
{ tracers
, registry
Expand All @@ -985,6 +986,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
, blockFetchSize = estimateBlockSize
, mempoolCapacityOverride = NoMempoolCapacityBytesOverride
, keepAliveRng = kaRng
, peerSharingRng = psRng
, miniProtocolParameters = MiniProtocolParameters {
chainSyncPipeliningHighMark = 4,
chainSyncPipeliningLowMark = 2,
Expand Down Expand Up @@ -1040,7 +1042,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
-- The purpose of this test is not testing protocols, so
-- returning constant empty list is fine if we have thorough
-- tests about the peer sharing protocol itself.
(NTN.mkHandlers nodeKernelArgs nodeKernel (\_ -> return []))
(NTN.mkHandlers nodeKernelArgs nodeKernel)

-- In practice, a robust wallet/user can persistently add a transaction
-- until it appears on the chain. This thread adds robustness for the
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-consensus/ouroboros-consensus.cabal
Expand Up @@ -276,9 +276,9 @@ library
, measures
, mtl
, nothunks ^>=0.1.5
, ouroboros-network-api ^>=0.7
, ouroboros-network-api ^>=0.7.1
, ouroboros-network-mock ^>=0.1
, ouroboros-network-protocols ^>=0.8
, ouroboros-network-protocols ^>=0.8.1
, primitive
, psqueues ^>=0.2.3
, quiet ^>=0.2
Expand Down

0 comments on commit 17cc286

Please sign in to comment.