From a578d63420c9e7cdb9b9588abd67c1d0ab0fd0a2 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Wed, 15 Apr 2026 22:22:31 +0200 Subject: [PATCH] fix(submit): robust nonce recovery via chain re-query Replace fragile error-string parsing for sequence mismatch recovery with a direct re-query of the chain via AccountInfo. Detect ErrWrongSequence using ABCI code 32 instead of text patterns. Increase max retry rounds from 2 to 3 and add attempt/address context to mismatch errors. Closes #8 Co-Authored-By: Claude Sonnet 4.6 --- pkg/submit/direct.go | 59 ++++++--------- pkg/submit/direct_test.go | 151 +++++++++++++++++++++++++++++++++++--- 2 files changed, 160 insertions(+), 50 deletions(-) diff --git a/pkg/submit/direct.go b/pkg/submit/direct.go index b3b2daf..f03aa1f 100644 --- a/pkg/submit/direct.go +++ b/pkg/submit/direct.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strconv" "strings" "sync" "time" @@ -17,7 +16,7 @@ import ( const ( defaultFeeDenom = "utia" defaultPollInterval = time.Second - maxSequenceRetryRounds = 2 + maxSequenceRetryRounds = 3 ) var ( @@ -137,7 +136,7 @@ func (s *DirectSubmitter) broadcastTx(ctx context.Context, req *Request) (*TxSta defer s.mu.Unlock() var lastErr error - for range maxSequenceRetryRounds { + for attempt := range maxSequenceRetryRounds { account, err := s.nextAccountLocked(ctx) if err != nil { return nil, err @@ -151,16 +150,20 @@ func (s *DirectSubmitter) broadcastTx(ctx context.Context, req *Request) (*TxSta broadcast, err := s.app.BroadcastTx(ctx, txBytes) if err != nil { if isSequenceMismatchText(err.Error()) { - s.recoverSequenceLocked(account, err.Error()) - lastErr = fmt.Errorf("%w: %w", errSequenceMismatch, err) + if recoverErr := s.recoverSequenceLocked(ctx); recoverErr != nil { + return nil, recoverErr + } + lastErr = fmt.Errorf("%w: attempt %d, address %s: %w", errSequenceMismatch, attempt+1, s.signer.Address(), err) continue } return nil, fmt.Errorf("broadcast blob tx: %w", err) } if err := checkTxStatus("broadcast", broadcast); err != nil { if errors.Is(err, errSequenceMismatch) { - s.recoverSequenceLocked(account, err.Error()) - lastErr = err + if recoverErr := s.recoverSequenceLocked(ctx); recoverErr != nil { + return nil, recoverErr + } + lastErr = fmt.Errorf("attempt %d, address %s: %w", attempt+1, s.signer.Address(), err) continue } return nil, err @@ -228,16 +231,19 @@ func (s *DirectSubmitter) finishSubmission() { } } -func (s *DirectSubmitter) recoverSequenceLocked(account *AccountInfo, errText string) { - expected, ok := expectedSequenceFromMismatchText(errText) - if !ok { - s.invalidateSequenceLocked() - return +func (s *DirectSubmitter) recoverSequenceLocked(ctx context.Context) error { + s.invalidateSequenceLocked() + account, err := s.app.AccountInfo(ctx, s.signer.Address()) + if err != nil { + return fmt.Errorf("re-query account sequence after mismatch (address: %s): %w", s.signer.Address(), err) + } + if account == nil { + return fmt.Errorf("re-query account sequence after mismatch: empty response for %s", s.signer.Address()) } - s.accountNumber = account.AccountNumber - s.nextSequence = expected + s.nextSequence = account.Sequence s.sequenceReady = true + return nil } func (s *DirectSubmitter) reconcilePendingLocked(ctx context.Context) error { @@ -453,7 +459,7 @@ func checkTxStatus(stage string, tx *TxStatus) error { return nil } - if isSequenceMismatchText(tx.RawLog) { + if tx.Code == 32 { return fmt.Errorf("%w: %s", errSequenceMismatch, tx.RawLog) } if tx.Codespace != "" { @@ -467,29 +473,6 @@ func isSequenceMismatchText(text string) bool { return strings.Contains(text, "account sequence mismatch") || strings.Contains(text, "incorrect account sequence") } -func expectedSequenceFromMismatchText(text string) (uint64, bool) { - lower := strings.ToLower(text) - idx := strings.Index(lower, "expected ") - if idx < 0 { - return 0, false - } - - start := idx + len("expected ") - end := start - for end < len(lower) && lower[end] >= '0' && lower[end] <= '9' { - end++ - } - if end == start { - return 0, false - } - - sequence, err := strconv.ParseUint(lower[start:end], 10, 64) - if err != nil { - return 0, false - } - return sequence, true -} - func isTxNotFound(err error) bool { return status.Code(err) == codes.NotFound } diff --git a/pkg/submit/direct_test.go b/pkg/submit/direct_test.go index 3cf658b..0ef2d3c 100644 --- a/pkg/submit/direct_test.go +++ b/pkg/submit/direct_test.go @@ -151,8 +151,8 @@ func TestDirectSubmitterRetriesSequenceMismatch(t *testing.T) { if result.Height != 77 { t.Fatalf("height = %d, want 77", result.Height) } - if client.accountCalls != 1 { - t.Fatalf("account calls = %d, want 1", client.accountCalls) + if client.accountCalls != 2 { + t.Fatalf("account calls = %d, want 2", client.accountCalls) } if client.broadcastCalls != 2 { t.Fatalf("broadcast calls = %d, want 2", client.broadcastCalls) @@ -345,6 +345,7 @@ func TestDirectSubmitterRecoversAfterRestartWithPendingSequences(t *testing.T) { signer := mustSigner(t) client := newSequenceRecoveryAppClient(signer.Address(), 7, 11, 16) + client.accountSequenceQueue = []uint64{11, 16} // Simulate a fresh process with no local sequence cache while the mempool // still holds earlier pending transactions. @@ -365,8 +366,8 @@ func TestDirectSubmitterRecoversAfterRestartWithPendingSequences(t *testing.T) { if result.Height != 116 { t.Fatalf("height = %d, want 116", result.Height) } - if client.accountCalls != 1 { - t.Fatalf("account calls = %d, want 1", client.accountCalls) + if client.accountCalls != 2 { + t.Fatalf("account calls = %d, want 2", client.accountCalls) } if !slices.Equal(client.attemptSequences, []uint64{11, 16}) { t.Fatalf("attempt sequences = %v, want [11 16]", client.attemptSequences) @@ -382,6 +383,7 @@ func TestDirectSubmitterRecoversWhenCachedSequenceFallsBehindExternalWriter(t *t signer := mustSigner(t) client := newSequenceRecoveryAppClient(signer.Address(), 7, 16, 16) client.afterSuccessNext = []uint64{19} + client.accountSequenceQueue = []uint64{16, 19} submitter, err := NewDirectSubmitter(client, signer, DirectConfig{ ChainID: "mocha-4", @@ -408,8 +410,8 @@ func TestDirectSubmitterRecoversWhenCachedSequenceFallsBehindExternalWriter(t *t if second.Height != 119 { t.Fatalf("second height = %d, want 119", second.Height) } - if client.accountCalls != 1 { - t.Fatalf("account calls = %d, want 1", client.accountCalls) + if client.accountCalls != 2 { + t.Fatalf("account calls = %d, want 2", client.accountCalls) } if !slices.Equal(client.attemptSequences, []uint64{16, 17, 19}) { t.Fatalf("attempt sequences = %v, want [16 17 19]", client.attemptSequences) @@ -454,6 +456,125 @@ func TestDirectSubmitterReconcilesPersistedPendingSequenceBeforeBroadcast(t *tes } } +func TestDirectSubmitterMismatchDetectedByABCICode(t *testing.T) { + t.Parallel() + + signer := mustSigner(t) + // RawLog contains no parseable sequence — detection must come from Code 32 alone. + client := &fakeAppClient{ + accountInfos: []*AccountInfo{ + {Address: signer.Address(), AccountNumber: 7, Sequence: 1}, + {Address: signer.Address(), AccountNumber: 7, Sequence: 2}, + }, + broadcastStatuses: []*TxStatus{ + {Code: 32, RawLog: "wrong sequence"}, + {Hash: "ABCDEF"}, + }, + getTxStatuses: []*TxStatus{{Hash: "ABCDEF", Height: 88}}, + } + + submitter, err := NewDirectSubmitter(client, signer, DirectConfig{ + ChainID: "mocha-4", + GasPrice: 0.002, + ConfirmationTimeout: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("NewDirectSubmitter: %v", err) + } + submitter.pollInterval = time.Millisecond + + result, err := submitter.Submit(context.Background(), testRequest()) + if err != nil { + t.Fatalf("Submit: %v", err) + } + if result.Height != 88 { + t.Fatalf("height = %d, want 88", result.Height) + } + if client.broadcastCalls != 2 { + t.Fatalf("broadcast calls = %d, want 2", client.broadcastCalls) + } + if client.accountCalls != 2 { + t.Fatalf("account calls = %d, want 2", client.accountCalls) + } +} + +func TestDirectSubmitterMismatchReQueryFails(t *testing.T) { + t.Parallel() + + signer := mustSigner(t) + reQueryErr := errors.New("network unreachable") + client := &fakeAppClient{ + accountInfos: []*AccountInfo{ + {Address: signer.Address(), AccountNumber: 7, Sequence: 1}, + }, + accountErrs: []error{nil, reQueryErr}, + broadcastStatuses: []*TxStatus{ + {Code: 32, RawLog: "account sequence mismatch, expected 2, got 1"}, + }, + } + + submitter, err := NewDirectSubmitter(client, signer, DirectConfig{ + ChainID: "mocha-4", + GasPrice: 0.002, + ConfirmationTimeout: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("NewDirectSubmitter: %v", err) + } + submitter.pollInterval = time.Millisecond + + _, err = submitter.Submit(context.Background(), testRequest()) + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, reQueryErr) { + t.Fatalf("expected re-query error in chain, got: %v", err) + } +} + +func TestDirectSubmitterExhaustsAllRetries(t *testing.T) { + t.Parallel() + + signer := mustSigner(t) + client := &fakeAppClient{ + accountInfos: []*AccountInfo{ + {Address: signer.Address(), AccountNumber: 7, Sequence: 1}, + {Address: signer.Address(), AccountNumber: 7, Sequence: 2}, + {Address: signer.Address(), AccountNumber: 7, Sequence: 3}, + {Address: signer.Address(), AccountNumber: 7, Sequence: 4}, + }, + broadcastStatuses: []*TxStatus{ + {Code: 32, RawLog: "account sequence mismatch"}, + {Code: 32, RawLog: "account sequence mismatch"}, + {Code: 32, RawLog: "account sequence mismatch"}, + }, + } + + submitter, err := NewDirectSubmitter(client, signer, DirectConfig{ + ChainID: "mocha-4", + GasPrice: 0.002, + ConfirmationTimeout: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("NewDirectSubmitter: %v", err) + } + submitter.pollInterval = time.Millisecond + + _, err = submitter.Submit(context.Background(), testRequest()) + if err == nil { + t.Fatal("expected error after exhausting retries, got nil") + } + if !errors.Is(err, errSequenceMismatch) { + t.Fatalf("expected errSequenceMismatch in chain, got: %v", err) + } + if client.broadcastCalls != 3 { + t.Fatalf("broadcast calls = %d, want 3", client.broadcastCalls) + } + if client.accountCalls != 4 { + t.Fatalf("account calls = %d, want 4", client.accountCalls) + } +} + func TestDirectSubmitterRejectsWhenMaxInFlightExceeded(t *testing.T) { t.Parallel() @@ -679,11 +800,12 @@ func decodeInnerTx(raw []byte) ([]byte, error) { } type sequenceRecoveryAppClient struct { - address string - accountNumber uint64 - committedSequence uint64 - nextAvailable uint64 - afterSuccessNext []uint64 + address string + accountNumber uint64 + committedSequence uint64 + nextAvailable uint64 + afterSuccessNext []uint64 + accountSequenceQueue []uint64 mu sync.Mutex accountCalls int @@ -707,10 +829,15 @@ func (c *sequenceRecoveryAppClient) AccountInfo(_ context.Context, _ string) (*A defer c.mu.Unlock() c.accountCalls++ + seq := c.committedSequence + if len(c.accountSequenceQueue) > 0 { + seq = c.accountSequenceQueue[0] + c.accountSequenceQueue = c.accountSequenceQueue[1:] + } return &AccountInfo{ Address: c.address, AccountNumber: c.accountNumber, - Sequence: c.committedSequence, + Sequence: seq, }, nil }