diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Ledger/Abstract.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Ledger/Abstract.hs index a6ee7657c7e..de673d7b1a0 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Ledger/Abstract.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Ledger/Abstract.hs @@ -1,7 +1,11 @@ {-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeOperators #-} +{-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE UndecidableSuperClasses #-} -- | Interface to the ledger layer @@ -20,6 +24,7 @@ module Ouroboros.Consensus.Ledger.Abstract ( import Control.Monad.Except import Data.Type.Equality ((:~:)) +import GHC.Generics (Generic) import GHC.Stack (HasCallStack) import Cardano.Prelude (NoUnexpectedThunks) @@ -118,7 +123,7 @@ ledgerTipSlot = pointSlot . ledgerTipPoint -- the tip of the underlying ledger (i.e., no blocks have been applied). data TickedLedgerState blk = TickedLedgerState { -- | The slot number supplied to 'applyChainTick' - tickedSlotNo :: SlotNo + tickedSlotNo :: !SlotNo -- | The underlying ledger state -- @@ -128,8 +133,12 @@ data TickedLedgerState blk = TickedLedgerState { -- -- > ledgerTipPoint (tickedLedgerState (applyChainTick cfg slot st) -- > == ledgerTipPoint st - , tickedLedgerState :: LedgerState blk + , tickedLedgerState :: !(LedgerState blk) } + deriving (Generic) + +deriving instance NoUnexpectedThunks (LedgerState blk) + => NoUnexpectedThunks (TickedLedgerState blk) -- | Link protocol to ledger class (SupportedBlock blk, UpdateLedger blk) => ProtocolLedgerView blk where diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/API.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/API.hs index 5aac531b807..5a56a11da7e 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/API.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/API.hs @@ -1,5 +1,6 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE UndecidableInstances #-} @@ -7,6 +8,7 @@ module Ouroboros.Consensus.Mempool.API ( Mempool(..) + , addTxs , BlockSlot(..) , MempoolCapacityBytes (..) , MempoolSnapshot(..) @@ -119,45 +121,45 @@ data Mempool m blk idx = Mempool { -- @STM m@; we keep it in @m@ instead to leave open the possibility of -- persistence. -- - -- The following validation steps will be performed when adding - -- transactions to the mempool: + -- The new transactions provided will be validated, /in order/, against + -- the ledger state obtained by applying all the transactions already in + -- the Mempool to it. Transactions which are found to be invalid, with + -- respect to the ledger state, are dropped, whereas valid transactions + -- are added to the mempool. -- - -- * Transactions which already exist in the mempool are revalidated, - -- /in order/, against the current ledger state. Existing transactions - -- which are found to be invalid, with respect to the current ledger - -- state, are dropped from the mempool, whereas valid transactions - -- remain in the mempool. - -- * The new transactions provided will be validated, /in order/, - -- against the current ledger state. Transactions which are found to - -- be invalid, with respect to the current ledger state, are dropped, - -- whereas valid transactions are added to the mempool. + -- Note that transactions that are invalid, with respect to the ledger + -- state, will /never/ be added to the mempool. However, it is possible + -- that, at a given point in time, transactions which were once valid + -- but are now invalid, with respect to the current ledger state, could + -- exist within the mempool until they are revalidated and dropped from + -- the mempool via a call to 'syncWithLedger' or by the background + -- thread that watches the ledger for changes. -- - -- Note that transactions that are invalid, with respect to the current - -- ledger state, will /never/ be added to the mempool. However, it is - -- possible that, at a given point in time, transactions which were once - -- valid but are now invalid, with respect to the current ledger state, - -- could exist within the mempool until they are revalidated and dropped - -- from the mempool via a call to either 'addTxs' or 'syncState'. + -- This function will return two lists -- - -- This function will return a list containing the following - -- transactions: + -- 1. A list containing the following transactions: -- - -- * Those transactions provided which were found to be valid, along - -- with 'Nothing' for their accompanying @Maybe (ApplyTxErr blk)@ - -- values. - -- * Those transactions provided which were found to be invalid, along - -- with their accompanying validation errors. + -- * Those transactions provided which were found to be valid, along + -- with 'Nothing' for their accompanying @Maybe (ApplyTxErr blk)@ + -- values. These transactions are now in the Mempool. + -- * Those transactions provided which were found to be invalid, + -- along with their accompanying validation errors. These + -- transactions are not in the Mempool. -- - -- The order of this returned list is undefined. + -- 2. A list containing the transactions that have not yet been added + -- yet, as the capacity of the Mempool has been reached. I.e., there + -- is no space in the Mempool to add the first transaction in this + -- list. Note that we won't try to add smaller transactions after + -- that first transaction because they might depend on the first + -- transaction. + -- + -- POSTCONDITION: + -- > (processed, toProcess) <- tryAddTxs txs + -- > map fst processed ++ toProcess == txs -- -- Note that previously valid transaction that are now invalid with -- respect to the current ledger state are dropped from the mempool, but - -- are not part of the returned list. - -- - -- POSTCONDITION: given some ordering @txOrd@ on @'GenTx' blk@: - -- - -- > addTxs inTxs >>= \outTxs -> - -- > sortBy txOrd inTxs == sortBy txOrd (map fst outTxs) + -- are not part of the first returned list (nor the second). -- -- In principle it is possible that validation errors are transient; for -- example, it is possible that a transaction is rejected because one of @@ -172,23 +174,16 @@ data Mempool m blk idx = Mempool { -- (after all, by definition that must mean its inputs have been used). -- Rejected transactions are therefore not necessarily a sign of -- malicious behaviour. Indeed, we would expect /most/ transactions that - -- are reported as invalid by 'addTxs' to be invalid precisely because - -- they have already been included. Distinguishing between these two - -- cases can be done in theory, but it is expensive unless we have an - -- index of transaction hashes that have been included on the blockchain. + -- are reported as invalid by 'tryAddTxs' to be invalid precisely + -- because they have already been included. Distinguishing between these + -- two cases can be done in theory, but it is expensive unless we have + -- an index of transaction hashes that have been included on the + -- blockchain. -- - -- It is also worth noting that, if the mempool capacity is reached, - -- this function will block until it's able to at least attempt - -- validating and adding each of the provided transactions to the - -- mempool. In the event that we block, we also commit any useful work - -- done up to that point. For example, if we tried to add 5 valid - -- transactions but there is only space for 3, we would validate and add - -- 3 to the mempool and then block until more space becomes available, - -- at which point we would then re-attempt with the remaining 2 - -- transactions. This process would continue until it is able to at - -- least attempt validating and adding each of the provided transactions - -- to the mempool. - addTxs :: [GenTx blk] -> m [(GenTx blk, Maybe (ApplyTxErr blk))] + tryAddTxs :: [GenTx blk] + -> m ( [(GenTx blk, Maybe (ApplyTxErr blk))] + , [GenTx blk] + ) -- | Manually remove the given transactions from the mempool. , removeTxs :: [GenTxId blk] -> m () @@ -232,6 +227,47 @@ data Mempool m blk idx = Mempool { , zeroIdx :: idx } +-- | Wrapper around 'implTryAddTxs' that blocks until all transaction have +-- either been added to the Mempool or rejected. +-- +-- This function does not sync the Mempool contents with the ledger state in +-- case the latter changes, it relies on the background thread to do that. +-- +-- POSTCONDITON: +-- > processed <- addTxs mpEnv txs +-- > map fst processed == txs +addTxs + :: forall m blk idx. (MonadSTM m, ApplyTx blk) + => Mempool m blk idx + -> [GenTx blk] + -> m [(GenTx blk, Maybe (ApplyTxErr blk))] +addTxs mempool = \txs -> do + (processed, toAdd) <- tryAddTxs mempool txs + case toAdd of + [] -> return processed + _ -> go [processed] toAdd + where + go + :: [[(GenTx blk, Maybe (ApplyTxErr blk))]] + -- ^ The outer list is in reverse order, but all the inner lists will + -- be in the right order. + -> [GenTx blk] + -> m [(GenTx blk, Maybe (ApplyTxErr blk))] + go acc [] = return (concat (reverse acc)) + go acc txs@(tx:_) = do + let firstTxSize = txSize tx + -- Wait until there's at least room for the first transaction we're + -- trying to add, otherwise there's no point in trying to add it. + atomically $ do + curSize <- msNumBytes . snapshotMempoolSize <$> getSnapshot mempool + MempoolCapacityBytes capacity <- getCapacity mempool + check (curSize + firstTxSize <= capacity) + -- It is possible that between the check above and the call below, other + -- transactions are added, stealing our spot, but that's fine, we'll + -- just recurse again without progress. + (added, toAdd) <- tryAddTxs mempool txs + go (added:acc) toAdd + -- | The slot of the block in which the transactions in the mempool will end up -- -- The transactions in the mempool will be part of the body of a block, but a @@ -315,15 +351,19 @@ instance Monoid MempoolSize where -- | Events traced by the Mempool. data TraceEventMempool blk - = TraceMempoolAddTxs - ![GenTx blk] - -- ^ New, valid transaction were added to the Mempool. + = TraceMempoolAddedTx + !(GenTx blk) + -- ^ New, valid transaction that was added to the Mempool. !MempoolSize - -- ^ The current size of the Mempool. - | TraceMempoolRejectedTxs - ![(GenTx blk, ApplyTxErr blk)] - -- ^ New, invalid transaction were rejected and thus not added to the - -- Mempool. + -- ^ The size of the Mempool before adding the transaction. + !MempoolSize + -- ^ The size of the Mempool after adding the transaction. + | TraceMempoolRejectedTx + !(GenTx blk) + -- ^ New, invalid transaction thas was rejected and thus not added to + -- the Mempool. + !(ApplyTxErr blk) + -- ^ The reason for rejecting the transaction. !MempoolSize -- ^ The current size of the Mempool. | TraceMempoolRemoveTxs diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs index 8dc0f2f7ecd..e7f6e3e1500 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs @@ -10,7 +10,6 @@ module Ouroboros.Consensus.Mempool.Impl ( openMempool , LedgerInterface (..) , chainDBLedgerInterface - , txsToMempoolSize , TicketNo -- * For testing purposes , openMempoolWithoutSyncThread @@ -19,7 +18,7 @@ module Ouroboros.Consensus.Mempool.Impl ( import Control.Exception (assert) import Control.Monad (void) import Control.Monad.Except -import qualified Data.Foldable as Foldable +import Data.Maybe (isJust, isNothing, listToMaybe) import qualified Data.Set as Set import Data.Typeable import Data.Word (Word32) @@ -39,8 +38,7 @@ import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.Extended import Ouroboros.Consensus.Mempool.API import Ouroboros.Consensus.Mempool.TxSeq (TicketNo, TxSeq (..), - TxTicket (..), fromTxSeq, lookupByTicketNo, - splitAfterTicketNo, splitAfterTxSize, zeroTicketNo) + TxTicket (..), zeroTicketNo) import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq import Ouroboros.Consensus.Util (repeatedly) import Ouroboros.Consensus.Util.IOLike @@ -80,7 +78,7 @@ openMempoolWithoutSyncThread ledger cfg capacity tracer = mkMempool :: (IOLike m, ApplyTx blk, HasTxId (GenTx blk)) => MempoolEnv m blk -> Mempool m blk TicketNo mkMempool env = Mempool - { addTxs = implAddTxs env [] + { tryAddTxs = implTryAddTxs env , removeTxs = implRemoveTxs env , syncWithLedger = implSyncWithLedger env , getSnapshot = implGetSnapshot env @@ -109,6 +107,14 @@ data InternalState blk = IS { -- | Transactions currently in the mempool isTxs :: !(TxSeq (GenTx blk)) + -- | The cached ledger state after applying the transactions in the + -- Mempool against the chain's ledger state. New transactions will be + -- validated against this ledger. + -- + -- INVARIANT: 'isLedgerState' is the ledger resulting from applying the + -- transactions in 'isTxs' against the ledger identified 'isTip' as tip. + , isLedgerState :: !(TickedLedgerState blk) + -- | The tip of the chain that 'isTxs' was validated against -- -- This comes from the underlying ledger state ('tickedLedgerState') @@ -116,8 +122,8 @@ data InternalState blk = IS { -- | The most recent 'SlotNo' that 'isTxs' was validated against -- - -- This comes from 'applyChainTick' ('tickerLedgerSlot'). - , isSlotNo :: !(WithOrigin SlotNo) + -- This comes from 'applyChainTick' ('tickedSlotNo'). + , isSlotNo :: !SlotNo -- | The mempool 'TicketNo' counter. -- @@ -127,10 +133,16 @@ data InternalState blk = IS { deriving (Generic) deriving instance ( NoUnexpectedThunks (GenTx blk) + , NoUnexpectedThunks (LedgerState blk) , StandardHash blk , Typeable blk ) => NoUnexpectedThunks (InternalState blk) +-- | \( O(1) \). Return the number of transactions in the internal state of +-- the Mempool paired with their total size in bytes. +isMempoolSize :: InternalState blk -> MempoolSize +isMempoolSize = TxSeq.toMempoolSize . isTxs + data MempoolEnv m blk = MempoolEnv { mpEnvLedger :: LedgerInterface m blk , mpEnvLedgerCfg :: LedgerConfig blk @@ -139,8 +151,18 @@ data MempoolEnv m blk = MempoolEnv { , mpEnvTracer :: Tracer m (TraceEventMempool blk) } -initInternalState :: InternalState blk -initInternalState = IS TxSeq.Empty Block.GenesisHash Origin zeroTicketNo +initInternalState + :: UpdateLedger blk + => TicketNo -- ^ Used for 'isLastTicketNo' + -> TickedLedgerState blk + -> InternalState blk +initInternalState lastTicketNo st = IS { + isTxs = TxSeq.Empty + , isLedgerState = st + , isTip = ledgerTipHash $ tickedLedgerState st + , isSlotNo = tickedSlotNo st + , isLastTicketNo = lastTicketNo + } initMempoolEnv :: (IOLike m, ApplyTx blk) => LedgerInterface m blk @@ -149,7 +171,9 @@ initMempoolEnv :: (IOLike m, ApplyTx blk) -> Tracer m (TraceEventMempool blk) -> m (MempoolEnv m blk) initMempoolEnv ledgerInterface cfg capacity tracer = do - isVar <- newTVarM initInternalState + st <- atomically $ getCurrentLedgerState ledgerInterface + let st' = tickLedgerState cfg TxsForUnknownBlock st + isVar <- newTVarM $ initInternalState zeroTicketNo st' return MempoolEnv { mpEnvLedger = ledgerInterface , mpEnvLedgerCfg = cfg @@ -180,233 +204,120 @@ forkSyncStateOnTipPointChange registry menv = -- | Add a bunch of transactions (oldest to newest) -- --- If the mempool capacity is reached, this function will block until it's --- able to at least attempt validating and adding each of the provided --- transactions to the mempool. --- --- Steps taken by this function (much of this information can also be found --- in comments throughout the code): --- --- * Attempt to sync the mempool with the ledger state, removing transactions --- from the mempool as necessary. --- --- In the event that some work is done here, we should update the mempool --- state and commit the STM transaction. From the STM transaction, we'll --- return the provided transactions which weren't yet validated (all of --- them) and 'implAddTxs' will call itself given these remaining unvalidated --- transactions. --- --- If the sync resulted in no work being done, we don't have to worry about --- losing any changes in the event of a 'retry'. So we continue by calling --- 'validateNew', providing the new transactions as an argument. --- --- * In 'validateNew', we first attempt to individually validate the first --- transaction of the provided list. --- --- If this is successful and we haven't reached the mempool capacity, we --- continue by attempting to validate each of the remaining new transactions --- with the function 'validateNewUntilMempoolFull'. --- --- If this fails due to the mempool capacity being reached, we 'retry' the --- STM transaction. We've done very little work up to this point so this --- is quite cheap. +-- This function returns two lists: the transactions that were added or +-- rejected, and the transactions that could not yet be added, because the +-- Mempool capacity was reached. See 'addTxs' for a function that blocks in +-- case the Mempool capacity is reached. -- --- * In 'validateNewUntilMempoolFull', we attempt to recursively validate and --- add each of the provided transactions, one-by-one. If at any point the --- mempool capacity is reached, we return the last 'ValidationResult' along --- with the remaining unvalidated transactions (those which we weren't able --- to attempt yet). +-- Transactions are added one by one, updating the Mempool each time one was +-- added successfully. -- --- * Given the result from 'validateNewUntilMempoolFull', 'validateNew' --- updates the mempool state and returns the same result back up to --- 'implAddTxs'. +-- This function does not sync the Mempool contents with the ledger state in +-- case the latter changes, it relies on the background thread to do that. -- --- * Given the result from 'validateNew', commit the STM transaction and --- 'implAddTxs' checks whether there are any remaining transactions which --- are yet to be validated. If no transactions remain, we return the result. --- On the other hand, if there are still remaining transactions, --- 'implAddTxs' calls itself given an accumulation of its results thus far --- along with the remaining transactions. -implAddTxs :: forall m blk. (IOLike m, ApplyTx blk) - => MempoolEnv m blk - -> [(GenTx blk, Maybe (ApplyTxErr blk))] - -- ^ An accumulator of the results from each call to 'implAddTxs'. - -- - -- Because this function will recurse until it's able to at least - -- attempt validating and adding each transaction provided, we keep - -- this accumulator of the results. 'implAddTxs' will recurse in - -- the event that it wasn't able to attempt adding all of the - -- provided transactions due to the mempool being at its capacity. - -> [GenTx blk] - -- ^ Transactions to validate and add to the mempool. - -> m [(GenTx blk, Maybe (ApplyTxErr blk))] -implAddTxs _ _ [] = pure [] -implAddTxs mpEnv accum txs = assert (all txInvariant txs) $ do - (vr, removed, rejected, unvalidated, mempoolSize) <- atomically $ do - IS{ isTip = initialISTip - , isSlotNo = initialISSlotNo - } <- readTVar mpEnvStateVar - - -- First sync the state, which might remove some transactions - syncRes <- validateIS mpEnv TxsForUnknownBlock - let removed = vrInvalid syncRes - - -- Determine whether the tip was updated after a call to 'validateIS' - -- - -- If the tip was updated, instead of immediately going on to call - -- 'validateNew' which can potentially 'retry', we should commit this - -- STM transaction to ensure that we don't lose any of the changes - -- brought about by 'validateIS' thus far. - -- - -- On the other hand, if the tip wasn't updated, we don't have to worry - -- about losing any changes in the event that we have to 'retry' this - -- STM transaction. So we should continue by validating the provided new - -- transactions. - if initialISTip /= vrBeforeTip syncRes || - initialISSlotNo /= At (vrBeforeSlotNo syncRes) - then do - -- The tip changed. - -- Because 'validateNew' can 'retry', we'll commit this STM - -- transaction here to ensure that we don't lose any of the changes - -- brought about by 'validateIS' thus far. - writeTVar mpEnvStateVar $ internalStateFromVR syncRes - mempoolSize <- getMempoolSize mpEnv - pure (syncRes, removed, [], txs, mempoolSize) - else do - -- The tip was unchanged. - -- Therefore, we don't have to worry about losing any changes in the - -- event that we have to 'retry' this STM transaction. Continue by - -- validating the provided new transactions. - (vr, unvalidated) <- validateNew syncRes - mempoolSize <- getMempoolSize mpEnv - pure (vr, removed, vrInvalid vr, unvalidated, mempoolSize) - - let accepted = vrNewValid vr - - traceBatch TraceMempoolRemoveTxs mempoolSize (map fst removed) - traceBatch TraceMempoolAddTxs mempoolSize accepted - traceBatch TraceMempoolRejectedTxs mempoolSize rejected - - case unvalidated of - -- All of the provided transactions have been validated. - [] -> return (mkRes accum accepted rejected) - - -- There are still transactions that remain which need to be validated. - _ -> implAddTxs mpEnv (mkRes accum accepted rejected) unvalidated +-- POSTCONDITON: +-- > (processed, toProcess) <- implTryAddTxs mpEnv txs +-- > map fst processed ++ toProcess == txs +implTryAddTxs + :: forall m blk. (IOLike m, ApplyTx blk) + => MempoolEnv m blk + -> [GenTx blk] + -> m ( [(GenTx blk, Maybe (ApplyTxErr blk))] + -- Transactions that were added or rejected. A prefix of the input + -- list. + , [GenTx blk] + -- Transactions that have not yet been added because the capacity + -- of the Mempool has been reached. A suffix of the input list. + ) +implTryAddTxs mpEnv = go [] where MempoolEnv { mpEnvStateVar + , mpEnvLedgerCfg = cfg + , mpEnvCapacity = MempoolCapacityBytes capacity , mpEnvTracer - , mpEnvLedgerCfg - , mpEnvCapacity = MempoolCapacityBytes mempoolCap } = mpEnv - traceBatch mkEv size batch - | null batch = return () - | otherwise = traceWith mpEnvTracer (mkEv batch size) - - mkRes acc accepted rejected = - [(tx, Just err) | (tx, err) <- rejected] - ++ zip accepted (repeat Nothing) - ++ acc - - -- | Attempt to validate and add as many new transactions to the mempool as - -- possible, returning the last 'ValidationResult' and the remaining - -- transactions which couldn't be added due to the mempool capacity being - -- reached. - validateNew :: ValidationResult blk - -> STM m (ValidationResult blk, [GenTx blk]) - validateNew res = - let res' = res { vrInvalid = [] } - in case txs of - [] -> return (res', []) - headTx:remainingTxs -> do - -- First, attempt to individually validate the first new transaction. - -- - -- If this is successful, we should continue to validate all of the - -- other new transactions one-by-one. If the mempool capacity would be - -- reached at any step, we update the 'InternalState' with the work - -- that we've already done and return the last 'ValidationResult' along - -- with the remaining unvalidated transactions. - -- - -- If, however, this fails due to the mempool capacity being met, we - -- should simply 'retry' as it will be cheap due to the fact that - -- we've done very little work in this STM transaction. + done acc toAdd = return (reverse acc, toAdd) + + go acc [] = done acc [] + go acc toAdd@(firstTx:toAdd') = do + let firstTxSize = txSize firstTx + -- Note: we execute the continuation returned by 'atomically' + join $ atomically $ do + is <- readTVar mpEnvStateVar + let curSize = msNumBytes $ isMempoolSize is + if curSize + firstTxSize > capacity then + -- No space in the Mempool + return $ done acc toAdd + else do + let vr = extendVRNew cfg firstTx $ validationResultFromIS is + is' = internalStateFromVR vr + unless (null (vrNewValid vr)) $ + -- Each time we have found a valid transaction, we update the + -- Mempool. This keeps our STM transactions short, avoiding + -- repeated work. -- - -- It makes sense to do this due to the fact that a 'retry' at this - -- point is likely to be more efficient than simply returning the - -- result and constantly recursing until there's at least one space in - -- the mempool (if remaining unvalidated transactions are returned up - -- to 'implAddTxs', 'implAddTxs' will recurse). - headTxValidationRes <- validateNewUntilMempoolFull [headTx] res' - case headTxValidationRes of - -- Mempool capacity hasn't been reached (no remaining unvalidated - -- transactions were returned). - (vr, []) -> do - -- Continue validating the remaining transactions. - (vr', unvalidatedTxs) <- validateNewUntilMempoolFull remainingTxs vr - writeTVar mpEnvStateVar $ internalStateFromVR vr' - pure (vr', unvalidatedTxs) - - -- The mempool capacity has been reached. - _ -> retry - - validateNewUntilMempoolFull - :: [GenTx blk] - -- ^ The new transactions to validate. - -> ValidationResult blk - -- ^ The 'ValidationResult' from which to begin validating. - -> STM m (ValidationResult blk, [GenTx blk]) - -- ^ The last 'ValidationResult' along with the remaining transactions - -- (those not yet validated due to the mempool capacity being reached). - validateNewUntilMempoolFull [] vr = pure (vr, []) - validateNewUntilMempoolFull (tx:txs') vr = do - -- Get the current mempool size - MempoolSize { msNumBytes = curSizeInBytes } <- getMempoolSize mpEnv - - -- Determine what the mempool size would be if we were to commit the new - -- transactions we've validated thus far and also the next transaction - -- to validate, 'tx'. - -- If this value is greater than the 'MempoolCapacityBytes', then we - -- know not to continue validating at this time. - let newTxsBytes = sum (txSize <$> vrNewValid vr) + txSize tx - newSizeInBytes = curSizeInBytes + newTxsBytes - - -- The size of a mempool should never be greater than its capacity. - assert (curSizeInBytes <= mempoolCap) $ - -- Here, we check whether we're at the mempool's capacity /before/ - -- attempting to validate the next transaction. - if newSizeInBytes <= mempoolCap - then validateNewUntilMempoolFull txs' (extendVRNew mpEnvLedgerCfg tx vr) - else pure (vr, tx:txs') -- if we're at mempool capacity, we return the - -- last 'ValidationResult' as well as the - -- remaining transactions (those not yet - -- validated). + -- Note that even if the transaction were invalid, we could still + -- write the state, because in that case we would have that @is == + -- is'@, but there's no reason to do that additional write. + writeTVar mpEnvStateVar is' + + -- We only extended the ValidationResult with a single transaction + -- ('firstTx'). So if it's not in 'vrInvalid', it must be in + -- 'vrNewValid'. + return $ case listToMaybe (vrInvalid vr) of + -- The transaction was valid + Nothing -> + assert (isJust (vrNewValid vr)) $ do + traceWith mpEnvTracer $ TraceMempoolAddedTx + firstTx + (isMempoolSize is) + (isMempoolSize is') + go ((firstTx, Nothing):acc) toAdd' + Just (_, err) -> + assert (isNothing (vrNewValid vr)) $ + assert (length (vrInvalid vr) == 1) $ do + traceWith mpEnvTracer $ TraceMempoolRejectedTx + firstTx + err + (isMempoolSize is) + go ((firstTx, Just err):acc) toAdd' implRemoveTxs :: (IOLike m, ApplyTx blk, HasTxId (GenTx blk)) => MempoolEnv m blk -> [GenTxId blk] -> m () -implRemoveTxs mpEnv@MempoolEnv{mpEnvTracer, mpEnvStateVar} txIds = do +implRemoveTxs mpEnv txIds = do (removed, mempoolSize) <- atomically $ do - -- Filtering is O(n), but this function will so rarely be used, as it is - -- an escape hatch when there's an inconsistency between the ledger and - -- the mempool. - modifyTVar mpEnvStateVar $ \is@IS{isTxs} -> is - { isTxs = TxSeq.filterTxs - (\TxTicket { txTicketTx } -> txId txTicketTx `notElem` toRemove) - isTxs - } - vr <- validateIS mpEnv TxsForUnknownBlock - writeTVar mpEnvStateVar $ internalStateFromVR vr - -- The size of the mempool /after/ manually removing the transactions. - mempoolSize <- getMempoolSize mpEnv - return (map fst (vrInvalid vr), mempoolSize) + IS { isTxs, isLastTicketNo } <- readTVar mpEnvStateVar + st <- getCurrentLedgerState mpEnvLedger + -- Filtering is O(n), but this function will rarely be used, as it is an + -- escape hatch when there's an inconsistency between the ledger and the + -- mempool. + let txTickets' = filter + ((`notElem` toRemove) . txId . txTicketTx) + (TxSeq.toList isTxs) + vr = revalidateTxsFor cfg + (tickLedgerState cfg TxsForUnknownBlock st) + isLastTicketNo + txTickets' + is' = internalStateFromVR vr + writeTVar mpEnvStateVar is' + return (map fst (vrInvalid vr), isMempoolSize is') + unless (null txIds) $ traceWith mpEnvTracer $ TraceMempoolManuallyRemovedTxs txIds removed mempoolSize where + MempoolEnv + { mpEnvLedgerCfg = cfg + , mpEnvLedger + , mpEnvTracer + , mpEnvStateVar + } = mpEnv + toRemove = Set.fromList txIds implSyncWithLedger :: (IOLike m, ApplyTx blk) @@ -452,21 +363,13 @@ implGetSnapshotFor MempoolEnv{mpEnvStateVar, mpEnvLedgerCfg} implGetCapacity :: IOLike m => MempoolEnv m blk -> STM m MempoolCapacityBytes implGetCapacity = pure . mpEnvCapacity --- | Return the number of transactions in the Mempool paired with their total --- size in bytes. +-- | \( O(1) \). Return the number of transactions in the Mempool paired with +-- their total size in bytes. getMempoolSize :: (IOLike m, ApplyTx blk) => MempoolEnv m blk -> STM m MempoolSize getMempoolSize MempoolEnv{mpEnvStateVar} = - txsToMempoolSize . isTxs <$> readTVar mpEnvStateVar - --- | Given a 'Foldable' of transactions, calculate what the 'MempoolSize' --- would be if a mempool were to consist /only/ of those transactions. -txsToMempoolSize :: (Foldable t, ApplyTx blk) => t (GenTx blk) -> MempoolSize -txsToMempoolSize = foldMap toMempoolSize - where - toMempoolSize :: ApplyTx blk => GenTx blk -> MempoolSize - toMempoolSize tx = MempoolSize { msNumTxs = 1, msNumBytes = txSize tx } + isMempoolSize <$> readTVar mpEnvStateVar {------------------------------------------------------------------------------- MempoolSnapshot Implementation @@ -490,23 +393,23 @@ implSnapshotGetTxsAfter :: InternalState blk -> TicketNo -> [(GenTx blk, TicketNo)] implSnapshotGetTxsAfter IS{isTxs} tn = - fromTxSeq $ snd $ splitAfterTicketNo isTxs tn + TxSeq.toTuples $ snd $ TxSeq.splitAfterTicketNo isTxs tn implSnapshotGetTxsForSize :: InternalState blk -> Word32 -> [(GenTx blk, TicketNo)] implSnapshotGetTxsForSize IS{isTxs} maxSize = - fromTxSeq $ fst $ splitAfterTxSize isTxs maxSize + TxSeq.toTuples $ fst $ TxSeq.splitAfterTxSize isTxs maxSize implSnapshotGetTx :: InternalState blk -> TicketNo -> Maybe (GenTx blk) -implSnapshotGetTx IS{isTxs} tn = isTxs `lookupByTicketNo` tn +implSnapshotGetTx IS{isTxs} tn = isTxs `TxSeq.lookupByTicketNo` tn implSnapshotGetMempoolSize :: ApplyTx blk => InternalState blk -> MempoolSize -implSnapshotGetMempoolSize = txsToMempoolSize . isTxs +implSnapshotGetMempoolSize = TxSeq.toMempoolSize . isTxs {------------------------------------------------------------------------------- Validation @@ -522,23 +425,19 @@ data ValidationResult blk = ValidationResult { -- | The transactions that were found to be valid (oldest to newest) , vrValid :: TxSeq (GenTx blk) - -- | New transactions (not previously known) which were found to be valid. - -- - -- n.b. This will only contain valid transactions which were /newly/ added - -- to the mempool (not previously known valid transactions). + -- | A new transaction (not previously known) which was found to be valid. -- - -- Order not guaranteed. - , vrNewValid :: [GenTx blk] + -- n.b. This will only contain a valid transaction that was /newly/ added + -- to the mempool (not a previously known valid transaction). + , vrNewValid :: Maybe (GenTx blk) - -- | The state of the ledger after 'vrValid' - -- - -- NOTE: This is intentionally not a strict field, so that we don't - -- evaluate the final ledger state if we don't have to. + -- | The state of the ledger after applying 'vrValid' against the ledger + -- state identifeid by 'vrBeforeTip'. , vrAfter :: TickedLedgerState blk -- | The transactions that were invalid, along with their errors -- - -- Order not guaranteed + -- From oldest to newest. , vrInvalid :: [(GenTx blk, ApplyTxErr blk)] -- | The mempool 'TicketNo' counter. @@ -554,41 +453,41 @@ data ValidationResult blk = ValidationResult { -- -- Discards information about invalid and newly valid transactions internalStateFromVR :: ValidationResult blk -> InternalState blk -internalStateFromVR ValidationResult { vrBeforeTip - , vrBeforeSlotNo - , vrValid - , vrLastTicketNo - } = IS { - isTxs = vrValid - , isTip = vrBeforeTip - , isSlotNo = At $ vrBeforeSlotNo - , isLastTicketNo = vrLastTicketNo +internalStateFromVR vr = IS { + isTxs = vrValid + , isLedgerState = vrAfter + , isTip = vrBeforeTip + , isSlotNo = vrBeforeSlotNo + , isLastTicketNo = vrLastTicketNo } - --- | Initialize 'ValidationResult' from a ledger state and a list of --- transactions /known/ to be valid in that ledger state -initVR :: forall blk. ApplyTx blk - => LedgerConfig blk - -> TxSeq (GenTx blk) - -> TickedLedgerState blk - -> TicketNo - -> ValidationResult blk -initVR cfg = \knownValid st lastTicketNo -> ValidationResult { - vrBeforeTip = ledgerTipHash (tickedLedgerState st) - , vrBeforeSlotNo = tickedSlotNo st - , vrValid = knownValid - , vrNewValid = [] - , vrAfter = afterKnownValid - (Foldable.toList knownValid) - st + where + ValidationResult { + vrBeforeTip + , vrBeforeSlotNo + , vrValid + , vrAfter + , vrLastTicketNo + } = vr + +-- | Construct a 'ValidationResult' from internal state. +validationResultFromIS :: InternalState blk -> ValidationResult blk +validationResultFromIS is = ValidationResult { + vrBeforeTip = isTip + , vrBeforeSlotNo = isSlotNo + , vrValid = isTxs + , vrNewValid = Nothing + , vrAfter = isLedgerState , vrInvalid = [] - , vrLastTicketNo = lastTicketNo + , vrLastTicketNo = isLastTicketNo } where - afterKnownValid :: [GenTx blk] - -> TickedLedgerState blk -> TickedLedgerState blk - afterKnownValid [] = id - afterKnownValid (tx:txs) = afterKnownValid txs . reapplyTxSameState cfg tx + IS { + isTxs + , isLedgerState + , isTip + , isSlotNo + , isLastTicketNo + } = is -- | Extend 'ValidationResult' with a previously validated transaction that -- may or may not be valid in this ledger state @@ -600,43 +499,56 @@ initVR cfg = \knownValid st lastTicketNo -> ValidationResult { -- signatures. extendVRPrevApplied :: ApplyTx blk => LedgerConfig blk - -> (GenTx blk, TicketNo) + -> TxTicket (GenTx blk) -> ValidationResult blk -> ValidationResult blk -extendVRPrevApplied cfg (tx, tn) - vr@ValidationResult{vrValid, vrAfter, vrInvalid} = +extendVRPrevApplied cfg txTicket vr = case runExcept (reapplyTx cfg tx vrAfter) of Left err -> vr { vrInvalid = (tx, err) : vrInvalid } - Right st' -> vr { vrValid = vrValid :> TxTicket tx tn (txSize tx) + Right st' -> vr { vrValid = vrValid :> txTicket , vrAfter = st' } + where + TxTicket { txTicketTx = tx } = txTicket + ValidationResult { vrValid, vrAfter, vrInvalid } = vr -- | Extend 'ValidationResult' with a new transaction (one which we have not -- previously validated) that may or may not be valid in this ledger state. +-- +-- PRECONDITION: 'vrNewValid' is 'Nothing'. In other words: new transactions +-- should be validated one-by-one, not by calling 'extendVRNew' on its result +-- again. extendVRNew :: ApplyTx blk => LedgerConfig blk -> GenTx blk -> ValidationResult blk -> ValidationResult blk -extendVRNew cfg tx - vr@ValidationResult { vrValid - , vrAfter - , vrInvalid - , vrLastTicketNo - , vrNewValid - } = - let nextTicketNo = succ vrLastTicketNo - in case runExcept (applyTx cfg tx vrAfter) of - Left err -> vr { vrInvalid = (tx, err) : vrInvalid - } - Right st' -> vr { vrValid = vrValid :> TxTicket tx nextTicketNo (txSize tx) - , vrNewValid = tx : vrNewValid - , vrAfter = st' - , vrLastTicketNo = nextTicketNo - } - --- | Validate internal state +extendVRNew cfg tx vr = case runExcept (applyTx cfg tx vrAfter) of + Left err -> vr { vrInvalid = (tx, err) : vrInvalid + } + Right st' -> vr { vrValid = vrValid :> TxTicket tx nextTicketNo (txSize tx) + , vrNewValid = vrNewValid' + , vrAfter = st' + , vrLastTicketNo = nextTicketNo + } + where + ValidationResult { + vrValid + , vrAfter + , vrInvalid + , vrLastTicketNo + , vrNewValid + } = vr + + nextTicketNo = succ vrLastTicketNo + + vrNewValid' = case vrNewValid of + Nothing -> Just tx + Just _ -> error "precondition failed: vrNewValid not Nothing" + +-- | Validate the internal state against the current ledger state and the +-- given 'BlockSlot', revalidating if necessary. validateIS :: forall m blk. (IOLike m, ApplyTx blk) => MempoolEnv m blk -> BlockSlot @@ -646,33 +558,68 @@ validateIS MempoolEnv{mpEnvLedger, mpEnvLedgerCfg, mpEnvStateVar} blockSlot = <$> getCurrentLedgerState mpEnvLedger <*> readTVar mpEnvStateVar --- | Validate internal state given specific ledger -validateStateFor :: forall blk. ApplyTx blk - => LedgerConfig blk - -> BlockSlot - -> LedgerState blk - -> InternalState blk - -> ValidationResult blk -validateStateFor cfg blockSlot st IS{isTxs, isTip, isSlotNo, isLastTicketNo} - | isTip == ledgerTipHash (tickedLedgerState st') && - isSlotNo == At (tickedSlotNo st') - = initVR cfg isTxs st' isLastTicketNo - | otherwise - = repeatedly (extendVRPrevApplied cfg) (fromTxSeq isTxs) - $ initVR cfg TxSeq.Empty st' isLastTicketNo +-- | Given a (valid) internal state, validate it against the given ledger +-- state and 'BlockSlot'. +-- +-- When these match the internal state's 'isTip' and 'isSlotNo', this is very +-- cheap, as the given internal state will already be valid against the given +-- inputs. +-- +-- When these don't match, the transaction in the internal state will be +-- revalidated ('revalidateTxsFor'). +validateStateFor + :: forall blk. ApplyTx blk + => LedgerConfig blk + -> BlockSlot + -> LedgerState blk + -> InternalState blk + -> ValidationResult blk +validateStateFor cfg blockSlot st is + | isTip == ledgerTipHash (tickedLedgerState st') + , isSlotNo == tickedSlotNo st' + = validationResultFromIS is + | otherwise + = revalidateTxsFor cfg st' isLastTicketNo (TxSeq.toList isTxs) + where + IS { isTxs, isTip, isSlotNo, isLastTicketNo } = is + st' = tickLedgerState cfg blockSlot st + +-- | Revalidate the given transactions (@['TxTicket' ('GenTx' blk)]@) against +-- the given ticked ledger state. +revalidateTxsFor + :: forall blk. ApplyTx blk + => LedgerConfig blk + -> TickedLedgerState blk + -> TicketNo + -- ^ 'isLastTicketNo' & 'vrLastTicketNo' + -> [TxTicket (GenTx blk)] + -> ValidationResult blk +revalidateTxsFor cfg st lastTicketNo txTickets = + repeatedly + (extendVRPrevApplied cfg) + txTickets + (validationResultFromIS is) + where + is = initInternalState lastTicketNo st + +-- | Tick the 'LedgerState' using the given 'BlockSlot'. +tickLedgerState + :: UpdateLedger blk + => LedgerConfig blk + -> BlockSlot + -> LedgerState blk + -> TickedLedgerState blk +tickLedgerState cfg blockSlot st = applyChainTick cfg slot st where - st' :: TickedLedgerState blk - st' = applyChainTick cfg slot st - -- If we don't yet know the slot number, optimistically assume that they -- will be included in a block in the next available slot slot :: SlotNo slot = case blockSlot of - TxsForBlockInSlot s -> s - TxsForUnknownBlock -> - case ledgerTipSlot st of - -- TODO: We should not make assumptions about the underlying - -- ledger. We will fix this in - -- - Origin -> Block.SlotNo 0 - At s -> succ s + TxsForBlockInSlot s -> s + TxsForUnknownBlock -> + case ledgerTipSlot st of + -- TODO: We should not make assumptions about the underlying + -- ledger. We will fix this in + -- + Origin -> Block.SlotNo 0 + At s -> succ s diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/TxSeq.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/TxSeq.hs index e12459858a6..20e431c4032 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/TxSeq.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/TxSeq.hs @@ -4,22 +4,25 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ViewPatterns #-} - +-- | Intended for qualified import. +-- +-- > import Ouroboros.Consensus.Mempool.TxSeq (TxSeq (..)) +-- > import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq module Ouroboros.Consensus.Mempool.TxSeq ( TicketNo(..) , TxTicket(..) , TxSeq(Empty, (:>), (:<)) - , toTxSeq - , fromTxSeq - , fromTxTickets - , txTickets + , fromList + , toList + , toTuples , lookupByTicketNo , splitAfterTicketNo , splitAfterTxSize , zeroTicketNo - , filterTxs + , toMempoolSize -- * Reference implementations for testing , splitAfterTxSizeSpec @@ -32,8 +35,11 @@ import Data.Word (Word64) import GHC.Generics (Generic) import Cardano.Prelude (NoUnexpectedThunks) + import Ouroboros.Network.Protocol.TxSubmission.Type (TxSizeInBytes) +import Ouroboros.Consensus.Mempool.API (MempoolSize (..)) + {------------------------------------------------------------------------------- Mempool transaction sequence as a finger tree -------------------------------------------------------------------------------} @@ -199,7 +205,7 @@ splitAfterTxSize (TxSeq txs) n = -- expected. splitAfterTxSizeSpec :: TxSeq tx -> TxSizeInBytes -> (TxSeq tx, TxSeq tx) splitAfterTxSizeSpec txseq n = - mapTuple fromTxTickets $ go 0 [] (txTickets txseq) + mapTuple fromList $ go 0 [] (toList txseq) where mapTuple :: (a -> b) -> (a, a) -> (b, b) mapTuple f (x, y) = (f x, f y) @@ -218,34 +224,26 @@ splitAfterTxSizeSpec txseq n = | otherwise -> (reverse accTickets, t:ts) --- | Given a list of triples consisting of transactions, ticket numbers, and --- the transaction sizes in bytes, construct a 'TxSeq'. -toTxSeq :: [(tx, TicketNo, TxSizeInBytes)] -> TxSeq tx -toTxSeq ts = fromTxTickets $ map (uncurry3 TxTicket) ts - where - uncurry3 :: (a -> b -> c -> d) -> ((a, b, c) -> d) - uncurry3 f (a, b, c) = f a b c - -- | Given a list of 'TxTicket's, construct a 'TxSeq'. -fromTxTickets :: [TxTicket tx] -> TxSeq tx -fromTxTickets = Foldable.foldl' (:>) Empty +fromList :: [TxTicket tx] -> TxSeq tx +fromList = Foldable.foldl' (:>) Empty + +-- | Convert a 'TxSeq' to a list of 'TxTicket's. +toList :: TxSeq tx -> [TxTicket tx] +toList (TxSeq ftree) = Foldable.toList ftree -- | Convert a 'TxSeq' to a list of pairs of transactions and their -- associated 'TicketNo's. -fromTxSeq :: TxSeq tx -> [(tx, TicketNo)] -fromTxSeq (TxSeq ftree) = fmap - (\ticket -> (txTicketTx ticket, txTicketNo ticket)) - (Foldable.toList $ ftree) - --- | \( O(n) \). Filter the 'TxSeq'. -filterTxs :: (TxTicket tx -> Bool) -> TxSeq tx -> TxSeq tx -filterTxs p (TxSeq ftree) = - TxSeq - . FingerTree.fromList - . filter p - . Foldable.toList - $ ftree - --- | Convert a 'TxSeq' to a list of 'TxTicket's. -txTickets :: TxSeq tx -> [TxTicket tx] -txTickets (TxSeq ftree) = Foldable.toList ftree +toTuples :: TxSeq tx -> [(tx, TicketNo)] +toTuples (TxSeq ftree) = fmap + (\ticket -> (txTicketTx ticket, txTicketNo ticket)) + (Foldable.toList ftree) + +-- | \( O(1) \). Return the 'MempoolSize' of the given 'TxSeq'. +toMempoolSize :: TxSeq tx -> MempoolSize +toMempoolSize (TxSeq ftree) = MempoolSize + { msNumTxs = fromIntegral mSize + , msNumBytes = mSizeBytes + } + where + TxSeqMeasure { mSizeBytes, mSize } = FingerTree.measure ftree diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs index 39d7f99484b..53abf55da82 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs @@ -624,7 +624,7 @@ getMempoolReader mempool = Outbound.TxSubmissionMempoolReader } getMempoolWriter - :: (Monad m, HasTxId (GenTx blk)) + :: (IOLike m, ApplyTx blk, HasTxId (GenTx blk)) => Mempool m blk TicketNo -> TxSubmissionMempoolWriter (GenTxId blk) (GenTx blk) TicketNo m getMempoolWriter mempool = Inbound.TxSubmissionMempoolWriter diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/TxSubmission.hs b/ouroboros-consensus/src/Ouroboros/Consensus/TxSubmission.hs index 1531d3aaa3b..f685a62cf97 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/TxSubmission.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/TxSubmission.hs @@ -1,5 +1,4 @@ {-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE UndecidableInstances #-} @@ -15,22 +14,23 @@ import Control.Tracer import Ouroboros.Network.Protocol.LocalTxSubmission.Server import Ouroboros.Consensus.Mempool.API +import Ouroboros.Consensus.Util.IOLike -- | Local transaction submission server, for adding txs to the 'Mempool' -- localTxSubmissionServer - :: Monad m + :: (MonadSTM m, ApplyTx blk) => Tracer m (TraceLocalTxSubmissionServerEvent blk) -> Mempool m blk idx -> LocalTxSubmissionServer (GenTx blk) (ApplyTxErr blk) m () -localTxSubmissionServer tracer Mempool{addTxs} = +localTxSubmissionServer tracer mempool = server where server = LocalTxSubmissionServer { recvMsgSubmitTx = \tx -> do traceWith tracer $ TraceReceivedTx tx - res <- addTxs [tx] + res <- addTxs mempool [tx] case res of [(_tx, mbErr)] -> return (mbErr, server) -- The output list of addTxs has the same length as the input list. diff --git a/ouroboros-consensus/test-consensus/Test/Consensus/Mempool.hs b/ouroboros-consensus/test-consensus/Test/Consensus/Mempool.hs index 3c1ddada1ce..7468c23ca37 100644 --- a/ouroboros-consensus/test-consensus/Test/Consensus/Mempool.hs +++ b/ouroboros-consensus/test-consensus/Test/Consensus/Mempool.hs @@ -18,7 +18,7 @@ import Data.Either (isRight) import Data.List (find, foldl', isSuffixOf, nub, partition, sort) import Data.Map (Map) import qualified Data.Map as Map -import Data.Maybe (isJust, isNothing) +import Data.Maybe (isJust, isNothing, mapMaybe) import qualified Data.Set as Set import Data.Word import GHC.Stack (HasCallStack) @@ -65,6 +65,8 @@ tests = testGroup "Mempool" , testProperty "addTxs txs == mapM (addTxs . pure) txs" prop_Mempool_addTxs_one_vs_multiple , testProperty "result of addTxs" prop_Mempool_addTxs_result , testProperty "Invalid transactions are never added" prop_Mempool_InvalidTxsNeverAdded + , testProperty "result of getCapacity" prop_Mempool_getCapacity + , testProperty "Mempool capacity implementation" prop_Mempool_Capacity , testProperty "Added valid transactions are traced" prop_Mempool_TraceValidTxs , testProperty "Rejected invalid txs are traced" prop_Mempool_TraceRejectedTxs , testProperty "Removed invalid txs are traced" prop_Mempool_TraceRemovedTxs @@ -89,9 +91,8 @@ prop_Mempool_snapshotTxs_snapshotTxsAfter setup = prop_Mempool_addTxs_getTxs :: TestSetupWithTxs -> Property prop_Mempool_addTxs_getTxs setup = withTestMempool (testSetup setup) $ \TestMempool { mempool } -> do - let Mempool { addTxs, getSnapshot } = mempool - _ <- addTxs (allTxs setup) - MempoolSnapshot { snapshotTxs } <- atomically getSnapshot + _ <- addTxs mempool (allTxs setup) + MempoolSnapshot { snapshotTxs } <- atomically $ getSnapshot mempool return $ counterexample (ppTxs (txs setup)) $ validTxs setup `isSuffixOf` map fst snapshotTxs @@ -100,9 +101,8 @@ prop_Mempool_addTxs_getTxs setup = prop_Mempool_addTxs_one_vs_multiple :: TestSetupWithTxs -> Property prop_Mempool_addTxs_one_vs_multiple setup = withTestMempool (testSetup setup) $ \TestMempool { mempool } -> do - let Mempool { addTxs, getSnapshot } = mempool - forM_ (allTxs setup) $ \tx -> addTxs [tx] - MempoolSnapshot { snapshotTxs } <- atomically getSnapshot + forM_ (allTxs setup) $ \tx -> addTxs mempool [tx] + MempoolSnapshot { snapshotTxs } <- atomically $ getSnapshot mempool return $ counterexample (ppTxs (txs setup)) $ validTxs setup `isSuffixOf` map fst snapshotTxs @@ -112,20 +112,20 @@ prop_Mempool_addTxs_one_vs_multiple setup = prop_Mempool_addTxs_result :: TestSetupWithTxs -> Property prop_Mempool_addTxs_result setup = withTestMempool (testSetup setup) $ \TestMempool { mempool } -> do - let Mempool { addTxs } = mempool - result <- addTxs (allTxs setup) + result <- addTxs mempool (allTxs setup) return $ counterexample (ppTxs (txs setup)) $ - sort [(tx, isNothing mbErr) | (tx, mbErr) <- result] === - sort [(testTx, valid) | (testTx, valid) <- txs setup] + [(tx, isNothing mbErr) | (tx, mbErr) <- result] === + [(testTx, valid) | (testTx, valid) <- txs setup] -- | Test that invalid transactions are never added to the 'Mempool'. prop_Mempool_InvalidTxsNeverAdded :: TestSetupWithTxs -> Property prop_Mempool_InvalidTxsNeverAdded setup = withTestMempool (testSetup setup) $ \TestMempool { mempool } -> do - let Mempool { addTxs, getSnapshot } = mempool - txsInMempoolBefore <- map fst . snapshotTxs <$> atomically getSnapshot - _ <- addTxs (allTxs setup) - txsInMempoolAfter <- map fst . snapshotTxs <$> atomically getSnapshot + txsInMempoolBefore <- map fst . snapshotTxs <$> + atomically (getSnapshot mempool) + _ <- addTxs mempool (allTxs setup) + txsInMempoolAfter <- map fst . snapshotTxs <$> + atomically (getSnapshot mempool) return $ counterexample (ppTxs (txs setup)) $ conjoin -- Check for each transaction in the mempool (ignoring those already -- in the mempool beforehand) that it was a valid transaction. @@ -150,25 +150,87 @@ prop_Mempool_removeTxs (TestSetupWithTxInMempool testSetup txToRemove) = show txToRemove <> "): " <> show txsInMempoolAfter) (txToRemove `notElem` txsInMempoolAfter) +-- | Test that 'getCapacity' returns the 'MempoolCapacityBytes' value that the +-- mempool was initialized with. +-- +-- Ignore the "100% empty Mempool" label in the test output, that is there +-- because we reuse 'withTestMempool' and always start with an empty Mempool +-- and 'LedgerState'. +prop_Mempool_getCapacity :: MempoolCapTestSetup -> Property +prop_Mempool_getCapacity mcts = + withTestMempool testSetup $ \TestMempool{mempool} -> do + mpCap <- atomically $ getCapacity mempool + pure (mpCap === testMempoolCap testSetup) + where + MempoolCapTestSetup (TestSetupWithTxs testSetup _txsToAdd) = mcts + +-- | Test the correctness of 'tryAddTxs' when the Mempool is (or will be) at +-- capacity. +-- +-- Ignore the "100% empty Mempool" label in the test output, that is there +-- because we reuse 'withTestMempool' and always start with an empty Mempool +-- and 'LedgerState'. +prop_Mempool_Capacity :: MempoolCapTestSetup -> Property +prop_Mempool_Capacity (MempoolCapTestSetup testSetupWithTxs) = + withTestMempool testSetup $ \TestMempool { mempool } -> do + curSize <- msNumBytes . snapshotMempoolSize <$> + atomically (getSnapshot mempool) + res@(processed, unprocessed) <- tryAddTxs mempool (map fst txsToAdd) + return $ + counterexample ("Initial size: " <> show curSize) $ + classify (null processed) "no transactions added" $ + classify (null unprocessed) "all transactions added" $ + blindErrors res === expectedResult curSize + where + TestSetupWithTxs testSetup txsToAdd = testSetupWithTxs + MempoolCapacityBytes capacity = testMempoolCap testSetup + + -- | Convert @Maybe TestTxError@ into a @Bool@: Nothing -> True, Just _ -> + -- False. + blindErrors + :: ([(GenTx TestBlock, Maybe TestTxError)], [GenTx TestBlock]) + -> ([(GenTx TestBlock, Bool)], [GenTx TestBlock]) + blindErrors (processed, toAdd) = (processed', toAdd) + where + processed' = [(tx, isNothing mbErr) | (tx, mbErr) <- processed] + + expectedResult + :: Word32 -- ^ Current mempool size + -> ([(GenTx TestBlock, Bool)], [GenTx TestBlock]) + expectedResult = \curSize -> go curSize [] txsToAdd + where + go + :: Word32 + -> [(GenTx TestBlock, Bool)] + -> [(GenTx TestBlock, Bool)] + -> ([(GenTx TestBlock, Bool)], [GenTx TestBlock]) + go curSize processed = \case + [] + -> (reverse processed, []) + (tx, valid):txsToAdd' + | let curSize' = curSize + txSize tx + , curSize' <= capacity + -> go (if valid then curSize' else curSize) + ((tx, valid):processed) + txsToAdd' + | otherwise + -> (reverse processed, tx:map fst txsToAdd') + -- | Test that all valid transactions added to a 'Mempool' via 'addTxs' are -- appropriately represented in the trace of events. prop_Mempool_TraceValidTxs :: TestSetupWithTxs -> Property prop_Mempool_TraceValidTxs setup = withTestMempool (testSetup setup) $ \testMempool -> do let TestMempool { mempool, getTraceEvents } = testMempool - Mempool { addTxs } = mempool - _ <- addTxs (allTxs setup) + _ <- addTxs mempool (allTxs setup) evs <- getTraceEvents return $ counterexample (ppTxs (txs setup)) $ - let addedTxs = maybe - [] - (\(TraceMempoolAddTxs txs _) -> txs) - (find isAddTxsEvent evs) - in sort (validTxs setup) === sort addedTxs + let addedTxs = mapMaybe isAddedTxsEvent evs + in validTxs setup === addedTxs where - isAddTxsEvent :: TraceEventMempool blk -> Bool - isAddTxsEvent (TraceMempoolAddTxs _ _) = True - isAddTxsEvent _ = False + isAddedTxsEvent :: TraceEventMempool blk -> Maybe (GenTx blk) + isAddedTxsEvent (TraceMempoolAddedTx tx _ _) = Just tx + isAddedTxsEvent _ = Nothing -- | Test that all invalid rejected transactions returned from 'addTxs' are -- appropriately represented in the trace of events. @@ -176,19 +238,15 @@ prop_Mempool_TraceRejectedTxs :: TestSetupWithTxs -> Property prop_Mempool_TraceRejectedTxs setup = withTestMempool (testSetup setup) $ \testMempool -> do let TestMempool { mempool, getTraceEvents } = testMempool - Mempool { addTxs } = mempool - _ <- addTxs (allTxs setup) + _ <- addTxs mempool (allTxs setup) evs <- getTraceEvents return $ counterexample (ppTxs (txs setup)) $ - let rejectedTxs = maybe - [] - (\(TraceMempoolRejectedTxs txsAndErrs _) -> map fst txsAndErrs) - (find isRejectedTxsEvent evs) - in sort (invalidTxs setup) === sort rejectedTxs + let rejectedTxs = mapMaybe isRejectedTxEvent evs + in invalidTxs setup === rejectedTxs where - isRejectedTxsEvent :: TraceEventMempool blk -> Bool - isRejectedTxsEvent (TraceMempoolRejectedTxs _ _) = True - isRejectedTxsEvent _ = False + isRejectedTxEvent :: TraceEventMempool blk -> Maybe (GenTx blk) + isRejectedTxEvent (TraceMempoolRejectedTx tx _ _) = Just tx + isRejectedTxEvent _ = Nothing -- | Test that all transactions in the 'Mempool' that have become invalid -- because of an update to the ledger are appropriately represented in the @@ -197,8 +255,7 @@ prop_Mempool_TraceRemovedTxs :: TestSetup -> Property prop_Mempool_TraceRemovedTxs setup = withTestMempool setup $ \testMempool -> do let TestMempool { mempool, getTraceEvents, addTxsToLedger, getCurrentLedger } = testMempool - Mempool { getSnapshot, syncWithLedger } = mempool - MempoolSnapshot { snapshotTxs } <- atomically getSnapshot + MempoolSnapshot { snapshotTxs } <- atomically $ getSnapshot mempool -- We add all the transactions in the mempool to the ledger. Some of -- them will become invalid because all inputs have been spent. let txsInMempool = map fst snapshotTxs @@ -206,7 +263,7 @@ prop_Mempool_TraceRemovedTxs setup = -- Sync the mempool with the ledger. Now some of the transactions in the -- mempool should have been removed. - void syncWithLedger + void $ syncWithLedger mempool -- Predict which transactions should have been removed curLedger <- atomically getCurrentLedger @@ -214,10 +271,7 @@ prop_Mempool_TraceRemovedTxs setup = -- Look at the trace to see which transactions actually got removed evs <- getTraceEvents - let removedTxs = maybe - [] - (\(TraceMempoolRemoveTxs txs _) -> txs) - (find isRemoveTxsEvent evs) + let removedTxs = concat $ mapMaybe isRemoveTxsEvent evs -- Also check that 'addTxsToLedger' never resulted in an error. return $ @@ -225,9 +279,9 @@ prop_Mempool_TraceRemovedTxs setup = map (const (Right ())) errs === errs .&&. sort expected === sort removedTxs where - isRemoveTxsEvent :: TraceEventMempool blk -> Bool - isRemoveTxsEvent (TraceMempoolRemoveTxs _ _) = True - isRemoveTxsEvent _ = False + isRemoveTxsEvent :: TraceEventMempool blk -> Maybe [GenTx blk] + isRemoveTxsEvent (TraceMempoolRemoveTxs txs _) = Just txs + isRemoveTxsEvent _ = Nothing expectedToBeRemoved :: LedgerState TestBlock -> [TestTx] -> [TestTx] expectedToBeRemoved ledgerState txsInMempool = @@ -502,13 +556,13 @@ instance Arbitrary TestSetupWithTxs where [ TestSetupWithTxs { testSetup = testSetup', txs } | testSetup' <- shrink testSetup ] <> [ TestSetupWithTxs { testSetup, txs = txs' } - | txs' <- map (revalidate testSetup) . + | txs' <- map (fst . revalidate testSetup) . shrinkList (const []) . map fst $ txs ] -revalidate :: TestSetup -> [TestTx] -> [(TestTx, Bool)] +revalidate :: TestSetup -> [TestTx] -> ([(TestTx, Bool)], LedgerState TestBlock) revalidate TestSetup { testLedgerState, testInitialTxs } = - fst . validateTxs initLedgerState + validateTxs initLedgerState where -- The LedgerState after adding the transactions initially in the mempool initLedgerState = repeatedly @@ -659,13 +713,43 @@ withTestMempool setup@TestSetup { testLedgerState, testInitialTxs, testMempoolCa where -- Wrap in 'TickedLedgerState' so that we can call 'applyTx' notReallyTicked :: LedgerState TestBlock -> TickedLedgerState TestBlock - notReallyTicked = TickedLedgerState (error "SlotNo unused") + notReallyTicked = TickedLedgerState 0 txs = map fst snapshotTxs mkErrMsg e = "At the end of the test, the Mempool contents were invalid: " <> show e +{------------------------------------------------------------------------------- + MempoolCapTestSetup +-------------------------------------------------------------------------------} + +-- | Reuse 'TestSetupWithTxs' but just pick a specific capacity based on the +-- transactions to add. +newtype MempoolCapTestSetup = MempoolCapTestSetup TestSetupWithTxs + deriving (Show) + +instance Arbitrary MempoolCapTestSetup where + -- TODO: shrink + arbitrary = do + testSetupWithTxs@TestSetupWithTxs { testSetup, txs } <- arbitrary + -- The Mempool should at least be capable of containing the transactions + -- it already contains. + let currentSize = sum (map txSize (testInitialTxs testSetup)) + capacityMinBound = currentSize + validTxsToAdd = [tx | (tx, True) <- txs] + -- Use the current size + the sum of all the valid transactions to add + -- as the upper bound. + capacityMaxBound = currentSize + sum (map txSize validTxsToAdd) + -- Note that we could pick @currentSize@, meaning that we can't add any + -- more transactions to the Mempool + capacity <- choose + ( capacityMinBound + , capacityMaxBound + ) + let testSetup' = testSetup { testMempoolCap = MempoolCapacityBytes capacity } + return $ MempoolCapTestSetup testSetupWithTxs { testSetup = testSetup' } + {------------------------------------------------------------------------------- TxSeq Properties -------------------------------------------------------------------------------} @@ -676,10 +760,10 @@ prop_TxSeq_lookupByTicketNo_complete xs = and [ case TxSeq.lookupByTicketNo txseq tn of Just tx' -> tx == tx' Nothing -> False - | (tx, tn) <- TxSeq.fromTxSeq txseq ] + | (tx, tn) <- TxSeq.toTuples txseq ] where txseq :: TxSeq Int - txseq = TxSeq.toTxSeq $ zip3 xs (map TicketNo [0..]) (repeat 0) + txseq = TxSeq.fromList $ zipWith3 TxTicket xs (map TicketNo [0..]) (repeat 0) -- | Only finds elements in the sequence prop_TxSeq_lookupByTicketNo_sound :: @@ -715,7 +799,7 @@ prop_TxSeq_lookupByTicketNo_sound smalls small = -- that of the 'TxSizeInBytes' which the 'TxSeq' was split on. prop_TxSeq_splitAfterTxSize :: TxSizeSplitTestSetup -> Property prop_TxSeq_splitAfterTxSize tss = - property $ txSizeSum (txTickets before) <= tssTxSizeToSplitOn + property $ txSizeSum (TxSeq.toList before) <= tssTxSizeToSplitOn where TxSizeSplitTestSetup { tssTxSizeToSplitOn } = tss @@ -733,8 +817,8 @@ prop_TxSeq_splitAfterTxSize tss = -- implementation. prop_TxSeq_splitAfterTxSizeSpec :: TxSizeSplitTestSetup -> Property prop_TxSeq_splitAfterTxSizeSpec tss = - txTickets implBefore === txTickets specBefore - .&&. txTickets implAfter === txTickets specAfter + TxSeq.toList implBefore === TxSeq.toList specBefore + .&&. TxSeq.toList implAfter === TxSeq.toList specAfter where TxSizeSplitTestSetup { tssTxSizeToSplitOn } = tss @@ -782,7 +866,7 @@ instance Arbitrary TxSizeSplitTestSetup where -- | Convert a 'TxSizeSplitTestSetup' to a 'TxSeq'. txSizeSplitTestSetupToTxSeq :: TxSizeSplitTestSetup -> TxSeq Int txSizeSplitTestSetupToTxSeq TxSizeSplitTestSetup { tssTxSizes } = - TxSeq.toTxSeq [(0, TicketNo 0, tssTxSize) | tssTxSize <- tssTxSizes] + TxSeq.fromList [TxTicket 0 (TicketNo 0) tssTxSize | tssTxSize <- tssTxSizes] {------------------------------------------------------------------------------- TicketNo Properties @@ -895,36 +979,42 @@ expectedTicketAssignment actions = executeAction :: forall m. IOLike m => TestMempool m -> Action -> m Property executeAction testMempool action = case action of AddTxs txs -> do - void $ addTxs txs - expectTraceEvent $ \case - TraceMempoolAddTxs txs' _ - | sort txs == sort txs' - -> property True - _ -> counterexample ("Transactions not added: " <> condense txs) False + void $ addTxs mempool txs + tracedAddedTxs <- expectTraceEvent $ \case + TraceMempoolAddedTx tx _ _ -> Just tx + _ -> Nothing + return $ if tracedAddedTxs == txs + then property True + else counterexample + ("Expected TraceMempoolAddedTx events for " <> condense txs <> + " but got " <> condense tracedAddedTxs) + False RemoveTxs txs -> do - removeTxs (map txId txs) - expectTraceEvent $ \case - TraceMempoolManuallyRemovedTxs txIds' [] _ - | sort (map txId txs) == sort txIds' - -> property True - _ -> counterexample ("Transactions not removed: " <> condense txs) False + removeTxs mempool (map txId txs) + tracedManuallyRemovedTxs <- expectTraceEvent $ \case + TraceMempoolManuallyRemovedTxs txIds _ _ -> Just txIds + _ -> Nothing + return $ if concat tracedManuallyRemovedTxs == map txId txs + then property True + else counterexample + ("Expected a TraceMempoolManuallyRemovedTxs event for " <> + condense txs <> " but got " <> + condense tracedManuallyRemovedTxs) + False + where TestMempool { mempool , eraseTraceEvents , getTraceEvents } = testMempool - Mempool { addTxs, removeTxs } = mempool - expectTraceEvent :: (TraceEventMempool TestBlock -> Property) -> m Property - expectTraceEvent checker = do + expectTraceEvent :: (TraceEventMempool TestBlock -> Maybe a) -> m [a] + expectTraceEvent extractor = do evs <- getTraceEvents eraseTraceEvents - return $ case evs of - [ev] -> checker ev - [] -> counterexample "No events traced" False - _ -> counterexample "Multiple events traced" False + return $ mapMaybe extractor evs currentTicketAssignment :: IOLike m => Mempool m TestBlock TicketNo -> m TicketAssignment