-
Notifications
You must be signed in to change notification settings - Fork 20
/
NodeToClient.hs
481 lines (436 loc) · 17.3 KB
/
NodeToClient.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
-- | Intended for qualified import
module Ouroboros.Consensus.Network.NodeToClient (
-- * Handlers
Handlers (..)
, mkHandlers
-- * Codecs
, ClientCodecs
, Codecs
, Codecs' (..)
, DefaultCodecs
, clientCodecs
, defaultCodecs
, identityCodecs
-- * ClientCodecs
-- * Tracers
, Tracers
, Tracers' (..)
, nullTracers
, showTracers
-- * Applications
, App
, Apps (..)
, mkApps
-- ** Projections
, responder
) where
import Codec.CBOR.Decoding (Decoder)
import Codec.CBOR.Encoding (Encoding)
import Codec.CBOR.Read (DeserialiseFailure)
import Codec.Serialise (Serialise)
import Control.Tracer
import Data.ByteString.Lazy (ByteString)
import Data.Void (Void)
import Network.TypedProtocol.Codec
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Ledger.Query
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
import Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server
import Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server
import Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Node.Run
import Ouroboros.Consensus.Node.Serialisation
import qualified Ouroboros.Consensus.Node.Tracers as Node
import Ouroboros.Consensus.NodeKernel
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import Ouroboros.Consensus.Util (ShowProxy)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Consensus.Util.ResourceRegistry
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (Serialised, decodePoint, decodeTip,
encodePoint, encodeTip)
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.Channel
import Ouroboros.Network.Context
import Ouroboros.Network.Driver
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToClient hiding
(NodeToClientVersion (..))
import qualified Ouroboros.Network.NodeToClient as N (NodeToClientVersion (..))
import Ouroboros.Network.Protocol.ChainSync.Codec
import Ouroboros.Network.Protocol.ChainSync.Server
import Ouroboros.Network.Protocol.ChainSync.Type
import Ouroboros.Network.Protocol.LocalStateQuery.Codec
import Ouroboros.Network.Protocol.LocalStateQuery.Server
import Ouroboros.Network.Protocol.LocalStateQuery.Type
import Ouroboros.Network.Protocol.LocalTxMonitor.Codec
import Ouroboros.Network.Protocol.LocalTxMonitor.Server
import Ouroboros.Network.Protocol.LocalTxMonitor.Type
import Ouroboros.Network.Protocol.LocalTxSubmission.Codec
import Ouroboros.Network.Protocol.LocalTxSubmission.Server
import Ouroboros.Network.Protocol.LocalTxSubmission.Type
{-------------------------------------------------------------------------------
Handlers
-------------------------------------------------------------------------------}
-- | Server-side handlers for the node-to-client (local) mini-protocols,
-- one field per protocol.
--
-- The @peer@ parameter is not mentioned by any of the fields; it only tags
-- the record at the type level.
data Handlers m peer blk = Handlers
  { -- | Chain-sync server. It is parameterised by the 'ChainDB.Follower'
    -- it reads from, which yields serialised blocks paired with their point.
    hChainSyncServer ::
         ChainDB.Follower m blk (ChainDB.WithPoint blk (Serialised blk))
      -> ChainSyncServer (Serialised blk) (Point blk) (Tip blk) m ()

    -- | Local transaction submission server.
  , hTxSubmissionServer ::
      LocalTxSubmissionServer (GenTx blk) (ApplyTxErr blk) m ()

    -- | Local state query server.
  , hStateQueryServer ::
      LocalStateQueryServer blk (Point blk) (Query blk) m ()

    -- | Local transaction monitor server.
  , hTxMonitorServer ::
      LocalTxMonitorServer (GenTxId blk) (GenTx blk) SlotNo m ()
  }
-- | Build the node-to-client 'Handlers' from the node kernel.
--
-- The top-level config and the tracers are taken from the 'NodeKernelArgs';
-- the ChainDB and the mempool are taken from the running 'NodeKernel'.
mkHandlers ::
     forall m blk addrNTN addrNTC.
     ( IOLike m
     , LedgerSupportsMempool blk
     , LedgerSupportsProtocol blk
     , BlockSupportsLedgerQuery blk
     , ConfigSupportsNode blk
     )
  => NodeKernelArgs m addrNTN addrNTC blk
  -> NodeKernel m addrNTN addrNTC blk
  -> Handlers m addrNTC blk
mkHandlers NodeKernelArgs {cfg, tracers} NodeKernel {getChainDB, getMempool} =
    Handlers
      { hChainSyncServer    = chainSyncHandler
      , hTxSubmissionServer = txSubmissionHandler
      , hStateQueryServer   = stateQueryHandler
      , hTxMonitorServer    = txMonitorHandler
      }
  where
    -- Serves blocks out of the ChainDB.
    chainSyncHandler =
      chainSyncBlocksServer
        (Node.chainSyncServerBlockTracer tracers)
        getChainDB

    -- Submits transactions to the mempool.
    txSubmissionHandler =
      localTxSubmissionServer
        (Node.localTxSubmissionServerTracer tracers)
        getMempool

    -- Answers queries using the tip point, past ledgers and the anchor point
    -- of the current chain, all read from the ChainDB.
    stateQueryHandler =
      localStateQueryServer
        (ExtLedgerCfg cfg)
        (ChainDB.getTipPoint getChainDB)
        (ChainDB.getPastLedger getChainDB)
        (fmap (castPoint . AF.anchorPoint) (ChainDB.getCurrentChain getChainDB))

    -- Monitors the mempool.
    txMonitorHandler =
      localTxMonitorServer getMempool
{-------------------------------------------------------------------------------
Codecs
-------------------------------------------------------------------------------}
-- | The codecs required to run the node-to-client 'Handlers', one per
-- mini-protocol.
--
-- @serialisedBlk@ is the block representation carried by chain-sync. The
-- error type @e@, the monad @m@ and the per-protocol bytes types (@bCS@,
-- @bTX@, @bSQ@, @bTM@) are handed straight to the corresponding 'Codec'.
data Codecs' blk serialisedBlk e m bCS bTX bSQ bTM = Codecs
  { -- | Codec for the chain-sync protocol.
    cChainSyncCodec ::
      Codec (ChainSync serialisedBlk (Point blk) (Tip blk)) e m bCS

    -- | Codec for the local transaction submission protocol.
  , cTxSubmissionCodec ::
      Codec (LocalTxSubmission (GenTx blk) (ApplyTxErr blk)) e m bTX

    -- | Codec for the local state query protocol.
  , cStateQueryCodec ::
      Codec (LocalStateQuery blk (Point blk) (Query blk)) e m bSQ

    -- | Codec for the local transaction monitor protocol.
  , cTxMonitorCodec ::
      Codec (LocalTxMonitor (GenTxId blk) (GenTx blk) SlotNo) e m bTM
  }
-- | 'Codecs'' in which chain-sync carries blocks as @'Serialised' blk@.
type Codecs blk e m bCS bTX bSQ bTM = Codecs' blk (Serialised blk) e m bCS bTX bSQ bTM
-- | 'Codecs' (chain-sync over @'Serialised' blk@) that fail with
-- 'DeserialiseFailure' and use lazy 'ByteString' as the bytes type of every
-- protocol.
type DefaultCodecs blk m =
  Codecs blk DeserialiseFailure m ByteString ByteString ByteString ByteString
-- | 'Codecs'' in which chain-sync carries @blk@ itself rather than
-- @'Serialised' blk@; failures are 'DeserialiseFailure' and every protocol
-- uses lazy 'ByteString' as its bytes type.
type ClientCodecs blk m =
  Codecs'
    blk
    blk
    DeserialiseFailure
    m
    ByteString
    ByteString
    ByteString
    ByteString
-- | Protocol codecs for the node-to-client protocols, with chain-sync
-- carrying blocks as @'Serialised' blk@.
--
-- Arguments, in order:
--
-- * the 'CodecConfig', which is passed to every encoder and decoder;
-- * the block-level 'BlockNodeToClientVersion', likewise passed to every
--   encoder and decoder;
-- * the network-level 'N.NodeToClientVersion', which is given to
--   'codecLocalStateQuery' and from which the 'QueryVersion' is derived via
--   'nodeToClientVersionToQueryVersion'.
--
-- Points and tips are encoded through 'encodeRawHash' \/ 'decodeRawHash';
-- all other payloads go through 'encodeNodeToClient' \/
-- 'decodeNodeToClient'.
--
-- NOTE(review): an earlier version of this comment stated that
-- 'cChainSyncCodec' sends blocks across without deserialising them, leaving
-- that to the user of the codec. That is consistent with the
-- @'Serialised' blk@ payload type, but confirm against the
-- 'SerialiseNodeToClient' instance for 'Serialised'.
defaultCodecs ::
     forall m blk.
     ( MonadST m
     , SerialiseNodeToClientConstraints blk
     , ShowQuery (BlockQuery blk)
     , StandardHash blk
     , Serialise (HeaderHash blk)
     )
  => CodecConfig blk
  -> BlockNodeToClientVersion blk
  -> N.NodeToClientVersion
  -> DefaultCodecs blk m
defaultCodecs ccfg blockVersion networkVersion =
    Codecs
      { cChainSyncCodec =
          codecChainSync
            encodeN2C
            decodeN2C
            encodePointN2C
            decodePointN2C
            encodeTipN2C
            decodeTipN2C
      , cTxSubmissionCodec =
          codecLocalTxSubmission
            encodeN2C
            decodeN2C
            encodeN2C
            decodeN2C
      , cStateQueryCodec =
          codecLocalStateQuery
            networkVersion
            encodePointN2C
            decodePointN2C
            (queryEncodeNodeToClient ccfg qryVersion blockVersion . SomeSecond)
            (fmap
               (\(SomeSecond qry) -> Some qry)
               (queryDecodeNodeToClient ccfg qryVersion blockVersion))
            (encodeResult ccfg blockVersion)
            (decodeResult ccfg blockVersion)
      , cTxMonitorCodec =
          codecLocalTxMonitor
            encodeN2C decodeN2C
            encodeN2C decodeN2C
            encodeN2C decodeN2C
      }
  where
    blkProxy :: Proxy blk
    blkProxy = Proxy

    -- Query version implied by the negotiated network version.
    qryVersion :: QueryVersion
    qryVersion = nodeToClientVersionToQueryVersion networkVersion

    -- Generic node-to-client encoder/decoder at the given block version.
    encodeN2C :: SerialiseNodeToClient blk a => a -> Encoding
    encodeN2C = encodeNodeToClient ccfg blockVersion

    decodeN2C :: SerialiseNodeToClient blk a => forall s. Decoder s a
    decodeN2C = decodeNodeToClient ccfg blockVersion

    -- Points and tips are encoded via the raw header hash.
    encodePointN2C :: Point blk -> Encoding
    encodePointN2C = encodePoint (encodeRawHash blkProxy)

    decodePointN2C :: forall s. Decoder s (Point blk)
    decodePointN2C = decodePoint (decodeRawHash blkProxy)

    encodeTipN2C :: Tip blk -> Encoding
    encodeTipN2C = encodeTip (encodeRawHash blkProxy)

    decodeTipN2C :: forall s. Decoder s (Tip blk)
    decodeTipN2C = decodeTip (decodeRawHash blkProxy)
-- | Protocol codecs for the node-to-client protocols in which the
-- /chain-sync/ codec serialises and deserialises @blk@ values directly
-- (the payload type is @blk@, not @'Serialised' blk@).
--
-- The 'CodecConfig' and the block-level 'BlockNodeToClientVersion' are passed
-- to every encoder and decoder. The network-level 'N.NodeToClientVersion' is
-- given to 'codecLocalStateQuery' and determines the 'QueryVersion' via
-- 'nodeToClientVersionToQueryVersion'.
clientCodecs ::
     forall m blk.
     ( MonadST m
     , SerialiseNodeToClientConstraints blk
     , ShowQuery (BlockQuery blk)
     , StandardHash blk
     , Serialise (HeaderHash blk)
     )
  => CodecConfig blk
  -> BlockNodeToClientVersion blk
  -> N.NodeToClientVersion
  -> ClientCodecs blk m
clientCodecs ccfg blockVersion networkVersion =
    Codecs
      { cChainSyncCodec =
          codecChainSync
            encodeN2C
            decodeN2C
            encodePointN2C
            decodePointN2C
            encodeTipN2C
            decodeTipN2C
      , cTxSubmissionCodec =
          codecLocalTxSubmission
            encodeN2C
            decodeN2C
            encodeN2C
            decodeN2C
      , cStateQueryCodec =
          codecLocalStateQuery
            networkVersion
            encodePointN2C
            decodePointN2C
            (queryEncodeNodeToClient ccfg qryVersion blockVersion . SomeSecond)
            (fmap
               (\(SomeSecond qry) -> Some qry)
               (queryDecodeNodeToClient ccfg qryVersion blockVersion))
            (encodeResult ccfg blockVersion)
            (decodeResult ccfg blockVersion)
      , cTxMonitorCodec =
          codecLocalTxMonitor
            encodeN2C decodeN2C
            encodeN2C decodeN2C
            encodeN2C decodeN2C
      }
  where
    blkProxy :: Proxy blk
    blkProxy = Proxy

    -- Query version implied by the negotiated network version.
    qryVersion :: QueryVersion
    qryVersion = nodeToClientVersionToQueryVersion networkVersion

    -- Generic node-to-client encoder/decoder at the given block version.
    encodeN2C :: SerialiseNodeToClient blk a => a -> Encoding
    encodeN2C = encodeNodeToClient ccfg blockVersion

    decodeN2C :: SerialiseNodeToClient blk a => forall s. Decoder s a
    decodeN2C = decodeNodeToClient ccfg blockVersion

    -- Points and tips are encoded via the raw header hash.
    encodePointN2C :: Point blk -> Encoding
    encodePointN2C = encodePoint (encodeRawHash blkProxy)

    decodePointN2C :: forall s. Decoder s (Point blk)
    decodePointN2C = decodePoint (decodeRawHash blkProxy)

    encodeTipN2C :: Tip blk -> Encoding
    encodeTipN2C = encodeTip (encodeRawHash blkProxy)

    decodeTipN2C :: forall s. Decoder s (Tip blk)
    decodeTipN2C = decodeTip (decodeRawHash blkProxy)
-- | Identity codecs, used in tests.
--
-- Each field is the @...Id@ variant of the protocol's codec, so the bytes
-- type of every protocol is the protocol's own 'AnyMessage' and failures are
-- reported as 'CodecFailure'.
identityCodecs ::
     (Monad m, BlockSupportsLedgerQuery blk)
  => Codecs
       blk
       CodecFailure
       m
       (AnyMessage (ChainSync (Serialised blk) (Point blk) (Tip blk)))
       (AnyMessage (LocalTxSubmission (GenTx blk) (ApplyTxErr blk)))
       (AnyMessage (LocalStateQuery blk (Point blk) (Query blk)))
       (AnyMessage (LocalTxMonitor (GenTxId blk) (GenTx blk) SlotNo))
identityCodecs =
    Codecs
      { cChainSyncCodec    = codecChainSyncId
      , cTxSubmissionCodec = codecLocalTxSubmissionId
      , cStateQueryCodec   = codecLocalStateQueryId sameDepIndex
      , cTxMonitorCodec    = codecLocalTxMonitorId
      }
{-------------------------------------------------------------------------------
Tracers
-------------------------------------------------------------------------------}
-- | One 'Tracer' per node-to-client protocol: 'Tracers'' with the field
-- wrapper fixed to @'Tracer' m@.
type Tracers m peer blk e = Tracers' peer blk e (Tracer m)
-- | A record holding, for every node-to-client protocol, an @f@ of the
-- peer-labelled send\/receive trace events of that protocol.
--
-- The @e@ parameter does not occur in any field.
data Tracers' peer blk e f = Tracers
  { -- | Trace of chain-sync messages (blocks are carried as 'Serialised').
    tChainSyncTracer ::
      f (TraceLabelPeer peer (TraceSendRecv (ChainSync (Serialised blk) (Point blk) (Tip blk))))

    -- | Trace of local transaction submission messages.
  , tTxSubmissionTracer ::
      f (TraceLabelPeer peer (TraceSendRecv (LocalTxSubmission (GenTx blk) (ApplyTxErr blk))))

    -- | Trace of local state query messages.
  , tStateQueryTracer ::
      f (TraceLabelPeer peer (TraceSendRecv (LocalStateQuery blk (Point blk) (Query blk))))

    -- | Trace of local transaction monitor messages.
  , tTxMonitorTracer ::
      f (TraceLabelPeer peer (TraceSendRecv (LocalTxMonitor (GenTxId blk) (GenTx blk) SlotNo)))
  }
-- | Field-wise combination: every field of the result is the '<>' of the
-- same field taken from the left and the right operand. The quantified
-- constraint supplies the 'Semigroup' instance for each field type.
instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer blk e f) where
  lhs <> rhs =
    Tracers
      { tChainSyncTracer    = tChainSyncTracer lhs    <> tChainSyncTracer rhs
      , tTxSubmissionTracer = tTxSubmissionTracer lhs <> tTxSubmissionTracer rhs
      , tStateQueryTracer   = tStateQueryTracer lhs   <> tStateQueryTracer rhs
      , tTxMonitorTracer    = tTxMonitorTracer lhs    <> tTxMonitorTracer rhs
      }
-- | 'Tracers' in which every protocol is traced with 'nullTracer'.
nullTracers :: Monad m => Tracers m peer blk e
nullTracers =
    Tracers
      { tChainSyncTracer    = nullTracer
      , tTxSubmissionTracer = nullTracer
      , tStateQueryTracer   = nullTracer
      , tTxMonitorTracer    = nullTracer
      }
-- | 'Tracers' that render every protocol event with 'showTracing' and send
-- the resulting 'String' to the given tracer.
showTracers ::
     ( Show peer
     , Show (GenTx blk)
     , Show (GenTxId blk)
     , Show (ApplyTxErr blk)
     , ShowQuery (BlockQuery blk)
     , HasHeader blk
     )
  => Tracer m String
  -> Tracers m peer blk e
showTracers stringTracer =
    Tracers
      { tChainSyncTracer    = showTracing stringTracer
      , tTxSubmissionTracer = showTracing stringTracer
      , tStateQueryTracer   = showTracing stringTracer
      , tTxMonitorTracer    = showTracing stringTracer
      }
{-------------------------------------------------------------------------------
Applications
-------------------------------------------------------------------------------}
-- | A node-to-client application: given the peer and a 'Channel' to it, run
-- in @m@ and return a result together with an optional @bytes@ value.
type App m peer bytes a =
     peer
  -> Channel m bytes
  -> m (a, Maybe bytes)
-- | The applications for the node-to-client (i.e., local) protocols, one
-- 'App' per mini-protocol, each with its own bytes type.
--
-- See 'Network.Mux.Types.MuxApplication'
data Apps m peer bCS bTX bSQ bTM a = Apps
  { -- | Start a local chain sync server.
    aChainSyncServer :: App m peer bCS a

    -- | Start a local transaction submission server.
  , aTxSubmissionServer :: App m peer bTX a

    -- | Start a local state query server.
  , aStateQueryServer :: App m peer bSQ a

    -- | Start a local transaction monitor server.
  , aTxMonitorServer :: App m peer bTM a
  }
-- | Construct the 'Apps' for the node-to-client protocols.
--
-- Each application labels its thread and then runs the matching handler
-- with 'runPeer', using the matching codec and a tracer that tags every
-- event with the peer ('TraceLabelPeer').
--
-- The chain-sync application additionally opens a ChainDB follower in a
-- private resource registry for the duration of the protocol run and closes
-- it with 'ChainDB.followerClose'.
mkApps ::
     forall m addrNTN addrNTC blk e bCS bTX bSQ bTM.
     ( IOLike m
     , Exception e
     , ShowProxy blk
     , ShowProxy (ApplyTxErr blk)
     , ShowProxy (BlockQuery blk)
     , ShowProxy (GenTx blk)
     , ShowProxy (GenTxId blk)
     , ShowQuery (BlockQuery blk)
     )
  => NodeKernel m addrNTN addrNTC blk
  -> Tracers m addrNTC blk e
  -> Codecs blk e m bCS bTX bSQ bTM
  -> Handlers m addrNTC blk
  -> Apps m addrNTC bCS bTX bSQ bTM ()
mkApps kernel Tracers {..} Codecs {..} Handlers {..} =
    Apps
      { aChainSyncServer    = chainSyncApp
      , aTxSubmissionServer = txSubmissionApp
      , aStateQueryServer   = stateQueryApp
      , aTxMonitorServer    = txMonitorApp
      }
  where
    chainSyncApp :: App m addrNTC bCS ()
    chainSyncApp them channel = do
        labelThisThread "LocalChainSyncServer"
        -- The follower is acquired and released around the protocol run.
        bracketWithPrivateRegistry
          (chainSyncBlockServerFollower (getChainDB kernel))
          ChainDB.followerClose
          (\follower ->
             runPeer
               (contramap (TraceLabelPeer them) tChainSyncTracer)
               cChainSyncCodec
               channel
               (chainSyncServerPeer (hChainSyncServer follower)))

    txSubmissionApp :: App m addrNTC bTX ()
    txSubmissionApp them channel = do
        labelThisThread "LocalTxSubmissionServer"
        runPeer
          (contramap (TraceLabelPeer them) tTxSubmissionTracer)
          cTxSubmissionCodec
          channel
          (localTxSubmissionServerPeer (pure hTxSubmissionServer))

    stateQueryApp :: App m addrNTC bSQ ()
    stateQueryApp them channel = do
        labelThisThread "LocalStateQueryServer"
        runPeer
          (contramap (TraceLabelPeer them) tStateQueryTracer)
          cStateQueryCodec
          channel
          (localStateQueryServerPeer hStateQueryServer)

    txMonitorApp :: App m addrNTC bTM ()
    txMonitorApp them channel = do
        labelThisThread "LocalTxMonitorServer"
        runPeer
          (contramap (TraceLabelPeer them) tTxMonitorTracer)
          cTxMonitorCodec
          channel
          (localTxMonitorServerPeer hTxMonitorServer)
{-------------------------------------------------------------------------------
Projections from 'Apps'
-------------------------------------------------------------------------------}
-- | A projection from 'Apps' to a server-side
-- 'OuroborosApplicationWithMinimalCtx' for the node-to-client protocols.
--
-- Every application is installed as a responder-only mini-protocol and is
-- handed the connection id taken from the responder context.
responder ::
     N.NodeToClientVersion
  -> Apps m (ConnectionId peer) b b b b a
  -> OuroborosApplicationWithMinimalCtx 'ResponderMode peer b m Void a
responder
  version
  Apps
    { aChainSyncServer
    , aTxSubmissionServer
    , aStateQueryServer
    , aTxMonitorServer
    } =
    nodeToClientProtocols
      NodeToClientProtocols
        { localChainSyncProtocol    = respondWith aChainSyncServer
        , localTxSubmissionProtocol = respondWith aTxSubmissionServer
        , localStateQueryProtocol   = respondWith aStateQueryServer
        , localTxMonitorProtocol    = respondWith aTxMonitorServer
        }
      version
  where
    -- Wrap an application as a responder-only mini-protocol callback.
    respondWith app =
      ResponderProtocolOnly (MiniProtocolCb (\ctx -> app (rcConnectionId ctx)))