diff --git a/ouroboros-network-protocols/ouroboros-network-protocols.cabal b/ouroboros-network-protocols/ouroboros-network-protocols.cabal index 98117370e93..811009c4e03 100644 --- a/ouroboros-network-protocols/ouroboros-network-protocols.cabal +++ b/ouroboros-network-protocols/ouroboros-network-protocols.cabal @@ -98,8 +98,10 @@ library bytestring >=0.10 && <0.13, cborg >=0.2.1 && <0.3, deepseq, + quiet, io-classes ^>=1.5.0, + nothunks, si-timers, ouroboros-network-api diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Client.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Client.hs index 297366fa58d..f7a6a1f2c80 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Client.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Client.hs @@ -27,8 +27,6 @@ module Ouroboros.Network.Protocol.TxSubmission2.Client , txSubmissionClientPeer ) where -import Data.Word (Word16) - import Network.TypedProtocol.Core import Ouroboros.Network.Protocol.TxSubmission2.Type @@ -56,8 +54,8 @@ data ClientStIdle txid tx m a = ClientStIdle { recvMsgRequestTxIds :: forall blocking. TokBlockingStyle blocking - -> Word16 - -> Word16 + -> NumTxIdsToAck + -> NumTxIdsToReq -> m (ClientStTxIds blocking txid tx m a), recvMsgRequestTxs :: [txid] diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Codec.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Codec.hs index 11d65782c8f..7bfa2f0f806 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Codec.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Codec.hs @@ -102,7 +102,10 @@ encodeTxSubmission2 encodeTxId encodeTx = encode encode (ClientAgency TokInit) MsgInit = CBOR.encodeListLen 1 <> CBOR.encodeWord 6 - encode (ServerAgency TokIdle) (MsgRequestTxIds blocking ackNo reqNo) = + encode (ServerAgency TokIdle) (MsgRequestTxIds + blocking + (NumTxIdsToAck ackNo) + (NumTxIdsToReq reqNo)) = CBOR.encodeListLen 4 <> CBOR.encodeWord 0 <> CBOR.encodeBool (case blocking of @@ -167,8 +170,8 @@ decodeTxSubmission2 decodeTxId decodeTx = decode return (SomeMessage MsgInit) (ServerAgency TokIdle, 4, 0) -> do blocking <- CBOR.decodeBool - ackNo <- CBOR.decodeWord16 - reqNo <- CBOR.decodeWord16 + ackNo <- NumTxIdsToAck <$> CBOR.decodeWord16 + reqNo <- NumTxIdsToReq <$> CBOR.decodeWord16 return $! if blocking then SomeMessage (MsgRequestTxIds TokBlocking ackNo reqNo) diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Server.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Server.hs index 13e5c085cc8..c29517ca208 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Server.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Server.hs @@ -24,7 +24,6 @@ module Ouroboros.Network.Protocol.TxSubmission2.Server ) where import Data.List.NonEmpty (NonEmpty) -import Data.Word (Word16) import Network.TypedProtocol.Core import Network.TypedProtocol.Pipelined @@ -45,7 +44,7 @@ data TxSubmissionServerPipelined txid tx m a where data Collect txid tx = -- | The result of 'SendMsgRequestTxIdsPipelined'. It also carries -- the number of txids originally requested. - CollectTxIds Word16 [(txid, SizeInBytes)] + CollectTxIds NumTxIdsToReq [(txid, SizeInBytes)] -- | The result of 'SendMsgRequestTxsPipelined'. The actual reply only -- contains the transactions sent, but this pairs them up with the @@ -59,8 +58,8 @@ data ServerStIdle (n :: N) txid tx m a where -- | -- SendMsgRequestTxIdsBlocking - :: Word16 -- ^ number of txids to acknowledge - -> Word16 -- ^ number of txids to request + :: NumTxIdsToAck -- ^ number of txids to acknowledge + -> NumTxIdsToReq -- ^ number of txids to request -> m a -- ^ Result if done -> (NonEmpty (txid, SizeInBytes) -> m (ServerStIdle Z txid tx m a)) @@ -69,8 +68,8 @@ data ServerStIdle (n :: N) txid tx m a where -- | -- SendMsgRequestTxIdsPipelined - :: Word16 - -> Word16 + :: NumTxIdsToAck + -> NumTxIdsToReq -> m (ServerStIdle (S n) txid tx m a) -> ServerStIdle n txid tx m a diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Type.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Type.hs index 380885a4e01..aae4b6d446d 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Type.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Type.hs @@ -1,11 +1,15 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE EmptyCase #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE PolyKinds #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE StandaloneDeriving #-} -{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE EmptyCase #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeFamilies #-} -- | The type of the transaction submission protocol. -- @@ -20,6 +24,8 @@ module Ouroboros.Network.Protocol.TxSubmission2.Type , TokBlockingStyle (..) , StBlockingStyle (..) , BlockingReplyList (..) + , NumTxIdsToAck (..) + , NumTxIdsToReq (..) -- re-exports , SizeInBytes (..) -- deprecated API @@ -28,7 +34,12 @@ module Ouroboros.Network.Protocol.TxSubmission2.Type import Control.DeepSeq import Data.List.NonEmpty (NonEmpty) +import Data.Monoid (Sum (..)) import Data.Word (Word16) +import GHC.Generics +import NoThunks.Class (NoThunks (..)) + +import Quiet (Quiet (..)) import Network.TypedProtocol.Core @@ -111,6 +122,21 @@ data StBlockingStyle where StNonBlocking :: StBlockingStyle +newtype NumTxIdsToAck = NumTxIdsToAck { getNumTxIdsToAck :: Word16 } + deriving (Eq, Ord, NFData, Generic) + deriving newtype (Num, Enum, Real, Integral, Bounded, NoThunks) + deriving Semigroup via (Sum Word16) + deriving Monoid via (Sum Word16) + deriving Show via (Quiet NumTxIdsToAck) + +newtype NumTxIdsToReq = NumTxIdsToReq { getNumTxIdsToReq :: Word16 } + deriving (Eq, Ord, NFData, Generic) + deriving newtype (Num, Enum, Real, Integral, Bounded, NoThunks) + deriving Semigroup via (Sum Word16) + deriving Monoid via (Sum Word16) + deriving Show via (Quiet NumTxIdsToReq) + + -- | There are some constraints of the protocol that are not captured in the -- types of the messages, but are documented with the messages. Violation -- of these constraints is also a protocol error. The constraints are intended @@ -184,8 +210,8 @@ instance Protocol (TxSubmission2 txid tx) where -- MsgRequestTxIds :: TokBlockingStyle blocking - -> Word16 -- ^ Acknowledge this number of outstanding txids - -> Word16 -- ^ Request up to this number of txids. + -> NumTxIdsToAck -- ^ Acknowledge this number of outstanding txids + -> NumTxIdsToReq -- ^ Request up to this number of txids. -> Message (TxSubmission2 txid tx) StIdle (StTxIds blocking) -- | Reply with a list of transaction identifiers for available diff --git a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Examples.hs b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Examples.hs index 9426b6ae46a..381c7d76947 100644 --- a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Examples.hs +++ b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Examples.hs @@ -28,9 +28,9 @@ import Control.Tracer (Tracer, traceWith) import Network.TypedProtocol.Pipelined (N, Nat (..)) +import Ouroboros.Network.Protocol.TxSubmission2.Type import Ouroboros.Network.Protocol.TxSubmission2.Client import Ouroboros.Network.Protocol.TxSubmission2.Server -import Ouroboros.Network.SizeInBytes (SizeInBytes) -- @@ -38,7 +38,7 @@ import Ouroboros.Network.SizeInBytes (SizeInBytes) -- data TraceEventClient txid tx = - EventRecvMsgRequestTxIds (StrictSeq txid) (Map txid tx) [tx] Word16 Word16 + EventRecvMsgRequestTxIds (StrictSeq txid) (Map txid tx) [tx] NumTxIdsToAck NumTxIdsToReq | EventRecvMsgRequestTxs (StrictSeq txid) (Map txid tx) [tx] [txid] deriving Show @@ -82,8 +82,8 @@ txSubmissionClient tracer txId txSize maxUnacked = recvMsgRequestTxIds :: forall blocking. TokBlockingStyle blocking - -> Word16 - -> Word16 + -> NumTxIdsToAck + -> NumTxIdsToReq -> m (ClientStTxIds blocking txid tx m ()) recvMsgRequestTxIds blocking ackNo reqNo = do traceWith tracer (EventRecvMsgRequestTxIds unackedSeq unackedMap @@ -93,8 +93,8 @@ txSubmissionClient tracer txId txSize maxUnacked = ++ "peer acknowledged more txids than possible" when ( fromIntegral (Seq.length unackedSeq) - - ackNo - + fromIntegral reqNo + - getNumTxIdsToAck ackNo + + getNumTxIdsToReq reqNo > maxUnacked) $ error $ "txSubmissionClientConst.recvMsgRequestTxIds: " ++ "peer requested more txids than permitted" @@ -158,8 +158,8 @@ txSubmissionClient tracer txId txSize maxUnacked = -- data TraceEventServer txid tx = - EventRequestTxIdsBlocking (ServerState txid tx) Word16 Word16 - | EventRequestTxIdsPipelined (ServerState txid tx) Word16 Word16 + EventRequestTxIdsBlocking (ServerState txid tx) NumTxIdsToAck NumTxIdsToReq + | EventRequestTxIdsPipelined (ServerState txid tx) NumTxIdsToAck NumTxIdsToReq | EventRequestTxsPipelined (ServerState txid tx) [txid] deriving instance (Show txid, Show tx) => Show (TraceEventServer txid tx) @@ -169,7 +169,7 @@ data ServerState txid tx = ServerState { -- which have not yet been replied to. We need to track this it keep -- our requests within the limit on the number of unacknowledged txids. -- - requestedTxIdsInFlight :: Word16, + requestedTxIdsInFlight :: NumTxIdsToReq, -- | Those transactions (by their identifier) that the client has told -- us about, and which we have not yet acknowledged. This is kept in @@ -196,7 +196,7 @@ data ServerState txid tx = ServerState { -- for more transactions. The number here have already been removed from -- 'unacknowledgedTxIds'. -- - numTxsToAcknowledge :: Word16 + numTxsToAcknowledge :: NumTxIdsToAck } deriving Show @@ -239,7 +239,7 @@ txSubmissionServer tracer txId maxUnacked maxTxIdsToRequest maxTxToRequest = -- so the only remaining thing to do is to ask for more txids. Since -- this is the only thing to do now, we make this a blocking call. | otherwise - , let numTxIdsToRequest = maxTxIdsToRequest `min` maxUnacked + , let numTxIdsToRequest = NumTxIdsToReq $ maxTxIdsToRequest `min` maxUnacked = assert (requestedTxIdsInFlight st == 0 && Seq.null (unacknowledgedTxIds st) && Map.null (availableTxids st) @@ -390,7 +390,8 @@ txSubmissionServer tracer txId maxUnacked maxTxIdsToRequest maxTxToRequest = -- This definition is justified by the fact that the -- 'numTxsToAcknowledge' are not included in the 'unacknowledgedTxIds'. numTxIdsToRequest = + NumTxIdsToReq $ (maxUnacked - fromIntegral (Seq.length (unacknowledgedTxIds st)) - - requestedTxIdsInFlight st) + - getNumTxIdsToReq (requestedTxIdsInFlight st)) `min` maxTxIdsToRequest diff --git a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Test.hs b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Test.hs index 2515772ddbc..43b7b7c413d 100644 --- a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Test.hs +++ b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Test.hs @@ -1,4 +1,5 @@ {-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} @@ -7,6 +8,7 @@ {-# LANGUAGE PolyKinds #-} {-# LANGUAGE QuantifiedConstraints #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeFamilies #-} {-# OPTIONS_GHC -Wno-orphans #-} @@ -241,6 +243,10 @@ prop_pipe_IO params = ioProperty (prop_channel createPipeConnectedChannels params) +deriving newtype instance Arbitrary NumTxIdsToAck +deriving newtype instance Arbitrary NumTxIdsToReq + + instance Arbitrary (AnyMessageAndAgency (TxSubmission2 TxId Tx)) where arbitrary = oneof [ pure $ AnyMessageAndAgency (ClientAgency TokInit) MsgInit diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission.hs index 7dda75cb8cc..83d1c76ff09 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission.hs @@ -217,7 +217,7 @@ txSubmissionSimulation , txid ~ Int ) - => Word16 + => NumTxIdsToAck -> [Tx txid] -> ControlMessageSTM m -> Maybe DiffTime @@ -299,7 +299,7 @@ prop_txSubmission (Positive maxUnacked) (NonEmpty outboundTxs) delay = * realToFrac (length outboundTxs `div` 4)) atomically (writeTVar controlMessageVar Terminate) txSubmissionSimulation - maxUnacked outboundTxs + (NumTxIdsToAck maxUnacked) outboundTxs (readTVar controlMessageVar) mbDelayTime mbDelayTime ) in diff --git a/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs b/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs index 9d1b811b192..6bd1b5f94af 100644 --- a/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs +++ b/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs @@ -39,8 +39,8 @@ import Control.Tracer (Tracer, traceWith) import Network.TypedProtocol.Pipelined (N, Nat (..), natToInt) import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) +import Ouroboros.Network.Protocol.TxSubmission2.Type import Ouroboros.Network.Protocol.TxSubmission2.Server -import Ouroboros.Network.SizeInBytes (SizeInBytes) import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..), TxSubmissionMempoolReader (..)) @@ -178,12 +178,12 @@ txSubmissionInbound , MonadThrow m ) => Tracer m (TraceTxSubmissionInbound txid tx) - -> Word16 -- ^ Maximum number of unacknowledged txids allowed + -> NumTxIdsToAck -- ^ Maximum number of unacknowledged txids allowed -> TxSubmissionMempoolReader txid tx idx m -> TxSubmissionMempoolWriter txid tx idx m -> NodeToNodeVersion -> TxSubmissionServerPipelined txid tx m () -txSubmissionInbound tracer maxUnacked mpReader mpWriter _version = +txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version = TxSubmissionServerPipelined $ continueWithStateM (serverIdle Zero) initialServerState where @@ -225,15 +225,15 @@ txSubmissionInbound tracer maxUnacked mpReader mpWriter _version = && Map.null (bufferedTxs st)) $ pure $ SendMsgRequestTxIdsBlocking - (numTxsToAcknowledge st) - numTxIdsToRequest + (NumTxIdsToAck (numTxsToAcknowledge st)) + (NumTxIdsToReq numTxIdsToRequest) -- Our result if the client terminates the protocol (traceWith tracer TraceTxInboundTerminated) ( collectAndContinueWithState (handleReply Zero) st { numTxsToAcknowledge = 0, requestedTxIdsInFlight = numTxIdsToRequest } - . CollectTxIds numTxIdsToRequest + . CollectTxIds (NumTxIdsToReq numTxIdsToRequest) . NonEmpty.toList) Succ n' -> if canRequestMoreTxs st @@ -271,7 +271,7 @@ txSubmissionInbound tracer maxUnacked mpReader mpWriter _version = Nat n -> StatefulCollect (ServerState txid tx) n txid tx m handleReply n = StatefulCollect $ \st collect -> case collect of - CollectTxIds reqNo txids -> do + CollectTxIds (NumTxIdsToReq reqNo) txids -> do -- Check they didn't send more than we asked for. We don't need to -- check for a minimum: the blocking case checks for non-zero -- elsewhere, and for the non-blocking case it is quite normal for @@ -474,8 +474,8 @@ txSubmissionInbound tracer maxUnacked mpReader mpWriter _version = if numTxIdsToRequest > 0 then pure $ SendMsgRequestTxIdsPipelined - (numTxsToAcknowledge st) - numTxIdsToRequest + (NumTxIdsToAck (numTxsToAcknowledge st)) + (NumTxIdsToReq numTxIdsToRequest) (continueWithStateM (serverIdle (Succ n)) st { requestedTxIdsInFlight = requestedTxIdsInFlight st + numTxIdsToRequest, diff --git a/ouroboros-network/src/Ouroboros/Network/TxSubmission/Outbound.hs b/ouroboros-network/src/Ouroboros/Network/TxSubmission/Outbound.hs index a607b9d613f..ebfcddc6a18 100644 --- a/ouroboros-network/src/Ouroboros/Network/TxSubmission/Outbound.hs +++ b/ouroboros-network/src/Ouroboros/Network/TxSubmission/Outbound.hs @@ -15,7 +15,6 @@ import Data.List.NonEmpty qualified as NonEmpty import Data.Maybe (catMaybes, isNothing, mapMaybe) import Data.Sequence.Strict (StrictSeq) import Data.Sequence.Strict qualified as Seq -import Data.Word (Word16) import Control.Exception (assert) import Control.Monad (unless, when) @@ -26,8 +25,8 @@ import Control.Tracer (Tracer, traceWith) import Ouroboros.Network.ControlMessage (ControlMessage, ControlMessageSTM, timeoutWithControlMessage) import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) +import Ouroboros.Network.Protocol.TxSubmission2.Type import Ouroboros.Network.Protocol.TxSubmission2.Client -import Ouroboros.Network.SizeInBytes (SizeInBytes) import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..), TxSubmissionMempoolReader (..)) @@ -45,7 +44,7 @@ data TraceTxSubmissionOutbound txid tx data TxSubmissionProtocolError = ProtocolErrorAckedTooManyTxids | ProtocolErrorRequestedNothing - | ProtocolErrorRequestedTooManyTxids Word16 Word16 + | ProtocolErrorRequestedTooManyTxids NumTxIdsToReq NumTxIdsToAck | ProtocolErrorRequestBlocking | ProtocolErrorRequestNonBlocking | ProtocolErrorRequestedUnavailableTx @@ -79,7 +78,7 @@ txSubmissionOutbound :: forall txid tx idx m. (Ord txid, Ord idx, MonadSTM m, MonadThrow m) => Tracer m (TraceTxSubmissionOutbound txid tx) - -> Word16 -- ^ Maximum number of unacknowledged txids allowed + -> NumTxIdsToAck -- ^ Maximum number of unacknowledged txids allowed -> TxSubmissionMempoolReader txid tx idx m -> NodeToNodeVersion -> ControlMessageSTM m @@ -93,18 +92,18 @@ txSubmissionOutbound tracer maxUnacked TxSubmissionMempoolReader{..} _version co where recvMsgRequestTxIds :: forall blocking. TokBlockingStyle blocking - -> Word16 - -> Word16 + -> NumTxIdsToAck + -> NumTxIdsToReq -> m (ClientStTxIds blocking txid tx m ()) recvMsgRequestTxIds blocking ackNo reqNo = do - when (ackNo > fromIntegral (Seq.length unackedSeq)) $ + when (getNumTxIdsToAck ackNo > fromIntegral (Seq.length unackedSeq)) $ throwIO ProtocolErrorAckedTooManyTxids when ( fromIntegral (Seq.length unackedSeq) - - ackNo - + reqNo - > maxUnacked) $ + - getNumTxIdsToAck ackNo + + getNumTxIdsToReq reqNo + > getNumTxIdsToAck maxUnacked) $ throwIO (ProtocolErrorRequestedTooManyTxids reqNo maxUnacked) -- Update our tracking state to remove the number of txids that the