Skip to content

Commit

Permalink
feat: v0.34.x Prioritized Mempool (#8695)
Browse files Browse the repository at this point in the history
* Updated mocks

* add reactor tests

* add v1 reactor tests

* Fix fuzz test for priority mempool

* e2e adapted to mempool v1; prio pool is default now

* Reverted default mempool to be fifo

* Changed buf version

* Added priority mempool to ci testnet

* Fixed linter

* Updated makefile

* Aligned makefile changes to v0.34.x

* Added go install for proto

* Add log message to warn about prioritized mempool bug

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Changelog message

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: Jasmina Malicevic <jasmina.dustinac@gmail.com>
Co-authored-by: Callum Waters <cmwaters19@gmail.com>
Co-authored-by: Sam Kleinman <garen@tychoish.com>
Co-authored-by: Thane Thomson <connect@thanethomson.com>
  • Loading branch information
6 people committed Jul 11, 2023
1 parent 27bd321 commit 6938d89
Show file tree
Hide file tree
Showing 52 changed files with 4,200 additions and 857 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ mockery:

check-proto-deps:
ifeq (,$(shell which protoc-gen-gogofaster))
$(error "gogofaster plugin for protoc is required. Run 'go install github.com/gogo/protobuf/protoc-gen-gogofaster@latest' to install")
@go install github.com/gogo/protobuf/protoc-gen-gogofaster@latest
endif
.PHONY: check-proto-deps

Expand Down
36 changes: 32 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ const (
DefaultLogLevel = "info"

DefaultDBBackend = "goleveldb"

// Mempool versions.
// Default is v0.

// MempoolV0 is regular mempool
MempoolV0 = "v0"
// MempoolV1 is prioritized mempool
MempoolV1 = "v1"
)

// NOTE: Most of the structs & relevant comments + the
Expand Down Expand Up @@ -763,6 +771,7 @@ func DefaultFuzzConnConfig() *FuzzConnConfig {

// MempoolConfig defines the configuration options for the Ostracon mempool
type MempoolConfig struct {
Version string `mapstructure:"version"`
RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"`
Broadcast bool `mapstructure:"broadcast"`
Expand All @@ -786,20 +795,39 @@ type MempoolConfig struct {
// Including space needed by encoding (one varint per transaction).
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
MaxBatchBytes int `mapstructure:"max_batch_bytes"`

// TTLDuration, if non-zero, defines the maximum amount of time a transaction
// can exist for in the mempool.
//
// Note, if TTLNumBlocks is also defined, a transaction will be removed if it
// has existed in the mempool at least TTLNumBlocks number of blocks or if it's
// insertion time into the mempool is beyond TTLDuration.
TTLDuration time.Duration `mapstructure:"ttl-duration"`

// TTLNumBlocks, if non-zero, defines the maximum number of blocks a transaction
// can exist for in the mempool.
//
// Note, if TTLDuration is also defined, a transaction will be removed if it
// has existed in the mempool at least TTLNumBlocks number of blocks or if
// it's insertion time into the mempool is beyond TTLDuration.
TTLNumBlocks int64 `mapstructure:"ttl-num-blocks"`
}

// DefaultMempoolConfig returns a default configuration for the Ostracon mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Version: MempoolV0,
Recheck: true,
Broadcast: true,
WalPath: "",
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
TTLDuration: 0 * time.Second,
TTLNumBlocks: 0,
}
}

Expand Down
16 changes: 16 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,22 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }}
# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
max_batch_bytes = {{ .Mempool.MaxBatchBytes }}
# ttl-duration, if non-zero, defines the maximum amount of time a transaction
# can exist for in the mempool.
#
# Note, if ttl-num-blocks is also defined, a transaction will be removed if it
# has existed in the mempool at least ttl-num-blocks number of blocks or if it's
# insertion time into the mempool is beyond ttl-duration.
ttl-duration = "{{ .Mempool.TTLDuration }}"
# ttl-num-blocks, if non-zero, defines the maximum number of blocks a transaction
# can exist for in the mempool.
#
# Note, if ttl-duration is also defined, a transaction will be removed if it
# has existed in the mempool at least ttl-num-blocks number of blocks or if
# it's insertion time into the mempool is beyond ttl-duration.
ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
#######################################################
### State Sync Configuration Options ###
#######################################################
Expand Down
41 changes: 32 additions & 9 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"

config2 "github.com/Finschia/ostracon/config"

abcicli "github.com/Finschia/ostracon/abci/client"
"github.com/Finschia/ostracon/evidence"
"github.com/Finschia/ostracon/libs/log"
"github.com/Finschia/ostracon/libs/service"
tmsync "github.com/Finschia/ostracon/libs/sync"
mempl "github.com/Finschia/ostracon/mempool"

cfg "github.com/Finschia/ostracon/config"
mempoolv0 "github.com/Finschia/ostracon/mempool/v0"

//mempoolv1 "github.com/Finschia/ostracon/mempool/v1"
"github.com/Finschia/ostracon/p2p"
sm "github.com/Finschia/ostracon/state"
"github.com/Finschia/ostracon/store"
Expand Down Expand Up @@ -60,14 +63,34 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
// one for mempool, one for consensus
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)

// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
var mempool mempl.Mempool

switch thisConfig.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
case cfg.MempoolV1: // XXX Deprecated
panic("Deprecated MempoolV1")
/*
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
*/
}

if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}
Expand Down Expand Up @@ -124,7 +147,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch, c *config2.P2PConfig) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch, c *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(log.NewNopLogger().With("module", "p2p")) // Switch log is noisy for this test
return s
Expand Down Expand Up @@ -311,7 +334,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
config.P2P,
i,
"foo", "1.0.0",
func(i int, sw *p2p.Switch, config *config2.P2PConfig) *p2p.Switch {
func(i int, sw *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
return sw
})
switches[i].SetLogger(p2pLogger.With("validator", i))
Expand Down Expand Up @@ -373,7 +396,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
}
}()

p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch, config *config2.P2PConfig) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
// ignore new switch s, we already made ours
switches[i].AddReactor("CONSENSUS", reactors[i])
return switches[i]
Expand Down
35 changes: 32 additions & 3 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
tmpubsub "github.com/Finschia/ostracon/libs/pubsub"
tmsync "github.com/Finschia/ostracon/libs/sync"
mempl "github.com/Finschia/ostracon/mempool"
mempoolv0 "github.com/Finschia/ostracon/mempool/v0"

//mempoolv1 "github.com/Finschia/ostracon/mempool/v1"
"github.com/Finschia/ostracon/p2p"
"github.com/Finschia/ostracon/privval"
sm "github.com/Finschia/ostracon/state"
Expand Down Expand Up @@ -420,12 +423,38 @@ func newStateWithConfigAndBlockStoreWithLoggers(

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)

proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)
// Make Mempool
memplMetrics := mempl.NopMetrics()

// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(loggers.memLogger.With("module", "mempool"))
var mempool mempl.Mempool

switch config.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
mempool.(*mempoolv0.CListMempool).SetLogger(loggers.memLogger.With("module", "mempool"))
case cfg.MempoolV1: // XXX Deprecated
panic("Deprecated MempoolV1")
/*
logger := consensusLogger()
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
*/
}
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}
Expand Down
33 changes: 29 additions & 4 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"github.com/Finschia/ostracon/libs/log"
tmsync "github.com/Finschia/ostracon/libs/sync"
mempl "github.com/Finschia/ostracon/mempool"
mempoolv0 "github.com/Finschia/ostracon/mempool/v0"

//mempoolv1 "github.com/Finschia/ostracon/mempool/v1"
"github.com/Finschia/ostracon/p2p"
p2pmock "github.com/Finschia/ostracon/p2p/mock"
sm "github.com/Finschia/ostracon/state"
Expand Down Expand Up @@ -152,14 +155,36 @@ func TestReactorWithEvidence(t *testing.T) {
blockDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
memplMetrics := mempl.NopMetrics()
// one for mempool, one for consensus
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)

// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
var mempool mempl.Mempool

switch config.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
case cfg.MempoolV1: // XXX Deprecated MempoolV1
panic("Deprecated MempoolV1")
/*
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
*/
}
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}
Expand Down
12 changes: 9 additions & 3 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ type emptyMempool struct{}

var _ mempl.Mempool = emptyMempool{}

func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) CheckTxSync(_ types.Tx, _ func(*ocabci.Response), _ mempl.TxInfo) error {
return nil
}
Expand All @@ -28,6 +29,11 @@ func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ fu
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxBytesMaxGasMaxTxs(_, _, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }

func (txmp emptyMempool) RemoveTxByKey(txKey types.TxKey) error {
return nil
}

func (emptyMempool) Update(
_ *types.Block,
_ []*abci.ResponseDeliverTx,
Expand Down
Loading

0 comments on commit 6938d89

Please sign in to comment.