Skip to content

Commit

Permalink
Revert "core, txpool: less allocations when handling transactions (#2…
Browse files Browse the repository at this point in the history
…1232)"

Reverting because this change started handling account balances as
uint64 in the transaction pool, which is incorrect.

This reverts commit af5c97a.
  • Loading branch information
fjl committed Jul 9, 2020
1 parent 967d8de commit bcb3087
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 151 deletions.
10 changes: 5 additions & 5 deletions common/math/integer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package math

import (
"fmt"
"math/bits"
"strconv"
)

Expand Down Expand Up @@ -88,12 +87,13 @@ func SafeSub(x, y uint64) (uint64, bool) {

// SafeAdd returns the result and whether overflow occurred.
func SafeAdd(x, y uint64) (uint64, bool) {
sum, carry := bits.Add64(x, y, 0)
return sum, carry != 0
return x + y, y > MaxUint64-x
}

// SafeMul returns multiplication result and whether overflow occurred.
func SafeMul(x, y uint64) (uint64, bool) {
hi, lo := bits.Mul64(x, y)
return lo, hi != 0
if x == 0 || y == 0 {
return 0, false
}
return x * y, y > MaxUint64/x
}
129 changes: 30 additions & 99 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,30 +99,7 @@ func (m *txSortedMap) Forward(threshold uint64) types.Transactions {

// Filter iterates over the list of transactions and removes all of them for which
// the specified function evaluates to true.
// Filter, as opposed to 'filter', re-initialises the heap after the operation is done.
// If you want to do several consecutive filterings, it's therefore better to first
// do a .filter(func1) followed by .Filter(func2) or reheap()
func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
removed := m.filter(filter)
// If transactions were removed, the heap and cache are ruined
if len(removed) > 0 {
m.reheap()
}
return removed
}

func (m *txSortedMap) reheap() {
*m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items {
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)
m.cache = nil
}

// filter is identical to Filter, but **does not** regenerate the heap. This method
// should only be used if followed immediately by a call to Filter or reheap()
func (m *txSortedMap) filter(filter func(*types.Transaction) bool) types.Transactions {
var removed types.Transactions

// Collect all the transactions to filter out
Expand All @@ -132,7 +109,14 @@ func (m *txSortedMap) filter(filter func(*types.Transaction) bool) types.Transac
delete(m.items, nonce)
}
}
// If transactions were removed, the heap and cache are ruined
if len(removed) > 0 {
*m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items {
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)

m.cache = nil
}
return removed
Expand Down Expand Up @@ -213,7 +197,10 @@ func (m *txSortedMap) Len() int {
return len(m.items)
}

func (m *txSortedMap) flatten() types.Transactions {
// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (m *txSortedMap) Flatten() types.Transactions {
// If the sorting was not cached yet, create and cache it
if m.cache == nil {
m.cache = make(types.Transactions, 0, len(m.items))
Expand All @@ -222,27 +209,12 @@ func (m *txSortedMap) flatten() types.Transactions {
}
sort.Sort(types.TxByNonce(m.cache))
}
return m.cache
}

// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (m *txSortedMap) Flatten() types.Transactions {
// Copy the cache to prevent accidental modifications
cache := m.flatten()
txs := make(types.Transactions, len(cache))
copy(txs, cache)
txs := make(types.Transactions, len(m.cache))
copy(txs, m.cache)
return txs
}

// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (m *txSortedMap) LastElement() *types.Transaction {
cache := m.flatten()
return cache[len(cache)-1]
}

// txList is a "list" of transactions belonging to an account, sorted by account
// nonce. The same type can be used both for storing contiguous transactions for
// the executable/pending queue; and for storing gapped transactions for the non-
Expand All @@ -251,16 +223,17 @@ type txList struct {
strict bool // Whether nonces are strictly continuous or not
txs *txSortedMap // Heap indexed sorted hash map of the transactions

costcap uint64 // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
}

// newTxList create a new transaction list for maintaining nonce-indexable fast,
// gapped, sortable transaction lists.
func newTxList(strict bool) *txList {
return &txList{
strict: strict,
txs: newTxSortedMap(),
strict: strict,
txs: newTxSortedMap(),
costcap: new(big.Int),
}
}

Expand All @@ -279,26 +252,17 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
// If there's an older better transaction, abort
old := l.txs.Get(tx.Nonce())
if old != nil {
// threshold = oldGP * (100 + priceBump) / 100
a := big.NewInt(100 + int64(priceBump))
a = a.Mul(a, old.GasPrice())
b := big.NewInt(100)
threshold := a.Div(a, b)
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
// Have to ensure that the new gas price is higher than the old gas
// price as well as checking the percentage threshold to ensure that
// this is accurate for low (Wei-level) gas price replacements
if old.GasPriceCmp(tx) >= 0 || tx.GasPriceIntCmp(threshold) < 0 {
return false, nil
}
}
cost, overflow := tx.CostU64()
if overflow {
log.Warn("transaction cost overflown, txHash: %v txCost: %v", tx.Hash(), cost)
return false, nil
}
// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx)
if l.costcap < cost {
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
l.costcap = cost
}
if gas := tx.Gas(); l.gascap < gas {
Expand All @@ -323,35 +287,29 @@ func (l *txList) Forward(threshold uint64) types.Transactions {
// a point in calculating all the costs or if the balance covers all. If the threshold
// is lower than the costgas cap, the caps will be reset to a new high after removing
// the newly invalidated transactions.
func (l *txList) Filter(costLimit uint64, gasLimit uint64) (types.Transactions, types.Transactions) {
func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) {
// If all transactions are below the threshold, short circuit
if l.costcap <= costLimit && l.gascap <= gasLimit {
if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit {
return nil, nil
}
l.costcap = costLimit // Lower the caps to the thresholds
l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds
l.gascap = gasLimit

// Filter out all the transactions above the account's funds
removed := l.txs.filter(func(tx *types.Transaction) bool {
cost, _ := tx.CostU64()
return cost > costLimit || tx.Gas() > gasLimit
})
removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit })

if len(removed) == 0 {
return nil, nil
}
var invalids types.Transactions
// If the list was strict, filter anything above the lowest nonce
if l.strict {
var invalids types.Transactions

if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64)
for _, tx := range removed {
if nonce := tx.Nonce(); lowest > nonce {
lowest = nonce
}
}
invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
}
l.txs.reheap()
return removed, invalids
}

Expand Down Expand Up @@ -405,12 +363,6 @@ func (l *txList) Flatten() types.Transactions {
return l.txs.Flatten()
}

// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (l *txList) LastElement() *types.Transaction {
return l.txs.LastElement()
}

// priceHeap is a heap.Interface implementation over transactions for retrieving
// price-sorted transactions to discard when the pool fills up.
type priceHeap []*types.Transaction
Expand Down Expand Up @@ -543,29 +495,8 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions {
// If we have some local accountset, those will not be discarded
if !local.empty() {
// In case the list is filled to the brim with 'local' txs, we do this
// little check to avoid unpacking / repacking the heap later on, which
// is very expensive
discardable := 0
for _, tx := range *l.items {
if !local.containsTx(tx) {
discardable++
}
if discardable >= slots {
break
}
}
if slots > discardable {
slots = discardable
}
}
if slots == 0 {
return nil
}
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, len(*l.items)-slots) // Local underpriced transactions to keep
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep

for len(*l.items) > 0 && slots > 0 {
// Discard stale transactions if found during cleanup
Expand Down
19 changes: 9 additions & 10 deletions core/tx_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"math/big"
"math/rand"
"testing"

Expand Down Expand Up @@ -50,22 +51,20 @@ func TestStrictTxListAdd(t *testing.T) {
}
}

func BenchmarkTxListAdd(b *testing.B) {
func BenchmarkTxListAdd(t *testing.B) {
// Generate a list of transactions to insert
key, _ := crypto.GenerateKey()

txs := make(types.Transactions, 2000)
txs := make(types.Transactions, 100000)
for i := 0; i < len(txs); i++ {
txs[i] = transaction(uint64(i), 0, key)
}
// Insert the transactions in a random order
b.ResetTimer()
priceLimit := DefaultTxPoolConfig.PriceLimit
for i := 0; i < b.N; i++ {
list := newTxList(true)
for _, v := range rand.Perm(len(txs)) {
list.Add(txs[v], DefaultTxPoolConfig.PriceBump)
list.Filter(priceLimit, DefaultTxPoolConfig.PriceBump)
}
list := newTxList(true)
priceLimit := big.NewInt(int64(DefaultTxPoolConfig.PriceLimit))
t.ResetTimer()
for _, v := range rand.Perm(len(txs)) {
list.Add(txs[v], DefaultTxPoolConfig.PriceBump)
list.Filter(priceLimit, DefaultTxPoolConfig.PriceBump)
}
}
18 changes: 5 additions & 13 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,11 +543,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
cost, overflow := tx.CostU64()
if overflow {
return ErrInsufficientFunds
}
if pool.currentState.GetBalance(from).Uint64() < cost {
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
// Ensure the transaction has more gas than the basic tx fee.
Expand Down Expand Up @@ -1063,8 +1059,8 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt

// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
highestPending := list.LastElement()
pool.pendingNonces.set(addr, highestPending.Nonce()+1)
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
pool.pendingNonces.set(addr, txs[len(txs)-1].Nonce()+1)
}
pool.mu.Unlock()

Expand Down Expand Up @@ -1194,7 +1190,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
Expand Down Expand Up @@ -1386,7 +1382,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas)
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
Expand Down Expand Up @@ -1461,10 +1457,6 @@ func (as *accountSet) contains(addr common.Address) bool {
return exist
}

func (as *accountSet) empty() bool {
return len(as.accounts) == 0
}

// containsTx checks if the sender of a given tx is within the set. If the sender
// cannot be derived, this method returns false.
func (as *accountSet) containsTx(tx *types.Transaction) bool {
Expand Down
18 changes: 5 additions & 13 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,15 +1890,11 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
}

// Benchmarks the speed of batched transaction insertion.
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, false) }
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, false) }
func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, false) }
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) }
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }
func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000) }

func BenchmarkPoolBatchLocalInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, true) }
func BenchmarkPoolBatchLocalInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, true) }
func BenchmarkPoolBatchLocalInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, true) }

func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) {
func benchmarkPoolBatchInsert(b *testing.B, size int) {
// Generate a batch of transactions to enqueue into the pool
pool, key := setupTxPool()
defer pool.Stop()
Expand All @@ -1916,10 +1912,6 @@ func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) {
// Benchmark importing the transactions into the queue
b.ResetTimer()
for _, batch := range batches {
if local {
pool.AddLocals(batch)
} else {
pool.AddRemotes(batch)
}
pool.AddRemotes(batch)
}
}

0 comments on commit bcb3087

Please sign in to comment.