/
txmgr.go
284 lines (241 loc) · 9.22 KB
/
txmgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package txmgr
import (
"context"
"math/big"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// UpdateGasPriceFunc defines a function signature for producing the
// transaction to publish at the current gas price. Send invokes it before
// every publication attempt. Implementations of this signature should return
// promptly when the context is canceled.
type UpdateGasPriceFunc = func(ctx context.Context) (*types.Transaction, error)
// SendTransactionFunc defines a function signature for publishing a
// transaction previously returned by an UpdateGasPriceFunc. Implementations
// of this signature should also return promptly when the context is canceled.
type SendTransactionFunc = func(ctx context.Context, tx *types.Transaction) error
// Config houses parameters for altering the behavior of a SimpleTxManager.
type Config struct {
// ResubmissionTimeout is the interval at which Send considers publishing
// a new tx. On each tick a fresh tx is requested from the
// UpdateGasPriceFunc and published, unless the send state reports that a
// previously published tx is still waiting for confirmation.
//
// NOTE(review): this comment used to state that only one publication at
// MaxGasPrice will be attempted; no such limit is visible in this file,
// so any cap would have to come from the UpdateGasPriceFunc — confirm.
ResubmissionTimeout time.Duration
// ReceiptQueryInterval is the interval at which the tx manager will
// query the backend to check for confirmations after a tx at a
// specific gas price has been published.
ReceiptQueryInterval time.Duration
// NumConfirmations specifies how many blocks are needed to consider a
// transaction confirmed. It must be non-zero; NewSimpleTxManager panics
// otherwise.
NumConfirmations uint64
// SafeAbortNonceTooLowCount specifies how many ErrNonceTooLow observations
// are required to give up on a tx at a particular nonce without receiving
// confirmation. The value is handed to NewSendState at the start of Send.
SafeAbortNonceTooLowCount uint64
}
// TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx.
type TxManager interface {
// Send is used to publish a transaction with incrementally higher gas
// prices until the transaction eventually confirms. The transaction for
// each attempt is obtained from updateGasPrice and published via sendTxn.
// This method blocks until a published transaction confirms or the
// passed context is canceled.
//
// NOTE: Send should be called by AT MOST one caller at a time.
Send(ctx context.Context, updateGasPrice UpdateGasPriceFunc, sendTxn SendTransactionFunc) (*types.Receipt, error)
}
// ReceiptSource is a minimal interface used to detect the confirmation of
// published txs.
//
// NOTE: This is a subset of bind.DeployBackend.
type ReceiptSource interface {
// BlockNumber returns the most recent block number.
BlockNumber(ctx context.Context) (uint64, error)
// TransactionReceipt queries the backend for a receipt associated with
// txHash. If lookup does not fail, but the transaction is not found,
// nil should be returned for both values.
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
}
// SimpleTxManager is an implementation of TxManager that periodically
// republishes a tx, at the gas price chosen by the caller-supplied
// UpdateGasPriceFunc, until it confirms.
type SimpleTxManager struct {
Config // embed the config directly
// name is the service name given to NewSimpleTxManager; it is also
// attached to the logger under the "service" key.
name string
// backend is queried for transaction receipts and the current block
// number when checking confirmations.
backend ReceiptSource
// l is the logger used for all messages emitted by the manager.
l log.Logger
}
// NewSimpleTxManager builds a SimpleTxManager from the passed Config, using
// backend for receipt and block-number lookups. The manager logs through a
// child of l tagged with the service name.
//
// It panics if cfg.NumConfirmations is zero.
func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ReceiptSource) *SimpleTxManager {
	if cfg.NumConfirmations == 0 {
		panic("txmgr: NumConfirmations cannot be zero")
	}

	mgr := new(SimpleTxManager)
	mgr.name = name
	mgr.Config = cfg
	mgr.backend = backend
	mgr.l = l.New("service", name)
	return mgr
}
// Send publishes a transaction and keeps republishing it, at whatever gas
// price updateGasPrice selects, until one of the published versions is
// confirmed. The call blocks until a confirmed receipt is available or the
// context is done; in the latter case the context's error is returned.
//
// A publication failure that the send state flags for immediate abort cancels
// the internal context, so it is reported to the caller as a context error.
//
// NOTE: Send should be called by AT MOST one caller at a time.
func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPriceFunc, sendTx SendTransactionFunc) (*types.Receipt, error) {
	// Every goroutine started below is tracked by this wait group so that
	// nothing outlives the call. Deferred calls run in reverse order, which
	// means the cancel registered right after this fires first and lets the
	// workers unwind before we block waiting for them.
	var workers sync.WaitGroup
	defer workers.Wait()

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	state := NewSendState(m.SafeAbortNonceTooLowCount)

	// Capacity of one: the first confirmed receipt is parked here for the
	// event loop, and any later ones are dropped by the non-blocking send.
	confirmed := make(chan *types.Receipt, 1)

	// wasCanceled reports whether err stems from context cancellation,
	// either as the sentinel value itself or judging by its message.
	wasCanceled := func(err error) bool {
		return err == context.Canceled || strings.Contains(err.Error(), "context canceled")
	}

	// publish runs one complete attempt in the background: obtain the tx
	// for the current gas price, publish it, then wait for it to be mined
	// and hand the receipt back to the event loop.
	publish := func() {
		defer workers.Done()

		tx, err := updateGasPrice(ctx)
		if err != nil {
			if !wasCanceled(err) {
				m.l.Error("unable to update txn gas price", "err", err)
			}
			return
		}

		txLog := m.l.New(
			"txHash", tx.Hash(),
			"nonce", tx.Nonce(),
			"gasTipCap", tx.GasTipCap(),
			"gasFeeCap", tx.GasFeeCap(),
		)
		txLog.Info("publishing transaction")

		// Sign and publish the transaction at the current gas price. The
		// outcome is recorded in the send state whether or not it failed.
		err = sendTx(ctx, tx)
		state.ProcessSendError(err)
		if err != nil {
			if wasCanceled(err) {
				return
			}
			txLog.Error("unable to publish transaction", "err", err)
			if state.ShouldAbortImmediately() {
				txLog.Warn("Aborting transaction submission")
				cancel()
			}
			// TODO(conner): add retry?
			return
		}
		txLog.Info("transaction published successfully")

		receipt, err := m.waitMined(ctx, tx, state)
		if err != nil {
			txLog.Debug("send tx failed", "err", err)
		}
		if receipt == nil {
			return
		}

		// Never block here: another attempt may already have delivered a
		// receipt, in which case this one is simply discarded.
		select {
		case confirmed <- receipt:
			txLog.Trace("send tx succeeded")
		default:
		}
	}

	// launch starts one tracked publication attempt.
	launch := func() {
		workers.Add(1)
		go publish()
	}

	// Kick off the attempt at the initial gas price before entering the
	// event loop and waiting out the resubmission timeout.
	launch()

	resubmit := time.NewTicker(m.ResubmissionTimeout)
	defer resubmit.Stop()

	for {
		select {
		// A resubmission timeout elapsed. Publish again unless a tx is
		// already awaiting confirmation; skipping in that case saves API
		// calls and lowers the odds of a false positive from
		// ShouldAbortImmediately.
		case <-resubmit.C:
			if !state.IsWaitingForConfirmation() {
				launch()
			}

		// The context is done, e.g. because of a shutdown or an abort
		// requested by one of the attempts.
		case <-ctx.Done():
			return nil, ctx.Err()

		// One of the published transactions has confirmed.
		case receipt := <-confirmed:
			return receipt, nil
		}
	}
}
// waitMined polls the backend for the receipt of tx, once immediately and
// then every ReceiptQueryInterval, and returns the receipt as soon as the
// transaction has NumConfirmations confirmations. When sendState is non-nil
// it is notified through TxMined when a receipt is found and through
// TxNotMined when the lookup succeeds without one. The only error returned is
// the context's error once ctx is done.
func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction, sendState *SendState) (*types.Receipt, error) {
	poll := time.NewTicker(m.ReceiptQueryInterval)
	defer poll.Stop()

	hash := tx.Hash()

	for {
		receipt, lookupErr := m.backend.TransactionReceipt(ctx, hash)
		switch {
		case receipt != nil:
			if sendState != nil {
				sendState.TxMined(hash)
			}

			minedAt := receipt.BlockNumber.Uint64()
			head, headErr := m.backend.BlockNumber(ctx)
			if headErr != nil {
				m.l.Error("Unable to fetch block number", "err", headErr)
				// Leaves the switch only; we still wait for the next
				// tick below and then retry.
				break
			}

			m.l.Trace("Transaction mined, checking confirmations", "txHash", hash, "txHeight", minedAt,
				"tipHeight", head, "numConfirmations", m.NumConfirmations)

			// A tx counts as confirmed once
			// minedAt+numConfirmations-1 <= head; the -1 reflects that the
			// block containing the tx is itself the first confirmation.
			// Both sides are shifted up by one here so that no
			// subtraction can underflow.
			target := minedAt + m.NumConfirmations
			reached := head + 1
			if target <= reached {
				m.l.Info("Transaction confirmed", "txHash", hash)
				return receipt, nil
			}

			// target > reached at this point, so the difference is safe.
			m.l.Info("Transaction not yet confirmed", "txHash", hash, "confsRemaining", target-reached)

		case lookupErr != nil:
			m.l.Trace("Receipt retrievel failed", "hash", hash, "err", lookupErr)

		default:
			if sendState != nil {
				sendState.TxNotMined(hash)
			}
			m.l.Trace("Transaction not yet mined", "hash", hash)
		}

		// Sleep until the next poll, bailing out if the context ends.
		select {
		case <-poll.C:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
// CalcGasFeeCap deterministically derives the recommended gas fee cap from
// the base fee and the gas tip cap:
//
//	gasFeeCap = gasTipCap + 2*baseFee
//
// The result is a newly allocated value; neither argument is modified.
func CalcGasFeeCap(baseFee, gasTipCap *big.Int) *big.Int {
	// Double the base fee by adding it to itself, then add the tip on top.
	feeCap := new(big.Int).Add(baseFee, baseFee)
	return feeCap.Add(feeCap, gasTipCap)
}