Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/txpool: implement additional DoS defenses #1348

Merged
merged 1 commit into from Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 36 additions & 9 deletions core/tx_list.go
Expand Up @@ -277,17 +277,19 @@ type txList struct {
strict bool // Whether nonces are strictly continuous or not
txs *txSortedMap // Heap indexed sorted hash map of the transactions

costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
totalcost *big.Int // Total cost of all transactions in the list
}

// newTxList create a new transaction list for maintaining nonce-indexable fast,
// gapped, sortable transaction lists.
func newTxList(strict bool) *txList {
return &txList{
strict: strict,
txs: newTxSortedMap(),
costcap: new(big.Int),
strict: strict,
txs: newTxSortedMap(),
costcap: new(big.Int),
totalcost: new(big.Int),
}
}

Expand Down Expand Up @@ -325,7 +327,11 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
if tx.GasFeeCapIntCmp(thresholdFeeCap) < 0 || tx.GasTipCapIntCmp(thresholdTip) < 0 {
return false, nil
}
// Old is being replaced, subtract old cost
l.subTotalCost([]*types.Transaction{old})
}
// Add new tx cost to totalcost
l.totalcost.Add(l.totalcost, tx.Cost())
// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx)
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
Expand All @@ -341,7 +347,9 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
// provided threshold. Every removed transaction is returned for any post-removal
// maintenance.
func (l *txList) Forward(threshold uint64) types.Transactions {
return l.txs.Forward(threshold)
txs := l.txs.Forward(threshold)
l.subTotalCost(txs)
return txs
}

// Filter removes all transactions from the list with a cost or gas limit higher
Expand Down Expand Up @@ -380,14 +388,19 @@ func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions
}
invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
}
// Reset total cost
l.subTotalCost(removed)
l.subTotalCost(invalids)
l.txs.reheap()
return removed, invalids
}

// Cap places a hard limit on the number of items, returning all transactions
// exceeding that limit.
func (l *txList) Cap(threshold int) types.Transactions {
return l.txs.Cap(threshold)
txs := l.txs.Cap(threshold)
l.subTotalCost(txs)
return txs
}

// Remove deletes a transaction from the maintained list, returning whether the
Expand All @@ -399,9 +412,12 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) {
if removed := l.txs.Remove(nonce); !removed {
return false, nil
}
l.subTotalCost([]*types.Transaction{tx})
// In strict mode, filter out non-executable transactions
if l.strict {
return true, l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce })
txs := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce })
l.subTotalCost(txs)
return true, txs
}
return true, nil
}
Expand All @@ -414,7 +430,9 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) {
// prevent getting into and invalid state. This is not something that should ever
// happen but better to be self correcting than failing!
func (l *txList) Ready(start uint64) types.Transactions {
return l.txs.Ready(start)
txs := l.txs.Ready(start)
l.subTotalCost(txs)
return txs
}

// Len returns the length of the transaction list.
Expand All @@ -440,6 +458,14 @@ func (l *txList) LastElement() *types.Transaction {
return l.txs.LastElement()
}

// subTotalCost subtracts the cost of the given transactions from the
// total cost of all transactions.
func (l *txList) subTotalCost(txs []*types.Transaction) {
for _, tx := range txs {
l.totalcost.Sub(l.totalcost, tx.Cost())
}
}

// priceHeap is a heap.Interface implementation over transactions for retrieving
// price-sorted transactions to discard when the pool fills up. If baseFee is set
// then the heap is sorted based on the effective tip based on the given base fee.
Expand Down Expand Up @@ -584,6 +610,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool

// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
// If noPending is set to true, we will only consider the floating list
//
// Note local transaction won't be considered for eviction.
func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
Expand Down
82 changes: 74 additions & 8 deletions core/tx_pool.go
Expand Up @@ -17,6 +17,7 @@
package core

import (
"container/heap"
"errors"
"math"
"math/big"
Expand Down Expand Up @@ -88,6 +89,14 @@ var (
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")

// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")

// ErrOverdraft is returned if a transaction would cause the senders balance to go negative
// thus invalidating a potential large number of transactions.
ErrOverdraft = errors.New("transaction would cause overdraft")
)

var (
Expand Down Expand Up @@ -677,9 +686,25 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
balance := pool.currentState.GetBalance(from)
if balance.Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}

// Verify that replacing transactions will not result in overdraft
list := pool.pending[from]
if list != nil { // Sender already has pending txs
sum := new(big.Int).Add(tx.Cost(), list.totalcost)
if repl := list.txs.Get(tx.Nonce()); repl != nil {
// Deduct the cost of a transaction replaced by this
sum.Sub(sum, repl.Cost())
}
if balance.Cmp(sum) < 0 {
log.Trace("Replacing transactions would overdraft", "sender", from, "balance", pool.currentState.GetBalance(from), "required", sum)
return ErrOverdraft
}
}

// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul)
if err != nil {
Expand Down Expand Up @@ -716,6 +741,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
invalidTxMeter.Mark(1)
return false, err
}

// already validated by this point
from, _ := types.Sender(pool.signer, tx)

// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
Expand All @@ -724,6 +753,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
underpricedTxMeter.Mark(1)
return false, ErrUnderpriced
}

// We're about to replace a transaction. The reorg does a more thorough
// analysis of what to remove and how, but it runs async. We don't want to
// do too many replacements between reorg-runs, so we cap the number of
Expand All @@ -744,17 +774,37 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// Bump the counter of rejections-since-reorg
pool.changesSinceReorg += len(drop)

// If the new transaction is a future transaction it should never churn pending transactions
if pool.isFuture(from, tx) {
var replacesPending bool
for _, dropTx := range drop {
dropSender, _ := types.Sender(pool.signer, dropTx)
if list := pool.pending[dropSender]; list != nil && list.Overlaps(dropTx) {
replacesPending = true
break
}
}
// Add all transactions back to the priced queue
if replacesPending {
for _, dropTx := range drop {
heap.Push(&pool.priced.urgent, dropTx)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
return false, ErrFutureReplacePending
}
}

// Kick out the underpriced remote transactions.
for _, tx := range drop {
//log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
pool.removeTx(tx.Hash(), false)
dropped := pool.removeTx(tx.Hash(), false)
pool.changesSinceReorg += dropped
}
}

// Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
Expand Down Expand Up @@ -798,6 +848,20 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
return replaced, nil
}

// isFuture reports whether the given transaction is immediately executable.
func (pool *TxPool) isFuture(from common.Address, tx *types.Transaction) bool {
list := pool.pending[from]
if list == nil {
return pool.pendingNonces.get(from) != tx.Nonce()
}
// Sender has pending transactions.
if old := list.txs.Get(tx.Nonce()); old != nil {
return false // It replaces a pending transaction.
}
// Not replacing, check if parent nonce exists in pending.
return list.txs.Get(tx.Nonce()-1) == nil
}

// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
Expand Down Expand Up @@ -1044,11 +1108,12 @@ func (pool *TxPool) Has(hash common.Hash) bool {

// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Returns the number of transactions removed from the pending queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) int {
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
return
return 0
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion

Expand Down Expand Up @@ -1076,7 +1141,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
pool.pendingNonces.setIfLower(addr, tx.Nonce())
// Reduce the pending counter
pendingGauge.Dec(int64(1 + len(invalids)))
return
return 1 + len(invalids)
}
}
// Transaction is in the future queue
Expand All @@ -1090,6 +1155,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
delete(pool.beats, addr)
}
}
return 0
}

// requestReset requests a pool reset to the new head block.
Expand Down
27 changes: 19 additions & 8 deletions core/tx_pool_test.go
Expand Up @@ -156,6 +156,9 @@ func validateTxPoolInternals(pool *TxPool) error {
if nonce := pool.pendingNonces.get(addr); nonce != last+1 {
return fmt.Errorf("pending nonce mismatch: have %v, want %v", nonce, last+1)
}
if txs.totalcost.Cmp(common.Big0) < 0 {
return fmt.Errorf("totalcost went negative: %v", txs.totalcost)
}
}
return nil
}
Expand Down Expand Up @@ -1104,7 +1107,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
defer pool.Stop()

account := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, account, big.NewInt(1000000))
testAddBalance(pool, account, big.NewInt(1000000000000))

// Keep track of transaction events to ensure all executables get announced
events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5)
Expand Down Expand Up @@ -1583,7 +1586,7 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000*1000000))
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(100000*1000000))
}
// Create transaction (both pending and queued) with a linearly growing gasprice
for i := uint64(0); i < 500; i++ {
Expand Down Expand Up @@ -1662,7 +1665,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
defer sub.Unsubscribe()

// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 4)
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
Expand Down Expand Up @@ -1698,6 +1701,10 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), keys[1])); err != ErrUnderpriced {
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
}
// Replace a future transaction with a future transaction
if err := pool.AddRemote(pricedTransaction(1, 100000, big.NewInt(2), keys[1])); err != nil { // +K1:1 => -K1:1 => Pend K0:0, K0:1, K2:0; Que K1:1
t.Fatalf("failed to add well priced transaction: %v", err)
}
// Ensure that adding high priced transactions drops cheap ones, but not own
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { // +K1:0 => -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
t.Fatalf("failed to add well priced transaction: %v", err)
Expand All @@ -1708,14 +1715,18 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
t.Fatalf("failed to add well priced transaction: %v", err)
}
// Ensure that replacing a pending transaction with a future transaction fails
if err := pool.AddRemote(pricedTransaction(5, 100000, big.NewInt(6), keys[1])); err != ErrFutureReplacePending {
t.Fatalf("adding future replace transaction error mismatch: have %v, want %v", err, ErrFutureReplacePending)
}
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validateEvents(events, 1); err != nil {
if err := validateEvents(events, 2); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
Expand Down Expand Up @@ -1877,11 +1888,11 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) {
t.Fatalf("failed to add well priced transaction: %v", err)
}

tx = pricedTransaction(2, 100000, big.NewInt(3), keys[1])
tx = pricedTransaction(1, 100000, big.NewInt(3), keys[1])
if err := pool.AddRemote(tx); err != nil { // +K1:2, -K0:1 => Pend K0:0 K1:0, K2:0; Que K1:2
t.Fatalf("failed to add well priced transaction: %v", err)
}
tx = dynamicFeeTx(3, 100000, big.NewInt(4), big.NewInt(1), keys[1])
tx = dynamicFeeTx(2, 100000, big.NewInt(4), big.NewInt(1), keys[1])
if err := pool.AddRemote(tx); err != nil { // +K1:3, -K1:0 => Pend K0:0 K2:0; Que K1:2 K1:3
t.Fatalf("failed to add well priced transaction: %v", err)
}
Expand All @@ -1892,7 +1903,7 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validateEvents(events, 1); err != nil {
if err := validateEvents(events, 2); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
Expand Down Expand Up @@ -2527,7 +2538,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) {
defer pool.Stop()

account := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, account, big.NewInt(1000000))
testAddBalance(pool, account, big.NewInt(1000000000000000000))

batches := make([]types.Transactions, b.N)
for i := 0; i < b.N; i++ {
Expand Down