Skip to content

Commit

Permalink
TxSubmissionClient: respond to ControlMessageSTM
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Sep 14, 2020
1 parent 4660973 commit 1338ba7
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ data Handlers m peer blk = Handlers {

, hTxSubmissionServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> peer
-> TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m ()

Expand Down Expand Up @@ -186,13 +187,14 @@ mkHandlers
(txSubmissionMaxUnacked miniProtocolParameters)
(getMempoolReader getMempool)
version
, hTxSubmissionServer = \version peer ->
, hTxSubmissionServer = \version controlMessageSTM peer ->
txSubmissionInbound
(contramap (TraceLabelPeer peer) (Node.txInboundTracer tracers))
(txSubmissionMaxUnacked miniProtocolParameters)
(getMempoolReader getMempool)
(getMempoolWriter getMempool)
version
controlMessageSTM
, hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng
, hKeepAliveServer = \_version _peer -> keepAliveServer
}
Expand Down Expand Up @@ -519,15 +521,15 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
-> remotePeer
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmissionServer version _controlMessageSTM them channel = do
aTxSubmissionServer version controlMessageSTM them channel = do
labelThisThread "TxSubmissionServer"
runPipelinedPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmissionTracer)
cTxSubmissionCodec
(byteLimitsTxSubmission (const 0)) -- TODO: Real Bytelimits, see #1727
timeLimitsTxSubmission
channel
(txSubmissionServerPeerPipelined (hTxSubmissionServer version them))
(txSubmissionServerPeerPipelined (hTxSubmissionServer version controlMessageSTM them))

aKeepAliveClient
:: NodeToNodeVersion
Expand Down
139 changes: 78 additions & 61 deletions ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Control.Tracer (Tracer)

import Network.TypedProtocol.Pipelined (N, Nat (..))

import Ouroboros.Network.Mux (ControlMessageSTM, ControlMessage (..))
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.TxSubmission.Server
import Ouroboros.Network.TxSubmission.Mempool.Reader
Expand Down Expand Up @@ -164,8 +165,9 @@ txSubmissionInbound
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> NodeToNodeVersion
-> ControlMessageSTM m
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound _tracer maxUnacked mpReader mpWriter _version =
txSubmissionInbound _tracer maxUnacked mpReader mpWriter _version controlMessageSTM =
TxSubmissionServerPipelined $
continueWithStateM (serverIdle Zero) initialServerState
where
Expand All @@ -186,33 +188,41 @@ txSubmissionInbound _tracer maxUnacked mpReader mpWriter _version =
Nat n
-> StatefulM (ServerState txid tx) n txid tx m
serverIdle n = StatefulM $ \st -> case n of
Zero -> if canRequestMoreTxs st
then
-- There are no replies in flight, but we do know some more txs we
-- can ask for, so lets ask for them and more txids.
pure $ continueWithState (serverReqTxs Zero) st

else do
-- There's no replies in flight, and we have no more txs we can
-- ask for so the only remaining thing to do is to ask for more
-- txids. Since this is the only thing to do now, we make this a
-- blocking call.
let numTxIdsToRequest = maxTxIdsToRequest `min` maxUnacked
assert (requestedTxIdsInFlight st == 0
&& Seq.null (unacknowledgedTxIds st)
&& Map.null (availableTxids st)
&& Map.null (bufferedTxs st)) $
pure $
SendMsgRequestTxIdsBlocking
(numTxsToAcknowledge st)
numTxIdsToRequest
() -- Our result if the client terminates the protocol
( collectAndContinueWithState (handleReply Zero) st {
numTxsToAcknowledge = 0,
requestedTxIdsInFlight = numTxIdsToRequest
}
. CollectTxIds numTxIdsToRequest
. NonEmpty.toList)
Zero ->
atomically controlMessageSTM >>= \controlMessage ->
case controlMessage of
Terminate ->
pure $ SendMsgKThxBye ()

-- Continue
_ ->
if canRequestMoreTxs st
then
-- There are no replies in flight, but we do know some more txs we
-- can ask for, so lets ask for them and more txids.
pure $ continueWithState (serverReqTxs Zero) st

else do
-- There's no replies in flight, and we have no more txs we can
-- ask for so the only remaining thing to do is to ask for more
-- txids. Since this is the only thing to do now, we make this a
-- blocking call.
let numTxIdsToRequest = maxTxIdsToRequest `min` maxUnacked
assert (requestedTxIdsInFlight st == 0
&& Seq.null (unacknowledgedTxIds st)
&& Map.null (availableTxids st)
&& Map.null (bufferedTxs st)) $
pure $
SendMsgRequestTxIdsBlocking
(numTxsToAcknowledge st)
numTxIdsToRequest
() -- Our result if the client terminates the protocol
( collectAndContinueWithState (handleReply Zero) st {
numTxsToAcknowledge = 0,
requestedTxIdsInFlight = numTxIdsToRequest
}
. CollectTxIds numTxIdsToRequest
. NonEmpty.toList)

Succ n' -> if canRequestMoreTxs st
then
Expand Down Expand Up @@ -384,47 +394,54 @@ txSubmissionInbound _tracer maxUnacked mpReader mpWriter _version =
Nat n
-> Stateful (ServerState txid tx) n txid tx m
serverReqTxs n = Stateful $ \st -> do
-- TODO: This implementation is deliberately naive, we pick in an
-- arbitrary order and up to a fixed limit. This is to illustrate
-- that we can request txs out of order. In the final version we will
-- try to pick in-order and only pick out of order when we have to.
-- We will also uses the size of txs in bytes as our limit for
-- upper and lower watermarks for pipelining. We'll also use the
-- amount in flight and delta-Q to estimate when we're in danger of
-- becomming idle, and need to request stalled txs.
--
let (txsToRequest, availableTxids') =
Map.splitAt (fromIntegral maxTxToRequest) (availableTxids st)

SendMsgRequestTxsPipelined
(Map.keys txsToRequest)
(continueWithStateM (serverReqTxIds (Succ n)) st {
availableTxids = availableTxids'
})
-- TODO: This implementation is deliberately naive, we pick in an
-- arbitrary order and up to a fixed limit. This is to illustrate
-- that we can request txs out of order. In the final version we will
-- try to pick in-order and only pick out of order when we have to.
-- We will also uses the size of txs in bytes as our limit for
-- upper and lower watermarks for pipelining. We'll also use the
-- amount in flight and delta-Q to estimate when we're in danger of
-- becomming idle, and need to request stalled txs.
--
let (txsToRequest, availableTxids') =
Map.splitAt (fromIntegral maxTxToRequest) (availableTxids st)

SendMsgRequestTxsPipelined
(Map.keys txsToRequest)
(continueWithStateM (serverReqTxIds (Succ n)) st {
availableTxids = availableTxids'
})

serverReqTxIds :: forall (n :: N).
Nat n
-> StatefulM (ServerState txid tx) n txid tx m
serverReqTxIds n = StatefulM $ \st -> do
controlMessage <- atomically controlMessageSTM
case (n, controlMessage) of
(Zero, Terminate) ->
pure $ SendMsgKThxBye ()

-- Continue or non-zero outstanding requests
_ -> do
-- This definition is justified by the fact that the
-- 'numTxsToAcknowledge' are not included in the
-- 'unacknowledgedTxIds'.
let numTxIdsToRequest =
(maxUnacked
- fromIntegral (Seq.length (unacknowledgedTxIds st))
- requestedTxIdsInFlight st)
`min` maxTxIdsToRequest

if numTxIdsToRequest > 0
then pure $ SendMsgRequestTxIdsPipelined
(numTxsToAcknowledge st)
numTxIdsToRequest
(continueWithStateM (serverIdle (Succ n)) st {
requestedTxIdsInFlight = requestedTxIdsInFlight st
+ numTxIdsToRequest,
numTxsToAcknowledge = 0
})
else continueWithStateM (serverIdle n) st
let numTxIdsToRequest =
(maxUnacked
- fromIntegral (Seq.length (unacknowledgedTxIds st))
- requestedTxIdsInFlight st)
`min` maxTxIdsToRequest

if numTxIdsToRequest > 0
then pure $ SendMsgRequestTxIdsPipelined
(numTxsToAcknowledge st)
numTxIdsToRequest
(continueWithStateM (serverIdle (Succ n)) st {
requestedTxIdsInFlight = requestedTxIdsInFlight st
+ numTxIdsToRequest,
numTxsToAcknowledge = 0
})
else continueWithStateM (serverIdle n) st

newtype Stateful s n txid tx m = Stateful (s -> ServerStIdle n txid tx m ())

Expand Down

0 comments on commit 1338ba7

Please sign in to comment.