Skip to content

Commit

Permalink
mempool: Keep track of senders in reactor instead of implementation (t…
Browse files Browse the repository at this point in the history
…endermint#1010)

* Move senders from txs to txSenders

* Move resCbFirstTime to globalCb

* Fix typo

* Remove callback argument from CheckTx

* Fix lint

* make mocks

* lock isSender

* Change sync.Map for map with lock; add test

* fix lint

* comments

* move senders to reactor and add txsRemoved channel

* Record sender only on valid txs

* fix MConnection panicked

* notifyTxRemoved when removeAllTxs

* Add TxsRemoved and EnableTxsRemoved to interface

* Increase channel buffer size

* forgot emptyMempool

* Change Mempool interface

* Revert "Change Mempool interface"

This reverts commit d3468a12843b11269a180c9f889f1ec79c55b1d3.

* Simplify if/else

* Channel buffer size

* notify txRemoved even when txKey is not in txsMap

* Remove redundant update to map

* add callback for removing txs

* Add tests

* Use Mutex intead of RWMutex

* Fix TestMempoolNoCacheOverflow

* Fix lint

* Fix TestMempoolTxConcurrentWithCommit

* Fix TestReactorConcurrency

* Comment

* Remove references to implemenation details (cache)

* Rename removeTxOnReactor

* Comment

* Rename removeFromCache

* Comment

* Remove test line forgotten in tendermint#934.

* Comment

* revert mempool test size

* ci: Trigger workflows on merge group (tendermint#1118)

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Revert "config: add bootstrap peers (tendermint#9680)" (tendermint#1109)

* Revert "config: add bootstrap peers (tendermint#9680)"

This reverts commit f12588a.

* docs/p2p: bootstrap_peers config flag removed

* node: Revert removal of public reactor accessors (tendermint#1120)

* Revert "Remove unused code (tendermint#286)"

This reverts commit a2d9915.

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* node: Remove access to consensus state

Consensus state should only ever be accessible via the consensus
reactor.

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add changelog entry

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Fix mistake in changelog entry

Signed-off-by: Thane Thomson <connect@thanethomson.com>

---------

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* ci: Disable CodeQL check in merge queues (tendermint#1123)

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* p2p: Remove UPnP functionality (tendermint#1114)

* Remove UPnP functionality

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update documentation and specs to reflect UPnP removal

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add changelog entry

Signed-off-by: Thane Thomson <connect@thanethomson.com>

---------

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* ADR 107: Rename proto versions to pre-v1 betas (tendermint#1110)

* ADR 107: Rename proto versions to pre-v1 betas

* ADR 107: fix hyperlinks to ADR 103

* ADR 107: authorship of the revision

Co-authored-by: Thane Thomson <connect@thanethomson.com>

* ADR 107: change status to Accepted

---------

Co-authored-by: Thane Thomson <connect@thanethomson.com>

* RFC 104: Internal messaging using the actor model (tendermint#1092)

* Add first draft

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Expand comment on actor receive method

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Fix grammar

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Emphasize/clarify conclusions

Signed-off-by: Thane Thomson <connect@thanethomson.com>

---------

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* build(deps): Bump github.com/bufbuild/buf from 1.23.1 to 1.24.0 (tendermint#1131)

Bumps [github.com/bufbuild/buf](https://github.com/bufbuild/buf) from 1.23.1 to 1.24.0.
- [Release notes](https://github.com/bufbuild/buf/releases)
- [Changelog](https://github.com/bufbuild/buf/blob/main/CHANGELOG.md)
- [Commits](bufbuild/buf@v1.23.1...v1.24.0)

---
updated-dependencies:
- dependency-name: github.com/bufbuild/buf
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* build(deps): Bump github.com/vektra/mockery/v2 from 2.31.1 to 2.32.0 (tendermint#1132)

Bumps [github.com/vektra/mockery/v2](https://github.com/vektra/mockery) from 2.31.1 to 2.32.0.
- [Release notes](https://github.com/vektra/mockery/releases)
- [Changelog](https://github.com/vektra/mockery/blob/master/docs/changelog.md)
- [Commits](vektra/mockery@v2.31.1...v2.32.0)

---
updated-dependencies:
- dependency-name: github.com/vektra/mockery/v2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* build(deps): Bump docker/setup-buildx-action from 2.9.0 to 2.9.1 (tendermint#1133)

Bumps [docker/setup-buildx-action](https://github.com/docker/setup-buildx-action) from 2.9.0 to 2.9.1.
- [Release notes](https://github.com/docker/setup-buildx-action/releases)
- [Commits](docker/setup-buildx-action@v2.9.0...v2.9.1)

---
updated-dependencies:
- dependency-name: docker/setup-buildx-action
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* build(deps): Bump bufbuild/buf-setup-action from 1.23.1 to 1.24.0 (tendermint#1134)

Bumps [bufbuild/buf-setup-action](https://github.com/bufbuild/buf-setup-action) from 1.23.1 to 1.24.0.
- [Release notes](https://github.com/bufbuild/buf-setup-action/releases)
- [Commits](bufbuild/buf-setup-action@v1.23.1...v1.24.0)

---
updated-dependencies:
- dependency-name: bufbuild/buf-setup-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* spec: Add mempool specification in English and Quint (tendermint#997)

* add mempool English and Quint specifications

* some changes

* Update spec/mempool/mempool.md

Co-authored-by: Lasaro <lasaro@gmail.com>

* Update spec/mempool/mempool.md

Co-authored-by: Lasaro <lasaro@gmail.com>

* small change to the text

* add changelog entry

* some minor fixes

---------

Co-authored-by: Josef Widder <44643235+josef-widder@users.noreply.github.com>
Co-authored-by: Lasaro <lasaro@gmail.com>

* Add NewRandomTxs

* Fix TestReactorTxSendersMultiNode

* Fix TestDontExhaustMaxActiveIDs

* Fix unused parameter

* Add changelog

* Update UPGRADING.md

* Remove unused link in doc

---------

Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Thane Thomson <connect@thanethomson.com>
Co-authored-by: Daniel <daniel.cason@informal.systems>
Co-authored-by: Mikhail Zabaluev <mikhail.zabaluev@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Pierre Sutra <0track@gmail.com>
Co-authored-by: Josef Widder <44643235+josef-widder@users.noreply.github.com>
Co-authored-by: Lasaro <lasaro@gmail.com>
  • Loading branch information
8 people committed Jul 25, 2023
1 parent 2b1afd0 commit 330cd98
Show file tree
Hide file tree
Showing 21 changed files with 525 additions and 300 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
`[mempool]` Change the signature of `CheckTx` in the `Mempool` interface to
`CheckTx(tx types.Tx) (*abcicli.ReqRes, error)`. Also, add new method
`SetTxRemovedCallback`.
([\#1010](https://github.com/cometbft/cometbft/issues/1010))
20 changes: 19 additions & 1 deletion UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ This guide provides instructions for upgrading to specific versions of CometBFT.
* In the protobuf message `ResponseCheckTx`, fields `sender`, `priority`, and
`mempool_error`, which were only used by the priority mempool, were removed
but still kept in the message as "reserved".
* The `Mempool` interface was modified on the following methods. Note that this
interface is meant for internal use only, so you should be aware of these
changes only if you happen to call these methods directly.
- `CheckTx`'s signature changed from
`CheckTx(tx types.Tx, cb func(*abci.ResponseCheckTx), txInfo TxInfo) error`
to `CheckTx(tx types.Tx) (abcicli.ReqRes, error)`.
- The method used to take a callback function `cb` to be applied to the ABCI
`CheckTx` response. Now `CheckTx` returns the ABCI response of type
`abcicli.ReqRes`, on which the callback must be applied manually. For
example:
```golang
reqRes, err := CheckTx(tx)
cb(reqRes.Response.GetCheckTx())
```
- The second parameter was `txInfo`, which essentially contained information
about the sender of the transaction. Now that information is stored in the
mempool reactor instead of the data structure, so it is no longer needed in
this method.

### `block_results` RPC endpoint - query result display change (breaking)

Expand Down Expand Up @@ -139,4 +157,4 @@ please see the [Tendermint Core upgrading instructions][tmupgrade].
[v03426]: https://github.com/informalsystems/tendermint/releases/tag/v0.34.26
[discussions]: https://github.com/cometbft/cometbft/discussions
[tmupgrade]: https://github.com/tendermint/tendermint/blob/35581cf54ec436b8c37fabb43fdaa3f48339a170/UPGRADING.md
[go120]: https://go.dev/blog/go1.20
[go120]: https://go.dev/blog/go1.20
15 changes: 7 additions & 8 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
func deliverTxsRange(t *testing.T, cs *State, start, end int) {
// Deliver some txs.
for i := start; i < end; i++ {
err := assertMempool(cs.txNotifier).CheckTx(kvstore.NewTx(fmt.Sprintf("%d", i), "true"), nil, mempl.TxInfo{})
_, err := assertMempool(cs.txNotifier).CheckTx(kvstore.NewTx(fmt.Sprintf("%d", i), "true"))
require.NoError(t, err)
}
}
Expand Down Expand Up @@ -166,17 +166,16 @@ func TestMempoolRmBadTx(t *testing.T) {
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
invalidTx := []byte("invalidTx")
err := assertMempool(cs.txNotifier).CheckTx(invalidTx, func(r *abci.ResponseCheckTx) {
if r.Code != kvstore.CodeTypeInvalidTxFormat {
t.Errorf("expected checktx to return invalid format, got %v", r)
return
}
checkTxRespCh <- struct{}{}
}, mempl.TxInfo{})
reqRes, err := assertMempool(cs.txNotifier).CheckTx(invalidTx)
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
}
if reqRes.Response.GetCheckTx().Code != kvstore.CodeTypeInvalidTxFormat {
t.Errorf("expected checktx to return invalid format, got %v", reqRes.Response)
return
}
checkTxRespCh <- struct{}{}

// check for the tx
for {
Expand Down
12 changes: 5 additions & 7 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,11 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

// send a tx
if err := assertMempool(css[3].txNotifier).CheckTx(kvstore.NewTxFromID(1), func(resp *abci.ResponseCheckTx) {
require.False(t, resp.IsErr())
}, mempl.TxInfo{}); err != nil {
reqRes, err := assertMempool(css[3].txNotifier).CheckTx(kvstore.NewTxFromID(1))
if err != nil {
t.Error(err)
}
require.False(t, reqRes.Response.GetCheckTx().IsErr())

// wait till everyone makes the first new block
timeoutWaitGroup(N, func(j int) {
Expand Down Expand Up @@ -669,11 +669,9 @@ func waitForAndValidateBlock(

// optionally add transactions for the next block
for _, tx := range txs {
err := assertMempool(css[j].txNotifier).CheckTx(tx, func(resp *abci.ResponseCheckTx) {
require.False(t, resp.IsErr())
fmt.Println(resp)
}, mempl.TxInfo{})
reqRes, err := assertMempool(css[j].txNotifier).CheckTx(tx)
require.NoError(t, err)
require.False(t, reqRes.Response.GetCheckTx().IsErr())
}
})
}
Expand Down
17 changes: 10 additions & 7 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"context"

abcicli "github.com/cometbft/cometbft/abci/client"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/clist"
mempl "github.com/cometbft/cometbft/mempool"
Expand All @@ -20,8 +21,8 @@ func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) CheckTx(types.Tx, func(*abci.ResponseCheckTx), mempl.TxInfo) error {
return nil
func (emptyMempool) CheckTx(types.Tx) (*abcicli.ReqRes, error) {
return nil, nil
}

func (txmp emptyMempool) RemoveTxByKey(types.TxKey) error {
Expand All @@ -39,11 +40,13 @@ func (emptyMempool) Update(
) error {
return nil
}
func (emptyMempool) Flush() {}
func (emptyMempool) FlushAppConn() error { return nil }
func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
func (emptyMempool) EnableTxsAvailable() {}
func (emptyMempool) TxsBytes() int64 { return 0 }
func (emptyMempool) Flush() {}
func (emptyMempool) FlushAppConn() error { return nil }
func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
func (emptyMempool) EnableTxsAvailable() {}
func (emptyMempool) SetTxRemovedCallback(func(types.TxKey)) {}
func (emptyMempool) TxsBytes() int64 { return 0 }
func (emptyMempool) InMempool(types.TxKey) bool { return false }

func (emptyMempool) TxsFront() *clist.CElement { return nil }
func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }
Expand Down
23 changes: 12 additions & 11 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,14 @@ func sendTxs(ctx context.Context, cs *State) {
return
default:
tx := kvstore.NewTxFromID(i)
if err := assertMempool(cs.txNotifier).CheckTx(tx, func(resp *abci.ResponseCheckTx) {
if resp.Code != 0 {
panic(fmt.Sprintf("Unexpected code: %d, log: %s", resp.Code, resp.Log))
}
}, mempool.TxInfo{}); err != nil {
reqRes, err := assertMempool(cs.txNotifier).CheckTx(tx)
if err != nil {
panic(err)
}
resp := reqRes.Response.GetCheckTx()
if resp.Code != 0 {
panic(fmt.Sprintf("Unexpected code: %d, log: %s", resp.Code, resp.Log))
}
i++
}
}
Expand Down Expand Up @@ -369,7 +370,7 @@ func setupChainWithChangingValidators(t *testing.T, name string, nBlocks int) (*
valPubKey1ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey1)
require.NoError(t, err)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempool.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1)
assert.NoError(t, err)
propBlock, err := css[0].createProposalBlock(ctx) // changeProposer(t, cs1, vs2)
require.NoError(t, err)
Expand Down Expand Up @@ -401,7 +402,7 @@ func setupChainWithChangingValidators(t *testing.T, name string, nBlocks int) (*
updatePubKey1ABCI, err := cryptoenc.PubKeyToProto(updateValidatorPubKey1)
require.NoError(t, err)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempool.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1)
assert.NoError(t, err)
propBlock, err = css[0].createProposalBlock(ctx) // changeProposer(t, cs1, vs2)
require.NoError(t, err)
Expand Down Expand Up @@ -433,14 +434,14 @@ func setupChainWithChangingValidators(t *testing.T, name string, nBlocks int) (*
newVal2ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey2)
require.NoError(t, err)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempool.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2)
require.NoError(t, err)
newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey()
require.NoError(t, err)
newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3)
require.NoError(t, err)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempool.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3)
assert.NoError(t, err)
propBlock, err = css[0].createProposalBlock(ctx) // changeProposer(t, cs1, vs2)
require.NoError(t, err)
Expand Down Expand Up @@ -482,7 +483,7 @@ func setupChainWithChangingValidators(t *testing.T, name string, nBlocks int) (*
ensureNewProposal(proposalCh, height, round)

removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempool.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2)
assert.Nil(t, err)

rs = css[0].GetRoundState()
Expand Down Expand Up @@ -517,7 +518,7 @@ func setupChainWithChangingValidators(t *testing.T, name string, nBlocks int) (*
height++
incrementHeight(vss...)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempool.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3)
assert.NoError(t, err)
propBlock, err = css[0].createProposalBlock(ctx) // changeProposer(t, cs1, vs2)
require.NoError(t, err)
Expand Down
14 changes: 6 additions & 8 deletions mempool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"testing"

"encoding/binary"
"sync/atomic"

"github.com/cometbft/cometbft/abci/example/kvstore"
Expand All @@ -28,8 +27,7 @@ func BenchmarkReap(b *testing.B) {
size := 10000
for i := 0; i < size; i++ {
tx := kvstore.NewTxFromID(i)
binary.BigEndian.PutUint64(tx, uint64(i))
if err := mp.CheckTx(tx, nil, TxInfo{}); err != nil {
if _, err := mp.CheckTx(tx); err != nil {
b.Fatal(err)
}
}
Expand All @@ -53,7 +51,7 @@ func BenchmarkCheckTx(b *testing.B) {
tx := kvstore.NewTxFromID(i)
b.StartTimer()

if err := mp.CheckTx(tx, nil, TxInfo{}); err != nil {
if _, err := mp.CheckTx(tx); err != nil {
b.Fatal(err)
}
}
Expand All @@ -76,7 +74,7 @@ func BenchmarkParallelCheckTx(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tx := kvstore.NewTxFromID(int(next()))
if err := mp.CheckTx(tx, nil, TxInfo{}); err != nil {
if _, err := mp.CheckTx(tx); err != nil {
b.Fatal(err)
}
}
Expand All @@ -92,15 +90,15 @@ func BenchmarkCheckDuplicateTx(b *testing.B) {
mp.config.Size = 2

tx := kvstore.NewTxFromID(1)
if err := mp.CheckTx(tx, nil, TxInfo{}); err != nil {
if _, err := mp.CheckTx(tx); err != nil {
b.Fatal(err)
}
e := mp.FlushAppConn()
require.True(b, e == nil)

b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := mp.CheckTx(tx, nil, TxInfo{}); err == nil {
if _, err := mp.CheckTx(tx); err == nil {
b.Fatal("tx should be duplicate")
}
}
Expand Down Expand Up @@ -132,7 +130,7 @@ func BenchmarkUpdateRemoteClient(b *testing.B) {

tx := kvstore.NewTxFromID(i)

e := mp.CheckTx(tx, nil, TxInfo{})
_, e := mp.CheckTx(tx)
require.True(b, e == nil)

e = mp.FlushAppConn()
Expand Down
12 changes: 6 additions & 6 deletions mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ func TestCacheAfterUpdate(t *testing.T) {
for tcIndex, tc := range tests {
for i := 0; i < tc.numTxsToCreate; i++ {
tx := kvstore.NewTx(fmt.Sprintf("%d", i), "value")
err := mp.CheckTx(tx, func(resp *abci.ResponseCheckTx) {
require.False(t, resp.IsErr())
}, TxInfo{})
reqRes, err := mp.CheckTx(tx)
require.NoError(t, err)
require.False(t, reqRes.Response.GetCheckTx().IsErr())
}

updateTxs := []types.Tx{}
Expand All @@ -80,9 +79,10 @@ func TestCacheAfterUpdate(t *testing.T) {

for _, v := range tc.reAddIndices {
tx := kvstore.NewTx(fmt.Sprintf("%d", v), "value")
_ = mp.CheckTx(tx, func(resp *abci.ResponseCheckTx) {
require.False(t, resp.IsErr())
}, TxInfo{})
reqRes, err := mp.CheckTx(tx)
if err == nil {
require.False(t, reqRes.Response.GetCheckTx().IsErr())
}
}

cache := mp.cache.(*LRUTxCache)
Expand Down
Loading

0 comments on commit 330cd98

Please sign in to comment.