Skip to content

Commit

Permalink
Mempool: better revalidation and caching of the ledger state
Browse files Browse the repository at this point in the history
Fixes #1565 and #1301.

* Replace `Mempool.addTxs` with `Mempool.tryAddTxs`, which doesn't block. This
  function will be much easier to test. We provide an implementation of
  `Mempool.addTxs` in terms of `Mempool.tryAddTxs`. We simplify the
  implementations by relying on the background thread for synchronising the
  Mempool with an updated ledger state.

* Cache the `TickedLedgerState` after applying all transactions in the Mempool
  to it (#1301). Previously, we had to reapply all transactions whenever we
  added a new transaction. The new approach is much faster.

* As we're keeping the `TickedLedgerState` in memory, make sure it is
  thunk-free.

* Correct revalidation in (#1565) after explicitly removing transactions from
  the Mempool, see `revalidateTxsFor`.

* Cleanup of conversion functions in `TxSeq`.

* O(1) `MempoolSize` calculation for a `TxSeq` using its fingertree measure.
  • Loading branch information
mrBliss committed Feb 10, 2020
1 parent 7ac9b93 commit 302f374
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 463 deletions.
13 changes: 11 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Consensus/Ledger/Abstract.hs
@@ -1,7 +1,11 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE UndecidableSuperClasses #-}

-- | Interface to the ledger layer
Expand All @@ -20,6 +24,7 @@ module Ouroboros.Consensus.Ledger.Abstract (

import Control.Monad.Except
import Data.Type.Equality ((:~:))
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)

import Cardano.Prelude (NoUnexpectedThunks)
Expand Down Expand Up @@ -118,7 +123,7 @@ ledgerTipSlot = pointSlot . ledgerTipPoint
-- the tip of the underlying ledger (i.e., no blocks have been applied).
data TickedLedgerState blk = TickedLedgerState {
-- | The slot number supplied to 'applyChainTick'
tickedSlotNo :: SlotNo
tickedSlotNo :: !SlotNo

-- | The underlying ledger state
--
Expand All @@ -128,8 +133,12 @@ data TickedLedgerState blk = TickedLedgerState {
--
-- > ledgerTipPoint (tickedLedgerState (applyChainTick cfg slot st)
-- > == ledgerTipPoint st
, tickedLedgerState :: LedgerState blk
, tickedLedgerState :: !(LedgerState blk)
}
deriving (Generic)

deriving instance NoUnexpectedThunks (LedgerState blk)
=> NoUnexpectedThunks (TickedLedgerState blk)

-- | Link protocol to ledger
class (SupportedBlock blk, UpdateLedger blk) => ProtocolLedgerView blk where
Expand Down
144 changes: 92 additions & 52 deletions ouroboros-consensus/src/Ouroboros/Consensus/Mempool/API.hs
@@ -1,12 +1,14 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE UndecidableSuperClasses #-}

module Ouroboros.Consensus.Mempool.API (
Mempool(..)
, addTxs
, BlockSlot(..)
, MempoolCapacityBytes (..)
, MempoolSnapshot(..)
Expand Down Expand Up @@ -119,45 +121,45 @@ data Mempool m blk idx = Mempool {
-- @STM m@; we keep it in @m@ instead to leave open the possibility of
-- persistence.
--
-- The following validation steps will be performed when adding
-- transactions to the mempool:
-- The new transactions provided will be validated, /in order/, against
-- the ledger state obtained by applying all the transactions already in
-- the Mempool to it. Transactions which are found to be invalid, with
-- respect to the ledger state, are dropped, whereas valid transactions
-- are added to the mempool.
--
-- * Transactions which already exist in the mempool are revalidated,
-- /in order/, against the current ledger state. Existing transactions
-- which are found to be invalid, with respect to the current ledger
-- state, are dropped from the mempool, whereas valid transactions
-- remain in the mempool.
-- * The new transactions provided will be validated, /in order/,
-- against the current ledger state. Transactions which are found to
-- be invalid, with respect to the current ledger state, are dropped,
-- whereas valid transactions are added to the mempool.
-- Note that transactions that are invalid, with respect to the ledger
-- state, will /never/ be added to the mempool. However, it is possible
-- that, at a given point in time, transactions which were once valid
-- but are now invalid, with respect to the current ledger state, could
-- exist within the mempool until they are revalidated and dropped from
-- the mempool via a call to 'syncWithLedger' or by the background
-- thread that watches the ledger for changes.
--
-- Note that transactions that are invalid, with respect to the current
-- ledger state, will /never/ be added to the mempool. However, it is
-- possible that, at a given point in time, transactions which were once
-- valid but are now invalid, with respect to the current ledger state,
-- could exist within the mempool until they are revalidated and dropped
-- from the mempool via a call to either 'addTxs' or 'syncState'.
-- This function will return two lists
--
-- This function will return a list containing the following
-- transactions:
-- 1. A list containing the following transactions:
--
-- * Those transactions provided which were found to be valid, along
-- with 'Nothing' for their accompanying @Maybe (ApplyTxErr blk)@
-- values.
-- * Those transactions provided which were found to be invalid, along
-- with their accompanying validation errors.
-- * Those transactions provided which were found to be valid, along
-- with 'Nothing' for their accompanying @Maybe (ApplyTxErr blk)@
-- values. These transactions are now in the Mempool.
-- * Those transactions provided which were found to be invalid,
-- along with their accompanying validation errors. These
-- transactions are not in the Mempool.
--
-- The order of this returned list is undefined.
-- 2. A list containing the transactions that have not yet been added
-- yet, as the capacity of the Mempool has been reached. I.e., there
-- is no space in the Mempool to add the first transaction in this
-- list. Note that we won't try to add smaller transactions after
-- that first transaction because they might depend on the first
-- transaction.
--
-- POSTCONDITION:
-- > (processed, toProcess) <- tryAddTxs txs
-- > map fst processed ++ toProcess == txs
--
-- Note that previously valid transaction that are now invalid with
-- respect to the current ledger state are dropped from the mempool, but
-- are not part of the returned list.
--
-- POSTCONDITION: given some ordering @txOrd@ on @'GenTx' blk@:
--
-- > addTxs inTxs >>= \outTxs ->
-- > sortBy txOrd inTxs == sortBy txOrd (map fst outTxs)
-- are not part of the first returned list (nor the second).
--
-- In principle it is possible that validation errors are transient; for
-- example, it is possible that a transaction is rejected because one of
Expand All @@ -172,23 +174,16 @@ data Mempool m blk idx = Mempool {
-- (after all, by definition that must mean its inputs have been used).
-- Rejected transactions are therefore not necessarily a sign of
-- malicious behaviour. Indeed, we would expect /most/ transactions that
-- are reported as invalid by 'addTxs' to be invalid precisely because
-- they have already been included. Distinguishing between these two
-- cases can be done in theory, but it is expensive unless we have an
-- index of transaction hashes that have been included on the blockchain.
-- are reported as invalid by 'tryAddTxs' to be invalid precisely
-- because they have already been included. Distinguishing between these
-- two cases can be done in theory, but it is expensive unless we have
-- an index of transaction hashes that have been included on the
-- blockchain.
--
-- It is also worth noting that, if the mempool capacity is reached,
-- this function will block until it's able to at least attempt
-- validating and adding each of the provided transactions to the
-- mempool. In the event that we block, we also commit any useful work
-- done up to that point. For example, if we tried to add 5 valid
-- transactions but there is only space for 3, we would validate and add
-- 3 to the mempool and then block until more space becomes available,
-- at which point we would then re-attempt with the remaining 2
-- transactions. This process would continue until it is able to at
-- least attempt validating and adding each of the provided transactions
-- to the mempool.
addTxs :: [GenTx blk] -> m [(GenTx blk, Maybe (ApplyTxErr blk))]
tryAddTxs :: [GenTx blk]
-> m ( [(GenTx blk, Maybe (ApplyTxErr blk))]
, [GenTx blk]
)

-- | Manually remove the given transactions from the mempool.
, removeTxs :: [GenTxId blk] -> m ()
Expand Down Expand Up @@ -232,6 +227,47 @@ data Mempool m blk idx = Mempool {
, zeroIdx :: idx
}

-- | Wrapper around 'implTryAddTxs' that blocks until all transaction have
-- either been added to the Mempool or rejected.
--
-- This function does not sync the Mempool contents with the ledger state in
-- case the latter changes, it relies on the background thread to do that.
--
-- POSTCONDITON:
-- > processed <- addTxs mpEnv txs
-- > map fst processed == txs
addTxs
:: forall m blk idx. (MonadSTM m, ApplyTx blk)
=> Mempool m blk idx
-> [GenTx blk]
-> m [(GenTx blk, Maybe (ApplyTxErr blk))]
addTxs mempool = \txs -> do
(processed, toAdd) <- tryAddTxs mempool txs
case toAdd of
[] -> return processed
_ -> go [processed] toAdd
where
go
:: [[(GenTx blk, Maybe (ApplyTxErr blk))]]
-- ^ The outer list is in reverse order, but all the inner lists will
-- be in the right order.
-> [GenTx blk]
-> m [(GenTx blk, Maybe (ApplyTxErr blk))]
go acc [] = return (concat (reverse acc))
go acc txs@(tx:_) = do
let firstTxSize = txSize tx
-- Wait until there's at least room for the first transaction we're
-- trying to add, otherwise there's no point in trying to add it.
atomically $ do
curSize <- msNumBytes . snapshotMempoolSize <$> getSnapshot mempool
MempoolCapacityBytes capacity <- getCapacity mempool
check (curSize + firstTxSize <= capacity)
-- It is possible that between the check above and the call below, other
-- transactions are added, stealing our spot, but that's fine, we'll
-- just recurse again without progress.
(added, toAdd) <- tryAddTxs mempool txs
go (added:acc) toAdd

-- | The slot of the block in which the transactions in the mempool will end up
--
-- The transactions in the mempool will be part of the body of a block, but a
Expand Down Expand Up @@ -315,15 +351,19 @@ instance Monoid MempoolSize where

-- | Events traced by the Mempool.
data TraceEventMempool blk
= TraceMempoolAddTxs
![GenTx blk]
= TraceMempoolAddedTx
!(GenTx blk)
-- ^ New, valid transaction were added to the Mempool.
!MempoolSize
-- ^ The current size of the Mempool.
| TraceMempoolRejectedTxs
![(GenTx blk, ApplyTxErr blk)]
-- ^ The size of the Mempool before adding the transaction.
!MempoolSize
-- ^ The size of the Mempool after adding the transaction.
| TraceMempoolRejectedTx
!(GenTx blk)
-- ^ New, invalid transaction were rejected and thus not added to the
-- Mempool.
!(ApplyTxErr blk)
-- ^ The reason for rejecting the transaction.
!MempoolSize
-- ^ The current size of the Mempool.
| TraceMempoolRemoveTxs
Expand Down

0 comments on commit 302f374

Please sign in to comment.