Skip to content

Commit

Permalink
[TxPool] Fix double nonce in promoted (#739)
Browse files Browse the repository at this point in the history
* add test

* Prune lower transactions on promoting

* Fix pruning in Promote

* Fix failed test

Co-authored-by: kourin <kourin.code@gmail.com>
  • Loading branch information
dbrajovic and Kourin1996 committed Sep 22, 2022
1 parent 06d3f72 commit ff0d3d3
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 46 deletions.
48 changes: 28 additions & 20 deletions txpool/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ type account struct {
init sync.Once
enqueued, promoted *accountQueue
nextNonce uint64
demotions uint
demotions uint64
// the number of consecutive blocks that don't contain account's transaction
skips uint64

Expand All @@ -174,6 +174,21 @@ func (a *account) setNonce(nonce uint64) {
atomic.StoreUint64(&a.nextNonce, nonce)
}

// Demotions returns the current value of demotions
func (a *account) Demotions() uint64 {
return a.demotions
}

// resetDemotions sets 0 to demotions to clear count
func (a *account) resetDemotions() {
a.demotions = 0
}

// incrementDemotions increments demotions
func (a *account) incrementDemotions() {
a.demotions++
}

// reset aligns the account with the new nonce
// by pruning all transactions with nonce lesser than new.
// After pruning, a promotion may be signaled if the first
Expand All @@ -186,10 +201,7 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) (
defer a.promoted.unlock()

// prune the promoted txs
prunedPromoted = append(
prunedPromoted,
a.promoted.prune(nonce)...,
)
prunedPromoted = a.promoted.prune(nonce)

if nonce <= a.getNonce() {
// only the promoted queue needed pruning
Expand All @@ -200,19 +212,15 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) (
defer a.enqueued.unlock()

// prune the enqueued txs
prunedEnqueued = append(
prunedEnqueued,
a.enqueued.prune(nonce)...,
)
prunedEnqueued = a.enqueued.prune(nonce)

// update nonce expected for this account
a.setNonce(nonce)

// it is important to signal promotion while
// the locks are held to ensure no other
// handler will mutate the account
if first := a.enqueued.peek(); first != nil &&
first.Nonce == nonce {
if first := a.enqueued.peek(); first != nil && first.Nonce == nonce {
// first enqueued tx is expected -> signal promotion
promoteCh <- promoteRequest{account: first.From}
}
Expand Down Expand Up @@ -245,7 +253,7 @@ func (a *account) enqueue(tx *types.Transaction) error {
// Eligible transactions are all sequential in order of nonce
// and the first one has to have nonce less (or equal) to the account's
// nextNonce.
func (a *account) promote() []*types.Transaction {
func (a *account) promote() (promoted []*types.Transaction, pruned []*types.Transaction) {
a.promoted.lock(true)
a.enqueued.lock(true)

Expand All @@ -256,21 +264,18 @@ func (a *account) promote() []*types.Transaction {

// sanity check
currentNonce := a.getNonce()
if a.enqueued.length() == 0 ||
a.enqueued.peek().Nonce > currentNonce {
if a.enqueued.length() == 0 || a.enqueued.peek().Nonce > currentNonce {
// nothing to promote
return nil
return
}

promoted := make([]*types.Transaction, 0)
nextNonce := a.enqueued.peek().Nonce

// move all promotable txs (enqueued txs that are sequential in nonce)
// to the account's promoted queue
for {
tx := a.enqueued.peek()
if tx == nil ||
tx.Nonce != nextNonce {
if tx == nil || tx.Nonce != nextNonce {
break
}

Expand All @@ -281,7 +286,10 @@ func (a *account) promote() []*types.Transaction {
a.promoted.push(tx)

// update counters
nextNonce += 1
nextNonce = tx.Nonce + 1

// prune the transactions with lower nonce
pruned = append(pruned, a.enqueued.prune(nextNonce)...)

// update return result
promoted = append(promoted, tx)
Expand All @@ -293,7 +301,7 @@ func (a *account) promote() []*types.Transaction {
a.setNonce(nextNonce)
}

return promoted
return
}

// resetSkips sets 0 to skips
Expand Down
12 changes: 7 additions & 5 deletions txpool/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,11 @@ func (q *accountQueue) prune(nonce uint64) (
pruned []*types.Transaction,
) {
for {
tx := q.peek()
if tx == nil ||
tx.Nonce >= nonce {
if tx := q.peek(); tx == nil || tx.Nonce >= nonce {
break
}

tx = q.pop()
pruned = append(pruned, tx)
pruned = append(pruned, q.pop())
}

return
Expand Down Expand Up @@ -130,6 +127,11 @@ func (q *minNonceQueue) Swap(i, j int) {
}

func (q *minNonceQueue) Less(i, j int) bool {
// The higher gas price Tx comes first if the nonces are same
if (*q)[i].Nonce == (*q)[j].Nonce {
return (*q)[i].GasPrice.Cmp((*q)[j].GasPrice) > 0
}

return (*q)[i].Nonce < (*q)[j].Nonce
}

Expand Down
42 changes: 24 additions & 18 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (

// maximum allowed number of times an account
// was excluded from block building (ibft.writeTransactions)
maxAccountDemotions = uint(10)
maxAccountDemotions uint64 = 10

// maximum allowed number of consecutive blocks that don't have the account's transaction
maxAccountSkips = uint64(10)
Expand Down Expand Up @@ -403,7 +403,7 @@ func (p *TxPool) Pop(tx *types.Transaction) {
account.promoted.pop()

// successfully popping an account resets its demotions count to 0
account.demotions = 0
account.resetDemotions()

// update state
p.gauge.decrease(slotsRequired(tx))
Expand Down Expand Up @@ -471,7 +471,7 @@ func (p *TxPool) Drop(tx *types.Transaction) {
// it is Dropped instead.
func (p *TxPool) Demote(tx *types.Transaction) {
account := p.accounts.get(tx.From)
if account.demotions == maxAccountDemotions {
if account.Demotions() >= maxAccountDemotions {
p.logger.Debug(
"Demote: threshold reached - dropping account",
"addr", tx.From.String(),
Expand All @@ -480,12 +480,12 @@ func (p *TxPool) Demote(tx *types.Transaction) {
p.Drop(tx)

// reset the demotions counter
account.demotions = 0
account.resetDemotions()

return
}

account.demotions++
account.incrementDemotions()

p.eventManager.signalEvent(proto.EventType_DEMOTED, tx.Hash)
}
Expand Down Expand Up @@ -739,9 +739,7 @@ func (p *TxPool) addTx(origin txOrigin, tx *types.Transaction) error {
}

// initialize account for this address once
if !p.accounts.exists(tx.From) {
p.createAccountOnce(tx.From)
}
p.createAccountOnce(tx.From)

// send request [BLOCKING]
p.enqueueReqCh <- enqueueRequest{tx: tx}
Expand Down Expand Up @@ -793,9 +791,12 @@ func (p *TxPool) handlePromoteRequest(req promoteRequest) {
account := p.accounts.get(addr)

// promote enqueued txs
promoted := account.promote()
promoted, pruned := account.promote()
p.logger.Debug("promote request", "promoted", promoted, "addr", addr.String())

p.index.remove(pruned...)
p.gauge.decrease(slotsRequired(pruned...))

// update metrics
p.metrics.PendingTxs.Add(float64(len(promoted)))
p.eventManager.signalEvent(proto.EventType_PROMOTED, toHash(promoted...)...)
Expand Down Expand Up @@ -856,31 +857,33 @@ func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) {

// clear all accounts of stale txs
for addr, newNonce := range stateNonces {
if !p.accounts.exists(addr) {
account := p.accounts.get(addr)

if account == nil {
// no updates for this account
continue
}

account := p.accounts.get(addr)
prunedPromoted, prunedEnqueued := account.reset(newNonce, p.promoteReqCh)

// append pruned
allPrunedPromoted = append(allPrunedPromoted, prunedPromoted...)
allPrunedEnqueued = append(allPrunedEnqueued, prunedEnqueued...)

// new state for account -> demotions are reset to 0
account.demotions = 0
account.resetDemotions()
}

// pool cleanup callback
cleanup := func(stale ...*types.Transaction) {
cleanup := func(stale []*types.Transaction) {
p.index.remove(stale...)
p.gauge.decrease(slotsRequired(stale...))
}

// prune pool state
if len(allPrunedPromoted) > 0 {
cleanup(allPrunedPromoted...)
cleanup(allPrunedPromoted)

p.eventManager.signalEvent(
proto.EventType_PRUNED_PROMOTED,
toHash(allPrunedPromoted...)...,
Expand All @@ -890,7 +893,8 @@ func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) {
}

if len(allPrunedEnqueued) > 0 {
cleanup(allPrunedEnqueued...)
cleanup(allPrunedEnqueued)

p.eventManager.signalEvent(
proto.EventType_PRUNED_ENQUEUED,
toHash(allPrunedEnqueued...)...,
Expand Down Expand Up @@ -938,14 +942,16 @@ func (p *TxPool) updateAccountSkipsCounts(latestActiveAccounts map[types.Address
// createAccountOnce creates an account and
// ensures it is only initialized once.
func (p *TxPool) createAccountOnce(newAddr types.Address) *account {
if p.accounts.exists(newAddr) {
return nil
}

// fetch nonce from state
stateRoot := p.store.Header().StateRoot
stateNonce := p.store.GetNonce(stateRoot, newAddr)

// initialize the account
account := p.accounts.initOnce(newAddr, stateNonce)

return account
return p.accounts.initOnce(newAddr, stateNonce)
}

// Length returns the total number of all promoted transactions.
Expand Down

0 comments on commit ff0d3d3

Please sign in to comment.