Skip to content

Commit

Permalink
tx-submission: newtype wrappers NumTxIdsTo{Ack,Req}
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Jul 17, 2024
1 parent fe0a0dc commit 1fe9d57
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 56 deletions.
2 changes: 2 additions & 0 deletions ouroboros-network-protocols/ouroboros-network-protocols.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ library
bytestring >=0.10 && <0.13,
cborg >=0.2.1 && <0.3,
deepseq,
quiet,

io-classes ^>=1.5.0,
nothunks,
si-timers,

ouroboros-network-api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ module Ouroboros.Network.Protocol.TxSubmission2.Client
, txSubmissionClientPeer
) where

import Data.Word (Word16)

import Network.TypedProtocol.Core

import Ouroboros.Network.Protocol.TxSubmission2.Type
Expand Down Expand Up @@ -56,8 +54,8 @@ data ClientStIdle txid tx m a = ClientStIdle {

recvMsgRequestTxIds :: forall blocking.
TokBlockingStyle blocking
-> Word16
-> Word16
-> NumTxIdsToAck
-> NumTxIdsToReq
-> m (ClientStTxIds blocking txid tx m a),

recvMsgRequestTxs :: [txid]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ encodeTxSubmission2 encodeTxId encodeTx = encode
encode (ClientAgency TokInit) MsgInit =
CBOR.encodeListLen 1
<> CBOR.encodeWord 6
encode (ServerAgency TokIdle) (MsgRequestTxIds blocking ackNo reqNo) =
encode (ServerAgency TokIdle) (MsgRequestTxIds
blocking
(NumTxIdsToAck ackNo)
(NumTxIdsToReq reqNo)) =
CBOR.encodeListLen 4
<> CBOR.encodeWord 0
<> CBOR.encodeBool (case blocking of
Expand Down Expand Up @@ -167,8 +170,8 @@ decodeTxSubmission2 decodeTxId decodeTx = decode
return (SomeMessage MsgInit)
(ServerAgency TokIdle, 4, 0) -> do
blocking <- CBOR.decodeBool
ackNo <- CBOR.decodeWord16
reqNo <- CBOR.decodeWord16
ackNo <- NumTxIdsToAck <$> CBOR.decodeWord16
reqNo <- NumTxIdsToReq <$> CBOR.decodeWord16
return $!
if blocking
then SomeMessage (MsgRequestTxIds TokBlocking ackNo reqNo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ module Ouroboros.Network.Protocol.TxSubmission2.Server
) where

import Data.List.NonEmpty (NonEmpty)
import Data.Word (Word16)

import Network.TypedProtocol.Core
import Network.TypedProtocol.Pipelined
Expand All @@ -45,7 +44,7 @@ data TxSubmissionServerPipelined txid tx m a where
data Collect txid tx =
-- | The result of 'SendMsgRequestTxIdsPipelined'. It also carries
-- the number of txids originally requested.
CollectTxIds Word16 [(txid, SizeInBytes)]
CollectTxIds NumTxIdsToReq [(txid, SizeInBytes)]

-- | The result of 'SendMsgRequestTxsPipelined'. The actual reply only
-- contains the transactions sent, but this pairs them up with the
Expand All @@ -59,8 +58,8 @@ data ServerStIdle (n :: N) txid tx m a where
-- |
--
SendMsgRequestTxIdsBlocking
:: Word16 -- ^ number of txids to acknowledge
-> Word16 -- ^ number of txids to request
:: NumTxIdsToAck -- ^ number of txids to acknowledge
-> NumTxIdsToReq -- ^ number of txids to request
-> m a -- ^ Result if done
-> (NonEmpty (txid, SizeInBytes)
-> m (ServerStIdle Z txid tx m a))
Expand All @@ -69,8 +68,8 @@ data ServerStIdle (n :: N) txid tx m a where
-- |
--
SendMsgRequestTxIdsPipelined
:: Word16
-> Word16
:: NumTxIdsToAck
-> NumTxIdsToReq
-> m (ServerStIdle (S n) txid tx m a)
-> ServerStIdle n txid tx m a

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE EmptyCase #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE EmptyCase #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}

-- | The type of the transaction submission protocol.
--
Expand All @@ -20,6 +24,8 @@ module Ouroboros.Network.Protocol.TxSubmission2.Type
, TokBlockingStyle (..)
, StBlockingStyle (..)
, BlockingReplyList (..)
, NumTxIdsToAck (..)
, NumTxIdsToReq (..)
-- re-exports
, SizeInBytes (..)
-- deprecated API
Expand All @@ -28,7 +34,12 @@ module Ouroboros.Network.Protocol.TxSubmission2.Type

import Control.DeepSeq
import Data.List.NonEmpty (NonEmpty)
import Data.Monoid (Sum (..))
import Data.Word (Word16)
import GHC.Generics
import NoThunks.Class (NoThunks (..))

import Quiet (Quiet (..))

import Network.TypedProtocol.Core

Expand Down Expand Up @@ -111,6 +122,21 @@ data StBlockingStyle where
StNonBlocking :: StBlockingStyle


newtype NumTxIdsToAck = NumTxIdsToAck { getNumTxIdsToAck :: Word16 }
deriving (Eq, Ord, NFData, Generic)
deriving newtype (Num, Enum, Real, Integral, Bounded, NoThunks)
deriving Semigroup via (Sum Word16)
deriving Monoid via (Sum Word16)
deriving Show via (Quiet NumTxIdsToAck)

newtype NumTxIdsToReq = NumTxIdsToReq { getNumTxIdsToReq :: Word16 }
deriving (Eq, Ord, NFData, Generic)
deriving newtype (Num, Enum, Real, Integral, Bounded, NoThunks)
deriving Semigroup via (Sum Word16)
deriving Monoid via (Sum Word16)
deriving Show via (Quiet NumTxIdsToReq)


-- | There are some constraints of the protocol that are not captured in the
-- types of the messages, but are documented with the messages. Violation
-- of these constraints is also a protocol error. The constraints are intended
Expand Down Expand Up @@ -184,8 +210,8 @@ instance Protocol (TxSubmission2 txid tx) where
--
MsgRequestTxIds
:: TokBlockingStyle blocking
-> Word16 -- ^ Acknowledge this number of outstanding txids
-> Word16 -- ^ Request up to this number of txids.
-> NumTxIdsToAck -- ^ Acknowledge this number of outstanding txids
-> NumTxIdsToReq -- ^ Request up to this number of txids.
-> Message (TxSubmission2 txid tx) StIdle (StTxIds blocking)

-- | Reply with a list of transaction identifiers for available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Pipelined (N, Nat (..))

import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.Protocol.TxSubmission2.Client
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.SizeInBytes (SizeInBytes)


--
-- Example client
--

data TraceEventClient txid tx =
EventRecvMsgRequestTxIds (StrictSeq txid) (Map txid tx) [tx] Word16 Word16
EventRecvMsgRequestTxIds (StrictSeq txid) (Map txid tx) [tx] NumTxIdsToAck NumTxIdsToReq
| EventRecvMsgRequestTxs (StrictSeq txid) (Map txid tx) [tx] [txid]
deriving Show

Expand Down Expand Up @@ -82,8 +82,8 @@ txSubmissionClient tracer txId txSize maxUnacked =

recvMsgRequestTxIds :: forall blocking.
TokBlockingStyle blocking
-> Word16
-> Word16
-> NumTxIdsToAck
-> NumTxIdsToReq
-> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds blocking ackNo reqNo = do
traceWith tracer (EventRecvMsgRequestTxIds unackedSeq unackedMap
Expand All @@ -93,8 +93,8 @@ txSubmissionClient tracer txId txSize maxUnacked =
++ "peer acknowledged more txids than possible"

when ( fromIntegral (Seq.length unackedSeq)
- ackNo
+ fromIntegral reqNo
- getNumTxIdsToAck ackNo
+ getNumTxIdsToReq reqNo
> maxUnacked) $
error $ "txSubmissionClientConst.recvMsgRequestTxIds: "
++ "peer requested more txids than permitted"
Expand Down Expand Up @@ -158,8 +158,8 @@ txSubmissionClient tracer txId txSize maxUnacked =
--

data TraceEventServer txid tx =
EventRequestTxIdsBlocking (ServerState txid tx) Word16 Word16
| EventRequestTxIdsPipelined (ServerState txid tx) Word16 Word16
EventRequestTxIdsBlocking (ServerState txid tx) NumTxIdsToAck NumTxIdsToReq
| EventRequestTxIdsPipelined (ServerState txid tx) NumTxIdsToAck NumTxIdsToReq
| EventRequestTxsPipelined (ServerState txid tx) [txid]

deriving instance (Show txid, Show tx) => Show (TraceEventServer txid tx)
Expand All @@ -169,7 +169,7 @@ data ServerState txid tx = ServerState {
-- which have not yet been replied to. We need to track this it keep
-- our requests within the limit on the number of unacknowledged txids.
--
requestedTxIdsInFlight :: Word16,
requestedTxIdsInFlight :: NumTxIdsToReq,

-- | Those transactions (by their identifier) that the client has told
-- us about, and which we have not yet acknowledged. This is kept in
Expand All @@ -196,7 +196,7 @@ data ServerState txid tx = ServerState {
-- for more transactions. The number here have already been removed from
-- 'unacknowledgedTxIds'.
--
numTxsToAcknowledge :: Word16
numTxsToAcknowledge :: NumTxIdsToAck
}
deriving Show

Expand Down Expand Up @@ -239,7 +239,7 @@ txSubmissionServer tracer txId maxUnacked maxTxIdsToRequest maxTxToRequest =
-- so the only remaining thing to do is to ask for more txids. Since
-- this is the only thing to do now, we make this a blocking call.
| otherwise
, let numTxIdsToRequest = maxTxIdsToRequest `min` maxUnacked
, let numTxIdsToRequest = NumTxIdsToReq $ maxTxIdsToRequest `min` maxUnacked
= assert (requestedTxIdsInFlight st == 0
&& Seq.null (unacknowledgedTxIds st)
&& Map.null (availableTxids st)
Expand Down Expand Up @@ -390,7 +390,8 @@ txSubmissionServer tracer txId maxUnacked maxTxIdsToRequest maxTxToRequest =
-- This definition is justified by the fact that the
-- 'numTxsToAcknowledge' are not included in the 'unacknowledgedTxIds'.
numTxIdsToRequest =
NumTxIdsToReq $
(maxUnacked
- fromIntegral (Seq.length (unacknowledgedTxIds st))
- requestedTxIdsInFlight st)
- getNumTxIdsToReq (requestedTxIdsInFlight st))
`min` maxTxIdsToRequest
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
Expand All @@ -7,6 +8,7 @@
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}

{-# OPTIONS_GHC -Wno-orphans #-}
Expand Down Expand Up @@ -241,6 +243,10 @@ prop_pipe_IO params =
ioProperty (prop_channel createPipeConnectedChannels params)


deriving newtype instance Arbitrary NumTxIdsToAck
deriving newtype instance Arbitrary NumTxIdsToReq


instance Arbitrary (AnyMessageAndAgency (TxSubmission2 TxId Tx)) where
arbitrary = oneof
[ pure $ AnyMessageAndAgency (ClientAgency TokInit) MsgInit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ txSubmissionSimulation

, txid ~ Int
)
=> Word16
=> NumTxIdsToAck
-> [Tx txid]
-> ControlMessageSTM m
-> Maybe DiffTime
Expand Down Expand Up @@ -299,7 +299,7 @@ prop_txSubmission (Positive maxUnacked) (NonEmpty outboundTxs) delay =
* realToFrac (length outboundTxs `div` 4))
atomically (writeTVar controlMessageVar Terminate)
txSubmissionSimulation
maxUnacked outboundTxs
(NumTxIdsToAck maxUnacked) outboundTxs
(readTVar controlMessageVar)
mbDelayTime mbDelayTime
) in
Expand Down
18 changes: 9 additions & 9 deletions ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import Control.Tracer (Tracer, traceWith)
import Network.TypedProtocol.Pipelined (N, Nat (..), natToInt)

import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.SizeInBytes (SizeInBytes)
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..),
TxSubmissionMempoolReader (..))

Expand Down Expand Up @@ -178,12 +178,12 @@ txSubmissionInbound
, MonadThrow m
)
=> Tracer m (TraceTxSubmissionInbound txid tx)
-> Word16 -- ^ Maximum number of unacknowledged txids allowed
-> NumTxIdsToAck -- ^ Maximum number of unacknowledged txids allowed
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> NodeToNodeVersion
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound tracer maxUnacked mpReader mpWriter _version =
txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version =
TxSubmissionServerPipelined $
continueWithStateM (serverIdle Zero) initialServerState
where
Expand Down Expand Up @@ -225,15 +225,15 @@ txSubmissionInbound tracer maxUnacked mpReader mpWriter _version =
&& Map.null (bufferedTxs st)) $
pure $
SendMsgRequestTxIdsBlocking
(numTxsToAcknowledge st)
numTxIdsToRequest
(NumTxIdsToAck (numTxsToAcknowledge st))
(NumTxIdsToReq numTxIdsToRequest)
-- Our result if the client terminates the protocol
(traceWith tracer TraceTxInboundTerminated)
( collectAndContinueWithState (handleReply Zero) st {
numTxsToAcknowledge = 0,
requestedTxIdsInFlight = numTxIdsToRequest
}
. CollectTxIds numTxIdsToRequest
. CollectTxIds (NumTxIdsToReq numTxIdsToRequest)
. NonEmpty.toList)

Succ n' -> if canRequestMoreTxs st
Expand Down Expand Up @@ -271,7 +271,7 @@ txSubmissionInbound tracer maxUnacked mpReader mpWriter _version =
Nat n
-> StatefulCollect (ServerState txid tx) n txid tx m
handleReply n = StatefulCollect $ \st collect -> case collect of
CollectTxIds reqNo txids -> do
CollectTxIds (NumTxIdsToReq reqNo) txids -> do
-- Check they didn't send more than we asked for. We don't need to
-- check for a minimum: the blocking case checks for non-zero
-- elsewhere, and for the non-blocking case it is quite normal for
Expand Down Expand Up @@ -474,8 +474,8 @@ txSubmissionInbound tracer maxUnacked mpReader mpWriter _version =

if numTxIdsToRequest > 0
then pure $ SendMsgRequestTxIdsPipelined
(numTxsToAcknowledge st)
numTxIdsToRequest
(NumTxIdsToAck (numTxsToAcknowledge st))
(NumTxIdsToReq numTxIdsToRequest)
(continueWithStateM (serverIdle (Succ n)) st {
requestedTxIdsInFlight = requestedTxIdsInFlight st
+ numTxIdsToRequest,
Expand Down
Loading

0 comments on commit 1fe9d57

Please sign in to comment.