Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,8 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {

// Filter out known ones without obtaining the pool lock or recovering signatures
var (
errs = make([]error, len(txs))
news = make([]*types.Transaction, 0, len(txs))
hasValid bool
errs = make([]error, len(txs))
)
for i, tx := range txs {
// If the transaction is known, pre-set the error slot
Expand All @@ -962,25 +962,16 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
invalidTxMeter.Mark(1)
continue
}
// Accumulate all unknown transactions for deeper processing
news = append(news, tx)
hasValid = true
}
if len(news) == 0 {
if !hasValid {
return errs
}
// Process all the new transaction and merge any errors into the original slice
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
dirtyAddrs := pool.addTxsLocked(txs, errs, local)
pool.mu.Unlock()

var nilSlot = 0
for _, err := range newErrs {
for errs[nilSlot] != nil {
nilSlot++
}
errs[nilSlot] = err
nilSlot++
}
// Reorg the pool internals if needed and return
done := pool.requestPromoteExecutables(dirtyAddrs)
if sync {
Expand All @@ -991,13 +982,15 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {

// addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, errs []error, local bool) *accountSet {
var (
dirty = newAccountSet(pool.signer)
errs = make([]error, len(txs))
valid int64
)
for i, tx := range txs {
if errs[i] != nil {
continue
}
replaced, err := pool.add(tx, local)
errs[i] = err
if err == nil {
Expand All @@ -1008,7 +1001,7 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error,
}
}
validTxMeter.Mark(valid)
return errs, dirty
return dirty
}

// Status returns the status (unknown/pending/queued) of a batch of transactions
Expand Down Expand Up @@ -1380,7 +1373,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
pool.addTxsLocked(reinject, make([]error, len(reinject)), false)

// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
Expand Down