Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
module Ouroboros.Consensus.MiniProtocol.ChainSync.Server
( chainSyncHeadersServer
, chainSyncBlocksServer
, chainSyncHeaderServerReader
, chainSyncBlockServerReader
, Tip
-- * Trace events
, TraceChainSyncServerEvent (..)
Expand All @@ -29,6 +31,19 @@ import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)


chainSyncHeaderServerReader
:: ChainDB m blk
-> ResourceRegistry m
-> m (Reader m blk (WithPoint blk (SerialisedHeader blk)))
chainSyncHeaderServerReader chainDB registry = ChainDB.newReader chainDB registry getSerialisedHeaderWithPoint

chainSyncBlockServerReader
:: ChainDB m blk
-> ResourceRegistry m
-> m (Reader m blk (WithPoint blk (Serialised blk)))
chainSyncBlockServerReader chainDB registry = ChainDB.newReader chainDB registry getSerialisedBlockWithPoint

-- | Chain Sync Server for block headers for a given a 'ChainDB'.
--
-- The node-to-node protocol uses the chain sync mini-protocol with chain
Expand All @@ -41,13 +56,12 @@ chainSyncHeadersServer
)
=> Tracer m (TraceChainSyncServerEvent blk)
-> ChainDB m blk
-> Reader m blk (WithPoint blk (SerialisedHeader blk))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes great, pass the resource in, rather than allocating it within the protocol handler.

-> NodeToNodeVersion
-> ResourceRegistry m
-> ChainSyncServer (SerialisedHeader blk) (Point blk) (Tip blk) m ()
chainSyncHeadersServer tracer chainDB _version registry =
ChainSyncServer $ do
rdr <- ChainDB.newReader chainDB registry getSerialisedHeaderWithPoint
let ChainSyncServer server = chainSyncServerForReader tracer chainDB rdr
chainSyncHeadersServer tracer chainDB rdr _version =
ChainSyncServer $
let ChainSyncServer server = chainSyncServerForReader tracer chainDB rdr in
server

-- | Chain Sync Server for blocks for a given a 'ChainDB'.
Expand All @@ -59,12 +73,11 @@ chainSyncBlocksServer
:: forall m blk. (IOLike m, HasHeader (Header blk))
=> Tracer m (TraceChainSyncServerEvent blk)
-> ChainDB m blk
-> ResourceRegistry m
-> Reader m blk (WithPoint blk (Serialised blk))
-> ChainSyncServer (Serialised blk) (Point blk) (Tip blk) m ()
chainSyncBlocksServer tracer chainDB registry =
ChainSyncServer $ do
rdr <- ChainDB.newReader chainDB registry getSerialisedBlockWithPoint
let ChainSyncServer server = chainSyncServerForReader tracer chainDB rdr
chainSyncBlocksServer tracer chainDB rdr =
ChainSyncServer $
let ChainSyncServer server = chainSyncServerForReader tracer chainDB rdr in
server

-- | A chain sync server.
Expand Down Expand Up @@ -92,7 +105,7 @@ chainSyncServerForReader tracer chainDB rdr =
idle = ServerStIdle {
recvMsgRequestNext = handleRequestNext,
recvMsgFindIntersect = handleFindIntersect,
recvMsgDoneClient = ChainDB.readerClose rdr
recvMsgDoneClient = pure ()
}

idle' :: ChainSyncServer b (Point blk) (Tip blk) m ()
Expand Down
35 changes: 20 additions & 15 deletions ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
-- | Protocol handlers for node-to-client (local) communication
data Handlers m peer blk = Handlers {
hChainSyncServer
:: ResourceRegistry m
:: ChainDB.Reader m blk (ChainDB.WithPoint blk (Serialised blk))
-> ChainSyncServer (Serialised blk) (Point blk) (Tip blk) m ()

, hTxSubmissionServer
Expand Down Expand Up @@ -331,7 +331,7 @@ data Apps m peer bCS bTX bSQ a = Apps {

-- | Construct the 'NetworkApplication' for the node-to-client protocols
mkApps
:: forall m peer blk e bCS bTX bSQ.
:: forall m remotePeer localPeer blk e bCS bTX bSQ.
( IOLike m
, Exception e
, ShowProxy blk
Expand All @@ -340,29 +340,34 @@ mkApps
, ShowProxy (GenTx blk)
, ShowQuery (Query blk)
)
=> Tracers m peer blk e
=> NodeKernel m remotePeer localPeer blk
-> Tracers m localPeer blk e
-> Codecs blk e m bCS bTX bSQ
-> Handlers m peer blk
-> Apps m peer bCS bTX bSQ ()
mkApps Tracers {..} Codecs {..} Handlers {..} =
-> Handlers m localPeer blk
-> Apps m localPeer bCS bTX bSQ ()
mkApps kernel Tracers {..} Codecs {..} Handlers {..} =
Apps {..}
where
aChainSyncServer
:: peer
:: localPeer
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncServer them channel = do
labelThisThread "LocalChainSyncServer"
withRegistry $ \registry ->
runPeer
(contramap (TraceLabelPeer them) tChainSyncTracer)
cChainSyncCodec
channel
$ chainSyncServerPeer
$ hChainSyncServer registry
bracket
(chainSyncBlockServerReader (getChainDB kernel) registry)
ChainDB.readerClose
(\rdr -> runPeer
(contramap (TraceLabelPeer them) tChainSyncTracer)
cChainSyncCodec
channel
$ chainSyncServerPeer
$ hChainSyncServer rdr
)

aTxSubmissionServer
:: peer
:: localPeer
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmissionServer them channel = do
Expand All @@ -374,7 +379,7 @@ mkApps Tracers {..} Codecs {..} Handlers {..} =
(localTxSubmissionServerPeer (pure hTxSubmissionServer))

aStateQueryServer
:: peer
:: localPeer
-> Channel m bSQ
-> m ((), Maybe bSQ)
aStateQueryServer them channel = do
Expand Down
25 changes: 15 additions & 10 deletions ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ import Ouroboros.Consensus.Node.Run
import Ouroboros.Consensus.Node.Serialisation
import qualified Ouroboros.Consensus.Node.Tracers as Node
import Ouroboros.Consensus.NodeKernel
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import Ouroboros.Consensus.Util (ShowProxy)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Orphans ()
Expand All @@ -112,8 +113,8 @@ data Handlers m peer blk = Handlers {
-- closure include these and not need to be explicit about them here.

, hChainSyncServer
:: NodeToNodeVersion
-> ResourceRegistry m
:: ChainDB.Reader m blk (ChainDB.WithPoint blk (SerialisedHeader blk))
-> NodeToNodeVersion
-> ChainSyncServer (SerialisedHeader blk) (Point blk) (Tip blk) m ()

-- TODO block fetch client does not have GADT view of the handlers.
Expand Down Expand Up @@ -488,14 +489,18 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
labelThisThread "ChainSyncServer"
withRegistry $ \registry -> do
chainSyncTimeout <- genChainSyncTimeout
runPeerWithLimits
(contramap (TraceLabelPeer them) tChainSyncSerialisedTracer)
cChainSyncCodecSerialised
(byteLimitsChainSync (const 0)) -- TODO: Real Bytelimits, see #1727
(timeLimitsChainSync chainSyncTimeout)
channel
$ chainSyncServerPeer
$ hChainSyncServer version registry
bracket
(chainSyncHeaderServerReader (getChainDB kernel) registry)
ChainDB.readerClose
Comment on lines +492 to +494
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

(\rdr -> runPeerWithLimits
(contramap (TraceLabelPeer them) tChainSyncSerialisedTracer)
cChainSyncCodecSerialised
(byteLimitsChainSync (const 0)) -- TODO: Real Bytelimits, see #1727
(timeLimitsChainSync chainSyncTimeout)
channel
$ chainSyncServerPeer
$ hChainSyncServer rdr version
)

aBlockFetchClient
:: NodeToNodeVersion
Expand Down
1 change: 1 addition & 0 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
-> NTC.Apps m (ConnectionId addrNTC) ByteString ByteString ByteString ()
mkNodeToClientApps nodeKernelArgs nodeKernel version =
NTC.mkApps
nodeKernel
rnTraceNTC
(NTC.defaultCodecs codecConfig version)
(NTC.mkHandlers nodeKernelArgs nodeKernel)
Expand Down