/
mempool.go
776 lines (685 loc) · 24.9 KB
/
mempool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
package mempool
import (
"context"
"fmt"
"runtime"
"sort"
"sync/atomic"
"time"
"github.com/creachadair/taskgroup"
sync "github.com/sasha-s/go-deadlock"
abciclient "github.com/dashpay/tenderdash/abci/client"
abci "github.com/dashpay/tenderdash/abci/types"
"github.com/dashpay/tenderdash/config"
"github.com/dashpay/tenderdash/internal/libs/clist"
tmstrings "github.com/dashpay/tenderdash/internal/libs/strings"
"github.com/dashpay/tenderdash/libs/log"
"github.com/dashpay/tenderdash/types"
)
// Compile-time assertion that *TxMempool satisfies the Mempool interface.
var _ Mempool = (*TxMempool)(nil)
// TxMempoolOption sets an optional parameter on the TxMempool. An option is a
// function that receives the mempool under construction and mutates it.
type TxMempoolOption func(*TxMempool)
// TxMempool implements the Mempool interface and allows the application to
// set priority values on transactions in the CheckTx response. When selecting
// transactions to include in a block, higher-priority transactions are chosen
// first. When evicting transactions from the mempool for size constraints,
// lower-priority transactions are evicted sooner.
//
// Within the mempool, transactions are ordered by time of arrival, and are
// gossiped to the rest of the network based on that order (gossip order does
// not take priority into account).
type TxMempool struct {
// Immutable fields
logger log.Logger
config *config.MempoolConfig
proxyAppConn abciclient.Client
metrics *Metrics
cache TxCache // seen transactions
// Atomically-updated fields
txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes
// Synchronized fields, protected by mtx.
mtx *sync.RWMutex
// notifiedTxsAvailable is set when a signal has been sent on txsAvailable
// and is cleared again by Update.
notifiedTxsAvailable bool
// txsAvailable is nil until EnableTxsAvailable installs a channel.
txsAvailable chan struct{} // one value sent per height when mempool is not empty
// preCheck and postCheck are optional hooks (nil when unset) run before and
// after the ABCI CheckTx call respectively.
preCheck PreCheckFunc
postCheck PostCheckFunc
height int64 // the latest height passed to Update
txs *clist.CList // valid transactions (passed CheckTx)
// txByKey indexes the elements of txs by transaction key.
txByKey map[types.TxKey]*clist.CElement
txBySender map[string]*clist.CElement // for sender != ""
}
// NewTxMempool builds a new, empty priority mempool from the given logger,
// configuration and application connection.
//
// The mempool starts with no-op metrics. A cache of seen transactions is only
// created when cfg.CacheSize is positive; otherwise a no-op cache is used.
// The supplied options are applied last, in the order given, so they may
// override any of the defaults.
func NewTxMempool(
	logger log.Logger,
	cfg *config.MempoolConfig,
	proxyAppConn abciclient.Client,
	options ...TxMempoolOption,
) *TxMempool {
	// Choose the cache implementation up front.
	var seen TxCache = NopTxCache{}
	if cfg.CacheSize > 0 {
		seen = NewLRUTxCache(cfg.CacheSize)
	}

	mp := &TxMempool{
		logger:       logger,
		config:       cfg,
		proxyAppConn: proxyAppConn,
		metrics:      NopMetrics(),
		cache:        seen,
		txs:          clist.New(),
		mtx:          new(sync.RWMutex),
		txByKey:      make(map[types.TxKey]*clist.CElement),
		txBySender:   make(map[string]*clist.CElement),
	}

	for i := range options {
		options[i](mp)
	}
	return mp
}
// WithPreCheck returns an option that installs f as the mempool's pre-check
// hook. The hook runs before the ABCI CheckTx call and a transaction is
// rejected if f(tx) returns an error. The hook stays in place until Update is
// called with a non-nil pre-check function, which replaces it.
func WithPreCheck(f PreCheckFunc) TxMempoolOption {
	install := func(txmp *TxMempool) {
		txmp.preCheck = f
	}
	return install
}
// WithPostCheck returns an option that installs f as the mempool's post-check
// hook. The hook runs after the ABCI CheckTx call and a transaction is
// rejected if f(tx, resp) returns an error. The hook stays in place until
// Update is called with a non-nil post-check function, which replaces it.
func WithPostCheck(f PostCheckFunc) TxMempoolOption {
	install := func(txmp *TxMempool) {
		txmp.postCheck = f
	}
	return install
}
// WithMetrics returns an option that replaces the mempool's metrics collector
// with the given one.
func WithMetrics(metrics *Metrics) TxMempoolOption {
	install := func(txmp *TxMempool) {
		txmp.metrics = metrics
	}
	return install
}
// Lock acquires the mempool's mutex for writing. The caller is responsible
// for releasing it again with Unlock.
func (txmp *TxMempool) Lock() {
	txmp.mtx.Lock()
}
// Unlock releases the write-lock previously acquired with Lock.
func (txmp *TxMempool) Unlock() {
	txmp.mtx.Unlock()
}
// Size reports how many valid transactions are currently held in the mempool.
// It reads the length of the underlying transaction list and does not acquire
// the mempool mutex.
func (txmp *TxMempool) Size() int {
	count := txmp.txs.Len()
	return count
}
// SizeBytes reports the combined size, in bytes, of all valid transactions in
// the mempool. The counter is read atomically, so no lock is required.
func (txmp *TxMempool) SizeBytes() int64 {
	total := atomic.LoadInt64(&txmp.txsBytes)
	return total
}
// FlushAppConn calls Flush on the mempool's application connection and
// returns its error.
//
// The caller must hold the exclusive mempool lock (via txmp.Lock) when calling
// FlushAppConn. The lock is released for the duration of the flush and
// re-acquired before this method returns.
func (txmp *TxMempool) FlushAppConn(ctx context.Context) error {
	// The flush has to run without the lock held so that its callback can
	// fire; the call itself blocks until the flush is complete. The Mempool
	// interface nevertheless requires callers to hold the lock, so drop it
	// here and take it back on the way out.
	txmp.mtx.Unlock()
	defer txmp.mtx.Lock()

	err := txmp.proxyAppConn.Flush(ctx)
	return err
}
// EnableTxsAvailable turns on "transactions available" notifications by
// installing a fresh notification channel with a buffer of one. Calling it
// again replaces the channel.
func (txmp *TxMempool) EnableTxsAvailable() {
	signal := make(chan struct{}, 1)

	txmp.mtx.Lock()
	txmp.txsAvailable = signal
	txmp.mtx.Unlock()
}
// TxsAvailable returns the channel on which the mempool signals, at most once
// per height, that it holds transactions. The channel is nil until
// EnableTxsAvailable has installed one. The field is read without acquiring
// the mempool mutex.
func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
	var out <-chan struct{} = txmp.txsAvailable
	return out
}
// CheckTx adds the given transaction to the mempool if it fits and passes the
// application's ABCI CheckTx method.
//
// CheckTx reports an error without adding tx if:
//
// - The size of tx exceeds the configured maximum transaction size.
// - The pre-check hook is defined and reports an error for tx.
// - The proxy connection to the application reports an error.
// - The transaction already exists in the cache.
// - The ABCI CheckTx call itself fails (tx is then removed from the cache).
//
// If tx passes all of the above conditions, the application's response is
// handed to cb (when cb != nil) and then to addNewTransaction, whose result is
// returned. The ABCI call is made synchronously from this method.
//
// If the application accepts the transaction and the mempool is full, the
// mempool evicts one or more of the lowest-priority transaction whose priority
// is (strictly) lower than the priority of tx and whose size together exceeds
// the size of tx, and adds tx instead. If no such transactions exist, tx is
// discarded.
func (txmp *TxMempool) CheckTx(
ctx context.Context,
tx types.Tx,
cb func(*abci.ResponseCheckTx),
txInfo TxInfo,
) error {
// During the initial phase of CheckTx, we do not need to modify any state.
// A transaction will not actually be added to the mempool until it survives
// a call to the ABCI CheckTx method and size constraint checks.
//
// The closure scopes the read lock to the preliminary checks and yields the
// mempool height observed while that lock was held.
height, err := func() (int64, error) {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()
// Reject transactions in excess of the configured maximum transaction size.
if len(tx) > txmp.config.MaxTxBytes {
return 0, types.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}
}
// If a precheck hook is defined, call it before invoking the application.
if txmp.preCheck != nil {
if err := txmp.preCheck(tx); err != nil {
return 0, types.ErrPreCheck{Reason: err}
}
}
// Early exit if the proxy connection has an error.
if err := txmp.proxyAppConn.Error(); err != nil {
return 0, err
}
txKey := tx.Key()
// Check for the transaction in the cache. Push reports false when the
// transaction was already present.
if !txmp.cache.Push(tx) {
// If the cached transaction is also in the pool, record its sender.
if elt, ok := txmp.txByKey[txKey]; ok {
w := elt.Value.(*WrappedTx)
w.SetPeer(txInfo.SenderID)
}
return 0, types.ErrTxInCache
}
return txmp.height, nil
}()
if err != nil {
return err
}
// Invoke an ABCI CheckTx for this transaction. On failure, undo the cache
// insertion made above so the transaction can be submitted again.
rsp, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{Tx: tx})
if err != nil {
txmp.cache.Remove(tx)
return err
}
wtx := &WrappedTx{
tx: tx,
hash: tx.Key(),
timestamp: time.Now().UTC(),
height: height,
}
wtx.SetPeer(txInfo.SenderID)
// The callback sees the application response before the mempool decides
// whether to keep the transaction.
if cb != nil {
cb(rsp)
}
return txmp.addNewTransaction(wtx, rsp)
}
// RemoveTxByKey deletes the transaction identified by txKey from the mempool,
// taking the exclusive mempool lock for the duration of the removal. An error
// is returned when no transaction with that key is present. The cache of seen
// transactions is left untouched.
func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
	txmp.mtx.Lock()
	defer txmp.mtx.Unlock()

	err := txmp.removeTxByKey(txKey)
	return err
}
// removeTxByKey drops the transaction with the given key from the list and
// from both indexes, and subtracts its size from the byte counter. It returns
// an error if the key is not present in the mempool.
//
// The caller must hold txmp.mtx exclusively.
func (txmp *TxMempool) removeTxByKey(key types.TxKey) error {
	elt, found := txmp.txByKey[key]
	if !found {
		return fmt.Errorf("transaction %x not found", key)
	}

	wtx := elt.Value.(*WrappedTx)

	// Drop the index entries first, then unlink the element from the list.
	delete(txmp.txByKey, key)
	delete(txmp.txBySender, wtx.sender)
	txmp.txs.Remove(elt)
	elt.DetachPrev()
	elt.DetachNext()

	atomic.AddInt64(&txmp.txsBytes, -wtx.Size())
	return nil
}
// removeTxByElement drops the transaction stored in elt from the list and from
// both indexes, and subtracts its size from the byte counter.
//
// The caller must hold txmp.mtx exclusively.
func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) {
	wtx := elt.Value.(*WrappedTx)

	// Drop the index entries first, then unlink the element from the list.
	delete(txmp.txByKey, wtx.tx.Key())
	delete(txmp.txBySender, wtx.sender)
	txmp.txs.Remove(elt)
	elt.DetachPrev()
	elt.DetachNext()

	atomic.AddInt64(&txmp.txsBytes, -wtx.Size())
}
// Flush empties both the mempool and the cache of seen transactions. The
// current height is left as it is.
func (txmp *TxMempool) Flush() {
	txmp.mtx.Lock()
	defer txmp.mtx.Unlock()

	// Remove the transactions one at a time so that the indexes and the byte
	// counter are kept in step. The successor is fetched before each removal
	// because removing an element detaches it from its neighbors.
	for elt := txmp.txs.Front(); elt != nil; {
		following := elt.Next()
		txmp.removeTxByElement(elt)
		elt = following
	}

	txmp.cache.Reset()
}
// allEntriesSorted returns every transaction currently in the mempool, with
// higher priorities first and, among equal priorities, earlier arrival
// timestamps first. The mempool read lock is held while the snapshot is
// collected and sorted.
func (txmp *TxMempool) allEntriesSorted() []*WrappedTx {
	txmp.mtx.RLock()
	defer txmp.mtx.RUnlock()

	entries := make([]*WrappedTx, 0, len(txmp.txByKey))
	for _, elt := range txmp.txByKey {
		entries = append(entries, elt.Value.(*WrappedTx))
	}

	// byPriorityThenArrival reports whether entry i belongs before entry j.
	byPriorityThenArrival := func(i, j int) bool {
		left, right := entries[i], entries[j]
		if left.priority != right.priority {
			return left.priority > right.priority // higher priorities first
		}
		return left.timestamp.Before(right.timestamp)
	}
	sort.Slice(entries, byPriorityThenArrival)

	return entries
}
// ReapMaxBytesMaxGas returns a slice of valid transactions that fit within the
// size and gas constraints. The results are ordered by nonincreasing priority,
// with ties broken by increasing order of arrival. Reaping transactions does
// not remove them from the mempool.
//
// If maxBytes < 0, no limit is set on the total size in bytes.
// If maxGas < 0, no limit is set on the total gas cost.
//
// Transactions are considered in priority order. A transaction that would push
// either running total past its limit is skipped, and the scan continues with
// the next one, so a single oversized high-priority transaction cannot prevent
// smaller lower-priority transactions from being reaped. Only transactions
// that are actually kept count towards the totals.
//
// If the mempool is empty or has no transactions fitting within the given
// constraints, the result will also be empty.
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
	var totalGas, totalBytes int64

	var keep []types.Tx //nolint:prealloc
	for _, w := range txmp.allEntriesSorted() {
		// N.B. When computing byte size, we need to include the overhead for
		// encoding as protobuf to send to the application.
		txBytes := types.ComputeProtoSizeForTxs([]types.Tx{w.tx})

		// Skip, rather than stop at, a transaction that does not fit: later
		// (lower-priority) transactions may still fit in the remaining room.
		if maxBytes >= 0 && totalBytes+txBytes > maxBytes {
			continue
		}
		if maxGas >= 0 && totalGas+w.gasWanted > maxGas {
			continue
		}

		totalBytes += txBytes
		totalGas += w.gasWanted
		keep = append(keep, w.tx)
	}
	return keep
}
// TxsWaitChan returns the wait channel of the underlying transaction list,
// which is closed once at least one transaction is available to be gossiped.
func (txmp *TxMempool) TxsWaitChan() <-chan struct{} {
	waitCh := txmp.txs.WaitChan()
	return waitCh
}
// TxsFront returns the first element of the pending transaction list, or nil
// when the mempool is empty.
func (txmp *TxMempool) TxsFront() *clist.CElement {
	head := txmp.txs.Front()
	return head
}
// ReapMaxTxs returns at most max transactions from the mempool, ordered by
// nonincreasing priority with ties broken by increasing order of arrival.
// Reaping does not remove the transactions from the mempool.
//
// A negative max means "no limit": every transaction is returned. The result
// holds fewer than max entries (possibly none, in which case it is nil) when
// the mempool does not contain that many transactions.
func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
	entries := txmp.allEntriesSorted()

	// Trim the sorted snapshot to the requested count before copying out.
	if max >= 0 && max < len(entries) {
		entries = entries[:max]
	}

	var keep []types.Tx //nolint:prealloc
	for _, w := range entries {
		keep = append(keep, w.tx)
	}
	return keep
}
// Update records a newly committed block: it stores the new height, removes
// the block's transactions from the mempool, and refreshes the cache of seen
// transactions. blockTxs and deliverTxResponses must have the same length,
// with each response corresponding to the transaction at the same offset;
// Update panics otherwise.
//
// Successfully executed transactions are added to the cache. Failed ones are
// removed from the cache unless the configuration asks to keep invalid
// transactions cached. Non-nil newPreFn and newPostFn replace the current
// pre- and post-check hooks. Expired transactions are purged afterwards.
//
// If transactions remain and recheck is true, each of them is sent to the ABCI
// CheckTx method again and any that are reported invalid are removed. If
// recheck is false, watchers are notified that transactions are available.
//
// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before
// calling Update. Update always returns nil.
func (txmp *TxMempool) Update(
	ctx context.Context,
	blockHeight int64,
	blockTxs types.Txs,
	deliverTxResponses []*abci.ExecTxResult,
	newPreFn PreCheckFunc,
	newPostFn PostCheckFunc,
	recheck bool,
) error {
	// Safety check: transactions and responses must match in number.
	if nTxs, nRsps := len(blockTxs), len(deliverTxResponses); nTxs != nRsps {
		panic(fmt.Sprintf("mempool: got %d transactions but %d ExecTx responses",
			nTxs, nRsps))
	}

	txmp.height = blockHeight
	txmp.notifiedTxsAvailable = false
	if newPreFn != nil {
		txmp.preCheck = newPreFn
	}
	if newPostFn != nil {
		txmp.postCheck = newPostFn
	}

	for i, tx := range blockTxs {
		switch {
		case deliverTxResponses[i].Code == abci.CodeTypeOK:
			// Committed successfully: remember it so it is not accepted again.
			// The result only says whether it was already cached.
			_ = txmp.cache.Push(tx)
		case !txmp.config.KeepInvalidTxsInCache:
			// Failed to commit and the operator did not ask to keep it cached.
			txmp.cache.Remove(tx)
		}

		// Whatever the outcome, the transaction leaves the mempool. A "not
		// found" error simply means it was never in this node's mempool.
		_ = txmp.removeTxByKey(tx.Key())
	}

	txmp.purgeExpiredTxs(blockHeight)

	// If there are any uncommitted transactions left in the mempool, either
	// start a re-CheckTx for each of them or announce that they are available.
	remaining := txmp.Size()
	txmp.metrics.Size.Set(float64(remaining))
	if remaining == 0 {
		return nil
	}
	if recheck {
		txmp.recheckTransactions(ctx)
		return nil
	}
	txmp.notifyTxsAvailable()
	return nil
}
// addNewTransaction handles the ABCI CheckTx response for the first time a
// transaction is added to the mempool. A recheck after a block is committed
// goes to handleRecheckResult.
//
// If either the application rejected the transaction or a post-check hook is
// defined and rejects the transaction, it is discarded. A post-check error is
// returned to the caller; an application rejection alone returns nil.
//
// If the application assigned a non-empty sender and the mempool already holds
// a transaction from that sender, the new transaction is dropped and nil is
// returned.
//
// Otherwise, if the mempool is full, check for lower-priority transactions
// that can be evicted to make room for the new one. If no such transactions
// exist, this transaction is logged and dropped; otherwise the selected
// transactions are evicted.
//
// Finally, the new transaction is added and size stats updated.
//
// The method takes txmp.mtx exclusively for its whole duration.
func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) error {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(wtx.tx, checkTxRes)
}
if err != nil || checkTxRes.Code != abci.CodeTypeOK {
txmp.logger.Info(
"rejected bad transaction",
"priority", wtx.Priority(),
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"peer_id", wtx.peers,
"code", checkTxRes.Code,
"post_check_err", err,
)
txmp.metrics.FailedTxs.Add(1)
// Remove the invalid transaction from the cache, unless the operator has
// instructed us to keep invalid transactions.
if !txmp.config.KeepInvalidTxsInCache {
txmp.cache.Remove(wtx.tx)
}
if err != nil {
return err
}
// TODO(creachadair): Report an error for an invalid transaction.
// This is an API change, unfortunately, but should be made safe if it isn't.
// fmt.Errorf("invalid transaction: ABCI response code %d", checkTxRes.Code)
return nil
}
priority := checkTxRes.Priority
sender := checkTxRes.Sender
// Disallow multiple concurrent transactions from the same sender assigned
// by the ABCI application. As a special case, an empty sender is not
// restricted.
//
// NOTE: this branch does not remove the rejected transaction from the cache.
if sender != "" {
elt, ok := txmp.txBySender[sender]
if ok {
w := elt.Value.(*WrappedTx)
txmp.logger.Debug(
"rejected valid incoming transaction; tx already exists for sender",
"tx", fmt.Sprintf("%X", w.tx.Hash()),
"sender", sender,
)
txmp.metrics.RejectedTxs.Add(1)
// TODO(creachadair): Report an error for a duplicate sender.
// This is an API change, unfortunately, but should be made safe if it isn't.
// fmt.Errorf("transaction rejected: tx already exists for sender %q (%X)", sender, w.tx.Hash())
return nil
}
}
// At this point the application has ruled the transaction valid, but the
// mempool might be full. If so, find the lowest-priority items with lower
// priority than the application assigned to this new one, and evict as many
// of them as necessary to make room for tx. If no such items exist, we
// discard tx.
if err := txmp.canAddTx(wtx); err != nil {
var victims []*clist.CElement // eligible transactions for eviction
var victimBytes int64 // total size of victims
// Every transaction with strictly lower priority is an eviction candidate.
for cur := txmp.txs.Front(); cur != nil; cur = cur.Next() {
cw := cur.Value.(*WrappedTx)
if cw.priority < priority {
victims = append(victims, cur)
victimBytes += cw.Size()
}
}
// If there are no suitable eviction candidates, or the total size of
// those candidates is not enough to make room for the new transaction,
// drop the new one.
if len(victims) == 0 || victimBytes < wtx.Size() {
txmp.cache.Remove(wtx.tx)
txmp.logger.Error(
"rejected valid incoming transaction; mempool is full",
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err.Error(),
)
txmp.metrics.RejectedTxs.Add(1)
// TODO(creachadair): Report an error for a full mempool.
// This is an API change, unfortunately, but should be made safe if it isn't.
// fmt.Errorf("transaction rejected: mempool is full (%X)", wtx.tx.Hash())
return nil
}
txmp.logger.Debug("evicting lower-priority transactions",
"new_tx", tmstrings.LazySprintf("%X", wtx.tx.Hash()),
"new_priority", priority,
)
// Sort lowest priority items first so they will be evicted first. Among
// equal priorities the newer item sorts first, so it is evicted before
// older ones (to maintain FIFO semantics in a group).
sort.Slice(victims, func(i, j int) bool {
iw := victims[i].Value.(*WrappedTx)
jw := victims[j].Value.(*WrappedTx)
if iw.Priority() == jw.Priority() {
return iw.timestamp.After(jw.timestamp)
}
return iw.Priority() < jw.Priority()
})
// Evict as many of the victims as necessary to make room. Evicted
// transactions are also removed from the cache.
var evictedBytes int64
for _, vic := range victims {
w := vic.Value.(*WrappedTx)
txmp.logger.Debug(
"evicted valid existing transaction; mempool full",
"old_tx", tmstrings.LazySprintf("%X", w.tx.Hash()),
"old_priority", w.priority,
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
// We may not need to evict all the eligible transactions. Bail out
// early if we have made enough room.
evictedBytes += w.Size()
if evictedBytes >= wtx.Size() {
break
}
}
}
// Record the application-assigned attributes on the wrapper before it is
// inserted, since insertTx indexes the transaction by its sender.
wtx.SetGasWanted(checkTxRes.GasWanted)
wtx.SetPriority(priority)
wtx.SetSender(sender)
txmp.insertTx(wtx)
txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
txmp.metrics.Size.Set(float64(txmp.Size()))
txmp.logger.Debug(
"inserted new valid transaction",
"priority", wtx.Priority(),
"tx", tmstrings.LazySprintf("%X", wtx.tx.Hash()),
"height", txmp.height,
"num_txs", txmp.Size(),
)
txmp.notifyTxsAvailable()
return nil
}
// insertTx appends wtx to the end of the transaction list, indexes it by
// transaction key and, when its sender is non-empty, by sender, and adds its
// size to the byte counter. It modifies fields guarded by txmp.mtx and does
// not take the lock itself.
func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
	elt := txmp.txs.PushBack(wtx)
	txmp.txByKey[wtx.tx.Key()] = elt

	sender := wtx.Sender()
	if sender != "" {
		txmp.txBySender[sender] = elt
	}

	atomic.AddInt64(&txmp.txsBytes, wtx.Size())
}
// handleRecheckResult processes the response of an ABCI CheckTx call issued
// during the recheck phase of a block Update. A transaction that is still
// valid has its priority refreshed from the response; one that the application
// or the post-check hook now rejects is removed from the mempool and, unless
// invalid transactions are kept cached, from the cache as well.
//
// The initial CheckTx of a new transaction does NOT go through this method;
// addNewTransaction handles that case.
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) {
	txmp.metrics.RecheckTimes.Add(1)

	txmp.mtx.Lock()
	defer txmp.mtx.Unlock()

	// The transaction may have been evicted while the recheck was in flight,
	// in which case there is nothing left to update.
	elt, present := txmp.txByKey[tx.Key()]
	if !present {
		return
	}
	wtx := elt.Value.(*WrappedTx)

	// Run the post-check hook, if any, before looking at the response code.
	var postErr error
	if txmp.postCheck != nil {
		postErr = txmp.postCheck(tx, checkTxRes)
	}

	stillValid := checkTxRes.Code == abci.CodeTypeOK && postErr == nil
	if stillValid {
		wtx.SetPriority(checkTxRes.Priority)
		return // the size of the mempool did not change
	}

	txmp.logger.Debug(
		"existing transaction no longer valid; failed re-CheckTx callback",
		"priority", wtx.Priority(),
		"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
		"err", postErr,
		"code", checkTxRes.Code,
	)

	txmp.removeTxByElement(elt)
	txmp.metrics.FailedTxs.Add(1)
	if !txmp.config.KeepInvalidTxsInCache {
		txmp.cache.Remove(wtx.tx)
	}
	txmp.metrics.Size.Set(float64(txmp.Size()))
}
// recheckTransactions initiates re-CheckTx ABCI calls for all the transactions
// currently in the mempool. It snapshots the transaction list synchronously
// and then returns; the CheckTx calls themselves are issued from a background
// goroutine. The method has no return value.
//
// Precondition: The mempool is not empty (it panics otherwise).
// The caller must hold txmp.mtx exclusively.
func (txmp *TxMempool) recheckTransactions(ctx context.Context) {
if txmp.Size() == 0 {
panic("mempool: cannot run recheck on an empty mempool")
}
txmp.logger.Debug(
"executing re-CheckTx for all remaining transactions",
"num_txs", txmp.Size(),
"height", txmp.height,
)
// Collect transactions currently in the mempool requiring recheck.
wtxs := make([]*WrappedTx, 0, txmp.txs.Len())
for e := txmp.txs.Front(); e != nil; e = e.Next() {
wtxs = append(wtxs, e.Value.(*WrappedTx))
}
// Issue CheckTx calls for each remaining transaction, and when all the
// rechecks are complete signal watchers that transactions may be available.
go func() {
// Run at most 2*NumCPU recheck calls concurrently.
g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())
for _, wtx := range wtxs {
wtx := wtx
start(func() error {
rsp, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
// A failed call is only logged; the transaction stays in the mempool.
if err != nil {
txmp.logger.Error("failed to execute CheckTx during recheck",
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
} else {
txmp.handleRecheckResult(wtx.tx, rsp)
}
return nil
})
}
if err := txmp.proxyAppConn.Flush(ctx); err != nil {
txmp.logger.Error("failed to flush transactions during recheck", "err", err)
}
// When recheck is complete, trigger a notification for more transactions.
// Every task returns nil, so the result of Wait carries no information.
_ = g.Wait()
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.notifyTxsAvailable()
}()
}
// canAddTx reports whether wtx fits within the configured mempool limits. It
// returns nil when the mempool holds fewer transactions than the configured
// maximum and adding wtx would not push the total size past the configured
// byte limit. Otherwise it returns a types.ErrMempoolIsFull describing the
// current and maximum counts and sizes.
func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
	count, usedBytes := txmp.Size(), txmp.SizeBytes()

	if count < txmp.config.Size && wtx.Size()+usedBytes <= txmp.config.MaxTxsBytes {
		return nil
	}

	return types.ErrMempoolIsFull{
		NumTxs:      count,
		MaxTxs:      txmp.config.Size,
		TxsBytes:    usedBytes,
		MaxTxsBytes: txmp.config.MaxTxsBytes,
	}
}
// purgeExpiredTxs removes every transaction that has outlived its configured
// limits as of the given blockHeight: either more than TTLNumBlocks blocks
// have passed since the height recorded on the transaction, or more than
// TTLDuration has elapsed since its timestamp. A limit of zero disables the
// corresponding check, and if both are zero the method does nothing.
//
// Each purged transaction is also removed from the cache and counted in the
// evicted-transactions metric.
//
// The caller must hold txmp.mtx exclusively.
func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
	maxBlocks, maxAge := txmp.config.TTLNumBlocks, txmp.config.TTLDuration
	if maxBlocks == 0 && maxAge == 0 {
		return // no expiry configured
	}

	now := time.Now()
	for elt := txmp.txs.Front(); elt != nil; {
		// Fetch the successor before a possible removal, which would detach
		// elt from the list.
		following := elt.Next()

		w := elt.Value.(*WrappedTx)
		expired := (maxBlocks > 0 && (blockHeight-w.height) > maxBlocks) ||
			(maxAge > 0 && now.Sub(w.timestamp) > maxAge)
		if expired {
			txmp.removeTxByElement(elt)
			txmp.cache.Remove(w.tx)
			txmp.metrics.EvictedTxs.Add(1)
		}

		elt = following
	}
}
// notifyTxsAvailable signals on the txsAvailable channel that the mempool
// holds transactions. It does nothing when the mempool is empty, when no
// channel has been installed, or when a notification has already been sent
// since the flag was last cleared. The send is non-blocking. The method
// updates notifiedTxsAvailable, a field guarded by txmp.mtx, and does not take
// the lock itself.
func (txmp *TxMempool) notifyTxsAvailable() {
	if txmp.Size() == 0 {
		return // nothing to announce
	}
	if txmp.txsAvailable == nil || txmp.notifiedTxsAvailable {
		return
	}

	// The channel has capacity 1, so at most one signal is ever pending.
	txmp.notifiedTxsAvailable = true
	select {
	case txmp.txsAvailable <- struct{}{}:
	default:
	}
}