diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index ae980f1c7e..390aad3add 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -47,6 +47,7 @@ type Blockchain struct { db storage.Storage // The Storage object (database) consensus Verifier executor Executor + txSigner TxSigner config *chain.Chain // Config containing chain information genesis types.Hash // The hash of the genesis block @@ -94,6 +95,11 @@ type Executor interface { ProcessBlock(parentRoot types.Hash, block *types.Block, blockCreator types.Address) (*state.Transition, error) } +type TxSigner interface { + // Sender returns the sender of the transaction + Sender(tx *types.Transaction) (types.Address, error) +} + type BlockResult struct { Root types.Hash Receipts []*types.Receipt @@ -185,12 +191,14 @@ func NewBlockchain( config *chain.Chain, consensus Verifier, executor Executor, + txSigner TxSigner, ) (*Blockchain, error) { b := &Blockchain{ logger: logger.Named("blockchain"), config: config, consensus: consensus, executor: executor, + txSigner: txSigner, stream: &eventStream{}, gpAverage: &gasPriceAverage{ price: big.NewInt(0), @@ -571,6 +579,13 @@ func (b *Blockchain) readBody(hash types.Hash) (*types.Body, bool) { return nil, false } + // To return from field in the transactions of the past blocks + if updated := b.recoverFromFieldsInTransactions(bb.Transactions); updated { + if err := b.db.WriteBody(hash, bb); err != nil { + b.logger.Warn("failed to write body into storage", "hash", hash, "err", err) + } + } + return bb, true } @@ -966,10 +981,15 @@ func (b *Blockchain) updateGasPriceAvgWithBlock(block *types.Block) { // writeBody writes the block body to the DB. // Additionally, it also updates the txn lookup, for txnHash -> block lookups func (b *Blockchain) writeBody(block *types.Block) error { - body := block.Body() + // Recover 'from' field in tx before saving + // Because the block passed from the consensus layer doesn't have from field in tx, + // due to missing encoding in RLP + if err := b.recoverFromFieldsInBlock(block); err != nil { + return err + } // Write the full body (txns + receipts) - if err := b.db.WriteBody(block.Header.Hash, body); err != nil { + if err := b.db.WriteBody(block.Header.Hash, block.Body()); err != nil { return err } @@ -990,6 +1010,49 @@ func (b *Blockchain) ReadTxLookup(hash types.Hash) (types.Hash, bool) { return v, ok } +// recoverFromFieldsInBlock recovers 'from' fields in the transactions of the given block +// return error if the invalid signature found +func (b *Blockchain) recoverFromFieldsInBlock(block *types.Block) error { + for _, tx := range block.Transactions { + if tx.From != types.ZeroAddress { + continue + } + + sender, err := b.txSigner.Sender(tx) + if err != nil { + return err + } + + tx.From = sender + } + + return nil +} + +// recoverFromFieldsInTransactions recovers 'from' fields in the transactions +// log as warning if failing to recover one address +func (b *Blockchain) recoverFromFieldsInTransactions(transactions []*types.Transaction) bool { + updated := false + + for _, tx := range transactions { + if tx.From != types.ZeroAddress { + continue + } + + sender, err := b.txSigner.Sender(tx) + if err != nil { + b.logger.Warn("failed to recover from address in Tx", "hash", tx.Hash, "err", err) + + continue + } + + tx.From = sender + updated = true + } + + return updated +} + // verifyGasLimit is a helper function for validating a gas limit in a header func (b *Blockchain) verifyGasLimit(header *types.Header, parentHeader 
*types.Header) error { if header.GasUsed > header.GasLimit { diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index 23c1b7dfb3..cc958862f0 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/0xPolygon/polygon-edge/state" + "github.com/hashicorp/go-hclog" "github.com/0xPolygon/polygon-edge/chain" "github.com/stretchr/testify/assert" @@ -543,27 +544,358 @@ func TestForkUnknownParents(t *testing.T) { } func TestBlockchainWriteBody(t *testing.T) { + t.Parallel() + + var ( + addr = types.StringToAddress("1") + ) + + newChain := func( + t *testing.T, + txFromByTxHash map[types.Hash]types.Address, + ) *Blockchain { + t.Helper() + + storage, err := memory.NewMemoryStorage(nil) + assert.NoError(t, err) + + chain := &Blockchain{ + db: storage, + txSigner: &mockSigner{ + txFromByTxHash: txFromByTxHash, + }, + } + + return chain + } + + t.Run("should succeed if tx has from field", func(t *testing.T) { + t.Parallel() + + tx := &types.Transaction{ + Value: big.NewInt(10), + V: big.NewInt(1), + From: addr, + } + + block := &types.Block{ + Header: &types.Header{}, + Transactions: []*types.Transaction{ + tx, + }, + } + + tx.ComputeHash() + block.Header.ComputeHash() + + txFromByTxHash := map[types.Hash]types.Address{} + + chain := newChain(t, txFromByTxHash) + + assert.NoError( + t, + chain.writeBody(block), + ) + }) + + t.Run("should return error if tx doesn't have from and recovering address fails", func(t *testing.T) { + t.Parallel() + + tx := &types.Transaction{ + Value: big.NewInt(10), + V: big.NewInt(1), + } + + block := &types.Block{ + Header: &types.Header{}, + Transactions: []*types.Transaction{ + tx, + }, + } + + tx.ComputeHash() + block.Header.ComputeHash() + + txFromByTxHash := map[types.Hash]types.Address{} + + chain := newChain(t, txFromByTxHash) + + assert.ErrorIs( + t, + errRecoveryAddressFailed, + chain.writeBody(block), + ) + }) + + t.Run("should recover from address and store to storage", func(t *testing.T) { + t.Parallel() + + tx := &types.Transaction{ + Value: big.NewInt(10), + V: big.NewInt(1), + } + + block := &types.Block{ + Header: &types.Header{}, + Transactions: []*types.Transaction{ + tx, + }, + } + + tx.ComputeHash() + block.Header.ComputeHash() + + txFromByTxHash := map[types.Hash]types.Address{ + tx.Hash: addr, + } + + chain := newChain(t, txFromByTxHash) + + assert.NoError(t, chain.writeBody(block)) + + readBody, ok := chain.readBody(block.Hash()) + assert.True(t, ok) + + assert.Equal(t, addr, readBody.Transactions[0].From) + }) +} + +func Test_recoverFromFieldsInBlock(t *testing.T) { + t.Parallel() + + var ( + addr1 = types.StringToAddress("1") + addr2 = types.StringToAddress("1") + addr3 = types.StringToAddress("1") + ) + + computeTxHashes := func(txs ...*types.Transaction) { + for _, tx := range txs { + tx.ComputeHash() + } + } + + t.Run("should succeed", func(t *testing.T) { + t.Parallel() + + txFromByTxHash := map[types.Hash]types.Address{} + chain := &Blockchain{ + txSigner: &mockSigner{ + txFromByTxHash: txFromByTxHash, + }, + } + + tx1 := &types.Transaction{Nonce: 0, From: addr1} + tx2 := &types.Transaction{Nonce: 1, From: types.ZeroAddress} + + computeTxHashes(tx1, tx2) + + txFromByTxHash[tx2.Hash] = addr2 + + block := &types.Block{ + Transactions: []*types.Transaction{ + tx1, + tx2, + }, + } + + assert.NoError( + t, + chain.recoverFromFieldsInBlock(block), + ) + }) + + t.Run("should stop and return error if recovery fails", func(t *testing.T) { + t.Parallel() + + 
txFromByTxHash := map[types.Hash]types.Address{} + chain := &Blockchain{ + txSigner: &mockSigner{ + txFromByTxHash: txFromByTxHash, + }, + } + + tx1 := &types.Transaction{Nonce: 0, From: types.ZeroAddress} + tx2 := &types.Transaction{Nonce: 1, From: types.ZeroAddress} + tx3 := &types.Transaction{Nonce: 2, From: types.ZeroAddress} + + computeTxHashes(tx1, tx2, tx3) + + // returns only addresses for tx1 and tx3 + txFromByTxHash[tx1.Hash] = addr1 + txFromByTxHash[tx3.Hash] = addr3 + + block := &types.Block{ + Transactions: []*types.Transaction{ + tx1, + tx2, + tx3, + }, + } + + assert.ErrorIs( + t, + chain.recoverFromFieldsInBlock(block), + errRecoveryAddressFailed, + ) + + assert.Equal(t, addr1, tx1.From) + assert.Equal(t, types.ZeroAddress, tx2.From) + assert.Equal(t, types.ZeroAddress, tx3.From) + }) +} + +func Test_recoverFromFieldsInTransactions(t *testing.T) { + t.Parallel() + + var ( + addr1 = types.StringToAddress("1") + addr2 = types.StringToAddress("1") + addr3 = types.StringToAddress("1") + ) + + computeTxHashes := func(txs ...*types.Transaction) { + for _, tx := range txs { + tx.ComputeHash() + } + } + + t.Run("should succeed", func(t *testing.T) { + t.Parallel() + + txFromByTxHash := map[types.Hash]types.Address{} + chain := &Blockchain{ + logger: hclog.NewNullLogger(), + txSigner: &mockSigner{ + txFromByTxHash: txFromByTxHash, + }, + } + + tx1 := &types.Transaction{Nonce: 0, From: addr1} + tx2 := &types.Transaction{Nonce: 1, From: types.ZeroAddress} + + computeTxHashes(tx1, tx2) + + txFromByTxHash[tx2.Hash] = addr2 + + transactions := []*types.Transaction{ + tx1, + tx2, + } + + assert.True( + t, + chain.recoverFromFieldsInTransactions(transactions), + ) + }) + + t.Run("should succeed even though recovery fails for some transactions", func(t *testing.T) { + t.Parallel() + + txFromByTxHash := map[types.Hash]types.Address{} + chain := &Blockchain{ + logger: hclog.NewNullLogger(), + txSigner: &mockSigner{ + txFromByTxHash: txFromByTxHash, + }, + } + + tx1 := &types.Transaction{Nonce: 0, From: types.ZeroAddress} + tx2 := &types.Transaction{Nonce: 1, From: types.ZeroAddress} + tx3 := &types.Transaction{Nonce: 2, From: types.ZeroAddress} + + computeTxHashes(tx1, tx2, tx3) + + // returns only addresses for tx1 and tx3 + txFromByTxHash[tx1.Hash] = addr1 + txFromByTxHash[tx3.Hash] = addr3 + + transactions := []*types.Transaction{ + tx1, + tx2, + tx3, + } + + assert.True(t, chain.recoverFromFieldsInTransactions(transactions)) + + assert.Equal(t, addr1, tx1.From) + assert.Equal(t, types.ZeroAddress, tx2.From) + assert.Equal(t, addr3, tx3.From) + }) + + t.Run("should return false if all transactions has from field", func(t *testing.T) { + t.Parallel() + + txFromByTxHash := map[types.Hash]types.Address{} + chain := &Blockchain{ + logger: hclog.NewNullLogger(), + txSigner: &mockSigner{ + txFromByTxHash: txFromByTxHash, + }, + } + + tx1 := &types.Transaction{Nonce: 0, From: addr1} + tx2 := &types.Transaction{Nonce: 1, From: addr2} + + computeTxHashes(tx1, tx2) + + txFromByTxHash[tx2.Hash] = addr2 + + transactions := []*types.Transaction{ + tx1, + tx2, + } + + assert.False( + t, + chain.recoverFromFieldsInTransactions(transactions), + ) + }) +} + +func TestBlockchainReadBody(t *testing.T) { storage, err := memory.NewMemoryStorage(nil) assert.NoError(t, err) + txFromByTxHash := make(map[types.Hash]types.Address) + addr := types.StringToAddress("1") + b := &Blockchain{ - db: storage, + logger: hclog.NewNullLogger(), + db: storage, + txSigner: &mockSigner{ + txFromByTxHash: txFromByTxHash, + }, } + 
tx := &types.Transaction{ + Value: big.NewInt(10), + V: big.NewInt(1), + } + + tx.ComputeHash() + block := &types.Block{ Header: &types.Header{}, Transactions: []*types.Transaction{ - { - Value: big.NewInt(10), - V: big.NewInt(1), - }, + tx, }, } + block.Header.ComputeHash() + txFromByTxHash[tx.Hash] = types.ZeroAddress + if err := b.writeBody(block); err != nil { t.Fatal(err) } + + txFromByTxHash[tx.Hash] = addr + + readBody, found := b.readBody(block.Hash()) + + assert.True(t, found) + assert.Equal(t, addr, readBody.Transactions[0].From) } func TestCalculateGasLimit(t *testing.T) { diff --git a/blockchain/subscription.go b/blockchain/subscription.go index e5d9702759..eaed40f873 100644 --- a/blockchain/subscription.go +++ b/blockchain/subscription.go @@ -45,9 +45,8 @@ func (m *MockSubscription) Close() { // subscription is the Blockchain event subscription object type subscription struct { - updateCh chan void // Channel for update information - closeCh chan void // Channel for close signals - elem *eventElem // Reference to the blockchain event wrapper + updateCh chan *Event // Channel for update information + closeCh chan void // Channel for close signals } // GetEventCh creates a new event channel, and returns it @@ -70,17 +69,10 @@ func (s *subscription) GetEventCh() chan *Event { // GetEvent returns the event from the subscription (BLOCKING) func (s *subscription) GetEvent() *Event { for { - if s.elem.next != nil { - s.elem = s.elem.next - evnt := s.elem.event - - return evnt - } - // Wait for an update select { - case <-s.updateCh: - continue + case ev := <-s.updateCh: + return ev case <-s.closeCh: return nil } @@ -115,7 +107,7 @@ type Event struct { Type EventType // Source is the source that generated the blocks for the event - // right now it can be either the Sealer or the Syncer. 
TODO + // right now it can be either the Sealer or the Syncer Source string } @@ -158,73 +150,49 @@ func (b *Blockchain) SubscribeEvents() Subscription { return b.stream.subscribe() } -// eventElem contains the event, as well as the next list event -type eventElem struct { - event *Event - next *eventElem -} - // eventStream is the structure that contains the event list, // as well as the update channel which it uses to notify of updates type eventStream struct { - lock sync.Mutex - head *eventElem + sync.Mutex // channel to notify updates - updateCh []chan void + updateCh []chan *Event } // subscribe Creates a new blockchain event subscription func (e *eventStream) subscribe() *subscription { - head, updateCh := e.Head() - s := &subscription{ - elem: head, - updateCh: updateCh, + return &subscription{ + updateCh: e.newUpdateCh(), closeCh: make(chan void), } - - return s } -// Head returns the event list head -func (e *eventStream) Head() (*eventElem, chan void) { - e.lock.Lock() - head := e.head +// newUpdateCh returns the event update channel +func (e *eventStream) newUpdateCh() chan *Event { + e.Lock() + defer e.Unlock() - ch := make(chan void) + ch := make(chan *Event, 1) if e.updateCh == nil { - e.updateCh = make([]chan void, 0) + e.updateCh = make([]chan *Event, 0) } e.updateCh = append(e.updateCh, ch) - e.lock.Unlock() - - return head, ch + return ch } // push adds a new Event, and notifies listeners func (e *eventStream) push(event *Event) { - e.lock.Lock() - - newHead := &eventElem{ - event: event, - } - - if e.head != nil { - e.head.next = newHead - } - - e.head = newHead + e.Lock() + defer e.Unlock() // Notify the listeners for _, update := range e.updateCh { select { - case update <- void{}: + case update <- event: default: } } - - e.lock.Unlock() } diff --git a/blockchain/subscription_test.go b/blockchain/subscription_test.go index da9de0475f..9e854a20a9 100644 --- a/blockchain/subscription_test.go +++ b/blockchain/subscription_test.go @@ -1,76 +1,53 @@ package blockchain import ( + "sync" "testing" "time" "github.com/0xPolygon/polygon-edge/types" + "github.com/stretchr/testify/assert" ) -func TestSubscriptionLinear(t *testing.T) { - e := &eventStream{} +func TestSubscription(t *testing.T) { + t.Parallel() - // add a genesis block to eventstream - e.push(&Event{ - NewChain: []*types.Header{ - {Number: 0}, - }, - }) + var ( + e = &eventStream{} + sub = e.subscribe() + caughtEventNum = uint64(0) + event = &Event{ + NewChain: []*types.Header{ + { + Number: 100, + }, + }, + } - sub := e.subscribe() + wg sync.WaitGroup + ) - eventCh := make(chan *Event) + defer sub.Close() - go func() { - for { - task := sub.GetEvent() - eventCh <- task - } - }() + updateCh := sub.GetEventCh() - for i := 1; i < 10; i++ { - evnt := &Event{} + wg.Add(1) - evnt.AddNewHeader(&types.Header{Number: uint64(i)}) - e.push(evnt) + go func() { + defer wg.Done() - // it should fire updateCh select { - case evnt := <-eventCh: - if evnt.NewChain[0].Number != uint64(i) { - t.Fatal("bad") - } - case <-time.After(1 * time.Second): - t.Fatal("timeout") + case ev := <-updateCh: + caughtEventNum = ev.NewChain[0].Number + case <-time.After(5 * time.Second): } - } -} - -func TestSubscriptionSlowConsumer(t *testing.T) { - e := &eventStream{} + }() - e.push(&Event{ - NewChain: []*types.Header{ - {Number: 0}, - }, - }) + // Send the event to the channel + e.push(event) - sub := e.subscribe() + // Wait for the event to be parsed + wg.Wait() - // send multiple events - for i := 1; i < 10; i++ { - e.push(&Event{ - NewChain: 
[]*types.Header{ - {Number: uint64(i)}, - }, - }) - } - - // consume events now - for i := 1; i < 10; i++ { - evnt := sub.GetEvent() - if evnt.NewChain[0].Number != uint64(i) { - t.Fatal("bad") - } - } + assert.Equal(t, event.NewChain[0].Number, caughtEventNum) } diff --git a/blockchain/testing.go b/blockchain/testing.go index 257ef5dc5d..c5acfa0fed 100644 --- a/blockchain/testing.go +++ b/blockchain/testing.go @@ -22,7 +22,8 @@ var ( ) var ( - errInvalidTypeAssertion = errors.New("invalid type assertion") + errInvalidTypeAssertion = errors.New("invalid type assertion") + errRecoveryAddressFailed = errors.New("failed to recover from field") ) // NewTestHeadersWithSeed creates a new chain with a seed factor @@ -305,6 +306,18 @@ func (m *mockExecutor) HookProcessBlock(fn processBlockDelegate) { m.processBlockFn = fn } +type mockSigner struct { + txFromByTxHash map[types.Hash]types.Address +} + +func (m *mockSigner) Sender(tx *types.Transaction) (types.Address, error) { + if from, ok := m.txFromByTxHash[tx.Hash]; ok { + return from, nil + } + + return types.ZeroAddress, errRecoveryAddressFailed +} + func TestBlockchain(t *testing.T, genesis *chain.Genesis) *Blockchain { if genesis == nil { genesis = &chain.Genesis{} @@ -330,7 +343,7 @@ func newBlockChain(config *chain.Chain, executor Executor) (*Blockchain, error) executor = &mockExecutor{} } - b, err := NewBlockchain(hclog.NewNullLogger(), "", config, &MockVerifier{}, executor) + b, err := NewBlockchain(hclog.NewNullLogger(), "", config, &MockVerifier{}, executor, &mockSigner{}) if err != nil { return nil, err } diff --git a/command/server/init.go b/command/server/init.go index 8a8e7bacc9..f4ca553f93 100644 --- a/command/server/init.go +++ b/command/server/init.go @@ -135,7 +135,6 @@ func (p *serverParams) initDevMode() { // Dev mode: // - disables peer discovery // - enables all forks - p.rawConfig.ShouldSeal = true p.rawConfig.Network.NoDiscover = true p.genesisConfig.Params.Forks = chain.AllForksEnabled diff --git a/consensus/consensus.go b/consensus/consensus.go index afb55e5154..09b296816a 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -61,7 +61,6 @@ type Config struct { type Params struct { Context context.Context - Seal bool Config *Config TxPool *txpool.TxPool Network *network.Server diff --git a/consensus/dev/dev.go b/consensus/dev/dev.go index 10179d01bf..63b571e06e 100644 --- a/consensus/dev/dev.go +++ b/consensus/dev/dev.go @@ -61,6 +61,8 @@ func Factory( // Initialize initializes the consensus func (d *Dev) Initialize() error { + d.txpool.SetSealing(true) + return nil } diff --git a/consensus/dummy/dummy.go b/consensus/dummy/dummy.go index be044361bb..b8a8b8cd66 100644 --- a/consensus/dummy/dummy.go +++ b/consensus/dummy/dummy.go @@ -11,7 +11,6 @@ import ( ) type Dummy struct { - sealing bool logger hclog.Logger notifyCh chan struct{} closeCh chan struct{} @@ -24,7 +23,6 @@ func Factory(params *consensus.Params) (consensus.Consensus, error) { logger := params.Logger.Named("dummy") d := &Dummy{ - sealing: params.Seal, logger: logger, notifyCh: make(chan struct{}), closeCh: make(chan struct{}), @@ -38,6 +36,8 @@ func Factory(params *consensus.Params) (consensus.Consensus, error) { // Initialize initializes the consensus func (d *Dummy) Initialize() error { + d.txpool.SetSealing(true) + return nil } diff --git a/consensus/ibft/ibft.go b/consensus/ibft/ibft.go index 15b6bc692b..23a3e1ea92 100644 --- a/consensus/ibft/ibft.go +++ b/consensus/ibft/ibft.go @@ -46,6 +46,7 @@ type txPoolInterface interface { Drop(tx 
*types.Transaction) Demote(tx *types.Transaction) ResetWithHeaders(headers ...*types.Header) + SetSealing(bool) } type forkManagerInterface interface { @@ -85,7 +86,6 @@ type backendIBFT struct { epochSize uint64 quorumSizeBlockNum uint64 blockTime time.Duration // Minimum block generation time in seconds - sealing bool // Flag indicating if the node is a sealer // Channels closeCh chan struct{} // Channel for closing @@ -158,7 +158,6 @@ func Factory(params *consensus.Params) (consensus.Consensus, error) { epochSize: epochSize, quorumSizeBlockNum: quorumSizeBlockNum, blockTime: time.Duration(params.BlockTime) * time.Second, - sealing: params.Seal, // Channels closeCh: make(chan struct{}), @@ -259,8 +258,10 @@ func (i *backendIBFT) startConsensus() { // to insert a valid block. Used for cancelling active consensus // rounds for a specific height go func() { + eventCh := newBlockSub.GetEventCh() + for { - if ev := <-newBlockSub.GetEventCh(); ev.Source == "syncer" { + if ev := <-eventCh; ev.Source == "syncer" { if ev.NewChain[0].Number < i.blockchain.Header().Number { // The blockchain notification system can eventually deliver // stale block notifications. These should be ignored @@ -298,6 +299,8 @@ func (i *backendIBFT) startConsensus() { isValidator = i.isActiveValidator() + i.txpool.SetSealing(isValidator) + if isValidator { sequenceCh = i.consensus.runSequence(pending) } @@ -343,11 +346,6 @@ func (i *backendIBFT) updateMetrics(block *types.Block) { i.metrics.NumTxs.Set(float64(len(block.Body().Transactions))) } -// isSealing checks if the current node is sealing blocks -func (i *backendIBFT) isSealing() bool { - return i.sealing -} - // verifyHeaderImpl verifies fields including Extra // for the past or being proposed header func (i *backendIBFT) verifyHeaderImpl( diff --git a/consensus/ibft/transport.go b/consensus/ibft/transport.go index fd8c966e4e..c52e201ec7 100644 --- a/consensus/ibft/transport.go +++ b/consensus/ibft/transport.go @@ -36,6 +36,10 @@ func (i *backendIBFT) setupTransport() error { // Subscribe to the newly created topic if err := topic.Subscribe( func(obj interface{}, _ peer.ID) { + if !i.isActiveValidator() { + return + } + msg, ok := obj.(*proto.Message) if !ok { i.logger.Error("invalid type assertion for message request") @@ -43,12 +47,6 @@ func (i *backendIBFT) setupTransport() error { return } - if !i.isSealing() { - // if we are not sealing we do not care about the messages - // but we need to subscribe to propagate the messages - return - } - i.consensus.AddMessage(msg) i.logger.Debug( diff --git a/e2e/broadcast_test.go b/e2e/broadcast_test.go index be5e2f7ed6..af5d9b2fbb 100644 --- a/e2e/broadcast_test.go +++ b/e2e/broadcast_test.go @@ -45,7 +45,6 @@ func TestBroadcast(t *testing.T) { conf := func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDummy) config.Premine(senderAddr, framework.EthToWei(10)) - config.SetSeal(true) } for _, tt := range testCases { diff --git a/e2e/framework/config.go b/e2e/framework/config.go index 40aba049fc..2d85b95bdc 100644 --- a/e2e/framework/config.go +++ b/e2e/framework/config.go @@ -30,7 +30,6 @@ type TestServerConfig struct { JSONRPCPort int // The JSON RPC endpoint port GRPCPort int // The GRPC endpoint port LibP2PPort int // The Libp2p endpoint port - Seal bool // Flag indicating if blocks should be sealed RootDir string // The root directory for test environment IBFTDirPrefix string // The prefix of data directory for IBFT IBFTDir string // The name of data directory for IBFT @@ -143,11 +142,6 
@@ func (t *TestServerConfig) SetIBFTDir(ibftDir string) { t.IBFTDir = ibftDir } -// SetSeal callback toggles the seal mode -func (t *TestServerConfig) SetSeal(state bool) { - t.Seal = state -} - // SetBootnodes sets bootnodes func (t *TestServerConfig) SetBootnodes(bootnodes []string) { t.Bootnodes = bootnodes diff --git a/e2e/framework/testserver.go b/e2e/framework/testserver.go index 5c8625fdc8..bd204eede9 100644 --- a/e2e/framework/testserver.go +++ b/e2e/framework/testserver.go @@ -1,13 +1,16 @@ package framework import ( + "bytes" "context" "crypto/ecdsa" "encoding/hex" + "encoding/json" "errors" "fmt" "io" "math/big" + "net/http" "os" "os/exec" "path/filepath" @@ -370,10 +373,6 @@ func (t *TestServer) Start(ctx context.Context) error { args = append(args, "--data-dir", t.Config.RootDir) } - if t.Config.Seal { - args = append(args, "--seal") - } - if t.Config.PriceLimit != nil { args = append(args, "--price-limit", strconv.FormatUint(*t.Config.PriceLimit, 10)) } @@ -670,3 +669,46 @@ func (t *TestServer) InvokeMethod( return receipt } + +func (t *TestServer) CallJSONRPC(req map[string]interface{}) map[string]interface{} { + reqJSON, err := json.Marshal(req) + if err != nil { + t.t.Fatal(err) + + return nil + } + + url := fmt.Sprintf("http://%s", t.JSONRPCAddr()) + + //nolint:gosec // this is not used because it can't be defined as a global variable + response, err := http.Post(url, "application/json", bytes.NewReader(reqJSON)) + if err != nil { + t.t.Fatalf("failed to send request to JSON-RPC server: %v", err) + + return nil + } + defer response.Body.Close() + + if response.StatusCode != http.StatusOK { + t.t.Fatalf("JSON-RPC doesn't return ok: %s", response.Status) + + return nil + } + + bodyBytes, err := io.ReadAll(response.Body) + if err != nil { + t.t.Fatalf("failed to read HTTP body: %s", err) + + return nil + } + + result := map[string]interface{}{} + + if err := json.Unmarshal(bodyBytes, &result); err != nil { + t.t.Fatalf("failed to convert json to object: %s", err) + + return nil + } + + return result +} diff --git a/e2e/ibft_test.go b/e2e/ibft_test.go index b606b04d8d..95a3dc94cc 100644 --- a/e2e/ibft_test.go +++ b/e2e/ibft_test.go @@ -57,7 +57,6 @@ func TestIbft_Transfer(t *testing.T) { IBFTDirPrefix, func(i int, config *framework.TestServerConfig) { config.Premine(senderAddr, framework.EthToWei(10)) - config.SetSeal(true) config.SetBlockTime(tc.blockTime) config.SetIBFTBaseTimeout(tc.ibftBaseTimeout) config.SetValidatorType(tc.validatorType) @@ -129,7 +128,6 @@ func TestIbft_TransactionFeeRecipient(t *testing.T) { IBFTDirPrefix, func(i int, config *framework.TestServerConfig) { config.Premine(senderAddr, framework.EthToWei(10)) - config.SetSeal(true) }) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) diff --git a/e2e/pos_poa_switch_test.go b/e2e/pos_poa_switch_test.go index fc209ad6f9..be7d78d2dc 100644 --- a/e2e/pos_poa_switch_test.go +++ b/e2e/pos_poa_switch_test.go @@ -36,7 +36,6 @@ func TestPoAPoSSwitch(t *testing.T) { IBFTMinNodes, IBFTDirPrefix, func(i int, config *framework.TestServerConfig) { - config.SetSeal(true) config.PremineValidatorBalance(defaultBalance) }) diff --git a/e2e/pos_test.go b/e2e/pos_test.go index 3528c88bc7..88df9d21c8 100644 --- a/e2e/pos_test.go +++ b/e2e/pos_test.go @@ -112,7 +112,6 @@ func TestPoS_ValidatorBoundaries(t *testing.T) { numGenesisValidators, IBFTDirPrefix, func(i int, config *framework.TestServerConfig) { - config.SetSeal(true) config.SetEpochSize(2) config.PremineValidatorBalance(defaultBalance) for j := 
0; j < numNewStakers; j++ { @@ -178,7 +177,6 @@ func TestPoS_Stake(t *testing.T) { numGenesisValidators, IBFTDirPrefix, func(i int, config *framework.TestServerConfig) { - config.SetSeal(true) config.SetEpochSize(2) // Need to leave room for the endblock config.PremineValidatorBalance(defaultBalance) config.Premine(stakerAddr, defaultBalance) @@ -243,7 +241,6 @@ func TestPoS_Unstake(t *testing.T) { IBFTDirPrefix, func(_ int, config *framework.TestServerConfig) { // Premine to send unstake transaction - config.SetSeal(true) config.SetEpochSize(2) // Need to leave room for the endblock config.PremineValidatorBalance(defaultBalance) config.SetIBFTPoS(true) @@ -335,7 +332,6 @@ func TestPoS_UnstakeExploit(t *testing.T) { // Set up the test server srvs := framework.NewTestServers(t, 1, func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDev) - config.SetSeal(true) config.SetDevInterval(devInterval) config.Premine(senderAddr, defaultBalance) config.SetDevStakingAddresses(append(generateStakingAddresses(numDummyValidators), senderAddr)) @@ -479,7 +475,6 @@ func TestPoS_StakeUnstakeExploit(t *testing.T) { // Set up the test server srvs := framework.NewTestServers(t, 1, func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDev) - config.SetSeal(true) config.SetDevInterval(devInterval) config.Premine(senderAddr, defaultBalance) config.SetBlockLimit(blockGasLimit) @@ -620,7 +615,6 @@ func TestPoS_StakeUnstakeWithinSameBlock(t *testing.T) { // Set up the test server srvs := framework.NewTestServers(t, 1, func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDev) - config.SetSeal(true) config.SetDevInterval(devInterval) config.Premine(senderAddr, defaultBalance) config.SetBlockLimit(blockGasLimit) @@ -762,8 +756,6 @@ func TestSnapshotUpdating(t *testing.T) { totalServers, IBFTDirPrefix, func(i int, config *framework.TestServerConfig) { - config.SetSeal(i < numGenesisValidators) - if i < numGenesisValidators { // Only IBFTMinNodes should be validators config.PremineValidatorBalance(defaultBalance) @@ -773,6 +765,7 @@ func TestSnapshotUpdating(t *testing.T) { config.SetIBFTDirPrefix(dirPrefix) config.SetIBFTDir(fmt.Sprintf("%s%d", dirPrefix, i)) } + config.SetEpochSize(epochSize) config.Premine(faucetAddr, defaultBalance) config.SetIBFTPoS(true) diff --git a/e2e/syncer_test.go b/e2e/syncer_test.go index 972d3c0af7..05e72adb4b 100644 --- a/e2e/syncer_test.go +++ b/e2e/syncer_test.go @@ -32,7 +32,6 @@ func TestClusterBlockSync(t *testing.T) { config.SetIBFTDirPrefix(dirPrefix) config.SetIBFTDir(fmt.Sprintf("%s%d", dirPrefix, i)) } - config.SetSeal(i < IBFTMinNodes) }) startContext, startCancelFn := context.WithTimeout(context.Background(), time.Minute) diff --git a/e2e/transaction_test.go b/e2e/transaction_test.go index ff7bf54f88..7c083dfd48 100644 --- a/e2e/transaction_test.go +++ b/e2e/transaction_test.go @@ -79,7 +79,6 @@ func TestEthTransfer(t *testing.T) { framework.EthToWei(50), // 50 ETH big.NewInt(0), framework.EthToWei(10), // 10 ETH - } validAccounts := make([]testAccount, len(accountBalances)) @@ -133,7 +132,6 @@ func TestEthTransfer(t *testing.T) { srvs := framework.NewTestServers(t, 1, func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDev) - config.SetSeal(true) for _, acc := range validAccounts { config.Premine(acc.address, acc.balance) } @@ -228,6 +226,61 @@ func TestEthTransfer(t *testing.T) { } } +// Check whether the mined tx has from field +func TestFromFieldInTx(t 
*testing.T) { + senderKey, senderAddr := tests.GenerateKeyAndAddr(t) + _, receiverAddr := tests.GenerateKeyAndAddr(t) + + ibftManager := framework.NewIBFTServersManager(t, + 1, + IBFTDirPrefix, + func(i int, config *framework.TestServerConfig) { + config.Premine(senderAddr, framework.EthToWei(10)) + }, + ) + + ctxForStart, cancelStart := context.WithTimeout(context.Background(), framework.DefaultTimeout) + defer cancelStart() + + ibftManager.StartServers(ctxForStart) + + srv := ibftManager.GetServer(0) + + // Do the transfer + ctx, cancel := context.WithTimeout(context.Background(), framework.DefaultTimeout) + defer cancel() + + txn := &framework.PreparedTransaction{ + From: senderAddr, + To: &receiverAddr, + GasPrice: big.NewInt(1048576), + Gas: 1000000, + Value: framework.EthToWei(1), + } + + receipt, err := srv.SendRawTx(ctx, txn, senderKey) + assert.NoError(t, err) + assert.NotNil(t, receipt) + + // json-rpc client in framework recovers from field, so call json-rpc directly + response := srv.CallJSONRPC(map[string]interface{}{ + "id": 1, + "jsonrpc": "2.0", + "method": "eth_getTransactionByHash", + "params": []interface{}{ + receipt.TransactionHash, + }, + }) + + result, ok := response["result"].(map[string]interface{}) + assert.True(t, ok) + + from, ok := result["from"].(string) + assert.True(t, ok) + + assert.Equal(t, senderAddr.String(), from) +} + // getCount is a helper function for the stress test SC func getCount( from types.Address, @@ -365,7 +418,6 @@ func Test_TransactionIBFTLoop(t *testing.T) { func(i int, config *framework.TestServerConfig) { config.SetValidatorType(validatorType) config.Premine(sender, defaultBalance) - config.SetSeal(true) config.SetBlockLimit(20000000) }) diff --git a/e2e/txpool_test.go b/e2e/txpool_test.go index 5d151cc5bb..ac10b5e145 100644 --- a/e2e/txpool_test.go +++ b/e2e/txpool_test.go @@ -103,7 +103,6 @@ func TestTxPool_ErrorCodes(t *testing.T) { // Set up the test server srvs := framework.NewTestServers(t, 1, func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDev) - config.SetSeal(true) config.SetDevInterval(devInterval) config.Premine(referenceAddr, testCase.defaultBalance) }) @@ -175,7 +174,6 @@ func TestTxPool_TransactionCoalescing(t *testing.T) { 1, IBFTDirPrefix, func(i int, config *framework.TestServerConfig) { - config.SetSeal(true) config.Premine(referenceAddr, defaultBalance) config.SetBlockTime(1) }, @@ -380,7 +378,6 @@ func TestTxPool_RecoverableError(t *testing.T) { server := framework.NewTestServers(t, 1, func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDev) - config.SetSeal(true) config.SetBlockLimit(2.5 * 21000) config.SetDevInterval(2) config.Premine(senderAddress, framework.EthToWei(100)) @@ -448,7 +445,6 @@ func TestTxPool_GetPendingTx(t *testing.T) { server := framework.NewTestServers(t, 1, func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDev) - config.SetSeal(true) config.SetDevInterval(3) config.SetBlockLimit(20000000) config.Premine(senderAddress, startingBalance) diff --git a/e2e/websocket_test.go b/e2e/websocket_test.go index 53c92fad29..b6c42158ea 100644 --- a/e2e/websocket_test.go +++ b/e2e/websocket_test.go @@ -59,7 +59,6 @@ func TestWS_Response(t *testing.T) { srvs := framework.NewTestServers(t, 1, func(config *framework.TestServerConfig) { config.SetConsensus(framework.ConsensusDev) - config.SetSeal(true) for _, account := range preminedAccounts { config.Premine(account.address, account.balance) diff --git 
a/server/server.go b/server/server.go index 57bebfb771..1661d22e78 100644 --- a/server/server.go +++ b/server/server.go @@ -188,8 +188,11 @@ func NewServer(config *Config) (*Server, error) { genesisRoot := m.executor.WriteGenesis(config.Chain.Genesis.Alloc) config.Chain.Genesis.StateRoot = genesisRoot + // use the eip155 signer + signer := crypto.NewEIP155Signer(uint64(m.config.Chain.Params.ChainID)) + // blockchain object - m.blockchain, err = blockchain.NewBlockchain(logger, m.config.DataDir, config.Chain, nil, m.executor) + m.blockchain, err = blockchain.NewBlockchain(logger, m.config.DataDir, config.Chain, nil, m.executor, signer) if err != nil { return nil, err } @@ -216,7 +219,6 @@ func NewServer(config *Config) (*Server, error) { m.network, m.serverMetrics.txpool, &txpool.Config{ - Sealing: m.config.Seal, MaxSlots: m.config.MaxSlots, PriceLimit: m.config.PriceLimit, MaxAccountEnqueued: m.config.MaxAccountEnqueued, @@ -227,8 +229,6 @@ func NewServer(config *Config) (*Server, error) { return nil, err } - // use the eip155 signer - signer := crypto.NewEIP155Signer(uint64(m.config.Chain.Params.ChainID)) m.txpool.SetSigner(signer) } @@ -404,7 +404,6 @@ func (s *Server) setupConsensus() error { consensus, err := engine( &consensus.Params{ Context: context.Background(), - Seal: s.config.Seal, Config: config, TxPool: s.txpool, Network: s.network, diff --git a/txpool/account.go b/txpool/account.go index bde2047eb4..abc63162c9 100644 --- a/txpool/account.go +++ b/txpool/account.go @@ -156,7 +156,9 @@ type account struct { init sync.Once enqueued, promoted *accountQueue nextNonce uint64 - demotions uint + demotions uint64 + // the number of consecutive blocks that don't contain account's transaction + skips uint64 // maximum number of enqueued transactions maxEnqueued uint64 @@ -172,6 +174,21 @@ func (a *account) setNonce(nonce uint64) { atomic.StoreUint64(&a.nextNonce, nonce) } +// Demotions returns the current value of demotions +func (a *account) Demotions() uint64 { + return a.demotions +} + +// resetDemotions sets 0 to demotions to clear count +func (a *account) resetDemotions() { + a.demotions = 0 +} + +// incrementDemotions increments demotions +func (a *account) incrementDemotions() { + a.demotions++ +} + // reset aligns the account with the new nonce // by pruning all transactions with nonce lesser than new. 
// After pruning, a promotion may be signaled if the first @@ -184,10 +201,7 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) ( defer a.promoted.unlock() // prune the promoted txs - prunedPromoted = append( - prunedPromoted, - a.promoted.prune(nonce)..., - ) + prunedPromoted = a.promoted.prune(nonce) if nonce <= a.getNonce() { // only the promoted queue needed pruning @@ -198,10 +212,7 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) ( defer a.enqueued.unlock() // prune the enqueued txs - prunedEnqueued = append( - prunedEnqueued, - a.enqueued.prune(nonce)..., - ) + prunedEnqueued = a.enqueued.prune(nonce) // update nonce expected for this account a.setNonce(nonce) @@ -209,8 +220,7 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) ( // it is important to signal promotion while // the locks are held to ensure no other // handler will mutate the account - if first := a.enqueued.peek(); first != nil && - first.Nonce == nonce { + if first := a.enqueued.peek(); first != nil && first.Nonce == nonce { // first enqueued tx is expected -> signal promotion promoteCh <- promoteRequest{account: first.From} } @@ -243,7 +253,7 @@ func (a *account) enqueue(tx *types.Transaction) error { // Eligible transactions are all sequential in order of nonce // and the first one has to have nonce less (or equal) to the account's // nextNonce. -func (a *account) promote() []*types.Transaction { +func (a *account) promote() (promoted []*types.Transaction, pruned []*types.Transaction) { a.promoted.lock(true) a.enqueued.lock(true) @@ -254,21 +264,18 @@ func (a *account) promote() []*types.Transaction { // sanity check currentNonce := a.getNonce() - if a.enqueued.length() == 0 || - a.enqueued.peek().Nonce > currentNonce { + if a.enqueued.length() == 0 || a.enqueued.peek().Nonce > currentNonce { // nothing to promote - return nil + return } - promoted := make([]*types.Transaction, 0) nextNonce := a.enqueued.peek().Nonce // move all promotable txs (enqueued txs that are sequential in nonce) // to the account's promoted queue for { tx := a.enqueued.peek() - if tx == nil || - tx.Nonce != nextNonce { + if tx == nil || tx.Nonce != nextNonce { break } @@ -279,7 +286,10 @@ func (a *account) promote() []*types.Transaction { a.promoted.push(tx) // update counters - nextNonce += 1 + nextNonce = tx.Nonce + 1 + + // prune the transactions with lower nonce + pruned = append(pruned, a.enqueued.prune(nextNonce)...) 
// update return result promoted = append(promoted, tx) @@ -291,5 +301,35 @@ func (a *account) promote() []*types.Transaction { a.setNonce(nextNonce) } - return promoted + return +} + +// resetSkips sets 0 to skips +func (a *account) resetSkips() { + a.skips = 0 +} + +// incrementSkips increments skips +func (a *account) incrementSkips() { + a.skips++ +} + +// getLowestTx returns the transaction with lowest nonce, which might be popped next +// this method don't pop a transaction from both queues +func (a *account) getLowestTx() *types.Transaction { + a.promoted.lock(true) + defer a.promoted.unlock() + + if firstPromoted := a.promoted.peek(); firstPromoted != nil { + return firstPromoted + } + + a.enqueued.lock(true) + defer a.enqueued.unlock() + + if firstEnqueued := a.enqueued.peek(); firstEnqueued != nil { + return firstEnqueued + } + + return nil } diff --git a/txpool/queue.go b/txpool/queue.go index b55e92948e..4cff31222f 100644 --- a/txpool/queue.go +++ b/txpool/queue.go @@ -51,14 +51,11 @@ func (q *accountQueue) prune(nonce uint64) ( pruned []*types.Transaction, ) { for { - tx := q.peek() - if tx == nil || - tx.Nonce >= nonce { + if tx := q.peek(); tx == nil || tx.Nonce >= nonce { break } - tx = q.pop() - pruned = append(pruned, tx) + pruned = append(pruned, q.pop()) } return @@ -130,6 +127,11 @@ func (q *minNonceQueue) Swap(i, j int) { } func (q *minNonceQueue) Less(i, j int) bool { + // The higher gas price Tx comes first if the nonces are same + if (*q)[i].Nonce == (*q)[j].Nonce { + return (*q)[i].GasPrice.Cmp((*q)[j].GasPrice) > 0 + } + return (*q)[i].Nonce < (*q)[j].Nonce } diff --git a/txpool/txpool.go b/txpool/txpool.go index 63e6521cfe..bc175d3ea3 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -9,6 +9,7 @@ import ( "github.com/golang/protobuf/ptypes/any" "github.com/hashicorp/go-hclog" "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/atomic" "google.golang.org/grpc" "github.com/0xPolygon/polygon-edge/blockchain" @@ -26,7 +27,10 @@ const ( // maximum allowed number of times an account // was excluded from block building (ibft.writeTransactions) - maxAccountDemotions = uint(10) + maxAccountDemotions uint64 = 10 + + // maximum allowed number of consecutive blocks that don't have the account's transaction + maxAccountSkips = uint64(10) pruningCooldown = 5000 * time.Millisecond ) @@ -88,7 +92,6 @@ type Config struct { PriceLimit uint64 MaxSlots uint64 MaxAccountEnqueued uint64 - Sealing bool DeploymentWhitelist []types.Address } @@ -165,7 +168,7 @@ type TxPool struct { // flag indicating if the current node is a sealer, // and should therefore gossip transactions - sealing bool + sealing atomic.Bool // prometheus API metrics *Metrics @@ -235,7 +238,6 @@ func NewTxPool( index: lookupMap{all: make(map[types.Hash]*types.Transaction)}, gauge: slotGauge{height: 0, max: config.MaxSlots}, priceLimit: config.PriceLimit, - sealing: config.Sealing, // main loop channels enqueueReqCh: make(chan enqueueRequest), @@ -321,6 +323,16 @@ func (p *TxPool) SetSigner(s signer) { p.signer = s } +// SetSealing sets the sealing flag +func (p *TxPool) SetSealing(sealing bool) { + p.sealing.Store(sealing) +} + +// sealing returns the current set sealing flag +func (p *TxPool) getSealing() bool { + return p.sealing.Load() +} + // AddTx adds a new transaction to the pool (sent from json-RPC/gRPC endpoints) // and broadcasts it to the network (if enabled). 
func (p *TxPool) AddTx(tx *types.Transaction) error { @@ -391,7 +403,7 @@ func (p *TxPool) Pop(tx *types.Transaction) { account.promoted.pop() // successfully popping an account resets its demotions count to 0 - account.demotions = 0 + account.resetDemotions() // update state p.gauge.decrease(slotsRequired(tx)) @@ -459,7 +471,7 @@ func (p *TxPool) Drop(tx *types.Transaction) { // it is Dropped instead. func (p *TxPool) Demote(tx *types.Transaction) { account := p.accounts.get(tx.From) - if account.demotions == maxAccountDemotions { + if account.Demotions() >= maxAccountDemotions { p.logger.Debug( "Demote: threshold reached - dropping account", "addr", tx.From.String(), @@ -468,12 +480,12 @@ func (p *TxPool) Demote(tx *types.Transaction) { p.Drop(tx) // reset the demotions counter - account.demotions = 0 + account.resetDemotions() return } - account.demotions++ + account.incrementDemotions() p.eventManager.signalEvent(proto.EventType_DEMOTED, tx.Hash) } @@ -564,12 +576,13 @@ func (p *TxPool) processEvent(event *blockchain.Event) { } } - if len(stateNonces) == 0 { - return - } - // reset accounts with the new state p.resetAccounts(stateNonces) + + if !p.getSealing() { + // only non-validator cleanup inactive accounts + p.updateAccountSkipsCounts(stateNonces) + } } // validateTx ensures the transaction conforms to specific @@ -726,9 +739,7 @@ func (p *TxPool) addTx(origin txOrigin, tx *types.Transaction) error { } // initialize account for this address once - if !p.accounts.exists(tx.From) { - p.createAccountOnce(tx.From) - } + p.createAccountOnce(tx.From) // send request [BLOCKING] p.enqueueReqCh <- enqueueRequest{tx: tx} @@ -780,9 +791,12 @@ func (p *TxPool) handlePromoteRequest(req promoteRequest) { account := p.accounts.get(addr) // promote enqueued txs - promoted := account.promote() + promoted, pruned := account.promote() p.logger.Debug("promote request", "promoted", promoted, "addr", addr.String()) + p.index.remove(pruned...) + p.gauge.decrease(slotsRequired(pruned...)) + // update metrics p.metrics.PendingTxs.Add(float64(len(promoted))) p.eventManager.signalEvent(proto.EventType_PROMOTED, toHash(promoted...)...) @@ -791,7 +805,7 @@ func (p *TxPool) handlePromoteRequest(req promoteRequest) { // addGossipTx handles receiving transactions // gossiped by the network. func (p *TxPool) addGossipTx(obj interface{}, _ peer.ID) { - if !p.sealing { + if !p.getSealing() { return } @@ -832,6 +846,10 @@ func (p *TxPool) addGossipTx(obj interface{}, _ peer.ID) { // resetAccounts updates existing accounts with the new nonce and prunes stale transactions. func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) { + if len(stateNonces) == 0 { + return + } + var ( allPrunedPromoted []*types.Transaction allPrunedEnqueued []*types.Transaction @@ -839,12 +857,13 @@ func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) { // clear all accounts of stale txs for addr, newNonce := range stateNonces { - if !p.accounts.exists(addr) { + account := p.accounts.get(addr) + + if account == nil { // no updates for this account continue } - account := p.accounts.get(addr) prunedPromoted, prunedEnqueued := account.reset(newNonce, p.promoteReqCh) // append pruned @@ -852,18 +871,19 @@ func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) { allPrunedEnqueued = append(allPrunedEnqueued, prunedEnqueued...) 
// new state for account -> demotions are reset to 0 - account.demotions = 0 + account.resetDemotions() } // pool cleanup callback - cleanup := func(stale ...*types.Transaction) { + cleanup := func(stale []*types.Transaction) { p.index.remove(stale...) p.gauge.decrease(slotsRequired(stale...)) } // prune pool state if len(allPrunedPromoted) > 0 { - cleanup(allPrunedPromoted...) + cleanup(allPrunedPromoted) + p.eventManager.signalEvent( proto.EventType_PRUNED_PROMOTED, toHash(allPrunedPromoted...)..., @@ -873,7 +893,8 @@ func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) { } if len(allPrunedEnqueued) > 0 { - cleanup(allPrunedEnqueued...) + cleanup(allPrunedEnqueued) + p.eventManager.signalEvent( proto.EventType_PRUNED_ENQUEUED, toHash(allPrunedEnqueued...)..., @@ -881,17 +902,56 @@ func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) { } } +// updateAccountSkipsCounts update the accounts' skips, +// the number of the consecutive blocks that doesn't have the account's transactions +func (p *TxPool) updateAccountSkipsCounts(latestActiveAccounts map[types.Address]uint64) { + p.accounts.Range( + func(key, value interface{}) bool { + address, _ := key.(types.Address) + account, _ := value.(*account) + + if _, ok := latestActiveAccounts[address]; ok { + account.resetSkips() + + return true + } + + firstTx := account.getLowestTx() + if firstTx == nil { + // no need to increment anything, + // account has no txs + return true + } + + account.incrementSkips() + + if account.skips < maxAccountSkips { + return true + } + + // account has been skipped too many times + p.Drop(firstTx) + + account.resetSkips() + + return true + }, + ) +} + // createAccountOnce creates an account and // ensures it is only initialized once. func (p *TxPool) createAccountOnce(newAddr types.Address) *account { + if p.accounts.exists(newAddr) { + return nil + } + // fetch nonce from state stateRoot := p.store.Header().StateRoot stateNonce := p.store.GetNonce(stateRoot, newAddr) // initialize the account - account := p.accounts.initOnce(newAddr, stateNonce) - - return account + return p.accounts.initOnce(newAddr, stateNonce) } // Length returns the total number of all promoted transactions. 
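Note on the txpool changes above: sealing is no longer a static config flag but is toggled by the consensus engine through SetSealing, and nodes now track per-account "skips" so accounts whose transactions never make it into a block are eventually cleaned up. Below is a minimal, self-contained sketch of that skip rule under simplified assumptions — demoAccount, observeBlock, and the addresses are hypothetical stand-ins, and only the threshold of 10 mirrors maxAccountSkips from the diff:

package main

import "fmt"

// maxSkips mirrors maxAccountSkips in the diff above; every other name in
// this sketch is a simplified stand-in, not the real txpool structure.
const maxSkips = 10

type demoAccount struct {
	skips   uint64
	pending []uint64 // nonces of queued txs, lowest first
}

// observeBlock applies the skip rule for one new block: accounts that appear
// in the block reset their counter, idle accounts are incremented, and an
// account that has idled for maxSkips blocks loses its lowest-nonce tx.
func observeBlock(accounts map[string]*demoAccount, seenInBlock map[string]bool) {
	for addr, acc := range accounts {
		if seenInBlock[addr] {
			acc.skips = 0
			continue
		}

		if len(acc.pending) == 0 {
			// nothing to drop, nothing to count
			continue
		}

		acc.skips++
		if acc.skips < maxSkips {
			continue
		}

		// threshold reached: drop the lowest-nonce tx and start over
		fmt.Printf("dropping nonce %d for %s\n", acc.pending[0], addr)
		acc.pending = acc.pending[1:]
		acc.skips = 0
	}
}

func main() {
	accounts := map[string]*demoAccount{
		"0xidle": {pending: []uint64{5}},
	}

	// simulate maxSkips consecutive blocks without txs from 0xidle
	for i := 0; i < maxSkips; i++ {
		observeBlock(accounts, map[string]bool{})
	}
}

Run once per incoming block, this keeps a stale account from pinning pool slots indefinitely, which is the same intent as updateAccountSkipsCounts above.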
diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index a3a8dbcbf7..ee6ab755fa 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -92,7 +92,6 @@ func newTestPoolWithSlots(maxSlots uint64, mockStore ...store) (*TxPool, error) PriceLimit: defaultPriceLimit, MaxSlots: maxSlots, MaxAccountEnqueued: defaultMaxAccountEnqueued, - Sealing: false, DeploymentWhitelist: []types.Address{}, }, ) @@ -521,7 +520,7 @@ func TestAddGossipTx(t *testing.T) { assert.NoError(t, err) pool.SetSigner(signer) - pool.sealing = true + pool.SetSealing(true) signedTx, err := signer.SignTx(tx, key) if err != nil { @@ -549,7 +548,7 @@ func TestAddGossipTx(t *testing.T) { assert.NoError(t, err) pool.SetSigner(signer) - pool.sealing = false + pool.SetSealing(false) pool.createAccountOnce(sender) @@ -861,6 +860,112 @@ func TestPromoteHandler(t *testing.T) { assert.Equal(t, uint64(0), pool.accounts.get(addr1).enqueued.length()) assert.Equal(t, uint64(20), pool.accounts.get(addr1).promoted.length()) }) + + t.Run( + "promote handler discards cheaper tx", + func(t *testing.T) { + t.Parallel() + + // helper + newPricedTx := func( + addr types.Address, + nonce, + gasPrice, + slots uint64, + ) *types.Transaction { + tx := newTx(addr, nonce, slots) + tx.GasPrice.SetUint64(gasPrice) + + return tx + } + + pool, err := newTestPool() + assert.NoError(t, err) + pool.SetSigner(&mockSigner{}) + + addTx := func(tx *types.Transaction) enqueueRequest { + tx.ComputeHash() + + go func() { + assert.NoError(t, + pool.addTx(local, tx), + ) + }() + + // grab the enqueue signal + return <-pool.enqueueReqCh + } + + handleEnqueueRequest := func(req enqueueRequest) promoteRequest { + go func() { + pool.handleEnqueueRequest(req) + }() + + return <-pool.promoteReqCh + } + + assertTxExists := func(t *testing.T, tx *types.Transaction, shouldExists bool) { + t.Helper() + + _, exists := pool.index.get(tx.Hash) + assert.Equal(t, shouldExists, exists) + } + + tx1 := newPricedTx(addr1, 0, 10, 2) + tx2 := newPricedTx(addr1, 0, 20, 3) + + // add the transactions + enqTx1 := addTx(tx1) + enqTx2 := addTx(tx2) + + assertTxExists(t, tx1, true) + assertTxExists(t, tx2, true) + + // check the account nonce before promoting + assert.Equal(t, uint64(0), pool.accounts.get(addr1).getNonce()) + + // execute the enqueue handlers + promReq1 := handleEnqueueRequest(enqTx1) + promReq2 := handleEnqueueRequest(enqTx2) + + assert.Equal(t, uint64(0), pool.accounts.get(addr1).getNonce()) + assert.Equal(t, uint64(2), pool.accounts.get(addr1).enqueued.length()) + assert.Equal(t, uint64(0), pool.accounts.get(addr1).promoted.length()) + assert.Equal( + t, + slotsRequired(tx1)+slotsRequired(tx2), + pool.gauge.read(), + ) + + // promote the second Tx and remove the first Tx + pool.handlePromoteRequest(promReq1) + + assert.Equal(t, uint64(1), pool.accounts.get(addr1).getNonce()) + assert.Equal(t, uint64(0), pool.accounts.get(addr1).enqueued.length()) // should be empty + assert.Equal(t, uint64(1), pool.accounts.get(addr1).promoted.length()) + assertTxExists(t, tx1, false) + assertTxExists(t, tx2, true) + assert.Equal( + t, + slotsRequired(tx2), + pool.gauge.read(), + ) + + // should do nothing in the 2nd promotion + pool.handlePromoteRequest(promReq2) + + assert.Equal(t, uint64(1), pool.accounts.get(addr1).getNonce()) + assert.Equal(t, uint64(0), pool.accounts.get(addr1).enqueued.length()) + assert.Equal(t, uint64(1), pool.accounts.get(addr1).promoted.length()) + assertTxExists(t, tx1, false) + assertTxExists(t, tx2, true) + assert.Equal( + t, + slotsRequired(tx2), 
+ pool.gauge.read(), + ) + }, + ) } func TestResetAccount(t *testing.T) { @@ -1352,7 +1457,7 @@ func TestDemote(t *testing.T) { assert.Equal(t, uint64(1), pool.gauge.read()) assert.Equal(t, uint64(1), pool.accounts.get(addr1).getNonce()) assert.Equal(t, uint64(1), pool.accounts.get(addr1).promoted.length()) - assert.Equal(t, uint(0), pool.accounts.get(addr1).demotions) + assert.Equal(t, uint64(0), pool.accounts.get(addr1).Demotions()) // call demote pool.Prepare() @@ -1364,7 +1469,7 @@ func TestDemote(t *testing.T) { assert.Equal(t, uint64(1), pool.accounts.get(addr1).promoted.length()) // assert counter was incremented - assert.Equal(t, uint(1), pool.accounts.get(addr1).demotions) + assert.Equal(t, uint64(1), pool.accounts.get(addr1).Demotions()) }) t.Run("Demote calls Drop", func(t *testing.T) { @@ -1401,7 +1506,145 @@ func TestDemote(t *testing.T) { assert.Equal(t, uint64(0), pool.accounts.get(addr1).promoted.length()) // demotions are reset to 0 - assert.Equal(t, uint(0), pool.accounts.get(addr1).demotions) + assert.Equal(t, uint64(0), pool.accounts.get(addr1).Demotions()) + }) +} + +func Test_updateAccountSkipsCounts(t *testing.T) { + t.Parallel() + + sendTx := func( + t *testing.T, + pool *TxPool, + tx *types.Transaction, + shouldPromote bool, + ) { + t.Helper() + + go func() { + err := pool.addTx(local, tx) + assert.NoError(t, err) + }() + + if shouldPromote { + go pool.handleEnqueueRequest(<-pool.enqueueReqCh) + pool.handlePromoteRequest(<-pool.promoteReqCh) + } else { + pool.handleEnqueueRequest(<-pool.enqueueReqCh) + } + } + + checkTxExistence := func(t *testing.T, pool *TxPool, txHash types.Hash, shouldExist bool) { + t.Helper() + + _, ok := pool.index.get(txHash) + + assert.Equal(t, shouldExist, ok) + } + + t.Run("should drop the first transaction from promoted queue", func(t *testing.T) { + t.Parallel() + // create pool + pool, err := newTestPool() + assert.NoError(t, err) + + pool.SetSigner(&mockSigner{}) + + tx := newTx(addr1, 0, 1) + sendTx(t, pool, tx, true) + + accountMap := pool.accounts.get(addr1) + + // make sure the transaction is promoted and skips count is zero + assert.Zero(t, accountMap.enqueued.length()) + assert.Equal(t, uint64(1), accountMap.promoted.length()) + assert.Zero(t, accountMap.skips) + assert.Equal(t, slotsRequired(tx), pool.gauge.read()) + checkTxExistence(t, pool, tx.Hash, true) + + // set 9 to skips in order to drop transaction next + accountMap.skips = 9 + + pool.updateAccountSkipsCounts(map[types.Address]uint64{ + // empty + }) + + // make sure the account queue is empty and skips is reset + assert.Zero(t, accountMap.enqueued.length()) + assert.Zero(t, accountMap.promoted.length()) + assert.Zero(t, accountMap.skips) + assert.Zero(t, pool.gauge.read()) + checkTxExistence(t, pool, tx.Hash, false) + }) + + t.Run("should drop the first transaction from enqueued queue", func(t *testing.T) { + t.Parallel() + // create pool + pool, err := newTestPool() + assert.NoError(t, err) + + pool.SetSigner(&mockSigner{}) + + tx := newTx(addr1, 1, 1) // set non-zero nonce to prevent the tx from being added + sendTx(t, pool, tx, false) + + accountMap := pool.accounts.get(addr1) + + // make sure the transaction is promoted and skips count is zero + assert.NotZero(t, accountMap.enqueued.length()) + assert.Zero(t, accountMap.promoted.length()) + assert.Zero(t, accountMap.skips) + assert.Equal(t, slotsRequired(tx), pool.gauge.read()) + checkTxExistence(t, pool, tx.Hash, true) + + // set 9 to skips in order to drop transaction next + accountMap.skips = 9 + + 
pool.updateAccountSkipsCounts(map[types.Address]uint64{ + // empty + }) + + // make sure the account queue is empty and skips is reset + assert.Zero(t, accountMap.enqueued.length()) + assert.Zero(t, accountMap.promoted.length()) + assert.Zero(t, accountMap.skips) + assert.Zero(t, pool.gauge.read()) + checkTxExistence(t, pool, tx.Hash, false) + }) + + t.Run("should not drop a transaction", func(t *testing.T) { + t.Parallel() + // create pool + pool, err := newTestPool() + assert.NoError(t, err) + + pool.SetSigner(&mockSigner{}) + + tx := newTx(addr1, 0, 1) + sendTx(t, pool, tx, true) + + accountMap := pool.accounts.get(addr1) + + // make sure the transaction is promoted and skips count is zero + assert.Zero(t, accountMap.enqueued.length()) + assert.Equal(t, uint64(1), accountMap.promoted.length()) + assert.Zero(t, accountMap.skips) + assert.Equal(t, slotsRequired(tx), pool.gauge.read()) + checkTxExistence(t, pool, tx.Hash, true) + + // set skips to 5, below the drop threshold, so the tx must not be dropped + accountMap.skips = 5 + + pool.updateAccountSkipsCounts(map[types.Address]uint64{ + addr1: 1, + }) + + // make sure the transaction is still promoted and skips is reset + assert.Zero(t, accountMap.enqueued.length()) + assert.Equal(t, uint64(1), accountMap.promoted.length()) + assert.Equal(t, uint64(0), accountMap.skips) + assert.Equal(t, slotsRequired(tx), pool.gauge.read()) + checkTxExistence(t, pool, tx.Hash, true) + }) }
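For reference, here is a compact standalone sketch of the 'from'-field recovery pattern that the blockchain.go hunks above introduce. All names below (signer, txn, zeroAddr, demoSigner) are simplified stand-ins for illustration; only the Sender-style method mirrors the TxSigner interface from the diff, and the strict loop mirrors recoverFromFieldsInBlock (the read path, recoverFromFieldsInTransactions, instead logs a warning and continues on failure):

package main

import (
	"errors"
	"fmt"
)

// signer mirrors the TxSigner interface added in blockchain.go: it recovers
// the sender address from a transaction's signature.
type signer interface {
	Sender(tx *txn) (string, error)
}

// txn is a simplified stand-in for types.Transaction.
type txn struct {
	From string
	sig  string
}

const zeroAddr = ""

// recoverFromFields fills in missing From fields, stopping at the first
// signature that cannot be recovered (the strict, write-path variant).
func recoverFromFields(s signer, txs []*txn) error {
	for _, tx := range txs {
		if tx.From != zeroAddr {
			// already carries a sender; leave it untouched
			continue
		}

		from, err := s.Sender(tx)
		if err != nil {
			return err
		}

		tx.From = from
	}

	return nil
}

// demoSigner "recovers" addresses from a lookup table, standing in for real
// ECDSA signature recovery.
type demoSigner struct{ bySig map[string]string }

func (d *demoSigner) Sender(tx *txn) (string, error) {
	if from, ok := d.bySig[tx.sig]; ok {
		return from, nil
	}

	return zeroAddr, errors.New("failed to recover from field")
}

func main() {
	txs := []*txn{{sig: "sig-1"}, {From: "0xabc"}}
	s := &demoSigner{bySig: map[string]string{"sig-1": "0xdef"}}

	if err := recoverFromFields(s, txs); err != nil {
		panic(err)
	}

	fmt.Println(txs[0].From, txs[1].From) // 0xdef 0xabc
}

The important detail, as in the diff, is that transactions which already carry a From address are skipped, so signature recovery is only paid for bodies persisted before the field was stored.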