/
NodeToNode.hs
157 lines (144 loc) · 6.53 KB
/
NodeToNode.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-orphans -Wno-unticked-promoted-constructors -Wno-all-missed-specialisations #-}
module Cardano.Benchmarking.GeneratorTx.NodeToNode
( ConnectClient
, benchmarkConnectTxSubmit
) where
import Cardano.Prelude (forever, liftIO)
import Prelude
import Codec.Serialise (DeserialiseFailure)
import Control.Monad.Class.MonadSTM.Strict (newTVarIO)
import Control.Monad.Class.MonadTimer (MonadTimer, threadDelay)
import Data.ByteString.Lazy (ByteString)
import qualified Data.Map.Strict as Map
import Data.Proxy (Proxy (..))
import Network.Socket (AddrInfo (..))
import System.Random (newStdGen)
import "contra-tracer" Control.Tracer (Tracer, nullTracer)
import Ouroboros.Consensus.Block.Abstract
import Ouroboros.Consensus.Byron.Ledger.Mempool (GenTx)
import qualified Ouroboros.Consensus.Cardano as Consensus (CardanoBlock)
import Ouroboros.Consensus.Ledger.SupportsMempool (GenTxId)
import Ouroboros.Consensus.Network.NodeToNode (Codecs (..), defaultCodecs)
import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Node.Run (RunNode)
import Ouroboros.Consensus.Shelley.Eras (StandardCrypto)
import Ouroboros.Network.Channel (Channel (..))
import Ouroboros.Network.DeltaQ (defaultGSV)
import Ouroboros.Network.Driver (runPeerWithLimits)
import Ouroboros.Network.KeepAlive
import Ouroboros.Network.Magic
import Ouroboros.Network.Mux (MuxPeer (..), OuroborosApplication (..), OuroborosBundle,
RunMiniProtocol (..), continueForever)
import Ouroboros.Network.NodeToClient (IOManager, chainSyncPeerNull)
import Ouroboros.Network.NodeToNode (NetworkConnectTracers (..))
import qualified Ouroboros.Network.NodeToNode as NtN
import Ouroboros.Network.Protocol.BlockFetch.Client (BlockFetchClient (..),
blockFetchClientPeer)
import Ouroboros.Network.Protocol.Handshake.Version (simpleSingletonVersions)
import Ouroboros.Network.Protocol.KeepAlive.Client
import Ouroboros.Network.Protocol.KeepAlive.Codec
import Ouroboros.Network.Protocol.TxSubmission2.Client (TxSubmissionClient,
txSubmissionClientPeer)
import Ouroboros.Network.Snocket (socketSnocket)
import Cardano.Benchmarking.LogTypes (SendRecvConnect, SendRecvTxSubmission2)
-- | The consensus-layer Cardano block type, specialised to 'StandardCrypto'.
type CardanoBlock = Consensus.CardanoBlock StandardCrypto
-- | An action that, given the address information of a remote node and a
-- transaction-submission client over 'CardanoBlock' transactions, runs in
-- 'IO' and returns no result.  This has the same shape as the final two
-- arguments and the result of 'benchmarkConnectTxSubmit'.
type ConnectClient
  =  AddrInfo
  -> TxSubmissionClient (GenTxId CardanoBlock) (GenTx CardanoBlock) IO ()
  -> IO ()
-- | Connect to a remote node with 'NtN.connectTo' and run the supplied
-- transaction-submission client against it.
--
-- Exactly one node-to-node version (@n2nVer@, currently 'NodeToNodeV_10') is
-- offered, with 'NtN.InitiatorOnlyDiffusionMode'.  Of the four mini-protocols
-- only tx-submission carries the caller's client: chain-sync runs
-- 'chainSyncPeerNull', block-fetch runs 'blockFetchClientNull', and
-- keep-alive runs the local @kaClient@ helper.
--
-- Calls 'error' (when the block-level version is first demanded) if @n2nVer@
-- is not a key of 'supportedNodeToNodeVersions' for the block type.
benchmarkConnectTxSubmit
  :: forall blk. (blk ~ CardanoBlock, RunNode blk )
  => IOManager
  -- ^ used to build the socket snocket for the connection
  -> Tracer IO SendRecvConnect
  -- ^ installed as the handshake tracer of the connection
  -> Tracer IO SendRecvTxSubmission2
  -- ^ tracer for the tx-submission mini-protocol peer
  -> CodecConfig CardanoBlock
  -- ^ configuration from which the default codecs are built
  -> NetworkMagic
  -- ^ network magic placed in the handshake version data
  -> AddrInfo
  -- ^ remote address information
  -> TxSubmissionClient (GenTxId blk) (GenTx blk) IO ()
  -- ^ the particular txSubmission peer
  -> IO ()
benchmarkConnectTxSubmit ioManager handshakeTracer submissionTracer codecConfig networkMagic remoteAddr myTxSubClient =
  NtN.connectTo
    (socketSnocket ioManager)
    NetworkConnectTracers
      { nctMuxTracer       = nullTracer
      , nctHandshakeTracer = handshakeTracer
      }
    peerMultiplex
    -- Previously spelled @addrAddress <$> Nothing@, which is just 'Nothing'.
    -- NOTE(review): presumably this is the optional local address -- confirm
    -- against the 'NtN.connectTo' signature.
    Nothing
    (addrAddress remoteAddr)
  where
    -- Collapse a bundle of mini-protocols into a single application by
    -- applying every entry to the connection id and control-message STM
    -- action and combining the results monoidally.
    mkApp :: OuroborosBundle      mode addr bs m a b
          -> OuroborosApplication mode addr bs m a b
    mkApp bundle =
      OuroborosApplication $ \connId controlMessageSTM ->
        foldMap (\p -> p connId controlMessageSTM) bundle

    -- The single node-to-node protocol version offered in the handshake.
    n2nVer :: NodeToNodeVersion
    n2nVer = NodeToNodeV_10

    -- Block-level version matching 'n2nVer'.  A total lookup is used so that
    -- a missing entry fails with a message naming the offending version
    -- rather than the generic failure of @Map.!@.
    blkN2nVer :: BlockNodeToNodeVersion blk
    blkN2nVer =
      case Map.lookup n2nVer supportedVers of
        Just blockVersion -> blockVersion
        Nothing ->
          error $ "benchmarkConnectTxSubmit: " ++ show n2nVer
               ++ " is not among the supported node-to-node versions"

    supportedVers :: Map.Map NodeToNodeVersion (BlockNodeToNodeVersion blk)
    supportedVers = supportedNodeToNodeVersions (Proxy @blk)

    -- Default codecs for every mini-protocol, over lazy 'ByteString's.
    myCodecs :: Codecs blk DeserialiseFailure IO
                  ByteString ByteString ByteString ByteString ByteString ByteString
    myCodecs = defaultCodecs codecConfig blkN2nVer n2nVer

    -- The versioned application handed to 'NtN.connectTo': one version,
    -- initiator-only, with the four mini-protocols described above.
    peerMultiplex =
      simpleSingletonVersions
        n2nVer
        (NtN.NodeToNodeVersionData
          { NtN.networkMagic = networkMagic
          , NtN.diffusionMode = NtN.InitiatorOnlyDiffusionMode
          }) $
        mkApp $
        NtN.nodeToNodeProtocols NtN.defaultMiniProtocolParameters ( \them _ ->
          NtN.NodeToNodeProtocols
            { NtN.chainSyncProtocol = InitiatorProtocolOnly $
                                        MuxPeer
                                          nullTracer
                                          (cChainSyncCodec myCodecs)
                                          chainSyncPeerNull
            , NtN.blockFetchProtocol = InitiatorProtocolOnly $
                                         MuxPeer
                                           nullTracer
                                           (cBlockFetchCodec myCodecs)
                                           (blockFetchClientPeer blockFetchClientNull)
            , NtN.keepAliveProtocol = InitiatorProtocolOnly $
                                        MuxPeerRaw
                                          (kaClient n2nVer them)
            , NtN.txSubmissionProtocol = InitiatorProtocolOnly $
                                           MuxPeer
                                             submissionTracer
                                             (cTxSubmission2Codec myCodecs)
                                             (txSubmissionClientPeer myTxSubClient)
            } )
          n2nVer

    -- Stolen from: Ouroboros/Consensus/Network/NodeToNode.hs
    --
    -- Runs the keep-alive client on the given channel.  The version argument
    -- is accepted but not used.
    kaClient
      :: Ord remotePeer
      => NodeToNodeVersion
      -> remotePeer
      -> Channel IO ByteString
      -> IO ((), Maybe ByteString)
    kaClient _version them channel = do
      keepAliveRng <- newStdGen
      -- The GSV map starts out holding only this peer, at 'defaultGSV'.
      peerGSVMap <- liftIO . newTVarIO $ Map.singleton them defaultGSV
      runPeerWithLimits
        nullTracer
        (cKeepAliveCodec myCodecs)
        (byteLimitsKeepAlive (const 0)) -- TODO: Real Bytelimits, see #1727
        timeLimitsKeepAlive
        channel
        $ keepAliveClientPeer
        $ keepAliveClient
            nullTracer
            keepAliveRng
            (continueForever (Proxy :: Proxy IO)) them peerGSVMap
            (KeepAliveInterval 10)
-- | The null block fetch client: its single monadic action never produces a
-- request, it just loops forever, calling 'threadDelay' on each iteration.
blockFetchClientNull
  :: forall block point m a. MonadTimer m
  => BlockFetchClient block point m a
blockFetchClientNull =
  BlockFetchClient (forever sleepOneDay)
  where
    -- One day in seconds, as stated by the original note.
    -- NOTE(review): assumes this 'threadDelay' takes seconds -- confirm
    -- against the io-classes version in use.
    sleepOneDay = threadDelay (24 * 60 * 60)