Skip to content

Commit

Permalink
perf: Make mempool update async from block.Commit (cometbft#3008) (co…
Browse files Browse the repository at this point in the history
…metbft#69)

First step to fixing cometbft#2925

PR'ing this to see if we have any test failures. Note that this is safe
in the happy path, as Reap and CheckTx both share this same lock. The
functionality behavior is that:

- Full nodes and non-proposers `timeout_prevote` beginning should not
block on updating the mempool
- Block proposers get _very slight_ increased concurrency before reaping
their next block. (Should be significantly fixed in subsequent PR's in
- Reap takes a lock on the mempool mutex, so there is no concurrency
safety issues right now.
- Mempool errors will not halt consensus, instead they just log an error
and call mempool flush. I actually think this may be better behavior? If
we want to preserve the old behavior, we can thread a generic "consensus
halt error" channel perhaps?

I'm not sure how/where to best document this. Please also let me know if
tests need creating. Seems like the create empty block tests sometimes
hit failures, I'll investigate tmrw

Also please feel free to take over this PR, just thought I"d make it to
help us with performance improvements. Happy to get this into an
experimental release to test on mainnets.

---

- [ ] Tests written/updated
- [x] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [x] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec

---------

Co-authored-by: Dev Ojha <ValarDragon@users.noreply.github.com>
Co-authored-by: Sergio Mena <sergio@informal.systems>
  • Loading branch information
3 people committed May 23, 2024
1 parent 925143e commit 2cea495
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .changelog/unreleased/features/3008-mempool-async-update.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[consensus]` Make mempool updates asynchronous from consensus Commit's,
reducing latency for reaching consensus timeouts.
([#3008](https://github.com/cometbft/cometbft/pull/3008))
4 changes: 3 additions & 1 deletion spec/abci/abci++_app_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ will be received on the mempool connection during this processing step, providin
update all four
connection states to the latest committed state at the same time.

When `Commit` returns, CometBFT unlocks the mempool.
CometBFT unlocks the mempool after it has finished updating for the new block,
which occurs asynchronously from `Commit`.
See [Mempool Update](../mempool/mempool.md) for more information on what the `update` task does.

WARNING: if the ABCI app logic processing the `Commit` message sends a
`/broadcast_tx_sync` or `/broadcast_tx` and waits for the response
Expand Down
42 changes: 35 additions & 7 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,31 +274,38 @@ func (blockExec *BlockExecutor) applyBlock(state State, blockID types.BlockID, b
return state, retainHeight, nil
}

// Commit locks the mempool, runs the ABCI Commit message, and updates the
// Commit locks the mempool, runs the ABCI Commit message, and asynchronously starts updating the
// mempool.
// It returns the result of calling abci.Commit (the AppHash) and the height to retain (if any).
// Commit returns the result of calling abci.Commit which is the height to retain (if any)).
// The application is expected to have persisted its state (if any) before returning
// from the ABCI Commit call. This is the only place where the application should
// persist its state.
// The Mempool must be locked during commit and update because state is
// typically reset on Commit and old txs must be replayed against committed
// state before new txs are run in the mempool, lest they be invalid.
// The mempool is unlocked when the Update routine completes, which is
// asynchronous from Commit.
func (blockExec *BlockExecutor) Commit(
state State,
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
) ([]byte, int64, error) {
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()
unlockMempool := func() { blockExec.mempool.Unlock() }

// while mempool is Locked, flush to ensure all async requests have completed
// in the ABCI app before Commit.
err := blockExec.mempool.FlushAppConn()
if err != nil {
blockExec.logger.Error("client error during mempool.FlushAppConn", "err", err)
unlockMempool()
blockExec.logger.Error("client error during mempool.FlushAppConn, flushing mempool", "err", err)
return nil, 0, err
}

// Commit block, get hash back
res, err := blockExec.proxyApp.CommitSync()
if err != nil {
unlockMempool()
blockExec.logger.Error("client error during proxyAppConn.CommitSync", "err", err)
return nil, 0, err
}
Expand All @@ -312,15 +319,36 @@ func (blockExec *BlockExecutor) Commit(
)

// Update mempool.
err = blockExec.mempool.Update(
go blockExec.asyncUpdateMempool(unlockMempool, block, state.Copy(), deliverTxResponses)

return res.Data, res.RetainHeight, err
}

// updates the mempool with the latest state asynchronously.
func (blockExec *BlockExecutor) asyncUpdateMempool(
unlockMempool func(),
block *types.Block,
state State,
deliverTxResponses []*abci.ResponseDeliverTx,
) {
defer unlockMempool()

err := blockExec.mempool.Update(
block.Height,
block.Txs,
deliverTxResponses,
TxPreCheck(state),
TxPostCheck(state),
)

return res.Data, res.RetainHeight, err
if err != nil {
// We panic in this case, out of legacy behavior. Before we made the mempool
// update complete asynchronously from Commit, we would panic if the mempool
// update failed. This is because we panic on any error within commit.
// We should consider changing this behavior in the future, as there is no
// need to panic if the mempool update failed. The most severe thing we
// would need to do is dump the mempool and restart it.
panic(fmt.Sprintf("client error during mempool.Update; error %v", err))
}
}

//---------------------------------------------------------
Expand Down

0 comments on commit 2cea495

Please sign in to comment.