Skip to content

Commit

Permalink
control-message: tx-submission outbound side (client)
Browse files Browse the repository at this point in the history
It's the client side that needs to terminate when 'Terminate' contorl
message is enountered (`Hot -> Warm` transition).
  • Loading branch information
coot committed Sep 21, 2020
1 parent 46b1130 commit e014695
Showing 1 changed file with 43 additions and 35 deletions.
78 changes: 43 additions & 35 deletions ouroboros-network/src/Ouroboros/Network/TxSubmission/Outbound.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import Control.Monad.Class.MonadThrow
import Control.Exception (assert)
import Control.Tracer (Tracer, traceWith)

import Ouroboros.Network.Mux
(ControlMessageSTM, ControlMessage,
timeoutWithControlMessage)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.TxSubmission.Client
import Ouroboros.Network.TxSubmission.Mempool.Reader
Expand All @@ -39,6 +42,7 @@ data TraceTxSubmissionOutbound txid tx
| TraceTxSubmissionOutboundSendMsgReplyTxs
[tx]
-- ^ The transactions to be sent in the response.
| TraceControlMessage ControlMessage
deriving Show

data TxSubmissionProtocolError =
Expand Down Expand Up @@ -81,8 +85,9 @@ txSubmissionOutbound
-> Word16 -- ^ Maximum number of unacknowledged txids allowed
-> TxSubmissionMempoolReader txid tx idx m
-> NodeToNodeVersion
-> ControlMessageSTM m
-> TxSubmissionClient txid tx m ()
txSubmissionOutbound tracer maxUnacked TxSubmissionMempoolReader{..} _version =
txSubmissionOutbound tracer maxUnacked TxSubmissionMempoolReader{..} _version controlMessageSTM =
TxSubmissionClient (pure (client Seq.empty Map.empty mempoolZeroIdx))
where
client :: StrictSeq txid -> Map txid idx -> idx -> ClientStIdle txid tx m ()
Expand Down Expand Up @@ -120,19 +125,19 @@ txSubmissionOutbound tracer maxUnacked TxSubmissionMempoolReader{..} _version =

-- Grab info about any new txs after the last tx idx we've seen,
-- up to the number that the peer has requested.
txs <- case blocking of
mbtxs <- case blocking of
TokBlocking -> do
when (reqNo == 0) $
throwM ProtocolErrorRequestedNothing
unless (Seq.null unackedSeq') $
throwM ProtocolErrorRequestBlocking

atomically $ do
MempoolSnapshot{mempoolTxIdsAfter} <- mempoolGetSnapshot
let txs = mempoolTxIdsAfter lastIdx
-- but block until there are some
check (not (null txs))
return (take (fromIntegral reqNo) txs)
timeoutWithControlMessage controlMessageSTM $
do
MempoolSnapshot{mempoolTxIdsAfter} <- mempoolGetSnapshot
let txs = mempoolTxIdsAfter lastIdx
check (not $ null txs)
pure (take (fromIntegral reqNo) txs)

TokNonBlocking -> do
when (reqNo == 0 && ackNo == 0) $
Expand All @@ -143,33 +148,36 @@ txSubmissionOutbound tracer maxUnacked TxSubmissionMempoolReader{..} _version =
atomically $ do
MempoolSnapshot{mempoolTxIdsAfter} <- mempoolGetSnapshot
let txs = mempoolTxIdsAfter lastIdx
return (take (fromIntegral reqNo) txs)

-- These txs should all be fresh
assert (all (\(_, idx, _) -> idx > lastIdx) txs) (return ())

-- Update our tracking state with any extra txs available.
let !unackedSeq'' = unackedSeq' <> Seq.fromList
[ txid | (txid, _, _) <- txs ]
!unackedMap'' = unackedMap' <> Map.fromList
[ (txid, idx) | (txid, idx, _) <- txs ]
!lastIdx'
| null txs = lastIdx
| otherwise = idx where (_, idx, _) = last txs
txs' :: [(txid, TxSizeInBytes)]
txs' = [ (txid, size) | (txid, _, size) <- txs ]
client' = client unackedSeq'' unackedMap'' lastIdx'

-- Our reply type is different in the blocking vs non-blocking cases
return $! case blocking of
TokNonBlocking -> SendMsgReplyTxIds (NonBlockingReply txs') client'
TokBlocking -> SendMsgReplyTxIds (BlockingReply txs'') client'
where
txs'' = case NonEmpty.nonEmpty txs' of
Just x -> x
Nothing -> error "txSubmissionOutbound: empty transaction's list"
-- Assert txs is non-empty: we blocked until txs was non-null,
-- and we know reqNo > 0, hence take reqNo txs is non-null.
return (Just $ take (fromIntegral reqNo) txs)

return $! case (mbtxs, blocking) of
(Nothing, TokBlocking) -> SendMsgDone ()
(Nothing, TokNonBlocking) -> error "txSubmissionOutbound: impossible happend!"
(Just txs, _) ->
-- These txs should all be fresh
assert (all (\(_, idx, _) -> idx > lastIdx) txs) $
-- Update our tracking state with any extra txs available.
let !unackedSeq'' = unackedSeq' <> Seq.fromList
[ txid | (txid, _, _) <- txs ]
!unackedMap'' = unackedMap' <> Map.fromList
[ (txid, idx) | (txid, idx, _) <- txs ]
!lastIdx'
| null txs = lastIdx
| otherwise = idx where (_, idx, _) = last txs
txs' :: [(txid, TxSizeInBytes)]
txs' = [ (txid, size) | (txid, _, size) <- txs ]
client' = client unackedSeq'' unackedMap'' lastIdx'

-- Our reply type is different in the blocking vs non-blocking cases
in case blocking of
TokNonBlocking -> SendMsgReplyTxIds (NonBlockingReply txs') client'
TokBlocking -> SendMsgReplyTxIds (BlockingReply txs'') client'
where
txs'' = case NonEmpty.nonEmpty txs' of
Just x -> x
Nothing -> error "txSubmissionOutbound: empty transaction's list"
-- Assert txs is non-empty: we blocked until txs was non-null,
-- and we know reqNo > 0, hence take reqNo txs is non-null.


recvMsgRequestTxs :: [txid]
Expand Down

0 comments on commit e014695

Please sign in to comment.