Skip to content

Commit

Permalink
core/txpool: remove "local" notion from the txpool price heap (ethere…
Browse files Browse the repository at this point in the history
…um#21478)

* core: separate the local notion from the pricedHeap

* core: add benchmarks

* core: improve tests

* core: address comments

* core: degrade the panic to error message

* core: fix typo

* core: address comments

* core: address comment

* core: use PEAK instead of POP

* core: address comments
  • Loading branch information
rjl493456442 committed Dec 11, 2020
1 parent b47f4ca commit 88c6962
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 145 deletions.
152 changes: 65 additions & 87 deletions core/tx_list.go
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

// nonceHeap is a heap.Interface implementation over 64bit unsigned integers for
Expand Down Expand Up @@ -439,24 +438,29 @@ func (h *priceHeap) Pop() interface{} {
}

// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way.
// contents in a price-incrementing way. It's built opon the all transactions
// in txpool but only interested in the remote part. It means only remote transactions
// will be considered for tracking, sorting, eviction, etc.
type txPricedList struct {
all *txLookup // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
all *txLookup // Pointer to the map of all transactions
remotes *priceHeap // Heap of prices of all the stored **remote** transactions
stales int // Number of stale price points to (re-heap trigger)
}

// newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *txLookup) *txPricedList {
return &txPricedList{
all: all,
items: new(priceHeap),
all: all,
remotes: new(priceHeap),
}
}

// Put inserts a new transaction into the heap.
func (l *txPricedList) Put(tx *types.Transaction) {
heap.Push(l.items, tx)
func (l *txPricedList) Put(tx *types.Transaction, local bool) {
if local {
return
}
heap.Push(l.remotes, tx)
}

// Removed notifies the prices transaction list that an old transaction dropped
Expand All @@ -465,121 +469,95 @@ func (l *txPricedList) Put(tx *types.Transaction) {
func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
l.stales += count
if l.stales <= len(*l.items)/4 {
if l.stales <= len(*l.remotes)/4 {
return
}
// Seems we've reached a critical number of stale transactions, reheap
reheap := make(priceHeap, 0, l.all.Count())

l.stales, l.items = 0, &reheap
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
*l.items = append(*l.items, tx)
return true
})
heap.Init(l.items)
l.Reheap()
}

// Cap finds all the transactions below the given price threshold, drops them
// from the priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions {
//
// Note: only remote transactions will be considered for eviction.
func (l *txPricedList) Cap(threshold *big.Int) types.Transactions {
drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep

for len(*l.items) > 0 {
for len(*l.remotes) > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction)
if l.all.Get(tx.Hash()) == nil {
cheapest := (*l.remotes)[0]
if l.all.GetRemote(cheapest.Hash()) == nil { // Removed or migrated
heap.Pop(l.remotes)
l.stales--
continue
}
// Stop the discards if we've reached the threshold
if tx.GasPriceIntCmp(threshold) >= 0 {
save = append(save, tx)
if cheapest.GasPriceIntCmp(threshold) >= 0 {
break
}
// Non stale transaction found, discard unless local
if local.containsTx(tx) {
save = append(save, tx)
} else {
drop = append(drop, tx)
}
}
for _, tx := range save {
heap.Push(l.items, tx)
heap.Pop(l.remotes)
drop = append(drop, cheapest)
}
return drop
}

// Underpriced checks whether a transaction is cheaper than (or as cheap as) the
// lowest priced transaction currently being tracked.
func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool {
// Local transactions cannot be underpriced
if local.containsTx(tx) {
return false
}
// lowest priced (remote) transaction currently being tracked.
func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
// Discard stale price points if found at the heap start
for len(*l.items) > 0 {
head := []*types.Transaction(*l.items)[0]
if l.all.Get(head.Hash()) == nil {
for len(*l.remotes) > 0 {
head := []*types.Transaction(*l.remotes)[0]
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
l.stales--
heap.Pop(l.items)
heap.Pop(l.remotes)
continue
}
break
}
// Check if the transaction is underpriced or not
if len(*l.items) == 0 {
log.Error("Pricing query for empty pool") // This cannot happen, print to catch programming errors
return false
if len(*l.remotes) == 0 {
return false // There is no remote transaction at all.
}
cheapest := []*types.Transaction(*l.items)[0]
// If the remote transaction is even cheaper than the
// cheapest one tracked locally, reject it.
cheapest := []*types.Transaction(*l.remotes)[0]
return cheapest.GasPriceCmp(tx) >= 0
}

// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions {
// If we have some local accountset, those will not be discarded
if !local.empty() {
// In case the list is filled to the brim with 'local' txs, we do this
// little check to avoid unpacking / repacking the heap later on, which
// is very expensive
discardable := 0
for _, tx := range *l.items {
if !local.containsTx(tx) {
discardable++
}
if discardable >= slots {
break
}
}
if slots > discardable {
slots = discardable
}
}
if slots == 0 {
return nil
}
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, len(*l.items)-slots) // Local underpriced transactions to keep

for len(*l.items) > 0 && slots > 0 {
//
// Note local transaction won't be considered for eviction.
func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
for len(*l.remotes) > 0 && slots > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction)
if l.all.Get(tx.Hash()) == nil {
tx := heap.Pop(l.remotes).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales--
continue
}
// Non stale transaction found, discard unless local
if local.containsTx(tx) {
save = append(save, tx)
} else {
drop = append(drop, tx)
slots -= numSlots(tx)
// Non stale transaction found, discard it
drop = append(drop, tx)
slots -= numSlots(tx)
}
// If we still can't make enough room for the new transaction
if slots > 0 && !force {
for _, tx := range drop {
heap.Push(l.remotes, tx)
}
return nil, false
}
for _, tx := range save {
heap.Push(l.items, tx)
}
return drop
return drop, true
}

// Reheap forcibly rebuilds the heap based on the current remote transaction set.
func (l *txPricedList) Reheap() {
reheap := make(priceHeap, 0, l.all.RemoteCount())

l.stales, l.remotes = 0, &reheap
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
*l.remotes = append(*l.remotes, tx)
return true
}, false, true) // Only iterate remotes
heap.Init(l.remotes)
}

0 comments on commit 88c6962

Please sign in to comment.