Skip to content

Commit

Permalink
fix: add SendState to determine when to abort Send
Browse files Browse the repository at this point in the history
This commit adds a new SendState struct, which is used internally by the
txmgr to determine if we should give up on a particular tx at the given
nonce due to ErrNonceTooLow failures. To do so, we track the set of
mined txns at any particular time, which are updated via asynchronous
calls to TxMined and TxNotMined by each of the spawned goroutines. The
presence of a mined txn supersedes any of the errors returned, and will
cause the txmgr to wait for full confirmation. However, if we observe
multiple ErrNonceTooLows, the SendState will wait for a configurable
safe abort count to ensure that the reading wasn't due to being in a
transient state, as we don't have exact guarantees on the execution
ordering of various goroutines.

With this change, the test added in the prior commit now succeeds.
  • Loading branch information
cfromknecht authored and mslipper committed Feb 10, 2022
1 parent a27a68e commit bcbde5f
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/metal-files-draw.md
@@ -0,0 +1,5 @@
---
'@eth-optimism/batch-submitter-service': patch
---

Fixes a bug that causes the txmgr to not wait for the configured numConfirmations
102 changes: 102 additions & 0 deletions go/bss-core/txmgr/send_state.go
@@ -0,0 +1,102 @@
package txmgr

import (
"strings"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
)

// SendState tracks the publication state of a single logical txn. A logical
// txn may be published under several different txn hashes because of varying
// gas prices; all of those hashes are accounted for together here. The struct
// is primarily used to determine whether or not the txmgr should abort a given
// txn and retry with a higher nonce.
type SendState struct {
	// safeAbortNonceTooLowCount is the number of ErrNonceTooLow observations
	// that must accumulate, with no txn currently mined, before
	// ShouldAbortImmediately reports true. It is assigned by NewSendState.
	safeAbortNonceTooLowCount uint64

	// mu guards minedTxs and nonceTooLowCount.
	mu sync.RWMutex

	// minedTxs is the set of txn hashes most recently reported as mined.
	minedTxs map[common.Hash]struct{}

	// nonceTooLowCount is the number of ErrNonceTooLow observations recorded
	// since construction or since the last reset performed by TxNotMined.
	nonceTooLowCount uint64
}

// NewSendState returns an empty SendState whose abort threshold is the passed
// safeAbortNonceTooLowCount. It panics if safeAbortNonceTooLowCount is zero,
// since a zero threshold would be met before any error had been observed.
func NewSendState(safeAbortNonceTooLowCount uint64) *SendState {
	if safeAbortNonceTooLowCount == 0 {
		panic("txmgr: safeAbortNonceTooLowCount cannot be zero")
	}

	// nonceTooLowCount is left at its zero value.
	state := new(SendState)
	state.minedTxs = make(map[common.Hash]struct{})
	state.safeAbortNonceTooLowCount = safeAbortNonceTooLowCount
	return state
}

// ProcessSendError records the outcome of a single publication attempt. It is
// safe to pass nil or arbitrary errors; only errors whose message contains the
// ErrNonceTooLow message are counted, everything else is ignored.
func (s *SendState) ProcessSendError(err error) {
	// The match is on the message text rather than on error identity, so an
	// error that merely embeds the ErrNonceTooLow message is also counted.
	if err == nil || !strings.Contains(err.Error(), core.ErrNonceTooLow.Error()) {
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// One more nonce too low observation towards the safe abort threshold.
	s.nonceTooLowCount += 1
}

// TxMined records that the txn with txHash has been mined and is awaiting
// confirmation. The hash is stored in a set, so it is safe to call this
// function multiple times with the same hash.
func (s *SendState) TxMined(txHash common.Hash) {
	s.mu.Lock()
	defer s.mu.Unlock()

	var present struct{}
	s.minedTxs[txHash] = present
}

// TxNotMined records that the txn with txHash has not been mined or has been
// reorg'd out. It is safe to call this function multiple times; calls for a
// hash that is not currently tracked as mined leave the state untouched.
func (s *SendState) TxNotMined(txHash common.Hash) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// A hash we never saw mined cannot have been reorged out, so there is
	// nothing to remove and the nonce too low count must be preserved.
	if _, wasMined := s.minedTxs[txHash]; !wasMined {
		return
	}
	delete(s.minedTxs, txHash)

	// If the reorg left us with no mined txns, start counting nonce too low
	// errors from scratch. Otherwise observations made before the txn was
	// mined could cause an abort on the very next error. If the errors
	// persist, we want to wait out the full safe abort count to ensure we
	// have a sufficient number of observations.
	if len(s.minedTxs) == 0 {
		s.nonceTooLowCount = 0
	}
}

// ShouldAbortImmediately reports whether the txmgr should give up on the txn at
// its target nonce. For now, this is only the case when no txn is currently
// recorded as mined and the number of ErrNonceTooLow observations has reached
// the safe abort threshold.
func (s *SendState) ShouldAbortImmediately() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// A mined txn always takes precedence over any errors observed.
	noneMined := len(s.minedTxs) == 0

	// Enough ErrNonceTooLow observations to rule out a transient reading.
	thresholdMet := s.nonceTooLowCount >= s.safeAbortNonceTooLowCount

	return noneMined && thresholdMet
}
137 changes: 137 additions & 0 deletions go/bss-core/txmgr/send_state_test.go
@@ -0,0 +1,137 @@
package txmgr_test

import (
"errors"
"testing"

"github.com/ethereum-optimism/optimism/go/bss-core/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/stretchr/testify/require"
)

const testSafeAbortNonceTooLowCount = 3

var (
testHash = common.HexToHash("0x01")
)

// newSendState returns a SendState configured with the abort threshold used
// throughout these tests.
func newSendState() *txmgr.SendState {
	state := txmgr.NewSendState(testSafeAbortNonceTooLowCount)
	return state
}

// processNSendErrors feeds err to sendState.ProcessSendError n times. It does
// nothing when n is zero or negative.
func processNSendErrors(sendState *txmgr.SendState, err error, n int) {
	for remaining := n; remaining > 0; remaining-- {
		sendState.ProcessSendError(err)
	}
}

// TestSendStateNoAbortAfterInit asserts that a freshly constructed SendState
// does not report an abort before any send errors have been processed.
func TestSendStateNoAbortAfterInit(t *testing.T) {
	shouldAbort := newSendState().ShouldAbortImmediately()
	require.False(t, shouldAbort)
}

// TestSendStateNoAbortAfterProcessNilError asserts that processing nil errors,
// even as many as the safe abort threshold, never leads to an abort.
func TestSendStateNoAbortAfterProcessNilError(t *testing.T) {
	sendState := newSendState()

	var noErr error
	processNSendErrors(sendState, noErr, testSafeAbortNonceTooLowCount)

	require.False(t, sendState.ShouldAbortImmediately())
}

// TestSendStateNoAbortAfterProcessOtherError asserts that non-nil errors that
// are not ErrNonceTooLow do not count towards the abort threshold.
func TestSendStateNoAbortAfterProcessOtherError(t *testing.T) {
	sendState := newSendState()

	processNSendErrors(
		sendState, errors.New("other error"), testSafeAbortNonceTooLowCount,
	)

	require.False(t, sendState.ShouldAbortImmediately())
}

// TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined asserts that, with no
// tx mined, an abort is signalled only once the third consecutive
// ErrNonceTooLow has been processed.
func TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined(t *testing.T) {
	sendState := newSendState()

	// Expected abort status after the first, second and third error.
	for _, wantAbort := range []bool{false, false, true} {
		sendState.ProcessSendError(core.ErrNonceTooLow)
		require.Equal(t, wantAbort, sendState.ShouldAbortImmediately())
	}
}

// TestSendStateMiningTxCancelsAbort asserts that a tx reported as mined after
// some ErrNonceTooLow observations takes precedence: no abort is signalled,
// even when a further ErrNonceTooLow brings the count up to the threshold.
func TestSendStateMiningTxCancelsAbort(t *testing.T) {
	sendState := newSendState()

	// Two observations, one short of the threshold, then a mined tx.
	processNSendErrors(sendState, core.ErrNonceTooLow, 2)
	sendState.TxMined(testHash)
	require.False(t, sendState.ShouldAbortImmediately())

	// The third observation reaches the threshold but must not trigger an
	// abort while the tx is still recorded as mined.
	sendState.ProcessSendError(core.ErrNonceTooLow)
	require.False(t, sendState.ShouldAbortImmediately())
}

// TestSendStateReorgingTxResetsAbort asserts that unmining a tx discards the
// ErrNonceTooLow observations made before it was mined when determining
// whether to abort.
func TestSendStateReorgingTxResetsAbort(t *testing.T) {
	sendState := newSendState()

	// Two observations before the tx is mined and then reorged out.
	processNSendErrors(sendState, core.ErrNonceTooLow, 2)
	sendState.TxMined(testHash)
	sendState.TxNotMined(testHash)

	// Had the count survived the reorg, this observation would be the third
	// and would trigger an abort.
	sendState.ProcessSendError(core.ErrNonceTooLow)
	require.False(t, sendState.ShouldAbortImmediately())
}

// TestSendStateNoAbortEvenIfNonceTooLowAfterTxMined asserts that we will not
// abort if ErrNonceTooLow keeps being reported after a tx has been mined.
//
// NOTE: This is the most crucial role of the SendState. ErrNonceTooLow failures
// are _expected_ once one of our txs has been mined, and they must not stop us
// from continuing to wait for confirmations.
func TestSendStateNoAbortEvenIfNonceTooLowAfterTxMined(t *testing.T) {
	sendState := newSendState()
	sendState.TxMined(testHash)

	// Reach the safe abort threshold while the tx remains mined.
	for i := 0; i < testSafeAbortNonceTooLowCount; i++ {
		sendState.ProcessSendError(core.ErrNonceTooLow)
	}

	require.False(t, sendState.ShouldAbortImmediately())
}

// TestSendStateSafeAbortIfNonceTooLowPersistsAfterUnmine asserts that we abort
// once the full threshold of ErrNonceTooLow observations is seen after a tx
// has been unmined and not mined again.
func TestSendStateSafeAbortIfNonceTooLowPersistsAfterUnmine(t *testing.T) {
	sendState := newSendState()

	// Mine the tx and then reorg it out, leaving no mined txs.
	sendState.TxMined(testHash)
	sendState.TxNotMined(testHash)

	// Two observations are still one short of the threshold.
	processNSendErrors(sendState, core.ErrNonceTooLow, 2)
	require.False(t, sendState.ShouldAbortImmediately())

	// The third observation meets it.
	sendState.ProcessSendError(core.ErrNonceTooLow)
	require.True(t, sendState.ShouldAbortImmediately())
}

// TestSendStateSafeAbortWhileCallingNotMinedOnUnminedTx asserts that calling
// TxNotMined for a tx that was never mined does not discard the ErrNonceTooLow
// observations already recorded, so the abort is still signalled.
func TestSendStateSafeAbortWhileCallingNotMinedOnUnminedTx(t *testing.T) {
	sendState := newSendState()

	// Reach the safe abort threshold without ever mining a tx.
	for i := 0; i < testSafeAbortNonceTooLowCount; i++ {
		sendState.ProcessSendError(core.ErrNonceTooLow)
	}

	// testHash was never reported as mined, so this must not reset the count.
	sendState.TxNotMined(testHash)

	require.True(t, sendState.ShouldAbortImmediately())
}
37 changes: 26 additions & 11 deletions go/bss-core/txmgr/txmgr.go
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
Expand Down Expand Up @@ -121,6 +120,8 @@ func (m *SimpleTxManager) Send(
ctxc, cancel := context.WithCancel(ctx)
defer cancel()

sendState := NewSendState(3)

// Create a closure that will block on passed sendTx function in the
// background, returning the first successfully mined receipt back to
// the main event loop via receiptChan.
Expand All @@ -147,13 +148,14 @@ func (m *SimpleTxManager) Send(

// Sign and publish transaction with current gas price.
err = sendTx(ctxc, tx)
sendState.ProcessSendError(err)
if err != nil {
if err == context.Canceled ||
strings.Contains(err.Error(), "context canceled") {
return
}
log.Error(name+" unable to publish transaction", "err", err)
if shouldAbortImmediately(err) {
if sendState.ShouldAbortImmediately() {
cancel()
}
// TODO(conner): add retry?
Expand All @@ -165,9 +167,9 @@ func (m *SimpleTxManager) Send(

// Wait for the transaction to be mined, reporting the receipt
// back to the main event loop if found.
receipt, err := WaitMined(
receipt, err := waitMined(
ctxc, m.backend, tx, m.cfg.ReceiptQueryInterval,
m.cfg.NumConfirmations,
m.cfg.NumConfirmations, sendState,
)
if err != nil {
log.Debug(name+" send tx failed", "hash", txHash,
Expand Down Expand Up @@ -215,13 +217,6 @@ func (m *SimpleTxManager) Send(
}
}

// shouldAbortImmediately reports whether err's message contains the
// ErrNonceTooLow message, which the txmgr treats as the signal to cancel all
// publication attempts and retry. For now, nonce errors are the only errors
// handled this way, on the assumption that none of the transactions will ever
// confirm. err must be non-nil.
func shouldAbortImmediately(err error) bool {
	nonceTooLowMsg := core.ErrNonceTooLow.Error()
	return strings.Contains(err.Error(), nonceTooLowMsg)
}

// WaitMined blocks until the backend indicates confirmation of tx and returns
// the tx receipt. Queries are made every queryInterval, regardless of whether
// the backend returns an error. This method can be canceled using the passed
Expand All @@ -233,6 +228,19 @@ func WaitMined(
queryInterval time.Duration,
numConfirmations uint64,
) (*types.Receipt, error) {
return waitMined(ctx, backend, tx, queryInterval, numConfirmations, nil)
}

// waitMined implements the core functionality of WaitMined, with the option to
// pass in a SendState to record whether or not the transaction is mined.
func waitMined(
ctx context.Context,
backend ReceiptSource,
tx *types.Transaction,
queryInterval time.Duration,
numConfirmations uint64,
sendState *SendState,
) (*types.Receipt, error) {

queryTicker := time.NewTicker(queryInterval)
defer queryTicker.Stop()
Expand All @@ -243,6 +251,10 @@ func WaitMined(
receipt, err := backend.TransactionReceipt(ctx, txHash)
switch {
case receipt != nil:
if sendState != nil {
sendState.TxMined(txHash)
}

txHeight := receipt.BlockNumber.Uint64()
tipHeight, err := backend.BlockNumber(ctx)
if err != nil {
Expand Down Expand Up @@ -277,6 +289,9 @@ func WaitMined(
"err", err)

default:
if sendState != nil {
sendState.TxNotMined(txHash)
}
log.Trace("Transaction not yet mined", "hash", txHash)
}

Expand Down
3 changes: 3 additions & 0 deletions go/bss-core/txmgr/txmgr_test.go
Expand Up @@ -385,6 +385,9 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
case h.gasPricer.shouldMine(tx.GasFeeCap()):
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap())
time.AfterFunc(5*time.Second, func() {
h.backend.mine(nil, nil)
})
return nil

// For gas prices greater than our expected, return ErrNonceTooLow since
Expand Down

0 comments on commit bcbde5f

Please sign in to comment.